介绍
windows下使用的是postthreadmessage、getmessage,考虑要跨平台,所以打算自己实现消息队列
无锁队列原理
CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作。
CAS的原子指令:1
2
3
4
5
6
7
8bool compare_and_swap (int *accum, int *dest, int newval)
{
if ( *accum == *dest ) {
*dest = newval;
return true;
}
return false;
}
- GCC的CAS:
1 | bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...) |
- windows 的CAS:
1 | InterlockedCompareExchange ( __inout LONG volatile *Target, |
- C++11中的CAS
1 | template< class T > |
无锁队列就是基于CAS来实现的。
备选方案
std::list + std::mutex
boost::lockfree::queue
boost::lockfree::spsc_queue
其他队列
https://zhuanlan.zhihu.com/p/55583561
未比较
测试性能方法
- 生产500w数据
- 消费500w数据
- 串行消费耗时
- 测试机器
1
windows i5-7500 @3.4GHz
测试代码
写的有点乱1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84#include <cstdio>
#include <mutex>
#include <list>
#include <thread>
#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
int g_count = 0;
std::mutex g_oMutex;
std::list<int> g_oList;
boost::lockfree::queue<int> g_queue(512);
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<5000000> > spsc_queue;
int g_max = 5000000;
int GetMs()
{
const boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
// Get the time offset in current day
const boost::posix_time::time_duration td = now.time_of_day();
int hh = td.hours();
int mm = td.minutes();
int ss = td.seconds();
int ms = td.total_milliseconds();
return ms;
}
void func_producer()
{
for (int i = 0; i < g_max; i++)
{
//g_oMutex.lock();
//g_oList.push_back(i);
//g_oMutex.unlock();
spsc_queue.push(i);
}
}
void func_consume()
{
for (int i = 0; i < g_max; i++)
{
/*g_oMutex.lock();
g_oList.pop_front();
g_oMutex.unlock();
g_count++;*/
int n;
if (spsc_queue.pop(n))
{
g_count++;
}
}
}
int main()
{
int dwTime = GetMs();
std::thread t2(func_producer);
t2.join();
std::thread t1(func_consume);
t1.join();
int diff = GetMs() - dwTime;
printf("lost time is %d ,count is %d \n", diff, g_count);
//system("pause");
return 0;
}
结论
队列 | 时间(ms) |
---|---|
windows | 未验证 |
list+mutex | 766 |
lockfree::queue | 1156 |
lockfree::spsc_queue | 47 |
单生产单消费模式下lockfree::spsc_queue 的确最优秀,但是我们的服务采用的是多生产单消费模式,性能上也足够了,所以还是暂时先用list+mutex。
注:单生产单消费者套在多生产单消费里面,会导致数据丢失。