Skip to content
Merged
11 changes: 7 additions & 4 deletions common/src/helpers/workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ export class Workers extends NatsService {
* @param task
* @param priority
* @param userId
* @param registerCallback
*/
public addNonRetryableTask(task: ITask, priority: number, userId?: string | null): Promise<any> {
public addNonRetryableTask(task: ITask, priority: number, userId?: string | null, registerCallback: boolean = true): Promise<any> {
if (!task.data.network) {
task.data.network = Environment.network;
}
Expand All @@ -138,7 +139,7 @@ export class Workers extends NatsService {
if (!task.data.localNodeProtocol) {
task.data.localNodeProtocol = Environment.localNodeProtocol;
}
return this.addTask(task, priority, false, 0, true, userId);
return this.addTask(task, priority, false, 0, registerCallback, userId);
}

/**
Expand All @@ -147,8 +148,9 @@ export class Workers extends NatsService {
* @param priority
* @param attempts
* @param userId
* @param registerCallback
*/
public addRetryableTask(task: ITask, priority: number, attempts: number = 0, userId: string = null): Promise<any> {
public addRetryableTask(task: ITask, priority: number, attempts: number = 0, userId: string = null, registerCallback: boolean = true): Promise<any> {
if (!task.data.network) {
task.data.network = Environment.network;
}
Expand All @@ -164,7 +166,7 @@ export class Workers extends NatsService {
if (!task.data.localNodeProtocol) {
task.data.localNodeProtocol = Environment.localNodeProtocol;
}
return this.addTask(task, priority, true, attempts, true, userId);
return this.addTask(task, priority, true, attempts, registerCallback, userId);
}

/**
Expand All @@ -189,6 +191,7 @@ export class Workers extends NatsService {
if (this.tasksCallbacks.has(data.id)) {
const activeTask = this.tasksCallbacks.get(data.id);
activeTask.callback(data.data, data.error, data.isTimeoutError);
this.tasksCallbacks.delete(data.id)
}

})
Expand Down
37 changes: 23 additions & 14 deletions common/src/mq/nats-service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { NatsConnection, headers, Subscription } from 'nats';
import { GenerateUUIDv4 } from '@guardian/interfaces';
import { ZipCodec } from './zip-codec.js';
import { IMessageResponse } from '../models/index.js';
import {NatsConnection, headers, Subscription} from 'nats';
import {GenerateUUIDv4} from '@guardian/interfaces';
import {ZipCodec} from './zip-codec.js';
import {IMessageResponse} from '../models/index.js';

type CallbackFunction = (body: any, error?: string, code?: number) => void;

class MessageError extends Error {
public code: number;

constructor(message: any, code?: number) {
super(message);
this.code = code;
Expand Down Expand Up @@ -107,6 +108,7 @@ export abstract class NatsService {
*/
public subscribe(subject: string, cb: Function): Subscription {
const sub = this.connection.subscribe(subject);

const fn = async (_sub: Subscription) => {
for await (const m of _sub) {
try {
Expand All @@ -124,19 +126,24 @@ export abstract class NatsService {
* Send message
* @param subject
* @param data
* @param isResponseCallback
*/
public sendMessage<T>(subject: string, data?: unknown): Promise<T> {
public sendMessage<T>(subject: string, data?: unknown, isResponseCallback: boolean = true): Promise<T> {
const messageId = GenerateUUIDv4();
return new Promise(async (resolve, reject) => {
const head = headers();
head.append('messageId', messageId);
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
if (error) {
reject(new MessageError(error, code));
} else {
resolve(body);
}
})
if (isResponseCallback) {
this.responseCallbacksMap.set(messageId, (body: T, error?: string, code?: number) => {
if (error) {
reject(new MessageError(error, code));
} else {
resolve(body);
}
})
} else {
resolve(null);
}

this.connection.publish(subject, await this.codec.encode(data), {
reply: this.replySubject,
Expand All @@ -155,7 +162,9 @@ export abstract class NatsService {
return Promise.race([
this.sendMessage<T>(subject, data),
new Promise<T>((_, reject) => {
setTimeout(() => { reject(new Error('Timeout exceed')) }, timeout)
setTimeout(() => {
reject(new Error('Timeout exceed'))
}, timeout)
})
])
}
Expand Down Expand Up @@ -206,7 +215,7 @@ export abstract class NatsService {
}
// head.append('rawMessage', isRaw);
if (!noRespond) {
msg.respond(await this.codec.encode(await cb(await this.codec.decode(msg.data), msg.headers)), { headers: head });
msg.respond(await this.codec.encode(await cb(await this.codec.decode(msg.data), msg.headers)), {headers: head});
} else {
cb(await this.codec.decode(msg.data), msg.headers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ export class PolicyEngineService {

PolicyComponentsUtils.ExternalEventFn = async (...args: any[]) => {
try {
this.channel.sendMessage(ExternalMessageEvents.BLOCK_EVENTS, args);
this.channel.sendMessage(ExternalMessageEvents.BLOCK_EVENTS, args, false);
} catch (error) {
console.error(error);
}
Expand Down
8 changes: 4 additions & 4 deletions policy-service/src/policy-engine/policy-components-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export type PolicyActionMap = Map<string, Map<PolicyInputEventType, EventCallbac
*/
export function updateBlockEvent(blocks: string[], user: PolicyUser): void {
const type = 'update';
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blocks, user.toJson()] });
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blocks, user.toJson()] }, false);
}

/**
Expand All @@ -118,7 +118,7 @@ export function updateBlockEvent(blocks: string[], user: PolicyUser): void {
*/
export function errorBlockEvent(blockType: string, message: any, user: PolicyUser): void {
const type = 'error';
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blockType, message, user.toJson()] });
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [blockType, message, user.toJson()] }, false);
}

/**
Expand All @@ -128,7 +128,7 @@ export function errorBlockEvent(blockType: string, message: any, user: PolicyUse
*/
export function infoBlockEvent(user: PolicyUser, policy: Policy): void {
const type = 'update-user';
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [user.toJson(), policy] });
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [user.toJson(), policy] }, false);
}

/**
Expand All @@ -138,7 +138,7 @@ export function infoBlockEvent(user: PolicyUser, policy: Policy): void {
*/
export function externalBlockEvent(event: ExternalEvent<any>): void {
const type = 'external';
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [event] });
new BlockTreeGenerator().sendMessage(PolicyEvents.BLOCK_UPDATE_BROADCAST, { type, data: [event] }, false);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion worker-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"@filebase/client": "^0.0.5",
"@guardian/common": "^3.1.0",
"@guardian/interfaces": "^3.1.0",
"@hashgraph/sdk": "2.52.0",
"@hashgraph/sdk": "2.59.0",
"@nestjs/common": "^9.4.1",
"@nestjs/core": "^9.4.1",
"@nestjs/microservices": "^9.4.1",
Expand Down
8 changes: 6 additions & 2 deletions worker-service/src/api/helpers/hedera-sdk-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -949,9 +949,13 @@ export class HederaSDKHelper {
const signedMessage = tx.signedMessages[0];
if (signedMessage) {
const pubKey = PublicKey.fromStringED25519(signedMessage.publicKey);
const signature = Buffer.from(signedMessage.signature.fullSig, 'hex');
const signatureBuffer = Buffer.from(signedMessage.signature.fullSig, 'hex');
const signature = new Uint8Array(signatureBuffer);

try {
messageTransaction.addSignature(pubKey, signature);
await messageTransaction.signWith(pubKey, async () => {
return signature;
});
} catch (error) {
throw new Error(error);
}
Expand Down
14 changes: 10 additions & 4 deletions worker-service/src/api/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { MessageBrokerChannel, MessageResponse, NatsService, NotificationHelper, PinoLogger, SecretManager, Users } from '@guardian/common';
import {
MessageBrokerChannel,
MessageResponse,
NatsService,
NotificationHelper,
PinoLogger,
SecretManager,
Users
} from '@guardian/common';
import { ExternalMessageEvents, GenerateUUIDv4, ISignOptions, ITask, ITaskResult, WorkerEvents, WorkerTaskType } from '@guardian/interfaces';
import { HederaSDKHelper, NetworkOptions } from './helpers/hedera-sdk-helper.js';
import { IpfsClientClass } from './ipfs-client-class.js';
Expand Down Expand Up @@ -138,7 +146,6 @@ export class Worker extends NatsService {
public async init(): Promise<void> {
await super.init();
this.channel = new MessageBrokerChannel(this.connection, 'worker');

try {
await this.ipfsClient.createClient()
} catch (e) {
Expand Down Expand Up @@ -226,7 +233,7 @@ export class Worker extends NatsService {
balance,
unit: 'Hbar',
operatorAccountId
});
}, false);
} catch (error) {
throw new Error(`Worker (${['api-gateway', 'update-user-balance'].join('.')}) send: ` + error);
}
Expand Down Expand Up @@ -542,7 +549,6 @@ export class Worker extends NatsService {
operatorKey,
adminKey,
} = task.data;

client = new HederaSDKHelper(operatorId, operatorKey, null, networkOptions);
result.data = await client.deleteToken(
TokenId.fromString(tokenId),
Expand Down