Swoole基础模块 - Channel 队列

概述

用过协程的朋友,看到Channel这个名字是不是以为讲解的是协程中的通道,实际上不是,今天要讲的是swoole中实现的用户态高性能内存队列。

Channel的底层实现是基于共享内存+Mutex互斥锁+Pipe管道。 所以Channel可以用于多进程环境,底层读写会自动加锁,使用者不需要关心数据同步问题。

Channel在swoole中只在一个地方使用了,那就是manager进程和worker进程相互通讯,worker进程在退出的时候会通知manager进程,manager进程收到了请求以后会重启对应的worker进程。 从而保持固定数目的worker进程。

Channel数据结构详解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
enum swChannel_flag
{
    SW_CHAN_LOCK     = 1u << 1,//加锁
    SW_CHAN_NOTIFY   = 1u << 2,//通知
    SW_CHAN_SHM      = 1u << 3,//使用共享内存
};

typedef struct _swChannel
{
    off_t head;
    off_t tail;
    size_t size;
    char head_tag;
    char tail_tag;
    int num;
    int max_num;
    /**
     * Data length, excluding structure
     */
    size_t bytes;
    int flag;
    int maxlen;
    /**
     * memory point
     */
    void *mem;
    swLock lock;
    swPipe notify_fd;
} swChannel;

我们先来看看swChannel结构。

  • head 内存队列的头部地址,出队列的方向。
  • tail 内存队列的尾部地址,入队列的方向
  • size 申请的内存大小
  • head_tag,tail_tag 队列循环标志
  • num 当前元素数目
  • max_num 最大元素数目
  • bytes 当前占用内存大小
  • flag 队列标识SW_CHAN_LOCK使用锁,SW_CHAN_NOTIFY使用pipe通知,SW_CHAN_SHM 使用共享内存
  • maxlen 队列中每个元素的大小
  • mem channel内存首地址
  • lock 锁
  • notify_fd 管道
1
2
3
4
5
typedef struct _swChannel_item
{
    int length;
    char data[0];
} swChannel_item;

队列元素结构,这个很简单,就是长度+内存地址。

创建一个Channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
swChannel* swChannel_new(size_t size, size_t maxlen, int flags)
{
    assert(size >= maxlen);
    int ret;
    void *mem;

    //use shared memory
    if (flags & SW_CHAN_SHM)
    {
        /**
         * overflow space
         */
        mem = sw_shm_malloc(size + sizeof(swChannel) + maxlen + sizeof(swChannel_item));
    }
    else
    {
        mem = sw_malloc(size + sizeof(swChannel) + maxlen + sizeof(swChannel_item));
    }

    if (mem == NULL)
    {
        swWarn("swChannel_create: malloc(%ld) failed", size);
        return NULL;
    }
    swChannel *object = (swChannel *) mem;
    mem = (char*) mem + sizeof(swChannel);

    bzero(object, sizeof(swChannel));

    //overflow space
    object->size = size;
    object->mem = mem;
    object->maxlen = maxlen;
    object->flag = flags;

    //use lock
    if (flags & SW_CHAN_LOCK)
    {
        //init lock
        if (swMutex_create(&object->lock, 1) < 0)
        {
            swWarn("mutex init failed");
            return NULL;
        }
    }
    //use notify
    if (flags & SW_CHAN_NOTIFY)
    {
        ret = swPipeNotify_auto(&object->notify_fd, 1, 1);
        if (ret < 0)
        {
            swWarn("notify_fd init failed");
            return NULL;
        }
    }
    return object;
}

通过swChannel_new来创建channel,创建的时候传入了sizemaxlen, 申请内存的时候是size + sizeof(swChannel) + maxlen + sizeof(swChannel_item),在这里,maxlen表示要传入的对象最大长度,再加一个maxlen是为了防止内存越界,多申请了一些内存。

可以看到,根据flag不同,进入不同的逻辑,flag如果包含SW_CHAN_SHM,则申请的是共享内存。

传入SW_CHAN_LOCK则创建互斥锁。 传入SW_CHAN_NOTIFY 则创建管道。

注意swPipeNotify_auto这个函数,在前面的匿名管道章节,我们知道了它是创建一个单通道的匿名管道,用来通知读取数据。

写入数据

1
2
3
4
5
6
7
8
int swChannel_push(swChannel *object, const void *in, int data_length)
{
    assert(object->flag & SW_CHAN_LOCK);
    object->lock.lock(&object->lock);
    int ret = swChannel_in(object, in, data_length);
    object->lock.unlock(&object->lock);
    return ret;
}

这个函数的作用是数据入队列。 为了防止其他进程/线程读写数据,需要先加上锁,然后调用swChannel_in往尾部写入数据。写好以后再解锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int swChannel_in(swChannel *object, const void *in, int data_length)
{
    assert(data_length <= object->maxlen);
    if (swChannel_full(object))
    {
        return SW_ERR;
    }
    swChannel_item *item;
    int msize = sizeof(item->length) + data_length;

    if (object->tail < object->head)
    {
        //no enough memory space
        if ((object->head - object->tail) < msize)
        {
            return SW_ERR;
        }
        item = (swChannel_item *) ((char*) object->mem + object->tail);
        object->tail += msize;
    }
    else
    {
        item = (swChannel_item *) ((char*) object->mem + object->tail);
        object->tail += msize;
        if (object->tail >= (off_t) object->size)
        {
            object->tail = 0;
            object->tail_tag = 1 - object->tail_tag;
        }
    }
    object->num++;
    object->bytes += data_length;
    item->length = data_length;
    memcpy(item->data, in, data_length);
    return SW_OK;
}

