Skip to content

Commit 8f5af40

Browse files
Merge pull request #29 from FPierre/feat/scoped-env
Scoped env
2 parents 0735261 + 9b0c15e commit 8f5af40

File tree

8 files changed

+243
-19
lines changed

8 files changed

+243
-19
lines changed

README.md

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,18 @@ $ npm i --save @google-cloud/pubsub nestjs-google-pubsub-microservice
2525
To use the Pub/Sub transporter, pass the following options object to the `createMicroservice()` method:
2626

2727
```typescript
28-
const app = await NestFactory.createMicroservice<MicroserviceOptions>(ApplicationModule, {
29-
strategy: new GCPubSubServer({
30-
topic: 'cats_topic',
31-
subscription: 'cats_subscription',
32-
client: {
33-
projectId: 'microservice',
34-
},
35-
}),
36-
});
28+
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
29+
ApplicationModule,
30+
{
31+
strategy: new GCPubSubServer({
32+
topic: 'cats_topic',
33+
subscription: 'cats_subscription',
34+
client: {
35+
projectId: 'microservice',
36+
},
37+
}),
38+
},
39+
);
3740
```
3841

3942
#### Options
@@ -44,7 +47,7 @@ The `options` property is specific to the chosen transporter. The <strong>GCloud
4447
<tr>
4548
<td><code>topic</code></td>
4649
<td>Topic name which your server subscription will belong to</td>
47-
</tr>
50+
</tr>
4851
<tr>
4952
<td><code>subscription</code></td>
5053
<td>Subscription name which your server will listen to</td>
@@ -85,18 +88,21 @@ The `options` property is specific to the chosen transporter. The <strong>GCloud
8588
<td><code>subscriber</code></td>
8689
<td>Additional subscriber options (read more <a href="https://googleapis.dev/nodejs/pubsub/latest/global.html#SubscriberOptions" rel="nofollow" target="_blank">here</a>)</td>
8790
</tr>
91+
<tr>
92+
<td><code>scopedEnvKey</code></td>
93+
<td>Scope topics and subscriptions to avoid losing messages when several people are working on the same code base. Will prefixes topics and subscriptions with this key (read more <a href="https://github.com/p-fedyukovich/nestjs-google-pubsub-microservice/pull/29" rel="nofollow" target="_blank">here</a>)</td>
94+
</tr>
8895
</table>
8996

9097
#### Client
9198

