In this exercise, we will take our "Connected Mailbox" and make it multi-threaded. A new thread should be spawned for every incoming connection, and that thread should take ownership of the TcpStream
and drive it to completion.
-
spawn threads
-
convert a non-thread-safe type into a thread-safe-type
-
lock a Mutex to access the data within
- A completed "Connected Mailbox" solution
-
Use the
std::thread::spawn
API to start a new thead when your main loop produces a new connection to a client. Thehandle_client
function should be executed within that spawned thread. Note how Rust doesn't let you pass&mut VecDeque<String>
into the spawned thread, both because you have multiple&mut
references (not allowed) and because the thread might live longer than theVecDeque
(which only lives whilst themain()
function is running, andmain()
might quit at any time with an early return or a break out of the connection loop). -
Convert the
VecDeque
into aArc<Mutex<VecDeque>>
(usestd::sync::Mutex
). Change thehandle_client
function to take a&Mutex<VecDeque>
. Clone the Arc handle with.clone()
andmove
that cloned handle into the new thread. Change thehandle_client
function to calllet mut queue = your_mutex.lock().unwrap();
whenever you want to access the queue inside the Mutex. -
Convert the
Arc<Mutex<VecDeque>>
into aMutex<VecDeque>
and introduce scoped threads withstd::thread::scope
. TheMutex<VecDeque>
should be created outside of the scope (ensure it lives longer than any of the scoped threads), but the connection loop should be inside the scope. Changestd::thread::spawn
to bes.spawn
, wheres
is the name of the argument to the scope closure.
At every step (noting that Step 1 won't actually work...), try out your program using a command-line TCP Client: you can either use nc
, or netcat
, or our supplied tools/tcp-client
program.
- Run
cargo clippy
on your codebase. - Run
cargo fmt
on your codebase.
You can just nest the calls to SomeType::new()
...
Solution
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
fn main() {
// This type annotation isn't required if you actually push something into the queue...
let queue_handle: Arc<Mutex<VecDeque<String>>> = Arc::new(Mutex::new(VecDeque::new()));
}
The std::thread::spawn
function takes a closure. Rust will automatically try and borrow any local variables that the closure refers to but that were declared outside the closure. You can put move
in front of the closure bars (e.g. move ||
) to make Rust try and take ownership of variables instead of borrowing them.
You will want to clone the Arc
and move the clone into the thread.
Solution
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
fn main() {
let queue_handle = Arc::new(Mutex::new(VecDeque::new()));
for _ in 0..10 {
// Clone the handle and move it into a new thread
let thread_queue_handle = queue_handle.clone();
std::thread::spawn(move || {
handle_client(&thread_queue_handle);
});
// This is the same, but fancier. It stops you passing the wrong Arc handle
// into the thread.
std::thread::spawn({ // this is a block expression
// This is declared inside the block, so it shadows the one from the
// outer scope.
let queue_handle = queue_handle.clone();
// this is the closure produced by the block expression
move || {
handle_client(&queue_handle);
}
});
}
// This doesn't need to know it's in an Arc, just that it's in a Mutex.
fn handle_client(locked_queue: &Mutex<VecDeque<String>>) {
todo!();
}
}
A value of type Mutex<T>
has a lock()
method, but this method can fail if the Mutex has been poisoned (i.e. a thread panicked whilst holding the lock). We generally don't worry about handling the poisoned case (because one of your threads has already panicked, so the program is in a fairly bad state already), so we just use unwrap()
to make this thread panic as well.
Solution
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
fn main() {
let queue_handle = Arc::new(Mutex::new(VecDeque::new()));
let mut inner_q = queue_handle.lock().unwrap();
inner_q.push_back("Hello".to_string());
println!("{:?}", inner_q.pop_front());
println!("{:?}", inner_q.pop_front());
}
Recall, the purpose of a thread scope is to satisfy the compiler that it is safe for a thread to borrow an item that is on the current function's stack. It does this by ensuring that all threads created with the scope terminate before the thread scope ends (after which, the remainder of the function is executed including perhaps destruction or transfer of the variables that were borrowed).
Use std::thread::scope
to create a scope, and pass it a closure containing the
bulk of your main function. Any variables you want to borrow should be created
before the thread scope is created, but you should wait for incoming connections
inside the thread scope (think about what happens to any spawned threads that
are still executing at the point you try and leave the thread scope).
Solution
use std::collections::VecDeque;
use std::sync::Mutex;
fn main() {
let locked_queue = Mutex::new(VecDeque::new());
std::thread::scope(|s| {
for i in 0..10 {
let locked_queue = &locked_queue;
s.spawn(move || {
let mut inner_q = locked_queue.lock().unwrap();
inner_q.push_back(i.to_string());
println!("Pop {:?}", inner_q.pop_front());
});
}
});
}
If you need it, we have provided a complete solution for this exercise.