Skip to content

Commit 0053fe8

Browse files
feat(js): add streaming utility templates for streamedListObjects
Adds NDJSON streaming parser template to support streamedListObjects in the JavaScript SDK. Templates provide parsing utilities; actual streaming logic remains in custom code (client.ts, common.ts) maintained in js-sdk repository. Templates Added/Modified (5 files): - streaming.mustache (NEW): NDJSON parser for Node.js streams - Proper error propagation (reject pending promises on error) - onEnd guard prevents processing after error - Uint8Array handling alongside string/Buffer - Stream destruction in return()/throw() methods - Widened type signature for better DX - index.mustache: Export parseNDJSONStream utility - config.overrides.json: Register streaming + supportsStreamedListObjects flag - README_calling_api.mustache: Usage documentation - README_api_endpoints.mustache: API endpoint table entry Architecture: - Templates generate utilities (streaming.ts, exports) - Custom js-sdk code implements streaming (common.ts, client.ts) - No OpenAPI spec changes required Generated & Verified: - streaming.ts includes all error handling fixes - parseNDJSONStream exported correctly - Works with custom js-sdk streaming implementation Related: - Fixes #76 (JavaScript SDK) - Implements openfga/js-sdk#236 - Related PR: openfga/js-sdk#280
1 parent 19d1271 commit 0053fe8

File tree

5 files changed

+206
-1
lines changed

5 files changed

+206
-1
lines changed

config/clients/js/config.overrides.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"fossaComplianceNoticeId": "9c7d9da4-2a75-47c9-bfc9-31b301fb764f",
1010
"useSingleRequestParameter": false,
1111
"supportsES6": true,
12+
"supportsStreamedListObjects": true,
1213
"modelPropertyNaming": "original",
1314
"openTelemetryDocumentation": "docs/opentelemetry.md",
1415
"files": {
@@ -23,6 +24,10 @@
2324
"npmrc.mustache": {
2425
"destinationFilename": ".npmrc",
2526
"templateType": "SupportingFiles"
27+
},
28+
"streaming.mustache": {
29+
"destinationFilename": "streaming.ts",
30+
"templateType": "SupportingFiles"
2631
}
2732
}
2833
}

