从epoll机制看MessageQueue
epoll机制
一句话解释:epoll
机制可以监听特定的fd
,当fd
收到内容时,发送事件回调。相比select
和poll
机制,效率更高。
epoll API
epoll_create(int size)
参数:
- size:表示最多可以监听多少个fd,新版本已弃用。
返回值:epoll实例的fd
-
>= 0
成功 -
< 0
失败
作用:
初始化epoll机制,调用API后,操作系统内核会产生一个eventpoll实例,并返回一个fd,这个fd就是epoll实例的句柄。
epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
参数:
- epfd: 方法1中创建的epoll实例的fd
- op: 操作指令
- EPOLL_CTL_ADD: 注册新的fd到epfd中
- EPOLL_CTL_MOD:修改已注册的fd的监听事件
- EPOLL_CTL_DEL:从epfd中删除一个fd
- fd:要监听的fd
- event:要监听的event
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; // 表示监听的事件类型(EPOLLIN/EPOLLHUP/EPOLLOUT...)
epoll_data_t data; // 用户自定义数据,当事件发生时将会原样返回给用户
};
返回值:
-
>= 0
成功 -
< 0
失败
作用:
注册、修改或删除监听的fd和事件。
epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
参数:
- epfd:方法1中创建的epoll实例的fd
- events:和2中结构一样
- maxevents:events数量
- timeout:等待超时时间。如果超过timeout还没有事件,则返回
返回值:
到来事件的个数,返回的事件存储在events数组中。
MessageQueue原理
大家都知道,Android的主线程的Looper
,本质是运行了一个死循环,不断从MessageQueue
中取消息执行,如果没有消息,则会等待在nativePollOnce
方法上,这个方法的底层原理就是epoll_wait
.
下面一起来看源码,弄清楚Looper
的整个流程是怎样的。
首先看Looper.java中的loop方法,这是我们启动一个线程looper的入口。
// Looper.java
public static void loop() {
final Looper me = myLooper();
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
这个方法就是开启一个无限循环,调用loopOnce
。
// Looper.java
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
// 从messagequeue中获取下一条message
Message msg = me.mQueue.next();
// 执行message
msg.target.dispatchMessage(msg);
// Android10开始,可以通过添加observer的方式,监听messsage的执行情况
if (observer != null) {
observer.messageDispatched(token, msg);
}
// 回收message
msg.recycleUnchecked();
return true;
}
当loopOnce
方法返回true之后,又会再次循环调到loopOnce
,重复执行。
接下来我们看MessageQueue
的next()
方法。
// MessageQueue.java
Message next() {
for (;;) {
// 通过epoll机制,等待消息,或超时唤醒
nativePollOnce(ptr, nextPollTimeoutMills);
synchronized (this) {
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// msg.target == null 表示是同步屏障
// 如果有同步屏障,则直接跳到下一个异步的消息(同步的消息都过滤掉,先不处理)
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// 如果当前还没到达message的执行时间, 则获取当前的时间差作为timeout
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
}else{
// 省略一些链表的操作 prevMsg.next = msg.next; msg.next = null;
// 直接返回已经到达执行时间的,第一条message
return msg;
}
}
}
}
}
这个方法,首先会调用nativePollOnce
这个native方法,等nativePollOnce
返回后,会去MessageQueue
的链表中取下一条待执行的message。
取message的方法:
- 取链表头的第一个message(
MessageQueue
中的message是按照时间顺序排列的,所以第一个就是最近的待执行的message) - 如果这个消息是同步屏障,则跳过所有同步消息,直接取下一个异步消息,返回
- 否则,判断当前message是否到执行时间,如果到执行时间,则直接返回,否则继续调
nativePollOnce
等待。
接下来看nativePollOnce
的实现。
// android_os_MessageQueue_nativePollOnce()
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
//将Java层传递下来的mPtr转换为nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 【3】
}
经过一系列调用,最后调到了Looper
中的pollInner
方法
Looper.cpp
int Looper::pollInner(int timeoutMillis) {
...
struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //fd最大个数为16
//等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回;
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
//循环遍历,处理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
// 如果是唤醒事件,则读取并清空管道数据
awoken();
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
// 如果是之前在mRequests中注册过的fd
//处理request,生成对应的reponse对象,push到响应数组
pushResponse(events, mRequests.valueAt(requestIndex));
}
}
}
Done: ;
//处理Native的Message,调用相应回调方法
while (mMessageEnvelopes.size() != 0) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{
handler->handleMessage(message); // 处理消息事件
}
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
//处理带有Callback()方法的Response事件,执行Reponse相应的回调方法
for (size_t i = 0; i < mResponses.size(); i++) {
if (response.request.ident == POLL_CALLBACK) {
// 处理请求的回调方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq); //移除fd
}
}
}
return result;
}
Looper.pollInner
主要做如下事情:
- 调用
epoll_wait
,等待在一些特定的fd上 - 当
epoll_wait
返回后(fd发生写入或超时时间到),执行唤醒的事件。- 如果唤醒的是
mWakeEventFd
,则直接调用awoken方法。 - 如果唤醒的是之前注册在
mRequests
中的fd,则将Request
生成一个对应的Response
,加入mResponses
集合
- 如果唤醒的是
- 处理native message,执行相应的回调方法
- 处理
mResponses
集合中的所有Response
事件,调用他们callback
的handleEvent
回调方法。(点击事件就是在这里被执行的)
我们来看看awoken
方法。它的逻辑很简单,就是循环读取fd中的全部内容。
// Looper.cpp
void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
关于mRequests
和mResponses
的逻辑,先挖个坑,后面的文章再讲。
epoll使用示例
下面我们写一个epoll
机制使用的示例代码。
在这个例子中,我们监听了一个sockfd
的管道端口,启动一个线程,等待在epoll_wait
上。一旦sockfd中写入数据,就可以唤醒我们的线程,进行读取。
#include
void MonitorInit::createEpoll(int sockfd) {
if(mSockFd == sockfd) return;
mEpollFd = epoll_create(EPOLL_MAX_EVENTS);
int epollEvents = 0;
epollEvents |= EPOLLIN;
struct epoll_event eventItem;
memset(&eventItem, 0, sizeof(epoll_event));
eventItem.events = epollEvents;
eventItem.data.fd = sockfd;
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, sockfd, &eventItem);
LLOG_ERROR(TAG_LOOPER, "Adding epoll event resutl %d", epollResult);
if(epollResult < 0){
LLOG_ERROR(TAG_LOOPER, "Error adding epoll event, fd %d, errno %d", sockfd, epollResult);
}
pthread_t thd;
// 开启一个线程,这个线程用来监听epoll_wait
pthread_create(&thd, nullptr, epollCallback, nullptr);
pthread_detach(thd);
mSockFd = sockfd;
}
void epollCallback(void *arg){
// 死循环,等待fd消息
while(loop){
int timeoutMillis = 100000;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
LLOG_ERROR(TAG_LOOPER, "receive event count %d", eventCount);
if(eventCount < 0){
LLOG_ERROR(TAG_LOOPER, "Poll failed with an unexpected error, errno=%d", errno);
}
if(eventCount == 0){
// 超时时间到
LLOG_ERROR(TAG_LOOPER, "pollOnce - timeout");
}
for(int i = 0; i < eventCount; i++){
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if(fd == mSockFd){
// 将fd的内容读出来
}
}
}
}
总结
MessageQueue核心原理:主线程通过Looper
中的死循环,不断从MessageQueue
中获取待指定的message。
- 如果有到执行时间的消息时,直接执行。
- 如果还没有到执行时间的消息,会通过
epoll_wait
等待在mWakeReadPipeFd
端口,等待内容写入,超时时间是下一个message
执行时间到现在的时间差。- 如果在等待的过程中,有新的消息插入队列,会往
mWakeReadPipeFd
端口写入数据,这样就能唤醒等待在这个上面的pollInner
方法,从而继续执行之后的message
。 - 如果等待的过程中,没有新的消息插入,则会在
timeout
时间到达的时候,唤醒,处理后面的message
。
- 如果在等待的过程中,有新的消息插入队列,会往
共有 0 条评论