Skip to content

Commit e001168

Browse files
authored
Merge pull request #441 from streamich/rx-rpc-improvements
JSON Reactive RPC improvements
2 parents ad753ca + e1bc52b commit e001168

File tree

8 files changed

+315
-259
lines changed

8 files changed

+315
-259
lines changed

src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ export class TypeRouterCaller<Router extends TypeRouter<any>, Ctx = unknown> ext
4545
: (req: unknown) => {
4646
const error = validator(req);
4747
if (error) {
48-
throw RpcError.value(RpcError.validation(error.message, error));
48+
const message = error.message + (Array.isArray(error?.path) ? ' Path: /' + error.path.join('/') : '');
49+
throw RpcError.value(RpcError.validation(message, error));
4950
}
5051
};
5152
method =
@@ -79,7 +80,13 @@ export class TypeRouterCaller<Router extends TypeRouter<any>, Ctx = unknown> ext
7980
request: MethodReq<Routes<Router>[K]>,
8081
ctx: Ctx = {} as any,
8182
): Promise<MethodRes<Routes<Router>[K]>> {
82-
return (await super.call(id as string, request, ctx)).data as any;
83+
try {
84+
const res = await this.call(id as string, request, ctx);
85+
return res.data;
86+
} catch (err) {
87+
const error = err as Value<RpcError>;
88+
throw error.data;
89+
}
8390
}
8491

