Swoole基础模块 - Pipe管道

概述

前面的文章介绍了很多种进程/线程间通讯的方式,管道是其中最常用的一种,管道分为命名管道和匿名管道,具体的可以参考我的其他管道相关的文章。 我们这里主要是讲swoole中的管道通讯,swoole中的所有管道都是匿名管道。

swoole中实现了三种管道,swPipeBaseswPipeEventfd,swPipeUnsock

swPipe的数据结构

swoole中,这三种管道的基础都是swPipe这个结构体。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
typedef struct _swPipe
{
    void *object;
    int blocking;
    double timeout;

    swSocket *master_socket;
    swSocket *worker_socket;

    int (*read)(struct _swPipe *, void *recv, int length);
    int (*write)(struct _swPipe *, const void *send, int length);
    swSocket* (*getSocket)(struct _swPipe *, int master);
    int (*close)(struct _swPipe *);
} swPipe;
  • object 具体管道的对象
  • blocking 是否阻塞
  • timeout 超时时间
  • master_socket 主进程的socket对象
  • worker_socket 子进程的socket对象
  • 对管道read,write,getSocket,close操作的4个函数.

swPipeBase 匿名管道

数据结构

1
2
3
4
typedef struct _swPipeBase
{
    int pipes[2];
} swPipeBase;

swPipeBase结构很简单,一个整型数组,pipes[0] 存放的是读端,pipes[1]存放的是写端。

可以看出,swPipeBase结构是一个半双工管道,也就是说多个进程共享这个管道时,进程的读端都是read(pipe[0]),写入消息使用write(pipi[1]).在使用这个匿名管道的时候,只能是一个进程负责写,一个进程负责读,行程单项的消息传递,这也是半双工管道的特性。

创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int swPipeBase_create(swPipe *p, int blocking)
{
    int ret;
    swPipeBase *object = (swPipeBase *) sw_malloc(sizeof(swPipeBase));
    if (object == NULL)
    {
        return -1;
    }
    p->blocking = blocking;
    ret = pipe(object->pipes);
    if (ret < 0)
    {
        swSysWarn("pipe() failed");
        sw_free(object);
        return -1;
    }
    else
    {
        if (swPipe_init_socket(p, object->pipes[1], object->pipes[0], blocking) < 0)
        {
            sw_free(object);
            return SW_ERR;
        }

        p->timeout = -1;
        p->object = object;
        p->read = swPipeBase_read;
        p->write = swPipeBase_write;
        p->getSocket = swPipe_getSocket;
        p->close = swPipeBase_close;
    }
    return 0;
}

从函数可以看出,匿名管道时调用的pipe函数,并根据参数设置阻塞模式。

值得注意的是swPipe_init_socket这个函数,针对fd创建socket对象,这个socket对象是swoole-4.5.x版本增加的逻辑,在之前版本是没有的,目前我还不知道真正的用处,后续了解够在后续的文章会再进行说明。

大家可以看下swPipe_init_socket这个函数,这里设置了管道阻塞与非阻塞。

读取

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
static int swPipeBase_read(swPipe *p, void *data, int length)
{
    swPipeBase *object = (swPipeBase *) p->object;
    if (p->blocking == 1 && p->timeout > 0)
    {
        if (swSocket_wait(object->pipes[SW_PIPE_READ], p->timeout * 1000, SW_EVENT_READ) < 0)
        {
            return SW_ERR;
        }
    }
    return read(object->pipes[SW_PIPE_READ], data, length);
}

读取的代码也很简单,直接使用read函数读取数据,但是,这里有一个问题,如果设置了阻塞,但是一直阻塞怎么办?没法做到超时,也没法充分利用异步io的优势,所以swoole实现了一个swSocket_wait函数,用到了poll异步io来实现本身无法做到的功能。

我们可以看看这个函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Wait socket can read or write.
 */
