概述
用过协程的朋友,看到Channel
这个名字是不是以为讲解的是协程中的通道
,实际上不是,今天要讲的是swoole
中实现的用户态高性能内存队列。
Channel
的底层实现是基于共享内存
+Mutex
互斥锁+Pipe
管道。
所以Channel
可以用于多进程环境,底层读写会自动加锁,使用者不需要关心数据同步问题。
Channel
在swoole中只在一个地方使用了,那就是manager
进程和worker
进程相互通讯,worker
进程在退出的时候会通知manager
进程,manager
进程收到了请求以后会重启对应的worker
进程。
从而保持固定数目的worker进程。
Channel数据结构详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
enum swChannel_flag
{
SW_CHAN_LOCK = 1u << 1 , //加锁
SW_CHAN_NOTIFY = 1u << 2 , //通知
SW_CHAN_SHM = 1u << 3 , //使用共享内存
};
typedef struct _swChannel
{
off_t head ;
off_t tail ;
size_t size ;
char head_tag ;
char tail_tag ;
int num ;
int max_num ;
/**
* Data length, excluding structure
*/
size_t bytes ;
int flag ;
int maxlen ;
/**
* memory point
*/
void * mem ;
swLock lock ;
swPipe notify_fd ;
} swChannel ;
我们先来看看swChannel
结构。
head 内存队列的头部地址,出队列的方向。
tail 内存队列的尾部地址,入队列的方向
size 申请的内存大小
head_tag,tail_tag 队列循环标志
num 当前元素数目
max_num 最大元素数目
bytes 当前占用内存大小
flag 队列标识SW_CHAN_LOCK
使用锁,SW_CHAN_NOTIFY
使用pipe
通知,SW_CHAN_SHM
使用共享内存
maxlen 队列中每个元素的大小
mem channel内存首地址
lock 锁
notify_fd 管道
1
2
3
4
5
typedef struct _swChannel_item
{
int length ;
char data [ 0 ];
} swChannel_item ;
队列元素结构,这个很简单,就是长度+内存地址。
创建一个Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
swChannel * swChannel_new ( size_t size , size_t maxlen , int flags )
{
assert ( size >= maxlen );
int ret ;
void * mem ;
//use shared memory
if ( flags & SW_CHAN_SHM )
{
/**
* overflow space
*/
mem = sw_shm_malloc ( size + sizeof ( swChannel ) + maxlen + sizeof ( swChannel_item ));
}
else
{
mem = sw_malloc ( size + sizeof ( swChannel ) + maxlen + sizeof ( swChannel_item ));
}
if ( mem == NULL )
{
swWarn ( "swChannel_create: malloc(%ld) failed" , size );
return NULL ;
}
swChannel * object = ( swChannel * ) mem ;
mem = ( char * ) mem + sizeof ( swChannel );
bzero ( object , sizeof ( swChannel ));
//overflow space
object -> size = size ;
object -> mem = mem ;
object -> maxlen = maxlen ;
object -> flag = flags ;
//use lock
if ( flags & SW_CHAN_LOCK )
{
//init lock
if ( swMutex_create ( & object -> lock , 1 ) < 0 )
{
swWarn ( "mutex init failed" );
return NULL ;
}
}
//use notify
if ( flags & SW_CHAN_NOTIFY )
{
ret = swPipeNotify_auto ( & object -> notify_fd , 1 , 1 );
if ( ret < 0 )
{
swWarn ( "notify_fd init failed" );
return NULL ;
}
}
return object ;
}
通过swChannel_new
来创建channel,创建的时候传入了size
和maxlen
, 申请内存的时候是size + sizeof(swChannel) + maxlen + sizeof(swChannel_item)
,在这里,maxlen表示要传入的对象最大长度,再加一个maxlen是为了防止内存越界,多申请了一些内存。
可以看到,根据flag
不同,进入不同的逻辑,flag如果包含SW_CHAN_SHM
,则申请的是共享内存。
传入SW_CHAN_LOCK
则创建互斥锁。
传入SW_CHAN_NOTIFY
则创建管道。
注意swPipeNotify_auto
这个函数,在前面的匿名管道章节,我们知道了它是创建一个单通道的匿名管道,用来通知读取数据。
写入数据
1
2
3
4
5
6
7
8
int swChannel_push ( swChannel * object , const void * in , int data_length )
{
assert ( object -> flag & SW_CHAN_LOCK );
object -> lock . lock ( & object -> lock );
int ret = swChannel_in ( object , in , data_length );
object -> lock . unlock ( & object -> lock );
return ret ;
}
这个函数的作用是数据入队列。
为了防止其他进程/线程读写数据,需要先加上锁,然后调用swChannel_in
往尾部写入数据。写好以后再解锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int swChannel_in ( swChannel * object , const void * in , int data_length )
{
assert ( data_length <= object -> maxlen );
if ( swChannel_full ( object ))
{
return SW_ERR ;
}
swChannel_item * item ;
int msize = sizeof ( item -> length ) + data_length ;
if ( object -> tail < object -> head )
{
//no enough memory space
if (( object -> head - object -> tail ) < msize )
{
return SW_ERR ;
}
item = ( swChannel_item * ) (( char * ) object -> mem + object -> tail );
object -> tail += msize ;
}
else
{
item = ( swChannel_item * ) (( char * ) object -> mem + object -> tail );
object -> tail += msize ;
if ( object -> tail >= ( off_t ) object -> size )
{
object -> tail = 0 ;
object -> tail_tag = 1 - object -> tail_tag ;
}
}
object -> num ++ ;
object -> bytes += data_length ;
item -> length = data_length ;
memcpy ( item -> data , in , data_length );
return SW_OK ;
}
第一行就判断传入的数据长度不能大于定义的maxlen
,
然后判断队列是否已满,如果满了就不能添加了。
1
#define swChannel_full(ch) ((ch->head == ch->tail && ch->tail_tag != ch->head_tag) || (ch->bytes + sizeof(int) * ch->num == ch->size))
看下是如何判断队列已满,头部指针等于尾部指针,并且尾部标识不等于头部标识。或者队列占用空间等于队列设定长度。
在初始状态下,头部指针值和尾部指针值都为0,头部标识和尾部标识也为0,不满足已满的条件。
如果一直插入内容,尾部指针最终会变为0,尾部标识变成1,如果一直没有读取数据,头部指针值还是0,但是头尾部标识会不一样,这样就能区分队列是否已满。很精妙的判断。
正常情况下会首先执行else
里的逻辑,插入数据,则尾部指针值增加,如果尾部指针的值大于或等于队列的容量size,则从头开始插入。
如果尾部指针小于头部指针,则表示已经插入了一轮了,重新开始插入了,但是读取还没有完成,那么就需要判断这其中的空间是否足以插入一个对象。如果空间够,则插入,不够则报错。
读出数据
1
2
3
4
5
6
7
8
int swChannel_push ( swChannel * object , const void * in , int data_length )
{
assert ( object -> flag & SW_CHAN_LOCK );
object -> lock . lock ( & object -> lock );
int ret = swChannel_in ( object , in , data_length );
object -> lock . unlock ( & object -> lock );
return ret ;
}
读出数据,读出的时候也需要加锁,防止数据污染。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int swChannel_out ( swChannel * object , void * out , int buffer_length )
{
if ( swChannel_empty ( object ))
{
return SW_ERR ;
}
swChannel_item * item = ( swChannel_item * ) (( char * ) object -> mem + object -> head );
assert ( buffer_length >= item -> length );
memcpy ( out , item -> data , item -> length );
object -> head += ( item -> length + sizeof ( item -> length ));
if ( object -> head >= ( off_t ) object -> size )
{
object -> head = 0 ;
object -> head_tag = 1 - object -> head_tag ;
}
object -> num -- ;
object -> bytes -= item -> length ;
return item -> length ;
}
读取数据的时候判断队列是否为空。
不为空的情况下,通过head指针来读取数据,默认是从0开始。因为tail指针写入数据也是从0开始。
这里有个判断 if (object->head >= (off_t) object->size)
当读取数据到了队列的末尾,则把head指针置位0,表示需要从头开始读取。并且把head标志位置位1.这样就能区分队列实时状态。
其他功能
数据窥视
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int swChannel_peek ( swChannel * object , void * out , int buffer_length )
{
if ( swChannel_empty ( object ))
{
return SW_ERR ;
}
int length ;
object -> lock . lock ( & object -> lock );
swChannel_item * item = ( swChannel_item * ) (( char * ) object -> mem + object -> head );
assert ( buffer_length >= item -> length );
memcpy ( out , item -> data , item -> length );
length = item -> length ;
object -> lock . unlock ( & object -> lock );
return length ;
}
数据窥视的作用是为了在不出列数据的情况下,获取当前读取数据的进度。可以看到是直接那了head指针对应的对象,。也就是下一个会读取的数据。
等待读取
1
2
3
4
5
6
int swChannel_wait ( swChannel * object )
{
assert ( object -> flag & SW_CHAN_NOTIFY );
uint64_t flag ;
return object -> notify_fd . read ( & object -> notify_fd , & flag , sizeof ( flag ));
}
等待读取很简单,通过匿名管道对象,阻塞读取消息,当另外一个进程/线程发送通知以后,这边收到通知,则去出列。
发送通知
1
2
3
4
5
6
int swChannel_notify ( swChannel * object )
{
assert ( object -> flag & SW_CHAN_NOTIFY );
uint64_t flag = 1 ;
return object -> notify_fd . write ( & object -> notify_fd , & flag , sizeof ( flag ));
}
写入数据到内存以后,通过匿名管道发送消息,高速另外一个进程/线程,已经入列,可以读取了。
释放对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void swChannel_free ( swChannel * object )
{
if ( object -> flag & SW_CHAN_LOCK )
{
object -> lock . free ( & object -> lock );
}
if ( object -> flag & SW_CHAN_NOTIFY )
{
object -> notify_fd . close ( & object -> notify_fd );
}
if ( object -> flag & SW_CHAN_SHM )
{
sw_shm_free ( object );
}
else
{
sw_free ( object );
}
}
根据当前对象不同的标志位,释放申请的内存。
应用场景
目前channel
内存队列在swoole中的应用只有manger
进程与worker
,task
进程之间通讯使用,总用是worker
,task
进程退出以后,通知manger
进程重启它。
1
2
//manger.cc 220行
serv -> message_box = swChannel_new ( 65536 , sizeof ( swWorkerStopMessage ), SW_CHAN_LOCK | SW_CHAN_SHM );
大家可以看到,在fork进程之间,先创建了内存队列,然后在主循环中等待wokre的通知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
if ( ManagerProcess . read_message )
{
swWorkerStopMessage msg ;
//这里一直等待
while ( swChannel_pop ( serv -> message_box , & msg , sizeof ( msg )) > 0 )
{
if ( SwooleG . running == 0 )
{
continue ;
}
if ( msg . worker_id >= serv -> worker_num )
{
swManager_spawn_task_worker ( serv , swServer_get_worker ( serv , msg . worker_id ));
}
else
{
pid_t new_pid = swManager_spawn_worker ( serv , & serv -> workers [ msg . worker_id ]);
if ( new_pid > 0 )
{
serv -> workers [ msg . worker_id ]. pid = new_pid ;
}
}
}
ManagerProcess . read_message = false ;
}