8592
public call$<K extends keyof Routes<Router>>(

src/reactive-rpc/server/uws/RpcApp.ts

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,59 @@ import {Value} from '../../common/messages/Value';
88
import {EncodingFormat} from '../../../json-pack/constants';
99
import {RpcMessageFormat} from '../../common/codec/constants';
1010
import {RpcCodecs} from '../../common/codec/RpcCodecs';
11+
import {Codecs} from '../../../json-pack/codecs/Codecs';
1112
import {type ReactiveRpcMessage, RpcMessageStreamProcessor, ReactiveRpcClientMessage} from '../../common';
12-
import type {Codecs} from '../../../json-pack/codecs/Codecs';
1313
import type * as types from './types';
1414
import type {RouteHandler} from './types';
1515
import type {RpcCaller} from '../../common/rpc/caller/RpcCaller';
1616
import type {JsonValueCodec} from '../../../json-pack/codecs/types';
17+
import {Writer} from '../../../util/buffers/Writer';
1718

1819
const HDR_BAD_REQUEST = Buffer.from('400 Bad Request', 'utf8');
1920
const HDR_NOT_FOUND = Buffer.from('404 Not Found', 'utf8');
2021
const HDR_INTERNAL_SERVER_ERROR = Buffer.from('500 Internal Server Error', 'utf8');
2122
const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found');
2223
const ERR_INTERNAL = RpcError.internal();
2324

25+
const noop = (x: any) => {};
26+
2427
export interface RpcAppOptions {
2528
uws: types.TemplatedApp;
26-
maxRequestBodySize: number;
27-
codecs: Codecs;
2829
caller: RpcCaller<any>;
29-
augmentContext: (ctx: ConnectionContext) => void;
30+
31+
/**
32+
* Maximum request body size in bytes. Default is 1MB.
33+
*/
34+
maxRequestBodySize?: number;
35+
36+
/**
37+
* Serializers and de-serializers for request and response bodies.
38+
*/
39+
codecs?: Codecs;
40+
41+
/**
42+
* HTTP port to listen on. If not specified, the PORT environment variable
43+
* will be used, or 9999 if not set.
44+
*/
45+
port?: number;
46+
47+
/**
48+
* Host to listen to. If not specified, the HOST environment variable will be
49+
* used, or '0.0.0.0' if not set.
50+
*/
51+
host?: string;
52+
53+
/**
54+
* This method allows to augment connection context with additional data.
55+
*
56+
* @param ctx Connection context.
57+
*/
58+
augmentContext?: (ctx: ConnectionContext) => void;
59+
60+
/**
61+
* Logger to use for logging. If not specified, console will be used.
62+
*/
63+
logger?: types.ServerLogger;
3064
}
3165

3266
export class RpcApp<Ctx extends ConnectionContext> {
@@ -38,8 +72,8 @@ export class RpcApp<Ctx extends ConnectionContext> {
3872

3973
constructor(protected readonly options: RpcAppOptions) {
4074
this.app = options.uws;
41-
this.maxRequestBodySize = options.maxRequestBodySize;
42-
this.codecs = new RpcCodecs(options.codecs, new RpcMessageCodecs());
75+
(this.maxRequestBodySize = options.maxRequestBodySize ?? 1024 * 1024),
76+
(this.codecs = new RpcCodecs(options.codecs ?? new Codecs(new Writer()), new RpcMessageCodecs()));
4377
this.batchProcessor = new RpcMessageBatchProcessor<Ctx>({caller: options.caller});
4478
}
4579

@@ -104,7 +138,7 @@ export class RpcApp<Ctx extends ConnectionContext> {
104138

105139
public enableWsRpc(path: string = '/rpc'): this {
106140
const maxBackpressure = 4 * 1024 * 1024;
107-
const augmentContext = this.options.augmentContext;
141+
const augmentContext = this.options.augmentContext ?? noop;
108142
this.app.ws(path, {
109143
idleTimeout: 0,
110144
maxPayloadLength: 4 * 1024 * 1024,
@@ -164,7 +198,9 @@ export class RpcApp<Ctx extends ConnectionContext> {
164198
const matcher = this.router.compile();
165199
const codecs = this.codecs;
166200
let responseCodec: JsonValueCodec = codecs.value.json;
167-
const augmentContext = this.options.augmentContext;
201+
const options = this.options;
202+
const augmentContext = options.augmentContext ?? noop;
203+
const logger = options.logger ?? console;
168204
this.app.any('/*', async (res: types.HttpResponse, req: types.HttpRequest) => {
169205
res.onAborted(() => {
170206
res.aborted = true;
@@ -203,13 +239,31 @@ export class RpcApp<Ctx extends ConnectionContext> {
203239
});
204240
return;
205241
}
206-
// tslint:disable-next-line:no-console
207-
console.error(err);
242+
logger.error(err);
208243
res.cork(() => {
209244
res.writeStatus(HDR_INTERNAL_SERVER_ERROR);
210245
res.end(RpcErrorType.encode(responseCodec, ERR_INTERNAL));
211246
});
212247
}
213248
});
214249
}
250+
251+
public startWithDefaults(): void {
252+
this.enableCors();
253+
this.enableHttpPing();
254+
this.enableHttpRpc();
255+
this.enableWsRpc();
256+
this.startRouting();
257+
const options = this.options;
258+
const port = options.port ?? +(process.env.PORT || 9999);
259+
const host = options.host ?? process.env.HOST ?? '0.0.0.0';
260+
const logger = options.logger ?? console;
261+
this.options.uws.listen(host, port, (token) => {
262+
if (token) {
263+
logger.log({msg: 'SERVER_STARTED', url: `http://localhost:${port}`});
264+
} else {
265+
logger.error(`Failed to listen on ${port} port.`);
266+
}
267+
});
268+
}
215269
}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
export type RecognizedString =
2+
| string
3+
| ArrayBuffer
4+
| Uint8Array
5+
| Int8Array
6+
| Uint16Array
7+
| Int16Array
8+
| Uint32Array
9+
| Int32Array
10+
| Float32Array
11+
| Float64Array;
12+
13+
export interface HttpRequest {
14+
/** Returns the lowercased header value or empty string. */
15+
getHeader(lowerCaseKey: RecognizedString): string;
16+
/** Returns the URL including initial /slash */
17+
getUrl(): string;
18+
/** Returns the HTTP method, useful for "any" routes. */
19+
getMethod(): string;
20+
/** Returns the raw querystring (the part of URL after ? sign) or empty string. */
21+
getQuery(): string;
22+
/** Returns a decoded query parameter value or empty string. */
23+
getQuery(key: string): string;
24+
}
25+
26+
export interface HttpResponse {
27+
/**
28+
* Writes the HTTP status message such as "200 OK".
29+
* This has to be called first in any response, otherwise
30+
* it will be called automatically with "200 OK".
31+
*
32+
* If you want to send custom headers in a WebSocket
33+
* upgrade response, you have to call writeStatus with
34+
* "101 Switching Protocols" before you call writeHeader,
35+
* otherwise your first call to writeHeader will call
36+
* writeStatus with "200 OK" and the upgrade will fail.
37+
*
38+
* As you can imagine, we format outgoing responses in a linear
39+
* buffer, not in a hash table. You can read about this in
40+
* the user manual under "corking".
41+
*/
42+
writeStatus(status: RecognizedString): HttpResponse;
43+
/**
44+
* Writes key and value to HTTP response.
45+
* See writeStatus and corking.
46+
*/
47+
writeHeader(key: RecognizedString, value: RecognizedString): HttpResponse;
48+
/** Enters or continues chunked encoding mode. Writes part of the response. End with zero length write. Returns true if no backpressure was added. */
49+
write(chunk: RecognizedString): boolean;
50+
/** Ends this response by copying the contents of body. */
51+
end(body?: RecognizedString, closeConnection?: boolean): HttpResponse;
52+
/** Immediately force closes the connection. Any onAborted callback will run. */
53+
close(): HttpResponse;
54+
/**
55+
* Every HttpResponse MUST have an attached abort handler IF you do not respond
56+
* to it immediately inside of the callback. Returning from an Http request handler
57+
* without attaching (by calling onAborted) an abort handler is ill-use and will terminate.
58+
* When this event emits, the response has been aborted and may not be used.
59+
*/
60+
onAborted(handler: () => void): HttpResponse;
61+
/**
62+
* Handler for reading data from POST and such requests. You MUST copy the
63+
* data of chunk if isLast is not true. We Neuter ArrayBuffers on return,
64+
* making it zero length.
65+
*/
66+
onData(handler: (chunk: ArrayBuffer, isLast: boolean) => void): HttpResponse;
67+
/** Returns the remote IP address as text. */
68+
getRemoteAddressAsText(): ArrayBuffer;
69+
/** Corking a response is a performance improvement in both CPU and network, as you ready the IO system for writing multiple chunks at once.
70+
* By default, you're corked in the immediately executing top portion of the route handler. In all other cases, such as when returning from
71+
* await, or when being called back from an async database request or anything that isn't directly executing in the route handler, you'll want
72+
* to cork before calling writeStatus, writeHeader or just write. Corking takes a callback in which you execute the writeHeader, writeStatus and
73+
* such calls, in one atomic IO operation. This is important, not only for TCP but definitely for TLS where each write would otherwise result
74+
* in one TLS block being sent off, each with one send syscall.
75+
*
76+
* Example usage:
77+
*
78+
* res.cork(() => {
79+
* res.writeStatus("200 OK").writeHeader("Some", "Value").write("Hello world!");
80+
* });
81+
*/
82+
cork(cb: () => void): HttpResponse;
83+
/** Upgrades a HttpResponse to a WebSocket. See UpgradeAsync, UpgradeSync example files. */
84+
upgrade<T>(
85+
userData: T,
86+
secWebSocketKey: RecognizedString,
87+
secWebSocketProtocol: RecognizedString,
88+
secWebSocketExtensions: RecognizedString,
89+
context: unknown,
90+
): void;
91+
/** Arbitrary user data may be attached to this object */
92+
[key: string]: any;
93+
}
94+
95+
/** TemplatedApp is either an SSL or non-SSL app. See App for more info, read user manual. */
96+
export interface TemplatedApp {
97+
/** Listens to hostname & port. Callback hands either false or a listen socket. */
98+
listen(host: RecognizedString, port: number, cb: (listenSocket: unknown) => void): TemplatedApp;
99+
/** Registers an HTTP GET handler matching specified URL pattern. */
100+
get(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
101+
/** Registers an HTTP POST handler matching specified URL pattern. */
102+
post(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
103+
/** Registers an HTTP OPTIONS handler matching specified URL pattern. */
104+
options(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
105+
/** Registers an HTTP DELETE handler matching specified URL pattern. */
106+
del(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
107+
/** Registers an HTTP PATCH handler matching specified URL pattern. */
108+
patch(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
109+
/** Registers an HTTP PUT handler matching specified URL pattern. */
110+
put(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
111+
/** Registers an HTTP HEAD handler matching specified URL pattern. */
112+
head(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
113+
/** Registers an HTTP CONNECT handler matching specified URL pattern. */
114+
connect(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
115+
/** Registers an HTTP TRACE handler matching specified URL pattern. */
116+
trace(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
117+
/** Registers an HTTP handler matching specified URL pattern on any HTTP method. */
118+
any(pattern: RecognizedString, handler: (res: HttpResponse, req: HttpRequest) => void): TemplatedApp;
119+
/** Registers a handler matching specified URL pattern where WebSocket upgrade requests are caught. */
120+
ws(pattern: RecognizedString, behavior: WebSocketBehavior): TemplatedApp;
121+
}
122+
123+
/** A structure holding settings and handlers for a WebSocket URL route handler. */
124+
export interface WebSocketBehavior {
125+
/** Maximum length of received message. If a client tries to send you a message larger than this, the connection is immediately closed. Defaults to 16 * 1024. */
126+
maxPayloadLength?: number;
127+
/** Maximum amount of seconds that may pass without sending or getting a message. Connection is closed if this timeout passes. Resolution (granularity) for timeouts are typically 4 seconds, rounded to closest.
128+
* Disable by using 0. Defaults to 120.
129+
*/
130+
idleTimeout?: number;
131+
/** What permessage-deflate compression to use. uWS.DISABLED, uWS.SHARED_COMPRESSOR or any of the uWS.DEDICATED_COMPRESSOR_xxxKB. Defaults to uWS.DISABLED. */
132+
compression?: number;
133+
/** Maximum length of allowed backpressure per socket when publishing or sending messages. Slow receivers with too high backpressure will be skipped until they catch up or timeout. Defaults to 1024 * 1024. */
134+
maxBackpressure?: number;
135+
/** Whether or not we should automatically send pings to uphold a stable connection given whatever idleTimeout. */
136+
sendPingsAutomatically?: boolean;
137+
/** Upgrade handler used to intercept HTTP upgrade requests and potentially upgrade to WebSocket.
138+
* See UpgradeAsync and UpgradeSync example files.
139+
*/
140+
upgrade?: (res: HttpResponse, req: HttpRequest, context: unknown) => void;
141+
/** Handler for new WebSocket connection. WebSocket is valid from open to close, no errors. */
142+
open?: (ws: WebSocket) => void;
143+
/** Handler for a WebSocket message. Messages are given as ArrayBuffer no matter if they are binary or not. Given ArrayBuffer is valid during the lifetime of this callback (until first await or return) and will be neutered. */
144+
message?: (ws: WebSocket, message: ArrayBuffer, isBinary: boolean) => void;
145+
/** Handler for when WebSocket backpressure drains. Check ws.getBufferedAmount(). Use this to guide / drive your backpressure throttling. */
146+
drain?: (ws: WebSocket) => void;
147+
/** Handler for close event, no matter if error, timeout or graceful close. You may not use WebSocket after this event. Do not send on this WebSocket from within here, it is closed. */
148+
close?: (ws: WebSocket, code: number, message: ArrayBuffer) => void;
149+
/** Handler for received ping control message. You do not need to handle this, pong messages are automatically sent as per the standard. */
150+
ping?: (ws: WebSocket, message: ArrayBuffer) => void;
151+
/** Handler for received pong control message. */
152+
pong?: (ws: WebSocket, message: ArrayBuffer) => void;
153+
}
154+
155+
/** A WebSocket connection that is valid from open to close event.
156+
* Read more about this in the user manual.
157+
*/
158+
export interface WebSocket {
159+
/** Sends a message. Returns 1 for success, 2 for dropped due to backpressure limit, and 0 for built up backpressure that will drain over time. You can check backpressure before or after sending by calling getBufferedAmount().
160+
*
161+
* Make sure you properly understand the concept of backpressure. Check the backpressure example file.
162+
*/
163+
send(message: RecognizedString, isBinary?: boolean, compress?: boolean): number;
164+
/** Returns the bytes buffered in backpressure. This is similar to the bufferedAmount property in the browser counterpart.
165+
* Check backpressure example.
166+
*/
167+
getBufferedAmount(): number;
168+
/** Gracefully closes this WebSocket. Immediately calls the close handler.
169+
* A WebSocket close message is sent with code and shortMessage.
170+
*/
171+
end(code?: number, shortMessage?: RecognizedString): void;
172+
/** Forcefully closes this WebSocket. Immediately calls the close handler.
173+
* No WebSocket close message is sent.
174+
*/
175+
close(): void;
176+
/** Sends a ping control message. Returns sendStatus similar to WebSocket.send (regarding backpressure). This helper function correlates to WebSocket::send(message, uWS::OpCode::PING, ...) in C++. */
177+
ping(message?: RecognizedString): number;
178+
/** Subscribe to a topic. */
179+
subscribe(topic: RecognizedString): boolean;
180+
/** Unsubscribe from a topic. Returns true on success, if the WebSocket was subscribed. */
181+
unsubscribe(topic: RecognizedString): boolean;
182+
/** Returns whether this websocket is subscribed to topic. */
183+
isSubscribed(topic: RecognizedString): boolean;
184+
/** Returns a list of topics this websocket is subscribed to. */
185+
getTopics(): string[];
186+
/** Publish a message under topic. Backpressure is managed according to maxBackpressure, closeOnBackpressureLimit settings.
187+
* Order is guaranteed since v20.
188+
*/
189+
publish(topic: RecognizedString, message: RecognizedString, isBinary?: boolean, compress?: boolean): boolean;
190+
/** See HttpResponse.cork. Takes a function in which the socket is corked (packing many sends into one single syscall/SSL block) */
191+
cork(cb: () => void): WebSocket;
192+
/** Returns the remote IP address. Note that the returned IP is binary, not text.
193+
*
194+
* IPv4 is 4 byte long and can be converted to text by printing every byte as a digit between 0 and 255.
195+
* IPv6 is 16 byte long and can be converted to text in similar ways, but you typically print digits in HEX.
196+
*
197+
* See getRemoteAddressAsText() for a text version.
198+
*/
199+
getRemoteAddress(): ArrayBuffer;
200+
/** Returns the remote IP address as text. See RecognizedString. */
201+
getRemoteAddressAsText(): ArrayBuffer;
202+
/** Arbitrary user data may be attached to this object. In C++ this is done by using getUserData(). */
203+
[key: string]: any;
204+
}

0 commit comments

Comments
 (0)