第一行就判断传入的数据长度不能大于定义的maxlen, 然后判断队列是否已满,如果满了就不能添加了。

1
#define swChannel_full(ch) ((ch->head == ch->tail && ch->tail_tag != ch->head_tag) || (ch->bytes + sizeof(int) * ch->num == ch->size))

看下是如何判断队列已满,头部指针等于尾部指针,并且尾部标识不等于头部标识。或者队列占用空间等于队列设定长度。 在初始状态下,头部指针值和尾部指针值都为0,头部标识和尾部标识也为0,不满足已满的条件。 如果一直插入内容,尾部指针最终会变为0,尾部标识变成1,如果一直没有读取数据,头部指针值还是0,但是头尾部标识会不一样,这样就能区分队列是否已满。很精妙的判断。

  • 正常情况下会首先执行else里的逻辑,插入数据,则尾部指针值增加,如果尾部指针的值大于或等于队列的容量size,则从头开始插入。
  • 如果尾部指针小于头部指针,则表示已经插入了一轮了,重新开始插入了,但是读取还没有完成,那么就需要判断这其中的空间是否足以插入一个对象。如果空间够,则插入,不够则报错。

读出数据

1
2
3
4
5
6
7
8
int swChannel_push(swChannel *object, const void *in, int data_length)
{
    assert(object->flag & SW_CHAN_LOCK);
    object->lock.lock(&object->lock);
    int ret = swChannel_in(object, in, data_length);
    object->lock.unlock(&object->lock);
    return ret;
}

读出数据,读出的时候也需要加锁,防止数据污染。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
int swChannel_out(swChannel *object, void *out, int buffer_length)
{
    if (swChannel_empty(object))
    {
        return SW_ERR;
    }

    swChannel_item *item = (swChannel_item *) ((char*) object->mem + object->head);
    assert(buffer_length >= item->length);
    memcpy(out, item->data, item->length);
    object->head += (item->length + sizeof(item->length));
    if (object->head >= (off_t) object->size)
    {
        object->head = 0;
        object->head_tag = 1 - object->head_tag;
    }
    object->num--;
    object->bytes -= item->length;
    return item->length;
}

读取数据的时候判断队列是否为空。 不为空的情况下,通过head指针来读取数据,默认是从0开始。因为tail指针写入数据也是从0开始。 这里有个判断 if (object->head >= (off_t) object->size)当读取数据到了队列的末尾,则把head指针置位0,表示需要从头开始读取。并且把head标志位置位1.这样就能区分队列实时状态。

其他功能

数据窥视

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
int swChannel_peek(swChannel *object, void *out, int buffer_length)
{
    if (swChannel_empty(object))
    {
        return SW_ERR;
    }

    int length;
    object->lock.lock(&object->lock);
    swChannel_item *item = (swChannel_item *) ((char*) object->mem + object->head);
    assert(buffer_length >= item->length);
    memcpy(out, item->data, item->length);
    length = item->length;
    object->lock.unlock(&object->lock);

    return length;
}

数据窥视的作用是为了在不出列数据的情况下,获取当前读取数据的进度。可以看到是直接那了head指针对应的对象,。也就是下一个会读取的数据。

等待读取

1
2
3
4
5
6
int swChannel_wait(swChannel *object)
{
    assert(object->flag & SW_CHAN_NOTIFY);
    uint64_t flag;
    return object->notify_fd.read(&object->notify_fd, &flag, sizeof(flag));
}

等待读取很简单,通过匿名管道对象,阻塞读取消息,当另外一个进程/线程发送通知以后,这边收到通知,则去出列。

发送通知

1
2
3
4
5
6
int swChannel_notify(swChannel *object)
{
    assert(object->flag & SW_CHAN_NOTIFY);
    uint64_t flag = 1;
    return object->notify_fd.write(&object->notify_fd, &flag, sizeof(flag));
}

写入数据到内存以后,通过匿名管道发送消息,高速另外一个进程/线程,已经入列,可以读取了。

释放对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
void swChannel_free(swChannel *object)
{
    if (object->flag & SW_CHAN_LOCK)
    {
        object->lock.free(&object->lock);
    }
    if (object->flag & SW_CHAN_NOTIFY)
    {
        object->notify_fd.close(&object->notify_fd);
    }
    if (object->flag & SW_CHAN_SHM)
    {
        sw_shm_free(object);
    }
    else
    {
        sw_free(object);
    }
}

根据当前对象不同的标志位,释放申请的内存。

应用场景

目前channel内存队列在swoole中的应用只有manger进程与workertask进程之间通讯使用,总用是worker,task进程退出以后,通知manger进程重启它。

1
2
//manger.cc 220行
 serv->message_box = swChannel_new(65536, sizeof(swWorkerStopMessage), SW_CHAN_LOCK | SW_CHAN_SHM);

大家可以看到,在fork进程之间,先创建了内存队列,然后在主循环中等待wokre的通知

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

        if (ManagerProcess.read_message)
        {
            swWorkerStopMessage msg;
            //这里一直等待
            while (swChannel_pop(serv->message_box, &msg, sizeof(msg)) > 0)
            {
                if (SwooleG.running == 0)
                {
                    continue;
                }
                if (msg.worker_id >= serv->worker_num)
                {
                    swManager_spawn_task_worker(serv, swServer_get_worker(serv, msg.worker_id));
                }
                else
                {
                    pid_t new_pid = swManager_spawn_worker(serv, &serv->workers[msg.worker_id]);
                    if (new_pid > 0)
                    {
                        serv->workers[msg.worker_id].pid = new_pid;
                    }
                }
            }
            ManagerProcess.read_message = false;
        }
0%