Skip to content

Commit

Permalink
refactor: start polling instance only when a subscription is active (#…
Browse files Browse the repository at this point in the history
…558)

* refactor: start polling instance only when a subscription is active

* test: no best block when calling eth_subscribe

* feat: set pollInstance to undefined after stop listening
  • Loading branch information
Valazan authored Feb 14, 2024
1 parent 5c0799b commit d1e7635
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 75 deletions.
113 changes: 76 additions & 37 deletions packages/provider/src/providers/vechain-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage {
readonly wallet?: Wallet
) {
super();
this.startSubscriptionsPolling();
}

/**
Expand All @@ -58,6 +57,7 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage {
this.thorClient.destroy();
if (this.pollInstance !== undefined) {
this.pollInstance.stopListen();
this.pollInstance = undefined;
}
}

Expand Down Expand Up @@ -93,43 +93,85 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage {
* This method leverages the `Poll.createEventPoll` utility to create the polling mechanism,
* which is then started by invoking `startListen` on the poll instance.
*/
private startSubscriptionsPolling(): void {
this.pollInstance = Poll.createEventPoll(async () => {
const data: SubscriptionEvent[] = [];

const currentBlock = await this.getCurrentBlock();

if (currentBlock !== null) {
if (
this.subscriptionManager.newHeadsSubscription !== undefined
) {
data.push({
method: 'eth_subscription',
params: {
subscription:
this.subscriptionManager.newHeadsSubscription
.subscriptionId,
result: currentBlock
}
});
public startSubscriptionsPolling(): boolean {
let result = false;
if (this.pollInstance === undefined) {
this.pollInstance = Poll.createEventPoll(async () => {
const data: SubscriptionEvent[] = [];

const currentBlock = await this.getCurrentBlock();

if (currentBlock !== null) {
if (
this.subscriptionManager.newHeadsSubscription !==
undefined
) {
data.push({
method: 'eth_subscription',
params: {
subscription:
this.subscriptionManager
.newHeadsSubscription.subscriptionId,
result: currentBlock
}
});
}
if (this.subscriptionManager.logSubscriptions.size > 0) {
const logs = await this.getLogsRPC();
data.push(...logs);
}

this.subscriptionManager.currentBlockNumber++;
}
if (this.subscriptionManager.logSubscriptions.size > 0) {
const logs = await this.getLogsRPC();
data.push(...logs);
return data;
}, POLLING_INTERVAL).onData(
(subscriptionEvents: SubscriptionEvent[]) => {
subscriptionEvents.forEach((event) => {
this.emit('message', event);
});
}
);

this.subscriptionManager.currentBlockNumber++;
}
return data;
}, POLLING_INTERVAL).onData(
(subscriptionEvents: SubscriptionEvent[]) => {
subscriptionEvents.forEach((event) => {
this.emit('message', event);
});
}
this.pollInstance.startListen();
result = true;
}
return result;
}

/**
* Stops the polling mechanism for subscription events.
* This method stops the polling mechanism for subscription events, if it is active.
*
* @returns {boolean} A boolean indicating whether the polling mechanism was stopped.
*/
public stopSubscriptionsPolling(): boolean {
let result = false;
if (this.pollInstance !== undefined) {
this.pollInstance.stopListen();
this.pollInstance = undefined;
result = true;
}
return result;
}

/**
* Checks if there are active subscriptions.
* This method checks if there are any active log subscriptions or a new heads subscription.
*
* @returns {boolean} A boolean indicating whether there are active subscriptions.
*/
public isThereActiveSubscriptions(): boolean {
return (
this.subscriptionManager.logSubscriptions.size > 0 ||
this.subscriptionManager.newHeadsSubscription !== undefined
);
}

this.pollInstance.startListen();
/**
* Returns the poll instance for subscriptions.
*/
public getPollInstance(): EventPoll<SubscriptionEvent[]> | undefined {
return this.pollInstance;
}

/**
Expand Down Expand Up @@ -193,10 +235,7 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage {
let result: BlockDetail | null = null;

// Proceed only if there are active log subscriptions or a new heads subscription is present
if (
this.subscriptionManager.logSubscriptions.size > 0 ||
this.subscriptionManager.newHeadsSubscription !== undefined
) {
if (this.isThereActiveSubscriptions()) {
// Fetch the block details for the current block number
const block = await this.thorClient.blocks.getBlock(
this.subscriptionManager.currentBlockNumber
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
type FilterOptions,
type VechainProvider
} from '../../../../providers';
import { buildError, ERROR_CODES } from '@vechain/vechain-sdk-errors';
import { buildProviderError, JSONRPC } from '@vechain/vechain-sdk-errors';
import { dataUtils } from '@vechain/vechain-sdk-core';

/**
Expand Down Expand Up @@ -53,39 +53,50 @@ const ethSubscribe = async (
provider?: VechainProvider
): Promise<string> => {
if (provider === undefined) {
throw buildError(
ERROR_CODES.JSONRPC.INTERNAL_ERROR,
'Provider not available',
throw buildProviderError(
JSONRPC.INTERNAL_ERROR,
`Method 'ethSubscribe' failed: provider not available\n
Params: ${JSON.stringify(params)}\n
URL: ${thorClient.httpClient.baseURL}`,
{
message: 'The Provider is not defined',
code: -32603
params,
provider
}
);
}
if (
params[0] !== SUBSCRIPTION_TYPE.NEW_HEADS &&
params[0] !== SUBSCRIPTION_TYPE.LOGS
) {
throw buildError(
ERROR_CODES.JSONRPC.INVALID_PARAMS,
'Invalid subscription type param',
throw buildProviderError(
JSONRPC.INVALID_PARAMS,
`Method 'ethSubscribe' failed: Invalid subscription type param\n
Params: ${JSON.stringify(params)}\n
URL: ${thorClient.httpClient.baseURL}`,
{
message: 'Invalid subscription type param',
code: -32602
params
}
);
}

// I check if some subscription is already active, if not I set a new starting block number for the subscription
if (
provider.subscriptionManager.logSubscriptions.size === 0 &&
provider.subscriptionManager.newHeadsSubscription === undefined
) {
// I check if a poll instance is already active, if not I set a new starting block number for the subscription
if (provider.getPollInstance() === undefined) {
const block = await thorClient.blocks.getBestBlock();

if (block !== undefined && block !== null) {
provider.subscriptionManager.currentBlockNumber = block.number;
}
} else
throw buildProviderError(
JSONRPC.INTERNAL_ERROR,
`Method 'ethSubscribe' failed: Best block not available\n
Params: ${JSON.stringify(params)}\n
URL: ${thorClient.httpClient.baseURL}`,
{
params
}
);

provider.startSubscriptionsPolling();
}
const subscriptionId = dataUtils.generateRandomHexOfLength(32);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { VechainProvider } from '../../../../providers';
import { buildError, ERROR_CODES } from '@vechain/vechain-sdk-errors';
import { buildProviderError, JSONRPC } from '@vechain/vechain-sdk-errors';

/**
* Asynchronously unsubscribes from a vechain event subscription.
Expand Down Expand Up @@ -28,16 +28,19 @@ const ethUnsubscribe = async (
provider?: VechainProvider
): Promise<boolean> => {
let result: boolean = false;

if (provider === undefined) {
throw buildError(
ERROR_CODES.JSONRPC.INTERNAL_ERROR,
'Provider not available',
throw buildProviderError(
JSONRPC.INTERNAL_ERROR,
`Method 'ethSubscribe' failed: provider not available\n
Params: ${JSON.stringify(params)}`,
{
message: 'The Provider is not defined',
code: -32603
params,
provider
}
);
}

const subscriptionId = params[0] as string;

// Unsubscribe from 'newHeads' events if the subscription ID matches the newHeads subscription
Expand All @@ -57,6 +60,10 @@ const ethUnsubscribe = async (
);
}

if (!provider.isThereActiveSubscriptions()) {
provider.stopSubscriptionsPolling();
}

return await Promise.resolve(result);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,15 @@ describe('Vechain provider tests', () => {
* eth_subscribe latest blocks and then unsubscribe RPC call test
*/
test('Should be able to get to subscribe to the latest blocks and then unsubscribe', async () => {
expect(provider.getPollInstance()).toBeUndefined();
// Call RPC function
const subscriptionId = await provider.request({
method: 'eth_subscribe',
params: ['newHeads']
});

expect(provider.getPollInstance()).toBeDefined();

expect(subscriptionId).toBeDefined();
expect(
provider.subscriptionManager.newHeadsSubscription?.subscriptionId
Expand All @@ -116,6 +119,8 @@ describe('Vechain provider tests', () => {
params: [subscriptionId]
});

expect(provider.getPollInstance()).toBeUndefined();

expect(
provider.subscriptionManager.newHeadsSubscription?.subscriptionId
).toBeUndefined();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { afterEach, beforeEach, describe, expect, test } from '@jest/globals';
import {
afterEach,
beforeEach,
describe,
expect,
jest,
test
} from '@jest/globals';
import { ThorClient } from '@vechain/vechain-sdk-network';
import { testNetwork } from '../../../fixture';
import { RPC_METHODS, RPCMethodsMap, VechainProvider } from '../../../../src';
import { JSONRPCInternalError } from '@vechain/vechain-sdk-errors';
import { ProviderRpcError } from '@vechain/vechain-sdk-errors';

/**
* RPC Mapper integration tests for 'eth_subscribe' method
Expand Down Expand Up @@ -45,25 +52,18 @@ describe('RPC Mapper - eth_subscribe method tests', () => {
* of 32 characters, indicating a valid response format.
*/
test('eth_subscribe - new latest blocks subscription', async () => {
expect(provider.getPollInstance()).toBeUndefined();
// Call RPC function
const rpcCall = (await provider.request({
method: 'eth_subscribe',
params: ['newHeads']
})) as string;

expect(provider.getPollInstance()).toBeDefined();

// Verify the length of the subscription ID
expect(rpcCall.length).toEqual(32);
});

test('eth_subscribe - no provider', async () => {
// Attempts to unsubscribe with no provider and expects an error.
await expect(
async () =>
await RPCMethodsMap(thorClient)[RPC_METHODS.eth_subscribe](
[]
)
).rejects.toThrowError(JSONRPCInternalError);
});
});

/**
Expand All @@ -86,7 +86,35 @@ describe('RPC Mapper - eth_subscribe method tests', () => {
method: 'eth_subscribe',
params: ['invalidSubscriptionType']
})
).rejects.toThrowError(); // Ideally, specify the expected error for more precise testing.
).rejects.toThrowError(ProviderRpcError); // Ideally, specify the expected error for more precise testing.
});

/**
* Tests the behavior of `eth_subscribe` when no provider is available.
*/
test('eth_subscribe - no provider', async () => {
// Attempts to unsubscribe with no provider and expects an error.
await expect(
async () =>
await RPCMethodsMap(thorClient)[RPC_METHODS.eth_subscribe](
[]
)
).rejects.toThrowError(ProviderRpcError);
});

test('eth_subscribe - no best block', async () => {
jest.spyOn(thorClient.blocks, 'getBestBlock').mockReturnValue(
Promise.resolve(null)
);

// Attempts to unsubscribe with no provider and expects an error.
await expect(
async () =>
await provider.request({
method: 'eth_subscribe',
params: ['newHeads']
})
).rejects.toThrowError(ProviderRpcError);
});
});
});
Loading

1 comment on commit d1e7635

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test Coverage

Summary

Lines Statements Branches Functions
Coverage: 100%
100% (2504/2504) 100% (512/512) 100% (528/528)
Title Tests Skipped Failures Errors Time
core 409 0 💤 0 ❌ 0 🔥 1m 5s ⏱️
network 234 0 💤 0 ❌ 0 🔥 2m 46s ⏱️
errors 43 0 💤 0 ❌ 0 🔥 9.158s ⏱️

Please sign in to comment.