Skip to content

Commit 558a637

Browse files
ihar-tsykalaIharenvision-ci-agent
authored andcommitted
fix/cleaning callback maps in nats and workers service[4626] (#4677)
* fix/responseCallbacksMap by balance subscribtion[4626] * refactor code * refactor code * feat/increase hashgraph/sdk version[4626] * refactor code * [skip ci] Add swagger.yaml * fix: hashgraph/sdk issue and refactor[4626] * fix: callback maps in workers, analitics, guardian, policy[4626] * fix: lint errors --------- Co-authored-by: Ihar <[email protected]> Co-authored-by: envision-ci-agent <[email protected]>
1 parent fc217ad commit 558a637

File tree

7 files changed

+286
-268
lines changed

7 files changed

+286
-268
lines changed

common/src/helpers/workers.ts

Lines changed: 100 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { ExternalMessageEvents, GenerateUUIDv4, HederaResponseCode, IActiveTask, ITask, QueueEvents, TimeoutError, WorkerEvents } from '@guardian/interfaces';
21
import { Singleton } from '../decorators/singleton.js';
2+
import { GenerateUUIDv4, HederaResponseCode, IActiveTask, ITask, QueueEvents, TimeoutError, WorkerEvents, } from '@guardian/interfaces';
33
import { Environment } from '../hedera-modules/index.js';
44
import { NatsService } from '../mq/index.js';
55

@@ -70,36 +70,29 @@ export const NON_RETRYABLE_HEDERA_ERRORS = [
7070
*/
7171
@Singleton
7272
export class Workers extends NatsService {
73-
/**
74-
* Check error message for retryable
75-
* @param error Error
76-
* @returns Is not retryable
77-
*/
78-
public static isNotRetryableError(error: any) {
79-
return typeof error === 'string'
80-
&& NON_RETRYABLE_HEDERA_ERRORS.some(code => error.indexOf(code) !== -1);
81-
}
82-
8373
/**
8474
* Tasks sended to work
8575
* @private
8676
*/
8777
private readonly tasksCallbacks: Map<string, IActiveTask> = new Map();
88-
/**
89-
* Queue
90-
* @private
91-
*/
92-
private readonly queue: Set<ITask> = new Set();
78+
9379
/**
9480
* Message queue name
9581
*/
9682
public messageQueueName = 'workers-service-' + GenerateUUIDv4();
83+
9784
/**
9885
* Reply subject
9986
* @private
10087
*/
10188
public replySubject = this.messageQueueName + `-reply-${GenerateUUIDv4()}`;
10289

90+
/**
91+
* Queue
92+
* @private
93+
*/
94+
private readonly queue: Set<ITask> = new Set();
95+
10396
/**
10497
* Max Repetitions
10598
* @private
@@ -113,6 +106,97 @@ export class Workers extends NatsService {
113106
return error;
114107
}
115108

109+
/**
110+
* Check error message for retryable
111+
* @param error Error
112+
* @returns Is not retryable
113+
*/
114+
public static isNotRetryableError(error: any) {
115+
return typeof error === 'string'
116+
&& NON_RETRYABLE_HEDERA_ERRORS.some(code => error.indexOf(code) !== -1);
117+
}
118+
119+
/**
120+
* Add non retryable task
121+
* @param task
122+
* @param priority
123+
* @param userId
124+
* @param registerCallback
125+
*/
126+
public addNonRetryableTask(task: ITask, priority: number, userId?: string | null, registerCallback: boolean = true): Promise<any> {
127+
if (!task.data.network) {
128+
task.data.network = Environment.network;
129+
}
130+
if (!task.data.nodes) {
131+
task.data.nodes = Environment.nodes;
132+
}
133+
if (!task.data.mirrorNodes) {
134+
task.data.mirrorNodes = Environment.mirrorNodes;
135+
}
136+
if (!task.data.localNodeAddress) {
137+
task.data.localNodeAddress = Environment.localNodeAddress;
138+
}
139+
if (!task.data.localNodeProtocol) {
140+
task.data.localNodeProtocol = Environment.localNodeProtocol;
141+
}
142+
return this.addTask(task, priority, false, 0, registerCallback, userId);
143+
}
144+
145+
/**
146+
* Add retryable task
147+
* @param task
148+
* @param priority
149+
* @param attempts
150+
* @param userId
151+
* @param registerCallback
152+
*/
153+
public addRetryableTask(task: ITask, priority: number, attempts: number = 0, userId: string = null, registerCallback: boolean = true): Promise<any> {
154+
if (!task.data.network) {
155+
task.data.network = Environment.network;
156+
}
157+
if (!task.data.nodes) {
158+
task.data.nodes = Environment.nodes;
159+
}
160+
if (!task.data.mirrorNodes) {
161+
task.data.mirrorNodes = Environment.mirrorNodes;
162+
}
163+
if (!task.data.localNodeAddress) {
164+
task.data.localNodeAddress = Environment.localNodeAddress;
165+
}
166+
if (!task.data.localNodeProtocol) {
167+
task.data.localNodeProtocol = Environment.localNodeProtocol;
168+
}
169+
return this.addTask(task, priority, true, attempts, registerCallback, userId);
170+
}
171+
172+
/**
173+
* Init listeners
174+
*/
175+
public initListeners() {
176+
this.subscribe(WorkerEvents.WORKER_READY, async () => {
177+
await this.searchAndUpdateTasks();
178+
});
179+
180+
setInterval(async () => {
181+
await this.searchAndUpdateTasks();
182+
}, 1000);
183+
184+
this.subscribe(QueueEvents.TASK_COMPLETE, async (data: any) => {
185+
if (!data.id) {
186+
throw new Error('Message without id');
187+
}
188+
if (data.error) {
189+
console.error(data);
190+
}
191+
if (this.tasksCallbacks.has(data.id)) {
192+
const activeTask = this.tasksCallbacks.get(data.id);
193+
activeTask.callback(data.data, data.error, data.isTimeoutError);
194+
this.tasksCallbacks.delete(data.id)
195+
}
196+
197+
})
198+
}
199+
116200
/**
117201
* Get free workers
118202
* @private
@@ -213,97 +297,6 @@ export class Workers extends NatsService {
213297
})
214298
}
215299

216-
/**
217-
* Add non retryable task
218-
* @param task
219-
* @param priority
220-
* @param userId
221-
*/
222-
public addNonRetryableTask(task: ITask, priority: number, userId?: string | null): Promise<any> {
223-
if (!task.data.network) {
224-
task.data.network = Environment.network;
225-
}
226-
if (!task.data.nodes) {
227-
task.data.nodes = Environment.nodes;
228-
}
229-
if (!task.data.mirrorNodes) {
230-
task.data.mirrorNodes = Environment.mirrorNodes;
231-
}
232-
if (!task.data.localNodeAddress) {
233-
task.data.localNodeAddress = Environment.localNodeAddress;
234-
}
235-
if (!task.data.localNodeProtocol) {
236-
task.data.localNodeProtocol = Environment.localNodeProtocol;
237-
}
238-
return this.addTask(task, priority, false, 0, true, userId);
239-
}
240-
241-
/**
242-
* Add retryable task
243-
* @param task
244-
* @param priority
245-
* @param attempts
246-
* @param userId
247-
*/
248-
public addRetryableTask(task: ITask, priority: number, attempts: number = 0, userId: string = null): Promise<any> {
249-
if (!task.data.network) {
250-
task.data.network = Environment.network;
251-
}
252-
if (!task.data.nodes) {
253-
task.data.nodes = Environment.nodes;
254-
}
255-
if (!task.data.mirrorNodes) {
256-
task.data.mirrorNodes = Environment.mirrorNodes;
257-
}
258-
if (!task.data.localNodeAddress) {
259-
task.data.localNodeAddress = Environment.localNodeAddress;
260-
}
261-
if (!task.data.localNodeProtocol) {
262-
task.data.localNodeProtocol = Environment.localNodeProtocol;
263-
}
264-
return this.addTask(task, priority, true, attempts, true, userId);
265-
}
266-
267-
/**
268-
* Init listeners
269-
*/
270-
public initListeners() {
271-
this.subscribe(WorkerEvents.WORKER_READY, async () => {
272-
await this.searchAndUpdateTasks();
273-
});
274-
275-
setInterval(async () => {
276-
await this.searchAndUpdateTasks();
277-
}, 1000);
278-
279-
this.subscribe(QueueEvents.TASK_COMPLETE, async (data: any) => {
280-
if (!data.id) {
281-
throw new Error('Message without id');
282-
}
283-
if (data.error) {
284-
console.error(data);
285-
}
286-
if (this.tasksCallbacks.has(data.id)) {
287-
const activeTask = this.tasksCallbacks.get(data.id);
288-
activeTask.callback(data.data, data.error, data.isTimeoutError);
289-
}
290-
291-
})
292-
}
293-
294-
/**
295-
* External mint event
296-
* @param data
297-
* @return {any}
298-
*/
299-
public async sendExternalMintEvent(data: any): Promise<void> {
300-
try {
301-
await this.sendMessage<any>(ExternalMessageEvents.TOKEN_MINT_COMPLETE, data);
302-
} catch (e) {
303-
console.error(e.message)
304-
}
305-
}
306-
307300
/**
308301
* Update worker settings
309302
*/

common/src/mq/nats-service.ts

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import { NatsConnection, headers, Subscription } from 'nats';
2-
import { GenerateUUIDv4 } from '@guardian/interfaces';
3-
import { ZipCodec } from './zip-codec.js';
4-
import { IMessageResponse } from '../models/index.js';
1+
import {NatsConnection, headers, Subscription} from 'nats';
2+
import {GenerateUUIDv4} from '@guardian/interfaces';
3+
import {ZipCodec} from './zip-codec.js';
4+
import {IMessageResponse} from '../models/index.js';
55

66
type CallbackFunction = (body: any, error?: string, code?: number) => void;
77

88
class MessageError extends Error {
99
public code: number;
10+
1011
constructor(message: any, code?: number) {
1112
super(message);
1213
this.code = code;
@@ -107,6 +108,7 @@ export abstract class NatsService {
107108
*/
108109
public subscribe(subject: string, cb: Function): Subscription {
109110
const sub = this.connection.subscribe(subject);
111+
110112
const fn = async (_sub: Subscription) => {
111113
for await (const m of _sub) {
112114
try {
@@ -124,19 +126,24 @@ export abstract class NatsService {
124126
* Send message
125127
* @param subject
126128
* @param data
129+
* @param isResponseCallback
127130
*/
128-
public sendMessage<T>(subject: string, data?: unknown): Promise<T> {
131+
public sendMessage<T>(subject: string, data?: unknown, isResponseCallback: boolean = true): Promise<T> {
129132
const messageId = GenerateUUIDv4();
130133
return new Promise(async (resolve, reject) => {
131134
const head = headers();
132135
head.append('messageId', messageId);
133-
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
134-
if (error) {
135-
reject(new MessageError(error, code));
136-
} else {
137-
resolve(body);
138-
}
139-
})
136+
if (isResponseCallback) {
137+
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
138+
if (error) {
139+
reject(new MessageError(error, code));
140+
} else {
141+
resolve(body);
142+
}
143+
})
144+
} else {
145+
resolve(null);
146+
}
140147

141148
this.connection.publish(subject, await this.codec.encode(data), {
142149
reply: this.replySubject,
@@ -155,7 +162,9 @@ export abstract class NatsService {
155162
return Promise.race([
156163
this.sendMessage<T>(subject, data),
157164
new Promise<T>((_, reject) => {
158-
setTimeout(() => { reject(new Error('Timeout exceed')) }, timeout)
165+
setTimeout(() => {
166+
reject(new Error('Timeout exceed'))
167+
}, timeout)
159168
})
160169
])
161170
}
@@ -206,7 +215,7 @@ export abstract class NatsService {
206215
}
207216
// head.append('rawMessage', isRaw);
208217
if (!noRespond) {
209-
msg.respond(await this.codec.encode(await cb(await this.codec.decode(msg.data), msg.headers)), { headers: head });
218+
msg.respond(await this.codec.encode(await cb(await this.codec.decode(msg.data), msg.headers)), {headers: head});
210219
} else {
211220
cb(await this.codec.decode(msg.data), msg.headers);
212221
}

guardian-service/src/policy-engine/policy-engine.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ export class PolicyEngineService {
200200

201201
PolicyComponentsUtils.ExternalEventFn = async (...args: any[]) => {
202202
try {
203-
this.channel.sendMessage(ExternalMessageEvents.BLOCK_EVENTS, args);
203+
this.channel.sendMessage(ExternalMessageEvents.BLOCK_EVENTS, args, false);
204204
} catch (error) {
205205
console.error(error);
206206
}

policy-service/src/policy-engine/policy-components-utils.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export type PolicyActionMap = Map<string, Map<PolicyInputEventType, EventCallbac
108108
*/
109109
export function updateBlockEvent(blocks: string[], user: PolicyUser): void {
110110
const type = 'update';
111-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blocks, user.toJson()] });
111+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blocks, user.toJson()] }, false);
112112
}
113113

114114
/**
@@ -118,7 +118,7 @@ export function updateBlockEvent(blocks: string[], user: PolicyUser): void {
118118
*/
119119
export function errorBlockEvent(blockType: string, message: any, user: PolicyUser): void {
120120
const type = 'error';
121-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blockType, message, user.toJson()] });
121+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blockType, message, user.toJson()] }, false);
122122
}
123123

124124
/**
@@ -128,7 +128,7 @@ export function errorBlockEvent(blockType: string, message: any, user: PolicyUse
128128
*/
129129
export function infoBlockEvent(user: PolicyUser, policy: Policy): void {
130130
const type = 'update-user';
131-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [user.toJson(), policy] });
131+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [user.toJson(), policy] }, false);
132132
}
133133

