文章目录
- EventRunner是一种事件循环器,循环处理从该EventRunner创建的新线程的事件队列中获取InnerEvent事件。InnerEvent是EventHandler投递的事件。
EventHandler是一种用户在当前线程上投递InnerEvent事件到异步线程上处理的机制。每一个EventHandler和指定的EventRunner所创建的新线程绑定,并且该新线程内部有一个事件队列。EventHandler可以投递指定的InnerEvent事件到这个事件队列。EventRunner从事件队列里循环地取出事件,并在EventRunner所在线程执行processEvent回调。一般,EventHandler有两个主要作用:
在不同线程间分发和处理InnerEvent事件。
延迟处理InnerEvent事件。
- 使用EventHandler实现线程间通信的主要流程: EventHandler投递具体的InnerEvent事件到EventRunner所创建的线程的事件队列。 EventRunner循环从事件队列中获取InnerEvent事件。 新线程上处理该事件:触发InnerEvent的回调方法并触发EventHandler的处理方法。 接口说明:
- 进程内的事件。
- 发送事件时传递的数据。
- 用于表示事件被投递的优先级。 代码目录: foundation/└──foundation/appexecfwk/standard ├── interfaces │ ├── innerkits │ | └── libeventhandler # 内部接口 │ └── napi │ └── eventhandler # NAPI接口 └──— libs/libeventhandler # 机制实现代码 类图:
- EventRunner的create方法在创建Runner线程的同时会实例化一个EventQueue,并赋值给类内部的成员变量queue_ 。 EventRunner的Runner线程轮询queue_,取出队列中的InnerEvent事件,并处理该事件。
- EventHandler的各Send方法通过调用EventQueue的Insert方法将InnerEvent插入EventQueue的subEventQueues_中 或 idleEvents__ 中。 当调用Post方法,投递某个回调函数时,该回调函数会赋值给InnerEvent的taskCallback_,并通过Send方法将将拥有回调函数的InnerEvent插入队列。 其中,subEventQueues_是size为3的SubEventQueue的列表,3个SubEventQueue分别用于存储三种不同优先级(IMMEDIATE/HIGH/LOW)的InnerEvent。 idleEvents_用于存储优先级为IDLE的InnerEvent。 同理,Remove方法即从上述subEventQueue_ 或 idleEvent_ 中删除某事件。
- 通过成员变量 ioWaiter_ 取出定时或延时的InnerEvent事件。同时实现对文件描述符监听事件的回调。 ioWaiter_ 成员的类型默认使用NoneIoWaiter , 此时不支持文件描述符的监听。队列中的延时或定时事件的取出,通过NoneIoWaiter调用 std::condition_variable机制实现。 当EventHandler中的AddFileDescriptorListener被调用时,ioWaiter_成员的类型自动转换成EpollIoWaiter类型。EpollIoWaiter支持文件描述符的监听事件。此时,队列中的延时或定时事件的取出,通过系统的epoll机制实现。
-
- // Start event looper. for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) { //监听轮询queue_ std::shared_ptr<EventHandler> handler = event->GetOwner(); // Make sure owner of the event exists. if (handler) { std::shared_ptr<Logger> logging = logger_; std::stringstream address; address << handler.get(); if (logging != nullptr) { if (!event->HasTask()) { logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId())); } else { logging->Log("Dispatching to handler event task name = " + event->GetTaskName()); } } handler->DistributeEvent(event); //执行aInnerEvent事件 if (logging != nullptr) { logging->Log("Finished to handler(0x" + address.str() + ")"); } } // Release event manually, otherwise event will be released until next event coming. event.reset(); }
- void EventQueue::Insert(InnerEvent::Pointer &event, Priority priority) //事件或任务的插入{ if (!event) { HILOGE("Insert: Could not insert an invalid event"); return; } std::lock_guard<std::mutex> lock(queueLock_); bool needNotify = false; switch (priority) { //根据Event的优先级将该Event插入不同的事件队列 case Priority::IMMEDIATE: case Priority::HIGH: case Priority::LOW: { needNotify = (event->GetHandleTime() < wakeUpTime_); InsertEventsLocked(subEventQueues_[static_cast<uint32_t>(priority)].queue, event); break; } case Priority::IDLE: { // Never wake up thread if insert an idle event. InsertEventsLocked(idleEvents_, event); break; } default: break; } if (needNotify) { ioWaiter_->NotifyOne(); }} InnerEvent::Pointer EventQueue::GetEvent() //事件或任务的取出{ std::unique_lock<std::mutex> lock(queueLock_); while (!finished_) { InnerEvent::TimePoint nextWakeUpTime = InnerEvent::TimePoint::max(); InnerEvent::Pointer event = GetExpiredEventLocked(nextWakeUpTime); if (event) { return event; } WaitUntilLocked(nextWakeUpTime, lock); } HILOGD("GetEvent: Break out"); return InnerEvent::Pointer(nullptr, nullptr);}
- void EventHandler::DistributeEvent(const InnerEvent::Pointer &event) //执行InnerEvent事件{ if (!event) { HILOGE("DistributeEvent: Could not distribute an invalid event"); return; } // Save old event handler. std::weak_ptr<EventHandler> oldHandler = currentEventHandler; // Save current event handler into thread local data. currentEventHandler = shared_from_this(); auto spanId = event->GetTraceId(); auto traceId = HiTrace::GetId(); bool allowTraceOutPut = AllowHiTraceOutPut(spanId, event->HasWaiter()); if (allowTraceOutPut) { HiTrace::SetId(*spanId); HiTracePointerOutPut(spanId, event, "Receive", HiTraceTracepointType::HITRACE_TP_SR); } InnerEvent::TimePoint nowStart = InnerEvent::Clock::now(); DeliveryTimeAction(event, nowStart); if (event->HasTask()) { //如果有回调函数 // Call task callback directly if contains a task. (event->GetTaskCallback())(); //执行回调函数 } else { // Otherwise let developers to handle it. ProcessEvent(event); //执行ProcessEvent } DistributeTimeAction(event, nowStart); if (allowTraceOutPut) { HiTrace::Tracepoint(HiTraceTracepointType::HITRACE_TP_SS, *spanId, "Event Distribute over"); HiTrace::ClearId(); if (traceId.IsValid()) { HiTrace::SetId(traceId); } } // Restore current event handler. if (oldHandler.expired()) { currentEventHandler = nullptr; } else { currentEventHandler = oldHandler; }} 想了解更多关于开源的内容,请访问: 51CTO 开源基础软件社区 https://ost.51cto.com。

