Linux内核epoll源码剖析

本文基于linux 3.10.104 x86_64内核,对epoll相关系统调用的源码分析,对应源码位于fs/eventpoll.c。

用户态下epoll系统调用相关的函数原型如下:

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_create系统调用的入口:

SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size <= 0)
		return -EINVAL;

	return sys_epoll_create1(0);
}

根据man文档,自从linux 2.6.8开始epoll_create的参数就被忽略了,仅需传入大于0的值即可(从上面实现源码第3-4行亦可知)。
epoll_create实质调用了自linux 2.6.27新增的epoll_create1调用,传入flag标记为0。

先来看下几个关键的数据结构定义:

// include/uapi/linux/eventpoll.h
#ifdef __x86_64__
#define EPOLL_PACKED __attribute__((packed))
#else
#define EPOLL_PACKED
#endif
struct epoll_event {
	__u32 events;
	__u64 data;
} EPOLL_PACKED;

// fs/eventpoll.c
struct eventpoll {
	/* Protect the access to this structure */
	spinlock_t lock;

	/*
	 * This mutex is used to ensure that files are not removed
	 * while epoll is using them. This is held during the event
	 * collection loop, the file cleanup path, the epoll file exit
	 * code and the ctl operations.
	 */
	struct mutex mtx;

	/* Wait queue used by sys_epoll_wait() */
	wait_queue_head_t wq;

	/* Wait queue used by file->poll() */
	wait_queue_head_t poll_wait;

	/* List of ready file descriptors */
	struct list_head rdllist;

	/* RB tree root used to store monitored fd structs */
	struct rb_root rbr;

	/*
	 * This is a single linked list that chains all the "struct epitem" that
	 * happened while transferring ready events to userspace w/out
	 * holding ->lock.
	 */
	struct epitem *ovflist;

	/* wakeup_source used when ep_scan_ready_list is running */
	struct wakeup_source *ws;

	/* The user that created the eventpoll descriptor */
	struct user_struct *user;

	struct file *file;

	/* used to optimize loop detection check */
	int visited;
	struct list_head visited_list_link;
};

struct epitem {
	/* RB tree node used to link this structure to the eventpoll RB tree */
	struct rb_node rbn;

	/* List header used to link this structure to the eventpoll ready list */
	struct list_head rdllink;

	/*
	 * Works together "struct eventpoll"->ovflist in keeping the
	 * single linked chain of items.
	 */
	struct epitem *next;

	/* The file descriptor information this item refers to */
	struct epoll_filefd ffd;

	/* Number of active wait queue attached to poll operations */
	int nwait;

	/* List containing poll wait queues */
	struct list_head pwqlist;

	/* The "container" of this item */
	struct eventpoll *ep;

	/* List header used to link this item to the "struct file" items list */
	struct list_head fllink;

	/* wakeup_source used when EPOLLWAKEUP is set */
	struct wakeup_source __rcu *ws;

	/* The structure that describe the interested events and the source fd */
	struct epoll_event event;
};

由上第2-6行可知,x86_64下取消了epoll_event的对齐填充,与32位下保持一致。
由结构体eventpoll定义第35行可知,epoll管理fd使用了红黑树结构

再来看epoll_create1的具体实现:

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
	int error, fd;
	struct eventpoll *ep = NULL;
	struct file *file;

	/* Check the EPOLL_* constant for consistency.  */
	BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

	if (flags & ~EPOLL_CLOEXEC)
		return -EINVAL;
	/*
	 * Create the internal data structure ("struct eventpoll").
	 */
	error = ep_alloc(&ep);
	if (error < 0)
		return error;
	/*
	 * Creates all the items needed to setup an eventpoll file. That is,
	 * a file structure and a free file descriptor.
	 */
	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
	if (fd < 0) {
		error = fd;
		goto out_free_ep;
	}
	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));
	if (IS_ERR(file)) {
		error = PTR_ERR(file);
		goto out_free_fd;
	}
	ep->file = file;
	fd_install(fd, file);
	return fd;

out_free_fd:
	put_unused_fd(fd);
out_free_ep:
	ep_free(ep);
	return error;
}

