Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(apps/price_pusher): fix bug causing price_pusher to hand when invalid price feed ids are passed in to hermes ws #2297

Merged
merged 2 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/price_pusher/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/price-pusher",
"version": "8.3.2",
"version": "8.3.3",
"description": "Pyth Price Pusher",
"homepage": "https://pyth.network",
"main": "lib/index.js",
Expand All @@ -24,6 +24,7 @@
"format": "prettier --write \"src/**/*.ts\"",
"test:lint": "eslint src/",
"start": "node lib/index.js",
"test": "jest",
"dev": "ts-node src/index.ts",
"prepublishOnly": "pnpm run build && pnpm run test:lint",
"preversion": "pnpm run test:lint",
Expand Down
101 changes: 101 additions & 0 deletions apps/price_pusher/src/__tests__/pyth-price-listener.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { PythPriceListener } from "../pyth-price-listener";
import { PriceServiceConnection } from "@pythnetwork/price-service-client";
import { Logger } from "pino";

describe("PythPriceListener", () => {
let logger: Logger;
let connection: PriceServiceConnection;
let listener: PythPriceListener;
let originalConsoleError: typeof console.error;

beforeEach(() => {
// Save original console.error and mock it
originalConsoleError = console.error;
console.error = jest.fn();

logger = {
debug: jest.fn(),
error: jest.fn(),
info: jest.fn(),
} as unknown as Logger;

// Use real Hermes beta endpoint for testing
connection = new PriceServiceConnection("https://hermes.pyth.network");
});

afterEach(() => {
// Clean up websocket connection
connection.closeWebSocket();
// Clean up health check interval
if (listener) {
listener.cleanup();
}
// Restore original console.error
console.error = originalConsoleError;
});

it("should handle invalid price feeds gracefully", async () => {
const validFeedId =
"e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"; // BTC/USD
const invalidFeedId =
"0000000000000000000000000000000000000000000000000000000000000000";

const priceItems = [
{ id: validFeedId, alias: "BTC/USD" },
{ id: invalidFeedId, alias: "INVALID/PRICE" },
];

listener = new PythPriceListener(connection, priceItems, logger);

await listener.start();

// Wait for both error handlers to complete
await new Promise((resolve) => {
const checkInterval = setInterval(() => {
const errorCalls = (logger.error as jest.Mock).mock.calls;

// Check for both HTTP and websocket error logs
const hasHttpError = errorCalls.some(
(call) => call[0] === "Failed to get latest price feeds:"
);
const hasGetLatestError = errorCalls.some((call) =>
call[0].includes("not found for getLatestPriceFeeds")
);
const hasWsError = errorCalls.some((call) =>
call[0].includes("not found for subscribePriceFeedUpdates")
);

if (hasHttpError && hasGetLatestError && hasWsError) {
clearInterval(checkInterval);
resolve(true);
}
}, 100);
});

// Verify HTTP error was logged
expect(logger.error).toHaveBeenCalledWith(
"Failed to get latest price feeds:",
expect.objectContaining({
message: "Request failed with status code 404",
})
);

// Verify invalid feed error was logged
expect(logger.error).toHaveBeenCalledWith(
`Price feed ${invalidFeedId} (INVALID/PRICE) not found for getLatestPriceFeeds`
);

// Verify invalid feed error was logged
expect(logger.error).toHaveBeenCalledWith(
`Price feed ${invalidFeedId} (INVALID/PRICE) not found for subscribePriceFeedUpdates`
);

// Verify resubscription message was logged
expect(logger.info).toHaveBeenCalledWith(
"Resubscribing with valid feeds only"
);

// Verify priceIds was updated to only include valid feeds
expect(listener["priceIds"]).toEqual([validFeedId]);
});
});
118 changes: 105 additions & 13 deletions apps/price_pusher/src/pyth-price-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class PythPriceListener implements IPriceListener {
private latestPriceInfo: Map<HexString, PriceInfo>;
private logger: Logger;
private lastUpdated: TimestampInMs | undefined;
private healthCheckInterval?: NodeJS.Timeout;

constructor(
connection: PriceServiceConnection,
Expand All @@ -33,26 +34,111 @@ export class PythPriceListener implements IPriceListener {
// This method should be awaited on and once it finishes it has the latest value
// for the given price feeds (if they exist).
async start() {
// Set custom error handler for websocket errors
this.connection.onWsError = (error: Error) => {
if (error.message.includes("not found")) {
// Extract invalid feed IDs from error message
const match = error.message.match(/\[(.*?)\]/);
if (match) {
const invalidFeedIds = match[1].split(",").map((id) => {
// Remove '0x' prefix if present to match our stored IDs
return id.trim().replace(/^0x/, "");
});

// Log invalid feeds with their aliases
invalidFeedIds.forEach((id) => {
this.logger.error(
`Price feed ${id} (${this.priceIdToAlias.get(
id
)}) not found for subscribePriceFeedUpdates`
);
});

// Filter out invalid feeds and resubscribe with valid ones
const validFeeds = this.priceIds.filter(
(id) => !invalidFeedIds.includes(id)
);

this.priceIds = validFeeds;

if (validFeeds.length > 0) {
this.logger.info("Resubscribing with valid feeds only");
this.connection.subscribePriceFeedUpdates(
validFeeds,
this.onNewPriceFeed.bind(this)
);
}
}
} else {
this.logger.error("Websocket error occurred:", error);
}
};

this.connection.subscribePriceFeedUpdates(
this.priceIds,
this.onNewPriceFeed.bind(this)
);

const priceFeeds = await this.connection.getLatestPriceFeeds(this.priceIds);
priceFeeds?.forEach((priceFeed) => {
// Getting unchecked because although it might be old
// but might not be there on the target chain.
const latestAvailablePrice = priceFeed.getPriceUnchecked();
this.latestPriceInfo.set(priceFeed.id, {
price: latestAvailablePrice.price,
conf: latestAvailablePrice.conf,
publishTime: latestAvailablePrice.publishTime,
try {
const priceFeeds = await this.connection.getLatestPriceFeeds(
this.priceIds
);
priceFeeds?.forEach((priceFeed) => {
const latestAvailablePrice = priceFeed.getPriceUnchecked();
this.latestPriceInfo.set(priceFeed.id, {
price: latestAvailablePrice.price,
conf: latestAvailablePrice.conf,
publishTime: latestAvailablePrice.publishTime,
});
});
});
} catch (error: any) {
// Always log the HTTP error first
this.logger.error("Failed to get latest price feeds:", error);

if (error.response.data.includes("Price ids not found:")) {
// Extract invalid feed IDs from error message
const invalidFeedIds = error.response.data
.split("Price ids not found:")[1]
.split(",")
.map((id: string) => id.trim().replace(/^0x/, ""));

// Log invalid feeds with their aliases
invalidFeedIds.forEach((id: string) => {
this.logger.error(
`Price feed ${id} (${this.priceIdToAlias.get(
id
)}) not found for getLatestPriceFeeds`
);
});

// Check health of the price feeds 5 second. If the price feeds are not updating
// for more than 30s, throw an error.
setInterval(() => {
// Filter out invalid feeds and retry
const validFeeds = this.priceIds.filter(
(id) => !invalidFeedIds.includes(id)
);

this.priceIds = validFeeds;

if (validFeeds.length > 0) {
this.logger.info(
"Retrying getLatestPriceFeeds with valid feeds only"
);
const validPriceFeeds = await this.connection.getLatestPriceFeeds(
validFeeds
);
validPriceFeeds?.forEach((priceFeed) => {
const latestAvailablePrice = priceFeed.getPriceUnchecked();
this.latestPriceInfo.set(priceFeed.id, {
price: latestAvailablePrice.price,
conf: latestAvailablePrice.conf,
publishTime: latestAvailablePrice.publishTime,
});
});
}
}
}

// Store health check interval reference
this.healthCheckInterval = setInterval(() => {
if (
this.lastUpdated === undefined ||
this.lastUpdated < Date.now() - 30 * 1000
Expand Down Expand Up @@ -88,4 +174,10 @@ export class PythPriceListener implements IPriceListener {
getLatestPriceInfo(priceId: string): PriceInfo | undefined {
return this.latestPriceInfo.get(priceId);
}

cleanup() {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
}
}
}
Loading