Commit fef3aaf

feat: custom fetch param for node envs (#46)
- Adds `{ fetch }` param to `OpenAIOptions` for `node-fetch` support
2 parents 4ece01f + 13032c6 commit fef3aaf
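
For orientation, here is a minimal sketch of how the new option is meant to be used from a Node environment, adapted from the test added in this commit. The import path and prompt are illustrative, and on runtimes that already provide a global fetch the option can simply be omitted.

import nodeFetch from "node-fetch";
import { OpenAI } from "../src"; // illustrative path, mirroring the repo's tests

// Inside an async context: pass node-fetch through the new `fetch` option.
const stream = await OpenAI(
  "chat",
  {
    model: "gpt-3.5-turbo",
    messages: [{ role: "user", content: "Say hello!" }],
  },
  { fetch: nodeFetch }
);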

7 files changed (+99, −6 lines):

package.json
pnpm-lock.yaml
src/lib/backoff.ts
src/lib/openai/edge.ts
src/lib/streaming/streams.ts
src/lib/types.ts
test/streams.test.ts

package.json

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@
   },
   "dependencies": {
     "eventsource-parser": "^1.0.0",
+    "node-fetch": "^3.3.1",
     "openai": "^3.2.1",
     "yield-stream": "^3.0.0"
   },

pnpm-lock.yaml

Lines changed: 41 additions & 0 deletions
Some generated files are not rendered by default.

src/lib/backoff.ts

Lines changed: 11 additions & 2 deletions
@@ -1,19 +1,28 @@
 /* eslint-disable no-console */
+import nodeFetch, { RequestInfo as NodeFetchRequestInfo, RequestInit as NodeFetchRequestInit } from "node-fetch";
+
 export interface BackoffOptions {
   maxRetries: number;
   delay: number;
 }
 
+const globalFetch = typeof fetch === "undefined" ? nodeFetch : fetch;
+
 export const fetchWithBackoff = async (
-  input: RequestInfo,
-  init?: RequestInit,
+  input: RequestInfo & NodeFetchRequestInfo,
+  init?: RequestInit & NodeFetchRequestInit,
+  fetch = globalFetch,
   { delay, maxRetries }: BackoffOptions = {
     delay: 500,
     maxRetries: 7
   }
 ) => {
   for (let i = 0; i <= maxRetries; i++) {
     try {
+      if (!fetch) {
+        throw new Error("No fetch implementation.");
+      }
+
       const response = await fetch(input, init);
 
       if (!response.ok) {
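
The change is a small dependency-injection pattern: resolve a module-level default once (node-fetch when no global fetch exists, e.g. Node below 18), and let each call override it. A standalone sketch of the same idea, with hypothetical names (`defaultFetch`, `getJson`):

import nodeFetch from "node-fetch";

// Resolve a default once at module load: prefer the platform's global
// fetch (browsers, edge runtimes, Node 18+), fall back to node-fetch.
const defaultFetch = typeof fetch === "undefined" ? nodeFetch : fetch;

// Callers may inject their own implementation (e.g. a mock in tests);
// otherwise the module-level default applies.
const getJson = async (url: string, fetchImpl = defaultFetch) => {
  const response = await fetchImpl(url);
  if (!response.ok) throw new Error(`HTTP ${response.status}`);
  return response.json();
};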

src/lib/openai/edge.ts

Lines changed: 3 additions & 2 deletions
@@ -28,7 +28,8 @@ export const OpenAI: OpenAIEdgeClient = async (
     apiHeaders = {},
     controller,
     onDone,
-    onParse
+    onParse,
+    fetch
   } = {}
 ) => {
   if (!apiKey) {
@@ -50,7 +51,7 @@
       ...apiHeaders,
     },
     signal: controller?.signal,
-  });
+  }, fetch);
 
   switch (response.status) {
     case 401:
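
Note that `fetch` is destructured here without a default, so omitting the option passes `undefined` down to `fetchWithBackoff`, where the `fetch = globalFetch` default parameter takes over. A tiny sketch of that default-parameter semantics:

// Default parameters apply only when the argument is `undefined`,
// so threading an optional option straight through is safe.
const pick = (impl = "global") => impl;

pick();          // "global"
pick(undefined); // "global" - same as omitting the argument
pick("custom");  // "custom"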

src/lib/streaming/streams.ts

Lines changed: 10 additions & 2 deletions
@@ -3,6 +3,7 @@ import { ChatParser, TokenParser } from "./transforms";
 
 import { createParser } from "eventsource-parser";
 import { Transform, pipeline, yieldStream } from "yield-stream";
+import { yieldStream as yieldStreamNode } from "yield-stream/node";
 import { OpenAIError } from "../errors";
 
 export type StreamMode = "raw" | "tokens";
@@ -26,7 +27,7 @@ export interface OpenAIStreamOptions {
 }
 
 export type OpenAIStream = (
-  stream: ReadableStream<Uint8Array>,
+  stream: NodeJS.ReadableStream | ReadableStream<Uint8Array>,
   options: OpenAIStreamOptions
 ) => ReadableStream<Uint8Array>;
 
@@ -85,10 +86,17 @@ export const EventStream: OpenAIStream = (
       }
     }
   });
+
+  // Check if the stream is a NodeJS stream or a browser stream.
+  // @ts-ignore - TS doesn't know about `pipe` on streams.
+  const isNodeJsStream = typeof stream.pipe === "function";
+
   /**
    * Feed the parser with decoded chunks from the raw stream.
    */
-  for await (const chunk of yieldStream(stream)) {
+  for await (const chunk of isNodeJsStream
+    ? yieldStreamNode<Uint8Array>(stream as NodeJS.ReadableStream)
+    : yieldStream(stream as ReadableStream<Uint8Array>)) {
     const decoded = DECODER.decode(chunk);
 
     try {
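
The branch above distinguishes Node streams from WHATWG streams by duck typing: Node readables expose `pipe`, while browser `ReadableStream`s expose `getReader` instead. The same check can be written as a reusable type guard (a sketch; the helper name is hypothetical):

// Hypothetical type guard for the same runtime check.
const isNodeReadable = (
  stream: NodeJS.ReadableStream | ReadableStream<Uint8Array>
): stream is NodeJS.ReadableStream =>
  typeof (stream as NodeJS.ReadableStream).pipe === "function";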

src/lib/types.ts

Lines changed: 7 additions & 0 deletions
@@ -9,6 +9,8 @@ import type {
 
 import { OpenAIStreamOptions } from "./streaming";
 
+import nodeFetch from "node-fetch";
+
 export const OpenAIAPIEndpoints = {
   chat: "chat/completions",
   completions: "completions",
@@ -56,6 +58,11 @@ export interface OpenAIOptions extends OpenAIStreamOptions {
    * mid-flight.
    */
   controller?: AbortController;
+  /**
+   * An optional fetch implementation, which can be used to replace the default global fetch call used for making
+   * API requests. This is useful in environments like Node, where a global fetch is not provided by default.
+   */
+  fetch?: typeof fetch | typeof nodeFetch;
 }
 
 /**
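
The union type means both the platform's global fetch and node-fetch's default export satisfy the option, as does a thin wrapper typed the way the test below types its mock. A sketch (`FetchOption` and `loggingFetch` are hypothetical names):

import nodeFetch from "node-fetch";

// Mirrors the option type declared above.
type FetchOption = typeof fetch | typeof nodeFetch;

// node-fetch's default export satisfies the option directly...
const nodeImpl: FetchOption = nodeFetch;

// ...and so does a wrapper, typed like the mock in the test below.
const loggingFetch: typeof nodeFetch = (...params) => {
  console.log("request:", params[0]);
  return nodeFetch(...params);
};
const wrapped: FetchOption = loggingFetch;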

test/streams.test.ts

Lines changed: 26 additions & 0 deletions
@@ -5,6 +5,7 @@ import test from "ava";
 import { OpenAI } from "../src";
 import { yieldStream } from "yield-stream";
 import { DECODER } from "../src/globs/shared";
+import nodeFetch from "node-fetch";
 
 test.serial("'completions' endpoint", async (t) => {
   const stream = await OpenAI("completions", {
@@ -237,3 +238,28 @@
 
   t.is(i, 5, "Stream should have been cancelled after 5 chunks.");
 });
+
+test.serial("should work with custom fetch", async (t) => {
+  let didUseMock = false;
+
+  const mockFetch: typeof nodeFetch = (...params) => {
+    didUseMock = true;
+    return nodeFetch(...params);
+  };
+
+  const stream = await OpenAI("chat", {
+    model: "gpt-3.5-turbo",
+    messages: [
+      { role: "system", content: "You are a helpful assistant." },
+      { role: "user", content: "This is a test message, say hello!" },
+    ],
+  }, { fetch: mockFetch });
+
+  const DECODER = new TextDecoder();
+  for await (const serialized of yieldStream(stream)) {
+    const string = DECODER.decode(serialized);
+    console.log(string);
+  }
+
+  t.true(didUseMock, "Did not use custom fetch.");
+});
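
Note the mock above delegates to the real nodeFetch, so this test still performs a live API call; it only verifies that the custom implementation was invoked. A fully offline variant could serve a canned SSE body instead (a sketch under the assumption that the client accepts any spec-compliant Response; the payload is illustrative):

import { Response } from "node-fetch";
import type nodeFetch from "node-fetch";

// Hypothetical offline mock: return one content chunk and a DONE marker
// without touching the network.
const offlineFetch: typeof nodeFetch = async () =>
  new Response(
    'data: {"choices":[{"delta":{"content":"Hello!"}}]}\n\ndata: [DONE]\n\n',
    { status: 200, headers: { "Content-Type": "text/event-stream" } }
  );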
