Skip to content

Commit daa8e87

Browse files
authored
🛣️ fix: Chat Stream Hangup (#4822)
Embedded sse.js code converted into an external dependency. Custom access token refresh logic moved to useSSE.ts hook. Closes #4820
1 parent ebae494 commit daa8e87

File tree

6 files changed

+64
-275
lines changed

6 files changed

+64
-275
lines changed

client/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
"remark-gfm": "^4.0.0",
9797
"remark-math": "^6.0.0",
9898
"remark-supersub": "^1.0.0",
99+
"sse.js": "^2.5.0",
99100
"tailwind-merge": "^1.9.1",
100101
"tailwindcss-animate": "^1.0.5",
101102
"tailwindcss-radix": "^2.8.0",

client/src/hooks/SSE/useSSE.ts

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
1-
import { v4 } from 'uuid';
2-
import { useSetRecoilState } from 'recoil';
3-
import { useEffect, useState } from 'react';
1+
import type { EventSubmission, TMessage, TPayload, TSubmission } from 'librechat-data-provider';
42
import {
53
/* @ts-ignore */
6-
SSE,
74
createPayload,
85
isAgentsEndpoint,
9-
removeNullishValues,
106
isAssistantsEndpoint,
7+
removeNullishValues,
8+
request,
119
} from 'librechat-data-provider';
12-
import { useGetUserBalance, useGetStartupConfig } from 'librechat-data-provider/react-query';
13-
import type { TMessage, TSubmission, TPayload, EventSubmission } from 'librechat-data-provider';
14-
import type { EventHandlerParams } from './useEventHandlers';
10+
import { useGetStartupConfig, useGetUserBalance } from 'librechat-data-provider/react-query';
11+
import { useEffect, useState } from 'react';
12+
import { useSetRecoilState } from 'recoil';
13+
import { SSE } from 'sse.js';
14+
import { v4 } from 'uuid';
1515
import type { TResData } from '~/common';
1616
import { useGenTitleMutation } from '~/data-provider';
1717
import { useAuthContext } from '~/hooks/AuthContext';
18-
import useEventHandlers from './useEventHandlers';
1918
import store from '~/store';
19+
import type { EventHandlerParams } from './useEventHandlers';
20+
import useEventHandlers from './useEventHandlers';
2021

2122
type ChatHelpers = Pick<
2223
EventHandlerParams,
@@ -94,21 +95,21 @@ export default function useSSE(
9495

9596
let textIndex = null;
9697

97-
const events = new SSE(payloadData.server, {
98+
const sse = new SSE(payloadData.server, {
9899
payload: JSON.stringify(payload),
99100
headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
100101
});
101102

102-
events.onattachment = (e: MessageEvent) => {
103+
sse.addEventListener('attachment', (e: MessageEvent) => {
103104
try {
104105
const data = JSON.parse(e.data);
105106
attachmentHandler({ data, submission: submission as EventSubmission });
106107
} catch (error) {
107108
console.error(error);
108109
}
109-
};
110+
});
110111

111-
events.onmessage = (e: MessageEvent) => {
112+
sse.addEventListener('message', (e: MessageEvent) => {
112113
const data = JSON.parse(e.data);
113114

114115
if (data.final != null) {
@@ -155,14 +156,14 @@ export default function useSSE(
155156
messageHandler(text, { ...submission, plugin, plugins, userMessage, initialResponse });
156157
}
157158
}
158-
};
159+
});
159160

160-
events.onopen = () => {
161+
sse.addEventListener('open', () => {
161162
setAbortScroll(false);
162163
console.log('connection is opened');
163-
};
164+
});
164165

165-
events.oncancel = async () => {
166+
sse.addEventListener('cancel', async () => {
166167
const streamKey = (submission as TSubmission | null)?.['initialResponse']?.messageId;
167168
if (completed.has(streamKey)) {
168169
setIsSubmitting(false);
@@ -181,9 +182,27 @@ export default function useSSE(
181182
submission as EventSubmission,
182183
latestMessages,
183184
);
184-
};
185+
});
186+
187+
sse.addEventListener('error', async (e: MessageEvent) => {
188+
/* @ts-ignore */
189+
if (e.responseCode === 401) {
190+
/* token expired, refresh and retry */
191+
try {
192+
const refreshResponse = await request.refreshToken();
193+
sse.headers = {
194+
'Content-Type': 'application/json',
195+
Authorization: `Bearer ${refreshResponse.token}`,
196+
};
197+
request.dispatchTokenUpdatedEvent(refreshResponse.token);
198+
sse.stream();
199+
return;
200+
} catch (error) {
201+
/* token refresh failed, continue handling the original 401 */
202+
console.log(error);
203+
}
204+
}
185205

186-
events.onerror = function (e: MessageEvent) {
187206
console.log('error in server stream.');
188207
(startupConfig?.checkBalance ?? false) && balanceQuery.refetch();
189208

@@ -197,18 +216,18 @@ export default function useSSE(
197216
}
198217

199218
errorHandler({ data, submission: { ...submission, userMessage } as EventSubmission });
200-
};
219+
});
201220

202221
setIsSubmitting(true);
203-
events.stream();
222+
sse.stream();
204223

205224
return () => {
206-
const isCancelled = events.readyState <= 1;
207-
events.close();
208-
// setSource(null);
225+
const isCancelled = sse.readyState <= 1;
226+
sse.close();
209227
if (isCancelled) {
210228
const e = new Event('cancel');
211-
events.dispatchEvent(e);
229+
/* @ts-ignore */
230+
sse.dispatchEvent(e);
212231
}
213232
};
214233
// eslint-disable-next-line react-hooks/exhaustive-deps

package-lock.json

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/data-provider/src/index.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,25 @@ export * from './artifacts';
88
/* schema helpers */
99
export * from './parsers';
1010
/* custom/dynamic configurations */
11-
export * from './models';
1211
export * from './generate';
12+
export * from './models';
1313
/* RBAC */
1414
export * from './roles';
1515
/* types (exports schemas from `./types` as they contain needed in other defs) */
1616
export * from './types';
1717
export * from './types/agents';
1818
export * from './types/assistants';
19-
export * from './types/queries';
2019
export * from './types/files';
2120
export * from './types/mutations';
21+
export * from './types/queries';
2222
export * from './types/runs';
2323
/* query/mutation keys */
2424
export * from './keys';
2525
/* api call helpers */
2626
export * from './headers-helpers';
2727
export { default as request } from './request';
28-
import * as dataService from './data-service';
2928
export { dataService };
29+
import * as dataService from './data-service';
3030
/* general helpers */
31-
export * from './sse';
3231
export * from './actions';
3332
export { default as createPayload } from './createPayload';

packages/data-provider/src/request.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable @typescript-eslint/no-explicit-any */
2-
import axios, { AxiosRequestConfig, AxiosError } from 'axios';
3-
import { setTokenHeader } from './headers-helpers';
2+
import axios, { AxiosError, AxiosRequestConfig } from 'axios';
43
import * as endpoints from './api-endpoints';
4+
import { setTokenHeader } from './headers-helpers';
55

66
async function _get<T>(url: string, options?: AxiosRequestConfig): Promise<T> {
77
const response = await axios.get(url, { ...options });
@@ -65,6 +65,11 @@ let failedQueue: { resolve: (value?: any) => void; reject: (reason?: any) => voi
6565

6666
const refreshToken = (retry?: boolean) => _post(endpoints.refreshToken(retry));
6767

68+
const dispatchTokenUpdatedEvent = (token: string) => {
69+
setTokenHeader(token);
70+
window.dispatchEvent(new CustomEvent('tokenUpdated', { detail: token }));
71+
};
72+
6873
const processQueue = (error: AxiosError | null, token: string | null = null) => {
6974
failedQueue.forEach((prom) => {
7075
if (error) {
@@ -109,8 +114,7 @@ axios.interceptors.response.use(
109114

110115
if (token) {
111116
originalRequest.headers['Authorization'] = 'Bearer ' + token;
112-
setTokenHeader(token);
113-
window.dispatchEvent(new CustomEvent('tokenUpdated', { detail: token }));
117+
dispatchTokenUpdatedEvent(token);
114118
processQueue(null, token);
115119
return await axios(originalRequest);
116120
} else {
@@ -139,4 +143,5 @@ export default {
139143
deleteWithOptions: _deleteWithOptions,
140144
patch: _patch,
141145
refreshToken,
146+
dispatchTokenUpdatedEvent,
142147
};

0 commit comments

Comments
 (0)