Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.DS_Store
.vscode
.idea
coverage
dist
node_modules
Expand Down
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ import { PubSub } from 'graphql-subscriptions';
export const pubsub = new PubSub();
```

Now, implement your Subscriptions type resolver, using the `pubsub.asyncIterator` to map the event you need:
Now, implement your Subscriptions type resolver, using `pubsub.asyncIterable` to map the event you need:

```js
const SOMETHING_CHANGED_TOPIC = 'something_changed';

export const resolvers = {
Subscription: {
somethingChanged: {
subscribe: () => pubsub.asyncIterator(SOMETHING_CHANGED_TOPIC),
subscribe: () => pubsub.asyncIterable(SOMETHING_CHANGED_TOPIC),
},
},
}
Expand All @@ -81,10 +81,10 @@ pubsub.publish(SOMETHING_CHANGED_TOPIC, { somethingChanged: { id: "123" }});

When publishing data to subscribers, we need to make sure that each subscribers get only the data it need.

To do so, we can use `withFilter` helper from this package, which wraps `AsyncIterator` with a filter function, and let you control each publication for each user.
To do so, we can use `withFilter` helper from this package, which wraps `AsyncIterable` with a filter function, and let you control each publication for each user.

`withFilter` API:
- `asyncIteratorFn: (rootValue, args, context, info) => AsyncIterator<any>` : A function that returns `AsyncIterator` you got from your `pubsub.asyncIterator`.
- `asyncIterableFn: (rootValue, args, context, info) => AsyncIterable<any>` : A function that returns `AsyncIterable` you got from your `pubsub.asyncIterable`.
- `filterFn: (payload, variables, context, info) => boolean | Promise<boolean>` - A filter function, executed with the payload (the published value), variables, context and operation info, must return `boolean` or `Promise<boolean>` indicating if the payload should pass to the subscriber.

