-
-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathConcurrentQueue.cpp
More file actions
68 lines (55 loc) · 2.05 KB
/
ConcurrentQueue.cpp
File metadata and controls
68 lines (55 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include "ConcurrentQueue.h"
#include "Helpers.h"
namespace tns {
void ConcurrentQueue::Initialize(CFRunLoopRef runLoop, void (*performWork)(void*), void* info) {
std::unique_lock<std::mutex> lock(initializationMutex_);
if (terminated) {
return;
}
this->runLoop_ = runLoop;
CFRunLoopSourceContext sourceContext = { 0, info, 0, 0, 0, 0, 0, 0, 0, performWork };
this->runLoopTasksSource_ = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &sourceContext);
CFRunLoopAddSource(this->runLoop_, this->runLoopTasksSource_, kCFRunLoopCommonModes);
}
void ConcurrentQueue::Push(std::shared_ptr<worker::Message> message) {
if (this->runLoopTasksSource_ != nullptr && !CFRunLoopSourceIsValid(this->runLoopTasksSource_)) {
return;
}
{
std::unique_lock<std::mutex> mlock(this->mutex_);
this->messagesQueue_.push(message);
}
this->SignalAndWakeUp();
}
std::vector<std::shared_ptr<worker::Message>> ConcurrentQueue::PopAll() {
std::unique_lock<std::mutex> mlock(this->mutex_);
std::vector<std::shared_ptr<worker::Message>> messages;
while (!this->messagesQueue_.empty()) {
std::shared_ptr<worker::Message> message = this->messagesQueue_.front();
this->messagesQueue_.pop();
messages.push_back(message);
}
return messages;
}
void ConcurrentQueue::SignalAndWakeUp() {
if (this->runLoopTasksSource_ != nullptr) {
tns::Assert(CFRunLoopSourceIsValid(this->runLoopTasksSource_));
CFRunLoopSourceSignal(this->runLoopTasksSource_);
}
if (this->runLoop_ != nullptr) {
CFRunLoopWakeUp(this->runLoop_);
}
}
void ConcurrentQueue::Terminate() {
std::unique_lock<std::mutex> lock(initializationMutex_);
terminated = true;
if (this->runLoop_) {
CFRunLoopStop(this->runLoop_);
}
if (this->runLoopTasksSource_) {
CFRunLoopRemoveSource(this->runLoop_, this->runLoopTasksSource_, kCFRunLoopCommonModes);
CFRunLoopSourceInvalidate(this->runLoopTasksSource_);
CFRelease(this->runLoopTasksSource_);
}
}
}