Skip to content

Commit

Permalink
fix regression in asClientUpload, make deleteSession idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyzha0 committed Aug 12, 2024
1 parent cdb57e8 commit d9d607f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
3 changes: 3 additions & 0 deletions transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ export abstract class Transport<ConnType extends Connection> {
session: Session<ConnType>,
options?: DeleteSessionOptions,
) {
// ensure idempotency esp re: dispatching events
if (session._isConsumed) return;

const loggingMetadata = session.loggingMetadata;
if (loggingMetadata.tags && options?.unhealthy) {
loggingMetadata.tags.push('unhealthy-session');
Expand Down
16 changes: 7 additions & 9 deletions util/testHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
Err,
PayloadType,
Procedure,
ProcedureResult,
ServiceContext,
ServiceContextWithTransportInfo,
UNCAUGHT_ERROR,
Expand Down Expand Up @@ -178,11 +179,7 @@ export function asClientRpc<
extendedContext?: Omit<ServiceContext, 'state'>,
session: Session<Connection> = dummySession(),
) {
return async (
msg: Static<I>,
): Promise<
Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>
> => {
return async (msg: Static<I>): Promise<ProcedureResult<O, E>> => {
return await proc
.handler(dummyCtx(state, session, extendedContext), msg)
.catch(catchProcError);
Expand All @@ -203,7 +200,7 @@ export function asClientStream<
session: Session<Connection> = dummySession(),
) {
const input = pushable<Static<I>>({ objectMode: true });
const output = pushable<Result<Static<O>, Static<E>>>({
const output = pushable<ProcedureResult<O, E>>({
objectMode: true,
});

Expand Down Expand Up @@ -235,7 +232,7 @@ export function asClientSubscription<
extendedContext?: Omit<ServiceContext, 'state'>,
session: Session<Connection> = dummySession(),
) {
const output = pushable<Result<Static<O>, Static<E>>>({
const output = pushable<ProcedureResult<O, E>>({
objectMode: true,
});

Expand Down Expand Up @@ -263,15 +260,16 @@ export function asClientUpload<
session: Session<Connection> = dummySession(),
) {
const input = pushable<Static<I>>({ objectMode: true });
let result: Promise<ProcedureResult<O, E>>;
if (init) {
const _proc = proc as Procedure<State, 'upload', I, O, E, PayloadType>;
const result = _proc
result = _proc
.handler(dummyCtx(state, session, extendedContext), init, input)
.catch(catchProcError);
return [input, result] as const;
} else {
const _proc = proc as Procedure<State, 'upload', I, O, E>;
const result = _proc
result = _proc
.handler(dummyCtx(state, session, extendedContext), input)
.catch(catchProcError);
return [input, result] as const;
Expand Down

0 comments on commit d9d607f

Please sign in to comment.