Redis的事件处理机制分析

redis服务器是典型的事件驱动系统,它的事件分为两大类:文件事件和时间事件

1、什么是事件驱动?

所谓事件驱动,简单地说就是你点什么按钮(即产生什么事件),电脑执行什么操作(即调用什么函数).当然事件不仅限于用户的操作. 事件驱动的核心自然是事件。

从事件角度说,事件驱动程序的基本结构是由一个事件收集器、一个事件发送器和一个事件处理器组成。

  • 事件收集器专门负责收集所有事件,包括来自用户的(如鼠标、键盘事件等)、来自硬件的(如时钟事件等)和来自软件的(如操作系统、应用程序本身等)。
  • 事件发送器负责将收集器收集到的事件分发到目标对象中。
  • 事件处理器做具体的事件响应工作。

2、Redis的事件

2.1、文件事件

文件事件其实就是普通的socket的读写事件,也就是IO操作。

比如: 客户端的连接,命令请求,数据回复,连接断开等。

2.1.1、Reactor模式

Redis中采用的事件处理机制是Reactor模式,属于IO多路复用的一种常见模式。

IO多路复用指的是通过单个线程去管理多个Socket.

Reactor模式是一种为处理并发服务请求,并将请求提交到一个或多个服务处理程序的事件设计模式。

Reactor模式是通过事件来驱动的,包含了:

  • 一个或多个并发的输入源(文件事件)
  • 有一个服务处理类(Service Handler),也叫做事件收集器
  • 通过事件分发器发送的一个或多个请求处理器(Requests Handler), 也叫做事件处理器。

事件处理机制

Reactor模式结构

  • Handle
    • IO操作的基本句柄,在Linux下就是fd(文件描述符)
  • Synchronous Event Demultiplexer(事件同步分离器)
    • 阻塞等待Handlers中的事件发生。
  • Reactor(事件分派器)
  • 负责事件的注册,删除以及所有注册的事件监控。
    • 当事件发生时调用Event Handler处理事件
  • Event Handler(事件处理器)
    • 需要Concrete Event Handler来实现事件处理的接口
  • Concrete Event Handler(具体的事件处理器)
    • 一般会绑定一个handler,实现对可读事件进行读取,对可写事件进行写入

事件处理机制时序图

  • 1、主程序向Reactor(事件派发器)发起注册事件。
  • 2、Reactor调用OS(操作系统)的事件处理分离器,并且监听事件(wait)
  • 3、当事件产生时,Reactor将事件委派给响应的处理器来处理事件, 也就是调用handle_event()

2.1.2、4中IO多路复用模型与选择

IO多路复用机制包括:select,poll, epoll, kqueue。

IP多路复用就是通过一种机制,一个进程可以监控多个socket(或描述符),一旦某个描述符就绪,就通知应用程序进行响应的处理。

2.1.2.1、select

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select函数监听的文件描述符有三类:

  • readfds(可读文件描述符)
  • writefds(可写文件描述符)
  • exceptfds(不包括的文件描述符)

调用select函数后阻塞,直到有描述符就绪或超时,则函数返回。 当select函数返回后,可以通过遍历fd列表,来找到就绪的描述符。

优点和缺点

优点:能够很好的跨平台,因为select在所有的平台都支持(windows, linux ....)

缺点:

  • 1、单进程打开的文件描述符存在一定限制,由FD_SET_SIZE设置,默认1024,并且使用数组存储。
  • 2、检查是否存在文件描述符时,使用线性扫描(遍历数组)的方法,不管这些socket是否活跃,都会遍历。所以效率比较低。

2.1.2.2、pool

// 方法
int poll (struct pollfd *fds, unsigned int nfds, int timeout);

// 类
struct pollfd { 
  //文件描述符 
  int fd; 
  //要监视的事件 
  short events; 
  //实际发生的事件 
  short revents; 
};

poll使用一个pollfd的指针实现,pollfd包含了要监听的文件事件和要发生的事件。

优点和缺点

优点:采用链式存储,监听的文件描述符数量不存在限制,可以超过select的默认限制1024.

缺点:检查文件描述符是否存在,同样采用线性扫描(遍历链表),因此效率较低。

2.1.2.3、epoll

epoll是linux2.6内核中提出的,是select和poll的增强版本。epoll更加灵活,没有文件描述符数量限制。

epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放在内核的一个事件表中,这样用户空间和内核空间的copy只需要一次。

epoll文件描述符创建

int epoll_create(int size)

创造一个epoll句柄,自从linux2.6.8之后,size参数被忽略。

值得注意的是,创建好句柄后,就会占用一个文件描述符(fd)的值。

在linux的/proc/进程ID/fd/下。能够看到这个fd。所以使用完epoll后,一定要调用close()关闭,避免可能导致fd耗尽。

