Skip to content
1 change: 1 addition & 0 deletions internal/servers/destination/v0/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error {
select {
case resources <- convertedResource:
case <-ctx.Done():
convertedResource.Release()
close(resources)
if err := eg.Wait(); err != nil {
return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err)
Expand Down
14 changes: 9 additions & 5 deletions plugins/destination/managed_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *arrow.Schem
p.logger.Info().Str("table", tableName).Int("len", batchSize).Dur("duration", time.Since(start)).Msg("batch written successfully")
atomic.AddUint64(&metrics.Writes, uint64(batchSize))
}
for _, r := range resources {
r.Release()
}
}

func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Record) []arrow.Record {
Expand All @@ -81,18 +84,19 @@ func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Recor

pks := make(map[string]struct{}, len(resources))
res := make([]arrow.Record, 0, len(resources))
var reported bool
for _, r := range resources {
if r.NumRows() > 1 {
panic(fmt.Sprintf("record with more than 1 row: %d", r.NumRows()))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also added this panic because this function's logic will break down if a record ever contains more than one row.

I wonder if we may want to add a special type that guarantees we're dealing with an `arrow.Record` that contains only a single row.

}
key := pk.String(r)
_, ok := pks[key]
switch {
case !ok:
if !ok {
pks[key] = struct{}{}
res = append(res, r)
continue
case reported:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `reported` variable was not being used.

continue
}
// duplicate, release early
r.Release()
}

return res
Expand Down