Skip to content

Commit 806087b

Browse files
ihar-tsykalaIharenvision-ci-agent
authored
fix/cleaning callback maps in nats and workers service[4626] (#4677)
* fix/responseCallbacksMap by balance subscribtion[4626] * refactor code * refactor code * feat/increase hashgraph/sdk version[4626] * refactor code * [skip ci] Add swagger.yaml * fix: hashgraph/sdk issue and refactor[4626] * fix: callback maps in workers, analitics, guardian, policy[4626] * fix: lint errors --------- Co-authored-by: Ihar <[email protected]> Co-authored-by: envision-ci-agent <[email protected]>
1 parent d801ac3 commit 806087b

File tree

7 files changed

+52
-30
lines changed

7 files changed

+52
-30
lines changed

common/src/helpers/workers.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,9 @@ export class Workers extends NatsService {
121121
* @param task
122122
* @param priority
123123
* @param userId
124+
* @param registerCallback
124125
*/
125-
public addNonRetryableTask(task: ITask, priority: number, userId?: string | null): Promise<any> {
126+
public addNonRetryableTask(task: ITask, priority: number, userId?: string | null, registerCallback: boolean = true): Promise<any> {
126127
if (!task.data.network) {
127128
task.data.network = Environment.network;
128129
}
@@ -138,7 +139,7 @@ export class Workers extends NatsService {
138139
if (!task.data.localNodeProtocol) {
139140
task.data.localNodeProtocol = Environment.localNodeProtocol;
140141
}
141-
return this.addTask(task, priority, false, 0, true, userId);
142+
return this.addTask(task, priority, false, 0, registerCallback, userId);
142143
}
143144

144145
/**
@@ -147,8 +148,9 @@ export class Workers extends NatsService {
147148
* @param priority
148149
* @param attempts
149150
* @param userId
151+
* @param registerCallback
150152
*/
151-
public addRetryableTask(task: ITask, priority: number, attempts: number = 0, userId: string = null): Promise<any> {
153+
public addRetryableTask(task: ITask, priority: number, attempts: number = 0, userId: string = null, registerCallback: boolean = true): Promise<any> {
152154
if (!task.data.network) {
153155
task.data.network = Environment.network;
154156
}
@@ -164,7 +166,7 @@ export class Workers extends NatsService {
164166
if (!task.data.localNodeProtocol) {
165167
task.data.localNodeProtocol = Environment.localNodeProtocol;
166168
}
167-
return this.addTask(task, priority, true, attempts, true, userId);
169+
return this.addTask(task, priority, true, attempts, registerCallback, userId);
168170
}
169171

170172
/**
@@ -189,6 +191,7 @@ export class Workers extends NatsService {
189191
if (this.tasksCallbacks.has(data.id)) {
190192
const activeTask = this.tasksCallbacks.get(data.id);
191193
activeTask.callback(data.data, data.error, data.isTimeoutError);
194+
this.tasksCallbacks.delete(data.id)
192195
}
193196

194197
})

common/src/mq/nats-service.ts

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import { NatsConnection, headers, Subscription } from 'nats';
2-
import { GenerateUUIDv4 } from '@guardian/interfaces';
3-
import { ZipCodec } from './zip-codec.js';
4-
import { IMessageResponse } from '../models/index.js';
1+
import {NatsConnection, headers, Subscription} from 'nats';
2+
import {GenerateUUIDv4} from '@guardian/interfaces';
3+
import {ZipCodec} from './zip-codec.js';
4+
import {IMessageResponse} from '../models/index.js';
55

66
type CallbackFunction = (body: any, error?: string, code?: number) => void;
77

88
class MessageError extends Error {
99
public code: number;
10+
1011
constructor(message: any, code?: number) {
1112
super(message);
1213
this.code = code;
@@ -107,6 +108,7 @@ export abstract class NatsService {
107108
*/
108109
public subscribe(subject: string, cb: Function): Subscription {
109110
const sub = this.connection.subscribe(subject);
111+
110112
const fn = async (_sub: Subscription) => {
111113
for await (const m of _sub) {
112114
try {
@@ -124,19 +126,24 @@ export abstract class NatsService {
124126
* Send message
125127
* @param subject
126128
* @param data
129+
* @param isResponseCallback
127130
*/
128-
public sendMessage<T>(subject: string, data?: unknown): Promise<T> {
131+
public sendMessage<T>(subject: string, data?: unknown, isResponseCallback: boolean = true): Promise<T> {
129132
const messageId = GenerateUUIDv4();
130133
return new Promise(async (resolve, reject) => {
131134
const head = headers();
132135
head.append('messageId', messageId);
133-
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
134-
if (error) {
135-
reject(new MessageError(error, code));
136-
} else {
137-
resolve(body);
138-
}
139-
})
136+
if (isResponseCallback) {
137+
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
138+
if (error) {
139+
reject(new MessageError(error, code));
140+
} else {
141+
resolve(body);
142+
}
143+
})
144+
} else {
145+
resolve(null);
146+
}
140147

141148
this.connection.publish(subject, await this.codec.encode(data), {
142149
reply: this.replySubject,
@@ -155,7 +162,9 @@ export abstract class NatsService {
155162
return Promise.race([
156163
this.sendMessage<T>(subject, data),
157164
new Promise<T>((_, reject) => {
158-
setTimeout(() => { reject(new Error('Timeout exceed')) }, timeout)
165+
setTimeout(() => {
166+
reject(new Error('Timeout exceed'))
167+
}, timeout)
159168
})
160169
])
161170
}
@@ -206,7 +215,7 @@ export abstract class NatsService {
206215
}
207216
// head.append('rawMessage', isRaw);
208217
if (!noRespond) {
209-
msg.respond(await this.codec.encode(await cb(await this.codec.decode(msg.data), msg.headers)), { headers: head });
218+
msg.respond(await this.codec.encode(await cb(await this.codec.decode(msg.data), msg.headers)), {headers: head});
210219
} else {
211220
cb(await this.codec.decode(msg.data), msg.headers);
212221
}

guardian-service/src/policy-engine/policy-engine.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ export class PolicyEngineService {
200200

201201
PolicyComponentsUtils.ExternalEventFn = async (...args: any[]) => {
202202
try {
203-
this.channel.sendMessage(ExternalMessageEvents.BLOCK_EVENTS, args);
203+
this.channel.sendMessage(ExternalMessageEvents.BLOCK_EVENTS, args, false);
204204
} catch (error) {
205205
console.error(error);
206206
}

policy-service/src/policy-engine/policy-components-utils.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export type PolicyActionMap = Map<string, Map<PolicyInputEventType, EventCallbac
108108
*/
109109
export function updateBlockEvent(blocks: string[], user: PolicyUser): void {
110110
const type = 'update';
111-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blocks, user.toJson()] });
111+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blocks, user.toJson()] }, false);
112112
}
113113

114114
/**
@@ -118,7 +118,7 @@ export function updateBlockEvent(blocks: string[], user: PolicyUser): void {
118118
*/
119119
export function errorBlockEvent(blockType: string, message: any, user: PolicyUser): void {
120120
const type = 'error';
121-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blockType, message, user.toJson()] });
121+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blockType, message, user.toJson()] }, false);
122122
}
123123

124124
/**
@@ -128,7 +128,7 @@ export function errorBlockEvent(blockType: string, message: any, user: PolicyUse
128128
*/
129129
export function infoBlockEvent(user: PolicyUser, policy: Policy): void {
130130
const type = 'update-user';
131-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [user.toJson(), policy] });
131+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [user.toJson(), policy] }, false);
132132
}
133133