static int ep_alloc(struct eventpoll **pep)
{
	int error;
	struct user_struct *user;
	struct eventpoll *ep;

	user = get_current_user();
	error = -ENOMEM;
	ep = kzalloc(sizeof(*ep), GFP_KERNEL);
	if (unlikely(!ep))
		goto free_uid;

	spin_lock_init(&ep->lock);
	mutex_init(&ep->mtx);
	init_waitqueue_head(&ep->wq);
	init_waitqueue_head(&ep->poll_wait);
	INIT_LIST_HEAD(&ep->rdllist);
	ep->rbr = RB_ROOT;
	ep->ovflist = EP_UNACTIVE_PTR;
	ep->user = user;

	*pep = ep;

	return 0;

free_uid:
	free_uid(user);
	return error;
}

epoll_create1第8行编译期检查确保EPOLL_CLOEXEC的值为O_CLOEXEC,第10行检查若指定flags目前仅支持EPOLL_CLOEXEC,表示当执行exec调用时会自动关闭epoll fd。
第15行调用ep_alloc创建并初始化eventpoll结构体实例,由ep_alloc实现(第52行)可知,调用kzalloc在内核空间申请内存并初始化清零。
第22-35行,创建与eventpoll关联的管理者epoll fd。

epoll_ctl系统调用的入口:

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	int error;
	int did_lock_epmutex = 0;
	struct file *file, *tfile;
	struct eventpoll *ep;
	struct epitem *epi;
	struct epoll_event epds;

	error = -EFAULT;
	if (ep_op_has_event(op) &&
	    copy_from_user(&epds, event, sizeof(struct epoll_event)))
		goto error_return;

	/* Get the "struct file *" for the eventpoll file */
	error = -EBADF;
	file = fget(epfd);
	if (!file)
		goto error_return;

	/* Get the "struct file *" for the target file */
	tfile = fget(fd);
	if (!tfile)
		goto error_fput;

	/* The target file descriptor must support poll */
	error = -EPERM;
	if (!tfile->f_op || !tfile->f_op->poll)
		goto error_tgt_fput;

	/* Check if EPOLLWAKEUP is allowed */
	if ((epds.events & EPOLLWAKEUP) && !capable(CAP_BLOCK_SUSPEND))
		epds.events &= ~EPOLLWAKEUP;

	/*
	 * We have to check that the file structure underneath the file descriptor
	 * the user passed to us _is_ an eventpoll file. And also we do not permit
	 * adding an epoll file descriptor inside itself.
	 */
	error = -EINVAL;
	if (file == tfile || !is_file_epoll(file))
		goto error_tgt_fput;

	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	ep = file->private_data;

	/*
	 * When we insert an epoll file descriptor, inside another epoll file
	 * descriptor, there is the change of creating closed loops, which are
	 * better be handled here, than in more critical paths. While we are
	 * checking for loops we also determine the list of files reachable
	 * and hang them on the tfile_check_list, so we can check that we
	 * haven't created too many possible wakeup paths.
	 *
	 * We need to hold the epmutex across both ep_insert and ep_remove
	 * b/c we want to make sure we are looking at a coherent view of
	 * epoll network.
	 */
	if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
		mutex_lock(&epmutex);
		did_lock_epmutex = 1;
	}
	if (op == EPOLL_CTL_ADD) {
		if (is_file_epoll(tfile)) {
			error = -ELOOP;
			if (ep_loop_check(ep, tfile) != 0) {
				clear_tfile_check_list();
				goto error_tgt_fput;
			}
		} else
			list_add(&tfile->f_tfile_llink, &tfile_check_list);
	}

	mutex_lock_nested(&ep->mtx, 0);

	/*
	 * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
	 * above, we can be sure to be able to use the item looked up by
	 * ep_find() till we release the mutex.
	 */
	epi = ep_find(ep, tfile, fd);

	error = -EINVAL;
	switch (op) {
	case EPOLL_CTL_ADD:
		if (!epi) {
			epds.events |= POLLERR | POLLHUP;
			error = ep_insert(ep, &epds, tfile, fd);
		} else
			error = -EEXIST;
		clear_tfile_check_list();
		break;
	case EPOLL_CTL_DEL:
		if (epi)
			error = ep_remove(ep, epi);
		else
			error = -ENOENT;
		break;
	case EPOLL_CTL_MOD:
		if (epi) {
			epds.events |= POLLERR | POLLHUP;
			error = ep_modify(ep, epi, &epds);
		} else
			error = -ENOENT;
		break;
	}
	mutex_unlock(&ep->mtx);