epoll注册事件函数

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
  • epfd: epoll_create()的返回值
  • op表示动作
    • EPOLL_CTL_ADD:注册新的文件描述符到epfd(事件列表)中
    • EPOLL_CTL_MOD:修改已经注册的fd的监听事件
    • EPOLL_CTL_DEL:从epfd(事件列表)中删除一个fd
  • fd:需要监听的fd
  • event:需要监听的事件

epoll等待事件返回函数

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待内核返回的可读写事件,最多返回maxEvents个事件.

优点和缺点

优点:

  • 1、epoll没有最大并发连接数的限制,上限就是最大可以打开的文件个数,1GB内存大概是10w左右。
  • 2、epoll只管活跃的连接,跟连接总数没关系,因此效率远远高于select和poll
  • 3、epoll使用了共享内存,不用做内存拷贝。

缺点: 缺点啥就不说了,目前用得较多的就是这个模式啦。

2.1.2.4、kqueue

kqueue是unix下的一个IO多路复用库。最初是2000年Jonathan Lemon在FreeBSD系统上开发的一个高性能事件通知接口。

注册一批socket描述符到kqueue后,当描述符发生变化时,kqueue将会一次性通知应用程序那些描述符可读,可写或出错。

struct kevent { 
  uintptr_t ident; //是事件唯一的 key,在 socket() 使用中,它是 socket 的 fd 句柄 
  int16_t filter; //是事件的类型(EVFILT_READ socket 可读事件 EVFILT_WRITE socket 可 写事件) 
  uint16_t flags; //操作方式 
  uint32_t fflags; // 
  intptr_t data; //数据长度 
  void *udata; //数据 
};

优点: 能处理大量数据,性能较高

2.1.3、文件事件分派器(事件发送器)

在redis中,对于文件事件的处理采用了Reactor模型,采用的是epoll的实现方式。

Redis在主循环中统一处理文件事件和时间事件,信号事件则由专门的handler来处理。

void aeMain(aeEventLoop *eventLoop) { 
  eventLoop->stop = 0; 
  while (!eventLoop->stop) { //循环监听事件 
  // 阻塞之前的处理 
  if (eventLoop->beforesleep != NULL) 
    eventLoop->beforesleep(eventLoop); 
  // 事件处理,第二个参数决定处理哪类事件 
  aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); 
  } 
}

aeMain函数其实就是一个封装的while循环,循环中的代码会一直运行直到eventLoop的stop被设置为1(true)。

它会不停尝试调用aeProcessEvents对可能存在的多种事件进行处理,而aeProcessEvents就是实际用于处理事件的函数。

2.1.4、事件处理器

2.1.4.1、连接处理函数

当客户端向Redis建立socket时,aeEventLoop会调用连接处理器(acceptTCPHandler),服务端会为每一个链接创建一个Client对象和创建文件事件监听可读事件,并指定事件的处理函数。

// 当客户端建立链接时进行的eventloop处理函数 在redis源码的networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        // 层层调用,最后在anet.c中anetGenericAccept方法中调用socket的accept方法
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // 进行socket建立连接后的处理
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

2.1.4.2、请求处理函数

当客户端通过socket发送数据后,Redis会调用请求处理函数(readQueryFromClient)。然后调用read方法将socket数据读取到缓冲区。

然后判断判断是否大于系统设置的client_max_querybuf_len,大于则向redis发送错误信息,并关闭client.

// 处理从client中读取客户端的输入缓冲区内容。
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    
    // ... 略
    
    // 从fd对应的socket中读取到client中的querybuf输入缓冲区
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            goto done;
        }
    } else if (nread == 0) {
        if (server.verbosity <= LL_VERBOSE) {
            sds info = catClientInfoString(sdsempty(), c);
            serverLog(LL_VERBOSE, "Client closed connection %s", info);
            sdsfree(info);
        }
        freeClientAsync(c);
        goto done;
    }

    sdsIncrLen(c->querybuf,nread);
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) {
        c->read_reploff += nread;
        atomicIncr(server.stat_net_repl_input_bytes, nread);
    } else {
        atomicIncr(server.stat_net_input_bytes, nread);
    }

    // 如果大于系统配置的最大客户端缓存区大小,也就是配置文件中的client-query-buffer-limit
    if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        // 返回错误信息,并且关闭client    
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        goto done;
    }

    // // processInputBuffer 处理输入缓冲区
    if (processInputBuffer(c) == C_ERR)
         c = NULL;

done:
    beforeNextClient(c);
}

2.1.4.3、命令回复处理器

执行命令结束后,命令回复处理器(sendReplyToClient)负责执行命令结果通过socket返回给客户端。

2.2、时间事件

2.2.1、时间事件组成