int swSocket_wait(int fd, int timeout_ms, int events)
{
    struct pollfd event;
    event.fd = fd;
    event.events = 0;

    if (timeout_ms < 0)
    {
        timeout_ms = -1;
    }

    if (events & SW_EVENT_READ)
    {
        event.events |= POLLIN;
    }
    if (events & SW_EVENT_WRITE)
    {
        event.events |= POLLOUT;
    }
    while (1)
    {
        int ret = poll(&event, 1, timeout_ms);
        if (ret == 0)
        {
            return SW_ERR;
        }
        else if (ret < 0 && errno != EINTR)
        {
            swSysWarn("poll() failed");
            return SW_ERR;
        }
        else
        {
            return SW_OK;
        }
    }
    return SW_OK;
}

这个函数写的比较清晰,设置pollfd结构,设置超时时间,并判断是读或写事件,执行poll函数, pool函数会在fd句柄可读,可写,超时。这三个事件任意一个事件发生的时候返回。 注意errno != EINTR 因为poll会受到中断的影响,所以需要判断。

通过swSocket_wait 解决了超时,和长期阻塞的问题,是不是很精妙呀?

写入

1
2
3
4
5
static int swPipeBase_write(swPipe *p, const void *data, int length)
{
    swPipeBase *object = (swPipeBase *) p->object;
    return write(object->pipes[SW_PIPE_WRITE], data, length);
}

直接调用write写入管道,根据设置,决定了阻塞与否。

关闭

1
2
3
4
5
6
7
8
static int swPipeBase_close(swPipe *p)
{
    swPipeBase *object = (swPipeBase *) p->object;
    swSocket_free(p->master_socket);
    swSocket_free(p->worker_socket);
    sw_free(object);
    return SW_OK;
}

关闭的时候把申请的内存全部释放。

获取Socket对象

1
2
3
4
swSocket* swPipe_getSocket(swPipe *p, int master)
{
    return master ? p->master_socket : p->worker_socket;
}

这个函数是在4.5版本后加入的,以前是swPipeBase_getFd函数,直接获取fd值。4.5版本以后把fd封装了,成了socket对象,这里也变成了getsocket了。

swPipeEventfd 通讯

数据结构

1
2
3
4
typedef struct _swPipeEventfd
{
    int event_fd;
} swPipeEventfd;

大家看到swPipeEventfd这个结构只有一个int类型的描述符,读写读是针对这个描述符进行,这个与swPipeBasepipe管道时不一样的。

当然,虽然是只有一个描述符读写,但是也是单向的,也就是说只能在一个进程中写,另外一个进程中读,如果在同一个进程中读写,读到的也是当前进程写入内容。

创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
int swPipeEventfd_create(swPipe *p, int blocking, int semaphore, int timeout)
{
    int efd;
    int flag = 0;
    swPipeEventfd *object = (swPipeEventfd *) sw_malloc(sizeof(swPipeEventfd));
    if (object == NULL)
    {
        return -1;
    }

    flag = EFD_NONBLOCK;

    if (blocking == 1)
    {
        if (timeout > 0)
        {
            flag = 0;
            p->timeout = -1;
        }
        else
        {
            p->timeout = timeout;
        }
    }

#ifdef EFD_SEMAPHORE
    if (semaphore == 1)
    {
        flag |= EFD_SEMAPHORE;
    }
#endif

    p->blocking = blocking;
    efd = eventfd(0, flag);
    if (efd < 0)
    {
        swSysWarn("eventfd create failed");
        sw_free(object);
        return -1;
    }
    else
    {
        p->master_socket = swSocket_new(efd, SW_FD_PIPE);
        if (p->master_socket == NULL)
        {
            close(efd);
            sw_free(object);
            return -1;
        }
        p->worker_socket = p->master_socket;
        p->object = object;
        p->read = swPipeEventfd_read;
        p->write = swPipeEventfd_write;
        p->getSocket = swPipe_getSocket;
        p->close = swPipeEventfd_close;
        object->event_fd = efd;
    }
    return 0;
}

大家看这个创建管道的封装,有几个点要注意。

  1. eventfdlinux2.6.22加入内核的,也是linux系统特有的函数,所以低版本linux和osx下是不会使用到的。
  2. 多进程间通讯是需要设置semaphore
  3. eventfd是可以设置初始值的,这里默认设置的是0.
  4. 注意swPipeEventfd这个管道只能传递uint64_t数字,不能传递其他的数据。
  5. 详细的eventfd解释可以看我文章开头提供的链接,里面有我写的eventfd相关文章。

