Skip to content

Commit 23fccf7

Browse files
author
Ihar
committed
refactor code
1 parent 16e1406 commit 23fccf7

File tree

3 files changed

+48
-50
lines changed

3 files changed

+48
-50
lines changed

common/src/mq/nats-service.ts

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { NatsConnection, headers, Subscription } from 'nats';
2-
import { GenerateUUIDv4 } from '@guardian/interfaces';
3-
import { ZipCodec } from './zip-codec.js';
4-
import { IMessageResponse } from '../models/index.js';
1+
import {NatsConnection, headers, Subscription} from 'nats';
2+
import {GenerateUUIDv4} from '@guardian/interfaces';
3+
import {ZipCodec} from './zip-codec.js';
4+
import {IMessageResponse} from '../models/index.js';
55

66
type CallbackFunction = (body: any, error?: string, code?: number) => void;
77

@@ -56,28 +56,25 @@ export abstract class NatsService {
5656
throw new Error('Connection must set first');
5757
}
5858
this.connection.subscribe(this.replySubject, {
59-
callback: this.callbackSubscribe
60-
});
61-
}
62-
63-
public callbackSubscribe = async (error, msg) => {
64-
if (!error) {
65-
const messageId = msg.headers.get('messageId');
66-
const fn = this.responseCallbacksMap.get(messageId);
67-
if (fn) {
68-
const message = (await this.codec.decode(msg.data)) as IMessageResponse<any>;
69-
if (!message) {
70-
fn(null);
59+
callback: async (error, msg) => {
60+
if (!error) {
61+
const messageId = msg.headers.get('messageId');
62+
const fn = this.responseCallbacksMap.get(messageId);
63+
if (fn) {
64+
const message = (await this.codec.decode(msg.data)) as IMessageResponse<any>;
65+
if (!message) {
66+
fn(null)
67+
} else {
68+
fn(message.body, message.error, message.code);
69+
}
70+
this.responseCallbacksMap.delete(messageId)
71+
}
7172
} else {
72-
fn(message.body, message.error, message.code);
73+
console.error(error);
7374
}
74-
this.responseCallbacksMap.delete(messageId);
7575
}
76-
} else {
77-
console.error(error);
78-
}
79-
};
80-
76+
});
77+
}
8178

8279
/**
8380
* Set connection
@@ -129,19 +126,25 @@ export abstract class NatsService {
129126
* Send message
130127
* @param subject
131128
* @param data
129+
* @param isResponseCallback
132130
*/
133-
public sendMessage<T>(subject: string, data?: unknown): Promise<T> {
131+
public sendMessage<T>(subject: string, data?: unknown, isResponseCallback: boolean = true): Promise<T> {
132+
console.log('isResponseCallback', isResponseCallback)
134133
const messageId = GenerateUUIDv4();
135134
return new Promise(async (resolve, reject) => {
136135
const head = headers();
137136
head.append('messageId', messageId);
138-
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
139-
if (error) {
140-
reject(new MessageError(error, code));
141-
} else {
142-
resolve(body);
143-
}
144-
})
137+
if (isResponseCallback) {
138+
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
139+
if (error) {
140+
reject(new MessageError(error, code));
141+
} else {
142+
resolve(body);
143+
}
144+
})
145+
} else {
146+
resolve(null);
147+
}
145148

146149
this.connection.publish(subject, await this.codec.encode(data), {
147150
reply: this.replySubject,
@@ -194,17 +197,6 @@ export abstract class NatsService {
194197
});
195198
}
196199

197-
getMapSizeInMB(map: Map<any, any>): number {
198-
try {
199-
const jsonString = JSON.stringify([...map]);
200-
const bytes = new TextEncoder().encode(jsonString).length;
201-
return bytes / (1024 * 1024);
202-
} catch (error) {
203-
console.error("Error calculating Map size:", error);
204-
return 0;
205-
}
206-
}
207-
208200
/**
209201
* Get messages
210202
* @param subject
@@ -229,8 +221,6 @@ export abstract class NatsService {
229221
cb(await this.codec.decode(msg.data), msg.headers);
230222
}
231223
console.log(`📊 Active subscriptions: ${this.responseCallbacksMap.size}`);
232-
233-
console.log(`💾 Map size: ${this.getMapSizeInMB(this.responseCallbacksMap).toFixed(4)} MB`);
234224
} catch (error) {
235225
console.error(error);
236226
}

worker-service/src/api/worker.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,6 @@ export class Worker extends NatsService {
152152
public async init(): Promise<void> {
153153
await super.init();
154154
this.channel = new MessageBrokerChannel(this.connection, 'worker');
155-
156-
this.connection.subscribe('update-user-balance', {
157-
callback: (error, msg) => this.callbackSubscribe(error, msg)
158-
});
159-
160155
try {
161156
await this.ipfsClient.createClient()
162157
} catch (e) {
@@ -244,7 +239,7 @@ export class Worker extends NatsService {
244239
balance,
245240
unit: 'Hbar',
246241
operatorAccountId
247-
});
242+
}, false);
248243
} catch (error) {
249244
throw new Error(`Worker (${['api-gateway', 'update-user-balance'].join('.')}) send: ` + error);
250245
}

worker-service/src/app.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,19 @@ Promise.all([
3636

3737
const logger: PinoLogger = pinoLoggerInitialization(loggerMongo);
3838

39+
setInterval(() => {
40+
const memoryUsage = process.memoryUsage();
41+
const stack = (memoryUsage.rss - memoryUsage.heapTotal - memoryUsage.external) / 1024 / 1024;
42+
console.log({
43+
service: 'worker',
44+
rss: `${(memoryUsage.rss / 1024 / 1024).toFixed(2)} MB`,
45+
heapTotal: `${(memoryUsage.heapTotal / 1024 / 1024).toFixed(2)} MB`,
46+
heapUsed: `${(memoryUsage.heapUsed / 1024 / 1024).toFixed(2)} MB`,
47+
external: `${(memoryUsage.external / 1024 / 1024).toFixed(2)} MB`,
48+
stack: `${stack.toFixed(2)} MB`
49+
});
50+
}, 10000);
51+
3952
await new Users().setConnection(cn).init();
4053
const state = new ApplicationState();
4154
await state.setServiceName('WORKER').setConnection(cn).init();

0 commit comments

Comments
 (0)