Skip to content

Commit b70cbd0

Browse files
committed
[awsfirehosereceiver] support encoding extensions
Add support for using encoding extensions for unmarshalling records transmitted via Amazon Data Firehose. The "record_type" config is now deprecated and has been replaced by "encoding". This new config setting supports all of the existing encodings (cwlogs, cwmetrics otlp_v1) as well as support for loading additional encodings via extensions.
1 parent 1b8070f commit b70cbd0

File tree

15 files changed

+332
-161
lines changed

15 files changed

+332
-161
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for encoding extensions
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37113]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Adds `encoding` config setting, and deprecates the `record_type` setting.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awsfirehosereceiver/README.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,28 @@ See [documentation](https://github.com/open-telemetry/opentelemetry-collector/bl
4545

4646
A `cert_file` and `key_file` are required.
4747

48-
### record_type:
49-
The type of record being received from the delivery stream. Each unmarshaler handles a specific type, so the field allows the receiver to use the correct one.
48+
### encoding:
49+
50+
The ID of an [encoding extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding) for decoding logs or metrics.
51+
This configuration also supports the built-in encodings listed in the [Encodings](#encodings) section.
52+
If no encoding is specified, then the receiver will default to a signal-specific encoding: `cwmetrics` for metrics, and `cwlogs` for logs.
5053

51-
default: `cwmetrics`
54+
### record_type:
5255

53-
See the [Record Types](#record-types) section for all available options.
56+
Deprecated, use `encoding` instead. `record_type` will be removed in a future release; it is an alias for `encoding`.
5457

5558
### access_key (Optional):
5659
The access key to be checked on each request received. This can be set when creating or updating the delivery stream.
5760
See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http) for details.
5861

59-
## Record Types
62+
## Encodings
6063

6164
### cwmetrics
62-
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
65+
The encoding for the CloudWatch metric stream. Expects the format for the records to be JSON.
6366
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.
6467

6568
### cwlogs
66-
The record type for the CloudWatch log stream. Expects the format for the records to be JSON.
69+
The encoding for the CloudWatch log stream. Expects the format for the records to be JSON.
6770
For example:
6871

6972
```json
@@ -84,5 +87,5 @@ For example:
8487
```
8588

8689
### otlp_v1
87-
The OTLP v1 format as produced by CloudWatch metric streams.
90+
The OTLP v1 encoding as produced by CloudWatch metric streams.
8891
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details.

receiver/awsfirehosereceiver/config.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,24 @@ import (
88

99
"go.opentelemetry.io/collector/config/confighttp"
1010
"go.opentelemetry.io/collector/config/configopaque"
11+
"go.uber.org/zap"
1112
)
1213

14+
var errRecordTypeEncodingSet = errors.New("record_type must not be set when encoding is set")
15+
1316
type Config struct {
1417
// ServerConfig is used to set up the Firehose delivery
1518
// endpoint. The Firehose delivery stream expects an HTTPS
1619
// endpoint, so TLSSettings must be used to enable that.
1720
confighttp.ServerConfig `mapstructure:",squash"`
18-
// RecordType is the key used to determine which unmarshaler to use
19-
// when receiving the requests.
21+
// Encoding identifies the encoding of records received from
22+
// Firehose. Defaults to telemetry-specific encodings: "cwlog"
23+
// for logs, and "cwmetrics" for metrics.
24+
Encoding string `mapstructure:"encoding"`
25+
// RecordType is an alias for Encoding for backwards compatibility.
26+
// It is an error to specify both encoding and record_type.
27+
//
28+
// Deprecated: use Encoding instead.
2029
RecordType string `mapstructure:"record_type"`
2130
// AccessKey is checked against the one received with each request.
2231
// This can be set when creating or updating the Firehose delivery
@@ -30,10 +39,14 @@ func (c *Config) Validate() error {
3039
if c.Endpoint == "" {
3140
return errors.New("must specify endpoint")
3241
}
33-
// If a record type is specified, it must be valid.
34-
// An empty string is acceptable, however, because it will use a telemetry-type-specific default.
35-
if c.RecordType != "" {
36-
return validateRecordType(c.RecordType)
42+
if c.RecordType != "" && c.Encoding != "" {
43+
return errRecordTypeEncodingSet
3744
}
3845
return nil
3946
}
47+
48+
func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
49+
if cfg.RecordType != "" {
50+
logger.Warn("record_type is deprecated, and will be removed in a future version. Use encoding instead.")
51+
}
52+
}

receiver/awsfirehosereceiver/config_test.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
func TestLoadConfig(t *testing.T) {
2121
for _, configType := range []string{
22-
"cwmetrics", "cwlogs", "otlp_v1", "invalid",
22+
"cwmetrics", "cwlogs", "otlp_v1",
2323
} {
2424
t.Run(configType, func(t *testing.T) {
2525
fileName := configType + "_config.yaml"
@@ -34,24 +34,35 @@ func TestLoadConfig(t *testing.T) {
3434
require.NoError(t, sub.Unmarshal(cfg))
3535

3636
err = component.ValidateConfig(cfg)
37-
if configType == "invalid" {
38-
assert.Error(t, err)
39-
} else {
40-
assert.NoError(t, err)
41-
require.Equal(t, &Config{
42-
RecordType: configType,
43-
AccessKey: "some_access_key",
44-
ServerConfig: confighttp.ServerConfig{
45-
Endpoint: "0.0.0.0:4433",
46-
TLSSetting: &configtls.ServerConfig{
47-
Config: configtls.Config{
48-
CertFile: "server.crt",
49-
KeyFile: "server.key",
50-
},
37+
assert.NoError(t, err)
38+
require.Equal(t, &Config{
39+
RecordType: configType,
40+
AccessKey: "some_access_key",
41+
ServerConfig: confighttp.ServerConfig{
42+
Endpoint: "0.0.0.0:4433",
43+
TLSSetting: &configtls.ServerConfig{
44+
Config: configtls.Config{
45+
CertFile: "server.crt",
46+
KeyFile: "server.key",
5147
},
5248
},
53-
}, cfg)
54-
}
49+
},
50+
}, cfg)
5551
})
5652
}
5753
}
54+
55+
func TestLoadConfigInvalid(t *testing.T) {
56+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "invalid_config.yaml"))
57+
require.NoError(t, err)
58+
59+
factory := NewFactory()
60+
cfg := factory.CreateDefaultConfig()
61+
62+
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
63+
require.NoError(t, err)
64+
require.NoError(t, sub.Unmarshal(cfg))
65+
66+
err = component.ValidateConfig(cfg)
67+
assert.ErrorIs(t, err, errRecordTypeEncodingSet)
68+
}

receiver/awsfirehosereceiver/factory.go

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,19 @@ package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-c
55

66
import (
77
"context"
8-
"errors"
98

109
"go.opentelemetry.io/collector/component"
1110
"go.opentelemetry.io/collector/config/confighttp"
1211
"go.opentelemetry.io/collector/consumer"
13-
"go.opentelemetry.io/collector/pdata/plog"
14-
"go.opentelemetry.io/collector/pdata/pmetric"
1512
"go.opentelemetry.io/collector/receiver"
16-
"go.uber.org/zap"
1713

1814
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
19-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
20-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
21-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
2215
)
2316

2417
const (
2518
defaultEndpoint = "localhost:4433"
2619
)
2720

28-
var (
29-
errUnrecognizedRecordType = errors.New("unrecognized record type")
30-
availableRecordTypes = map[string]bool{
31-
cwmetricstream.TypeStr: true,
32-
cwlog.TypeStr: true,
33-
otlpmetricstream.TypeStr: true,
34-
}
35-
)
36-
3721
// NewFactory creates a receiver factory for awsfirehose. Currently, only
3822
// available in metrics pipelines.
3923
func NewFactory() receiver.Factory {
@@ -44,34 +28,6 @@ func NewFactory() receiver.Factory {
4428
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
4529
}
4630

47-
// validateRecordType checks the available record types for the
48-
// passed in one and returns an error if not found.
49-
func validateRecordType(recordType string) error {
50-
if _, ok := availableRecordTypes[recordType]; !ok {
51-
return errUnrecognizedRecordType
52-
}
53-
return nil
54-
}
55-
56-
// defaultMetricsUnmarshalers creates a map of the available metrics
57-
// unmarshalers.
58-
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]pmetric.Unmarshaler {
59-
cwmsu := cwmetricstream.NewUnmarshaler(logger)
60-
otlpv1msu := otlpmetricstream.NewUnmarshaler(logger)
61-
return map[string]pmetric.Unmarshaler{
62-
cwmsu.Type(): cwmsu,
63-
otlpv1msu.Type(): otlpv1msu,
64-
}
65-
}
66-
67-
// defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
68-
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]plog.Unmarshaler {
69-
u := cwlog.NewUnmarshaler(logger)
70-
return map[string]plog.Unmarshaler{
71-
u.Type(): u,
72-
}
73-
}
74-
7531
// createDefaultConfig creates a default config with the endpoint set
7632
// to port 8443 and the record type set to the CloudWatch metric stream.
7733
func createDefaultConfig() component.Config {
@@ -89,7 +45,9 @@ func createMetricsReceiver(
8945
cfg component.Config,
9046
nextConsumer consumer.Metrics,
9147
) (receiver.Metrics, error) {
92-
return newMetricsReceiver(cfg.(*Config), set, defaultMetricsUnmarshalers(set.Logger), nextConsumer)
48+
c := cfg.(*Config)
49+
handleDeprecatedConfig(c, set.Logger)
50+
return newMetricsReceiver(c, set, nextConsumer)
9351
}
9452

9553
// createMetricsReceiver implements the CreateMetricsReceiver function type.
@@ -99,5 +57,7 @@ func createLogsReceiver(
9957
cfg component.Config,
10058
nextConsumer consumer.Logs,
10159
) (receiver.Logs, error) {
102-
return newLogsReceiver(cfg.(*Config), set, defaultLogsUnmarshalers(set.Logger), nextConsumer)
60+
c := cfg.(*Config)
61+
handleDeprecatedConfig(c, set.Logger)
62+
return newLogsReceiver(c, set, nextConsumer)
10363
}

receiver/awsfirehosereceiver/factory_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"go.opentelemetry.io/collector/component/componenttest"
1212
"go.opentelemetry.io/collector/consumer/consumertest"
1313
"go.opentelemetry.io/collector/receiver/receivertest"
14-
15-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
1614
)
1715

1816
func TestValidConfig(t *testing.T) {
@@ -41,10 +39,3 @@ func TestCreateLogsReceiver(t *testing.T) {
4139
require.NoError(t, err)
4240
require.NotNil(t, r)
4341
}
44-
45-
func TestValidateRecordType(t *testing.T) {
46-
require.NoError(t, validateRecordType(defaultMetricsRecordType))
47-
require.NoError(t, validateRecordType(defaultLogsRecordType))
48-
require.NoError(t, validateRecordType(otlpmetricstream.TypeStr))
49-
require.Error(t, validateRecordType("nop"))
50-
}

receiver/awsfirehosereceiver/go.mod

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/gogo/protobuf v1.3.2
77
github.com/json-iterator/go v1.1.12
88
github.com/klauspost/compress v1.17.11
9-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.119.0
9+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.0.0
1010
github.com/stretchr/testify v1.10.0
1111
go.opentelemetry.io/collector/component v0.119.0
1212
go.opentelemetry.io/collector/component/componentstatus v0.119.0
@@ -80,8 +80,4 @@ retract (
8080
v0.65.0
8181
)
8282

83-
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
84-
8583
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
86-
87-
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

receiver/awsfirehosereceiver/go.sum

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)