目录

第十五章 智能指针与多线程

智能指针(Smart Pointers)是Rust中一种数据结构,它们的行为类似于指针,但拥有额外的元数据和功能。与普通的引用不同,智能指针通常拥有它们所指向的数据。本章将深入探讨Rust中的各种智能指针,以及如何使用它们进行多线程编程。

1. Box<T>:堆上分配

Box<T>是最简单的智能指针,它允许你在堆上存储数据,而在栈上保留指向堆数据的指针。

1.1 基本用法

fn main() {
    // 在堆上存储一个i32
    let b = Box::new(5);
    println!("b = {}", b);  // 自动解引用
 
    // Box允许递归类型
    #[derive(Debug)]
    enum List {
        Cons(i32, Box<List>),
        Nil,
    }
 
    let list = List::Cons(1, Box::new(List::Cons(2, Box::new(List::Nil))));
    println!("{:?}", list);
}

1.2 使用场景

// trait对象
trait Animal {
    fn speak(&self);
}
 
struct Dog;
impl Animal for Dog {
    fn speak(&self) { println!("Woof!"); }
}
 
struct Cat;
impl Animal for Cat {
    fn speak(&self) { println!("Meow!"); }
}
 
fn main() {
    let animals: Vec<Box<dyn Animal>> = vec![
        Box::new(Dog),
        Box::new(Cat),
    ];
 
    for animal in animals {
        animal.speak();
    }
}

2. Rc<T>:引用计数

Rc<T>(Reference Counted)用于在程序的多个部分之间共享只读数据。它通过维护一个引用计数来跟踪有多少个所有者。

2.1 基本用法

use std::rc::Rc;
 
fn main() {
    let data = Rc::new(String::from("共享数据"));
 
    println!("引用计数: {}", Rc::strong_count(&data));  // 1
 
    {
        let data2 = Rc::clone(&data);
        println!("引用计数: {}", Rc::strong_count(&data));  // 2
        println!("data2: {}", data2);
    }  // data2离开作用域,计数减1
 
    println!("引用计数: {}", Rc::strong_count(&data));  // 1
    println!("data: {}", data);
}

2.2 克隆与引用

Rc::clone不会深拷贝数据,只是增加引用计数。这比其他类型的克隆要便宜得多。

use std::rc::Rc;
 
#[derive(Debug)]
struct Node {
    value: i32,
    children: Vec<Rc<Node>>,
}
 
fn main() {
    let leaf = Rc::new(Node {
        value: 3,
        children: vec![],
    });
 
    let branch = Rc::new(Node {
        value: 5,
        children: vec![Rc::clone(&leaf)],
    });
 
    // leaf现在有两个所有者:leaf变量和branch.children
    println!("leaf引用计数: {}", Rc::strong_count(&leaf));  // 2
}

重要限制:Rc<T>不是线程安全的,只能用于单线程场景。

3. RefCell<T>:内部可变性

RefCell<T>提供了内部可变性,允许你在不可变引用的情况下修改数据。它在运行时而不是编译时执行借用规则。

3.1 基本用法

use std::cell::RefCell;
 
fn main() {
    let data = RefCell::new(5);
 
    // 获取可变引用
    *data.borrow_mut() += 10;
 
    // 获取不可变引用
    println!("data: {}", data.borrow());  // 15
}

3.2 运行时借用检查

RefCell在运行时检查借用规则,如果违反会触发panic。

use std::cell::RefCell;
 
fn main() {
    let data = RefCell::new(5);
 
    let _borrow1 = data.borrow();
    let _borrow2 = data.borrow();  // 多个不可变借用是允许的
 
    // 下面这行会在运行时panic,因为已经有了不可变借用
    // let _borrow_mut = data.borrow_mut();
}

3.3 Rc<RefCell<T>>组合

Rc<RefCell<T»是一个强大的组合,允许多个所有者且每个所有者都可以修改数据。

use std::rc::Rc;
use std::cell::RefCell;
 
#[derive(Debug)]
struct User {
    name: String,
    age: RefCell<u32>,
}
 
fn main() {
    let user = Rc::new(User {
        name: String::from("Alice"),
        age: RefCell::new(30),
    });
 
    let user2 = Rc::clone(&user);
 
    // 通过user2修改年龄
    *user2.age.borrow_mut() = 31;
 
    // 通过user也能看到修改
    println!("{:?}", user);  // age: 31
}

4. Arc<T>:原子引用计数

Arc<T>(Atomic Reference Counted)是线程安全的Rc<T>版本,使用原子操作来维护引用计数。

4.1 基本用法

use std::sync::Arc;
use std::thread;
 
fn main() {
    let data = Arc::new(String::from("线程间共享数据"));
    let mut handles = vec![];
 
    for i in 0..5 {
        let data_clone = Arc::clone(&data);
        let handle = thread::spawn(move || {
            println!("线程{}: {}", i, data_clone);
        });
        handles.push(handle);
    }
 
    for handle in handles {
        handle.join().unwrap();
    }
 
    println!("最终引用计数: {}", Arc::strong_count(&data));
}