EventHandler是用于处理线程间通信的一种机制,可以通过EventRunner创建新线程,将耗时的操作放到新线程上执行。这样既不阻塞原来的线程,任务又可以得到合理的处理。比如:主线程使用EventHandler创建子线程,子线程做耗时的下载图片操作,下载完成后,子线程通过EventHandler通知主线程,主线程再更新UI。
EventRunner是一种事件循环器,循环处理从该EventRunner创建的新线程的事件队列中获取InnerEvent事件。InnerEvent是EventHandler投递的事件。
EventHandler是一种用户在当前线程上投递InnerEvent事件到异步线程上处理的机制。每一个EventHandler和指定的EventRunner所创建的新线程绑定,并且该新线程内部有一个事件队列。EventHandler可以投递指定的InnerEvent事件到这个事件队列。EventRunner从事件队列里循环地取出事件,并在EventRunner所在线程执行processEvent回调。一般,EventHandler有两个主要作用:
- 在不同线程间分发和处理InnerEvent事件。
- 延迟处理InnerEvent事件。

使用EventHandler实现线程间通信的主要流程:
- EventHandler投递具体的InnerEvent事件到EventRunner所创建的线程的事件队列。
- EventRunner循环从事件队列中获取InnerEvent事件。
- 新线程上处理该事件:触发InnerEvent的回调方法并触发EventHandler的处理方法。
接口说明:

进程内的事件。

发送事件时传递的数据。

用于表示事件被投递的优先级。

代码目录:
foundation/
└──foundation/appexecfwk/standard
├── interfaces
│ ├── innerkits
│ | └── libeventhandler # 内部接口
│ └── napi
│ └── eventhandler # NAPI接口
└──— libs/libeventhandler # 机制实现代码
类图:

