概述
前面的文章介绍了很多种进程/线程间通讯的方式,管道是其中最常用的一种,管道分为命名管道和匿名管道,具体的可以参考我的其他管道相关的文章。
我们这里主要是讲swoole
中的管道通讯,swoole中的所有管道都是匿名管道。
swoole
中实现了三种管道,swPipeBase
,swPipeEventfd
,swPipeUnsock
。
swPipe的数据结构
在swoole
中,这三种管道的基础都是swPipe
这个结构体。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct _swPipe
{
void * object ;
int blocking ;
double timeout ;
swSocket * master_socket ;
swSocket * worker_socket ;
int ( * read )( struct _swPipe * , void * recv , int length );
int ( * write )( struct _swPipe * , const void * send , int length );
swSocket * ( * getSocket )( struct _swPipe * , int master );
int ( * close )( struct _swPipe * );
} swPipe ;
object
具体管道的对象
blocking
是否阻塞
timeout
超时时间
master_socket
主进程的socket对象
worker_socket
子进程的socket对象
对管道read
,write
,getSocket
,close
操作的4个函数.
swPipeBase 匿名管道
数据结构
1
2
3
4
typedef struct _swPipeBase
{
int pipes [ 2 ];
} swPipeBase ;
swPipeBase
结构很简单,一个整型数组,pipes[0]
存放的是读端,pipes[1]
存放的是写端。
可以看出,swPipeBase
结构是一个半双工管道,也就是说多个进程共享这个管道时,进程的读端都是read(pipe[0])
,写入消息使用write(pipi[1])
.在使用这个匿名管道的时候,只能是一个进程负责写,一个进程负责读,行程单项的消息传递,这也是半双工管道的特性。
创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int swPipeBase_create ( swPipe * p , int blocking )
{
int ret ;
swPipeBase * object = ( swPipeBase * ) sw_malloc ( sizeof ( swPipeBase ));
if ( object == NULL )
{
return - 1 ;
}
p -> blocking = blocking ;
ret = pipe ( object -> pipes );
if ( ret < 0 )
{
swSysWarn ( "pipe() failed" );
sw_free ( object );
return - 1 ;
}
else
{
if ( swPipe_init_socket ( p , object -> pipes [ 1 ], object -> pipes [ 0 ], blocking ) < 0 )
{
sw_free ( object );
return SW_ERR ;
}
p -> timeout = - 1 ;
p -> object = object ;
p -> read = swPipeBase_read ;
p -> write = swPipeBase_write ;
p -> getSocket = swPipe_getSocket ;
p -> close = swPipeBase_close ;
}
return 0 ;
}
从函数可以看出,匿名管道时调用的pipe
函数,并根据参数设置阻塞模式。
值得注意的是swPipe_init_socket
这个函数,针对fd创建socket对象,这个socket对象是swoole-4.5.x版本增加的逻辑,在之前版本是没有的,目前我还不知道真正的用处,后续了解够在后续的文章会再进行说明。
大家可以看下swPipe_init_socket
这个函数,这里设置了管道阻塞与非阻塞。
读取
1
2
3
4
5
6
7
8
9
10
11
12
static int swPipeBase_read ( swPipe * p , void * data , int length )
{
swPipeBase * object = ( swPipeBase * ) p -> object ;
if ( p -> blocking == 1 && p -> timeout > 0 )
{
if ( swSocket_wait ( object -> pipes [ SW_PIPE_READ ], p -> timeout * 1000 , SW_EVENT_READ ) < 0 )
{
return SW_ERR ;
}
}
return read ( object -> pipes [ SW_PIPE_READ ], data , length );
}
读取的代码也很简单,直接使用read
函数读取数据,但是,这里有一个问题,如果设置了阻塞,但是一直阻塞怎么办?没法做到超时,也没法充分利用异步io的优势,所以swoole实现了一个swSocket_wait
函数,用到了poll
异步io来实现本身无法做到的功能。
我们可以看看这个函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Wait socket can read or write.
*/
int swSocket_wait ( int fd , int timeout_ms , int events )
{
struct pollfd event ;
event . fd = fd ;
event . events = 0 ;
if ( timeout_ms < 0 )
{
timeout_ms = - 1 ;
}
if ( events & SW_EVENT_READ )
{
event . events |= POLLIN ;
}
if ( events & SW_EVENT_WRITE )
{
event . events |= POLLOUT ;
}
while ( 1 )
{
int ret = poll ( & event , 1 , timeout_ms );
if ( ret == 0 )
{
return SW_ERR ;
}
else if ( ret < 0 && errno != EINTR )
{
swSysWarn ( "poll() failed" );
return SW_ERR ;
}
else
{
return SW_OK ;
}
}
return SW_OK ;
}
这个函数写的比较清晰,设置pollfd
结构,设置超时时间,并判断是读或写事件,执行poll
函数,
pool
函数会在fd句柄可读,可写,超时。这三个事件任意一个事件发生的时候返回。
注意errno != EINTR
因为poll
会受到中断的影响,所以需要判断。
通过swSocket_wait
解决了超时,和长期阻塞的问题,是不是很精妙呀?
写入
1
2
3
4
5
static int swPipeBase_write ( swPipe * p , const void * data , int length )
{
swPipeBase * object = ( swPipeBase * ) p -> object ;
return write ( object -> pipes [ SW_PIPE_WRITE ], data , length );
}
直接调用write
写入管道,根据设置,决定了阻塞与否。
关闭
1
2
3
4
5
6
7
8
static int swPipeBase_close ( swPipe * p )
{
swPipeBase * object = ( swPipeBase * ) p -> object ;
swSocket_free ( p -> master_socket );
swSocket_free ( p -> worker_socket );
sw_free ( object );
return SW_OK ;
}
关闭的时候把申请的内存全部释放。
获取Socket对象
1
2
3
4
swSocket * swPipe_getSocket ( swPipe * p , int master )
{
return master ? p -> master_socket : p -> worker_socket ;
}
这个函数是在4.5版本后加入的,以前是swPipeBase_getFd
函数,直接获取fd值。4.5版本以后把fd封装了,成了socket对象,这里也变成了getsocket了。
swPipeEventfd 通讯
数据结构
1
2
3
4
typedef struct _swPipeEventfd
{
int event_fd ;
} swPipeEventfd ;
大家看到swPipeEventfd
这个结构只有一个int类型的描述符,读写读是针对这个描述符进行,这个与swPipeBase
的pipe
管道时不一样的。
当然,虽然是只有一个描述符读写,但是也是单向的,也就是说只能在一个进程中写,另外一个进程中读,如果在同一个进程中读写,读到的也是当前进程写入内容。
创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
int swPipeEventfd_create ( swPipe * p , int blocking , int semaphore , int timeout )
{
int efd ;
int flag = 0 ;
swPipeEventfd * object = ( swPipeEventfd * ) sw_malloc ( sizeof ( swPipeEventfd ));
if ( object == NULL )
{
return - 1 ;
}
flag = EFD_NONBLOCK ;
if ( blocking == 1 )
{
if ( timeout > 0 )
{
flag = 0 ;
p -> timeout = - 1 ;
}
else
{
p -> timeout = timeout ;
}
}
#ifdef EFD_SEMAPHORE
if ( semaphore == 1 )
{
flag |= EFD_SEMAPHORE ;
}
#endif
p -> blocking = blocking ;
efd = eventfd ( 0 , flag );
if ( efd < 0 )
{
swSysWarn ( "eventfd create failed" );
sw_free ( object );
return - 1 ;
}
else
{
p -> master_socket = swSocket_new ( efd , SW_FD_PIPE );
if ( p -> master_socket == NULL )
{
close ( efd );
sw_free ( object );
return - 1 ;
}
p -> worker_socket = p -> master_socket ;
p -> object = object ;
p -> read = swPipeEventfd_read ;
p -> write = swPipeEventfd_write ;
p -> getSocket = swPipe_getSocket ;
p -> close = swPipeEventfd_close ;
object -> event_fd = efd ;
}
return 0 ;
}
大家看这个创建管道的封装,有几个点要注意。
eventfd
是linux2.6.22
加入内核的,也是linux系统特有的函数,所以低版本linux和osx下是不会使用到的。
多进程间通讯是需要设置semaphore
。
eventfd
是可以设置初始值的,这里默认设置的是0.
注意swPipeEventfd
这个管道只能传递uint64_t
数字,不能传递其他的数据。
详细的eventfd
解释可以看我文章开头提供的链接,里面有我写的eventfd
相关文章。
读取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int swPipeEventfd_read ( swPipe * p , void * data , int length )
{
int ret = - 1 ;
swPipeEventfd * object = ( swPipeEventfd * ) p -> object ;
//eventfd not support socket timeout
if ( p -> blocking == 1 && p -> timeout > 0 )
{
if ( swSocket_wait ( object -> event_fd , p -> timeout * 1000 , SW_EVENT_READ ) < 0 )
{
return SW_ERR ;
}
}
while ( 1 )
{
ret = read ( object -> event_fd , data , sizeof ( uint64_t ));
if ( ret < 0 && errno == EINTR )
{
continue ;
}
break ;
}
return ret ;
}
读取数据跟前面的基础匿名管道逻辑基本一样,eventfd
不支持超时等待,也使用到了swSocket_wait
超时等待逻辑。
值得注意的是,这里读取使用了while(1)
这个逻辑,
因为evenfd
在阻塞read
的时候可能被信号打断,所以需要判断是否是信号打断。
写入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int swPipeEventfd_write ( swPipe * p , const void * data , int length )
{
int ret ;
swPipeEventfd * object = ( swPipeEventfd * ) p -> object ;
while ( 1 )
{
ret = write ( object -> event_fd , data , sizeof ( uint64_t ));
if ( ret < 0 )
{
if ( errno == EINTR )
{
continue ;
}
}
break ;
}
return ret ;
}
写入跟读取一样,阻塞写入被信号打断需要继续循环,在此写入。
关闭
1
2
3
4
5
6
static int swPipeEventfd_close ( swPipe * p )
{
swSocket_free ( p -> master_socket );
sw_free ( p -> object );
return SW_OK ;
}
释放socket对象,释放eventfd结构对象。
获取socket对象
同swPipeBase
swPipeUnsock Unix套接字通讯
数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct _swPipeUnsock
{
/**
* master : socks[1]
* worker : socks[0]
*/
int socks [ 2 ];
/**
* master pipe is closed
*/
uint8_t pipe_master_closed ;
/**
* worker pipe is closed
*/
uint8_t pipe_worker_closed ;
} swPipeUnsock ;
swPipeUnsock
与前面两个匿名管道不同,前面两个匿名管道时单项通讯,而swPipeUnsock
管道时双向通讯的。
两个进程使用swPipeUnsock
通讯的时候,每个进程独占一个socket,master进程读写都是通过socks[1]
,worker进程都是socks[0]
,master进程写入socks[1]
的数据,worker进程可以通过socks[0]
读出,反之一样。
创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int swPipeUnsock_create ( swPipe * p , int blocking , int protocol )
{
int ret ;
swPipeUnsock * object = ( swPipeUnsock * ) sw_malloc ( sizeof ( swPipeUnsock ));
if ( object == NULL )
{
swWarn ( "malloc() failed" );
return SW_ERR ;
}
bzero ( object , sizeof ( swPipeUnsock ));
p -> blocking = blocking ;
ret = socketpair ( AF_UNIX , protocol , 0 , object -> socks );
if ( ret < 0 )
{
swSysWarn ( "socketpair() failed" );
sw_free ( object );
return SW_ERR ;
}
else
{
if ( swPipe_init_socket ( p , object -> socks [ 1 ], object -> socks [ 0 ], blocking ) < 0 )
{
sw_free ( object );
return SW_ERR ;
}
uint32_t sbsize = SwooleG . socket_buffer_size ;
swSocket_set_buffer_size ( p -> master_socket , sbsize );
swSocket_set_buffer_size ( p -> worker_socket , sbsize );
p -> object = object ;
p -> read = swPipeUnsock_read ;
p -> write = swPipeUnsock_write ;
p -> getSocket = swPipe_getSocket ;
p -> close = swPipeUnsock_close ;
}
return 0 ;
}
使用socketpair
来创建全双工的管道,通过protocol
来决定是使用SOCK_DGRAM
还是SOCK_STREAM
协议类型。
这里值得注意的是swSocket_set_buffer_size
,设置socker的buffer size。使用的是全局的buffer size配置。
具体socketpair
的使用可以参考本文开头的连接,里面有详细的介绍。
读取
1
2
3
4
static int swPipeUnsock_read ( swPipe * p , void * data , int length )
{
return read ((( swPipeUnsock * ) p -> object ) -> socks [ 0 ], data , length );
}
写入
1
2
3
4
static int swPipeUnsock_write ( swPipe * p , const void * data , int length )
{
return write ((( swPipeUnsock * ) p -> object ) -> socks [ 1 ], data , length );
}
关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int swPipeUnsock_close ( swPipe * p )
{
swPipeUnsock * object = ( swPipeUnsock * ) p -> object ;
int ret = swPipeUnsock_close_ext ( p , 0 );
sw_free ( object );
return ret ;
}
int swPipeUnsock_close_ext ( swPipe * p , int which )
{
swPipeUnsock * object = ( swPipeUnsock * ) p -> object ;
if ( which == SW_PIPE_CLOSE_MASTER )
{
if ( object -> pipe_master_closed )
{
return SW_ERR ;
}
swSocket_free ( p -> master_socket );
object -> pipe_master_closed = 1 ;
}
else if ( which == SW_PIPE_CLOSE_WORKER )
{
if ( object -> pipe_worker_closed )
{
return SW_ERR ;
}
swSocket_free ( p -> worker_socket );;
object -> pipe_worker_closed = 1 ;
}
else
{
swPipeUnsock_close_ext ( p , SW_PIPE_CLOSE_MASTER );
swPipeUnsock_close_ext ( p , SW_PIPE_CLOSE_WORKER );
}
return SW_OK ;
}
这里的关闭做了一个简单的逻辑,实现的挺精妙的,
swPipeUnsock_close_ext
支持根据传参指定关闭master
orworker
端。默认是两端关闭,
两端关闭的时候是递归调用了函数本身。
获取socket对象
同swPipeBase
管道的应用
process模块
进程模块通讯使用的是swPipeUnsock
.可以进行双向通讯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
swPipe * _pipe = ( swPipe * ) emalloc ( sizeof ( swPipe ));
int socket_type = pipe_type == zend :: PIPE_TYPE_STREAM ? SOCK_STREAM : SOCK_DGRAM ;
if ( swPipeUnsock_create ( _pipe , 1 , socket_type ) < 0 )
{
zend_throw_exception ( swoole_exception_ce , "swPipeUnsock_create failed" , errno );
efree ( _pipe );
efree ( process );
RETURN_FALSE ;
}
process -> pipe_master = _pipe -> getSocket ( _pipe , SW_PIPE_MASTER );
process -> pipe_worker = _pipe -> getSocket ( _pipe , SW_PIPE_WORKER );
process -> pipe_object = _pipe ;
process -> pipe_current = process -> pipe_master ;
zend_update_property_long ( swoole_process_ce , ZEND_THIS , ZEND_STRL ( "pipe" ), process -> pipe_master -> fd );
...
在父进程中,会把master socker
赋值给pipe_current
。
process->pipe_current = process->pipe_master
在子进程中则会把worker pipe
赋值给pipe_current
.
读取的时候直接 ssize_t ret = read(process->pipe_current->fd, buf->val, buf_size);
读取当前的fd就好。
写入的时候阻塞写swSocket_write_blocking
或非阻塞写swoole_event_write
.
Channel 队列
内存Channel队列底层是基于共享内存+Mutex互斥锁实现的用户态高性能队列,在多进程环境下的通讯用到了匿名管道。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//use notify
if ( flags & SW_CHAN_NOTIFY )
{
ret = swPipeNotify_auto ( & object -> notify_fd , 1 , 1 );
if ( ret < 0 )
{
swWarn ( "notify_fd init failed" );
return NULL ;
}
}
static inline int swPipeNotify_auto ( swPipe * p , int blocking , int semaphore )
{
#ifdef HAVE_EVENTFD
return swPipeEventfd_create ( p , blocking , semaphore , 0 );
#else
return swPipeBase_create ( p , blocking );
#endif
}
优先使用swPipeEventfd
,不支持的系统使用swPipeBase
.
写入数据到内存中以后,通过swChannel_notify
通知到另一个进程写入的数据长度。
读取数据的时候使用swChannel_wait
等待另一个进程传递数据返回。
Server 与task进程通讯
server中调用taskwait
方法后,
1
2
3
4
5
6
swPipe * task_notify_pipe = & serv -> task_notify [ SwooleWG . id ];
swSocket * task_notify_socket = task_notify_pipe -> getSocket ( task_notify_pipe , SW_PIPE_READ );
while ( swSocket_wait ( task_notify_socket -> fd , 0 , SW_EVENT_READ ) == SW_OK )
{
read ( task_notify_socket -> fd , & notify , sizeof ( notify ));
}
会阻塞等待数据。
task进程处理完成会调用swWorker_send2worker
发送数据。