4.2 Arc<Mutex<T>>组合

Arc<Mutex<T»是在多线程间共享可变数据的标准方式。

use std::sync::{Arc, Mutex};
use std::thread;
 
fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
 
    for _ in 0..10 {
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter_clone.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
 
    for handle in handles {
        handle.join().unwrap();
    }
 
    println!("结果: {}", *counter.lock().unwrap());  // 10
}

5. Mutex<T>和RwLock<T>:互斥锁

5.1 Mutex<T>

Mutex(互斥锁)确保同一时间只有一个线程能访问数据。

use std::sync::Mutex;
 
fn main() {
    let m = Mutex::new(5);
 
    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }  // 锁在这里自动释放
 
    println!("m = {:?}", m);
}

死锁风险

use std::sync::Mutex;
 
fn main() {
    let m = Mutex::new(5);
 
    let num = m.lock().unwrap();
    // 如果在持有锁时panic或发生死锁,其他线程会永远等待
    // let num2 = m.lock().unwrap();  // 这会导致死锁!
}

5.2 RwLock<T>

RwLock(读写锁)允许多个读或一个写。

use std::sync::RwLock;
 
fn main() {
    let lock = RwLock::new(5);
 
    // 允许多个读锁
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        println!("读: {} {}", r1, r2);
    }
 
    // 只允许一个写锁
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        println!("写后: {}", w);
    }
}

6. 多线程基础

6.1 创建线程

use std::thread;
use std::time::Duration;
 
fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("子线程: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
 
    for i in 1..5 {
        println!("主线程: {}", i);
        thread::sleep(Duration::from_millis(1));
    }
 
    // 等待子线程完成
    handle.join().unwrap();
}

6.2 线程与闭包

use std::thread;
 
fn main() {
    let v = vec![1, 2, 3];
 
    let handle = thread::spawn(move || {
        // move关键字转移v的所有权到闭包
        println!("向量: {:?}", v);
    });
 
    handle.join().unwrap();
 
    // 这里不能再使用v了
    // println!("{:?}", v);  // 编译错误
}

7. 使用通道进行消息传递

Rust使用mpsc(multiple producer, single consumer)通道进行线程间通信。

7.1 基本通道

use std::sync::mpsc;
use std::thread;
 
fn main() {
    let (tx, rx) = mpsc::channel();
 
    thread::spawn(move || {
        let val = String::from("来自子线程的消息");
        tx.send(val).unwrap();
        // val的所有权已经转移,不能再使用
    });
 
    let received = rx.recv().unwrap();
    println!("收到: {}", received);
}

7.2 多个生产者

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (tx, rx) = mpsc::channel();
 
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec!["线程1: 你好", "线程1: 来自", "线程1: 子线程"];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
 
    thread::spawn(move || {
        let vals = vec!["线程2: 更多", "线程2: 消息", "线程2: 给你"];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
 
    for received in rx {
        println!("收到: {}", received);
    }
}

8. 线程同步原语

8.1 Condvar:条件变量

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
 
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);
 
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(2));
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        cvar.notify_one();
        println!("子线程: 通知主线程");
    });
 
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        println!("主线程: 等待中...");
        started = cvar.wait(started).unwrap();
    }
    println!("主线程: 被唤醒!");
}

8.2 Barrier:屏障

use std::sync::Barrier;
use std::thread;
 
fn main() {
    let barrier = Barrier::new(3);
    let mut handles = vec![];
 
    for i in 0..3 {
        let b = barrier.clone();
        handles.push(thread::spawn(move || {
            println!("线程{}: 到达屏障前", i);
            b.wait();
            println!("线程{}: 通过屏障!", i);
        }));
    }
 
    for h in handles {
        h.join().unwrap();
    }
}

9. 原子类型

Atomic类型提供低级别的原子操作,适合高性能计数器等场景。

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
 
fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
 
    for _ in 0..10 {
        let c = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..1000 {
                c.fetch_add(1, Ordering::SeqCst);
            }
        }));
    }
 
    for h in handles {
        h.join().unwrap();
    }
 
    println!("计数: {}", counter.load(Ordering::SeqCst));  // 10000
}

内存排序

10. 线程池

手动创建线程有开销,线程池可以复用线程。

use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
 
type Job = Box<dyn FnOnce() + Send + 'static>;
 
struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
 
impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
 
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
 
        ThreadPool { workers, sender }
    }
 
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
 
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
 
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();
                println!("Worker {} 开始执行任务", id);
                job();
                println!("Worker {} 任务完成", id);
            }
        });
        Worker { id, thread }
    }
}
 
fn main() {
    let pool = ThreadPool::new(4);
 
    for i in 0..8 {
        pool.execute(move || {
            println!("执行任务 {}", i);
            thread::sleep(std::time::Duration::from_millis(100));
        });
    }
 
    thread::sleep(std::time::Duration::from_secs(2));
}

11. Send和Sync trait

Rust的并发安全依赖于两个核心trait:

11.1 Send trait

Send标记类型可以安全地在线程间转移所有权。大多数类型都实现了Send,但Rc<T>没有。

