Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package sdk

/*

import (
"bytes"
"context"
Expand Down Expand Up @@ -1119,3 +1121,6 @@ func (a acceptanceTest) context(t *testing.T) context.Context {
}
return ctx
}


*/
5 changes: 5 additions & 0 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package sdk

/*

import (
"context"
"fmt"
Expand Down Expand Up @@ -183,3 +185,6 @@ func (bm *benchmarkSource) reportMetrics(b *testing.B) {
b.ReportMetric(bm.firstAck.Seconds(), "firstAck")
b.ReportMetric(float64(b.N-1)/bm.allAcks.Seconds(), "acks/s")
}


*/
88 changes: 55 additions & 33 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,16 @@ import (
// All implementations must embed UnimplementedDestination for forward
// compatibility.
type Destination interface {
// Parameters is a map of named Parameters that describe how to configure
// the Destination.
Parameters() config.Parameters

// Configure is the first function to be called in a connector. It provides the
// connector with the configuration that needs to be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The connector SDK will sanitize, apply defaults and validate the
// configuration before calling this function. This means that the
// configuration will always contain all keys defined in Parameters
// (unprovided keys will have their default values) and all non-empty
// values will be of the correct type.
Configure(context.Context, config.Config) error
// Config returns the configuration that the destination expects. It should
// return a pointer to a struct that contains all the configuration keys that
// the destination expects. The struct should be annotated with the necessary
// validation tags. The value should be a pointer to allow the SDK to
// populate it using the values from the configuration.
//
// The returned DestinationConfig should contain all the configuration keys
// that the destination expects, including middleware fields (see
// [DefaultDestinationMiddleware]).
Config() DestinationConfig

// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
Expand Down Expand Up @@ -93,20 +88,35 @@ type Destination interface {
mustEmbedUnimplementedDestination()
}

// DestinationConfig represents the configuration containing all configuration
// keys that a destination expects. The type needs to implement [Validatable],
// which will be used to automatically validate the config when configuring the
// connector.
type DestinationConfig interface {
Validatable

mustEmbedUnimplementedDestinationConfig()
}

// NewDestinationPlugin takes a Destination and wraps it into an adapter that
// converts it into a pconnector.DestinationPlugin. If the parameter is nil it
// will wrap UnimplementedDestination instead.
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin {
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig, parameters config.Parameters) pconnector.DestinationPlugin {
if impl == nil {
// prevent nil pointers
impl = UnimplementedDestination{}
}
return &destinationPluginAdapter{impl: impl, cfg: cfg}
return &destinationPluginAdapter{
impl: impl,
cfg: cfg,
parameters: parameters,
}
}

type destinationPluginAdapter struct {
impl Destination
cfg pconnector.PluginConfig
impl Destination
cfg pconnector.PluginConfig
parameters config.Parameters

// lastPosition holds the position of the last record passed to the connector's
// Write method. It is used to determine when the connector should stop.
Expand All @@ -119,29 +129,29 @@ type destinationPluginAdapter struct {

func (a *destinationPluginAdapter) Configure(ctx context.Context, req pconnector.DestinationConfigureRequest) (pconnector.DestinationConfigureResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatchConfig{})

err := a.impl.Configure(ctx, req.Config)
if err != nil {
return pconnector.DestinationConfigureResponse{}, err
cfg := a.impl.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return pconnector.DestinationConfigureResponse{}, nil
}

a.configureWriteStrategy(ctx)
return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records
err := Util.ParseConfig(ctx, req.Config, cfg, a.parameters)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("failed to parse configuration: %w", err)
}

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
err = cfg.Validate(ctx)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("configuration invalid: %w", err)
}

return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.DestinationOpenRequest) (pconnector.DestinationOpenResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatch{})

a.lastPosition = new(csync.ValueWatcher[opencdc.Position])

Expand All @@ -164,9 +174,21 @@ func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.Destin
}()

err := a.impl.Open(ctxOpen)
a.configureWriteStrategy(ctxOpen)

return pconnector.DestinationOpenResponse{}, err
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
}
}

func (a *destinationPluginAdapter) Run(ctx context.Context, stream pconnector.DestinationRunStream) error {
ctx = internal.Enrich(ctx, a.cfg)
a.writeStrategy.SetStream(stream.Server())
Expand Down
Loading