Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-utils] feat: Add usageV2 support #6021

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/neat-ads-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@thirdweb-dev/service-utils": minor
---

feat: Add usageV2 support
1 change: 1 addition & 0 deletions packages/service-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"sideEffects": false,
"dependencies": {
"aws4fetch": "1.0.20",
"kafkajs": "^2.2.4",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pin the version

Suggested change
"kafkajs": "^2.2.4",
"kafkajs": "2.2.4",

"zod": "3.24.1"
},
"devDependencies": {
Expand Down
45 changes: 45 additions & 0 deletions packages/service-utils/src/cf-worker/usageV2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { UsageV2Event } from "src/core/usageV2.js";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use relative imports

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of src-based path

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import path should use relative path '../core/usageV2.js' instead of absolute path 'src/core/usageV2.js'

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to fix TypeScript module resolution

Spotted by Graphite Reviewer (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.


/**
* Send events to Kafka.
* This method may throw. To call this non-blocking:
*
* ```ts
* void sendUsageV2Events(events, {
* environment: "production",
* serviceKey: "..."
* }).catch(console.error)
* ```
*
* @param events - The events to send.
* @param options.environment - The environment the service is running in.
* @param options.serviceKey - The service key required for authentication.
*/
export async function sendUsageV2Events(
events: UsageV2Event[],
options: {
environment: "development" | "production";
serviceKey: string;
},
): Promise<void> {
const baseUrl =
options.environment === "production"
? "https://u.thirdweb.com"
: "https://u.thirdweb-dev.com";

const resp = await fetch(`${baseUrl}/usage-v2/raw-events`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-service-api-key": options.serviceKey,
},
body: JSON.stringify({ events }),
});

if (!resp.ok) {
throw new Error(
`[UsageV2] unexpected response ${resp.status}: ${await resp.text()}`,
);
}
resp.body?.cancel();
Comment on lines +39 to +44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The response body is consumed by resp.text() before throwing the error, making the subsequent resp.body?.cancel() call potentially unsafe since the body stream has already been read. Consider either:

  1. Moving the text() call before constructing the error:
const text = await resp.text();
if (!resp.ok) {
  throw new Error(`[UsageV2] unexpected response ${resp.status}: ${text}`);
}
  1. Or removing the cancel() call since the body is already consumed

Either approach would prevent attempting to operate on an already consumed response body.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

}
56 changes: 56 additions & 0 deletions packages/service-utils/src/core/usageV2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
export interface UsageV2Event {
/**
* A unique identifier for the event. Defaults to a random UUID.
* Useful if your service retries sending events.
*/
id?: `${string}-${string}-${string}-${string}-${string}`;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be too strict on the type here, just string should be fine - unless you want to specifically enforce uuidv4

/**
* The event timestamp. Defaults to now().
*/
created_at?: Date;
/**
* The source of the event. Example: "storage"
*/
source: string;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

COULD consider enforcing the actual allowed service names here

/**
* The action of the event. Example: "upload"
*/
action: string;
/**
* The team ID.
*/
team_id: string;
/**
* The client ID, if available.
*/
client_id?: string;
/**
* The SDK name, if available.
*/
sdk_name?: string;
/**
* The SDK platform, if available.
*/
sdk_platform?: string;
/**
* The SDK version, if available.
*/
sdk_version?: string;
/**
* The SDK OS, if available.
*/
sdk_os?: string;
/**
* The product name, if available.
*/
product_name?: string;
/**
* The product version, if available.
*/
product_version?: string;
/**
* An object of service-specific data. Example: "file_size_bytes"
* It is safe to pass any new JSON-serializable data here before updating the usageV2 schema.
*/
data: Record<string, unknown>;
}
110 changes: 110 additions & 0 deletions packages/service-utils/src/node/usageV2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { randomUUID } from "node:crypto";
import { checkServerIdentity } from "node:tls";
import { Kafka, type Producer } from "kafkajs";
import type { UsageV2Event } from "src/core/usageV2.js";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relative import pls


const TOPIC_USAGE_V2 = "usage_v2.raw_events";

/**
* Creates a UsageV2Producer which opens a persistent TCP connection.
* This class is thread-safe so your service should re-use one instance.
*
* Example:
* ```ts
* usageV2 = new UsageV2Producer(..)
* await usageV2.init()
* await usageV2.sendEvents(events)
* // Non-blocking:
* // void usageV2.sendEvents(events).catch(console.error)
* ```
*/
export class UsageV2Producer {
private kafka: Kafka;
private producer: Producer | null = null;

constructor(config: {
/**
* A descriptive name for your service. Example: "storage-server"
*/
producerName: string;
/**
* The environment the service is running in.
*/
environment: "development" | "production";

username: string;
password: string;
}) {
this.kafka = new Kafka({
clientId: `${config.producerName}-${config.environment}`,
brokers:
config.environment === "production"
? ["warpstream.thirdweb.xyz:9092"]
: ["warpstream-dev.thirdweb.xyz:9092"],
ssl: {
checkServerIdentity(hostname, cert) {
return checkServerIdentity(hostname.toLowerCase(), cert);
},
},
sasl: {
mechanism: "plain",
username: config.username,
password: config.password,
},
});
}

/**
* Connect the producer.
* This must be called before calling `sendEvents()`.
*/
async init() {
this.producer = this.kafka.producer({
allowAutoTopicCreation: false,
});
await this.producer.connect();
}

/**
* Send usageV2 events.
* This method may throw. To call this non-blocking:
*
* ```ts
* usageV2 = new UsageV2Producer(...)
* void usageV2.sendEvents(events).catch(console.error)
*
* @param events - The events to send.
*/
async sendEvents(events: UsageV2Event[]): Promise<void> {
if (!this.producer) {
throw new Error("Producer not initialized. Call `init()` first.");
}

const parsedEvents = events.map(({ id, created_at, data, ...rest }) => {
return {
id: id ?? randomUUID(),
created_at: created_at ?? new Date(),
data: JSON.stringify(data),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the schema, the data field is already defined as a JSON object (Record<string, unknown>). The JSON.stringify() call here will convert it to a string, which is incorrect. Please remove the JSON.stringify() wrapper around data to maintain the correct type.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

...rest,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be better to just spell out each field here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for team_id field we MAY want to de-normalize or normalize it - since it might be passed in as team_<id> or just <id> based on how the service consumes it

};
});

await this.producer.send({
topic: TOPIC_USAGE_V2,
messages: parsedEvents.map((event) => ({
value: JSON.stringify(event),
})),
});
Comment on lines +92 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should happen if this were to throw?

}

/**
* Disconnects UsageV2Producer.
* Useful when shutting down the service to flush in-flight events.
*/
async disconnect() {
if (this.producer) {
await this.producer.disconnect();
this.producer = null;
}
}
}
Loading
Loading