时间事件分为定时事件和周期事件。 一个时间事件主要由三个属性组成

  • 1、id(全局唯一ID)
  • 2、when(毫秒时间戳,记录了时间事件到达的时间)
  • 3、timeProc(时间事件处理器,当时间到了时,Redis会调用相应的处理器来处理事件)
/* 时间事件结构 Time event structure */
typedef struct aeTimeEvent {
    // 时间事件的唯一标识符
    long long id; /* time event identifier. */
    // 事件的到达时间,存储的是UNIX的时间戳
    monotime when;
    // 事件处理函数,当到达指定时间后调用该函数处理对应的问题
    aeTimeProc *timeProc;
    // 事件释放函数
    aeEventFinalizerProc *finalizerProc;
    // 多路复用库的私有数据
    void *clientData;
    // 指向上个时间事件结构,形成链表
    struct aeTimeEvent *prev;
    // 指向下个时间事件结构,形成链表
    struct aeTimeEvent *next;
    // refcount以防止在递归时间事件调用中释放计时器事件
    int refcount; /* refcount to prevent timer events from being
           * freed in recursive time event calls. */
} aeTimeEvent;

2.2.2、Redis中时间事件使用场景

时间事件的最主要应用是在redis服务器需要对自身的资源与配置进行定期的调度,从而确保服务器的长久运行。 这些操作都是由serverCron函数实现。该函数做了以下操作。

  • 1、更新redis服务器各类统计信息,包括时间,内存占用,数据库占用等
  • 2、清理数据库中过期的key(过期删除)
  • 3、关闭和清理连接失败的客户端
  • 4、尝试进行aof个rdb的持久化操作(数据持久化)
  • 5、如果服务器是主服务器,会定期将数据向从服务器做同步操作(主从复制)。
  • 6、如果是集群模式,对集群定期进行同步和连接测试等操作(健康检查)。

Redis启动后,会定期执行serverCron函数,直到redis关闭为止,默认每秒执行10次,也就是100ms执行一次。

可以在redis配置文件(redis.conf)中的hz选项调整执行频率。

#redis执行任务的频率为1s除以hz, 一秒钟执行多少次。
hz 10

2.2.3、定时与周期事件

定时事件

让一段程序在指定的时间之后执行一次。

aeTimeProc(时间处理器)的返回值是AE_NOMORE。该事件到达后删除,之后不会再重复。

周期事件

让一段程序每隔指定时间就执行一次

aeTimeProc(时间处理器)的返回值不是AE_NOMORE。

当一个时间事件到达后,服务器会根据时间处理器的返回值,对时间事件的when属性进行更新,让这个事件在一段时间后再次到达。

serverCron就是一个典型的周期性事件。

2.3、Redis的事件处理机制

Redis有自己的事件处理机制, aeEventLoop是整个事件驱动的核心。它管理着文件和事件的事件列表。不断循环处理就绪的文件事件和到期的事件事件。

