Skip to content

Commit

Permalink
Shut down gracefully on Ctrl+C (#670)
Browse files Browse the repository at this point in the history
* Shut down gracefully on Ctrl+C

Cleanly closes the database and HTTP server, in particular.

If there's ongoing work, üWave can attempt to publish messages to Redis
post-disconnect, which will error. Potentially it would be nice to have
an AbortSignal throughout the server for shutdown, but that's a bit more
work :)

* clean shutdown for emote plugin

* lint
  • Loading branch information
goto-bus-stop authored Nov 26, 2024
1 parent 89e2a3a commit 3e95976
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 23 deletions.
9 changes: 9 additions & 0 deletions bin/u-wave-core.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,12 @@ uw.listen(port).then(() => {
console.error(error.stack);
process.exit(1);
});

process.once('SIGINT', () => {
/** @type {import('../src/Uwave.js').Boot} */ (uw).close((err) => {
if (err != null) {
console.error(err);
process.exitCode = 1;
}
});
});
10 changes: 10 additions & 0 deletions dev/u-wave-dev-server.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { once } from 'node:events';
import minimist from 'minimist';
import concat from 'concat-stream';
import explain from 'explain-error';
Expand Down Expand Up @@ -79,6 +80,15 @@ async function start() {

await uw.listen();
logger.info('Now listening', { port });

await once(process, 'SIGINT');

/** @type {import('../src/Uwave.js').Boot} */ (uw).close((err) => {
if (err != null) {
console.error(err);
process.exitCode = 1;
}
});
}

