Skip to content

Commit

Permalink
[protocolv2] Use new simpler readable and writable interfaces (#250)
Browse files Browse the repository at this point in the history
## Why

Follow up on #249 to actually use the new interfaces. Removed extra
refactor done in #249 that merges the interfaces as that proved to be a
challenging API (un-yak-shave 🙅 🐃)

## What changed

- Removed close requests (mostly cherry-picked from #248)
- Otherwise a simple swapping out of the interfaces

## Versioning

- [ ] Breaking protocol change
- [ ] Breaking ts/js API change

<!-- Kind reminder to add tests and updated documentation if needed -->
  • Loading branch information
masad-frost authored Aug 1, 2024
1 parent 251758b commit bab2aa8
Show file tree
Hide file tree
Showing 18 changed files with 406 additions and 590 deletions.
4 changes: 0 additions & 4 deletions .replit
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ channel = "stable-23_05"
localPort = 3000
externalPort = 80

[[ports]]
localPort = 24678
externalPort = 3000

[languages.eslint]
pattern = "**{*.ts,*.js,*.tsx,*.jsx}"
[languages.eslint.languageServer]
Expand Down
5 changes: 1 addition & 4 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ The `controlFlags` property is a [bit field](https://en.wikipedia.org/wiki/Bit_f
- The `StreamOpenBit` (`0b00010`) MUST be set for the first message of a new stream.
- The `StreamAbortBit` (`0b00100`) MUST be set when a stream is to be abruptly closed due to cancellations or an internal error condition.
- The `StreamClosedBit` (`0b01000`) MUST be set for the last message of a stream.
- The `StreamCloseRequestBit` (`0b10000`) MUST be set for a message that is requesting the other side to close the stream.

All messages MUST have no control flags set (i.e., the `controlFlags` field is `0b00000`) unless:

Expand All @@ -157,8 +156,6 @@ All messages MUST have no control flags set (i.e., the `controlFlags` field is `
- All further messages MAY omit `serviceName` and `procedureName` as they are implied by the first message and are constant throughout the lifetime of a stream.
- It is the last message of a stream, in which case the `StreamClosedBit` MUST be set.
- If this is sent with no payload, it is a control message the payload MUST Be a `ControlClose`.
- It is a message requesting the other side to stop writing to the stream, in which case the `StreamCloseRequestBit` MUST be set.
- This is a control message and the payload MUST be a `ControlClose`.
- It is a message aborting the stream, in which case the `StreamAbortBit` MUST be set.
- This message MUST contain a `ProtocolError` payload.
- It is an explicit heartbeat, so the `AckBit` MUST be the only bit set.
Expand Down Expand Up @@ -199,7 +196,7 @@ There are 4 `Control` payloads:

```ts
// Used in cases where we want to send a close without
// a payload. MUST have either a `StreamClosedBit` or `StreamCloseRequestBit` flag
// a payload. MUST have a `StreamClosedBit`
interface ControlClose {
type: 'CLOSE';
}
Expand Down
47 changes: 19 additions & 28 deletions __tests__/abort.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ describe.each(testMatrix())(

abortController.abort();

expect(reqWriter.isClosed());
expect(reqWriter.isWritable()).toEqual(false);
await expect(finalize()).resolves.toEqual({
ok: false,
payload: {
Expand Down Expand Up @@ -220,8 +220,7 @@ describe.each(testMatrix())(

abortController.abort();

expect(resReader.isClosed());
await expect(resReader.asArray()).resolves.toEqual([
await expect(resReader.collect()).resolves.toEqual([
{
ok: false,
payload: {
Expand Down Expand Up @@ -293,9 +292,9 @@ describe.each(testMatrix())(

abortController.abort();

expect(resReader.isClosed());
expect(reqWriter.isClosed());
await expect(resReader.asArray()).resolves.toEqual([
expect(reqWriter.isWritable()).toEqual(false);

await expect(resReader.collect()).resolves.toEqual([
{
ok: false,
payload: {
Expand Down Expand Up @@ -469,7 +468,7 @@ describe.each(testMatrix())(
clientAbortController.abort();
// this should be ignored by the client since it already aborted
resWriter.write({ ok: true, payload: {} });
expect(await resReader.asArray()).toEqual([
expect(await resReader.collect()).toEqual([
{
ok: false,
payload: {
Expand All @@ -479,13 +478,12 @@ describe.each(testMatrix())(
},
},
]);
expect(resReader.isClosed());
expect(reqWriter.isClosed());
expect(reqWriter.isWritable()).toEqual(false);

await waitFor(() => {
expect(onClientAbort).toHaveBeenCalled();
});
expect(await reqReader.asArray()).toEqual([
expect(await reqReader.collect()).toEqual([
{
ok: false,
payload: {
Expand All @@ -495,8 +493,7 @@ describe.each(testMatrix())(
},
},
]);
expect(reqReader.isClosed());
expect(resWriter.isClosed());
expect(resWriter.isWritable()).toEqual(false);
});
});
},
Expand Down Expand Up @@ -628,7 +625,7 @@ describe.each(testMatrix())(
message: expect.any(String),
},
});
expect(reqWriter.isClosed());
expect(reqWriter.isWritable()).toEqual(false);
});

test('stream', async () => {
Expand Down Expand Up @@ -676,7 +673,7 @@ describe.each(testMatrix())(
),
);

await expect(resReader.asArray()).resolves.toEqual([
await expect(resReader.collect()).resolves.toEqual([
{
ok: false,
payload: {
Expand All @@ -686,7 +683,7 @@ describe.each(testMatrix())(
},
},
]);
expect(reqWriter.isClosed());
expect(reqWriter.isWritable()).toEqual(false);
});

test('subscribe', async () => {
Expand Down Expand Up @@ -733,7 +730,7 @@ describe.each(testMatrix())(
),
);

await expect(resReader.asArray()).resolves.toEqual([
await expect(resReader.collect()).resolves.toEqual([
{
ok: false,
payload: {
Expand Down Expand Up @@ -978,7 +975,7 @@ describe.each(testMatrix())(
ctx.abortController.abort();
// this should be ignored by the server since it already aborted
reqWriter.write({ ok: true, payload: {} });
expect(await reqReader.asArray()).toEqual([
expect(await reqReader.collect()).toEqual([
{
ok: false,
payload: {
Expand All @@ -988,10 +985,9 @@ describe.each(testMatrix())(
},
},
]);
expect(reqReader.isClosed());
expect(resWriter.isClosed());
expect(resWriter.isWritable()).toEqual(false);

expect(await resReader.asArray()).toEqual([
expect(await resReader.collect()).toEqual([
{
ok: false,
payload: {
Expand All @@ -1001,8 +997,6 @@ describe.each(testMatrix())(
},
},
]);
expect(resReader.isClosed());
expect(reqWriter.isClosed());
});
});
},
Expand Down Expand Up @@ -1160,7 +1154,7 @@ describe.each(testMatrix())(
rejectable.reject(new Error(errorMessage));
// this should be ignored by the server since it already aborted
reqWriter.write({ ok: true, payload: {} });
expect(await reqReader.asArray()).toEqual([
expect(await reqReader.collect()).toEqual([
{
ok: false,
payload: {
Expand All @@ -1169,10 +1163,9 @@ describe.each(testMatrix())(
},
},
]);
expect(reqReader.isClosed());
expect(resWriter.isClosed());
expect(resWriter.isWritable()).toEqual(false);

expect(await resReader.asArray()).toEqual([
expect(await resReader.collect()).toEqual([
{
ok: false,
payload: {
Expand All @@ -1181,8 +1174,6 @@ describe.each(testMatrix())(
},
},
]);
expect(resReader.isClosed());
expect(reqWriter.isClosed());
});
});
},
Expand Down
46 changes: 23 additions & 23 deletions __tests__/cleanup.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { beforeEach, describe, expect, test, vi } from 'vitest';
import {
closeAllConnections,
getIteratorFromStream,
iterNext,
readNextResult,
isReadableDone,
numberOfConnections,
} from '../util/testHelpers';
import {
Expand Down Expand Up @@ -205,8 +205,7 @@ describe.each(testMatrix())(
reqWriter.write({ msg: '1', ignore: false });
reqWriter.write({ msg: '2', ignore: false });

const outputIterator = getIteratorFromStream(resReader);
const result1 = await iterNext(outputIterator);
const result1 = await readNextResult(resReader);
expect(result1).toStrictEqual({
ok: true,
payload: { response: '1' },
Expand All @@ -215,18 +214,16 @@ describe.each(testMatrix())(
// ensure we only have one stream despite pushing multiple messages.
reqWriter.close();
await waitFor(() => expect(server.openStreams.size).toEqual(1));
await resReader.requestClose();
// ensure we no longer have any open streams since the input was closed.
await waitFor(() => expect(server.openStreams.size).toEqual(0));

const result2 = await iterNext(outputIterator);
const result2 = await readNextResult(resReader);
expect(result2).toStrictEqual({
ok: true,
payload: { response: '2' },
});

const result3 = await outputIterator.next();
expect(result3.done).toBe(true);
expect(await isReadableDone(resReader)).toEqual(true);
// end procedure

// number of message handlers shouldn't increase after stream ends
Expand Down Expand Up @@ -271,33 +268,37 @@ describe.each(testMatrix())(
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure

const { resReader } = client.subscribable.value.subscribe({});
const outputIterator = getIteratorFromStream(resReader);
let result = await iterNext(outputIterator);
const abortController = new AbortController();
const { resReader } = client.subscribable.value.subscribe(
{},
{ signal: abortController.signal },
);
let result = await readNextResult(resReader);
expect(result).toStrictEqual({
ok: true,
payload: { result: 0 },
});

const add1 = await client.subscribable.add.rpc({ n: 1 });
expect(add1).toStrictEqual({ ok: true, payload: { result: 1 } });
result = await iterNext(outputIterator);
result = await readNextResult(resReader);
expect(result).toStrictEqual({
ok: true,
payload: { result: 1 },
});

await resReader.requestClose();
abortController.abort();
// end procedure

// number of message handlers shouldn't increase after subscription ends
expect(
serverTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(serverListeners);
expect(
clientTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(clientListeners);
await waitFor(() => {
expect(
serverTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(serverListeners);
expect(
clientTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(clientListeners);
});

// check number of connections
expect(numberOfConnections(clientTransport)).toEqual(1);
Expand Down Expand Up @@ -384,8 +385,7 @@ describe.each(testMatrix())(
const { reqWriter, resReader } = client.test.echo.stream({});
reqWriter.write({ msg: '1', ignore: false });

const outputIterator = getIteratorFromStream(resReader);
const result1 = await iterNext(outputIterator);
const result1 = await readNextResult(resReader);
expect(result1).toStrictEqual({ ok: true, payload: { response: '1' } });

// wait for session to disconnect
Expand All @@ -404,7 +404,7 @@ describe.each(testMatrix())(
// push on the old stream and make sure its not sent

expect(() => reqWriter.write({ msg: '2', ignore: false })).toThrow();
const result2 = await iterNext(outputIterator);
const result2 = await readNextResult(resReader);
expect(result2).toMatchObject({ ok: false });

await testFinishesCleanly({
Expand Down
Loading

0 comments on commit bab2aa8

Please sign in to comment.