自旋锁与原子操作

概述

引用维基百科:

自旋锁是计算机科学用于多线程同步的一种锁,线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。

自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。因此操作系统的实现在很多地方往往用自旋锁。 Windows操作系统提供的轻型读写锁(SRWLock)内部就用了自旋锁。显然,单核CPU不适于使用自旋锁,这里的单核CPU指的是单核单线程的CPU,因为,在同一时间只有一个线程是处在运行状态,假设运行线程A发现无法获取锁,只能等待解锁,但因为A自身不挂起,所以那个持有锁的线程B没有办法进入运行状态,只能等到操作系统分给A的时间片用完,才能有机会被调度。这种情况下使用自旋锁的代价很高。

获取、释放自旋锁,实际上是读写自旋锁的存储内存或寄存器。因此这种读写操作必须是原子的。通常用test-and-set等原子操作来实现。

自旋锁互斥锁对比

  • 自旋锁spinlock 不会使线程发生切换,互斥锁mutex在获取不到锁的时候线程会sleep。
  • 互斥锁mutex获取锁分为两个阶段,第一阶段在用户态获取锁,如果获取不到,第二阶段则会产生系统调用,进入sleep,当锁可用后再恢复上下文,继续竞争锁。
  • 自旋锁spinlock 优点,没有昂贵的系统调用,一直处于用户态,执行速度快。
  • 自旋锁spinlock 缺点一直占用cpu,而且还会锁总线,锁定状态,其他处理器一直在空转。
  • 互斥锁mutex 优点,不会忙等,得不到锁会sleep。
  • 互斥锁mutex 缺点,sleep后会陷入内核态,恢复起来需要昂贵的系统调用。

从上所属,我们不难得出结论, 自旋锁spinlock和互斥锁mutex的应用场景。 自旋锁spinlock适合临界区特别简短的场景,中间不能有显示或隐式的系统调用,执行速度要非常快。不要在临界区调用read``write``open这样的函数。

使用自旋锁

自旋锁相关api

初始化自旋锁

1
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);

pshared有两个值

PTHREAD_PROCESS_SHARED:该自旋锁可以在多个进程中的线程之间共享。 PTHREAD_PROCESS_PRIVATE: 仅初始化本自旋锁的线程所在的进程内的线程才能够使用该自旋锁。

获取自旋锁

1
int pthread_spin_lock(pthread_spinlock_t *lock);

当一个线程获取自旋锁的的时候,这个锁未被其他线程持有,则获取到锁,如果已被其他线程持有,则一直阻塞空转直到获取到该锁。

非阻塞获取自旋锁

1
int pthread_spin_trylock(pthread_spinlock_t *lock);

释放一个自旋锁

1
int pthread_spin_unlock(pthread_spinlock_t *lock);

销毁一个自旋锁

1
int pthread_spin_destroy(pthread_spinlock_t *lock);

原子操作

原子操作,可以理解为不可再次细分,也不可打断的一组操作。如果说一组操作是原子操作,那么这组操作要么一次成功,要么全部失败。其他的操作是不能插入进来的。 在linux中对原子操作提供了两种形式:

  • 对整数的操作
  • 对单独位的操作