error_tgt_fput:
	if (did_lock_epmutex)
		mutex_unlock(&epmutex);

	fput(tfile);
error_fput:
	fput(file);
error_return:

	return error;
}

由epoll_ctl第13行可知,epoll仍然需要将待检测fd的对应事件从用户空间拷贝到内核空间。
第18-43行,一是检查传入事件是否包含自linux 3.5引入的EPOLLWAKEUP特性;二是检查并确保传入待监听fd与管理者epoll fd不是同一个fd。
第63-66行,当op为EPOLL_CTL_ADD或EPOLL_CTL_DEL时,获取epmutex全局锁(一个较大粒度的锁),防止某次epoll_ctl调用添加某个fd时有其它调用同时添加或删除某个fd,保持数据同步,而EPOLL_CTL_MOD修改操作并不影响红黑树结构上观测到的一致性;也保证EPOLL_CTL_ADD时树形结构发生环路以及检测死锁。
第67-76行,在op为EPOLL_CTL_ADD时,如果待添加的fd是epoll fd,会基于当前管理者epoll fd对待添加epoll fd进行死循环检测以及递归深度检测,该部分实现细节有空时再做详细分析。
第78行,epoll_ctl调用的三种具体操作的锁,实际跟epmutex有一定的应用重合,这里更多是为了实现上更好的扩展性。

下面看epoll_ctl的EPOLL_CTL_ADD对应的ep_insert操作实现:

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)

{
	int error, revents, pwake = 0;
	unsigned long flags;
	long user_watches;
	struct epitem *epi;
	struct ep_pqueue epq;

	user_watches = atomic_long_read(&ep->user->epoll_watches);
	if (unlikely(user_watches >= max_user_watches))
		return -ENOSPC;
	if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
		return -ENOMEM;

	/* Item initialization follow here ... */
	INIT_LIST_HEAD(&epi->rdllink);
	INIT_LIST_HEAD(&epi->fllink);
	INIT_LIST_HEAD(&epi->pwqlist);
	epi->ep = ep;
	ep_set_ffd(&epi->ffd, tfile, fd);
	epi->event = *event;
	epi->nwait = 0;
	epi->next = EP_UNACTIVE_PTR;
	if (epi->event.events & EPOLLWAKEUP) {
		error = ep_create_wakeup_source(epi);
		if (error)
			goto error_create_wakeup_source;
	} else {
		RCU_INIT_POINTER(epi->ws, NULL);
	}

	/* Initialize the poll table using the queue callback */
	epq.epi = epi;
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

	/*
	 * Attach the item to the poll hooks and get current event bits.
	 * We can safely use the file* here because its usage count has
	 * been increased by the caller of this function. Note that after
	 * this operation completes, the poll callback can start hitting
	 * the new item.
	 */
	revents = ep_item_poll(epi, &epq.pt);

	/*
	 * We have to check if something went wrong during the poll wait queue
	 * install process. Namely an allocation for a wait queue failed due
	 * high memory pressure.
	 */
	error = -ENOMEM;
	if (epi->nwait < 0)
		goto error_unregister;

	/* Add the current item to the list of active epoll hook for this file */
	spin_lock(&tfile->f_lock);
	list_add_tail(&epi->fllink, &tfile->f_ep_links);
	spin_unlock(&tfile->f_lock);

	/*
	 * Add the current item to the RB tree. All RB tree operations are
	 * protected by "mtx", and ep_insert() is called with "mtx" held.
	 */
	ep_rbtree_insert(ep, epi);

	/* now check if we've created too many backpaths */
	error = -EINVAL;
	if (reverse_path_check())
		goto error_remove_epi;

	/* We have to drop the new item inside our item list to keep track of it */
	spin_lock_irqsave(&ep->lock, flags);

	/* If the file is already "ready" we drop it inside the ready list */
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		ep_pm_stay_awake(epi);

		/* Notify waiting tasks that events are available */
		if (waitqueue_active(&ep->wq))
			wake_up_locked(&ep->wq);
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}

	spin_unlock_irqrestore(&ep->lock, flags);

	atomic_long_inc(&ep->user->epoll_watches);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);

	return 0;