读取

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int swPipeEventfd_read(swPipe *p, void *data, int length)
{
    int ret = -1;
    swPipeEventfd *object = (swPipeEventfd *) p->object;

    //eventfd not support socket timeout
    if (p->blocking == 1 && p->timeout > 0)
    {
        if (swSocket_wait(object->event_fd, p->timeout * 1000, SW_EVENT_READ) < 0)
        {
            return SW_ERR;
        }
    }

    while (1)
    {
        ret = read(object->event_fd, data, sizeof(uint64_t));
        if (ret < 0 && errno == EINTR)
        {
            continue;
        }
        break;
    }
    return ret;
}

读取数据跟前面的基础匿名管道逻辑基本一样,eventfd不支持超时等待,也使用到了swSocket_wait超时等待逻辑。

值得注意的是,这里读取使用了while(1)这个逻辑, 因为evenfd在阻塞read的时候可能被信号打断,所以需要判断是否是信号打断。

写入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
static int swPipeEventfd_write(swPipe *p, const void *data, int length)
{
    int ret;
    swPipeEventfd *object = (swPipeEventfd *) p->object;
    while (1)
    {
        ret = write(object->event_fd, data, sizeof(uint64_t));
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                continue;
            }
        }
        break;
    }
    return ret;
}

写入跟读取一样,阻塞写入被信号打断需要继续循环,在此写入。

关闭

1
2
3
4
5
6
static int swPipeEventfd_close(swPipe *p)
{
    swSocket_free(p->master_socket);
    sw_free(p->object);
    return SW_OK;
}

释放socket对象,释放eventfd结构对象。

获取socket对象

swPipeBase

swPipeUnsock Unix套接字通讯

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
typedef struct _swPipeUnsock
{
    /**
     * master : socks[1]
     * worker : socks[0]
     */
    int socks[2];
    /**
     * master pipe is closed
     */
    uint8_t pipe_master_closed;
    /**
     * worker pipe is closed
     */
    uint8_t pipe_worker_closed;
} swPipeUnsock;

swPipeUnsock 与前面两个匿名管道不同,前面两个匿名管道时单项通讯,而swPipeUnsock管道时双向通讯的。

两个进程使用swPipeUnsock通讯的时候,每个进程独占一个socket,master进程读写都是通过socks[1],worker进程都是socks[0],master进程写入socks[1]的数据,worker进程可以通过socks[0]读出,反之一样。

创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int swPipeUnsock_create(swPipe *p, int blocking, int protocol)
{
    int ret;
    swPipeUnsock *object = (swPipeUnsock *) sw_malloc(sizeof(swPipeUnsock));
    if (object == NULL)
    {
        swWarn("malloc() failed");
        return SW_ERR;
    }
    bzero(object, sizeof(swPipeUnsock));
    p->blocking = blocking;
    ret = socketpair(AF_UNIX, protocol, 0, object->socks);
    if (ret < 0)
    {
        swSysWarn("socketpair() failed");
        sw_free(object);
        return SW_ERR;
    }
    else
    {
        if (swPipe_init_socket(p, object->socks[1], object->socks[0], blocking) < 0)
        {
            sw_free(object);
            return SW_ERR;
        }

        uint32_t sbsize = SwooleG.socket_buffer_size;
        swSocket_set_buffer_size(p->master_socket, sbsize);
        swSocket_set_buffer_size(p->worker_socket, sbsize);

        p->object = object;
        p->read = swPipeUnsock_read;
        p->write = swPipeUnsock_write;
        p->getSocket = swPipe_getSocket;
        p->close = swPipeUnsock_close;
    }
    return 0;
}

使用socketpair来创建全双工的管道,通过protocol来决定是使用SOCK_DGRAM还是SOCK_STREAM协议类型。

这里值得注意的是swSocket_set_buffer_size,设置socker的buffer size。使用的是全局的buffer size配置。

具体socketpair的使用可以参考本文开头的连接,里面有详细的介绍。

读取

1
2
3
4
static int swPipeUnsock_read(swPipe *p, void *data, int length)
{
    return read(((swPipeUnsock *) p->object)->socks[0], data, length);
}

写入

1
2
3
4
static int swPipeUnsock_write(swPipe *p, const void *data, int length)
{
    return write(((swPipeUnsock *) p->object)->socks[1], data, length);
}