11.2 Sync trait

Sync标记类型可以安全地被多个线程引用(即&T是Send的)。Arc<T>实现了Sync,RefCell<T>和Cell<T>没有。

use std::rc::Rc;
use std::sync::Arc;
use std::thread;
 
fn main() {
    // Rc不能跨线程
    // let rc = Rc::new(5);
    // thread::spawn(move || {
    //     println!("{}", rc);  // 编译错误
    // });
 
    // Arc可以跨线程
    let arc = Arc::new(5);
    let arc_clone = Arc::clone(&arc);
    thread::spawn(move || {
        println!("{}", arc_clone);  // 正常
    }).join().unwrap();
}

12. 循环引用与内存泄漏

12.1 Rc导致的循环引用

use std::rc::Rc;
use std::cell::RefCell;
 
#[derive(Debug)]
struct Node {
    value: i32,
    parent: RefCell<Option<Rc<Node>>>,
    children: RefCell<Vec<Rc<Node>>>,
}
 
fn main() {
    let leaf = Rc::new(Node {
        value: 3,
        parent: RefCell::new(None),
        children: RefCell::new(vec![]),
    });
 
    let branch = Rc::new(Node {
        value: 5,
        parent: RefCell::new(None),
        children: RefCell::new(vec![Rc::clone(&leaf)]),
    });
 
    *leaf.parent.borrow_mut() = Some(Rc::clone(&branch));
 
    // 现在leaf和branch互相引用,形成循环引用
    // 即使它们离开作用域,引用计数也不会归零,导致内存泄漏
    println!("branch引用计数: {}", Rc::strong_count(&branch));
    println!("leaf引用计数: {}", Rc::strong_count(&leaf));
}

12.2 使用Weak<T>解决

Weak<T>是一种不增加引用计数的智能指针,用于打破循环引用。

use std::rc::{Rc, Weak};
use std::cell::RefCell;
 
#[derive(Debug)]
struct Node {
    value: i32,
    parent: RefCell<Weak<Node>>,
    children: RefCell<Vec<Rc<Node>>>,
}
 
fn main() {
    let leaf = Rc::new(Node {
        value: 3,
        parent: RefCell::new(Weak::new()),
        children: RefCell::new(vec![]),
    });
 
    println!("leaf创建后强引用计数: {}", Rc::strong_count(&leaf));
    println!("leaf创建后弱引用计数: {}", Rc::weak_count(&leaf));
 
    {
        let branch = Rc::new(Node {
            value: 5,
            parent: RefCell::new(Weak::new()),
            children: RefCell::new(vec![Rc::clone(&leaf)]),
        });
 
        *leaf.parent.borrow_mut() = Rc::downgrade(&branch);
 
        println!("branch强引用计数: {}", Rc::strong_count(&branch));
        println!("branch弱引用计数: {}", Rc::weak_count(&branch));
        println!("leaf强引用计数: {}", Rc::strong_count(&leaf));
 
        // 通过Weak获取父节点
        if let Some(parent) = leaf.parent.borrow().upgrade() {
            println!("leaf的父节点值: {}", parent.value);
        }
    }  // branch在这里被释放
 
    println!("branch离开作用域后leaf父节点: {:?}", leaf.parent.borrow().upgrade());
    println!("leaf强引用计数: {}", Rc::strong_count(&leaf));
}

12.3 Arc<Weak<T>>用于多线程

在多线程环境中使用Weak需要配合Arc:

use std::sync::{Arc, Weak};
use std::thread;
 
fn main() {
    let strong = Arc::new(5);
    let weak: Weak<i32> = Arc::downgrade(&strong);
 
    let handle = thread::spawn(move || {
        // 尝试升级Weak指针
        if let Some(strong_ref) = weak.upgrade() {
            println!("子线程获取值: {}", strong_ref);
        } else {
            println!("值已经被释放");
        }
    });
 
    handle.join().unwrap();
    println!("主线程: {}", strong);
}

13. 实践建议

  1. 优先使用消息传递:channel通常比共享内存更容易正确使用
  2. 最小化锁的持有范围:尽快释放锁以减少 contention
  3. 避免在持有锁时进行I/O:这会导致性能问题和死锁风险
  4. 使用 rayon 进行数据并行:对于并行计算任务,rayon crate比手动管理线程更安全高效
  5. 使用 parking_lot:提供更高效的Mutex和RwLock实现
  6. 使用 crossbeam:提供无锁数据结构和更灵活的channel
// 使用rayon进行并行计算
use rayon::prelude::*;
 
fn main() {
    let numbers: Vec<i32> = (1..=100).collect();
 
    // 并行计算平方和
    let sum: i32 = numbers.par_iter()
        .map(|x| x * x)
        .sum();
 
    println!("平方和: {}", sum);
}

小结

本章介绍了Rust中的智能指针和多线程编程:

Rust的所有权系统和类型系统确保了并发编程的内存安全,在编译期就能捕获数据竞争等错误。虽然这些概念初看起来复杂,但它们提供了强大的安全保障,让并发编程变得更加可靠。