消息队列 性能比较

介绍

windows下使用的是postthreadmessage、getmessage,考虑要跨平台,所以打算自己实现消息队列

无锁队列原理

CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作。

CAS的原子指令:

1
2
3
4
5
6
7
8
bool compare_and_swap (int *accum, int *dest, int newval)
{
if ( *accum == *dest ) {
*dest = newval;
return true;
}
return false;
}

  • GCC的CAS:
1
2
3
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)

type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
  • windows 的CAS:
1
2
3
InterlockedCompareExchange ( __inout LONG volatile *Target,
__in LONG Exchange,
__in LONG Comperand);
  • C++11中的CAS
1
2
3
4
5
6
template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
T* expected, T desired );

无锁队列就是基于CAS来实现的。

备选方案

std::list + std::mutex

boost::lockfree::queue

boost::lockfree::spsc_queue

其他队列

https://zhuanlan.zhihu.com/p/55583561

未比较

测试性能方法

  • 生产500w数据
  • 消费500w数据
  • 串行消费耗时
  • 测试机器
    1
    windows i5-7500 @3.4GHz

测试代码

写的有点乱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <cstdio>
#include <mutex>
#include <list>
#include <thread>
#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>

#include <boost/date_time/posix_time/posix_time.hpp>

int g_count = 0;

std::mutex g_oMutex;
std::list<int> g_oList;


boost::lockfree::queue<int> g_queue(512);

boost::lockfree::spsc_queue<int, boost::lockfree::capacity<5000000> > spsc_queue;

int g_max = 5000000;


int GetMs()
{
const boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
// Get the time offset in current day
const boost::posix_time::time_duration td = now.time_of_day();
int hh = td.hours();
int mm = td.minutes();
int ss = td.seconds();
int ms = td.total_milliseconds();

return ms;
}

void func_producer()
{
for (int i = 0; i < g_max; i++)
{
//g_oMutex.lock();
//g_oList.push_back(i);
//g_oMutex.unlock();
spsc_queue.push(i);
}

}

void func_consume()
{
for (int i = 0; i < g_max; i++)
{
/*g_oMutex.lock();
g_oList.pop_front();
g_oMutex.unlock();

g_count++;*/
int n;
if (spsc_queue.pop(n))
{
g_count++;
}
}

}

int main()
{
int dwTime = GetMs();

std::thread t2(func_producer);

t2.join();

std::thread t1(func_consume);

t1.join();

int diff = GetMs() - dwTime;

printf("lost time is %d ,count is %d \n", diff, g_count);

//system("pause");
return 0;
}

结论

队列 时间(ms)
windows 未验证
list+mutex 766
lockfree::queue 1156
lockfree::spsc_queue 47

单生产单消费模式下lockfree::spsc_queue 的确最优秀,但是我们的服务采用的是多生产单消费模式,性能上也足够了,所以还是暂时先用list+mutex。

注:单生产单消费者套在多生产单消费里面,会导致数据丢失。