diff --git a/testUtil/fixtures/cleanup.ts b/testUtil/fixtures/cleanup.ts index 68a7a03a..0eaf06e7 100644 --- a/testUtil/fixtures/cleanup.ts +++ b/testUtil/fixtures/cleanup.ts @@ -1,4 +1,4 @@ -import { expect, vi } from 'vitest'; +import { assert, expect, vi } from 'vitest'; import { ClientTransport, Connection, @@ -68,9 +68,16 @@ export async function ensureTransportBuffersAreEventuallyEmpty( [...t.sessions] .map(([client, sess]) => { // get all messages that are not heartbeats - const buff = sess.sendBuffer.filter((msg) => { - return !Value.Check(ControlMessageAckSchema, msg.payload); - }); + const buff = sess.sendBuffer + .map((encodedMsg) => { + const msg = sess.parseMsg(encodedMsg.data); + assert(msg); + + return msg; + }) + .filter( + (msg) => !Value.Check(ControlMessageAckSchema, msg.payload), + ); return [client, buff] as [ string, diff --git a/transport/message.ts b/transport/message.ts index 1879341d..17054996 100644 --- a/transport/message.ts +++ b/transport/message.ts @@ -280,6 +280,17 @@ export function cancelMessage( export type OpaqueTransportMessage = TransportMessage; export type TransportClientId = string; +/** + * An encoded message that is ready to be send over the transport. + * The seq number is kept to keep track of which messages have been + * acked by the peer. + */ +export interface EncodedTransportMessage { + id: string; + seq: number; + data: Uint8Array; +} + /** * Checks if the given control flag (usually found in msg.controlFlag) is an ack message. * @param controlFlag - The control flag to check. diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 0de225f1..6c7a77de 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -49,11 +49,11 @@ export class SessionConnected< } send(msg: PartialTransportMessage): string { - const constructedMsg = this.constructMsg(msg); - this.sendBuffer.push(constructedMsg); - this.conn.send(this.options.codec.toBuffer(constructedMsg)); + const encodedMsg = this.encodeMsg(msg); + this.sendBuffer.push(encodedMsg); + this.conn.send(encodedMsg.data); - return constructedMsg.id; + return encodedMsg.id; } constructor(props: SessionConnectedProps) { @@ -75,7 +75,7 @@ export class SessionConnected< ); for (const msg of this.sendBuffer) { - this.conn.send(this.options.codec.toBuffer(msg)); + this.conn.send(msg.data); } } diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index d12fc884..06939e2d 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -1,12 +1,12 @@ import { Logger, MessageMetadata } from '../../logging'; import { TelemetryInfo } from '../../tracing'; import { + EncodedTransportMessage, OpaqueTransportMessage, OpaqueTransportMessageSchema, PartialTransportMessage, ProtocolVersion, TransportClientId, - TransportMessage, } from '../message'; import { Value } from '@sinclair/typebox/value'; import { Codec } from '../../codec'; @@ -204,7 +204,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps { to: TransportClientId; seq: number; ack: number; - sendBuffer: Array; + sendBuffer: Array; telemetry: TelemetryInfo; protocolVersion: ProtocolVersion; } @@ -224,7 +224,7 @@ export abstract class IdentifiedSession extends CommonSession { * Number of unique messages we've received this session (excluding handshake) */ ack: number; - sendBuffer: Array; + sendBuffer: Array; constructor(props: IdentifiedSessionProps) { const { id, to, seq, ack, sendBuffer, telemetry, log, protocolVersion } = @@ -258,9 +258,9 @@ export abstract class IdentifiedSession extends CommonSession { return metadata; } - constructMsg( + encodeMsg( partialMsg: PartialTransportMessage, - ): TransportMessage { + ): EncodedTransportMessage { const msg = { ...partialMsg, id: generateId(), @@ -270,9 +270,15 @@ export abstract class IdentifiedSession extends CommonSession { ack: this.ack, }; + const encodedMsg = { + id: msg.id, + seq: msg.seq, + data: this.options.codec.toBuffer(msg), + }; + this.seq++; - return msg; + return encodedMsg; } nextSeq(): number { @@ -280,7 +286,7 @@ export abstract class IdentifiedSession extends CommonSession { } send(msg: PartialTransportMessage): string { - const constructedMsg = this.constructMsg(msg); + const constructedMsg = this.encodeMsg(msg); this.sendBuffer.push(constructedMsg); return constructedMsg.id; diff --git a/transport/sessionStateMachine/stateMachine.test.ts b/transport/sessionStateMachine/stateMachine.test.ts index c6dac186..17eec550 100644 --- a/transport/sessionStateMachine/stateMachine.test.ts +++ b/transport/sessionStateMachine/stateMachine.test.ts @@ -1891,8 +1891,8 @@ describe('session state machine', () => { expect(onConnectionClosed).not.toHaveBeenCalled(); expect(onConnectionErrored).not.toHaveBeenCalled(); - const msg = session.constructMsg(payloadToTransportMessage('hello')); - session.conn.emitData(session.options.codec.toBuffer(msg)); + const msg = session.encodeMsg(payloadToTransportMessage('hello')); + session.conn.emitData(msg.data); await waitFor(async () => { expect(onMessage).toHaveBeenCalledTimes(1); @@ -1940,15 +1940,13 @@ describe('session state machine', () => { // send a heartbeat conn.emitData( - session.options.codec.toBuffer( - session.constructMsg({ - streamId: 'heartbeat', - controlFlags: ControlFlags.AckBit, - payload: { - type: 'ACK', - } satisfies Static, - }), - ), + session.encodeMsg({ + streamId: 'heartbeat', + controlFlags: ControlFlags.AckBit, + payload: { + type: 'ACK', + } satisfies Static, + }).data, ); // make sure the session acks the heartbeat @@ -1962,15 +1960,13 @@ describe('session state machine', () => { // send a heartbeat conn.emitData( - session.options.codec.toBuffer( - session.constructMsg({ - streamId: 'heartbeat', - controlFlags: ControlFlags.AckBit, - payload: { - type: 'ACK', - } satisfies Static, - }), - ), + session.encodeMsg({ + streamId: 'heartbeat', + controlFlags: ControlFlags.AckBit, + payload: { + type: 'ACK', + } satisfies Static, + }).data, ); expect(sessionHandle.onMessage).not.toHaveBeenCalled(); diff --git a/transport/sessionStateMachine/transitions.ts b/transport/sessionStateMachine/transitions.ts index 595959f9..7e694bd5 100644 --- a/transport/sessionStateMachine/transitions.ts +++ b/transport/sessionStateMachine/transitions.ts @@ -1,4 +1,3 @@ -import { OpaqueTransportMessage, TransportClientId } from '..'; import { SessionConnecting, SessionConnectingListeners, @@ -38,7 +37,11 @@ import { SessionBackingOff, SessionBackingOffListeners, } from './SessionBackingOff'; -import { ProtocolVersion } from '../message'; +import { + EncodedTransportMessage, + ProtocolVersion, + TransportClientId, +} from '../message'; function inheritSharedSession( session: IdentifiedSession, @@ -78,7 +81,7 @@ export const SessionStateGraph = { ) => { const id = `session-${generateId()}`; const telemetry = createSessionTelemetryInfo(id, to, from); - const sendBuffer: Array = []; + const sendBuffer: Array = []; const session = new SessionNoConnection({ listeners, diff --git a/transport/transport.ts b/transport/transport.ts index 548db290..a933fa43 100644 --- a/transport/transport.ts +++ b/transport/transport.ts @@ -38,9 +38,7 @@ export interface DeleteSessionOptions { unhealthy: boolean; } -export type SessionBoundSendFn = ( - msg: PartialTransportMessage, -) => string | undefined; +export type SessionBoundSendFn = (msg: PartialTransportMessage) => string; /** * Transports manage the lifecycle (creation/deletion) of sessions