Skip to content

Commit 7ab32b4

Browse files
committed
free lock
1 parent 4e505c3 commit 7ab32b4

File tree

5 files changed

+205
-53
lines changed

5 files changed

+205
-53
lines changed

burger/base/MpscQueue.h

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#ifndef MPSCQUEUE_H
2+
#define MPSCQUEUE_H
3+
4+
#include <atomic>
5+
#include <boost/noncopyable.hpp>
6+
// https://stackoom.com/question/3hBmO/MPSC%E9%98%9F%E5%88%97-%E7%AB%9E%E8%B5%9B%E6%9D%A1%E4%BB%B6
7+
// todo test
8+
namespace burger {
9+
template <typename T>
10+
class MpscQueue : boost::noncopyable {
11+
public:
12+
MpscQueue();
13+
~MpscQueue();
14+
void enqueue(T &&input);
15+
void enqueue(const T &input);
16+
bool dequeue(T &output);
17+
bool empty();
18+
private:
19+
struct BufferNode {
20+
BufferNode() = default;
21+
BufferNode(const T &data) : dataPtr_(new T(data)) {}
22+
BufferNode(T &&data) : dataPtr_(new T(std::move(data))) {}
23+
T *dataPtr_;
24+
std::atomic<BufferNode *> next_{nullptr};
25+
};
26+
27+
std::atomic<BufferNode *> head_;
28+
std::atomic<BufferNode *> tail_;
29+
};
30+
template<typename T>
31+
MpscQueue<T>::MpscQueue()
32+
: head_(new BufferNode),
33+
tail_(head_.load(std::memory_order_relaxed)){
34+
}
35+
36+
template<typename T>
37+
MpscQueue<T>::~MpscQueue() {
38+
T output;
39+
while (this->dequeue(output)) {}
40+
BufferNode *front = head_.load(std::memory_order_relaxed);
41+
delete front;
42+
}
43+
44+
template<typename T>
45+
void MpscQueue<T>::enqueue(T &&input) {
46+
BufferNode *node{new BufferNode(std::move(input))};
47+
BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
48+
prevhead->next_.store(node, std::memory_order_release);
49+
}
50+
51+
template<typename T>
52+
void MpscQueue<T>::enqueue(const T &input) {
53+
BufferNode *node{new BufferNode(input)};
54+
BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
55+
prevhead->next_.store(node, std::memory_order_release);
56+
}
57+
58+
template<typename T>
59+
bool MpscQueue<T>::dequeue(T &output) {
60+
BufferNode *tail = tail_.load(std::memory_order_relaxed);
61+
BufferNode *next = tail->next_.load(std::memory_order_acquire);
62+
63+
if (next == nullptr) {
64+
return false;
65+
}
66+
output = std::move(*(next->dataPtr_));
67+
delete next->dataPtr_;
68+
tail_.store(next, std::memory_order_release);
69+
delete tail;
70+
return true;
71+
}
72+
73+
template<typename T>
74+
bool MpscQueue<T>::empty() {
75+
BufferNode *tail = tail_.load(std::memory_order_relaxed);
76+
BufferNode *next = tail->next_.load(std::memory_order_acquire);
77+
return next == nullptr;
78+
}
79+
80+
81+
} // namespace burger
82+
83+
84+
85+
#endif // MPSCQUEUE_H

burger/net/Channel.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ class EventLoop;
1515
class Epoll;
1616
// This class doesn't own the file descriptor.
1717
// Channel的成员函数都只能再IO线程调用,所以更新数据不需要加锁
18+
// reactor的核心 : 事件分发机制,拿到IO事件分发给各个文件描述符(fd)的事件处理函数
19+
// 每个Channel对象自始至终都属于一个eventLoop, 只属于某一个IO线程, 只负责一个fd
20+
// 把不同的IO事件分发给不同的回调, 一般不给用户使用,封装给更高层的TcpConnection
1821
class Channel : boost::noncopyable {
1922
public:
2023
enum class Status {
@@ -32,10 +35,14 @@ class Channel : boost::noncopyable {
3235
~Channel();
3336

3437
void handleEvent(Timestamp receiveTime);
35-
// void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; }
36-
void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
37-
void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
38-
void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
38+
39+
void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; }
40+
void setReadCallback(ReadEventCallback&& cb) { readCallback_ = std::move(cb); }
41+
void setWriteCallback(const EventCallback cb) { writeCallback_ = cb; }
42+
void setWriteCallback(EventCallback&& cb) { writeCallback_ = std::move(cb); }
43+
void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; }
44+
void setCloseCallback(EventCallback&& cb) { closeCallback_ = std::move(cb); }
45+
void setErrorCallback(const EventCallback&& cb) { errorCallback_ = cb; }
3946
void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }
4047

