diff --git a/board-network/src/protocol.rs b/board-network/src/protocol.rs index c850203..b1f1967 100644 --- a/board-network/src/protocol.rs +++ b/board-network/src/protocol.rs @@ -11,5 +11,9 @@ pub enum ClientMessage { pub enum ServerMessage { JoinedRoom { room_name: String, + players: Vec<(String, u32, bool)>, }, + PlayerConnected(String), + PlayerReconnected(usize), + PlayerDisconnected(usize), } \ No newline at end of file diff --git a/board-server/Cargo.toml b/board-server/Cargo.toml index f0d8a9e..0a23f4f 100644 --- a/board-server/Cargo.toml +++ b/board-server/Cargo.toml @@ -13,3 +13,5 @@ smol = "1.3.0" async-tungstenite = "0.20.0" tungstenite = "0.18.0" anyhow = "1.0.69" +rand = "0.8.5" +serde_json = "1.0.93" diff --git a/board-server/README.md b/board-server/README.md new file mode 100644 index 0000000..224d97f --- /dev/null +++ b/board-server/README.md @@ -0,0 +1,50 @@ +# WebSocket server implementation + +This crate provides a WebSocket server for the Scrabble with numbers game. + +## Build + +This project uses [Cargo](https://crates.io/), so ensure you have it installed. + +```bash +cargo build --release +``` + +## Usage + +The server listens on port `8080` by default. You can change this by specifying a program argument: + +```bash +# Run a debug build on port 1234 +cargo run -- '0.0.0.0:1234' +# Run an already built binary on the default port +./board-server +``` + +## Protocol + +The server only understands certain predefined messages. +All messages are sent as JSON strings are can only be sent by either the client or the server. + +You can see the exact layout of the messages in the [protocol file](../board-network/src/protocol.rs). + +Messages sent and received shouldn't contain any unnecessary indentation. + +## Sample client + +```js +// Create WebSocket connection. +const socket = new WebSocket('ws://localhost:8080'); + +// Connection opened +socket.addEventListener('open', (event) => { + // Create a new room, and join it it immediately with the player name "player_name" + // The server will respond with a RoomCreated message which contains the room name + socket.send(JSON.stringify({ CreateRoom: 'player_name' })); +}); + +// Listen for messages +socket.addEventListener('message', (event) => { + console.log('Message from server', JSON.parse(event.data)); +}); +``` diff --git a/board-server/src/main.rs b/board-server/src/main.rs index eb3404c..d8b4671 100644 --- a/board-server/src/main.rs +++ b/board-server/src/main.rs @@ -1,17 +1,103 @@ +mod player; +mod room; + +use crate::room::{generate_room_name, Room, RoomHandle}; use anyhow::Result; -use futures::StreamExt; +use async_tungstenite::WebSocketStream; +use board_network::protocol::ClientMessage; +use futures::channel::mpsc::{unbounded, UnboundedSender}; +use futures::future::join; +use futures::{future, SinkExt, StreamExt}; use smol::Async; +use std::collections::HashMap; use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::sync::{Arc, Mutex}; use std::{env, io}; use tungstenite::Message as WebsocketMessage; -async fn handle_connection(raw_stream: Async, addr: SocketAddr) -> Result<()> { +type Rooms = Arc>>; + +async fn run_player( + player_name: String, + addr: SocketAddr, + handle: RoomHandle, + ws_stream: WebSocketStream>, +) { + let (incoming, outgoing) = ws_stream.split(); + let (tx, rx) = unbounded(); + + { + let room = &mut handle.room.lock().unwrap(); + if let Err(e) = room.add_player(addr, player_name.clone(), tx) { + eprintln!("[{}] Failed to add player: {:?}", room.name, e); + return; + } + } + + let write = handle.write.clone(); + let ra = rx + .map(|c| { + serde_json::to_string(&c).unwrap_or_else(|_| panic!("Could not serialize {:?}", c)) + }) + .map(WebsocketMessage::Text) + .map(Ok) + .forward(incoming); + let rb = outgoing + .map(|m| match m { + Ok(WebsocketMessage::Text(t)) => serde_json::from_str::(&t).ok(), + _ => None, + }) + .take_while(|m| future::ready(m.is_some())) + .map(|m| m.unwrap()) + .chain(futures::stream::once(async { ClientMessage::Disconnected })) + .map(move |m| Ok((addr, m))) + .forward(write); + + let (ra, rb) = join(ra, rb).await; + if let Err(e) = ra { + eprintln!("[{addr}] Got error {e} from player {player_name}'s rx queue"); + } + if let Err(e) = rb { + eprintln!("[{addr}] Got error {e} from player {player_name}'s tx queue"); + } + println!("[{addr}] Finished session with {player_name}"); +} + +async fn handle_connection( + rooms: Rooms, + raw_stream: Async, + addr: SocketAddr, + mut close_room: UnboundedSender, +) -> Result<()> { println!("[{addr}] Incoming TCP connection"); let mut ws_stream = async_tungstenite::accept_async(raw_stream).await?; println!("[{addr}] WebSocket connection established"); - while let Some(Ok(WebsocketMessage::Text(text))) = ws_stream.next().await { - println!("[{addr}] Received message: {text}"); + if let Some(Ok(WebsocketMessage::Text(text))) = ws_stream.next().await { + let msg = serde_json::from_str::(&text)?; + match msg { + ClientMessage::CreateRoom(player_name) => { + let (write, read) = unbounded(); + let room = Arc::new(Mutex::new(Room::default())); + let handle = RoomHandle { write, room }; + let room_name = generate_room_name(&mut rooms.lock().unwrap(), handle.clone()); + println!("[{addr}] Creating room '{room_name}' for player '{player_name}'"); + + let mut h = handle.clone(); + join( + h.run_room(read), + run_player(player_name, addr, handle, ws_stream), + ) + .await; + + if let Err(e) = close_room.send(room_name.clone()).await { + eprintln!("[{room_name}] Failed to close room: {e}"); + } + + return Ok(()); + } + msg => eprintln!("[{addr}] Received illegal message {:?}", msg), + } } println!("[{addr}] Dropping connection"); @@ -25,13 +111,30 @@ fn main() -> Result<(), io::Error> { .parse::() .expect("Invalid address"); + let rooms = Rooms::new(Mutex::new(HashMap::new())); + + let close_room = { + let (tx, mut rx) = unbounded(); + let rooms = rooms.clone(); + smol::spawn(async move { + while let Some(room_name) = rx.next().await { + println!("Closing room {room_name}"); + rooms.lock().unwrap().remove(&room_name); + } + }) + .detach(); + tx + }; + smol::block_on(async { println!("Listening on: {addr}"); let listener = Async::::bind(addr).expect("Could not create listener"); while let Ok((stream, addr)) = listener.accept().await { + let close_room = close_room.clone(); + let rooms = rooms.clone(); smol::spawn(async move { - if let Err(e) = handle_connection(stream, addr).await { + if let Err(e) = handle_connection(rooms, stream, addr, close_room).await { eprintln!("Failed to handle connection from {addr}: {e}"); } }) diff --git a/board-server/src/player.rs b/board-server/src/player.rs new file mode 100644 index 0000000..d0a788e --- /dev/null +++ b/board-server/src/player.rs @@ -0,0 +1,9 @@ +use board_network::protocol::ServerMessage; +use futures::channel::mpsc::UnboundedSender; + +#[derive(Debug)] +pub struct Player { + pub name: String, + pub score: u32, + pub ws: Option>, +} diff --git a/board-server/src/room.rs b/board-server/src/room.rs new file mode 100644 index 0000000..7d981a8 --- /dev/null +++ b/board-server/src/room.rs @@ -0,0 +1,129 @@ +use crate::player::Player; +use board_network::protocol::{ClientMessage, ServerMessage}; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures::StreamExt; +use rand::distributions::{Alphanumeric, DistString}; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +type TaggedClientMessage = (SocketAddr, ClientMessage); + +#[derive(Debug, Default)] +pub struct Room { + pub name: String, + pub connections: HashMap, + pub players: Vec, +} + +impl Room { + pub fn add_player( + &mut self, + addr: SocketAddr, + player_name: String, + tx: UnboundedSender, + ) -> anyhow::Result<()> { + // If the player name matches an existing player, but with a dropped connection, + // then replace this player. + let mut player_index = None; + for (i, p) in self.players.iter().enumerate() { + if p.name == player_name && p.ws.is_none() { + player_index = Some(i); + break; + } + } + + if let Some(i) = player_index { + // Reclaim the player's spot + self.broadcast(ServerMessage::PlayerReconnected(i)); + self.players[i].ws = Some(tx.clone()); + } else { + self.broadcast(ServerMessage::PlayerConnected(player_name.clone())); + player_index = Some(self.players.len()); + + self.players.push(Player { + name: player_name, + score: 0, + ws: Some(tx.clone()), + }); + } + + let player_index = player_index.expect("A player index should have been attributed"); + self.connections.insert(addr, player_index); + + // Send the player the current state of the room + tx.unbounded_send(ServerMessage::JoinedRoom { + room_name: self.name.clone(), + players: self + .players + .iter() + .map(|p| (p.name.clone(), p.score, p.ws.is_some())) + .collect(), + })?; + + Ok(()) + } + + pub fn on_message(&mut self, addr: SocketAddr, msg: ClientMessage) -> bool { + match msg { + ClientMessage::Disconnected => self.on_client_disconnected(addr), + ClientMessage::CreateRoom(_) | ClientMessage::JoinRoom(_, _) => { + eprintln!("[{}] Illegal client message {:?}", self.name, msg); + } + } + !self.connections.is_empty() + } + + fn on_client_disconnected(&mut self, addr: SocketAddr) { + if let Some(p) = self.connections.remove(&addr) { + self.players[p].ws = None; + self.broadcast(ServerMessage::PlayerDisconnected(p)); + } + } + + fn broadcast(&self, s: ServerMessage) { + for c in self.connections.values() { + if let Some(ws) = &self.players[*c].ws { + if let Err(e) = ws.unbounded_send(s.clone()) { + eprintln!( + "[{}] Failed to send broadcast to {}: {}", + self.name, self.players[*c].name, e + ); + } + } + } + } +} + +type RoomPtr = Arc>; + +#[derive(Clone)] +pub struct RoomHandle { + pub write: UnboundedSender, + pub room: RoomPtr, +} + +impl RoomHandle { + pub async fn run_room(&mut self, mut read: UnboundedReceiver) { + while let Some((addr, msg)) = read.next().await { + if !self.room.lock().unwrap().on_message(addr, msg) { + break; + } + } + } +} + +pub type Rooms = HashMap; + +pub fn generate_room_name(rooms: &mut Rooms, room: RoomHandle) -> String { + let mut rng = rand::thread_rng(); + loop { + let name = Alphanumeric.sample_string(&mut rng, 5); + if let Entry::Vacant(v) = rooms.entry(name.clone()) { + room.room.lock().unwrap().name = name.clone(); + v.insert(room); + return name; + } + } +}