Skip to content

Commit 79bb7f4

Browse files
axwdragonlord93
authored andcommitted
receiver/kafkareceiver: fix hyphenated text encodings (open-telemetry#39806)
#### Description Fix support for text encodings with hyphens in their names. If the encoding name has a hyphen then it is an invalid extension ID, but we should not return an error due to this if it's a built-in encoding. #### Link to tracking issue Fixes open-telemetry#39793 #### Testing Added a new unit test covering hyphenated text encoding names (fails without the associated fix). #### Documentation N/A
1 parent d8388cc commit 79bb7f4

File tree

3 files changed

+48
-5
lines changed

3 files changed

+48
-5
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix support for built-in text encodings with hyphens in the encoding name
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39793]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/encoding.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ import (
2020
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/unmarshaler"
2121
)
2222

23-
var errUnknownEncodingExtension = errors.New("unknown encoding extension")
23+
var (
24+
errUnknownEncodingExtension = errors.New("unknown encoding extension")
25+
errInvalidComponentType = errors.New("invalid component type")
26+
)
2427

2528
func newTracesUnmarshaler(encoding string, _ receiver.Settings, host component.Host) (ptrace.Unmarshaler, error) {
2629
// Extensions take precedence.
2730
if unmarshaler, err := loadEncodingExtension[ptrace.Unmarshaler](host, encoding, "traces"); err != nil {
28-
if !errors.Is(err, errUnknownEncodingExtension) {
31+
if !errors.Is(err, errInvalidComponentType) && !errors.Is(err, errUnknownEncodingExtension) {
2932
return nil, err
3033
}
3134
} else {
@@ -53,7 +56,7 @@ func newTracesUnmarshaler(encoding string, _ receiver.Settings, host component.H
5356
func newLogsUnmarshaler(encoding string, set receiver.Settings, host component.Host) (plog.Unmarshaler, error) {
5457
// Extensions take precedence.
5558
if unmarshaler, err := loadEncodingExtension[plog.Unmarshaler](host, encoding, "logs"); err != nil {
56-
if !errors.Is(err, errUnknownEncodingExtension) {
59+
if !errors.Is(err, errInvalidComponentType) && !errors.Is(err, errUnknownEncodingExtension) {
5760
return nil, err
5861
}
5962
} else {
@@ -91,7 +94,7 @@ func newLogsUnmarshaler(encoding string, set receiver.Settings, host component.H
9194
func newMetricsUnmarshaler(encoding string, _ receiver.Settings, host component.Host) (pmetric.Unmarshaler, error) {
9295
// Extensions take precedence.
9396
if unmarshaler, err := loadEncodingExtension[pmetric.Unmarshaler](host, encoding, "metrics"); err != nil {
94-
if !errors.Is(err, errUnknownEncodingExtension) {
97+
if !errors.Is(err, errInvalidComponentType) && !errors.Is(err, errUnknownEncodingExtension) {
9598
return nil, err
9699
}
97100
} else {
@@ -128,7 +131,7 @@ func loadEncodingExtension[T any](host component.Host, encoding, signalType stri
128131
func encodingToComponentID(encoding string) (*component.ID, error) {
129132
componentType, err := component.NewType(encoding)
130133
if err != nil {
131-
return nil, fmt.Errorf("invalid component type: %w", err)
134+
return nil, fmt.Errorf("%w: %w", errInvalidComponentType, err)
132135
}
133136
id := component.NewID(componentType)
134137
return &id, nil

receiver/kafkareceiver/encoding_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,19 @@ func TestNewLogsUnmarshaler(t *testing.T) {
131131
assert.NoError(t, plogtest.CompareLogs(logs, actual, plogtest.IgnoreObservedTimestamp()))
132132
},
133133
},
134+
{
135+
encoding: "text_utf-16", // hyphen in the name
136+
input: func() []byte {
137+
out, _ := unicode.UTF16(
138+
unicode.LittleEndian,
139+
unicode.IgnoreBOM,
140+
).NewEncoder().Bytes([]byte("hello world"))
141+
return out
142+
}(),
143+
check: func(t *testing.T, actual plog.Logs) {
144+
assert.NoError(t, plogtest.CompareLogs(logs, actual, plogtest.IgnoreObservedTimestamp()))
145+
},
146+
},
134147
} {
135148
t.Run(tc.encoding, func(t *testing.T) {
136149
u := mustNewLogsUnmarshaler(t, tc.encoding, componenttest.NewNopHost())

0 commit comments

Comments
 (0)