原子操作与内存顺序

原子操作能够用来实现无🔓并发

程序运行问题

下面列举了影响代码执行顺序的诸多原因

1. 编译器重排

编译器会尽可能优化代码,在单线程下一般不会出现问题

x = 1;
y = 3;
x = 2;

上面的代码可能被优化为下面的样子

x = 2;
y = 3;

但是在多线程下可能会有问题

2.指令重排

类似于上面的,下面的代码中 y = 200 可能会先于 x = 100 执行,打印出的 x 可能是 0 或 100 例 1:

    int x = 0;     // global variable
    int y = 0;     // global variable

    Thread-1:           Thread-2:
    x = 100;            while (y != 200) {};
    y = 200;            std::cout << x;

例 2:


#![allow(unused)]
fn main() {
初始状态: x = 0, y = 1
线程 1        线程 2
y = 3;      if x == 1 {
x = 1;          y *= 2;
            }
}

这段程序实际上有两种可能的结果:

  • y = 3:线程 2 在线程 1 完成之前检查了 x 的值
  • y = 6:线程 2 在线程 1 完成之后检查了 x 的值
  • y = 2:线程 2 看到了 x = 1,但是没看到 y = 3,接下来用计算结果覆盖了 y = 3

3. CPU Cache

由于一次复制操作涉及到 move(内存 -> 寄存器) add mov(寄存器 -> 内存) 三条汇编指令,在 add 操作完成后,数据还没有被拷贝到内存时,另一个线程可能读取到此时还没有被修改的数据,或者是两个线程同时修改,结果某一个线程修改的结果被另一个线程覆盖

术语

下列术语定义了多线程和单线程中各个变量操作之间的关系 这些关系只是我们期望的,想要确保下面的关系在活动中是正常的,就需要内存顺序了

1. happens-before

先于,或者说 B 能看到 A 操作的结果,AB 分别在两个线程里

2. sequenced-before

同上 A,B 在一个线程内

3. synchronized-with

