nanomsg源码阅读(三)
上一篇 nanomsg源码阅读(二) 介绍了nanomsg的实用工具类utils模块,这篇将试图解析下nanomsg最核心的aio(即用线程池模拟带状态机的异步IO)模块,当然这里的IO是广义上的IO,包含了用户自定义事件的输入输出,后面的分析中我们会看到。
定时任务集合:timerset.h timerset.c
采用有序链表方式实现,新增定时任务时,根据到期timeout时间,排序插入,最近到期置于链表头,通过nn_timerset_timeout函数获取最近到期时间与当前时间的差,作为参数传入nn_poller_wait
IO多路分离器:poller.h poller.c poller_kqueue.h poller_kqueue.inc poller_epoll.h poller_epoll.inc poller_poll.h poller_poll.inc
前文提到常用的IO多路分离器,网上对他们各自的对比也有很多,这里不再细述了,值得注意的是nanomsg打开任意的fd都会设置CLOEXEC标志(出于安全方面的考虑)。
上下文环境:ctx.h ctx.c
每个nn_socket对应一个nn_ctx,调用关系为:nn_socket–>nn_sock_init–>nn_ctx_init,即程序中创建了多少个nn_socket就创建了多少个nn_ctx。
struct nn_ctx {
struct nn_mutex sync; //互斥量
struct nn_pool *pool; //线程池外部初始化后,init时赋值
struct nn_queue events; //本上下文状态机事件队列
struct nn_queue eventsto; //外部状态机事件队列
nn_ctx_onleave onleave; //通知函数,在nn_ctx_leave时调用
};
//上下文初始化与回收
void nn_ctx_init (struct nn_ctx *self, struct nn_pool *pool,
nn_ctx_onleave onleave);
void nn_ctx_term (struct nn_ctx *self);
//对上下文上锁
void nn_ctx_enter (struct nn_ctx *self);
/* 先处理完内部的events,调用onleave函数通知owner,
* 再copy一份eventsto队列后,对ctx解锁,之后处理外部上下文队列数据
*/
void nn_ctx_leave (struct nn_ctx *self);
//对nn_pool_choose_worker的包装,从线程池中选取一个thread作为工作线程
struct nn_worker *nn_ctx_choose_worker (struct nn_ctx *self);
//往内部状态机事件队列中添加事件
void nn_ctx_raise (struct nn_ctx *self, struct nn_fsm_event *event);
//往外部状态机时间队列中添加事件,仅用在线程间通讯中
void nn_ctx_raiseto (struct nn_ctx *self, struct nn_fsm_event *event);
线程池:pool.h pool.c
好吧,pool.c上赫然的TODO告诉我们:这个线程池里面目前其实只有一个线程。
/* TODO: The dummy implementation of a thread pool. As for now there's only one worker thread created. */
这个线程池需要在外部进行初始化,然后在aio contex初始化时,赋上对它的引用,通过nn_ctx_choose_worker函数从线程池中选取一个thread作为该上下文的工作线程worker。
工作线程:worker.h worker.c worker_win.h worker_win.inc worker_posix.h worker_posix.inc
这里我主要查看的是posix的实现:worker_posix.inc,windows的实现没有细看了。
struct nn_worker {
struct nn_mutex sync; //对下面自定义任务队列tasks操作的锁
struct nn_queue tasks; //本工作线程的待处理自定义任务队列
struct nn_queue_item stop; //worker停止通知任务,收到此任务后,线程将终止
struct nn_efd efd; //用于处理自定义任务task事件的通知描述符
struct nn_poller poller; //多路分离器,处理定时timer、IO的fd、任务通知efd
struct nn_poller_hndl efd_hndl; //用于标识此事件为自定义任务task
struct nn_timerset timerset; //定时任务列表
struct nn_thread thread; //工作线程
};
工作线程有三类工作要做,由多路分离器poller轮询获取处理(具体处理流程在nn_worker_routine函数里有较为清晰的注释),他们分别为:
//定时任务
struct nn_worker_timer {
struct nn_fsm *owner;
struct nn_timerset_hndl hndl;
};
//文件描述符的IO操作
struct nn_worker_fd {
int src;
struct nn_fsm *owner;
struct nn_poller_hndl hndl;
};
//自定义任务
struct nn_worker_task {
int src;
struct nn_fsm *owner;
struct nn_queue_item item;
};
值得注意的是:处理工作前,都会对工作处理状态机nn_fsm所属的上下文nn_ctx上锁并处理(调用nn_ctx_enter/nn_ctx_leave)。
即worker中通过nn_poller_wait轮询获取工作,然后每一个工作都有对应的状态机nn_fsm做流程处理。
状态机:fsm.h fsm.c
上面看到worker接到工作以后,进入工作区域,按照工作流程来处理,如果这是一个顺序执行的大步骤,便相当于同步流程了。引入状态和状态机以后,将工作进一步切分为几个步骤,通过状态机状态及动作的方式,不仅让流程相对清晰,更通过这样对任务的进一步细分,将很多同步的操作内部转变为异步。
传统的状态机一般由状态state+动作action构成,这里的状态机增加了一个来源src:即动作发起者
struct nn_fsm_event {
struct nn_fsm *fsm; //事件所属状态机
int src; //事件发起者
void *srcptr; //事件发起者附属信息
int type; //动作action,因为包含文件的IO,这里取名为type
struct nn_queue_item item; //事件会加入到nn_ctx的events队列中
};
struct nn_fsm {
nn_fsm_fn fn; //状态机处理主函数,大switch所在
nn_fsm_fn shutdown_fn; //状态机关闭处理函数
int state; //状态
int src; //本状态机的源(动作发起者)
void *srcptr; //来源附属信息
struct nn_fsm *owner; //父指针
struct nn_ctx *ctx; //状态机所属上下文
struct nn_fsm_event stopped; //状态机停止事件
};
基础的fsm内部有个默认的src:NN_FSM_ACTION,以及与此相对应的两个action:NN_FSM_START/NN_FSM_STOP,而后用户可以在此基础上扩展,只要定义的src和action不冲突即可(默认为负,只要自定义为正即可)。
定时处理:timer.h timer.c
使用状态机具体实现
socket包装:usock.h usock.c usock_posix.h usock_posix.inc usock_win.h usock_win.inc
使用状态机具体实现
总结
aio应该是nanomsg最核心的模块了,中间最复杂的当属作者将将状态机引入到事件驱动的编程中。
martin用了连续两篇博客 《The Callback Hell》及《Event-driven architecture, state machines et al.》介绍了传统的callback的恶心,最主要的是不易扩展,而网络层的scalable正式nanomsg追求的目标也是与zeromq最大的区别,用户可以在其核心构建基础上,实现自己的网络拓扑协议。martin在文章里有说过有空要博客详细介绍下nanomsg的状态机机制,可惜到目前为止,他还没有写……。
欢迎大家探讨指正!