diff --git a/packages/provider/src/providers/vechain-provider.ts b/packages/provider/src/providers/vechain-provider.ts index 3d7c28020..a884b2c32 100644 --- a/packages/provider/src/providers/vechain-provider.ts +++ b/packages/provider/src/providers/vechain-provider.ts @@ -47,7 +47,6 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage { readonly wallet?: Wallet ) { super(); - this.startSubscriptionsPolling(); } /** @@ -58,6 +57,7 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage { this.thorClient.destroy(); if (this.pollInstance !== undefined) { this.pollInstance.stopListen(); + this.pollInstance = undefined; } } @@ -93,43 +93,85 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage { * This method leverages the `Poll.createEventPoll` utility to create the polling mechanism, * which is then started by invoking `startListen` on the poll instance. */ - private startSubscriptionsPolling(): void { - this.pollInstance = Poll.createEventPoll(async () => { - const data: SubscriptionEvent[] = []; - - const currentBlock = await this.getCurrentBlock(); - - if (currentBlock !== null) { - if ( - this.subscriptionManager.newHeadsSubscription !== undefined - ) { - data.push({ - method: 'eth_subscription', - params: { - subscription: - this.subscriptionManager.newHeadsSubscription - .subscriptionId, - result: currentBlock - } - }); + public startSubscriptionsPolling(): boolean { + let result = false; + if (this.pollInstance === undefined) { + this.pollInstance = Poll.createEventPoll(async () => { + const data: SubscriptionEvent[] = []; + + const currentBlock = await this.getCurrentBlock(); + + if (currentBlock !== null) { + if ( + this.subscriptionManager.newHeadsSubscription !== + undefined + ) { + data.push({ + method: 'eth_subscription', + params: { + subscription: + this.subscriptionManager + .newHeadsSubscription.subscriptionId, + result: currentBlock + } + }); + } + if (this.subscriptionManager.logSubscriptions.size > 0) { + const logs = await this.getLogsRPC(); + data.push(...logs); + } + + this.subscriptionManager.currentBlockNumber++; } - if (this.subscriptionManager.logSubscriptions.size > 0) { - const logs = await this.getLogsRPC(); - data.push(...logs); + return data; + }, POLLING_INTERVAL).onData( + (subscriptionEvents: SubscriptionEvent[]) => { + subscriptionEvents.forEach((event) => { + this.emit('message', event); + }); } + ); - this.subscriptionManager.currentBlockNumber++; - } - return data; - }, POLLING_INTERVAL).onData( - (subscriptionEvents: SubscriptionEvent[]) => { - subscriptionEvents.forEach((event) => { - this.emit('message', event); - }); - } + this.pollInstance.startListen(); + result = true; + } + return result; + } + + /** + * Stops the polling mechanism for subscription events. + * This method stops the polling mechanism for subscription events, if it is active. + * + * @returns {boolean} A boolean indicating whether the polling mechanism was stopped. + */ + public stopSubscriptionsPolling(): boolean { + let result = false; + if (this.pollInstance !== undefined) { + this.pollInstance.stopListen(); + this.pollInstance = undefined; + result = true; + } + return result; + } + + /** + * Checks if there are active subscriptions. + * This method checks if there are any active log subscriptions or a new heads subscription. + * + * @returns {boolean} A boolean indicating whether there are active subscriptions. + */ + public isThereActiveSubscriptions(): boolean { + return ( + this.subscriptionManager.logSubscriptions.size > 0 || + this.subscriptionManager.newHeadsSubscription !== undefined ); + } - this.pollInstance.startListen(); + /** + * Returns the poll instance for subscriptions. + */ + public getPollInstance(): EventPoll | undefined { + return this.pollInstance; } /** @@ -193,10 +235,7 @@ class VechainProvider extends EventEmitter implements EIP1193ProviderMessage { let result: BlockDetail | null = null; // Proceed only if there are active log subscriptions or a new heads subscription is present - if ( - this.subscriptionManager.logSubscriptions.size > 0 || - this.subscriptionManager.newHeadsSubscription !== undefined - ) { + if (this.isThereActiveSubscriptions()) { // Fetch the block details for the current block number const block = await this.thorClient.blocks.getBlock( this.subscriptionManager.currentBlockNumber diff --git a/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_subscribe.ts b/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_subscribe.ts index d1d175e6d..a6ce34f5b 100644 --- a/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_subscribe.ts +++ b/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_subscribe.ts @@ -3,7 +3,7 @@ import { type FilterOptions, type VechainProvider } from '../../../../providers'; -import { buildError, ERROR_CODES } from '@vechain/vechain-sdk-errors'; +import { buildProviderError, JSONRPC } from '@vechain/vechain-sdk-errors'; import { dataUtils } from '@vechain/vechain-sdk-core'; /** @@ -53,12 +53,14 @@ const ethSubscribe = async ( provider?: VechainProvider ): Promise => { if (provider === undefined) { - throw buildError( - ERROR_CODES.JSONRPC.INTERNAL_ERROR, - 'Provider not available', + throw buildProviderError( + JSONRPC.INTERNAL_ERROR, + `Method 'ethSubscribe' failed: provider not available\n + Params: ${JSON.stringify(params)}\n + URL: ${thorClient.httpClient.baseURL}`, { - message: 'The Provider is not defined', - code: -32603 + params, + provider } ); } @@ -66,26 +68,35 @@ const ethSubscribe = async ( params[0] !== SUBSCRIPTION_TYPE.NEW_HEADS && params[0] !== SUBSCRIPTION_TYPE.LOGS ) { - throw buildError( - ERROR_CODES.JSONRPC.INVALID_PARAMS, - 'Invalid subscription type param', + throw buildProviderError( + JSONRPC.INVALID_PARAMS, + `Method 'ethSubscribe' failed: Invalid subscription type param\n + Params: ${JSON.stringify(params)}\n + URL: ${thorClient.httpClient.baseURL}`, { - message: 'Invalid subscription type param', - code: -32602 + params } ); } - // I check if some subscription is already active, if not I set a new starting block number for the subscription - if ( - provider.subscriptionManager.logSubscriptions.size === 0 && - provider.subscriptionManager.newHeadsSubscription === undefined - ) { + // I check if a poll instance is already active, if not I set a new starting block number for the subscription + if (provider.getPollInstance() === undefined) { const block = await thorClient.blocks.getBestBlock(); if (block !== undefined && block !== null) { provider.subscriptionManager.currentBlockNumber = block.number; - } + } else + throw buildProviderError( + JSONRPC.INTERNAL_ERROR, + `Method 'ethSubscribe' failed: Best block not available\n + Params: ${JSON.stringify(params)}\n + URL: ${thorClient.httpClient.baseURL}`, + { + params + } + ); + + provider.startSubscriptionsPolling(); } const subscriptionId = dataUtils.generateRandomHexOfLength(32); diff --git a/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_unsubscribe.ts b/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_unsubscribe.ts index f79b854f6..d83937f5f 100644 --- a/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_unsubscribe.ts +++ b/packages/provider/src/utils/rpc-mapper/methods-map/methods/eth_unsubscribe.ts @@ -1,5 +1,5 @@ import type { VechainProvider } from '../../../../providers'; -import { buildError, ERROR_CODES } from '@vechain/vechain-sdk-errors'; +import { buildProviderError, JSONRPC } from '@vechain/vechain-sdk-errors'; /** * Asynchronously unsubscribes from a vechain event subscription. @@ -28,16 +28,19 @@ const ethUnsubscribe = async ( provider?: VechainProvider ): Promise => { let result: boolean = false; + if (provider === undefined) { - throw buildError( - ERROR_CODES.JSONRPC.INTERNAL_ERROR, - 'Provider not available', + throw buildProviderError( + JSONRPC.INTERNAL_ERROR, + `Method 'ethSubscribe' failed: provider not available\n + Params: ${JSON.stringify(params)}`, { - message: 'The Provider is not defined', - code: -32603 + params, + provider } ); } + const subscriptionId = params[0] as string; // Unsubscribe from 'newHeads' events if the subscription ID matches the newHeads subscription @@ -57,6 +60,10 @@ const ethUnsubscribe = async ( ); } + if (!provider.isThereActiveSubscriptions()) { + provider.stopSubscriptionsPolling(); + } + return await Promise.resolve(result); }; diff --git a/packages/provider/tests/providers/vechain-provider.solo.test.ts b/packages/provider/tests/providers/vechain-provider.solo.test.ts index 1be003dca..09a103e9c 100644 --- a/packages/provider/tests/providers/vechain-provider.solo.test.ts +++ b/packages/provider/tests/providers/vechain-provider.solo.test.ts @@ -100,12 +100,15 @@ describe('Vechain provider tests', () => { * eth_subscribe latest blocks and then unsubscribe RPC call test */ test('Should be able to get to subscribe to the latest blocks and then unsubscribe', async () => { + expect(provider.getPollInstance()).toBeUndefined(); // Call RPC function const subscriptionId = await provider.request({ method: 'eth_subscribe', params: ['newHeads'] }); + expect(provider.getPollInstance()).toBeDefined(); + expect(subscriptionId).toBeDefined(); expect( provider.subscriptionManager.newHeadsSubscription?.subscriptionId @@ -116,6 +119,8 @@ describe('Vechain provider tests', () => { params: [subscriptionId] }); + expect(provider.getPollInstance()).toBeUndefined(); + expect( provider.subscriptionManager.newHeadsSubscription?.subscriptionId ).toBeUndefined(); diff --git a/packages/provider/tests/rpc-mapper/methods/eth_subscribe/eth_subscribe.test.ts b/packages/provider/tests/rpc-mapper/methods/eth_subscribe/eth_subscribe.test.ts index 9ffb419c1..ada39dc7c 100644 --- a/packages/provider/tests/rpc-mapper/methods/eth_subscribe/eth_subscribe.test.ts +++ b/packages/provider/tests/rpc-mapper/methods/eth_subscribe/eth_subscribe.test.ts @@ -1,8 +1,15 @@ -import { afterEach, beforeEach, describe, expect, test } from '@jest/globals'; +import { + afterEach, + beforeEach, + describe, + expect, + jest, + test +} from '@jest/globals'; import { ThorClient } from '@vechain/vechain-sdk-network'; import { testNetwork } from '../../../fixture'; import { RPC_METHODS, RPCMethodsMap, VechainProvider } from '../../../../src'; -import { JSONRPCInternalError } from '@vechain/vechain-sdk-errors'; +import { ProviderRpcError } from '@vechain/vechain-sdk-errors'; /** * RPC Mapper integration tests for 'eth_subscribe' method @@ -45,25 +52,18 @@ describe('RPC Mapper - eth_subscribe method tests', () => { * of 32 characters, indicating a valid response format. */ test('eth_subscribe - new latest blocks subscription', async () => { + expect(provider.getPollInstance()).toBeUndefined(); // Call RPC function const rpcCall = (await provider.request({ method: 'eth_subscribe', params: ['newHeads'] })) as string; + expect(provider.getPollInstance()).toBeDefined(); + // Verify the length of the subscription ID expect(rpcCall.length).toEqual(32); }); - - test('eth_subscribe - no provider', async () => { - // Attempts to unsubscribe with no provider and expects an error. - await expect( - async () => - await RPCMethodsMap(thorClient)[RPC_METHODS.eth_subscribe]( - [] - ) - ).rejects.toThrowError(JSONRPCInternalError); - }); }); /** @@ -86,7 +86,35 @@ describe('RPC Mapper - eth_subscribe method tests', () => { method: 'eth_subscribe', params: ['invalidSubscriptionType'] }) - ).rejects.toThrowError(); // Ideally, specify the expected error for more precise testing. + ).rejects.toThrowError(ProviderRpcError); // Ideally, specify the expected error for more precise testing. + }); + + /** + * Tests the behavior of `eth_subscribe` when no provider is available. + */ + test('eth_subscribe - no provider', async () => { + // Attempts to unsubscribe with no provider and expects an error. + await expect( + async () => + await RPCMethodsMap(thorClient)[RPC_METHODS.eth_subscribe]( + [] + ) + ).rejects.toThrowError(ProviderRpcError); + }); + + test('eth_subscribe - no best block', async () => { + jest.spyOn(thorClient.blocks, 'getBestBlock').mockReturnValue( + Promise.resolve(null) + ); + + // Attempts to unsubscribe with no provider and expects an error. + await expect( + async () => + await provider.request({ + method: 'eth_subscribe', + params: ['newHeads'] + }) + ).rejects.toThrowError(ProviderRpcError); }); }); }); diff --git a/packages/provider/tests/rpc-mapper/methods/eth_unsubscribe/eth_unsubscribe.test.ts b/packages/provider/tests/rpc-mapper/methods/eth_unsubscribe/eth_unsubscribe.test.ts index c23a8ff2f..804b092f1 100644 --- a/packages/provider/tests/rpc-mapper/methods/eth_unsubscribe/eth_unsubscribe.test.ts +++ b/packages/provider/tests/rpc-mapper/methods/eth_unsubscribe/eth_unsubscribe.test.ts @@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from '@jest/globals'; import { ThorClient } from '@vechain/vechain-sdk-network'; import { testNetwork } from '../../../fixture'; import { RPC_METHODS, RPCMethodsMap, VechainProvider } from '../../../../src'; -import { JSONRPCInternalError } from '@vechain/vechain-sdk-errors'; +import { ProviderRpcError } from '@vechain/vechain-sdk-errors'; /** * RPC Mapper integration tests for 'eth_unsubscribe' method @@ -81,6 +81,8 @@ describe('RPC Mapper - eth_unsubscribe method tests', () => { params: ['invalid_subscription_id'] }) ).toBe(false); + + expect(provider.getPollInstance()).toBeUndefined(); }); test('eth_unsubscribe - no provider', async () => { @@ -90,7 +92,7 @@ describe('RPC Mapper - eth_unsubscribe method tests', () => { await RPCMethodsMap(thorClient)[ RPC_METHODS.eth_unsubscribe ]([]) - ).rejects.toThrowError(JSONRPCInternalError); + ).rejects.toThrowError(ProviderRpcError); }); }); });