Skip to content

Commit

Permalink
[transport]: first-class streams (#11)
Browse files Browse the repository at this point in the history
* first-class streams

* cleanup

* add tests for concurrent requests

* use const enum instead of bare consts

* [docs]: Add docstrings to everything (#12)

* add more docs to control flags
  • Loading branch information
jackyzha0 authored Nov 15, 2023
1 parent 4dd58d9 commit eabd2f9
Show file tree
Hide file tree
Showing 21 changed files with 737 additions and 202 deletions.
4 changes: 4 additions & 0 deletions __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const dummyPayloadSmall = () => ({
to: 'SERVER',
serviceName: 'test',
procedureName: 'test',
streamId: 'test',
controlFlags: 0,
payload: {
msg: 'cool',
},
Expand All @@ -31,6 +33,8 @@ const dummyPayloadLarge = () => ({
to: 'SERVER',
serviceName: 'test',
procedureName: 'test',
streamId: 'test',
controlFlags: 0,
payload: largePayload,
});

Expand Down
64 changes: 62 additions & 2 deletions __tests__/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ const OrderingServiceConstructor = () =>
.defineProcedure('add', {
type: 'rpc',
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ ok: Type.Boolean() }),
output: Type.Object({ n: Type.Number() }),
async handler(ctx, msg) {
const { n } = msg.payload;
ctx.state.msgs.push(n);
return reply(msg, { ok: true });
return reply(msg, { n });
},
})
.defineProcedure('getAll', {
Expand Down Expand Up @@ -241,4 +241,64 @@ describe('client <-> server integration test', async () => {
const res = await client.test.getAll({});
return expect(res.msgs).toStrictEqual(expected);
});

const CONCURRENCY = 10;
test('concurrent rpcs', async () => {
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: OrderingServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const promises = [];
for (let i = 0; i < CONCURRENCY; i++) {
promises.push(client.test.add({ n: i }));
}

for (let i = 0; i < CONCURRENCY; i++) {
await expect(promises[i]).resolves.toStrictEqual({ n: i });
}
});

test('concurrent streams', async () => {
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const openStreams = [];
for (let i = 0; i < CONCURRENCY; i++) {
const streamHandle = await client.test.echo();
const input = streamHandle[0];
input.push({ msg: `${i}-1`, ignore: false });
input.push({ msg: `${i}-2`, ignore: false });
openStreams.push(streamHandle);
}

for (let i = 0; i < CONCURRENCY; i++) {
const output = openStreams[i][1];
await expect(
output.next().then((res) => res.value),
).resolves.toStrictEqual({
response: `${i}-1`,
});
await expect(
output.next().then((res) => res.value),
).resolves.toStrictEqual({
response: `${i}-2`,
});
}

// cleanup
for (let i = 0; i < CONCURRENCY; i++) {
const [input, _output, close] = openStreams[i];
input.end();
close();
}
});
});
4 changes: 4 additions & 0 deletions codec/json.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { Codec } from './types';

/**
* Naive JSON codec implementation using JSON.stringify and JSON.parse.
* @type {Codec}
*/
export const NaiveJsonCodec: Codec = {
toStringBuf: JSON.stringify,
fromStringBuf: (s: string) => {
Expand Down
14 changes: 14 additions & 0 deletions codec/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
/**
* Codec interface for encoding and decoding objects to and from string buffers.
* Used to prepare messages for use by the transport layer.
*/
export interface Codec {
/**
* Encodes an object to a string buffer.
* @param obj - The object to encode.
* @returns The encoded string buffer.
*/
toStringBuf(obj: object): string;
/**
* Decodes an object from a string buffer.
* @param buf - The string buffer to decode.
* @returns The decoded object, or null if decoding failed.
*/
fromStringBuf(buf: string): object | null;
}
12 changes: 12 additions & 0 deletions logging/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ export type Logger = {
[key in LoggingLevel]: (msg: string) => void;
};

/**
* The global River logger instance.
*/
export let log: Logger | undefined;
const defaultLoggingLevel: LoggingLevel = 'warn';

/**
* Binds the given write function to River's logger {@link log}.
* @param write - The function to write log messages.
* @param color - Whether to use colored log levels.
*/
export function bindLogger(write: (msg: string) => void, color?: boolean) {
const info = color ? '\u001b[37minfo\u001b[0m' : 'info';
const warn = color ? '\u001b[33mwarn\u001b[0m' : 'warn';
Expand All @@ -36,6 +44,10 @@ export function bindLogger(write: (msg: string) => void, color?: boolean) {
};
}

/**
* Sets the minimum logging level for the logger.
* @param level - The minimum logging level to set.
*/
export function setLevel(level: LoggingLevel) {
if (log) {
log.minLevel = level;
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
"name": "@replit/river",
"sideEffects": false,
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.3.1",
"version": "0.4.0",
"type": "module",
"exports": {
".": "./dist/router/index.js",
"./logging": "./dist/logging/index.js",
"./codec": "./dist/codec/index.js",
"./test-util": "./dist/testUtils.js",
"./package.json": "./package.json",
"./transport": "./dist/transport/index.js",
"./transport/ws": "./dist/transport/impls/ws.js",
"./transport/stdio": "./dist/transport/impls/stdio.js"
Expand Down
79 changes: 78 additions & 1 deletion router/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,28 @@ import type { Pushable } from 'it-pushable';
import { TransportMessage } from '../transport/message';
import { ServiceContextWithState } from './context';

/**
* The valid {@link Procedure} types.
*/
export type ValidProcType = 'stream' | 'rpc';

/**
* A generic procedure listing where the keys are the names of the procedures
* and the values are the {@link Procedure} definitions. This is not meant to
* be constructed directly, use the {@link ServiceBuilder} class instead.
*/
export type ProcListing = Record<
string,
Procedure<object, ValidProcType, TObject, TObject>
>;

/**
* Represents a service with a name, state, and procedures.
* This is not meant to be constructed directly, use the {@link ServiceBuilder} class instead.
* @template Name The type of the service name.
* @template State The type of the service state.
* @template Procs The type of the service procedures.
*/
export interface Service<
Name extends string,
State extends object,
Expand All @@ -22,6 +39,11 @@ export interface Service<
}
export type AnyService = Service<string, object, any>;

/**
* Serializes a service object into its corresponding JSON Schema Draft 7 type.
* @param {AnyService} s - The service object to serialize.
* @returns A plain object representing the serialized service.
*/
export function serializeService(s: AnyService): object {
return {
name: s.name,
Expand All @@ -41,24 +63,53 @@ export function serializeService(s: AnyService): object {
};
}

// extract helpers
/**
* Helper to get the type definition for a specific handler of a procedure in a service.
* @template S - The service.
* @template ProcName - The name of the procedure.
*/
export type ProcHandler<
S extends AnyService,
ProcName extends keyof S['procedures'],
> = S['procedures'][ProcName]['handler'];

/**
* Helper to get the type definition for the procedure input of a service.
* @template S - The service.
* @template ProcName - The name of the procedure.
*/
export type ProcInput<
S extends AnyService,
ProcName extends keyof S['procedures'],
> = S['procedures'][ProcName]['input'];

/**
* Helper to get the type definition for the procedure output of a service.
* @template S - The service.
* @template ProcName - The name of the procedure.
*/
export type ProcOutput<
S extends AnyService,
ProcName extends keyof S['procedures'],
> = S['procedures'][ProcName]['output'];

/**
* Helper to get the type of procedure in a service.
* @template S - The service.
* @template ProcName - The name of the procedure.
*/
export type ProcType<
S extends AnyService,
ProcName extends keyof S['procedures'],
> = S['procedures'][ProcName]['type'];

/**
* Defines a Procedure type that can be either an RPC or a stream procedure.
* @template State - The TypeBox schema of the state object.
* @template Ty - The type of the procedure, either 'rpc' or 'stream'.
* @template I - The TypeBox schema of the input object.
* @template O - The TypeBox schema of the output object.
*/
export type Procedure<
State extends object | unknown,
Ty extends ValidProcType,
Expand All @@ -85,16 +136,31 @@ export type Procedure<
type: Ty;
};

/**
* A builder class for creating River Services.
* You must call the finalize method to get the finalized schema for use in a service router.
* @template T The type of the service.
*/
export class ServiceBuilder<T extends Service<string, object, ProcListing>> {
private readonly schema: T;
private constructor(schema: T) {
this.schema = schema;
}

/**
* Finalizes the schema for the service.
* @returns {T} The finalized schema for the service.
*/
finalize(): T {
return this.schema;
}

/**
* Sets the initial state for the service.
* @template InitState The type of the initial state.
* @param {InitState} state The initial state for the service.
* @returns {ServiceBuilder<{ name: T['name']; state: InitState; procedures: T['procedures']; }>} A new ServiceBuilder instance with the updated schema.
*/
initialState<InitState extends T['state']>(
state: InitState,
): ServiceBuilder<{
Expand All @@ -108,6 +174,12 @@ export class ServiceBuilder<T extends Service<string, object, ProcListing>> {
});
}

/**
* Defines a new procedure for the service.
* @param {ProcName} procName The name of the procedure.
* @param {Procedure<T['state'], Ty, I, O>} procDef The definition of the procedure.
* @returns {ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { [k in ProcName]: Procedure<T['state'], Ty, I, O>; }; }>} A new ServiceBuilder instance with the updated schema.
*/
defineProcedure<
ProcName extends string,
Ty extends ValidProcType,
Expand Down Expand Up @@ -138,6 +210,11 @@ export class ServiceBuilder<T extends Service<string, object, ProcListing>> {
});
}

/**
* Creates a new instance of ServiceBuilder.
* @param {Name} name The name of the service.
* @returns {ServiceBuilder<{ name: Name; state: {}; procedures: {}; }>} A new instance of ServiceBuilder.
*/
static create<Name extends string>(
name: Name,
): ServiceBuilder<{
Expand Down
Loading

0 comments on commit eabd2f9

Please sign in to comment.