Gcc 4.1.2版本之后,对X86或X86_64支持内置原子操作,不需要提供其他第三方库。 提供的函数api如下:

  • type __sync_fetch_and_add (type *ptr, type value, …)

    将value加到ptr上,结果更新到ptr,并返回操作之前*ptr的值

  • type __sync_fetch_and_sub (type *ptr, type value, …)

    ptr减去value,结果更新到ptr,并返回操作之前*ptr的值

  • type __sync_fetch_and_or (type *ptr, type value, …)

    ptr与value相或,结果更新到ptr, 并返回操作之前*ptr的值

  • type __sync_fetch_and_and (type *ptr, type value, …)

    ptr与value相与,结果更新到ptr,并返回操作之前*ptr的值

  • type __sync_fetch_and_xor (type *ptr, type value, …)

    ptr与value异或,结果更新到ptr,并返回操作之前*ptr的值

  • type __sync_fetch_and_nand (type *ptr, type value, …)

    ptr取反后,与value相与,结果更新到ptr,并返回操作之前*ptr的值

  • type __sync_add_and_fetch (type *ptr, type value, …)

    将value加到ptr上,结果更新到ptr,并返回操作之后新*ptr的值

  • type __sync_sub_and_fetch (type *ptr, type value, …)

    ptr减去value,结果更新到ptr,并返回操作之后新*ptr的值

  • type __sync_or_and_fetch (type *ptr, type value, …)

    ptr与value相或, 结果更新到ptr,并返回操作之后新*ptr的值

  • type __sync_and_and_fetch (type *ptr, type value, …)

    ptr与value相与,结果更新到ptr,并返回操作之后新*ptr的值

  • type __sync_xor_and_fetch (type *ptr, type value, …)

    ptr与value异或,结果更新到ptr,并返回操作之后新*ptr的值

  • type __sync_nand_and_fetch (type *ptr, type value, …)

    ptr取反后,与value相与,结果更新到ptr,并返回操作之后新*ptr的值

  • bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, …)

    比较ptr与oldval的值,如果两者相等,则将newval更新到ptr并返回true

  • type __sync_val_compare_and_swap (type *ptr, type oldval type newval, …)

    比较ptr与oldval的值,如果两者相等,则将newval更新到ptr并返回操作之前*ptr的值

  • __sync_synchronize (…)

    发出完整内存栅栏

  • type __sync_lock_test_and_set (type *ptr, type value, …)

    将value写入ptr,对ptr加锁,并返回操作之前*ptr的值。即,try spinlock语义

  • void __sync_lock_release (type *ptr, …)

    将0写入到ptr,并对ptr解锁。即,unlock spinlock语义

实现一个自旋锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static sw_inline void sw_spinlock(sw_atomic_t *lock)
{
    uint32_t i, n;
    while (1)
    {
        if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
        {
            return;
        }
        if (SW_CPU_NUM > 1)
        {
            for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1)
            {
                for (i = 0; i < n; i++)
                {
                    sw_atomic_cpu_pause();
                }

                if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
                {
                    return;
                }
            }
        }
        swYield();
    }
}

我们来梳理一下swoole的自旋锁流程:

  1. 死循环while,首次判断lock是否为0,如果为零则加锁把lock置为1,加锁成功就返回,失败继续。

    这里要注意sw_atomic_cmp_set 这个函数,这个函数是swoole的封装。

    实际是使用了 #define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set) 这个,__sync_bool_compare_and_swap 是gcc提供的原子操作函数,含义是比较lock与old的值,如果两者相等,则讲set的值写入lock中,并返回true。这是一个原子操作。

  2. 判断cpu数量

    如果cpu是单核则直接swYield主动出让线程执行权,因为只有一个核,同一时间只会执行一段代码,所以不存在空转等待。 如果是多核cpu,则继续执行空转.

  3. 空转中加入特殊代码sw_atomic_cpu_pause() 这个函数定义如下:

    1
    2
    3
    4
    5
    6
    7
    
    #ifdef __arm__
    #define sw_atomic_cpu_pause()             __asm__ __volatile__ ("NOP");
    #elif defined(__x86_64__)
    #define sw_atomic_cpu_pause()             __asm__ __volatile__ ("pause")
    #else
    #define sw_atomic_cpu_pause()
    #endif

    可以看到,在x86_64架构下,执行的是pause 指令。那么pause指令是什么呢?

    PAUSE指令提升了自旋等待循环(spin-wait loop)的性能。当执行一个循环等待时,Intel P4或Intel Xeon处理器会因为检测到一个可能的内存顺序违规(memory order violation)而在退出循环时使性能大幅下降。PAUSE指令给处理器提了个醒:这段代码序列是个循环等待。处理器利用这个提示可以避免在大多数情况下的内存顺序违规,这将大幅提升性能。因为这个原因,所以推荐在循环等待中使用PAUSE指令。

    PAUSE的另一个功能就是降低Intel P4在执行循环等待时的耗电量。Intel P4处理器在循环等待时会执行得非常快,这将导致处理器消耗大量的电力,而在循环中插入一个PAUSE指令会大幅降低处理器的电力消耗。

  4. 空转结束后继续等待锁。

参考资料

0%