config/clients/js/template/README_api_endpoints.mustache

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@
1414
| [**BatchCheck**]({{apiDocsUrl}}#/Relationship%20Queries/BatchCheck) | **POST** /stores/{store_id}/batch-check | Similar to check, but accepts a list of relations to check |
1515
| [**Expand**]({{apiDocsUrl}}#/Relationship%20Queries/Expand) | **POST** /stores/{store_id}/expand | Expand all relationships in userset tree format, and following userset rewrite rules. Useful to reason about and debug a certain relationship |
1616
| [**ListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/ListObjects) | **POST** /stores/{store_id}/list-objects | [EXPERIMENTAL] Get all objects of the given type that the user has a relation with |
17-
| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID |
17+
{{#supportsStreamedListObjects}}| [**StreamedListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with (Node.js only) |
18+
{{/supportsStreamedListObjects}}| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID |
1819
| [**WriteAssertions**]({{apiDocsUrl}}#/Assertions/WriteAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID |

config/clients/js/template/README_calling_api.mustache

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,28 @@ const response = await fgaClient.listObjects({
466466
// response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
467467
```
468468

469+
{{#supportsStreamedListObjects}}
470+
##### Streamed List Objects
471+
472+
List the objects of a particular type that the user has a certain relation to, using the streaming API.
473+
474+
> **Note**: This is a Node.js-only feature. The streaming API allows you to retrieve more than the standard 1000 object limit.
475+
476+
[API Documentation]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects)
477+
478+
```javascript
479+
const fgaClient = new OpenFgaClient({ apiUrl: "http://localhost:8080", storeId: "01H0H015178Y2V4CX10C2KGHF4" });
480+
481+
for await (const response of fgaClient.streamedListObjects({
482+
user: "user:anne",
483+
relation: "owner",
484+
type: "document"
485+
})) {
486+
console.log(response.object);
487+
}
488+
```
489+
490+
{{/supportsStreamedListObjects}}
469491
##### List Relations
470492

471493
List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once.

config/clients/js/template/index.mustache

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ export * from "./telemetry/counters";
1313
export * from "./telemetry/histograms";
1414
export * from "./telemetry/metrics";
1515
export * from "./errors";
16+
export { parseNDJSONStream } from "./streaming";
1617

1718
{{#withSeparateModelsAndApi}}export * from "./{{tsModelPackage}}";{{/withSeparateModelsAndApi}}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
{{>partial_header}}
2+
3+
import type { Readable } from "node:stream";
4+
5+
// Helper: create async iterable from classic EventEmitter-style Readable streams
6+
const createAsyncIterableFromReadable = (readable: any): AsyncIterable<any> => {
7+
return {
8+
[Symbol.asyncIterator](): AsyncIterator<any> {
9+
const chunkQueue: any[] = [];
10+
const pendings: Array<{ resolve: (v: IteratorResult<any>) => void; reject: (e?: any) => void }> = [];
11+
let ended = false;
12+
let error: any = null;
13+
14+
const onData = (chunk: any) => {
15+
if (pendings.length > 0) {
16+
const { resolve } = pendings.shift()!;
17+
resolve({ value: chunk, done: false });
18+
} else {
19+
chunkQueue.push(chunk);
20+
}
21+
};
22+
23+
const onEnd = () => {
24+
if (error) return; // Don't process end if error already occurred
25+
ended = true;
26+
while (pendings.length > 0) {
27+
const { resolve } = pendings.shift()!;
28+
resolve({ value: undefined, done: true });
29+
}
30+
};
31+
32+
const onError = (err: any) => {
33+
error = err;
34+
while (pendings.length > 0) {
35+
const { reject } = pendings.shift()!;
36+
reject(err);
37+
}
38+
cleanup();
39+
};
40+
41+
readable.on("data", onData);
42+
readable.once("end", onEnd);
43+
readable.once("error", onError);
44+
45+
const cleanup = () => {
46+
readable.off("data", onData);
47+
readable.off("end", onEnd);
48+
readable.off("error", onError);
49+
};
50+
51+
return {
52+
next(): Promise<IteratorResult<any>> {
53+
if (error) {
54+
return Promise.reject(error);
55+
}
56+
if (chunkQueue.length > 0) {
57+
const value = chunkQueue.shift();
58+
return Promise.resolve({ value, done: false });
59+
}
60+
if (ended) {
61+
cleanup();
62+
return Promise.resolve({ value: undefined, done: true });
63+
}
64+
return new Promise<IteratorResult<any>>((resolve, reject) => {
65+
pendings.push({ resolve, reject });
66+
});
67+
},
68+
return(): Promise<IteratorResult<any>> {
69+
try {
70+
cleanup();
71+
} finally {
72+
if (readable && typeof readable.destroy === "function") {
73+
readable.destroy();
74+
}
75+
}
76+
return Promise.resolve({ value: undefined, done: true });
77+
},
78+
throw(e?: any): Promise<IteratorResult<any>> {
79+
try {
80+
cleanup();
81+
} finally {
82+
if (readable && typeof readable.destroy === "function") {
83+
readable.destroy(e);
84+
}
85+
}
86+
return Promise.reject(e);
87+
}
88+
};
89+
}
90+
};
91+
};
92+
93+
/**
94+
* Parse newline-delimited JSON (NDJSON) from a Node.js readable stream
95+
* @param stream - Node.js readable stream, AsyncIterable, string, or Buffer
96+
* @returns AsyncGenerator that yields parsed JSON objects
97+
*/
98+
export async function* parseNDJSONStream(
99+
stream: Readable | AsyncIterable<Uint8Array | string | Buffer> | string | Uint8Array | Buffer
100+
): AsyncGenerator<any> {
101+
const decoder = new TextDecoder("utf-8");
102+
let buffer = "";
103+
104+
// If stream is actually a string or Buffer-like, handle as whole payload
105+
const isString = typeof stream === "string";
106+
const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream);
107+
const isUint8Array = typeof Uint8Array !== "undefined" && stream instanceof Uint8Array;
108+
109+
if (isString || isBuffer || isUint8Array) {
110+
const text = isString
111+
? (stream as string)
112+
: decoder.decode(isBuffer ? new Uint8Array(stream as Buffer) : (stream as Uint8Array));
113+
const lines = text.split("\n");
114+
115+
for (const line of lines) {
116+
const trimmed = line.trim();
117+
if (!trimmed) {
118+
continue;
119+
}
120+
121+
try {
122+
yield JSON.parse(trimmed);
123+
} catch (err) {
124+
console.warn("Failed to parse JSON line:", err);
125+
}
126+
}
127+
return;
128+
}
129+
130+
const isAsyncIterable = stream && typeof (stream as any)[Symbol.asyncIterator] === "function";
131+
const source: AsyncIterable<any> = isAsyncIterable ? (stream as any) : createAsyncIterableFromReadable(stream as any);
132+
133+
for await (const chunk of source) {
134+
// Node.js streams can return Buffer or string chunks
135+
// Convert to Uint8Array if needed for TextDecoder
136+
const uint8Chunk = typeof chunk === "string"
137+
? new TextEncoder().encode(chunk)
138+
: chunk instanceof Buffer
139+
? new Uint8Array(chunk)
140+
: chunk;
141+
142+
// Append decoded chunk to buffer
143+
buffer += decoder.decode(uint8Chunk, { stream: true });
144+
145+
// Split on newlines
146+
const lines = buffer.split("\n");
147+
148+
// Keep the last (potentially incomplete) line in the buffer
149+
buffer = lines.pop() || "";
150+
151+
// Parse and yield complete lines
152+
for (const line of lines) {
153+
const trimmed = line.trim();
154+
if (trimmed) {
155+
try {
156+
yield JSON.parse(trimmed);
157+
} catch (err) {
158+
console.warn("Failed to parse JSON line:", err);
159+
}
160+
}
161+
}
162+
}
163+
164+
// Flush any remaining decoder state
165+
buffer += decoder.decode();
166+
167+
// Handle any remaining data in buffer
168+
if (buffer.trim()) {
169+
try {
170+
yield JSON.parse(buffer);
171+
} catch (err) {
172+
console.warn("Failed to parse final JSON buffer:", err);
173+
}
174+
}
175+
}
176+

0 commit comments

Comments
 (0)