typedef struct aeEventLoop {
    // 最大文件描述符的值
    int maxfd;   /* highest file descriptor currently registered */
    // 文件描述符的最大监听数
    int setsize; /* max number of file descriptors tracked */
    // 用于生成时间事件的唯一标识id
    long long timeEventNextId;
    // 注册的文件事件列表
    aeFileEvent *events; /* Registered events */
    // 已就绪的事件列表
    aeFiredEvent *fired; /* Fired events */
    // 注册要使用的时间事件
    aeTimeEvent *timeEventHead;
    // 停止标志,1表示停止
    int stop;
    // 这个是处理底层特定API的数据,对于epoll来说,该结构体包含了epoll fd和epoll_event
    void *apidata; /* This is used for polling API specific data */
    // 在调用processEvent前(即如果没有事件则睡眠),调用该处理函数
    aeBeforeSleepProc *beforesleep;
    // 在调用aeApiPoll后,调用该函数
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

beforesleep

beforesleep对象是一个回调函数,在redis-server初始化时已经设置好了

  • 检测集群状态
  • 随机释放已过期的键
  • 在数据同步复制阶段取消客户端的阻塞
  • 处理输入数据,并且同步副本信息
  • 处理非阻塞的客户端请求
  • AOF持久化存储策略,类似于mysql的bin log
  • 使用挂起的输出缓冲区处理写入

aftersleep

对象是一个回调函数,在IO多路复用与IO事件处理之间被调用。

2.3.1、Redis的初始化

Redis服务端的初始化在其初始化函数initServer中,会创建事件管理器aeEventLoop对象。

函数aeCreateEventLoop将创建一个事件管理器,主要初始化aeEventLoop的各个属性值。比如:events, fired,timeEventHead,apidata。

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    monotonicInit();    /* just in case the calling app didn't initialize */

    // 创建 aeEventLoop 对象
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // 初始化注册的文件事件表、就绪文件事件表.events指针指向注册的文件事件表、 fired指针指向就绪文件事件表
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    // 初始化时间事件列表. 设置timeEventHead和timeEventNextId 属性
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    // 停止标志 1表示停止 初始化为0    
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    // 调用aeApiCreate函数创建epoll实例,并初始化apidata
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
  • 首先创建 aeEventLoop 对象。
  • 初始化注册的文件事件表、就绪文件事件表。 events指针指向注册的文件事件表、 fired指针指向就绪文件事件表。表的内容在后面添加具体事件时进行初变更。
  • 初始化时间事件列表,设置 timeEventHead 和 timeEventNextId 属性。
  • 调用aeApiCreate函数创建epoll实例,并初始化apidata。
  • 停止标志,1表示停止,初始化为0。

2.3.2、初始化文件事件属性

aeFileEvent

aeFileEvent结构体为已经注册并需要监听的事件的结构体。

/* File event structure */
typedef struct aeFileEvent {
    // 监听事件类型掩码, 
    // 值可以是 AE_READABLE 或 AE_WRITABLE , 
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    // 读事件处理器
    aeFileProc *rfileProc;
    // 写事件处理器
    aeFileProc *wfileProc;
    // 多路复用库的私有数据
    void *clientData;
} aeFileEvent;

aeFiredEvent

已就绪的文件事件

/* A fired event */
typedef struct aeFiredEvent {
    // 已就绪文件描述符
    int fd;

    // 事件类型掩码, 
    // 值可以是 AE_READABLE 或 AE_WRITABLE 或者是两者的或
    int mask;
} aeFiredEvent;

apidata

在ae创建的时候,会被赋值为aeApiState结构体,结构体的定义如下:

typedef struct aeApiState {
    // epoll_event实例描述符
    int epfd;
    // 事件列表
    struct epoll_event *events;
} aeApiState;

这个结构体是为了epoll所准备的数据结构。redis可以选择不同的io多路复用方法。因此 apidata是个void类型,根据不同的io多路复用库来选择不同的实现。

2.3.3、初始化时间事件属性

aeTimeEvent结构体为时间事件,Redis将所有时间事件都放在一个无序链表中,每次Redis会遍历整个链表,查找所有已经到达的时间事件,并且调用相应的事件处理器。

前面其实已经提到过该结构了。

/* 时间事件结构 Time event structure */
typedef struct aeTimeEvent {
    // 时间事件的唯一标识符
    long long id; /* time event identifier. */
    // 事件的到达时间,存储的是UNIX的时间戳
    monotime when;
    // 事件处理函数,当到达指定时间后调用该函数处理对应的问题
    aeTimeProc *timeProc;
    // 事件释放函数
    aeEventFinalizerProc *finalizerProc;
    // 多路复用库的私有数据
    void *clientData;
    // 指向上个时间事件结构,形成链表
    struct aeTimeEvent *prev;
    // 指向下个时间事件结构,形成链表
    struct aeTimeEvent *next;
    // refcount以防止在递归时间事件调用中释放计时器事件
    int refcount; /* refcount to prevent timer events from being
           * freed in recursive time event calls. */
} aeTimeEvent;

在aeMain函数中,首先调用beforesleep。这个方法在Redis每次进入sleep/wait去等待监听的端口发生I/O事件之前被调用。当有事件发生时,调用aeProcessEvent进行处理。

aeProcessEvent

  • 首先计算距离当前时间最近的时间事件,以此计算一个超时时间;
  • 然后调用aeApiPoll函数去等待底层的I/O多路复用事件就绪;
  • aeApiPoll函数返回之后,会处理所有已经产生文件事件和已经达到的时间事件。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    // processed记录这次调度执行了多少事件
    int processed = 0, numevents;
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 获取最近将要发生的时间事件
            usUntilTimer = usUntilEarliestTimer(eventLoop);
        // 计算aeApiPoll的超时时间
        if (usUntilTimer >= 0) {
            // 计算距离下一次发生时间时间的时间间隔
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else { 
            // 没有时间事件
            if (flags & AE_DONT_WAIT) {
                // 马上返回,不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // 阻塞到文件事件发生
                tvp = NULL;
            }
        }

        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        // 等待文件事件发生,tvp为超时时间,超时马上返回(tvp为0表示马上,为null表示阻塞到事件发生)
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */

            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                // 处理读事件
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // 处理写事件
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        // 时间事件调度和执行
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

3、小结

Redis的高性能为什么是单线程也可以性能这么高。它的事件处理机制起着非常重要的一个作用。选用的模型为reactor模型。让单线程去做了多线程可以做的事情,并且还没有线程安全问题。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章