error_remove_epi:
	spin_lock(&tfile->f_lock);
	if (ep_is_linked(&epi->fllink))
		list_del_init(&epi->fllink);
	spin_unlock(&tfile->f_lock);

	rb_erase(&epi->rbn, &ep->rbr);

error_unregister:
	ep_unregister_pollwait(ep, epi);

	/*
	 * We need to do this because an event could have been arrived on some
	 * allocated wait queue. Note that we don't care about the ep->ovflist
	 * list, since that is used/cleaned only inside a section bound by "mtx".
	 * And ep_insert() is called with "mtx" held.
	 */
	spin_lock_irqsave(&ep->lock, flags);
	if (ep_is_linked(&epi->rdllink))
		list_del_init(&epi->rdllink);
	spin_unlock_irqrestore(&ep->lock, flags);

	wakeup_source_unregister(ep_wakeup_source(epi));

error_create_wakeup_source:
	kmem_cache_free(epi_cache, epi);

	return error;
}

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	struct epitem *epi = ep_item_from_epqueue(pt);
	struct eppoll_entry *pwq;

	if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
		pwq->whead = whead;
		pwq->base = epi;
		add_wait_queue(whead, &pwq->wait);
		list_add_tail(&pwq->llink, &epi->pwqlist);
		epi->nwait++;
	} else {
		/* We have to signal that an error occurred */
		epi->nwait = -1;
	}
}

第12行,查看是否超出了当前用户可注册监听的文件描述符总量上限,max_user_watches由内核模块初始化时调用eventpoll_init进行初始化,可由/proc/sys/fs/epoll/max_user_watches查看,默认值是可用内核内存空间的1/25(4%)除以单个注册文件描述符消耗的字节数(在64位内核上消耗160字节),可见x86_64系统下在内存不受限和系统级fd(/proc/sys/fs/fs/file-max)不受限情况下可监听处理的fd会很大,不像select有内核限制
第14行,基于slab高速缓存创建epitem,对于频繁使用或销毁的结构特别高效,epi_cache也是由内核模块初始化时调用eventpoll_init进行创建。可见不论是epoll fd本身还是epitem都是基于内核内存分配创建,没有mmap之说,网上关于epoll与mmap有关的文章完全是无稽之谈
第18-20行,初始化三个非常重要的队列:事件就绪队列rdllink、链接fd对应的file链表fllink、poll等待队列pwqlist等。
第26-32行,如果当前系统支持EPOLLWAKEUP事件,注册唤醒源,如在电源自动睡眠的系统,当唤醒设备的事件发生时,设备驱动会保持唤醒状态,系统不会挂起或休眠,直到事件进入排队状态,从epoll_wait调用开始,持续到下一次epoll_wait调用。
第35-36行,这里借助了包含poll_table和epitem指针的wrapper结构体ep_pqueue,调用跟select中相同的init_poll_funcptr注册初始化poll回调函数指针ep_ptable_queue_proc(参见第127-144行,该回调仅初始化一次,之后的如ep_modify、ep_send_events_proc、ep_read_events_proc等都会将ep_table对应该回调置空,而不像select每次调用都需要初始化相应回调),该回调将创建eppoll_entry并使用init_waitqueue_func_entry注册事件就绪时的回调函数ep_poll_callback,该事件回调是最核心的等待队列唤醒机制,将相应事件fd插入就绪链表。
第45行,调用ep_item_poll实际调用ep_eventpoll_poll将该file插入设备的等待队列,并查找监听的事件当前是否已经就绪并返回。可见结合上面几行,这里就是核心的注册事件唤醒回调,并没有像select那样每次主动做事件轮询
第56-59行,将epitem插入该fd文件对应监听item链表。
第65行,将epitem插入该eventpoll对应的红黑树。
第76-85行,若当前监听的事件已就绪,则插入就绪链表,并唤醒当前epollfd。

ep_find、ep_insert、ep_modify、ep_remove等基于红黑树的查找、插入等操作的时间复杂度都是O(logn)。

