Rust Fearless Concurrency
- Call the std::thread::spawn function and pass it a closure to create a new thread.
- All spawned threads are shut down when the main thread completes.
- Calling the join method on a JoinHandle waits for its thread to finish.
- The std::sync::mpsc::channel function returns a tuple: the first element is the transmitter and the second is the receiver.
- The transmitter’s send method takes ownership of its parameter.
- The receiver’s recv method blocks the current thread’s execution and waits until a value is sent down the channel.
- The receiver’s try_recv method doesn’t block, but instead returns a Result<T, E> immediately.
- The receiver can be used as an iterator.
- Mutex<T> is a smart pointer.
- Mutex<T> provides interior mutability.
- Arc<T> is a type like Rc<T> that is safe to use in concurrent situations.
- A call to lock returns a smart pointer called MutexGuard.
- The lock is automatically released when a MutexGuard goes out of scope.
- Thread safety comes with a performance penalty.
1. Using Threads to Run Code Simultaneously
In most current operating systems, an executed program’s code is run in a process, and the operating system will manage multiple processes at once. Within a program, you can also have independent parts that run simultaneously. The features that run these independent parts are called threads.
Splitting the computation in your program into multiple threads to run multiple tasks at the same time can improve performance, but it also adds complexity. Because threads can run simultaneously, there’s no inherent guarantee about the order in which parts of your code on different threads will run. This can lead to problems, such as:
- Race conditions, where threads are accessing data or resources in an inconsistent order
- Deadlocks, where two threads are waiting for each other, preventing both threads from continuing
- Bugs that happen only in certain situations and are hard to reproduce and fix reliably
Creating a New Thread with spawn
Call the thread::spawn function and pass it a closure.
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
The output will look similar to the following:
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
When the main thread of a Rust program completes, all spawned threads are shut down, whether or not they have finished running.
Waiting for All Threads to Finish Using join Handles
thread::spawn returns a JoinHandle. Call join on it to make sure the spawned thread finishes before main exits.
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}
The two threads continue alternating, but the main thread waits because of the call to handle.join() and does not end until the spawned thread is finished.
The output will look similar to the following:
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
Now instead move handle.join() before the for loop in main:
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
The main thread will wait for the spawned thread to finish and then run its for loop, so the output won’t be interleaved anymore.
The output will look like the following:
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!
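Besides controlling ordering, join also hands back whatever the closure returned, wrapped in a Result that is Err if the thread panicked. The following is a minimal sketch of that behavior; sum_on_thread is a hypothetical helper name introduced here for illustration and does not come from the original examples.

```rust
use std::thread;

// Hypothetical helper: sums 1..=n on a spawned thread and returns the result.
fn sum_on_thread(n: u64) -> u64 {
    let handle = thread::spawn(move || (1..=n).sum::<u64>());
    // join() blocks until the thread finishes; Ok holds the closure's return value.
    handle.join().expect("spawned thread panicked")
}

fn main() {
    println!("sum = {}", sum_on_thread(10)); // prints "sum = 55"
}
```

Returning a value through join is often the simplest way to get a single result out of a thread without setting up a channel.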
Using move Closures with Threads
Use the move keyword with closures passed to thread::spawn: the closure will then take ownership of the values it uses from the environment, thus transferring ownership of those values from one thread to another.
// This code does not compile
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    // error[E0373]: closure may outlive the current function, but it borrows `v`
    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

// This code compiles: move forces the closure to take ownership of v
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}
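Once v has been moved into the closure, the main thread can no longer use it. If both threads need the data, one option is to give the spawned thread its own copy. The sketch below assumes copying is acceptable for the data involved; sum_in_thread is a hypothetical helper name used only for illustration.

```rust
use std::thread;

// Hypothetical helper: the spawned thread receives its own copy of the data,
// so the caller's slice remains usable after the call.
fn sum_in_thread(v: &[i32]) -> i32 {
    let owned = v.to_vec(); // copy that the closure can take ownership of
    let handle = thread::spawn(move || owned.iter().sum::<i32>());
    handle.join().unwrap()
}

fn main() {
    let v = vec![1, 2, 3];
    let total = sum_in_thread(&v);
    // v was never moved, so it can still be printed here.
    println!("total = {}, v = {:?}", total, v);
}
```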
2. Using Message Passing to Transfer Data Between Threads
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
recv will block the main thread’s execution and wait until a value is sent down the channel. Once a value is sent, recv will return it in a Result<T, E>. When the transmitter closes, recv will return an error to signal that no more values will be coming.
The try_recv method doesn’t block, but will instead return a Result<T, E> immediately: an Ok value holding a message if one is available and an Err value if there aren’t any messages this time. Using try_recv is useful if this thread has other work to do while waiting for messages: we could write a loop that calls try_recv every so often, handles a message if one is available, and otherwise does other work for a little while until checking again.
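The polling loop described above might look like the following sketch. The helper name poll_until_closed, the sleep durations, and the idle counter are assumptions made for illustration; the sleep stands in for whatever other work the thread would do between checks.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: polls the channel until every transmitter is gone and
// returns the messages plus the number of polls that found nothing waiting.
fn poll_until_closed(rx: mpsc::Receiver<String>) -> (Vec<String>, u32) {
    let mut messages = Vec::new();
    let mut idle_polls = 0;
    loop {
        match rx.try_recv() {
            // A message was ready: handle it right away.
            Ok(msg) => messages.push(msg),
            // Nothing yet: do other work, then check again.
            Err(TryRecvError::Empty) => {
                idle_polls += 1;
                thread::sleep(Duration::from_millis(5)); // stand-in for other work
            }
            // All transmitters were dropped and the queue is empty.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (messages, idle_polls)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for word in ["hi", "from", "the", "thread"] {
            tx.send(String::from(word)).unwrap();
            thread::sleep(Duration::from_millis(20));
        }
    });
    let (messages, idle_polls) = poll_until_closed(rx);
    println!("Got {:?} after {} idle polls", messages, idle_polls);
}
```

Matching on TryRecvError separates "nothing yet" from "channel closed", which a plain is_err check would conflate.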
Channels and Ownership Transference
// This code does not compile
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val); // error: borrow of moved value: `val`
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
send takes ownership of its parameter, and when the value is moved, the receiver takes ownership of it.
Sending Multiple Values and Seeing the Receiver Waiting
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}
In the main thread, we’re not calling the recv method explicitly anymore: instead, we’re treating rx as an iterator. For each value received, we’re printing it. When the channel is closed, iteration will end.
The program prints the following, pausing for one second between lines:
Got: hi
Got: from
Got: the
Got: thread
Creating Multiple Producers by Cloning the Transmitter
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
    // --snip--
}
Call clone on the transmitter. This gives us a new transmitter that sends to the same receiver.
The output will look similar to the following, although the order may differ from run to run:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
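When the number of producers is not fixed, the transmitter can be cloned inside a loop. One detail that is easy to miss: the receiver's iteration ends only once every transmitter, including the original, has been dropped. The sketch below illustrates this under that assumption; sum_from_producers is a hypothetical helper name, and it sends integers rather than strings so the result is independent of thread scheduling.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: each of `producers` threads sends 1..=per_producer,
// and the receiving side adds up everything that arrives.
fn sum_from_producers(producers: u32, per_producer: u32) -> u32 {
    let (tx, rx) = mpsc::channel();

    for _ in 0..producers {
        let tx = tx.clone(); // one transmitter per producer thread
        thread::spawn(move || {
            for n in 1..=per_producer {
                tx.send(n).unwrap();
            }
        });
    }

    // Drop the original transmitter. If it stayed alive, the channel would
    // never close and the iteration below would wait forever.
    drop(tx);

    // Iteration ends once all cloned transmitters have been dropped too.
    rx.into_iter().sum()
}

fn main() {
    println!("total = {}", sum_from_producers(3, 4)); // prints "total = 30"
}
```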
3. Shared-State Concurrency
Using Mutexes to Allow Access to Data from One Thread at a Time
Mutex is an abbreviation for mutual exclusion, as in, a mutex allows only one thread to access some data at any given time. To access the data in a mutex, a thread must first signal that it wants access by asking to acquire the mutex’s lock. The lock is a data structure that is part of the mutex that keeps track of who currently has exclusive access to the data. Therefore, the mutex is described as guarding the data it holds via the locking system.
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}
Use the lock method to acquire the lock. This call will block the current thread so it can’t do any work until it’s our turn to have the lock.
The call to lock returns an Err if another thread panicked while holding the lock, which leaves the mutex poisoned. We’ve chosen to unwrap and have this thread panic if we’re in that situation.
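The failure case can be reproduced deliberately. The following sketch panics on a spawned thread while the lock is held and then checks what a later call to lock returns; lock_fails_after_panic is a hypothetical helper name introduced for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: panics on another thread while it holds the lock,
// then reports whether a later lock() call returns Err.
fn lock_fails_after_panic() -> bool {
    let m = Arc::new(Mutex::new(0));
    let m2 = Arc::clone(&m);

    let result = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("panicking while holding the lock");
    })
    .join();
    assert!(result.is_err()); // join reports the thread's panic as Err

    // The guard was dropped during the panic, which marked the mutex as poisoned.
    let failed = m.lock().is_err();
    failed
}

fn main() {
    // The spawned thread's panic message is printed to stderr.
    println!("lock failed after panic: {}", lock_fails_after_panic());
}
```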
Mutex<T> is a smart pointer. More accurately, the call to lock returns a smart pointer called MutexGuard, wrapped in a LockResult that we handled with the call to unwrap. The MutexGuard smart pointer implements Deref to point at our inner data; the smart pointer also has a Drop implementation that releases the lock automatically when a MutexGuard goes out of scope, which happens at the end of the inner scope.
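The release at the end of the scope can be observed with try_lock, which never blocks and returns an Err when the lock is currently held. This is a minimal sketch; lock_availability is a hypothetical helper name used only for illustration.

```rust
use std::sync::Mutex;

// Hypothetical helper: returns whether the lock could be taken
// (while the guard is alive, after the guard is dropped).
fn lock_availability() -> (bool, bool) {
    let m = Mutex::new(5);
    let while_held;

    {
        let mut num = m.lock().unwrap();
        *num = 6;
        // `num` still holds the lock, so try_lock returns Err here.
        while_held = m.try_lock().is_ok();
    } // `num` goes out of scope here and its Drop releases the lock

    let after_drop = m.try_lock().is_ok();
    (while_held, after_drop)
}

fn main() {
    println!("{:?}", lock_availability()); // prints "(false, true)"
}
```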
Sharing a Mutex Between Multiple Threads
// This code does not compile
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        // error: value moved into closure here, in previous iteration of loop
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Arc<T> is a type like Rc<T> that is safe to use in concurrent situations. Wrapping the Mutex<T> in an Arc<T> lets every thread own a handle to the same counter.
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
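Running this code prints:
Result: 10
Mutex<T> provides interior mutability: counter is immutable, yet we can get a mutable reference to the value inside it through lock.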