从epoll机制看MessageQueue

epoll机制

一句话解释:epoll机制可以监听特定的fd,当fd收到内容时,发送事件回调。相比selectpoll机制,效率更高。

epoll API

  1. epoll_create(int size)

参数:

  • size:表示最多可以监听多少个fd,新版本已弃用。

返回值:epoll实例的fd

  • >= 0 成功
  • < 0 失败

作用:
初始化epoll机制,调用API后,操作系统内核会产生一个eventpoll实例,并返回一个fd,这个fd就是epoll实例的句柄。

  1. epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

参数:

  • epfd: 方法1中创建的epoll实例的fd
  • op: 操作指令
    • EPOLL_CTL_ADD: 注册新的fd到epfd中
    • EPOLL_CTL_MOD:修改已注册的fd的监听事件
    • EPOLL_CTL_DEL:从epfd中删除一个fd
  • fd:要监听的fd
  • event:要监听的event
typedef union epoll_data {
   void        *ptr;
   int          fd;
   uint32_t     u32;
   uint64_t     u64;
} epoll_data_t;

struct epoll_event {
   uint32_t     events;  // 表示监听的事件类型(EPOLLIN/EPOLLHUP/EPOLLOUT...)
   epoll_data_t data; // 用户自定义数据,当事件发生时将会原样返回给用户
};

返回值:

  • >= 0 成功
  • < 0 失败

作用:
注册、修改或删除监听的fd和事件。

  1. epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

参数:

  • epfd:方法1中创建的epoll实例的fd
  • events:和2中结构一样
  • maxevents:events数量
  • timeout:等待超时时间。如果超过timeout还没有事件,则返回

返回值:
到来事件的个数,返回的事件存储在events数组中。

MessageQueue原理

大家都知道,Android的主线程的Looper,本质是运行了一个死循环,不断从MessageQueue中取消息执行,如果没有消息,则会等待在nativePollOnce方法上,这个方法的底层原理就是epoll_wait.

下面一起来看源码,弄清楚Looper的整个流程是怎样的。

首先看Looper.java中的loop方法,这是我们启动一个线程looper的入口。
// Looper.java

public static void loop() {
  final Looper me = myLooper();
  for (;;) {
    if (!loopOnce(me, ident, thresholdOverride)) {
      return;
    }
  }
}

这个方法就是开启一个无限循环,调用loopOnce
// Looper.java

private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
  // 从messagequeue中获取下一条message
  Message msg = me.mQueue.next();
  // 执行message
  msg.target.dispatchMessage(msg);
  // Android10开始,可以通过添加observer的方式,监听messsage的执行情况
  if (observer != null) {
    observer.messageDispatched(token, msg);
  }
  //  回收message
  msg.recycleUnchecked();
  return true;
}

loopOnce方法返回true之后,又会再次循环调到loopOnce,重复执行。
接下来我们看MessageQueuenext()方法。

// MessageQueue.java

Message next() {
  for (;;) {
    // 通过epoll机制,等待消息,或超时唤醒
    nativePollOnce(ptr, nextPollTimeoutMills);
    synchronized (this) {
       Message prevMsg = null;
       Message msg = mMessages;
       if (msg != null && msg.target == null) {
          // msg.target == null 表示是同步屏障
          // 如果有同步屏障,则直接跳到下一个异步的消息(同步的消息都过滤掉,先不处理)
          do {
             prevMsg = msg;
             msg = msg.next;
          } while (msg != null && !msg.isAsynchronous());
       }
       if (msg != null) {
         if (now < msg.when) {
          // 如果当前还没到达message的执行时间, 则获取当前的时间差作为timeout
          nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
         }else{
           // 省略一些链表的操作 prevMsg.next = msg.next; msg.next = null;
           // 直接返回已经到达执行时间的,第一条message
           return msg;
         }
       }
    }
  }
}

这个方法,首先会调用nativePollOnce这个native方法,等nativePollOnce返回后,会去MessageQueue的链表中取下一条待执行的message。

取message的方法:

  • 取链表头的第一个message(MessageQueue中的message是按照时间顺序排列的,所以第一个就是最近的待执行的message)
  • 如果这个消息是同步屏障,则跳过所有同步消息,直接取下一个异步消息,返回
  • 否则,判断当前message是否到执行时间,如果到执行时间,则直接返回,否则继续调nativePollOnce等待。

接下来看nativePollOnce的实现。

// android_os_MessageQueue_nativePollOnce()

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    //将Java层传递下来的mPtr转换为nativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 【3】
}

