Skip to content

Commit 5c58e8e

Browse files
elsmrdana-gill
andauthored
fix(Kafka Node): Upgrade kafkajs and add tests (#14326)
Co-authored-by: Dana Lee <[email protected]>
1 parent db38149 commit 5c58e8e

File tree

7 files changed

+723
-72
lines changed

7 files changed

+723
-72
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
2+
import { mock } from 'jest-mock-extended';
3+
import type { Producer } from 'kafkajs';
4+
import { Kafka as apacheKafka } from 'kafkajs';
5+
import path from 'path';
6+
7+
import { getWorkflowFilenames, testWorkflows } from '@test/nodes/Helpers';
8+
9+
jest.mock('kafkajs');
10+
jest.mock('@kafkajs/confluent-schema-registry');
11+
12+
describe('Kafka Node', () => {
13+
let mockProducer: jest.Mocked<Producer>;
14+
let mockKafka: jest.Mocked<apacheKafka>;
15+
let mockRegistry: jest.Mocked<SchemaRegistry>;
16+
let mockProducerConnect: jest.Mock;
17+
let mockProducerSend: jest.Mock;
18+
let mockProducerDisconnect: jest.Mock;
19+
let mockRegistryEncode: jest.Mock;
20+
21+
beforeAll(() => {
22+
mockProducerConnect = jest.fn();
23+
mockProducerSend = jest.fn().mockImplementation(async () => []);
24+
mockProducerDisconnect = jest.fn();
25+
26+
mockProducer = mock<Producer>({
27+
connect: mockProducerConnect,
28+
send: mockProducerSend,
29+
sendBatch: mockProducerSend,
30+
disconnect: mockProducerDisconnect,
31+
});
32+
33+
mockKafka = mock<apacheKafka>({
34+
producer: jest.fn().mockReturnValue(mockProducer),
35+
});
36+
37+
mockRegistryEncode = jest.fn((_id, input) => Buffer.from(JSON.stringify(input)));
38+
mockRegistry = mock<SchemaRegistry>({
39+
encode: mockRegistryEncode,
40+
});
41+
42+
(apacheKafka as jest.Mock).mockReturnValue(mockKafka);
43+
(SchemaRegistry as jest.Mock).mockReturnValue(mockRegistry);
44+
});
45+
46+
const workflows = getWorkflowFilenames(path.join(__dirname, 'test'));
47+
testWorkflows(workflows);
48+
49+
test('should publish the correct kafka messages', async () => {
50+
expect(mockProducerSend).toHaveBeenCalledTimes(2);
51+
expect(mockProducerSend).toHaveBeenCalledWith({
52+
acks: 1,
53+
compression: 1,
54+
timeout: 1000,
55+
topicMessages: [
56+
{
57+
messages: [
58+
{
59+
headers: { header: 'value' },
60+
key: 'messageKey',
61+
value: '{"name":"First item","code":1}',
62+
},
63+
],
64+
topic: 'test-topic',
65+
},
66+
{
67+
messages: [
68+
{
69+
headers: { header: 'value' },
70+
key: 'messageKey',
71+
value: '{"name":"Second item","code":2}',
72+
},
73+
],
74+
topic: 'test-topic',
75+
},
76+
],
77+
});
78+
expect(mockProducerSend).toHaveBeenCalledWith({
79+
acks: 0,
80+
compression: 0,
81+
topicMessages: [
82+
{
83+
messages: [
84+
{
85+
headers: { headerKey: 'headerValue' },
86+
key: null,
87+
value: Buffer.from(JSON.stringify({ foo: 'bar' })),
88+
},
89+
],
90+
topic: 'test-topic',
91+
},
92+
{
93+
messages: [
94+
{
95+
headers: { headerKey: 'headerValue' },
96+
key: null,
97+
value: Buffer.from(JSON.stringify({ foo: 'bar' })),
98+
},
99+
],
100+
topic: 'test-topic',
101+
},
102+
],
103+
});
104+
});
105+
});

0 commit comments

Comments
 (0)