x 是支持原子操作的变量 A 写入 (store)x,B 读取 (load)x,分别在两个线程内,则 A((store) 就是 synchornized-with B(load) 的

4. inter-thread

跨线程 第三点说到 A:store synchornized-with B: load 那么 A happens-before B 多线程中写入先于读取

内存顺序

1. Relaxed

只保证在同一个线程中满足 Happens-before,这是最宽松的规则,他对编译器和 CPU 不做任何限制,可以乱需,因此下面的例子不保证会成功


#![allow(unused)]
fn main() {
#[test]
fn to_relaxed() {
    let x: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
    let y: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
    let t1 = thread::spawn(move || {
        // 读取 x 存到 y 里
        let r1 = y.load(Ordering::Relaxed);
        x.store(r1, Ordering::Relaxed);
        r1
    });
    let t2 = thread::spawn(move || {
        // 读取 y 存到 x 里
        let r2 = x.load(Ordering::Relaxed); // 下面两行可能会被重排
        y.store(42, Ordering::Relaxed);
        r2
    });

    let r1 = t1.join().unwrap();
    let r2 = t2.join().unwrap();
    // 可能出现 r1 == r2 == 42
}
}

2. Acquire-Release

Release 释放 ,设定内存屏障 (Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面 Acquire 获取 , 设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和 Release在不同线程中联合使用 AcqRel: Acquire 和 Release 的结合,同时拥有它们俩提供的保证。比如你要对一个 atomic 自增 1,同时希望该操作之前和之后的读取或写入操作不会被重新排序

这两个操作通常成对使用,对 store 使用 Release,对 load 使用 Acquire 先后顺序具体看下面的代码

fn write_x_then_y() {
    X.store(true, Ordering::Relaxed);
    Y.store(true, Ordering::Release);
}

fn read_y_then_x() {
    while !Y.load(Ordering::Acquire) {}
    if X.load(Ordering::Relaxed) {
        Z.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let t1 = thread::spawn(move || {
        write_x_then_y();
    });

    let t2 = thread::spawn(move || {
        read_y_then_x();
    });

    t1.join().unwrap();
    t2.join().unwrap();

    assert_ne!(Z.load(Ordering::SeqCst), 0);
}

3. Sequence

顺序一致性,强制所有线程看到一致的原子操作,完全的分界点,SeqCst 就像是 AcqRel 的加强版,它不管原子操作是属于读取还是写入的操作,只要某个线程有用到 SeqCst 的原子操作,线程中该 SeqCst 操作前的数据操作绝对不会被重新排在该 SeqCst 操作之后,且该 SeqCst 操作后的数据操作也绝对不会被重新排在 SeqCst 操作前。下面的例子,只有使用 SeqCst ordering,才能保证 Z 最后的值不为 0

fn write_x() {
    X.store(true, Ordering::SeqCst);    // 1
}

fn write_y() {
    Y.store(true, Ordering::SeqCst);    // 2
}

fn read_x_then_y() {
    while !X.load(Ordering::SeqCst) {}
    if Y.load(Ordering::SeqCst) {       // 3
        Z.fetch_add(1, Ordering::SeqCst);
    }
}

fn read_y_then_x() {
    while !Y.load(Ordering::SeqCst) {}
    if X.load(Ordering::SeqCst) {       // 4
        Z.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
        let t1 = thread::spawn(move || {
        write_x();
    });

    let t2 = thread::spawn(move || {
        write_y();
    });

    let t3 = thread::spawn(move || {
        read_x_then_y();
    });

    let t4 = thread::spawn(move || {
        read_y_then_x();
    });

    t1.join().unwrap();
    t2.join().unwrap();
    t3.join().unwrap();
    t4.join().unwrap();

    assert_ne!(Z.load(Ordering::SeqCst), 0);
}

4. Fence

fence 是支持 synchornized-with 的另一种方式,可以和 Acquire-Release 对比一下

例 1:


#![allow(unused)]
fn main() {
fn producer() -> JoinHandle<()> {
    thread::spawn(move || {
        unsafe {
            DATA = 100;                                 // A
        }
        READY.store(true, Ordering::Release);           // B: 内存屏障 ↑
    })
}

fn consumer() -> JoinHandle<()> {
    thread::spawn(move || {
        while !READY.load(Ordering::Acquire) {}         // C: 内存屏障 ↓

        assert_eq!(100, unsafe { DATA });               // D
    })
}
}

例 2:

fn write_x_then_y() {
    X.store(true, Ordering::Relaxed); // 1
    fence(Ordering::Release);         // 2
    Y.store(true, Ordering::Relaxed); // 3
}

fn read_y_then_x() {
    while !Y.load(Ordering::Relaxed) {}  // 4
    fence(Ordering::Acquire);            // 5
    if X.load(Ordering::Relaxed) {       // 6
        Z.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let t1 = thread::spawn(move || {
        write_x_then_y();
    });

    let t2 = thread::spawn(move || {
        read_y_then_x();
    });

    t1.join().unwrap();
    t2.join().unwrap();

    assert_ne!(Z.load(Ordering::SeqCst), 0);
}

fence 的官方文档 An atomic fence. Depending on the specified order, a fence prevents the compiler and CPU from reordering certain types of memory operations around it. That creates synchronizes-with relationships between it and atomic operations or fences in other threads.

A fence ‘A’ which has (at least) Release ordering semantics, synchronizes with a fence ‘B’ with (at least) Acquire semantics, if and only if there exist operations X and Y, both operating on some atomic object ‘M’ such that A is sequenced before X, Y is synchronized before B and Y observes the change to M. This provides a happens-before dependence between A and B.

    Thread 1                                          Thread 2

fence(Release);      A --------------
x.store(3, Relaxed); X ---------    |
                               |    |
                               |    |
                               -------------> Y  if x.load(Relaxed) == 3 {
                                    |-------> B      fence(Acquire);
                                                     ...
                                                 }

其他

1. Tips

  • 不知道怎么选择时,优先使用 SeqCst,虽然会稍微减慢速度,但是慢一点也比出现错误好
  • 多线程只计数 fetch_add 而不使用该值触发其他逻辑分支的简单使用场景,可以使用 Relaxed 参考 Which std::sync::atomic::Ordering to use?
  • 在多线程环境中要使用 Atomic 需要配合 Arc

2. Atomic 能替代锁吗?

那么原子类型既然这么全能,它可以替代锁吗?答案是不行:

  • 对于复杂的场景下,锁的使用简单粗暴,不容易有坑
  • std::sync::atomic 包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16 等,而锁可以应用于各种类型
  • 在有些情况下,必须使用锁来配合,例如上一章节中使用 Mutex 配合 Condvar

3. Atomic 的应用场景

事实上,Atomic 虽然对于用户不太常用,但是对于高性能库的开发者、标准库开发者都非常常用,它是并发原语的基石,除此之外,还有一些场景适用:

  • 无锁 (lock free) 数据结构
  • 全局变量,例如全局自增 ID,在后续章节会介绍
  • 跨线程计数器,例如可以用于统计指标

引用

资料