经过一系列调用,最后调到了Looper中的pollInner方法

Looper.cpp

int Looper::pollInner(int timeoutMillis) {
    ...
    struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //fd最大个数为16
    //等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回;
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    //循环遍历,处理所有的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                // 如果是唤醒事件,则读取并清空管道数据
                awoken(); 
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                // 如果是之前在mRequests中注册过的fd
                //处理request,生成对应的reponse对象,push到响应数组
                pushResponse(events, mRequests.valueAt(requestIndex));
            }
        }
    }
Done: ;
    //处理Native的Message,调用相应回调方法
    while (mMessageEnvelopes.size() != 0) {
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            {
                handler->handleMessage(message);  // 处理消息事件
            }
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    //处理带有Callback()方法的Response事件,执行Reponse相应的回调方法
    for (size_t i = 0; i < mResponses.size(); i++) {
        if (response.request.ident == POLL_CALLBACK) {
            // 处理请求的回调方法
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq); //移除fd
            }
        }
    }
    return result;
}

Looper.pollInner主要做如下事情:

  • 调用epoll_wait,等待在一些特定的fd上
  • epoll_wait返回后(fd发生写入或超时时间到),执行唤醒的事件。
    • 如果唤醒的是mWakeEventFd,则直接调用awoken方法。
    • 如果唤醒的是之前注册在mRequests中的fd,则将Request生成一个对应的Response,加入mResponses集合
  • 处理native message,执行相应的回调方法
  • 处理mResponses集合中的所有Response事件,调用他们callbackhandleEvent回调方法。(点击事件就是在这里被执行的)

我们来看看awoken方法。它的逻辑很简单,就是循环读取fd中的全部内容。
// Looper.cpp

void Looper::awoken() {
    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

关于mRequestsmResponses的逻辑,先挖个坑,后面的文章再讲。

epoll使用示例

下面我们写一个epoll机制使用的示例代码

在这个例子中,我们监听了一个sockfd的管道端口,启动一个线程,等待在epoll_wait上。一旦sockfd中写入数据,就可以唤醒我们的线程,进行读取。

#include 

void MonitorInit::createEpoll(int sockfd) {
    if(mSockFd == sockfd) return;
    mEpollFd = epoll_create(EPOLL_MAX_EVENTS);
    int epollEvents = 0;
    epollEvents |= EPOLLIN;
    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event));
    eventItem.events = epollEvents;
    eventItem.data.fd = sockfd;
    int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, sockfd, &eventItem);
    LLOG_ERROR(TAG_LOOPER, "Adding epoll event resutl %d", epollResult);
    if(epollResult < 0){
        LLOG_ERROR(TAG_LOOPER, "Error adding epoll event, fd %d, errno %d", sockfd, epollResult);
    }
    pthread_t thd;
    // 开启一个线程,这个线程用来监听epoll_wait
    pthread_create(&thd, nullptr, epollCallback, nullptr);
    pthread_detach(thd);
    mSockFd = sockfd;
}

void epollCallback(void *arg){
    // 死循环,等待fd消息
    while(loop){
        int timeoutMillis = 100000;
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
        LLOG_ERROR(TAG_LOOPER, "receive event count %d", eventCount);
        if(eventCount < 0){
            LLOG_ERROR(TAG_LOOPER, "Poll failed with an unexpected error, errno=%d", errno);
        }
        if(eventCount == 0){
            // 超时时间到
            LLOG_ERROR(TAG_LOOPER, "pollOnce - timeout");
        }
        for(int i = 0; i < eventCount; i++){
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if(fd == mSockFd){
                // 将fd的内容读出来
            }
        }
    }
}

总结

MessageQueue核心原理:主线程通过Looper中的死循环,不断从MessageQueue中获取待指定的message。

  • 如果有到执行时间的消息时,直接执行。
  • 如果还没有到执行时间的消息,会通过epoll_wait等待在mWakeReadPipeFd端口,等待内容写入,超时时间是下一个message执行时间到现在的时间差。
    • 如果在等待的过程中,有新的消息插入队列,会往mWakeReadPipeFd端口写入数据,这样就能唤醒等待在这个上面的pollInner方法,从而继续执行之后的message
    • 如果等待的过程中,没有新的消息插入,则会在timeout时间到达的时候,唤醒,处理后面的message

扩展阅读

深入理解Linux的epoll机制

版权声明:
作者:Mr李
链接:https://www.techfm.club/p/43226.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>