-
Notifications
You must be signed in to change notification settings - Fork 422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[service-utils] feat: Add usageV2 support #6021
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@thirdweb-dev/service-utils": minor | ||
--- | ||
|
||
feat: Add usageV2 support |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import type { UsageV2Event } from "src/core/usageV2.js"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use relative imports There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path Spotted by Graphite Reviewer (based on CI logs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path Spotted by Graphite Reviewer (based on CI logs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path Spotted by Graphite Reviewer (based on CI logs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of src-based path Spotted by Graphite Reviewer (based on CI logs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path Spotted by Graphite Reviewer (based on CI logs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Import path should use relative path '../core/usageV2.js' instead of absolute path 'src/core/usageV2.js' Spotted by Graphite Reviewer (based on CI logs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to use relative path instead of absolute path Spotted by Graphite Reviewer (based on CI logs) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change import path from 'src/core/usageV2.js' to '../core/usageV2.js' to fix TypeScript module resolution Spotted by Graphite Reviewer (based on CI logs) |
||
|
||
/** | ||
* Send events to Kafka. | ||
* This method may throw. To call this non-blocking: | ||
* | ||
* ```ts | ||
* void sendUsageV2Events(events, { | ||
* environment: "production", | ||
* serviceKey: "..." | ||
* }).catch(console.error) | ||
* ``` | ||
* | ||
* @param events - The events to send. | ||
* @param options.environment - The environment the service is running in. | ||
* @param options.serviceKey - The service key required for authentication. | ||
*/ | ||
export async function sendUsageV2Events( | ||
events: UsageV2Event[], | ||
options: { | ||
environment: "development" | "production"; | ||
serviceKey: string; | ||
}, | ||
): Promise<void> { | ||
const baseUrl = | ||
options.environment === "production" | ||
? "https://u.thirdweb.com" | ||
: "https://u.thirdweb-dev.com"; | ||
|
||
const resp = await fetch(`${baseUrl}/usage-v2/raw-events`, { | ||
method: "POST", | ||
headers: { | ||
"Content-Type": "application/json", | ||
"x-service-api-key": options.serviceKey, | ||
}, | ||
body: JSON.stringify({ events }), | ||
}); | ||
|
||
if (!resp.ok) { | ||
throw new Error( | ||
`[UsageV2] unexpected response ${resp.status}: ${await resp.text()}`, | ||
); | ||
} | ||
resp.body?.cancel(); | ||
Comment on lines
+39
to
+44
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The response body is consumed by
const text = await resp.text();
if (!resp.ok) {
throw new Error(`[UsageV2] unexpected response ${resp.status}: ${text}`);
}
Either approach would prevent attempting to operate on an already consumed response body. Spotted by Graphite Reviewer |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
export interface UsageV2Event { | ||
/** | ||
* A unique identifier for the event. Defaults to a random UUID. | ||
* Useful if your service retries sending events. | ||
*/ | ||
id?: `${string}-${string}-${string}-${string}-${string}`; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might be too strict on the type here, just |
||
/** | ||
* The event timestamp. Defaults to now(). | ||
*/ | ||
created_at?: Date; | ||
/** | ||
* The source of the event. Example: "storage" | ||
*/ | ||
source: string; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. COULD consider enforcing the actual allowed service names here |
||
/** | ||
* The action of the event. Example: "upload" | ||
*/ | ||
action: string; | ||
/** | ||
* The team ID. | ||
*/ | ||
team_id: string; | ||
/** | ||
* The client ID, if available. | ||
*/ | ||
client_id?: string; | ||
/** | ||
* The SDK name, if available. | ||
*/ | ||
sdk_name?: string; | ||
/** | ||
* The SDK platform, if available. | ||
*/ | ||
sdk_platform?: string; | ||
/** | ||
* The SDK version, if available. | ||
*/ | ||
sdk_version?: string; | ||
/** | ||
* The SDK OS, if available. | ||
*/ | ||
sdk_os?: string; | ||
/** | ||
* The product name, if available. | ||
*/ | ||
product_name?: string; | ||
/** | ||
* The product version, if available. | ||
*/ | ||
product_version?: string; | ||
/** | ||
* An object of service-specific data. Example: "file_size_bytes" | ||
* It is safe to pass any new JSON-serializable data here before updating the usageV2 schema. | ||
*/ | ||
data: Record<string, unknown>; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import { randomUUID } from "node:crypto"; | ||
import { checkServerIdentity } from "node:tls"; | ||
import { Kafka, type Producer } from "kafkajs"; | ||
import type { UsageV2Event } from "src/core/usageV2.js"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. relative import pls |
||
|
||
const TOPIC_USAGE_V2 = "usage_v2.raw_events"; | ||
|
||
/** | ||
* Creates a UsageV2Producer which opens a persistent TCP connection. | ||
* This class is thread-safe so your service should re-use one instance. | ||
* | ||
* Example: | ||
* ```ts | ||
* usageV2 = new UsageV2Producer(..) | ||
* await usageV2.init() | ||
* await usageV2.sendEvents(events) | ||
* // Non-blocking: | ||
* // void usageV2.sendEvents(events).catch(console.error) | ||
* ``` | ||
*/ | ||
export class UsageV2Producer { | ||
private kafka: Kafka; | ||
private producer: Producer | null = null; | ||
|
||
constructor(config: { | ||
/** | ||
* A descriptive name for your service. Example: "storage-server" | ||
*/ | ||
producerName: string; | ||
/** | ||
* The environment the service is running in. | ||
*/ | ||
environment: "development" | "production"; | ||
|
||
username: string; | ||
password: string; | ||
}) { | ||
this.kafka = new Kafka({ | ||
clientId: `${config.producerName}-${config.environment}`, | ||
brokers: | ||
config.environment === "production" | ||
? ["warpstream.thirdweb.xyz:9092"] | ||
: ["warpstream-dev.thirdweb.xyz:9092"], | ||
ssl: { | ||
checkServerIdentity(hostname, cert) { | ||
return checkServerIdentity(hostname.toLowerCase(), cert); | ||
}, | ||
}, | ||
sasl: { | ||
mechanism: "plain", | ||
username: config.username, | ||
password: config.password, | ||
}, | ||
}); | ||
} | ||
|
||
/** | ||
* Connect the producer. | ||
* This must be called before calling `sendEvents()`. | ||
*/ | ||
async init() { | ||
this.producer = this.kafka.producer({ | ||
allowAutoTopicCreation: false, | ||
}); | ||
await this.producer.connect(); | ||
} | ||
|
||
/** | ||
* Send usageV2 events. | ||
* This method may throw. To call this non-blocking: | ||
* | ||
* ```ts | ||
* usageV2 = new UsageV2Producer(...) | ||
* void usageV2.sendEvents(events).catch(console.error) | ||
* | ||
* @param events - The events to send. | ||
*/ | ||
async sendEvents(events: UsageV2Event[]): Promise<void> { | ||
if (!this.producer) { | ||
throw new Error("Producer not initialized. Call `init()` first."); | ||
} | ||
|
||
const parsedEvents = events.map(({ id, created_at, data, ...rest }) => { | ||
return { | ||
id: id ?? randomUUID(), | ||
created_at: created_at ?? new Date(), | ||
data: JSON.stringify(data), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to the schema, the Spotted by Graphite Reviewer |
||
...rest, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might be better to just spell out each field here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for |
||
}; | ||
}); | ||
|
||
await this.producer.send({ | ||
topic: TOPIC_USAGE_V2, | ||
messages: parsedEvents.map((event) => ({ | ||
value: JSON.stringify(event), | ||
})), | ||
}); | ||
Comment on lines
+92
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what should happen if this were to throw? |
||
} | ||
|
||
/** | ||
* Disconnects UsageV2Producer. | ||
* Useful when shutting down the service to flush in-flight events. | ||
*/ | ||
async disconnect() { | ||
if (this.producer) { | ||
await this.producer.disconnect(); | ||
this.producer = null; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pin the version