Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker ID #14

Merged
merged 7 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/client/src/app/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export const makeServer = ({ environment = 'test' } = {}) => {
traceMessages: faker.datatype.boolean(),
version: '15',
workerId: faker.vehicle.vrm() + `${i}`,
deviceId: faker.vehicle.vrm(),
userAgent: '',
};
},
}),
Expand Down
6 changes: 6 additions & 0 deletions packages/connections/src/lib/mitmWorkerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export type MitmWorkerDTO = Omit<DTO<MitmWorkerConnection>, 'ws' | 'log' | 'hear

export class MitmWorkerConnection extends EventEmitter {
workerId?: string;
deviceId?: string;
userAgent?: string;
init: boolean;
noMessagesReceived: number;
dateLastMessageReceived: number;
Expand Down Expand Up @@ -65,6 +67,8 @@ export class MitmWorkerConnection extends EventEmitter {
welcome = WelcomeMessage.decode(new Uint8Array(message));

this.workerId = welcome.workerId;
this.deviceId = welcome.deviceId;
this.userAgent = welcome.useragent;
this.version = welcome.versionCode.toString();
this.origin = welcome.origin;
} catch (e) {
Expand Down Expand Up @@ -134,6 +138,8 @@ export class MitmWorkerConnection extends EventEmitter {
traceMessages: this.traceMessages,
version: this.version,
workerId: this.workerId,
deviceId: this.deviceId,
userAgent: this.userAgent,
};
}
}
11 changes: 6 additions & 5 deletions packages/connections/src/lib/utils/mitmProto.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,8 @@ export namespace RotomProtos {
/** Properties of a LoginResponse. */
interface ILoginResponse {

/** LoginResponse deviceId */
deviceId?: (string|null);
/** LoginResponse workerId */
workerId?: (string|null);

/** LoginResponse status */
status?: (RotomProtos.AuthStatus|null);
Expand All @@ -634,8 +634,8 @@ export namespace RotomProtos {
*/
constructor(properties?: RotomProtos.MitmResponse.ILoginResponse);

/** LoginResponse deviceId. */
public deviceId: string;
/** LoginResponse workerId. */
public workerId: string;

/** LoginResponse status. */
public status: RotomProtos.AuthStatus;
Expand Down Expand Up @@ -1099,6 +1099,7 @@ export namespace RotomProtos {
RPC_STATUS_ACCESS_DENIED = 15,
RPC_STATUS_ACCESS_SUSPENDED = 16,
RPC_STATUS_DEVICE_INCOMPATIBLE = 17,
RPC_STATUS_ACCESS_RATE_LIMITED = 18
RPC_STATUS_ACCESS_RATE_LIMITED = 18,
RPC_STATUS_MITM_DISALLOWED_REQUEST = 99
}
}
37 changes: 22 additions & 15 deletions packages/connections/src/lib/utils/mitmProto.js
Original file line number Diff line number Diff line change
Expand Up @@ -1618,7 +1618,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
* Properties of a LoginResponse.
* @memberof RotomProtos.MitmResponse
* @interface ILoginResponse
* @property {string|null} [deviceId] LoginResponse deviceId
* @property {string|null} [workerId] LoginResponse workerId
* @property {RotomProtos.AuthStatus|null} [status] LoginResponse status
* @property {boolean|null} [supportsCompression] LoginResponse supportsCompression
*/
Expand All @@ -1639,12 +1639,12 @@ export const RotomProtos = $root.RotomProtos = (() => {
}

/**
* LoginResponse deviceId.
* @member {string} deviceId
* LoginResponse workerId.
* @member {string} workerId
* @memberof RotomProtos.MitmResponse.LoginResponse
* @instance
*/
LoginResponse.prototype.deviceId = "";
LoginResponse.prototype.workerId = "";

/**
* LoginResponse status.
Expand Down Expand Up @@ -1686,8 +1686,8 @@ export const RotomProtos = $root.RotomProtos = (() => {
LoginResponse.encode = function encode(message, writer) {
if (!writer)
writer = $Writer.create();
if (message.deviceId != null && Object.hasOwnProperty.call(message, "deviceId"))
writer.uint32(/* id 1, wireType 2 =*/10).string(message.deviceId);
if (message.workerId != null && Object.hasOwnProperty.call(message, "workerId"))
writer.uint32(/* id 1, wireType 2 =*/10).string(message.workerId);
if (message.status != null && Object.hasOwnProperty.call(message, "status"))
writer.uint32(/* id 2, wireType 0 =*/16).int32(message.status);
if (message.supportsCompression != null && Object.hasOwnProperty.call(message, "supportsCompression"))
Expand Down Expand Up @@ -1727,7 +1727,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
let tag = reader.uint32();
switch (tag >>> 3) {
case 1: {
message.deviceId = reader.string();
message.workerId = reader.string();
break;
}
case 2: {
Expand Down Expand Up @@ -1773,9 +1773,9 @@ export const RotomProtos = $root.RotomProtos = (() => {
LoginResponse.verify = function verify(message) {
if (typeof message !== "object" || message === null)
return "object expected";
if (message.deviceId != null && message.hasOwnProperty("deviceId"))
if (!$util.isString(message.deviceId))
return "deviceId: string expected";
if (message.workerId != null && message.hasOwnProperty("workerId"))
if (!$util.isString(message.workerId))
return "workerId: string expected";
if (message.status != null && message.hasOwnProperty("status"))
switch (message.status) {
default:
Expand Down Expand Up @@ -1812,8 +1812,8 @@ export const RotomProtos = $root.RotomProtos = (() => {
if (object instanceof $root.RotomProtos.MitmResponse.LoginResponse)
return object;
let message = new $root.RotomProtos.MitmResponse.LoginResponse();
if (object.deviceId != null)
message.deviceId = String(object.deviceId);
if (object.workerId != null)
message.workerId = String(object.workerId);
switch (object.status) {
default:
if (typeof object.status === "number") {
Expand Down Expand Up @@ -1889,12 +1889,12 @@ export const RotomProtos = $root.RotomProtos = (() => {
options = {};
let object = {};
if (options.defaults) {
object.deviceId = "";
object.workerId = "";
object.status = options.enums === String ? "AUTH_STATUS_UNSET" : 0;
object.supportsCompression = false;
}
if (message.deviceId != null && message.hasOwnProperty("deviceId"))
object.deviceId = message.deviceId;
if (message.workerId != null && message.hasOwnProperty("workerId"))
object.workerId = message.workerId;
if (message.status != null && message.hasOwnProperty("status"))
object.status = options.enums === String ? $root.RotomProtos.AuthStatus[message.status] === undefined ? message.status : $root.RotomProtos.AuthStatus[message.status] : message.status;
if (message.supportsCompression != null && message.hasOwnProperty("supportsCompression"))
Expand Down Expand Up @@ -2103,6 +2103,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
case 16:
case 17:
case 18:
case 99:
break;
}
if (message.response != null && message.hasOwnProperty("response")) {
Expand Down Expand Up @@ -2208,6 +2209,10 @@ export const RotomProtos = $root.RotomProtos = (() => {
case 18:
message.rpcStatus = 18;
break;
case "RPC_STATUS_MITM_DISALLOWED_REQUEST":
case 99:
message.rpcStatus = 99;
break;
}
if (object.response) {
if (!Array.isArray(object.response))
Expand Down Expand Up @@ -2915,6 +2920,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
* @property {number} RPC_STATUS_ACCESS_SUSPENDED=16 RPC_STATUS_ACCESS_SUSPENDED value
* @property {number} RPC_STATUS_DEVICE_INCOMPATIBLE=17 RPC_STATUS_DEVICE_INCOMPATIBLE value
* @property {number} RPC_STATUS_ACCESS_RATE_LIMITED=18 RPC_STATUS_ACCESS_RATE_LIMITED value
* @property {number} RPC_STATUS_MITM_DISALLOWED_REQUEST=99 RPC_STATUS_MITM_DISALLOWED_REQUEST value
*/
RotomProtos.RpcStatus = (function() {
const valuesById = {}, values = Object.create(valuesById);
Expand All @@ -2936,6 +2942,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
values[valuesById[16] = "RPC_STATUS_ACCESS_SUSPENDED"] = 16;
values[valuesById[17] = "RPC_STATUS_DEVICE_INCOMPATIBLE"] = 17;
values[valuesById[18] = "RPC_STATUS_ACCESS_RATE_LIMITED"] = 18;
values[valuesById[99] = "RPC_STATUS_MITM_DISALLOWED_REQUEST"] = 99;
return values;
})();

Expand Down
3 changes: 2 additions & 1 deletion packages/server/mitm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ message MitmResponse {
}

message LoginResponse {
string device_id = 1;
string worker_id = 1;
AuthStatus status = 2;
bool supports_compression = 3;
}
Expand Down Expand Up @@ -121,4 +121,5 @@ enum RpcStatus {
RPC_STATUS_ACCESS_SUSPENDED = 16;
RPC_STATUS_DEVICE_INCOMPATIBLE = 17;
RPC_STATUS_ACCESS_RATE_LIMITED = 18;
RPC_STATUS_MITM_DISALLOWED_REQUEST = 99;
}
78 changes: 36 additions & 42 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,34 +182,21 @@ wssMitm.on('connection', (ws, req) => {

const wssScanner = new WebSocketServer({ port: config.controllerListener.port });

// function onSocketError(err: Error) {
// log.info(err);
// }
//
// wssScanner.on('upgrade', (request, socket, head) => {
// socket.on('error', onSocketError);
//
// // This function is not defined on purpose. Implement it with your own logic.
// authenticate(request, (err : Error, client) => {
// if (err || !client) {
// socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
// socket.destroy();
// return;
// }
//
// socket.removeListener('error', onSocketError);
//
// wssScanner.handleUpgrade(request, socket, head, function done(ws) {
// wssScanner.emit('connection', ws, request, client);
// });
// });
// });

function identifyControlChannelFromDevice(deviceId: string): string | null {
// Find a currently connected control connection that starts with the same characters
function identifyControlChannelFromWorkerId(workerId: string): string | null {
// Try to look up connected worker id and see if it presented us with a device id
const connection = currentConnections[workerId];

if (connection) {
const deviceId = connection.mitm?.deviceId;
if (deviceId) {
return deviceId;
}
}

// Fallback: Find a currently connected control connection that starts with the same characters
// as the given device id
for (const key of Object.keys(controlConnections)) {
if (deviceId.substring(0, key.length) === key) return key;
if (workerId.substring(0, key.length) === key) return key;
}
return null;
}
Expand All @@ -230,30 +217,37 @@ wssScanner.on('connection', (ws, req) => {
return;
}

let nextSpareDeviceId = unallocatedConnections.shift() as string;
let nextSpareWorkerId = unallocatedConnections.shift() as string;
let eligibleDeviceFound = false;
const firstSpareDeviceId = nextSpareDeviceId;
const firstSpareWorkerId = nextSpareWorkerId;
do {
const mainDeviceId = identifyControlChannelFromDevice(nextSpareDeviceId);
const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId);
log.info(`SCANNER: Found ${mainDeviceId} connects to workerId ${nextSpareWorkerId}`);
if (mainDeviceId == null) {
log.info(`SCANNER: Warning - found ${nextSpareDeviceId} in pool with no record of main device`);
unallocatedConnections.push(nextSpareDeviceId);
nextSpareDeviceId = unallocatedConnections.shift() as string;
log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device`);
unallocatedConnections.push(nextSpareWorkerId);
nextSpareWorkerId = unallocatedConnections.shift() as string;
} else {
const mainDeviceInfo = deviceInformation[mainDeviceId];
if (mainDeviceInfo.lastScannerConnection + config.monitor.deviceCooldown > Date.now() / 1000) {
// device was allocated to someone else too recently, find another
unallocatedConnections.push(nextSpareDeviceId);
nextSpareDeviceId = unallocatedConnections.shift() as string;
if (!mainDeviceInfo) {
log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device ${mainDeviceId}`);
unallocatedConnections.push(nextSpareWorkerId);
nextSpareWorkerId = unallocatedConnections.shift() as string;
} else {
eligibleDeviceFound = true;
if (mainDeviceInfo.lastScannerConnection + config.monitor.deviceCooldown > Date.now() / 1000) {
// device was allocated to someone else too recently, find another
unallocatedConnections.push(nextSpareWorkerId);
nextSpareWorkerId = unallocatedConnections.shift() as string;
} else {
eligibleDeviceFound = true;
}
}
}
} while (!eligibleDeviceFound && nextSpareDeviceId != firstSpareDeviceId);
} while (!eligibleDeviceFound && nextSpareWorkerId != firstSpareWorkerId);

if (!eligibleDeviceFound) {
// no devices found, return the original one back to pool
unallocatedConnections.push(nextSpareDeviceId);
unallocatedConnections.push(nextSpareWorkerId);
log.info(
`SCANNER: New connection from ${req.socket.remoteAddress} - no MITMs available outside cooldown, rejecting`,
);
Expand All @@ -263,12 +257,12 @@ wssScanner.on('connection', (ws, req) => {
}

// Set last connection time on device
const mainDeviceId = identifyControlChannelFromDevice(nextSpareDeviceId) as string;
const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId) as string;
deviceInformation[mainDeviceId].lastScannerConnection = Date.now() / 1000;

log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareDeviceId}`);
log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareWorkerId}`);

const currentConnection = currentConnections[nextSpareDeviceId];
const currentConnection = currentConnections[nextSpareWorkerId];
const scannerConnection = new ScannerConnection(log, ws, currentConnection.mitm);
currentConnection.scanner = scannerConnection;

Expand Down
Loading