For example, if `somethingChanged` would also accept a variable with the ID that is relevant, we can use the following code to filter according to it:
Expand All @@ -97,7 +97,7 @@ const SOMETHING_CHANGED_TOPIC = 'something_changed';
export const resolvers = {
Subscription: {
somethingChanged: {
subscribe: withFilter(() => pubsub.asyncIterator(SOMETHING_CHANGED_TOPIC), (payload, variables) => {
subscribe: withFilter(() => pubsub.asyncIterable(SOMETHING_CHANGED_TOPIC), (payload, variables) => {
return payload.somethingChanged.id === variables.relevantId;
}),
},
Expand All @@ -119,7 +119,7 @@ const SOMETHING_REMOVED = 'something_removed';
export const resolvers = {
Subscription: {
somethingChanged: {
subscribe: () => pubsub.asyncIterator([ SOMETHING_UPDATED, SOMETHING_CREATED, SOMETHING_REMOVED ]),
subscribe: () => pubsub.asyncIterable([ SOMETHING_UPDATED, SOMETHING_CREATED, SOMETHING_REMOVED ]),
},
},
}
Expand All @@ -139,7 +139,7 @@ export const resolvers = {
// Manipulate and return the new value
return payload.somethingChanged;
},
subscribe: () => pubsub.asyncIterator(SOMETHING_UPDATED),
subscribe: () => pubsub.asyncIterable(SOMETHING_UPDATED),
},
},
}
Expand Down Expand Up @@ -176,20 +176,20 @@ export const resolvers = {
}
````

### Custom `AsyncIterator` Wrappers
### Custom `AsyncIterable` Wrappers

The value you should return from your `subscribe` resolver must be an `AsyncIterator`.
The value you should return from your `subscribe` resolver must be an `AsyncIterable`.

You can use this value and wrap it with another `AsyncIterator` to implement custom logic over your subscriptions.
You can use this value and wrap it with another `AsyncIterable` to implement custom logic over your subscriptions.

For example, the following implementation manipulate the payload by adding some static fields:

```typescript
import { $$asyncIterator } from 'iterall';

export const withStaticFields = (asyncIterator: AsyncIterator<any>, staticFields: Object): Function => {
return (rootValue: any, args: any, context: any, info: any): AsyncIterator<any> => {
import { $$asyncIterator, getAsyncIterator } from 'iterall';

export const withStaticFields = (asyncIterable: AsyncIterable<any>, staticFields: Object): Function => {
return (rootValue: any, args: any, context: any, info: any): AsyncIterable<any> => {
const asyncIterator = getAsyncIterator(asyncIterable);
return {
next() {
return asyncIterator.next().then(({ value, done }) => {
Expand All @@ -211,14 +211,14 @@ export const withStaticFields = (asyncIterator: AsyncIterator<any>, staticFields
[$$asyncIterator]() {
return this;
},
};
} as AsyncIterator<any> as AsyncIterableIterator<any>;
};
};
```

> You can also take a look at `withFilter` for inspiration.

For more information about `AsyncIterator`:
For more information about `AsyncIterable` and `AsyncIterator`:
- [TC39 Proposal](https://github.com/tc39/proposal-async-iteration)
- [iterall](https://github.com/leebyron/iterall)
- [IxJS](https://github.com/ReactiveX/IxJS)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { $$asyncIterator } from 'iterall';
import { EventEmitter } from 'events';

export function eventEmitterAsyncIterator<T>(eventEmitter: EventEmitter,
eventsNames: string | string[]): AsyncIterator<T> {
export function eventEmitterAsyncIterable<T>(eventEmitter: EventEmitter,
eventsNames: string | string[]): AsyncIterableIterator<T> {
const pullQueue = [];
const pushQueue = [];
const eventsArray = typeof eventsNames === 'string' ? [eventsNames] : eventsNames;
Expand Down Expand Up @@ -71,5 +71,6 @@ export function eventEmitterAsyncIterator<T>(eventEmitter: EventEmitter,
[$$asyncIterator]() {
return this;
},
};
} as AsyncIterator<T> as any as AsyncIterableIterator<T>;
// Asserting as AsyncIterator first so that next, return, and throw are still type checked
}
2 changes: 1 addition & 1 deletion src/pubsub-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ export interface PubSubEngine {
publish(triggerName: string, payload: any): Promise<void>;
subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>;
unsubscribe(subId: number);
asyncIterator<T>(triggers: string | string[]): AsyncIterator<T>;
asyncIterable<T>(triggers: string | string[]): AsyncIterable<T>;
}
6 changes: 3 additions & 3 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import { PubSubEngine } from './pubsub-engine';
import { eventEmitterAsyncIterator } from './event-emitter-to-async-iterator';
import { eventEmitterAsyncIterable } from './event-emitter-to-async-iterable';

export interface PubSubOptions {
eventEmitter?: EventEmitter;
Expand Down Expand Up @@ -36,7 +36,7 @@ export class PubSub implements PubSubEngine {
this.ee.removeListener(triggerName, onMessage);
}

public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
return eventEmitterAsyncIterator<T>(this.ee, triggers);
public asyncIterable<T>(triggers: string | string[]): AsyncIterable<T> {
return eventEmitterAsyncIterable<T>(this.ee, triggers);
}
}
11 changes: 6 additions & 5 deletions src/test/asyncIteratorSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as chaiAsPromised from 'chai-as-promised';
import { spy } from 'sinon';
import * as sinonChai from 'sinon-chai';

import { isAsyncIterable } from 'iterall';
import { getAsyncIterator, isAsyncIterable } from 'iterall';
import { PubSub } from '../pubsub';
import { withFilter, FilterFn } from '../with-filter';
import { ExecutionResult } from 'graphql';
Expand Down Expand Up @@ -64,7 +64,7 @@ describe('GraphQL-JS asyncIterator', () => {
}
`);
const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterator = pubsub.asyncIterable(FIRST_EVENT);
const schema = buildSchema(origIterator);


Expand All @@ -90,7 +90,7 @@ describe('GraphQL-JS asyncIterator', () => {
}
`);
const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterator = pubsub.asyncIterable(FIRST_EVENT);
const schema = buildSchema(origIterator, () => Promise.resolve(true));

const results = await subscribe(schema, query) as AsyncIterator<ExecutionResult>;
Expand All @@ -115,7 +115,7 @@ describe('GraphQL-JS asyncIterator', () => {
`);

const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterator = pubsub.asyncIterable(FIRST_EVENT);

let counter = 0;

Expand Down Expand Up @@ -155,7 +155,8 @@ describe('GraphQL-JS asyncIterator', () => {
`);

const pubsub = new PubSub();
const origIterator = pubsub.asyncIterator(FIRST_EVENT);
const origIterable = pubsub.asyncIterable(FIRST_EVENT);
const origIterator = getAsyncIterator(origIterable);
const returnSpy = spy(origIterator, 'return');
const schema = buildSchema(origIterator);

Expand Down
14 changes: 7 additions & 7 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as chaiAsPromised from 'chai-as-promised';
import * as sinonChai from 'sinon-chai';

import { PubSub } from '../pubsub';
import { isAsyncIterable } from 'iterall';
import { getAsyncIterator, isAsyncIterable } from 'iterall';

chai.use(chaiAsPromised);
chai.use(sinonChai);
Expand Down Expand Up @@ -37,15 +37,15 @@ describe('AsyncIterator', () => {
it('should expose valid asyncIterator for a specific event', () => {
const eventName = 'test';
const ps = new PubSub();
const iterator = ps.asyncIterator(eventName);
const iterator = ps.asyncIterable(eventName);
expect(iterator).to.not.be.undefined;
expect(isAsyncIterable(iterator)).to.be.true;
});

it('should trigger event on asyncIterator when published', done => {
const eventName = 'test';
const ps = new PubSub();
const iterator = ps.asyncIterator(eventName);
const iterator = getAsyncIterator(ps.asyncIterable(eventName));

iterator.next().then(result => {
expect(result).to.not.be.undefined;
Expand All @@ -60,7 +60,7 @@ describe('AsyncIterator', () => {
it('should not trigger event on asyncIterator when publishing other event', () => {
const eventName = 'test2';
const ps = new PubSub();
const iterator = ps.asyncIterator('test');
const iterator = getAsyncIterator(ps.asyncIterable('test'));
const spy = sinon.spy();

iterator.next().then(spy);
Expand All @@ -71,7 +71,7 @@ describe('AsyncIterator', () => {
it('register to multiple events', done => {
const eventName = 'test2';
const ps = new PubSub();
const iterator = ps.asyncIterator(['test', 'test2']);
const iterator = getAsyncIterator(ps.asyncIterable(['test', 'test2']));
const spy = sinon.spy();

iterator.next().then(() => {
Expand All @@ -85,7 +85,7 @@ describe('AsyncIterator', () => {
it('should not trigger event on asyncIterator already returned', done => {
const eventName = 'test';
const ps = new PubSub();
const iterator = ps.asyncIterator(eventName);
const iterator = getAsyncIterator(ps.asyncIterable(eventName));

iterator.next().then(result => {
expect(result).to.not.be.undefined;
Expand Down Expand Up @@ -115,7 +115,7 @@ describe('AsyncIterator', () => {
}
}
const ps = new TestPubSub();
ps.asyncIterator(testEventName);
ps.asyncIterable(testEventName);

expect(ps.listenerCount(testEventName)).to.equal(0);
});
Expand Down
14 changes: 8 additions & 6 deletions src/with-filter.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { $$asyncIterator } from 'iterall';
import { $$asyncIterator, getAsyncIterator } from 'iterall';

export type FilterFn = (rootValue?: any, args?: any, context?: any, info?: any) => boolean | Promise<boolean>;
export type ResolverFn = (rootValue?: any, args?: any, context?: any, info?: any) => AsyncIterator<any>;
export type ResolverFn = (rootValue?: any, args?: any, context?: any, info?: any) => AsyncIterable<any>;

export const withFilter = (asyncIteratorFn: ResolverFn, filterFn: FilterFn): ResolverFn => {
return (rootValue: any, args: any, context: any, info: any): AsyncIterator<any> => {
const asyncIterator = asyncIteratorFn(rootValue, args, context, info);
export const withFilter = (asyncIterableFn: ResolverFn, filterFn: FilterFn): ResolverFn => {
return (rootValue: any, args: any, context: any, info: any): AsyncIterableIterator<any> => {
const asyncIterable = asyncIterableFn(rootValue, args, context, info);
const asyncIterator = getAsyncIterator(asyncIterable);

const getNextPromise = () => {
return asyncIterator
Expand Down Expand Up @@ -41,6 +42,7 @@ export const withFilter = (asyncIteratorFn: ResolverFn, filterFn: FilterFn): Res
[$$asyncIterator]() {
return this;
},
};
} as AsyncIterator<any> as any as AsyncIterableIterator<any>;
// Asserting as AsyncIterator first so that next, return, and throw are still type checked
};
};