关闭

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int swPipeUnsock_close(swPipe *p)
{
    swPipeUnsock *object = (swPipeUnsock *) p->object;
    int ret = swPipeUnsock_close_ext(p, 0);
    sw_free(object);
    return ret;
}
int swPipeUnsock_close_ext(swPipe *p, int which)
{
    swPipeUnsock *object = (swPipeUnsock *) p->object;

    if (which == SW_PIPE_CLOSE_MASTER)
    {
        if (object->pipe_master_closed)
        {
            return SW_ERR;
        }
        swSocket_free(p->master_socket);
        object->pipe_master_closed = 1;
    }
    else if (which == SW_PIPE_CLOSE_WORKER)
    {
        if (object->pipe_worker_closed)
        {
            return SW_ERR;
        }
        swSocket_free(p->worker_socket);;
        object->pipe_worker_closed = 1;
    }
    else
    {
        swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_MASTER);
        swPipeUnsock_close_ext(p, SW_PIPE_CLOSE_WORKER);
    }

    return SW_OK;
}

这里的关闭做了一个简单的逻辑,实现的挺精妙的,

swPipeUnsock_close_ext 支持根据传参指定关闭masterorworker 端。默认是两端关闭, 两端关闭的时候是递归调用了函数本身。

获取socket对象

swPipeBase

管道的应用

process模块

进程模块通讯使用的是swPipeUnsock.可以进行双向通讯

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
...

swPipe *_pipe = (swPipe *) emalloc(sizeof(swPipe));
int socket_type = pipe_type == zend::PIPE_TYPE_STREAM ? SOCK_STREAM : SOCK_DGRAM;
if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
{
    zend_throw_exception(swoole_exception_ce, "swPipeUnsock_create failed", errno);
    efree(_pipe);
    efree(process);
    RETURN_FALSE;
}

process->pipe_master = _pipe->getSocket(_pipe, SW_PIPE_MASTER);
process->pipe_worker = _pipe->getSocket(_pipe, SW_PIPE_WORKER);

process->pipe_object = _pipe;
process->pipe_current = process->pipe_master;

zend_update_property_long(swoole_process_ce, ZEND_THIS, ZEND_STRL("pipe"), process->pipe_master->fd);

...

在父进程中,会把master socker 赋值给pipe_currentprocess->pipe_current = process->pipe_master 在子进程中则会把worker pipe 赋值给pipe_current.

读取的时候直接 ssize_t ret = read(process->pipe_current->fd, buf->val, buf_size); 读取当前的fd就好。

写入的时候阻塞写swSocket_write_blocking或非阻塞写swoole_event_write.

Channel 队列

内存Channel队列底层是基于共享内存+Mutex互斥锁实现的用户态高性能队列,在多进程环境下的通讯用到了匿名管道。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
//use notify
if (flags & SW_CHAN_NOTIFY)
{
    ret = swPipeNotify_auto(&object->notify_fd, 1, 1);
    if (ret < 0)
    {
        swWarn("notify_fd init failed");
        return NULL;
    }
}
    
static inline int swPipeNotify_auto(swPipe *p, int blocking, int semaphore)
{
#ifdef HAVE_EVENTFD
    return swPipeEventfd_create(p, blocking, semaphore, 0);
#else
    return swPipeBase_create(p, blocking);
#endif
}

优先使用swPipeEventfd,不支持的系统使用swPipeBase.

写入数据到内存中以后,通过swChannel_notify通知到另一个进程写入的数据长度。

读取数据的时候使用swChannel_wait等待另一个进程传递数据返回。

Server 与task进程通讯

server中调用taskwait方法后,

1
2
3
4
5
6
swPipe *task_notify_pipe = &serv->task_notify[SwooleWG.id];
swSocket *task_notify_socket = task_notify_pipe->getSocket(task_notify_pipe, SW_PIPE_READ);
while (swSocket_wait(task_notify_socket->fd, 0, SW_EVENT_READ) == SW_OK)
{
    read(task_notify_socket->fd, &notify, sizeof(notify));
}

会阻塞等待数据。

task进程处理完成会调用swWorker_send2worker发送数据。

0%