diff --git a/docker/relay/Dockerfile b/docker/relay/Dockerfile index c07aa208..e23335a2 100644 --- a/docker/relay/Dockerfile +++ b/docker/relay/Dockerfile @@ -5,8 +5,10 @@ WORKDIR /app COPY extensions/src/multiplayer/server/package.json ./ RUN npm install -COPY extensions/src/multiplayer/server/relay.ts extensions/src/multiplayer/server/wrangler.toml ./ +COPY extensions/src/multiplayer/server/*.ts extensions/src/multiplayer/server/wrangler.toml ./ EXPOSE 8787 -CMD ["npx", "wrangler", "dev", "--ip", "0.0.0.0", "--port", "8787"] +# Run wrangler directly as PID 1 so it receives SIGINT (Ctrl+C) +# and shuts down gracefully. Using npx as PID 1 swallows signals. +CMD ["node_modules/.bin/wrangler", "dev", "--ip", "0.0.0.0", "--port", "8787"] diff --git a/extensions/src/multiplayer/server/gameroom.ts b/extensions/src/multiplayer/server/gameroom.ts new file mode 100644 index 00000000..8a892adf --- /dev/null +++ b/extensions/src/multiplayer/server/gameroom.ts @@ -0,0 +1,161 @@ +import { + HEADER_SIZE, + MSG_REQUEST_SNAPSHOT, + MSG_WORLD_EVENT_REQUEST, + MSG_WORLD_SNAPSHOT, + SNAPSHOT_MIN_SIZE, + createAssignIdMsg, + createHostAssignMsg, + createLeaveMsg, + readTargetPeerId, + stampSender, +} from "./protocol"; +import type { Env } from "./relay"; + +export class GameRoom implements DurableObject { + private connections = new Map(); + private nextPeerId = 1; + private hostPeerId = 0; + + constructor( + private state: DurableObjectState, + private env: Env + ) {} + + async fetch(request: Request): Promise { + if (request.headers.get("Upgrade") !== "websocket") { + return new Response("Expected WebSocket", { status: 426 }); + } + + const pair = new WebSocketPair(); + const [client, server] = [pair[0], pair[1]]; + + const peerId = this.nextPeerId++; + + server.accept(); + this.connections.set(peerId, server); + + server.send(createAssignIdMsg(peerId)); + this.assignHostIfNeeded(peerId, server); + + server.addEventListener("message", (event) => + this.handleMessage(event, peerId) + ); + + const handleDisconnect = () => this.handleDisconnect(peerId); + server.addEventListener("close", handleDisconnect); + server.addEventListener("error", handleDisconnect); + + return new Response(null, { status: 101, webSocket: client }); + } + + // ---- Connection lifecycle ---- + + private assignHostIfNeeded(peerId: number, ws: WebSocket): void { + if (this.hostPeerId === 0 || !this.connections.has(this.hostPeerId)) { + this.hostPeerId = peerId; + this.broadcast(createHostAssignMsg(this.hostPeerId)); + } else { + this.trySend(ws, createHostAssignMsg(this.hostPeerId)); + } + } + + private handleDisconnect(peerId: number): void { + this.connections.delete(peerId); + this.broadcast(createLeaveMsg(peerId)); + + if (peerId === this.hostPeerId) { + this.electNewHost(); + } + } + + // ---- Message routing ---- + + private handleMessage(event: MessageEvent, peerId: number): void { + if (!(event.data instanceof ArrayBuffer)) { + return; + } + + const data = new Uint8Array(event.data); + if (data.length < HEADER_SIZE) { + return; + } + + const msgType = data[0]; + const stamped = stampSender(data, peerId); + + if ( + msgType === MSG_REQUEST_SNAPSHOT || + msgType === MSG_WORLD_EVENT_REQUEST + ) { + this.sendToHost(stamped); + } else if ( + msgType === MSG_WORLD_SNAPSHOT && + data.length >= SNAPSHOT_MIN_SIZE + ) { + this.sendToTarget(stamped); + } else { + this.broadcastExcept(stamped.buffer, peerId); + } + } + + private sendToHost(data: Uint8Array): void { + const hostWs = this.connections.get(this.hostPeerId); + if (hostWs) { + this.trySend(hostWs, data.buffer); + } + } + + private sendToTarget(data: Uint8Array): void { + const targetId = readTargetPeerId(data); + const targetWs = this.connections.get(targetId); + if (targetWs) { + if (!this.trySend(targetWs, data.buffer)) { + this.connections.delete(targetId); + } + } + } + + // ---- Broadcasting ---- + + private broadcast(msg: ArrayBuffer): void { + for (const ws of this.connections.values()) { + this.trySend(ws, msg); + } + } + + private broadcastExcept(msg: ArrayBuffer, excludePeerId: number): void { + for (const [id, ws] of this.connections) { + if (id !== excludePeerId) { + if (!this.trySend(ws, msg)) { + this.connections.delete(id); + } + } + } + } + + private trySend(ws: WebSocket, data: ArrayBuffer): boolean { + try { + ws.send(data); + return true; + } catch { + return false; + } + } + + // ---- Host election ---- + + private electNewHost(): void { + this.hostPeerId = 0; + + for (const id of this.connections.keys()) { + if (this.hostPeerId === 0 || id < this.hostPeerId) { + this.hostPeerId = id; + } + } + + if (this.hostPeerId > 0) { + this.broadcast(createHostAssignMsg(this.hostPeerId)); + } + } +} diff --git a/extensions/src/multiplayer/server/protocol.ts b/extensions/src/multiplayer/server/protocol.ts new file mode 100644 index 00000000..46e4a911 --- /dev/null +++ b/extensions/src/multiplayer/server/protocol.ts @@ -0,0 +1,64 @@ +// Protocol constants — must stay in sync with protocol.h + +// MessageHeader binary layout: type(1) + peerId(4) + sequence(4) = 9 bytes +export const HEADER_SIZE = 9; +export const PEER_ID_OFFSET = 1; +export const SEQUENCE_OFFSET = 5; + +// Message types the relay inspects for routing decisions. +// All other types are broadcast to every peer in the room. +export const MSG_LEAVE = 2; +export const MSG_HOST_ASSIGN = 4; +export const MSG_REQUEST_SNAPSHOT = 5; +export const MSG_WORLD_SNAPSHOT = 6; +export const MSG_WORLD_EVENT_REQUEST = 8; +export const MSG_ASSIGN_ID = 0xff; + +// AssignIdMsg: compact server-only message — type(1) + peerId(4) +const ASSIGN_ID_SIZE = 1 + 4; + +// HostAssignMsg: header(9) + hostPeerId(4) +const HOST_ASSIGN_SIZE = HEADER_SIZE + 4; + +// WorldSnapshotMsg: header(9) + targetPeerId(4) + dataLength(2) + data... +export const SNAPSHOT_TARGET_OFFSET = HEADER_SIZE; +export const SNAPSHOT_MIN_SIZE = HEADER_SIZE + 4 + 2; + +export function createAssignIdMsg(peerId: number): ArrayBuffer { + const buf = new ArrayBuffer(ASSIGN_ID_SIZE); + const view = new DataView(buf); + view.setUint8(0, MSG_ASSIGN_ID); + view.setUint32(1, peerId, true); + return buf; +} + +export function createHostAssignMsg(hostPeerId: number): ArrayBuffer { + const buf = new ArrayBuffer(HOST_ASSIGN_SIZE); + const view = new DataView(buf); + view.setUint8(0, MSG_HOST_ASSIGN); + view.setUint32(PEER_ID_OFFSET, 0, true); + view.setUint32(SEQUENCE_OFFSET, 0, true); + view.setUint32(HEADER_SIZE, hostPeerId, true); + return buf; +} + +export function createLeaveMsg(peerId: number): ArrayBuffer { + const buf = new ArrayBuffer(HEADER_SIZE); + const view = new DataView(buf); + view.setUint8(0, MSG_LEAVE); + view.setUint32(PEER_ID_OFFSET, peerId, true); + view.setUint32(SEQUENCE_OFFSET, 0, true); + return buf; +} + +/** Copy a message and stamp the sender's peerId into the header. */ +export function stampSender(data: Uint8Array, peerId: number): Uint8Array { + const stamped = new Uint8Array(data.length); + stamped.set(data); + new DataView(stamped.buffer).setUint32(PEER_ID_OFFSET, peerId, true); + return stamped; +} + +export function readTargetPeerId(data: Uint8Array): number { + return new DataView(data.buffer).getUint32(SNAPSHOT_TARGET_OFFSET, true); +} diff --git a/extensions/src/multiplayer/server/relay.ts b/extensions/src/multiplayer/server/relay.ts index 0090eda4..4fb804fb 100644 --- a/extensions/src/multiplayer/server/relay.ts +++ b/extensions/src/multiplayer/server/relay.ts @@ -1,3 +1,5 @@ +export { GameRoom } from "./gameroom"; + export interface Env { GAME_ROOM: DurableObjectNamespace; } @@ -25,180 +27,3 @@ export default { return new Response("Not Found", { status: 404 }); }, }; - -// Message types matching protocol.h -const MSG_LEAVE = 2; -const MSG_HOST_ASSIGN = 4; -const MSG_REQUEST_SNAPSHOT = 5; -const MSG_WORLD_SNAPSHOT = 6; -const MSG_WORLD_EVENT_REQUEST = 8; -const MSG_ASSIGN_ID = 0xff; - -export class GameRoom implements DurableObject { - private connections: Map = new Map(); - private nextPeerId: number = 1; - private hostPeerId: number = 0; - - constructor( - private state: DurableObjectState, - private env: Env - ) {} - - async fetch(request: Request): Promise { - if (request.headers.get("Upgrade") !== "websocket") { - return new Response("Expected WebSocket", { status: 426 }); - } - - const pair = new WebSocketPair(); - const [client, server] = [pair[0], pair[1]]; - - const peerId = this.nextPeerId++; - const peerIdStr = String(peerId); - - server.accept(); - this.connections.set(peerIdStr, server); - - // Send the peer its assigned ID as the first message - const idMsg = new ArrayBuffer(5); - const view = new DataView(idMsg); - view.setUint8(0, MSG_ASSIGN_ID); - view.setUint32(1, peerId, true); // little-endian peer ID - server.send(idMsg); - - // Assign host if none exists (first peer becomes host) - if (this.hostPeerId === 0 || !this.connections.has(String(this.hostPeerId))) { - this.hostPeerId = peerId; - this.broadcastHostAssign(); - } else { - // Send current host assignment to the new peer only - this.sendHostAssign(server); - } - - server.addEventListener("message", (event) => { - if (!(event.data instanceof ArrayBuffer)) { - return; - } - - const data = new Uint8Array(event.data); - if (data.length < 9) { - return; // Too short for header - } - - const msgType = data[0]; - - // Stamp the peerId into the message header (bytes 1-4) - const stamped = new Uint8Array(data.length); - stamped.set(data); - new DataView(stamped.buffer).setUint32(1, peerId, true); - - if (msgType === MSG_REQUEST_SNAPSHOT || msgType === MSG_WORLD_EVENT_REQUEST) { - // Route to host only - const hostWs = this.connections.get(String(this.hostPeerId)); - if (hostWs) { - try { - hostWs.send(stamped.buffer); - } catch { - // Host disconnected; will be handled by close event - } - } - } else if (msgType === MSG_WORLD_SNAPSHOT && data.length >= 15) { - // Route to the target peer only (targetPeerId at offset 9) - const targetId = new DataView(stamped.buffer).getUint32(9, true); - const targetWs = this.connections.get(String(targetId)); - if (targetWs) { - try { - targetWs.send(stamped.buffer); - } catch { - this.connections.delete(String(targetId)); - } - } - } else { - // Broadcast to all other peers in this room - for (const [id, ws] of this.connections) { - if (id !== peerIdStr) { - try { - ws.send(stamped.buffer); - } catch { - this.connections.delete(id); - } - } - } - } - }); - - const handleDisconnect = () => { - this.connections.delete(peerIdStr); - - // Broadcast LEAVE message to remaining peers - const leaveMsg = new ArrayBuffer(9); - const leaveView = new DataView(leaveMsg); - leaveView.setUint8(0, MSG_LEAVE); - leaveView.setUint32(1, peerId, true); - leaveView.setUint32(5, 0, true); // sequence 0 - - for (const [, ws] of this.connections) { - try { - ws.send(leaveMsg); - } catch { - // Ignore send errors on cleanup - } - } - - // Host migration: if the disconnected peer was the host, assign a new one - if (peerId === this.hostPeerId) { - this.electNewHost(); - } - }; - - server.addEventListener("close", handleDisconnect); - server.addEventListener("error", handleDisconnect); - - return new Response(null, { status: 101, webSocket: client }); - } - - private electNewHost(): void { - // Pick the lowest peer ID from remaining connections - let lowestId = 0; - for (const idStr of this.connections.keys()) { - const id = parseInt(idStr, 10); - if (lowestId === 0 || id < lowestId) { - lowestId = id; - } - } - - this.hostPeerId = lowestId; - if (lowestId > 0) { - this.broadcastHostAssign(); - } - } - - private broadcastHostAssign(): void { - const msg = this.createHostAssignMsg(); - for (const [, ws] of this.connections) { - try { - ws.send(msg); - } catch { - // Ignore send errors - } - } - } - - private sendHostAssign(ws: WebSocket): void { - try { - ws.send(this.createHostAssignMsg()); - } catch { - // Ignore send errors - } - } - - private createHostAssignMsg(): ArrayBuffer { - // MessageHeader (9 bytes) + hostPeerId (4 bytes) = 13 bytes - const msg = new ArrayBuffer(13); - const view = new DataView(msg); - view.setUint8(0, MSG_HOST_ASSIGN); // type - view.setUint32(1, 0, true); // peerId (server, so 0) - view.setUint32(5, 0, true); // sequence - view.setUint32(9, this.hostPeerId, true); // hostPeerId - return msg; - } -}