Merged

Changes from 2 commits
27 changes: 27 additions & 0 deletions .chloggen/tigran_stefexporter.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: stefexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement async exporting

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [39958]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 7 additions & 3 deletions exporter/stefexporter/README.md
@@ -58,6 +58,10 @@ Several helper files are leveraged to provide additional capabilities automatically

- [gRPC settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md)
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
- [Queuing, timeout and retry settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).
Note that `timeout` setting controls how long the exporter waits for ACK of a data sent
over STEF/gRPC stream.
- Queuing, timeout and retry settings, particularly:
  - The `timeout` setting controls how long the exporter waits for an ACK of data sent
    over the STEF/gRPC stream.
  - The `num_consumers` setting defines how many unacknowledged batches can be in flight.
Review comment from @tigrannajaryan (Member, Author), May 9, 2025:
The name of the setting does not quite match what it controls. I wanted to keep this existing setting and avoid creating a new one, but I am not sure now this was a very good idea. cc @dmitryax

Reply from a Member:
For a regular exporter, it controls outbound concurrency as well, so I think the name is fine. The description clarifies it well enough.
    If the destination is slow to acknowledge, then increasing this number can help
    increase the throughput. A configuration sketch is shown after this list.
- For the rest of the settings, [see here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).
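
For illustration only, a minimal configuration sketch combining these settings might look like the following. It assumes the exporter is configured under the `stef` key and uses the standard `exporterhelper`/`configgrpc` option names; the endpoint and numeric values are placeholders, not recommendations:

```yaml
exporters:
  stef:
    endpoint: stef-destination.example.com:4317   # placeholder endpoint
    tls:
      insecure: false
    timeout: 15s              # how long to wait for a STEF ACK of sent data
    sending_queue:
      enabled: true
      num_consumers: 10       # max unacknowledged batches in flight
      queue_size: 1000
    retry_on_failure:
      enabled: true
```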
272 changes: 70 additions & 202 deletions exporter/stefexporter/exporter.go
@@ -5,12 +5,8 @@ package stefexporter // import "github.com/open-telemetry/opentelemetry-collecto

import (
"context"
"fmt"
"sync"
"time"

stefgrpc "github.com/splunk/stef/go/grpc"
"github.com/splunk/stef/go/grpc/stef_proto"
"github.com/splunk/stef/go/otel/oteltef"
stefpdatametrics "github.com/splunk/stef/go/pdata/metrics"
stefpkg "github.com/splunk/stef/go/pkg"
"go.opentelemetry.io/collector/component"
@@ -36,42 +32,26 @@ type stefExporter struct {
set component.TelemetrySettings
cfg Config
compression stefpkg.Compression
started bool

// connMutex is taken when connecting, disconnecting or checking connection status.
connMutex sync.Mutex
isConnected bool
connID uint64
grpcConn *grpc.ClientConn
client *stefgrpc.Client
connCancel context.CancelFunc

// The STEF writer we write metrics to and which in turns sends them over gRPC.
stefWriter *oteltef.MetricsWriter
stefWriterMutex sync.Mutex // protects stefWriter

// lastAckID is the maximum ack ID received so far.
lastAckID uint64
// Cond to protect and signal lastAckID.
ackCond *internal.CancellableCond
}
grpcConn *grpc.ClientConn

type loggerWrapper struct {
logger *zap.Logger
connMan *internal.ConnManager
sync2Async *internal.Sync2Async
}

func (w *loggerWrapper) Debugf(_ context.Context, format string, v ...any) {
w.logger.Debug(fmt.Sprintf(format, v...))
}
const (
flushPeriod = 100 * time.Millisecond
reconnectPeriod = 10 * time.Minute
)

func (w *loggerWrapper) Errorf(_ context.Context, format string, v ...any) {
w.logger.Error(fmt.Sprintf(format, v...))
}
// TODO: make connection count configurable.
const connCount = 1

func newStefExporter(set component.TelemetrySettings, cfg *Config) *stefExporter {
exp := &stefExporter{
set: set,
cfg: *cfg,
ackCond: internal.NewCancellableCond(),
set: set,
cfg: *cfg,
}

exp.compression = stefpkg.CompressionNone
@@ -91,155 +71,78 @@ func (s *stefExporter) Start(ctx context.Context, host component.Host) error {
return err
}

// No need to block Start(), we will begin connection attempt in a goroutine.
// Create a connection creator and manager to take care of the connections.
connCreator := internal.NewStefConnCreator(s.set.Logger, s.grpcConn, s.compression)
s.connMan = internal.NewConnManager(s.set.Logger, connCreator, connCount, flushPeriod, reconnectPeriod)
s.connMan.Start()

// Wrap async implementation of sendMetricsAsync into a sync-callable API.
s.sync2Async = internal.NewSync2Async(s.set.Logger, s.cfg.QueueConfig.NumConsumers, s.sendMetricsAsync)

// Begin connection attempt in a goroutine to avoid blocking Start().
go func() {
if err := s.ensureConnected(ctx); err != nil {
// Acquire() triggers a connection attempt and blocks until it succeeds or fails.
conn, err := s.connMan.Acquire(ctx)
if err != nil {
s.set.Logger.Error("Error connecting to destination", zap.Error(err))
// This is not a fatal error. exportMetrics() will try to connect again as needed.
// This is not a fatal error. Next sending attempt will try to
// connect again as needed.
return
}
// Connection is established. Return it, this is all we needed for now.
s.connMan.Release(conn)
}()
return nil
}

func (s *stefExporter) Shutdown(ctx context.Context) error {
s.disconnect(ctx)
if s.grpcConn != nil {
if err := s.grpcConn.Close(); err != nil {
s.set.Logger.Error("Failed to close grpc connection", zap.Error(err))
}
s.grpcConn = nil
}
s.started = true
return nil
}

func (s *stefExporter) ensureConnected(ctx context.Context) error {
s.connMutex.Lock()
defer s.connMutex.Unlock()

if s.isConnected {
func (s *stefExporter) Shutdown(ctx context.Context) error {
if !s.started {
return nil
}

s.set.Logger.Debug("Connecting to destination", zap.String("endpoint", s.cfg.Endpoint))

s.ackCond.Cond.L.Lock()
// Reset lastAckID. New STEF stream ack IDs will start from 1.
s.lastAckID = 0
// Increment connection ID, to make sure we don't confuse the new and
// previous (stale) connections.
s.connID++
connID := s.connID
s.ackCond.Cond.L.Unlock()

// Prepare to open a STEF/gRPC stream to the server.
grpcClient := stef_proto.NewSTEFDestinationClient(s.grpcConn)

// Let server know about our schema.
schema, err := oteltef.MetricsWireSchema()
if err != nil {
return err
if err := s.connMan.Stop(ctx); err != nil {
s.set.Logger.Error("Failed to stop connection manager", zap.Error(err))
}

settings := stefgrpc.ClientSettings{
Logger: &loggerWrapper{s.set.Logger},
GrpcClient: grpcClient,
ClientSchema: &schema,
Callbacks: stefgrpc.ClientCallbacks{
OnAck: func(ackId uint64) error { return s.onGrpcAck(connID, ackId) },
},
}
s.client = stefgrpc.NewClient(settings)

s.connCancel = nil
connCtx, connCancel := context.WithCancel(context.Background())

connectionAttemptDone := make(chan struct{})
defer close(connectionAttemptDone)

// Start a goroutine that waits for success, failure or cancellation of
// the connection attempt.
go func() {
// Wait for either connection attempt to be done or for the caller
// of ensureConnected() to give up.
select {
case <-ctx.Done():
// The caller of ensureConnected() cancelled while we are waiting
// for connection to be established. We have to cancel the
// connection attempt (and the whole connection if it raced us and
// managed to connect - we will reconnect later again in that case).
s.set.Logger.Debug("Canceling connection context because ensureConnected() caller cancelled.")
connCancel()
case <-connectionAttemptDone:
// Connection attempt finished (successfully or no). No need to wait for the
// previous case, calling connCancel() is not needed anymore now. It will be
// called later, when disconnecting.
// From this moment we are essentially detaching from the Context
// that passed to ensureConnected() since we wanted to honor it only
// for the duration of the connection attempt, but not for the duration
// of the entire existence of the connection.
if s.grpcConn != nil {
if err := s.grpcConn.Close(); err != nil {
s.set.Logger.Error("Failed to close grpc connection", zap.Error(err))
}
}()

grpcWriter, opts, err := s.client.Connect(connCtx)
if err != nil {
connCancel()
return fmt.Errorf("failed to connect to destination: %w", err)
}

opts.Compression = s.compression

// Create STEF record writer over gRPC.
s.stefWriter, err = oteltef.NewMetricsWriter(grpcWriter, opts)
if err != nil {
connCancel()
return err
s.grpcConn = nil
}

// From this point on we consider the connection successfully established.
s.isConnected = true

// We need to call the cancel func when this connection is over so that we don't
// leak the Context we just created. This will be done in disconnect().
s.connCancel = connCancel

s.set.Logger.Debug("Connected to destination", zap.String("endpoint", s.cfg.Endpoint))

s.started = false
return nil
}

func (s *stefExporter) disconnect(ctx context.Context) {
s.connMutex.Lock()
defer s.connMutex.Unlock()

if !s.isConnected {
return
}

if s.connCancel != nil {
s.set.Logger.Debug("Calling cancel on connection context to avoid leaks")
s.connCancel()
s.connCancel = nil
}

if err := s.client.Disconnect(ctx); err != nil {
s.set.Logger.Error("Failed to disconnect", zap.Error(err))
}

s.set.Logger.Debug("Disconnected.")
s.isConnected = false
func (s *stefExporter) exportMetrics(ctx context.Context, data pmetric.Metrics) error {
return s.sync2Async.DoSync(ctx, data)
}

func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) error {
if err := s.ensureConnected(ctx); err != nil {
return err
// sendMetricsAsync is an async implementation of sending metric data.
// The result of sending will be reported via resultChan.
func (s *stefExporter) sendMetricsAsync(
ctx context.Context,
data any,
resultChan internal.ResultChan,
) (internal.DataID, error) {
// Acquire a connection to send the data over.
conn, err := s.connMan.Acquire(ctx)
if err != nil {
return 0, err
}

// stefWriter is not safe for concurrent writing, protect it.
s.stefWriterMutex.Lock()
defer s.stefWriterMutex.Unlock()
// It must be a StefConn with a Writer.
stefConn := conn.Conn().(*internal.StefConn)
stefWriter := stefConn.Writer()

md := data.(pmetric.Metrics)

// Convert and write the data to the Writer.
converter := stefpdatametrics.OtlpToSTEFUnsorted{}
err := converter.WriteMetrics(md, s.stefWriter)
err = converter.WriteMetrics(md, stefWriter)
if err != nil {
s.set.Logger.Debug("WriteMetrics failed", zap.Error(err))

@@ -251,62 +154,27 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) error {
//
// We need to reconnect. Disconnect here and the next exportMetrics()
// call will connect again.
s.disconnect(ctx)
s.connMan.DiscardAndClose(conn)

// TODO: check if err is because STEF encoding failed. If so we must not
// try to re-encode the same data. Return consumererror.NewPermanent(err)
// to the caller. This requires changes in STEF Go library.

// Return an error to retry sending these metrics again next time.
return err
return 0, err
}

// According to STEF gRPC spec the destination ack IDs match written record number.
// When the data we have just written is received by destination it will send us
// back and ack ID that numerically matches the last written record number.
expectedAckID := s.stefWriter.RecordCount()

// stefWriter normally buffers written records in memory. Flush() ensures buffered
// data is sent to network. This is necessary so that the server receives it and
// sends an acknowledgement back.
if err = s.stefWriter.Flush(); err != nil {
s.set.Logger.Debug("Flush failed", zap.Error(err))
// back an ack ID that numerically matches the last written record number.
expectedAckID := stefWriter.RecordCount()

// Failure to write the gRPC stream normally means something is
// wrong with the connection. We need to reconnect. Disconnect here
// and the next exportMetrics() call will connect again.
s.disconnect(ctx)
// Register to be notified via resultChan when the ack of the
// written record is received.
stefConn.OnAck(expectedAckID, resultChan)

// Return an error to retry sending these metrics again next time.
return err
}
// We are done with the connection.
s.connMan.Release(conn)

// Wait for acknowledgement.
err = s.ackCond.Wait(ctx, func() bool { return s.lastAckID >= expectedAckID })
if err != nil {
return fmt.Errorf("error waiting for ack ID %d: %w", expectedAckID, err)
}

return nil
}

func (s *stefExporter) onGrpcAck(connID uint64, ackID uint64) error {
s.ackCond.Cond.L.Lock()
defer s.ackCond.Cond.L.Unlock()

if s.connID != connID {
// This is an ack from a previous (stale) connection. This can happen
// due to a race if the ack from the old stream arrives after we decided
// to reconnect but the old stream is still functioning. We just need
// to ignore this ack, it is no longer relevant.
return nil
}

// The IDs are expected to always monotonically increase. Check it anyway in case
// the server misbehaves and violates the expectation.
if s.lastAckID < ackID {
s.lastAckID = ackID
s.ackCond.Cond.Broadcast()
}
return nil
return internal.DataID(expectedAckID), nil
}
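
For readers unfamiliar with the sync-over-async pattern that `exportMetrics` now delegates to, here is a minimal, self-contained sketch of the idea. It is not the actual `internal.Sync2Async` implementation; the `asyncSender`, `resultChan`, and `doSync` names are hypothetical stand-ins for the real types used in this PR:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// resultChan carries the outcome of one asynchronous send; nil means the data was acked.
type resultChan chan error

// asyncSender mirrors the shape of sendMetricsAsync: it starts a send and reports
// completion on the supplied channel instead of blocking the caller.
type asyncSender func(ctx context.Context, data any, res resultChan) error

// doSync wraps an asyncSender into a synchronous call: it triggers the send,
// then blocks until the result arrives or the context is canceled.
func doSync(ctx context.Context, send asyncSender, data any) error {
	res := make(resultChan, 1)
	if err := send(ctx, data, res); err != nil {
		return err // could not even start the send (e.g. no connection)
	}
	select {
	case err := <-res:
		return err // ack received (nil) or an asynchronous send failure
	case <-ctx.Done():
		return ctx.Err() // caller gave up waiting for the ack
	}
}

func main() {
	// Toy sender that "acks" immediately; a real sender would write to the
	// STEF stream and signal res only once the matching ack ID arrives.
	send := asyncSender(func(_ context.Context, data any, res resultChan) error {
		if data == nil {
			return errors.New("nothing to send")
		}
		res <- nil
		return nil
	})
	fmt.Println(doSync(context.Background(), send, "batch-1")) // prints <nil>
}
```

In the PR itself, `Sync2Async` is additionally constructed with `NumConsumers` to bound the number of concurrent in-flight sends, and `sendMetricsAsync` returns the expected ack ID as the `DataID` identifying the pending send.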