Skip to content

Commit b24e5f8

Browse files
committed
add kafka connection
1 parent 6064528 commit b24e5f8

File tree

7 files changed

+100
-4
lines changed

7 files changed

+100
-4
lines changed

.env.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ MAILER_USER=
1717
MAILER_PASSWORD=
1818
MAILER_PROVIDER=nodemailer
1919

20+
KAFKA_URL=localhost:9092
21+
KAFKA_TOPIC_DAI=test-topic
22+
KAFKA_TOPIC_RAI=test-topic
23+
2024

2125
INBOX_MAILBOX_NAME=INBOX
2226
INBOX_SUCCESSBOX_NAME=INBOX.INBOX.Success

package-lock.json

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ import { initKnex } from './repositories/db';
22
import { initKysely } from './repositories/kysely';
33
import { createServer } from './server';
44
import { tryToFixResiduesWithUnknownLabel } from './services/imapService/tryToFixUnknownLabels';
5+
import { createConsumer } from './services/kafkaService';
56
import config from './utils/config';
67

78
initKnex();
89
initKysely(config.databaseUrl);
910
createServer().start();
1011
await tryToFixResiduesWithUnknownLabel();
12+
const consumer = await createConsumer();
1113

1214
process.on('SIGINT', () => {
13-
console.log('\nGracefully shutting down from SIGINT (Ctrl-C)');
14-
process.exit();
15+
console.log('\nGracefully shutting down from SIGINT...');
16+
consumer.disconnect().then(() => process.exit());
1517
});

server/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"type": "module",
66
"private": true,
77
"scripts": {
8-
"dev": "npm run migrate-latest && node --enable-source-maps --import @swc-node/register/esm-register --watch --inspect ./index.ts",
8+
"dev": "npm run migrate-latest && node --enable-source-maps --import @swc-node/register/esm-register --watch --watch-kill-signal SIGINT --inspect ./index.ts",
99
"build": "tsc",
1010
"tsc": "tsc",
1111
"start": "npm run migrate-latest && node --enable-source-maps --import @swc-node/register/esm-register ./index.ts",
@@ -51,6 +51,7 @@
5151
"http-proxy-middleware": "^3.0.5",
5252
"imapflow": "^1.1.1",
5353
"jsonwebtoken": "^9.0.0",
54+
"kafkajs": "^2.2.4",
5455
"knex": "^2.4.2",
5556
"knex-stringcase": "^1.5.5",
5657
"kysely": "^0.27.4",

server/services/ediSacha/sachaToXML.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
} from './sachaValidator';
1515

1616
const xml = z.string().brand('XML');
17-
type Xml = z.infer<typeof xml>;
17+
export type Xml = z.infer<typeof xml>;
1818

1919
export const generateXMLAcquitement = (
2020
messagesAcquittement: Acquittement['MessageAcquittement'],

server/services/kafkaService.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import {
2+
Consumer,
3+
EachMessagePayload,
4+
Kafka,
5+
Partitioners,
6+
ProducerRecord
7+
} from 'kafkajs';
8+
import config from '../utils/config';
9+
import { Xml } from './ediSacha/sachaToXML';
10+
11+
const kafka = new Kafka({
12+
clientId: 'client-id',
13+
brokers: [config.kafka.url]
14+
});
15+
16+
export const createConsumer = async (): Promise<Consumer> => {
17+
const consumer = kafka.consumer({
18+
groupId: 'consumer-group'
19+
});
20+
21+
try {
22+
await consumer.connect();
23+
await consumer.subscribe({
24+
topic: config.kafka.topicRAI
25+
});
26+
27+
await consumer.run({
28+
eachMessage: async (messagePayload: EachMessagePayload) => {
29+
const { topic, partition, message } = messagePayload;
30+
const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`;
31+
console.log(`- ${prefix} ${message.key}#${message.value}`);
32+
}
33+
});
34+
} catch (error) {
35+
console.log('Error: ', error);
36+
}
37+
38+
return consumer;
39+
};
40+
41+
export const sendMessage = async (message: Xml): Promise<void> => {
42+
const producer = kafka.producer({
43+
createPartitioner: Partitioners.DefaultPartitioner
44+
});
45+
46+
const topicMessages: ProducerRecord = {
47+
topic: config.kafka.topicDAI,
48+
messages: [{ value: message }]
49+
};
50+
await producer.connect();
51+
await producer.send(topicMessages);
52+
await producer.disconnect();
53+
};

server/utils/config.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ interface Config {
9696
m2mBasicToken: string;
9797
mattermostIncomingWebhook: string | null;
9898
browserlessUrl: string;
99+
kafka: {
100+
url: string;
101+
topicDAI: string;
102+
topicRAI: string;
103+
};
99104
}
100105

101106
const config = convict<Config>({
@@ -357,6 +362,27 @@ const config = convict<Config>({
357362
sensitive: true,
358363
nullable: false,
359364
default: isProduction ? null : 'ws://localhost:3002?token=1234512345'
365+
},
366+
kafka: {
367+
url: {
368+
env: 'KAFKA_URL',
369+
format: String,
370+
sensitive: true,
371+
nullable: false,
372+
default: isProduction ? null : 'localhost:9092'
373+
},
374+
topicDAI: {
375+
env: 'KAFKA_TOPIC_DAI',
376+
format: String,
377+
nullable: false,
378+
default: isProduction ? null : 'test-topic'
379+
},
380+
topicRAI: {
381+
env: 'KAFKA_TOPIC_RAI',
382+
format: String,
383+
nullable: false,
384+
default: isProduction ? null : 'test-topic'
385+
}
360386
}
361387
})
362388
.validate({ allowed: 'strict' })

0 commit comments

Comments
 (0)