await start();
1 change: 1 addition & 0 deletions src/Uwave.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class UwaveServer extends EventEmitter {

boot.onClose(() => Promise.all([
this.redis.quit(),
this.db.destroy(),
]));

boot.use(migrations);
Expand Down
77 changes: 54 additions & 23 deletions src/plugins/emotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ const schema = JSON.parse(
fs.readFileSync(new URL('../schemas/emotes.json', import.meta.url), 'utf8'),
);

/** @param {unknown} error */
function isAbortError(error) {
return error instanceof Error && error.name === 'AbortError';
}

/**
* @typedef {{
* clientId: string | null,
Expand Down Expand Up @@ -58,10 +63,11 @@ class EmoteMap extends Map {
/**
* @template {object} T
* @param {URL|string} url
* @param {import('node-fetch').RequestInit} [init]
* @returns {Promise<T>}
*/
async function fetchJSON(url) {
const res = await nodeFetch(url);
async function fetchJSON(url, init) {
const res = await nodeFetch(url, init);

if (!res.ok) {
if (res.status === 404) {
Expand All @@ -86,21 +92,25 @@ function fromBTTVEmote(bttv) {
};
}

async function getBTTVGlobalEmotes() {
/**
* @param {AbortSignal} [signal]
*/
async function getBTTVGlobalEmotes(signal) {
/** @type {BTTVEmote[]} */
const emotes = await fetchJSON('https://api.betterttv.net/3/cached/emotes/global');
const emotes = await fetchJSON('https://api.betterttv.net/3/cached/emotes/global', { signal });
return emotes.map(fromBTTVEmote);
}

/**
* @param {string} channelId
* @param {AbortSignal} [signal]
* @returns {Promise<Emote[]>}
*/
async function getBTTVChannelEmotes(channelId) {
async function getBTTVChannelEmotes(channelId, signal) {
let channel = null;
try {
channel = /** @type {{ channelEmotes: BTTVEmote[], sharedEmotes: BTTVEmote[] }} */ (
await fetchJSON(`https://api.betterttv.net/3/cached/users/twitch/${channelId}`)
await fetchJSON(`https://api.betterttv.net/3/cached/users/twitch/${channelId}`, { signal })
);
} catch (err) {
if (!(err instanceof NotFound)) {
Expand All @@ -118,12 +128,13 @@ async function getBTTVChannelEmotes(channelId) {

/**
* @param {string[]} channels
* @param {AbortSignal} [signal]
* @returns {Promise<Emote[]>}
*/
async function getBTTVEmotes(channels) {
async function getBTTVEmotes(channels, signal) {
const list = await Promise.all([
getBTTVGlobalEmotes(),
...channels.map((channelId) => getBTTVChannelEmotes(channelId)),
getBTTVGlobalEmotes(signal),
...channels.map((channelId) => getBTTVChannelEmotes(channelId, signal)),
]);

return list.flat();
Expand All @@ -142,13 +153,14 @@ function fromFFZEmote(emote) {

/**
* @param {string} channelName
* @param {AbortSignal} [signal]
* @returns {Promise<Emote[]>}
*/
async function getFFZChannelEmotes(channelName) {
async function getFFZChannelEmotes(channelName, signal) {
let channel = null;
try {
channel = /** @type {{ sets: Record<number, FFZEmoteSet> }} */ (
await fetchJSON(`https://api.frankerfacez.com/v1/room/${channelName}`)
await fetchJSON(`https://api.frankerfacez.com/v1/room/${channelName}`, { signal })
);
} catch (err) {
if (!(err instanceof NotFound)) {
Expand All @@ -166,10 +178,13 @@ async function getFFZChannelEmotes(channelName) {

/**
* @param {string[]} channels
* @param {AbortSignal} [signal]
* @returns {Promise<Emote[]>}
*/
async function getFFZEmotes(channels) {
const list = await Promise.all(channels.map((channelId) => getFFZChannelEmotes(channelId)));
async function getFFZEmotes(channels, signal) {
const list = await Promise.all(channels.map((channelId) => (
getFFZChannelEmotes(channelId, signal)
)));

return list.flat();
}
Expand All @@ -188,13 +203,14 @@ function fromSevenTVEmote(emote) {

/**
* @param {string} channelId
* @param {AbortSignal} [signal]
* @returns {Promise<Emote[]>}
*/
async function getSevenTVChannelEmotes(channelId) {
async function getSevenTVChannelEmotes(channelId, signal) {
let channel = null;
try {
channel = /** @type {{ emote_set?: { emotes: SevenTVEmote[] } }} */ (
await fetchJSON(`https://7tv.io/v3/users/twitch/${channelId}`)
await fetchJSON(`https://7tv.io/v3/users/twitch/${channelId}`, { signal })
);
} catch (err) {
if (!(err instanceof NotFound)) {
Expand All @@ -210,14 +226,15 @@ async function getSevenTVChannelEmotes(channelId) {

/**
* @param {string[]} channels
* @param {AbortSignal} [signal]
* @returns {Promise<Emote[]>}
*/
async function getSevenTVEmotes(channels) {
async function getSevenTVEmotes(channels, signal) {
/** @type {Promise<{ emotes: SevenTVEmote[] }>} */
const global = fetchJSON('https://7tv.io/v3/emote-sets/global');
const global = fetchJSON('https://7tv.io/v3/emote-sets/global', { signal });
const emotes = await Promise.all([
global.then((data) => data.emotes.map(fromSevenTVEmote)),
...channels.map((channelId) => getSevenTVChannelEmotes(channelId)),
...channels.map((channelId) => getSevenTVChannelEmotes(channelId, signal)),
]);

return emotes.flat();
Expand Down Expand Up @@ -250,6 +267,8 @@ class Emotes {

#ready = Promise.resolve();

#shutdownSignal;

/**
* @param {import('../Uwave.js').Boot} uw
*/
Expand All @@ -264,7 +283,14 @@ class Emotes {
this.#ready = this.#reloadEmotes();
},
);
uw.onClose(unsubscribe);

const controller = new AbortController();
uw.onClose(() => {
unsubscribe();
controller.abort();
});

this.#shutdownSignal = controller.signal;

this.#ready = this.#reloadEmotes();
}
Expand Down Expand Up @@ -312,15 +338,15 @@ class Emotes {
}

if (options.bttv) {
promises.push(getBTTVEmotes(channels));
promises.push(getBTTVEmotes(channels, this.#shutdownSignal));
}

if (options.ffz) {
promises.push(getFFZEmotes(options.channels));
promises.push(getFFZEmotes(options.channels, this.#shutdownSignal));
}

if (options.seventv) {
promises.push(getSevenTVEmotes(channels));
promises.push(getSevenTVEmotes(channels, this.#shutdownSignal));
}

const emotes = new EmoteMap();
Expand All @@ -331,7 +357,7 @@ class Emotes {
for (const emote of result.value) {
emotes.insert(emote);
}
} else {
} else if (!isAbortError(result.reason)) {
this.#logger.warn(result.reason);
}
}
Expand All @@ -348,6 +374,11 @@ class Emotes {
this.#emotes = await this.#loadTTVEmotes(config.twitch);
}

if (this.#shutdownSignal.aborted) {
this.#logger.info('emote reload aborted due to server shutdown');
return;
}

this.#uw.publish('emotes:reload', null);
}
}
Expand Down

0 comments on commit 3e95976

Please sign in to comment.