EventRunner的create方法在创建Runner线程的同时会实例化一个EventQueue,并赋值给类内部的成员变量queue_ 。 EventRunner的Runner线程轮询queue_,取出队列中的InnerEvent事件,并处理该事件。
EventHandler的各Send方法通过调用EventQueue的Insert方法将InnerEvent插入EventQueue的subEventQueues_中 或 idleEvents__ 中。
当调用Post方法,投递某个回调函数时,该回调函数会赋值给InnerEvent的taskCallback_,并通过Send方法将将拥有回调函数的InnerEvent插入队列。
其中,subEventQueues_是size为3的SubEventQueue的列表,3个SubEventQueue分别用于存储三种不同优先级(IMMEDIATE/HIGH/LOW)的InnerEvent。
idleEvents_用于存储优先级为IDLE的InnerEvent。
同理,Remove方法即从上述subEventQueue_ 或 idleEvent_ 中删除某事件。
通过成员变量 ioWaiter_ 取出定时或延时的InnerEvent事件。同时实现对文件描述符监听事件的回调。
ioWaiter_ 成员的类型默认使用NoneIoWaiter , 此时不支持文件描述符的监听。队列中的延时或定时事件的取出,通过NoneIoWaiter调用 std::condition_variable机制实现。
当EventHandler中的AddFileDescriptorListener被调用时,ioWaiter_成员的类型自动转换成EpollIoWaiter类型。EpollIoWaiter支持文件描述符的监听事件。此时,队列中的延时或定时事件的取出,通过系统的epoll机制实现。
// Start event looper.
for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) { //监听轮询queue_
std::shared_ptr<EventHandler> handler = event->GetOwner();
// Make sure owner of the event exists.
if (handler) {
std::shared_ptr<Logger> logging = logger_;
std::stringstream address;
address << handler.get();
if (logging != nullptr) {
if (!event->HasTask()) {
logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
} else {
logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
}
}
handler->DistributeEvent(event); //执行aInnerEvent事件
if (logging != nullptr) {
logging->Log("Finished to handler(0x" + address.str() + ")");
}
}
// Release event manually, otherwise event will be released until next event coming.
event.reset();
}
for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) { //监听轮询queue_
std::shared_ptr<EventHandler> handler = event->GetOwner();
// Make sure owner of the event exists.
if (handler) {
std::shared_ptr<Logger> logging = logger_;
std::stringstream address;
address << handler.get();
if (logging != nullptr) {
if (!event->HasTask()) {
logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
} else {
logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
}
}
handler->DistributeEvent(event); //执行aInnerEvent事件
if (logging != nullptr) {
logging->Log("Finished to handler(0x" + address.str() + ")");
}
}
// Release event manually, otherwise event will be released until next event coming.
event.reset();
}
void EventQueue::Insert(InnerEvent::Pointer &event, Priority priority) //事件或任务的插入
{
if (!event) {
HILOGE("Insert: Could not insert an invalid event");
return;
}
std::lock_guard<std::mutex> lock(queueLock_);
bool needNotify = false;
switch (priority) { //根据Event的优先级将该Event插入不同的事件队列
case Priority::IMMEDIATE:
case Priority::HIGH:
case Priority::LOW: {
needNotify = (event->GetHandleTime() < wakeUpTime_);
InsertEventsLocked(subEventQueues_[static_cast<uint32_t>(priority)].queue, event);
break;
}
case Priority::IDLE: {
// Never wake up thread if insert an idle event.
InsertEventsLocked(idleEvents_, event);
break;
}
default:
break;
}
if (needNotify) {
ioWaiter_->NotifyOne();
}
}
InnerEvent::Pointer EventQueue::GetEvent() //事件或任务的取出
{
std::unique_lock<std::mutex> lock(queueLock_);
while (!finished_) {
InnerEvent::TimePoint nextWakeUpTime = InnerEvent::TimePoint::max();
InnerEvent::Pointer event = GetExpiredEventLocked(nextWakeUpTime);
if (event) {
return event;
}
WaitUntilLocked(nextWakeUpTime, lock);
}
HILOGD("GetEvent: Break out");
return InnerEvent::Pointer(nullptr, nullptr);
}
{
if (!event) {
HILOGE("Insert: Could not insert an invalid event");
return;
}
std::lock_guard<std::mutex> lock(queueLock_);
bool needNotify = false;
switch (priority) { //根据Event的优先级将该Event插入不同的事件队列
case Priority::IMMEDIATE:
case Priority::HIGH:
case Priority::LOW: {
needNotify = (event->GetHandleTime() < wakeUpTime_);
InsertEventsLocked(subEventQueues_[static_cast<uint32_t>(priority)].queue, event);
break;
}
case Priority::IDLE: {
// Never wake up thread if insert an idle event.
InsertEventsLocked(idleEvents_, event);
break;
}
default:
break;
}
if (needNotify) {
ioWaiter_->NotifyOne();
}
}
{
std::unique_lock<std::mutex> lock(queueLock_);
while (!finished_) {
InnerEvent::TimePoint nextWakeUpTime = InnerEvent::TimePoint::max();
InnerEvent::Pointer event = GetExpiredEventLocked(nextWakeUpTime);
if (event) {
return event;
}
WaitUntilLocked(nextWakeUpTime, lock);
}
HILOGD("GetEvent: Break out");
return InnerEvent::Pointer(nullptr, nullptr);
}
void EventHandler::DistributeEvent(const InnerEvent::Pointer &event) //执行InnerEvent事件
{
if (!event) {
HILOGE("DistributeEvent: Could not distribute an invalid event");
return;
}
// Save old event handler.
std::weak_ptr<EventHandler> oldHandler = currentEventHandler;
// Save current event handler into thread local data.
currentEventHandler = shared_from_this();
auto spanId = event->GetTraceId();
auto traceId = HiTrace::GetId();
bool allowTraceOutPut = AllowHiTraceOutPut(spanId, event->HasWaiter());
if (allowTraceOutPut) {
HiTrace::SetId(*spanId);
HiTracePointerOutPut(spanId, event, "Receive", HiTraceTracepointType::HITRACE_TP_SR);
}
InnerEvent::TimePoint nowStart = InnerEvent::Clock::now();
DeliveryTimeAction(event, nowStart);
if (event->HasTask()) { //如果有回调函数
// Call task callback directly if contains a task.
(event->GetTaskCallback())(); //执行回调函数
} else {
// Otherwise let developers to handle it.
ProcessEvent(event); //执行ProcessEvent
}
DistributeTimeAction(event, nowStart);
if (allowTraceOutPut) {
HiTrace::Tracepoint(HiTraceTracepointType::HITRACE_TP_SS, *spanId, "Event Distribute over");
HiTrace::ClearId();
if (traceId.IsValid()) {
HiTrace::SetId(traceId);
}
}
// Restore current event handler.
if (oldHandler.expired()) {
currentEventHandler = nullptr;
} else {
currentEventHandler = oldHandler;
}
}
{
if (!event) {
HILOGE("DistributeEvent: Could not distribute an invalid event");
return;
}
// Save old event handler.
std::weak_ptr<EventHandler> oldHandler = currentEventHandler;
// Save current event handler into thread local data.
currentEventHandler = shared_from_this();
auto spanId = event->GetTraceId();
auto traceId = HiTrace::GetId();
bool allowTraceOutPut = AllowHiTraceOutPut(spanId, event->HasWaiter());
if (allowTraceOutPut) {
HiTrace::SetId(*spanId);
HiTracePointerOutPut(spanId, event, "Receive", HiTraceTracepointType::HITRACE_TP_SR);
}
InnerEvent::TimePoint nowStart = InnerEvent::Clock::now();
DeliveryTimeAction(event, nowStart);
if (event->HasTask()) { //如果有回调函数
// Call task callback directly if contains a task.
(event->GetTaskCallback())(); //执行回调函数
} else {
// Otherwise let developers to handle it.
ProcessEvent(event); //执行ProcessEvent
}
DistributeTimeAction(event, nowStart);
if (allowTraceOutPut) {
HiTrace::Tracepoint(HiTraceTracepointType::HITRACE_TP_SS, *spanId, "Event Distribute over");
HiTrace::ClearId();
if (traceId.IsValid()) {
HiTrace::SetId(traceId);
}
}
// Restore current event handler.
if (oldHandler.expired()) {
currentEventHandler = nullptr;
} else {
currentEventHandler = oldHandler;
}
}