134134
/**
@@ -138,7 +138,7 @@ export function infoBlockEvent(user: PolicyUser, policy: Policy): void {
138138
*/
139139
export function externalBlockEvent(event: ExternalEvent<any>): void {
140140
const type = 'external';
141-
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [event] });
141+
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [event] }, false);
142142
}
143143

144144
/**

worker-service/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"@filebase/client": "^0.0.5",
55
"@guardian/common": "^3.1.0",
66
"@guardian/interfaces": "^3.1.0",
7-
"@hashgraph/sdk": "2.52.0",
7+
"@hashgraph/sdk": "2.59.0",
88
"@nestjs/common": "^9.4.1",
99
"@nestjs/core": "^9.4.1",
1010
"@nestjs/microservices": "^9.4.1",

worker-service/src/api/helpers/hedera-sdk-helper.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -949,9 +949,13 @@ export class HederaSDKHelper {
949949
const signedMessage = tx.signedMessages[0];
950950
if (signedMessage) {
951951
const pubKey = PublicKey.fromStringED25519(signedMessage.publicKey);
952-
const signature = Buffer.from(signedMessage.signature.fullSig, 'hex');
952+
const signatureBuffer = Buffer.from(signedMessage.signature.fullSig, 'hex');
953+
const signature = new Uint8Array(signatureBuffer);
954+
953955
try {
954-
messageTransaction.addSignature(pubKey, signature);
956+
await messageTransaction.signWith(pubKey, async () => {
957+
return signature;
958+
});
955959
} catch (error) {
956960
throw new Error(error);
957961
}

0 commit comments

Comments
 (0)