对于ep_modify、ep_remove等本篇暂不详述分析,最后来看epoll_wait系统调用的入口:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	int error;
	struct fd f;
	struct eventpoll *ep;

	/* The maximum number of event must be greater than zero */
	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
		return -EINVAL;

	/* Verify that the area passed by the user is writeable */
	if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
		return -EFAULT;

	/* Get the "struct file *" for the eventpoll file */
	f = fdget(epfd);
	if (!f.file)
		return -EBADF;

	/*
	 * We have to check that the file structure underneath the fd
	 * the user passed to us _is_ an eventpoll file.
	 */
	error = -EINVAL;
	if (!is_file_epoll(f.file))
		goto error_fput;

	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	ep = f.file->private_data;

	/* Time to fish for events ... */
	error = ep_poll(ep, events, maxevents, timeout);

error_fput:
	fdput(f);
	return error;
}

第9行,检查传入的监听fd数量是否超出EP_MAX_EVENTS(INT_MAX / sizeof(struct epoll_event)),可以认为epoll可以监听的fd数量非常大

epoll_wait系统调用实际调用ep_poll,来看具体实现如下:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	int res = 0, eavail, timed_out = 0;
	unsigned long flags;
	long slack = 0;
	wait_queue_t wait;
	ktime_t expires, *to = NULL;

	if (timeout > 0) {
		struct timespec end_time = ep_set_mstimeout(timeout);

		slack = select_estimate_accuracy(&end_time);
		to = &expires;
		*to = timespec_to_ktime(end_time);
	} else if (timeout == 0) {
		/*
		 * Avoid the unnecessary trip to the wait queue loop, if the
		 * caller specified a non blocking operation.
		 */
		timed_out = 1;
		spin_lock_irqsave(&ep->lock, flags);
		goto check_events;
	}

fetch_events:
	spin_lock_irqsave(&ep->lock, flags);

	if (!ep_events_available(ep)) {
		/*
		 * We don't have any available event to return to the caller.
		 * We need to sleep here, and we will be wake up by
		 * ep_poll_callback() when events will become available.
		 */
		init_waitqueue_entry(&wait, current);
		__add_wait_queue_exclusive(&ep->wq, &wait);

		for (;;) {
			/*
			 * We don't want to sleep if the ep_poll_callback() sends us
			 * a wakeup in between. That's why we set the task state
			 * to TASK_INTERRUPTIBLE before doing the checks.
			 */
			set_current_state(TASK_INTERRUPTIBLE);
			if (ep_events_available(ep) || timed_out)
				break;
			if (signal_pending(current)) {
				res = -EINTR;
				break;
			}

			spin_unlock_irqrestore(&ep->lock, flags);
			if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
				timed_out = 1;

			spin_lock_irqsave(&ep->lock, flags);
		}
		__remove_wait_queue(&ep->wq, &wait);

		set_current_state(TASK_RUNNING);
	}
check_events:
	/* Is it worth to try to dig for events ? */
	eavail = ep_events_available(ep);

	spin_unlock_irqrestore(&ep->lock, flags);

	/*
	 * Try to transfer events to user space. In case we get 0 events and
	 * there's still timeout left over, we go trying again in search of
	 * more luck.
	 */
	if (!res && eavail &&
	    !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
		goto fetch_events;

	return res;
}

static inline int ep_events_available(struct eventpoll *ep)
{
	return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
}

如果未指定超时直接跳转到check_events,通常用于非阻塞fd。
指定超时,走正常流程fetch_events:
第35行,将当前进程挂在等待队列睡眠,当相应待监听事件就绪时会有回调ep_poll_callback唤醒。
第45行,唤醒时调用ep_events_available检查就绪链表(实现参见第82行),不像select每次都需要轮询,这里epoll只需要检查下就绪链表是否为空(时间复杂度O(1))
第74行,ep_send_events调用ep_scan_ready_list执行回调ep_read_events_proc遍历就绪链表并传入用户空间以及回调执行期间发生的事件通过eventpoll的ovflist(回调执行前置空)将就绪fd去重插入就绪链表以便下一次epoll_wait调用处理,这里还涉及ET和LT模式下的不同处理,暂时不作分析。

refer:
1> https://zhuanlan.zhihu.com/p/22803683

发表评论

电子邮件地址不会被公开。 必填项已用*标注