前言
swoole采用的是多进程模型,并且在master
进程中reactor
用的是多线程模型,那么他们之间怎么保证数据正确的同步和更新呢?
swoole对php使用层也提供了进程间锁
与进程间无锁计数器
,让php开发者能够很方便的实现一些需要多进程间同步的功能,那么它是怎么做到的呢?
这篇文章全篇帮你解答,锁与信号
。
swoole中的锁与信号主要使用的是pthread系列函数来实现,也有自己实现的互斥锁。
锁的类型有很多,各有各的应用场景,我们只有深刻的理解了它们,才能更好的使用。
锁的类型:互斥锁
,读写锁
,文件锁
,自旋锁
,原子锁
,信号量
.
这篇文章的内容会比较多,有很多前置的知识需要学习,我列出来了我写的一系列文章,大家可以去学习。当然,如果你已经了解了这些前置知识,可以直接看后续的内容。
-
swoole的使用文档
-
锁的原理相关基础
PHP中如何应用这些锁
在PHP中要使用Swoole提供的锁非常简单。
只需要:$lock = new Swoole\Lock(SWOOLE_MUTEX);
就可以获得一个互斥锁
.
Swoole提供了五种类型的锁。
锁类型 |
说明 |
SWOOLE_FILELOCK |
文件锁 |
SWOOLE_RWLOCK |
读写锁 |
SWOOLE_SEM |
信号量 |
SWOOLE_MUTEX |
互斥锁 |
SWOOLE_SPINLOCK |
自旋锁 |
具体使用情况可以参考swoole的文档,进程间锁Lock.
如果你看完文档或者已经理解了swoole提供的锁的使用方法,那么我们继续。
讲到swoole锁的实现,需要明确两种类型的锁。
- swoole提供给PHP开发者使用的锁。这个锁就是以上提供的五种锁。
- swoole内部使用锁。swoole内部使用的锁是自旋锁。
那么接下来我们深入源码去理解这些锁的实现吧!
Swoole中的锁
一、swLock数据结构
理解swoole的锁,必须先了解swLock
这个结构体。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
typedef struct _swLock
{
int type;
union
{
swMutex mutex;
#ifndef _WIN32
#ifdef HAVE_RWLOCK
swRWLock rwlock;
#endif
#ifdef HAVE_SPINLOCK
swSpinLock spinlock;
#endif
swFileLock filelock;
swSem sem;
swAtomicLock atomlock;
#endif
} object;
int (*lock_rd)(struct _swLock *);
int (*lock)(struct _swLock *);
int (*unlock)(struct _swLock *);
int (*trylock_rd)(struct _swLock *);
int (*trylock)(struct _swLock *);
int (*free)(struct _swLock *);
} swLock;
|
在swoole中,无论哪种锁,它的数据结构都是swLock
,它由几个关键的结构构成。
二、互斥锁
源码路径:
数据结构:
1
2
3
4
5
|
typedef struct _swMutex
{
pthread_mutex_t _lock;
pthread_mutexattr_t attr;
} swMutex;
|
互斥锁是最常用的进程/线程锁,swMutex
的基础是基于pthread_mutex
系列函数做了一些封装,所以实现起来比较简单,大家只需要看我前言里面提供的互斥锁与条件变量
文章,学好以后在来看这里的代码就比较简单。
swMutex
结构体比较简单,只有_lock
和attr
两个成员变量。
其中_lock
是互斥锁本身,attr
是该互斥锁的设置属性。具体属性值参考, 互斥锁属性详解。
互斥锁的创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
int swMutex_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));
lock->type = SW_MUTEX;
pthread_mutexattr_init(&lock->object.mutex.attr);
if (use_in_process == 1)
{
pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED);
}
if ((ret = pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr)) < 0)
{
return SW_ERR;
}
lock->lock = swMutex_lock;
lock->unlock = swMutex_unlock;
lock->trylock = swMutex_trylock;
lock->free = swMutex_free;
return SW_OK;
}
|
互斥锁的创建其实就是pthread_mutex
提供的互斥锁初始化,在初始化互斥锁之前,需要先初始化互斥锁的属性,pthread_mutexattr_init
,并且设置互斥锁是否需要在进程间共享,
PTHREAD_PROCESS_SHARED
表示需要共享。
之后设置各个函数指针。
互斥锁加锁
1
2
3
4
|
static int swMutex_lock(swLock *lock)
{
return pthread_mutex_lock(&lock->object.mutex._lock);
}
|
互斥锁解锁
1
2
3
4
|
static int swMutex_unlock(swLock *lock)
{
return pthread_mutex_unlock(&lock->object.mutex._lock);
}
|
互斥锁尝试加锁
1
2
3
4
|
static int swMutex_trylock(swLock *lock)
{
return pthread_mutex_trylock(&lock->object.mutex._lock);
}
|
互斥锁加锁带超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
#ifdef HAVE_MUTEX_TIMEDLOCK
int swMutex_lockwait(swLock *lock, int timeout_msec)
{
struct timespec timeo;
timeo.tv_sec = timeout_msec / 1000;
timeo.tv_nsec = (timeout_msec - timeo.tv_sec * 1000) * 1000 * 1000;
return pthread_mutex_timedlock(&lock->object.mutex._lock, &timeo);
}
#else
int swMutex_lockwait(swLock *lock, int timeout_msec)
{
int sub = 1;
int sleep_ms = 1000;
if (timeout_msec > 100)
{
sub = 10;
sleep_ms = 10000;
}
while( timeout_msec > 0)
{
if (pthread_mutex_trylock(&lock->object.mutex._lock) == 0)
{
return 0;
}
else
{
usleep(sleep_ms);
timeout_msec -= sub;
}
}
return ETIMEDOUT;
}
#endif
|
注意这里,swoole做了判断,可以调用原生的pthread_mutex_timedlock
方法,
也自己实现了一个加锁超时的逻辑。
释放互斥锁
1
2
3
4
5
|
static int swMutex_free(swLock *lock)
{
pthread_mutexattr_destroy(&lock->object.mutex.attr);
return pthread_mutex_destroy(&lock->object.mutex._lock);
}
|
三、条件变量
源码路径:
数据结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
typedef struct _swCond
{
swLock _lock;
pthread_cond_t _cond;
int (*wait)(struct _swCond *object);
int (*timewait)(struct _swCond *object, long, long);
int (*notify)(struct _swCond *object);
int (*broadcast)(struct _swCond *object);
void (*free)(struct _swCond *object);
int (*lock)(struct _swCond *object);
int (*unlock)(struct _swCond *object);
} swCond;
|
- 条件变量并没有作为
swLock
的一员,而是自成一体,反而是其中包含了swLock
。
- 条件变量没法单独使用,需要结合互斥锁
swLock
使用
swoole
并没有对外提供条件变量的api,暴露给php使用的互斥锁中也未用到swCond
.
swCond
只在master进程的线程池中有使用到。
条件变量的创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
int swCond_create(swCond *cond)
{
if (pthread_cond_init(&cond->_cond, NULL) < 0)
{
swSysWarn("pthread_cond_init fail");
return SW_ERR;
}
if (swMutex_create(&cond->_lock, 0) < 0)
{
return SW_ERR;
}
cond->notify = swCond_notify;
cond->broadcast = swCond_broadcast;
cond->timewait = swCond_timewait;
cond->wait = swCond_wait;
cond->lock = swCond_lock;
cond->unlock = swCond_unlock;
cond->free = swCond_free;
return SW_OK;
}
|
注意,条件变量创建的时候,使用pthread_cond_init
创建了条件变量,同时调用swMutex_create
创建了互斥锁。
唤醒单个等待者
1
2
3
4
|
static int swCond_notify(swCond *cond)
{
return pthread_cond_signal(&cond->_cond);
}
|
广播唤醒多个等待者
1
2
3
4
|
static int swCond_broadcast(swCond *cond)
{
return pthread_cond_broadcast(&cond->_cond);
}
|
等待条件变量被唤醒
1
2
3
4
|
static int swCond_wait(swCond *cond)
{
return pthread_cond_wait(&cond->_cond, &cond->_lock.object.mutex._lock);
}
|
pthread_cond_wait函数原子的执行了前2个动作。
- 给互斥锁解锁。
- 把调用线程投入睡眠,直到另外某个线程调用函数pthread_cond_signal,pthread_cond_broadcast产生了唤醒信号.
- 返回之前,重新给互斥锁上锁,如果未上锁成功则一直阻塞到上锁成功为止。
等待条件变量被唤醒(带超时)
1
2
3
4
5
6
7
8
9
|
static int swCond_timewait(swCond *cond, long sec, long nsec)
{
struct timespec timeo;
timeo.tv_sec = sec;
timeo.tv_nsec = nsec;
return pthread_cond_timedwait(&cond->_cond, &cond->_lock.object.mutex._lock, &timeo);
}
|
等待条件变量cond被唤醒,直到由一个信号或广播,或绝对时间abstime到才唤醒该线程
互斥锁加锁解锁
1
2
3
4
5
6
7
8
9
|
static int swCond_lock(swCond *cond)
{
return cond->_lock.lock(&cond->_lock);
}
static int swCond_unlock(swCond *cond)
{
return cond->_lock.unlock(&cond->_lock);
}
|
释放条件变量
1
2
3
4
5
|
static void swCond_free(swCond *cond)
{
pthread_cond_destroy(&cond->_cond);
cond->_lock.free(&cond->_lock);
}
|
释放的时候同时释放条件变量和互斥锁。
三、读写锁
源码路径:
数据结构:
1
2
3
4
5
6
|
typedef struct _swRWLock
{
pthread_rwlock_t _lock;
pthread_rwlockattr_t attr;
} swRWLock;
|
对于读多写少的场景,如果使用互斥锁,对效率是一种浪费,因为大部分时间是读,加锁没有意义,在这种场景下,使用读写锁能显著的提升效率。
swRWLock
结构体比较简单,只有_lock
和attr
两个成员变量,它的实现依赖于pthread_rwlock
系列函数。
其中_lock
是读写锁本身,attr
是该读写锁的设置属性。具体属性值参考,前文互斥锁的属性详解。
读写锁的创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
int swRWLock_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));
lock->type = SW_RWLOCK;
pthread_rwlockattr_init(&lock->object.rwlock.attr);
if (use_in_process == 1)
{
pthread_rwlockattr_setpshared(&lock->object.rwlock.attr, PTHREAD_PROCESS_SHARED);
}
if ((ret = pthread_rwlock_init(&lock->object.rwlock._lock, &lock->object.rwlock.attr)) < 0)
{
return SW_ERR;
}
lock->lock_rd = swRWLock_lock_rd;
lock->lock = swRWLock_lock_rw;
lock->unlock = swRWLock_unlock;
lock->trylock = swRWLock_trylock_rw;
lock->trylock_rd = swRWLock_trylock_rd;
lock->free = swRWLock_free;
return SW_OK;
}
|
读写锁的创建其实就是pthread_rwlock
提供的读写锁初始化,在初始化读写锁之前,需要先初始化读写锁的属性,pthread_rwlockattr_init
,并且设置读写锁是否需要在进程间共享,
PTHREAD_PROCESS_SHARED
表示需要共享。
之后设置各个函数指针。
读锁加锁
1
2
3
4
|
static int swRWLock_lock_rd(swLock *lock)
{
return pthread_rwlock_rdlock(&lock->object.rwlock._lock);
}
|
写锁加锁
1
2
3
4
|
static int swRWLock_lock_rw(swLock *lock)
{
return pthread_rwlock_wrlock(&lock->object.rwlock._lock);
}
|
解锁
1
2
3
4
|
static int swRWLock_unlock(swLock *lock)
{
return pthread_rwlock_unlock(&lock->object.rwlock._lock);
}
|
尝试读锁加锁
1
2
3
4
|
static int swRWLock_trylock_rd(swLock *lock)
{
return pthread_rwlock_tryrdlock(&lock->object.rwlock._lock);
}
|
尝试写锁加锁
1
2
3
4
|
static int swRWLock_trylock_rw(swLock *lock)
{
return pthread_rwlock_trywrlock(&lock->object.rwlock._lock);
}
|
释放锁
1
2
3
4
|
static int swRWLock_free(swLock *lock)
{
return pthread_rwlock_destroy(&lock->object.rwlock._lock);
}
|
四、文件锁
源码路径:
数据结构:
1
2
3
4
5
|
typedef struct _swFileLock
{
struct flock lock_t;
int fd;
} swFileLock;
|
文件锁是对多进程,多线程同一时间写相同文件这一场景设定的锁,底层调用的函数是fcntl
文件锁的创建
1
2
3
4
5
6
7
8
9
10
11
12
13
|
int swFileLock_create(swLock *lock, int fd)
{
bzero(lock, sizeof(swLock));
lock->type = SW_FILELOCK;
lock->object.filelock.fd = fd;
lock->lock_rd = swFileLock_lock_rd;
lock->lock = swFileLock_lock_rw;
lock->trylock_rd = swFileLock_trylock_rd;
lock->trylock = swFileLock_trylock_rw;
lock->unlock = swFileLock_unlock;
lock->free = swFileLock_free;
return 0;
}
|
文件锁读加锁
1
2
3
4
5
|
static int swFileLock_lock_rd(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_RDLCK;
return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}
|
文件锁写加锁
1
2
3
4
5
|
static int swFileLock_lock_rw(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_WRLCK;
return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}
|
文件锁写解锁
1
2
3
4
5
|
static int swFileLock_unlock(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_UNLCK;
return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock);
}
|
文件锁尝试写加锁
1
2
3
4
5
|
static int swFileLock_trylock_rw(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_WRLCK;
return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock);
}
|
文件锁尝试读加锁
1
2
3
4
5
|
static int swFileLock_trylock_rd(swLock *lock)
{
lock->object.filelock.lock_t.l_type = F_RDLCK;
return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock);
}
|
文件锁释放锁
1
2
3
4
|
static int swFileLock_free(swLock *lock)
{
return close(lock->object.filelock.fd);
}
|
五、自旋锁
源码路径:
数据结构:
1
2
3
4
|
typedef struct _swSpinLock
{
pthread_spinlock_t lock_t;
} swSpinLock;
|
自旋锁类似与互斥锁,但是不同于互斥锁的地方是自旋锁在加锁失败的时候,并不会沉入内核,而是空转cpu,因为没有上下文切换,这样锁的效率更高,但是会空耗cpu资源。适用于加锁预期时间很短暂的场景。
自旋锁的创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
int swSpinLock_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));
lock->type = SW_SPINLOCK;
if ((ret = pthread_spin_init(&lock->object.spinlock.lock_t, use_in_process)) < 0)
{
return -1;
}
lock->lock = swSpinLock_lock;
lock->unlock = swSpinLock_unlock;
lock->trylock = swSpinLock_trylock;
lock->free = swSpinLock_free;
return 0;
}
|
自旋锁也依赖于pthread_spin
系列函数。
自旋锁加锁
1
2
3
4
|
static int swSpinLock_lock(swLock *lock)
{
return pthread_spin_lock(&lock->object.spinlock.lock_t);
}
|
自旋锁解锁
1
2
3
4
|
static int swSpinLock_unlock(swLock *lock)
{
return pthread_spin_unlock(&lock->object.spinlock.lock_t);
}
|
自旋锁尝试加锁
1
2
3
4
|
static int swSpinLock_trylock(swLock *lock)
{
return pthread_spin_trylock(&lock->object.spinlock.lock_t);
}
|
释放自旋锁
1
2
3
4
|
static int swSpinLock_free(swLock *lock)
{
return pthread_spin_destroy(&lock->object.spinlock.lock_t);
}
|
六、信号量
源码路径:
数据结构:
1
2
3
4
5
|
typedef struct _swSem
{
key_t key;
int semid;
} swSem;
|
信号量也是一种数据的同步方式。
信号量的创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
int swSem_create(swLock *lock, key_t key)
{
int ret;
lock->type = SW_SEM;
if ((ret = semget(key, 1, IPC_CREAT | 0666)) < 0)
{
return SW_ERR;
}
if (semctl(ret, 0, SETVAL, 1) == -1)
{
swWarn("semctl(SETVAL) failed");
return SW_ERR;
}
lock->object.sem.semid = ret;
lock->lock = swSem_lock;
lock->unlock = swSem_unlock;
lock->free = swSem_free;
return SW_OK;
}
|
- 信号量的初始化需要调用
semget
创建一个新的信号量
semctl
将信号量初始化为0
信号量的 P 操作
1
2
3
4
5
6
7
8
|
static int swSem_lock(swLock *lock)
{
struct sembuf sem;
sem.sem_flg = SEM_UNDO;
sem.sem_num = 0;
sem.sem_op = -1;
return semop(lock->object.sem.semid, &sem, 1);
}
|
信号量的 V 操作
1
2
3
4
5
6
7
8
|
static int swSem_unlock(swLock *lock)
{
struct sembuf sem;
sem.sem_flg = SEM_UNDO;
sem.sem_num = 0;
sem.sem_op = 1;
return semop(lock->object.sem.semid, &sem, 1);
}
|
信号量的销毁
1
2
3
4
|
static int swSem_free(swLock *lock)
{
return semctl(lock->object.sem.semid, 0, IPC_RMID);
}
|
IPC_RMID 用于销毁信号量
七、原子锁
源码路径:
数据结构:
1
2
3
4
5
6
7
8
|
typedef volatile uint32_t sw_atomic_uint32_t;
typedef sw_atomic_uint32_t sw_atomic_t;
typedef struct _swAtomicLock
{
sw_atomic_t lock_t;
uint32_t spin;
} swAtomicLock;
|
swoole的原子锁与上面介绍的锁都不一样,它不依赖pthread
系列函数,它的实现是swoole
自行实现的,nginx
也是这样实现的。
观察数据结构会发现结构体中是简单的两个uint32
成员变量。
原子锁的创建
1
2
3
4
5
6
7
8
9
10
|
int swAtomicLock_create(swLock *lock, int spin)
{
bzero(lock, sizeof(swLock));
lock->type = SW_ATOMLOCK;
lock->object.atomlock.spin = spin;
lock->lock = swAtomicLock_lock;
lock->unlock = swAtomicLock_unlock;
lock->trylock = swAtomicLock_trylock;
return SW_OK;
}
|
很简单的代码。
原子锁的加锁
1
2
3
4
5
|
static int swAtomicLock_lock(swLock *lock)
{
sw_spinlock(&lock->object.atomlock.lock_t);
return SW_OK;
}
|
可以发现,调用的是sw_spinlock
这个函数。
sw_spinlock函数加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set)
#ifdef __arm__
#define sw_atomic_cpu_pause() __asm__ __volatile__ ("NOP");
#elif defined(__x86_64__)
#define sw_atomic_cpu_pause() __asm__ __volatile__ ("pause")
#else
#define sw_atomic_cpu_pause()
#endif
#define swYield() sched_yield() //or usleep(1)
static sw_inline void sw_spinlock(sw_atomic_t *lock)
{
uint32_t i, n;
while (1)
{
if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
{
return;
}
if (SW_CPU_NUM > 1)
{
for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1)
{
for (i = 0; i < n; i++)
{
sw_atomic_cpu_pause();
}
if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))
{
return;
}
}
}
swYield();
}
}
|
我们来梳理一下swoole的自旋锁流程:
-
死循环while,首次判断lock是否为0,如果为零则加锁把lock置为1,加锁成功就返回,失败继续。
这里要注意sw_atomic_cmp_set
这个函数,这个函数是swoole的封装。
实际是使用了 #define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set)
这个,__sync_bool_compare_and_swap
是gcc提供的原子操作函数,含义是比较lock与old的值,如果两者相等,则讲set的值写入lock中,并返回true。这是一个原子操作。
-
判断cpu数量
如果cpu是单核则直接swYield主动出让线程执行权,因为只有一个核,同一时间只会执行一段代码,所以不存在空转等待。
如果是多核cpu,则继续执行空转.
-
空转中加入特殊代码sw_atomic_cpu_pause()
这个函数定义如下:
1
2
3
4
5
6
7
|
#ifdef __arm__
#define sw_atomic_cpu_pause() __asm__ __volatile__ ("NOP");
#elif defined(__x86_64__)
#define sw_atomic_cpu_pause() __asm__ __volatile__ ("pause")
#else
#define sw_atomic_cpu_pause()
#endif
|
可以看到,在x86_64架构下,执行的是pause
指令。那么pause
指令是什么呢?
PAUSE指令提升了自旋等待循环(spin-wait loop)的性能。当执行一个循环等待时,Intel P4或Intel Xeon处理器会因为检测到一个可能的内存顺序违规(memory order violation)而在退出循环时使性能大幅下降。PAUSE指令给处理器提了个醒:这段代码序列是个循环等待。处理器利用这个提示可以避免在大多数情况下的内存顺序违规,这将大幅提升性能。因为这个原因,所以推荐在循环等待中使用PAUSE指令。
PAUSE的另一个功能就是降低Intel P4在执行循环等待时的耗电量。Intel P4处理器在循环等待时会执行得非常快,这将导致处理器消耗大量的电力,而在循环中插入一个PAUSE指令会大幅降低处理器的电力消耗。
-
空转结束后继续等待锁。
-
注意,如果超过了 SW_SPINLOCK_LOOP_N
次数,还没有能够获取的到锁,那么也要让出控制权,这时很有可能被锁保护的代码有阻塞行为.
原子锁的解锁
1
2
3
4
|
static int swAtomicLock_unlock(swLock *lock)
{
return lock->object.atomlock.lock_t = 0;
}
|
直接赋值为0就可以。
原子锁的尝试加锁
1
2
3
4
5
|
static int swAtomicLock_trylock(swLock *lock)
{
sw_atomic_t *atomic = &lock->object.atomlock.lock_t;
return (*(atomic) == 0 && sw_atomic_cmp_set(atomic, 0, 1));
}
|
非阻塞加锁,加锁成功则返回true,失败返回false。
锁的知识就讲到这里,大家可以看下本文前面推荐的相关文章。
第二部分会具体讲解swoole的锁是怎么通过php扩展开发关联起来的。