9299
```typescript
93-
const client =
94-
new GCPubSubClient({
95-
client: {
96-
apiEndpoint: 'localhost:8681',
97-
projectId: 'microservice',
98-
},
99-
});
100+
const client = new GCPubSubClient({
101+
client: {
102+
apiEndpoint: 'localhost:8681',
103+
projectId: 'microservice',
104+
},
105+
});
100106
client
101107
.send('pattern', 'Hello world!')
102108
.subscribe((response) => console.log(response));

lib/gc-pubsub.client.spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,27 @@ describe('GCPubSubClient', () => {
2020
sandbox.restore();
2121
});
2222

23+
describe('constructor', () => {
24+
describe('when the scopedEnvKey is defined', () => {
25+
beforeEach(() => {
26+
client = getInstance({
27+
topic: 'topic',
28+
replyTopic: 'replyTopic',
29+
replySubscription: 'replySubscription',
30+
scopedEnvKey: 'my-key',
31+
});
32+
});
33+
34+
it('should set the scopedEnvKey on topics and subscriptions', () => {
35+
expect(client['topicName']).to.be.eq('my-keytopic');
36+
expect(client['replyTopicName']).to.be.eq('my-keyreplyTopic');
37+
expect(client['replySubscriptionName']).to.be.eq(
38+
'my-keyreplySubscription',
39+
);
40+
});
41+
});
42+
});
43+
2344
describe('connect', () => {
2445
describe('when is not connected', () => {
2546
describe('when check existence is true', () => {

lib/gc-pubsub.client.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,24 +46,28 @@ export class GCPubSubClient extends ClientProxy {
4646
protected replySubscription: Subscription | null = null;
4747
protected topic: Topic | null = null;
4848
protected init: boolean;
49+
protected readonly scopedEnvKey: string | null;
4950
protected readonly checkExistence: boolean;
5051

5152
constructor(protected readonly options: GCPubSubOptions) {
5253
super();
5354

5455
this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG;
5556

57+
this.scopedEnvKey = this.options.scopedEnvKey ?? '';
58+
5659
this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC;
60+
this.topicName = `${this.scopedEnvKey}${this.topicName}`;
5761

5862
this.subscriberConfig =
5963
this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG;
6064

6165
this.publisherConfig =
6266
this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG;
6367

64-
this.replyTopicName = this.options.replyTopic;
68+
this.replyTopicName = `${this.scopedEnvKey}${this.options.replyTopic}`;
6569

66-
this.replySubscriptionName = this.options.replySubscription;
70+
this.replySubscriptionName = `${this.scopedEnvKey}${this.options.replySubscription}`;
6771

6872
this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK;
6973
this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT;

lib/gc-pubsub.interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export interface GCPubSubOptions {
1313
init?: boolean;
1414
useAttributes?: boolean;
1515
checkExistence?: boolean;
16+
scopedEnvKey?: string | null;
1617
publisher?: PublishOptions;
1718
subscriber?: SubscriberOptions;
1819
serializer?: Serializer;

lib/gc-pubsub.server.spec.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@ describe('GCPubSubServer', () => {
2020
sandbox.restore();
2121
});
2222

23+
describe('constructor', () => {
24+
describe('when the scopedEnvKey is defined', () => {
25+
it('should set the scopedEnvKey on topics and subscriptions', () => {
26+
const scopedEnvKey = 'my-key';
27+
28+
server = getInstance({ scopedEnvKey } as GCPubSubOptions);
29+
30+
expect(server['topicName']).to.eq(`${scopedEnvKey}default_topic`);
31+
expect(server['subscriptionName']).to.eq(
32+
`${scopedEnvKey}default_subscription`,
33+
);
34+
});
35+
});
36+
});
37+
2338
describe('listen', () => {
2439
describe('when is check existence is true', () => {
2540
beforeEach(async () => {
@@ -191,6 +206,24 @@ describe('GCPubSubServer', () => {
191206
}),
192207
).to.be.true;
193208
});
209+
210+
describe('when scopedEnvKey is defined', () => {
211+
beforeEach(async () => {
212+
server = getInstance({ scopedEnvKey: 'my-key' });
213+
await server.listen(() => {});
214+
});
215+
216+
it('should set scopedEnvKey on replyTo', async () => {
217+
const message = { test: true };
218+
const replyTo = 'test';
219+
const correlationId = '0';
220+
221+
await server.sendMessage(message, replyTo, correlationId);
222+
expect(Array.from(server['replyTopics'].values())).to.deep.eq([
223+
'my-keytest',
224+
]);
225+
});
226+
});
194227
});
195228

196229
describe('handleEvent', () => {

lib/gc-pubsub.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
4848
protected readonly replyTopics: Set<string>;
4949
protected readonly init: boolean;
5050
protected readonly checkExistence: boolean;
51+
protected readonly scopedEnvKey: string | null;
5152

5253
protected client: PubSub | null = null;
5354
protected subscription: Subscription | null = null;
@@ -56,12 +57,16 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
5657
super();
5758

5859
this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG;
60+
this.scopedEnvKey = this.options.scopedEnvKey ?? '';
5961

6062
this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC;
63+
this.topicName = `${this.scopedEnvKey}${this.topicName}`;
6164

6265
this.subscriptionName =
6366
this.options.subscription || GC_PUBSUB_DEFAULT_SUBSCRIPTION;
6467

68+
this.subscriptionName = `${this.scopedEnvKey}${this.subscriptionName}`;
69+
6570
this.subscriberConfig =
6671
this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG;
6772

@@ -205,6 +210,8 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
205210
message as unknown as OutgoingResponse,
206211
);
207212

213+
replyTo = `${this.scopedEnvKey}${replyTo}`;
214+
208215
this.replyTopics.add(replyTo);
209216

210217
await this.client
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { INestApplication } from '@nestjs/common';
2+
import { Test } from '@nestjs/testing';
3+
import { expect } from 'chai';
4+
import * as request from 'supertest';
5+
import { GCPubSubServer } from '../../lib';
6+
import {
7+
GCPubSubScopedEnvController1,
8+
GCPubSubScopedEnvController2,
9+
} from '../src/gc-pubsub-scoped-env.controller';
10+
11+
describe('GC PubSub transport', () => {
12+
let server;
13+
let app: INestApplication;
14+
15+
describe('useAttributes=false', () => {
16+
beforeEach(async () => {
17+
await Test.createTestingModule({
18+
controllers: [GCPubSubScopedEnvController2],
19+
}).compile();
20+
const module = await Test.createTestingModule({
21+
controllers: [GCPubSubScopedEnvController1],
22+
}).compile();
23+
24+
app = module.createNestApplication();
25+
server = app.getHttpAdapter().getInstance();
26+
27+
app.connectMicroservice({
28+
strategy: new GCPubSubServer({
29+
client: {
30+
apiEndpoint: 'localhost:8681',
31+
projectId: 'microservice',
32+
},
33+
scopedEnvKey: 'foobar',
34+
}),
35+
});
36+
await app.startAllMicroservices();
37+
await app.init();
38+
});
39+
40+
it('/POST', () => {
41+
request(server).post('/rpc').expect(200, 'scoped RPC');
42+
});
43+
44+
it('/POST (event notification)', (done) => {
45+
request(server)
46+
.post('/notify')
47+
.end(() => {
48+
setTimeout(() => {
49+
expect(GCPubSubScopedEnvController1.IS_NOTIFIED).to.be.true;
50+
expect(GCPubSubScopedEnvController2.IS_NOTIFIED).to.be.false;
51+
done();
52+
}, 1000);
53+
});
54+
});
55+
56+
afterEach(async () => {
57+
await app.close();
58+
});
59+
});
60+
});
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import {
2+
Controller,
3+
HttpCode,
4+
OnApplicationShutdown,
5+
Post,
6+
} from '@nestjs/common';
7+
import {
8+
ClientProxy,
9+
EventPattern,
10+
MessagePattern,
11+
} from '@nestjs/microservices';
12+
import { GCPubSubClient } from '../../lib';
13+
import { Observable } from 'rxjs';
14+
15+
@Controller()
16+
export class GCPubSubScopedEnvController1 implements OnApplicationShutdown {
17+
static IS_NOTIFIED = false;
18+
19+
client: ClientProxy;
20+
21+
constructor() {
22+
this.client = new GCPubSubClient({
23+
client: {
24+
apiEndpoint: 'localhost:8681',
25+
projectId: 'microservice',
26+
},
27+
replyTopic: 'default_reply_topic',
28+
replySubscription: 'default_reply_subscription',
29+
scopedEnvKey: 'foobar',
30+
});
31+
}
32+
33+
onApplicationShutdown(signal?: string) {
34+
return this.client.close();
35+
}
36+
37+
@Post()
38+
@HttpCode(200)
39+
call() {
40+
return this.client.send({ cmd: 'rpc' }, {});
41+
}
42+
43+
@Post('notify')
44+
async sendNotification(): Promise<any> {
45+
return this.client.emit<{ notification: boolean; id: string }>(
46+
'notification',
47+
{ notification: true, id: 'id' },
48+
);
49+
}
50+
51+
@MessagePattern({ cmd: 'rpc' })
52+
rpc(): string {
53+
return 'scoped RPC';
54+
}
55+
56+
@EventPattern('notification')
57+
eventHandler(data: { notification: boolean; id: string }) {
58+
GCPubSubScopedEnvController1.IS_NOTIFIED = data.notification;
59+
}
60+
}
61+
62+
@Controller()
63+
export class GCPubSubScopedEnvController2 implements OnApplicationShutdown {
64+
static IS_NOTIFIED = false;
65+
66+
client: ClientProxy;
67+
68+
constructor() {
69+
this.client = new GCPubSubClient({
70+
client: {
71+
apiEndpoint: 'localhost:8681',
72+
projectId: 'microservice',
73+
},
74+
replyTopic: 'default_reply_topic',
75+
replySubscription: 'default_reply_subscription',
76+
});
77+
}
78+
79+
onApplicationShutdown(signal?: string) {
80+
return this.client.close();
81+
}
82+
83+
@MessagePattern({ cmd: 'rpc' })
84+
rpc(): string {
85+
return 'RPC';
86+
}
87+
88+
@EventPattern('notification')
89+
eventHandler(data: { notification: boolean; id: string }) {
90+
GCPubSubScopedEnvController2.IS_NOTIFIED = data.notification;
91+
}
92+
}

0 commit comments

Comments
 (0)