目录
第十五章 智能指针与多线程
智能指针(Smart Pointers)是Rust中一种数据结构,它们的行为类似于指针,但拥有额外的元数据和功能。与普通的引用不同,智能指针通常拥有它们所指向的数据。本章将深入探讨Rust中的各种智能指针,以及如何使用它们进行多线程编程。
1. Box<T>:堆上分配
Box<T>是最简单的智能指针,它允许你在堆上存储数据,而在栈上保留指向堆数据的指针。
1.1 基本用法
fn main() { // 在堆上存储一个i32 let b = Box::new(5); println!("b = {}", b); // 自动解引用 // Box允许递归类型 #[derive(Debug)] enum List { Cons(i32, Box<List>), Nil, } let list = List::Cons(1, Box::new(List::Cons(2, Box::new(List::Nil)))); println!("{:?}", list); }
1.2 使用场景
- 当你有一个在编译时未知大小的类型,而需要在确切大小的上下文中使用它
- 当你有大量数据,希望转移所有权时确保数据不被复制
- 当你想拥有一个实现了特定trait的值,而不关心其具体类型
// trait对象 trait Animal { fn speak(&self); } struct Dog; impl Animal for Dog { fn speak(&self) { println!("Woof!"); } } struct Cat; impl Animal for Cat { fn speak(&self) { println!("Meow!"); } } fn main() { let animals: Vec<Box<dyn Animal>> = vec![ Box::new(Dog), Box::new(Cat), ]; for animal in animals { animal.speak(); } }
2. Rc<T>:引用计数
Rc<T>(Reference Counted)用于在程序的多个部分之间共享只读数据。它通过维护一个引用计数来跟踪有多少个所有者。
2.1 基本用法
use std::rc::Rc; fn main() { let data = Rc::new(String::from("共享数据")); println!("引用计数: {}", Rc::strong_count(&data)); // 1 { let data2 = Rc::clone(&data); println!("引用计数: {}", Rc::strong_count(&data)); // 2 println!("data2: {}", data2); } // data2离开作用域,计数减1 println!("引用计数: {}", Rc::strong_count(&data)); // 1 println!("data: {}", data); }
2.2 克隆与引用
Rc::clone不会深拷贝数据,只是增加引用计数。这比其他类型的克隆要便宜得多。
use std::rc::Rc; #[derive(Debug)] struct Node { value: i32, children: Vec<Rc<Node>>, } fn main() { let leaf = Rc::new(Node { value: 3, children: vec![], }); let branch = Rc::new(Node { value: 5, children: vec![Rc::clone(&leaf)], }); // leaf现在有两个所有者:leaf变量和branch.children println!("leaf引用计数: {}", Rc::strong_count(&leaf)); // 2 }
重要限制:Rc<T>不是线程安全的,只能用于单线程场景。
3. RefCell<T>:内部可变性
RefCell<T>提供了内部可变性,允许你在不可变引用的情况下修改数据。它在运行时而不是编译时执行借用规则。
3.1 基本用法
use std::cell::RefCell; fn main() { let data = RefCell::new(5); // 获取可变引用 *data.borrow_mut() += 10; // 获取不可变引用 println!("data: {}", data.borrow()); // 15 }
3.2 运行时借用检查
RefCell在运行时检查借用规则,如果违反会触发panic。
use std::cell::RefCell; fn main() { let data = RefCell::new(5); let _borrow1 = data.borrow(); let _borrow2 = data.borrow(); // 多个不可变借用是允许的 // 下面这行会在运行时panic,因为已经有了不可变借用 // let _borrow_mut = data.borrow_mut(); }
3.3 Rc<RefCell<T>>组合
Rc<RefCell<T»是一个强大的组合,允许多个所有者且每个所有者都可以修改数据。
use std::rc::Rc; use std::cell::RefCell; #[derive(Debug)] struct User { name: String, age: RefCell<u32>, } fn main() { let user = Rc::new(User { name: String::from("Alice"), age: RefCell::new(30), }); let user2 = Rc::clone(&user); // 通过user2修改年龄 *user2.age.borrow_mut() = 31; // 通过user也能看到修改 println!("{:?}", user); // age: 31 }
4. Arc<T>:原子引用计数
Arc<T>(Atomic Reference Counted)是线程安全的Rc<T>版本,使用原子操作来维护引用计数。
4.1 基本用法
use std::sync::Arc; use std::thread; fn main() { let data = Arc::new(String::from("线程间共享数据")); let mut handles = vec![]; for i in 0..5 { let data_clone = Arc::clone(&data); let handle = thread::spawn(move || { println!("线程{}: {}", i, data_clone); }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("最终引用计数: {}", Arc::strong_count(&data)); }
4.2 Arc<Mutex<T>>组合
Arc<Mutex<T»是在多线程间共享可变数据的标准方式。
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter_clone = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter_clone.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("结果: {}", *counter.lock().unwrap()); // 10 }
5. Mutex<T>和RwLock<T>:互斥锁
5.1 Mutex<T>
Mutex(互斥锁)确保同一时间只有一个线程能访问数据。
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; } // 锁在这里自动释放 println!("m = {:?}", m); }
死锁风险:
use std::sync::Mutex; fn main() { let m = Mutex::new(5); let num = m.lock().unwrap(); // 如果在持有锁时panic或发生死锁,其他线程会永远等待 // let num2 = m.lock().unwrap(); // 这会导致死锁! }
5.2 RwLock<T>
RwLock(读写锁)允许多个读或一个写。
use std::sync::RwLock; fn main() { let lock = RwLock::new(5); // 允许多个读锁 { let r1 = lock.read().unwrap(); let r2 = lock.read().unwrap(); println!("读: {} {}", r1, r2); } // 只允许一个写锁 { let mut w = lock.write().unwrap(); *w += 1; println!("写后: {}", w); } }
6. 多线程基础
6.1 创建线程
use std::thread; use std::time::Duration; fn main() { let handle = thread::spawn(|| { for i in 1..10 { println!("子线程: {}", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { println!("主线程: {}", i); thread::sleep(Duration::from_millis(1)); } // 等待子线程完成 handle.join().unwrap(); }
6.2 线程与闭包
use std::thread; fn main() { let v = vec![1, 2, 3]; let handle = thread::spawn(move || { // move关键字转移v的所有权到闭包 println!("向量: {:?}", v); }); handle.join().unwrap(); // 这里不能再使用v了 // println!("{:?}", v); // 编译错误 }
7. 使用通道进行消息传递
Rust使用mpsc(multiple producer, single consumer)通道进行线程间通信。
7.1 基本通道
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("来自子线程的消息"); tx.send(val).unwrap(); // val的所有权已经转移,不能再使用 }); let received = rx.recv().unwrap(); println!("收到: {}", received); }
7.2 多个生产者
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { let vals = vec!["线程1: 你好", "线程1: 来自", "线程1: 子线程"]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec!["线程2: 更多", "线程2: 消息", "线程2: 给你"]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("收到: {}", received); } }
8. 线程同步原语
8.1 Condvar:条件变量
use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; fn main() { let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair2 = Arc::clone(&pair); thread::spawn(move || { thread::sleep(Duration::from_secs(2)); let (lock, cvar) = &*pair2; let mut started = lock.lock().unwrap(); *started = true; cvar.notify_one(); println!("子线程: 通知主线程"); }); let (lock, cvar) = &*pair; let mut started = lock.lock().unwrap(); while !*started { println!("主线程: 等待中..."); started = cvar.wait(started).unwrap(); } println!("主线程: 被唤醒!"); }
8.2 Barrier:屏障
use std::sync::Barrier; use std::thread; fn main() { let barrier = Barrier::new(3); let mut handles = vec![]; for i in 0..3 { let b = barrier.clone(); handles.push(thread::spawn(move || { println!("线程{}: 到达屏障前", i); b.wait(); println!("线程{}: 通过屏障!", i); })); } for h in handles { h.join().unwrap(); } }
9. 原子类型
Atomic类型提供低级别的原子操作,适合高性能计数器等场景。
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread; fn main() { let counter = Arc::new(AtomicUsize::new(0)); let mut handles = vec![]; for _ in 0..10 { let c = Arc::clone(&counter); handles.push(thread::spawn(move || { for _ in 0..1000 { c.fetch_add(1, Ordering::SeqCst); } })); } for h in handles { h.join().unwrap(); } println!("计数: {}", counter.load(Ordering::SeqCst)); // 10000 }
内存排序:
- Relaxed:最弱约束,只保证原子性
- Acquire/Release:用于线程间同步
- SeqCst:最强约束,全局顺序一致性
10. 线程池
手动创建线程有开销,线程池可以复用线程。
use std::sync::mpsc; use std::sync::{Arc, Mutex}; use std::thread; type Job = Box<dyn FnOnce() + Send + 'static>; struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } impl ThreadPool { fn new(size: usize) -> ThreadPool { let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} 开始执行任务", id); job(); println!("Worker {} 任务完成", id); } }); Worker { id, thread } } } fn main() { let pool = ThreadPool::new(4); for i in 0..8 { pool.execute(move || { println!("执行任务 {}", i); thread::sleep(std::time::Duration::from_millis(100)); }); } thread::sleep(std::time::Duration::from_secs(2)); }
11. Send和Sync trait
Rust的并发安全依赖于两个核心trait:
11.1 Send trait
Send标记类型可以安全地在线程间转移所有权。大多数类型都实现了Send,但Rc<T>没有。
11.2 Sync trait
Sync标记类型可以安全地被多个线程引用(即&T是Send的)。Arc<T>实现了Sync,RefCell<T>和Cell<T>没有。
use std::rc::Rc; use std::sync::Arc; use std::thread; fn main() { // Rc不能跨线程 // let rc = Rc::new(5); // thread::spawn(move || { // println!("{}", rc); // 编译错误 // }); // Arc可以跨线程 let arc = Arc::new(5); let arc_clone = Arc::clone(&arc); thread::spawn(move || { println!("{}", arc_clone); // 正常 }).join().unwrap(); }
12. 循环引用与内存泄漏
12.1 Rc导致的循环引用
use std::rc::Rc; use std::cell::RefCell; #[derive(Debug)] struct Node { value: i32, parent: RefCell<Option<Rc<Node>>>, children: RefCell<Vec<Rc<Node>>>, } fn main() { let leaf = Rc::new(Node { value: 3, parent: RefCell::new(None), children: RefCell::new(vec![]), }); let branch = Rc::new(Node { value: 5, parent: RefCell::new(None), children: RefCell::new(vec![Rc::clone(&leaf)]), }); *leaf.parent.borrow_mut() = Some(Rc::clone(&branch)); // 现在leaf和branch互相引用,形成循环引用 // 即使它们离开作用域,引用计数也不会归零,导致内存泄漏 println!("branch引用计数: {}", Rc::strong_count(&branch)); println!("leaf引用计数: {}", Rc::strong_count(&leaf)); }
12.2 使用Weak<T>解决
Weak<T>是一种不增加引用计数的智能指针,用于打破循环引用。
use std::rc::{Rc, Weak}; use std::cell::RefCell; #[derive(Debug)] struct Node { value: i32, parent: RefCell<Weak<Node>>, children: RefCell<Vec<Rc<Node>>>, } fn main() { let leaf = Rc::new(Node { value: 3, parent: RefCell::new(Weak::new()), children: RefCell::new(vec![]), }); println!("leaf创建后强引用计数: {}", Rc::strong_count(&leaf)); println!("leaf创建后弱引用计数: {}", Rc::weak_count(&leaf)); { let branch = Rc::new(Node { value: 5, parent: RefCell::new(Weak::new()), children: RefCell::new(vec![Rc::clone(&leaf)]), }); *leaf.parent.borrow_mut() = Rc::downgrade(&branch); println!("branch强引用计数: {}", Rc::strong_count(&branch)); println!("branch弱引用计数: {}", Rc::weak_count(&branch)); println!("leaf强引用计数: {}", Rc::strong_count(&leaf)); // 通过Weak获取父节点 if let Some(parent) = leaf.parent.borrow().upgrade() { println!("leaf的父节点值: {}", parent.value); } } // branch在这里被释放 println!("branch离开作用域后leaf父节点: {:?}", leaf.parent.borrow().upgrade()); println!("leaf强引用计数: {}", Rc::strong_count(&leaf)); }
12.3 Arc<Weak<T>>用于多线程
在多线程环境中使用Weak需要配合Arc:
use std::sync::{Arc, Weak}; use std::thread; fn main() { let strong = Arc::new(5); let weak: Weak<i32> = Arc::downgrade(&strong); let handle = thread::spawn(move || { // 尝试升级Weak指针 if let Some(strong_ref) = weak.upgrade() { println!("子线程获取值: {}", strong_ref); } else { println!("值已经被释放"); } }); handle.join().unwrap(); println!("主线程: {}", strong); }
13. 实践建议
- 优先使用消息传递:channel通常比共享内存更容易正确使用
- 最小化锁的持有范围:尽快释放锁以减少 contention
- 避免在持有锁时进行I/O:这会导致性能问题和死锁风险
- 使用 rayon 进行数据并行:对于并行计算任务,rayon crate比手动管理线程更安全高效
- 使用 parking_lot:提供更高效的Mutex和RwLock实现
- 使用 crossbeam:提供无锁数据结构和更灵活的channel
// 使用rayon进行并行计算 use rayon::prelude::*; fn main() { let numbers: Vec<i32> = (1..=100).collect(); // 并行计算平方和 let sum: i32 = numbers.par_iter() .map(|x| x * x) .sum(); println!("平方和: {}", sum); }
小结
本章介绍了Rust中的智能指针和多线程编程:
- Box<T>:堆分配,用于递归类型和trait对象
- Rc<T>:单线程引用计数
- Arc<T>:多线程原子引用计数
- RefCell<T>:运行时借用检查的内部可变性
- Mutex<T>/RwLock<T>:线程同步原语
- Weak<T>:弱引用,用于打破循环引用
- Send/Sync:Rust并发安全的基石
Rust的所有权系统和类型系统确保了并发编程的内存安全,在编译期就能捕获数据竞争等错误。虽然这些概念初看起来复杂,但它们提供了强大的安全保障,让并发编程变得更加可靠。
