Skip to content

Commit ad753ca

Browse files
authored
Merge pull request #433 from streamich/server
Demo server
2 parents 9601ff4 + 0ef6bb3 commit ad753ca

37 files changed

+1753
-29
lines changed

src/json-type/system/TypeRouter.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,23 @@ export class TypeRouter<Routes extends RoutesBase> {
4444
const router = new TypeRouter({system: this.system, routes: routes(this)});
4545
return this.merge(router);
4646
}
47+
48+
public fn<K extends string, R extends classes.FunctionType<any, any>>(
49+
name: K,
50+
type: R,
51+
): TypeRouter<Routes & {[KK in K]: R}> {
52+
this.routes[name] = <any>type;
53+
return <any>this;
54+
}
55+
56+
public fn$<K extends string, R extends classes.FunctionStreamingType<any, any>>(
57+
name: K,
58+
type: R,
59+
): TypeRouter<Routes & {[KK in K]: R}> {
60+
this.routes[name] = <any>type;
61+
return <any>this;
62+
}
4763
}
4864

49-
export type RoutesBase = Record<string, classes.FunctionType<any, any>>;
65+
export type RoutesBase = Record<string, classes.FunctionType<any, any> | classes.FunctionStreamingType<any, any>>;
5066
type TypeRouterRoutes<R extends TypeRouter<any>> = R extends TypeRouter<infer R2> ? R2 : never;

src/reactive-rpc/__demos__/server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const app = new RpcApp({
1414
caller,
1515
codecs,
1616
maxRequestBodySize: 1024 * 1024,
17+
augmentContext: (ctx) => ctx,
1718
});
1819

1920
app.enableCors();

src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import {StaticRpcMethod, type StaticRpcMethodOptions} from '../methods/StaticRpc
55
import {StreamingRpcMethod, type StreamingRpcMethodOptions} from '../methods/StreamingRpcMethod';
66
import type {Schema, SchemaOf, TypeOf, TypeSystem} from '../../../../json-type';
77
import type {TypeRouter} from '../../../../json-type/system/TypeRouter';
8+
import type {Value} from '../../messages/Value';
9+
import type {Observable} from 'rxjs';
810

911
export interface TypedApiCallerOptions<Router extends TypeRouter<any>, Ctx = unknown>
1012
extends Omit<RpcApiCallerOptions<Ctx>, 'getMethod'> {
@@ -68,9 +70,25 @@ export class TypeRouterCaller<Router extends TypeRouter<any>, Ctx = unknown> ext
6870
id: K,
6971
request: MethodReq<Routes<Router>[K]>,
7072
ctx: Ctx,
71-
): Promise<MethodRes<Routes<Router>[K]>> {
73+
): Promise<Value<MethodRes<Routes<Router>[K]>>> {
7274
return super.call(id as string, request, ctx) as any;
7375
}
76+
77+
public async callSimple<K extends keyof Routes<Router>>(
78+
id: K,
79+
request: MethodReq<Routes<Router>[K]>,
80+
ctx: Ctx = {} as any,
81+
): Promise<MethodRes<Routes<Router>[K]>> {
82+
return (await super.call(id as string, request, ctx)).data as any;
83+
}
84+
85+
public call$<K extends keyof Routes<Router>>(
86+
id: K,
87+
request: Observable<MethodReq<Routes<Router>[K]>>,
88+
ctx: Ctx,
89+
): Observable<Value<MethodRes<Routes<Router>[K]>>> {
90+
return super.call$(id as string, request, ctx) as any;
91+
}
7492
}
7593

7694
type Routes<Router> = Router extends TypeRouter<infer R> ? R : never;

src/reactive-rpc/common/rpc/caller/error/RpcError.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,21 @@ export class RpcError extends Error implements IRpcError {
3939
return RpcError.internal();
4040
}
4141

42-
public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined) {
42+
public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined): RpcError {
4343
const code = RpcErrorCodes[errno];
4444
return new RpcError(message || code, code, errno, undefined, meta || undefined);
4545
}
4646

47-
public static internal(message: string = 'Internal Server Error') {
47+
public static internal(message: string = 'Internal Server Error'): RpcError {
4848
return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message);
4949
}
5050

5151
/** @todo Rename to "badRequest". */
52-
public static invalidRequest() {
52+
public static invalidRequest(): RpcError {
5353
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, 'Bad Request');
5454
}
5555

56-
public static validation(message: string, meta?: unknown) {
56+
public static validation(message: string, meta?: unknown): RpcError {
5757
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message, meta);
5858
}
5959

src/reactive-rpc/server/uws/RpcApp.ts

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import {enableCors} from './util';
22
import {Match, Router} from '../../../util/router';
3-
import {listToUint8} from '../../../util/buffers/concat';
43
import {IncomingBatchMessage, RpcMessageBatchProcessor} from '../../common/rpc/RpcMessageBatchProcessor';
54
import {RpcError, RpcErrorCodes, RpcErrorType} from '../../common/rpc/caller/error';
65
import {ConnectionContext} from '../context';
@@ -11,15 +10,7 @@ import {RpcMessageFormat} from '../../common/codec/constants';
1110
import {RpcCodecs} from '../../common/codec/RpcCodecs';
1211
import {type ReactiveRpcMessage, RpcMessageStreamProcessor, ReactiveRpcClientMessage} from '../../common';
1312
import type {Codecs} from '../../../json-pack/codecs/Codecs';
14-
import type {
15-
TemplatedApp,
16-
HttpRequest,
17-
HttpResponse,
18-
HttpMethodPermissive,
19-
JsonRouteHandler,
20-
WebSocket,
21-
RpcWebSocket,
22-
} from './types';
13+
import type * as types from './types';
2314
import type {RouteHandler} from './types';
2415
import type {RpcCaller} from '../../common/rpc/caller/RpcCaller';
2516
import type {JsonValueCodec} from '../../../json-pack/codecs/types';
@@ -31,15 +22,16 @@ const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found');
3122
const ERR_INTERNAL = RpcError.internal();
3223