134134
/**
@@ -138,7 +138,7 @@ export function infoBlockEvent(user: PolicyUser, policy: Policy): void {
138138
*/
139139
export function externalBlockEvent(event: ExternalEvent<any>): void {
140140
const type = 'external';
141-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [event] });
141+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [event] }, false);
142142
}
143143

144144
/**

worker-service/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"@filebase/client": "^0.0.5",
55
"@guardian/common": "^3.1.0",
66
"@guardian/interfaces": "^3.1.0",
7-
"@hashgraph/sdk": "2.52.0",
7+
"@hashgraph/sdk": "2.59.0",
88
"@nestjs/common": "^9.4.1",
99
"@nestjs/core": "^9.4.1",
1010
"@nestjs/microservices": "^9.4.1",

worker-service/src/api/helpers/hedera-sdk-helper.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -949,9 +949,13 @@ export class HederaSDKHelper {
949949
const signedMessage = tx.signedMessages[0];
950950
if (signedMessage) {
951951
const pubKey = PublicKey.fromStringED25519(signedMessage.publicKey);
952-
const signature = Buffer.from(signedMessage.signature.fullSig, 'hex');
952+
const signatureBuffer = Buffer.from(signedMessage.signature.fullSig, 'hex');
953+
const signature = new Uint8Array(signatureBuffer);
954+
953955
try {
954-
messageTransaction.addSignature(pubKey, signature);
956+
await messageTransaction.signWith(pubKey, async () => {
957+
return signature;
958+
});
955959
} catch (error) {
956960
throw new Error(error);
957961
}

worker-service/src/api/worker.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
import { MessageBrokerChannel, MessageResponse, NatsService, NotificationHelper, PinoLogger, SecretManager, Users } from '@guardian/common';
1+
import {
2+
MessageBrokerChannel,
3+
MessageResponse,
4+
NatsService,
5+
NotificationHelper,
6+
PinoLogger,
7+
SecretManager,
8+
Users
9+
} from '@guardian/common';
210
import { ExternalMessageEvents, GenerateUUIDv4, ISignOptions, ITask, ITaskResult, WorkerEvents, WorkerTaskType } from '@guardian/interfaces';
311
import { HederaSDKHelper, NetworkOptions } from './helpers/hedera-sdk-helper.js';
412
import { IpfsClientClass } from './ipfs-client-class.js';
@@ -138,7 +146,6 @@ export class Worker extends NatsService {
138146
public async init(): Promise<void> {
139147
await super.init();
140148
this.channel = new MessageBrokerChannel(this.connection, 'worker');
141-
142149
try {
143150
await this.ipfsClient.createClient()
144151
} catch (e) {
@@ -226,7 +233,7 @@ export class Worker extends NatsService {
226233
balance,
227234
unit: 'Hbar',
228235
operatorAccountId
229-
});
236+
}, false);
230237
} catch (error) {
231238
throw new Error(`Worker (${['api-gateway', 'update-user-balance'].join('.')}) send: ` + error);
232239
}
@@ -542,7 +549,6 @@ export class Worker extends NatsService {
542549
operatorKey,
543550
adminKey,
544551
} = task.data;
545-
546552
client = new HederaSDKHelper(operatorId, operatorKey, null, networkOptions);
547553
result.data = await client.deleteToken(
548554
TokenId.fromString(tokenId),

0 commit comments

Comments
 (0)