4148
void enableReading() { events_ |= kReadEvent; update(); }

burger/net/EventLoop.cc

Lines changed: 66 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@ class IgnoreSigPipe {
3434
#pragma GCC diagnostic error "-Wold-style-cast"
3535
IgnoreSigPipe initObj;
3636

37-
// 和IgnoreSigPipe一样再全局中init
38-
class Log {
39-
public:
40-
Log() {
41-
if (!Logger::Instance().init("log", "logs/test.log", spdlog::level::trace)) {
42-
ERROR("Logger init error");
43-
} else {
44-
TRACE("Logger setup");
45-
}
46-
}
47-
};
48-
49-
Log initLog;
37+
// // 和IgnoreSigPipe一样再全局中init
38+
// class Log {
39+
// public:
40+
// Log() {
41+
// if (!Logger::Instance().init("log", "logs/test.log", spdlog::level::trace)) {
42+
// ERROR("Logger init error");
43+
// } else {
44+
// TRACE("Logger setup");
45+
// }
46+
// }
47+
// };
48+
49+
// Log initLog;
5050

5151
} // namespace
5252

@@ -61,8 +61,10 @@ EventLoop::EventLoop() :
6161
epoll_(util::make_unique<Epoll>(this)),
6262
timerQueue_(util::make_unique<TimerQueue>(this)),
6363
wakeupFd_(createEventfd()),
64-
wakeupChannel_(util::make_unique<Channel>(this, wakeupFd_)) {
65-
TRACE("EventLoop created {} ", fmt::ptr(this));
64+
wakeupChannel_(util::make_unique<Channel>(this, wakeupFd_)),
65+
currentActiveChannel_(nullptr),
66+
threadLocalLoopPtr_(&t_loopInthisThread) {
67+
TRACE("EventLoop created {}", fmt::ptr(this));
6668
if(t_loopInthisThread) {
6769
CRITICAL("Another EventLoop {} exists in this Thread( tid = {} ) ...", fmt::ptr(t_loopInthisThread), util::tid());
6870
} else {
@@ -100,13 +102,13 @@ void EventLoop::loop() {
100102
// TODO : sort channel by priority
101103
eventHandling_ = true;
102104
for(auto channel: activeChannels_) {
103-
// currentActiveChannel_ 在removeChannel()中需要做判断
105+
// currentActiveChannel_ 在removeChannel()中需要做判断 todo 理
104106
currentActiveChannel_ = channel;
105107
currentActiveChannel_->handleEvent(epollWaitReturnTime_);
106108
}
107109
currentActiveChannel_ = nullptr;
108110
eventHandling_ = false;
109-
doPendingFunctors();
111+
doQueueInLoopFuncs();
110112
}
111113
TRACE("EventLoop {} stop looping", fmt::ptr(this));
112114
looping_ = false;
@@ -132,33 +134,41 @@ void EventLoop::quit() {
132134
// 为其他线程提供一个接口,其他线程将要执行的任务用这个接口交给当前线程
133135
// 这样当前线程统一处理自己的资源,而不用加锁,唯一需要加锁的地方就是
134136
// 通过接口添加任务的任务队列这个地方,大大减小了锁粒度
135-
void EventLoop::runInLoop(const Functor& func) {
137+
void EventLoop::runInLoop(const Func& func) {
136138
if(isInLoopThread()) {
137139
func(); // 如果在当前IO线程中调用,则同步调用
138140
} else {
139141
queueInLoop(func); // 如果在其他线程中调用该函数,则异步调用,用queueInLoop添加到任务队列中
140142
}
141143
}
142-
// 加入队列中,等待被执行,该函数可以跨线程调用,即其他线程可以给当前线程添加任务
143-
void EventLoop::queueInLoop(const Functor& func) {
144-
{
145-
std::lock_guard<std::mutex> lock(mutex_);
146-
pendingFunctors_.push_back(std::move(func));
144+
145+
void EventLoop::runInLoop(Func&& func) {
146+
if(isInLoopThread()) {
147+
func();
148+
} else {
149+
queueInLoop(std::move(func));
147150
}
151+
}
152+
// 加入队列中,等待被执行,该函数可以跨线程调用,即其他线程可以给当前线程添加任务
153+
void EventLoop::queueInLoop(const Func& func) {
154+
queueFuncs_.enqueue(func); // 无锁队列入队
148155
// 如果不是当前线程(可能阻塞在wait),需要唤醒
149156
// 或者是当前线程但是在正在处理队列中的任务(使得处理完当前队列中的元素后立即在进行下一轮处理,因为在这里又添加了任务)需要唤醒
150157
// 只有当前IO线程的事件回调中调用queueInLoop才不需要唤醒(因为执行完handleEvent会自然执行loop()中的doPendingFunctor)
151-
if(!isInLoopThread() || callingPendingFunctors_) {
158+
if(!isInLoopThread() || !looping_) { // todo !looping
152159
wakeup();
153160
}
154161
}
155162

156-
size_t EventLoop::queueSize() const {
157-
std::lock_guard<std::mutex> lock(mutex_);
158-
return pendingFunctors_.size();
163+
void EventLoop::queueInLoop(Func&& func) {
164+
queueFuncs_.enqueue(std::move(func));
165+
if(!isInLoopThread() || !looping_) { // todo !looping
166+
wakeup();
167+
}
159168
}
160169

161170

171+
162172
TimerId EventLoop::runAt(Timestamp time, TimerCallback timercb) {
163173
return timerQueue_->addTimer(std::move(timercb), time, 0.0);
164174
}
@@ -187,6 +197,7 @@ bool EventLoop::isInLoopThread() const {
187197
return threadId_ == util::tid();
188198
}
189199

200+
// 如果当前线程不是IO线程则为nullptr
190201
EventLoop* EventLoop::getEventLoopOfCurrentThread() {
191202
return t_loopInthisThread;
192203
}
@@ -202,6 +213,26 @@ void EventLoop::wakeup() {
202213
ERROR("EventLoop::wakeup() writes {} bytes instead of 8", n);
203214
}
204215
}
216+
void EventLoop::moveToCurrentThread() {
217+
if (isRunning()) {
218+
CRITICAL("EventLoop cannot be moved when running");
219+
}
220+
if (isInLoopThread()) {
221+
WARN("This EventLoop is already in the current thread");
222+
return;
223+
}
224+
if (t_loopInthisThread) {
225+
CRITICAL("There is already an EventLoop in this thread, you cannot move another in");
226+
}
227+
// *threadLocalLoopPtr_ = nullptr;
228+
t_loopInthisThread = this;
229+
threadLocalLoopPtr_ = &t_loopInthisThread;
230+
threadId_ = util::tid();
231+
}
232+
233+
bool EventLoop::isRunning() {
234+
return looping_ && (!quit_);
235+
}
205236

206237
void EventLoop::updateChannel(Channel* channel) {
207238
assert(channel->ownerLoop() == this);
@@ -227,7 +258,6 @@ bool EventLoop::hasChannel(Channel* channel) {
227258
}
228259

229260
void EventLoop::abortNotInLoopThread() {
230-
std::cout << "11" << std::endl;
231261
CRITICAL("EventLoop::abortNotInLoopThread - EventLoop {} was \
232262
created in threadId_ = {}, and current id = {} ...",
233263
fmt::ptr(this), threadId_, util::tid());
@@ -256,17 +286,15 @@ void EventLoop::printActiveChannels() const {
256286

257287
// 3. 没有反复执行doPendingFunctors()直到pendingFunctors为空,
258288
// 这是有意的,否则IO线程可能陷入死循环,无法处理IO事件。
259-
void EventLoop::doPendingFunctors() {
260-
std::vector<Functor> functors;
261-
callingPendingFunctors_ = true;
262-
{
263-
std::lock_guard<std::mutex> lock(mutex_);
264-
functors.swap(pendingFunctors_);
265-
}
266-
for(const auto& functor: functors) {
267-
functor();
289+
void EventLoop::doQueueInLoopFuncs() {
290+
callingQueueFuncs_ = true;
291+
while(!queueFuncs_.empty()) {
292+
Func func;
293+
while(queueFuncs_.dequeue(func)) {
294+
func();
295+
}
268296
}
269-
callingPendingFunctors_ = false;
297+
callingQueueFuncs_ = false;
270298
}
271299

272300

burger/net/EventLoop.h

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#include "burger/base/Timestamp.h"
1212
#include <sys/eventfd.h>
1313
#include "TimerId.h"
14-
14+
#include "burger/base/MpscQueue.h"
1515
#include "SocketsOps.h"
1616
#include "Callbacks.h"
1717

@@ -24,18 +24,20 @@ class Channel;
2424

2525
class EventLoop : boost::noncopyable {
2626
public:
27-
using Functor = std::function<void()>;
27+
using Func = std::function<void()>;
2828
EventLoop();
2929
~EventLoop();
3030
void loop();
3131
void quit();
3232
Timestamp epollWaitRetrunTime() const { return epollWaitReturnTime_; }
3333
uint64_t iteration() const { return iteration_; }
3434
// 在主循环中进行, safe to call from other threads
35-
void runInLoop(const Functor& func);
35+
void runInLoop(const Func& func);
36+
void runInLoop(Func&& func);
3637
// 插入主循环任务队列, safe to call from other threads
37-
void queueInLoop(const Functor& func);
38-
size_t queueSize() const;
38+
void queueInLoop(const Func& func);
39+
void queueInLoop(Func&& func);
40+
3941
// timers , safe ti call from other threads
4042
TimerId runAt(Timestamp time, TimerCallback timercb);
4143
TimerId runAfter(double delay, TimerCallback timercb);
@@ -47,6 +49,11 @@ class EventLoop : boost::noncopyable {
4749
static EventLoop* getEventLoopOfCurrentThread();
4850

4951
void wakeup();
52+
// Move the EventLoop to the current thread, this method must be called before the loop is running.
53+
void moveToCurrentThread();
54+
bool isRunning();
55+
bool isCallingQueueFuncs();
56+
5057
void updateChannel(Channel* channel); // 从epoll添加或更新channel
5158
void removeChannel(Channel* channel); // 从epoll里移除channel
5259
bool hasChannel(Channel* channel);
@@ -55,24 +62,24 @@ class EventLoop : boost::noncopyable {
5562
void abortNotInLoopThread();
5663
void handleWakeupFd();
5764
void printActiveChannels() const; // for DEBUG
58-
void doPendingFunctors();
65+
void doQueueInLoopFuncs();
5966
private:
6067
bool looping_; // atomic
6168
std::atomic<bool> quit_; // linux下bool也是atomic的
6269
bool eventHandling_; // atomic
63-
bool callingPendingFunctors_; // atomic
70+
bool callingQueueFuncs_{false}; // atomic
6471
uint64_t iteration_;
65-
const pid_t threadId_; // 当前对象所属线程ID
72+
pid_t threadId_; // 当前对象所属线程ID
6673
Timestamp epollWaitReturnTime_;
6774
std::unique_ptr<Epoll> epoll_;
6875
std::unique_ptr<TimerQueue> timerQueue_;
6976
int wakeupFd_;
7077
std::unique_ptr<Channel> wakeupChannel_;
71-
std::vector<Channel*> activeChannels_;
78+
std::vector<Channel *> activeChannels_;
7279
Channel* currentActiveChannel_;
7380
mutable std::mutex mutex_;
74-
// todo : 此处可以优化
75-
std::vector<Functor> pendingFunctors_;
81+
MpscQueue<Func> queueFuncs_;
82+
EventLoop **threadLocalLoopPtr_;
7683
};
7784

7885

0 commit comments

Comments
 (0)