3324
export interface RpcAppOptions {
34-
uws: TemplatedApp;
25+
uws: types.TemplatedApp;
3526
maxRequestBodySize: number;
3627
codecs: Codecs;
37-
caller: RpcCaller;
28+
caller: RpcCaller<any>;
29+
augmentContext: (ctx: ConnectionContext) => void;
3830
}
3931

4032
export class RpcApp<Ctx extends ConnectionContext> {
4133
public readonly codecs: RpcCodecs;
42-
protected readonly app: TemplatedApp;
34+
protected readonly app: types.TemplatedApp;
4335
protected readonly maxRequestBodySize: number;
4436
protected readonly router = new Router();
4537
protected readonly batchProcessor: RpcMessageBatchProcessor<Ctx>;
@@ -55,12 +47,12 @@ export class RpcApp<Ctx extends ConnectionContext> {
5547
enableCors(this.options.uws);
5648
}
5749

58-
public routeRaw(method: HttpMethodPermissive, path: string, handler: RouteHandler<Ctx>): void {
59-
method = method.toLowerCase() as HttpMethodPermissive;
50+
public routeRaw(method: types.HttpMethodPermissive, path: string, handler: RouteHandler<Ctx>): void {
51+
method = method.toLowerCase() as types.HttpMethodPermissive;
6052
this.router.add(method + path, handler);
6153
}
6254

63-
public route(method: HttpMethodPermissive, path: string, handler: JsonRouteHandler<Ctx>): void {
55+
public route(method: types.HttpMethodPermissive, path: string, handler: types.JsonRouteHandler<Ctx>): void {
6456
this.routeRaw(method, path, async (ctx: Ctx) => {
6557
const result = await handler(ctx);
6658
const res = ctx.res!;
@@ -112,6 +104,7 @@ export class RpcApp<Ctx extends ConnectionContext> {
112104

113105
public enableWsRpc(path: string = '/rpc'): this {
114106
const maxBackpressure = 4 * 1024 * 1024;
107+
const augmentContext = this.options.augmentContext;
115108
this.app.ws(path, {
116109
idleTimeout: 0,
117110
maxPayloadLength: 4 * 1024 * 1024,
@@ -120,11 +113,12 @@ export class RpcApp<Ctx extends ConnectionContext> {
120113
const secWebSocketProtocol = req.getHeader('sec-websocket-protocol');
121114
const secWebSocketExtensions = req.getHeader('sec-websocket-extensions');
122115
const ctx = ConnectionContext.fromReqRes(req, res, null, this);
116+
augmentContext(ctx);
123117
/* This immediately calls open handler, you must not use res after this call */
124118
res.upgrade({ctx}, secWebSocketKey, secWebSocketProtocol, secWebSocketExtensions, context);
125119
},
126-
open: (ws_: WebSocket) => {
127-
const ws = ws_ as RpcWebSocket<Ctx>;
120+
open: (ws_: types.WebSocket) => {
121+
const ws = ws_ as types.RpcWebSocket<Ctx>;
128122
const ctx = ws.ctx;
129123
const resCodec = ctx.resCodec;
130124
const msgCodec = ctx.msgCodec;
@@ -144,8 +138,8 @@ export class RpcApp<Ctx extends ConnectionContext> {
144138
bufferTime: 0,
145139
});
146140
},
147-
message: (ws_: WebSocket, buf: ArrayBuffer, isBinary: boolean) => {
148-
const ws = ws_ as RpcWebSocket<Ctx>;
141+
message: (ws_: types.WebSocket, buf: ArrayBuffer, isBinary: boolean) => {
142+
const ws = ws_ as types.RpcWebSocket<Ctx>;
149143
const ctx = ws.ctx;
150144
const reqCodec = ctx.reqCodec;
151145
const msgCodec = ctx.msgCodec;
@@ -158,8 +152,8 @@ export class RpcApp<Ctx extends ConnectionContext> {
158152
rpc.sendNotification('.err', RpcError.value(RpcError.invalidRequest()));
159153
}
160154
},
161-
close: (ws_: WebSocket, code: number, message: ArrayBuffer) => {
162-
const ws = ws_ as RpcWebSocket<Ctx>;
155+
close: (ws_: types.WebSocket, code: number, message: ArrayBuffer) => {
156+
const ws = ws_ as types.RpcWebSocket<Ctx>;
163157
ws.rpc!.stop();
164158
},
165159
});
@@ -170,7 +164,8 @@ export class RpcApp<Ctx extends ConnectionContext> {
170164
const matcher = this.router.compile();
171165
const codecs = this.codecs;
172166
let responseCodec: JsonValueCodec = codecs.value.json;
173-
this.app.any('/*', async (res: HttpResponse, req: HttpRequest) => {
167+
const augmentContext = this.options.augmentContext;
168+
this.app.any('/*', async (res: types.HttpResponse, req: types.HttpRequest) => {
174169
res.onAborted(() => {
175170
res.aborted = true;
176171
});
@@ -189,6 +184,7 @@ export class RpcApp<Ctx extends ConnectionContext> {
189184
const params = match.params;
190185
const ctx = ConnectionContext.fromReqRes(req, res, params, this) as Ctx;
191186
responseCodec = ctx.resCodec;
187+
augmentContext(ctx);
192188
await handler(ctx);
193189
} catch (err) {
194190
if (err instanceof RpcError) {

0 commit comments

Comments
 (0)