C++ 多线程编程总结

上一篇 / 下一篇  2012-05-22 09:58:27 / 个人分类:C++

在开发C++程序时,一般在吞吐量、并发、实时性上有较高的要求。设计C++程序时,总结起来可以从如下几点提高效率:

u$mNq'v!R(rI%V0  ● 并发51Testing软件测试网:} CQ2w7S8j

?9h@cN/`2{H0  ● 异步51Testing软件测试网'V5BV#\+s&DiD

51Testing软件测试网Z,Gr Ujj;G

  ● 缓存

FzK7pc:q c0

5\2jHz6c0  下面将我平常工作中遇到一些问题例举一二,其设计思想无非以上三点。51Testing软件测试网Yuf1Bpy

51Testing软件测试网 b`0ox \8^U

  1、任务队列

'SRK-G|\&JB0

)k8d!F V[#pn,Q0  1.1 以生产者-消费者模型设计任务队列51Testing软件测试网!L*k)~Tb

51Testing软件测试网g(mb v$kCw

  生产者-消费者模型是人们非常熟悉的模型,比如在某个服务器程序中,当User数据被逻辑模块修改后,就产生一个更新数据库的任务(produce),投递给IO模块任务队列,IO模块从任务队列中取出任务执行sql操作(consume)。51Testing软件测试网C,Y,E _e:G]3`

1f fT"YgW0  设计通用的任务队列,示例代码如下:51Testing软件测试网KB'h zH;M-B6Zkj.@

51Testing软件测试网8bXZ0E0ybt F i

  详细实现可参见:http://ffown.googlecode.com/svn/trunk/fflib/include/detail/task_queue_impl.h

#Z;b0a1G Y^0
  1. void task_queue_t::produce(const task_t& task_) {  
  2.  lock_guard_t lock(m_mutex);  
  3.  if (m_tasklist->empty()){//! 条件满足唤醒等待线程 
  4.  m_cond.signal();  
  5.  }  
  6.  m_tasklist->push_back(task_);  
  7.  }  
  8.  int task_queue_t::comsume(task_t& task_){  
  9.  lock_guard_t lock(m_mutex);  
  10.  while (m_tasklist->empty())//! 当没有作业时,就等待直到条件满足被唤醒{ 
  11.  if (false == m_flag){  
  12.  return -1;  
  13.  }  
  14.  m_cond.wait();  
  15.  }  
  16.  task_ = m_tasklist->front();  
  17.  m_tasklist->pop_front();  
  18.  return 0;  
  19.  }

'{s'h N#Ts,|"c)A0  1.2 任务队列使用技巧

/u9g$A_U d8?&Gpw+P%v|0

/^G ^{d!ZGtn0Y0  1.2.1 IO 与 逻辑分离51Testing软件测试网)i1^X:vO+]'s? k5Q

5g4G/D(d.r\7Ll8`x d8v0  比如网络游戏服务器程序中,网络模块收到消息包,投递给逻辑层后立即返回,继续接受下一个消息包。逻辑线程在一个没有io操作的环境下运行,以保障实时性。示例:

7]:RCN5K,f0
  1. void handle_xx_msg(long uid, const xx_msg_t& msg){  
  2.  logic_task_queue->post(boost::bind(&servie_t::proces, uid, msg));  
  3.  }

!b[k-nBp e'je0  注意,此模式下为单任务队列,每个任务队列单线程。

O1x$T5Q ]1n,Xg051Testing软件测试网^:~SI.`\3_S\#A8}

  1.2.2 并行流水线

}U,la` d5xAu0

/`!N~,B2H7d0   上面的只是完成了io 和 cpu运算的并行,而cpu中逻辑操作是串行的。在某些场合,cpu逻辑运算部分也可实现并行,如游戏中用户A种菜和B种菜两种操作是完全可以并行的,因 为两个操作没有共享数据。最简单的方式是A、B相关的操作被分配到不同的任务队列中。示例如下:

8j9M6sQ7Q J0
  1. void handle_xx_msg(long uid, const xx_msg_t& msg) {  
  2.  logic_task_queue_array[uid % sizeof(logic_task_queue_array)]->post(  
  3.  boost::bind(&servie_t::proces, uid, msg));  
  4.  }
51Testing软件测试网*V kNc7zG:B6h

  注意,此模式下为多任务队列,每个任务队列单线程。51Testing软件测试网"z)@V%I6BG

1.2.3 连接池与异步回调

*YBo6T oz@8IN0  比如逻辑Service模块需要数据库模块异步载入用户数据,并做后续处理计算。而数据库模块拥有一个固定连接数的连接池,当执行SQL的任务到来时,选择一个空闲的连接,执行SQL,并把SQL 通过回调函数传递给逻辑层。其步骤如下:51Testing软件测试网n*Pt |]

,Y x:Hl ?t3V0  ● 预先分配好线程池,每个线程创建一个连接到数据库的连接

Fs H)r qt2]i$\051Testing软件测试网} q*r,M#VP\

  ● 为数据库模块创建一个任务队列,所有线程都是这个任务队列的消费者51Testing软件测试网*Fi~/ro Uw,_!v

@LX e/c}An1S0  ● 逻辑层想数据库模块投递sql执行任务,同时传递一个回调函数来接受sql执行结果51Testing软件测试网#f,DYN5{hH

51Testing软件测试网9`eoo'KME

  示例如下:

1Xh4t*Qqo~};X0

^+ZE3w.^0

Q uAE}C0
void db_t:load(long uid_, boost::functionpost(boost::bind(&db_t:load, uid, func));

]6x"C%_*F {EY0  注意,此模式下为单任务队列,每个任务队列多线程。51Testing软件测试网*WqK-_B7j

6v(us)Gt4YM0  2、日志

4P;WnAUv8f/E2R051Testing软件测试网 jG*L*evo.Y

  本文主要讲C++多线程编程,日志系统不是为了提高程序效率,但是在程序调试、运行期排错上,日志是无可替代的工具,相信开发后台程序的朋友都会使用日志。常见的日志使用方式有如下几种:51Testing软件测试网E*[U-i5y6v%M

51Testing软件测试网 k/i Qu8O hh

  ● 流式,如logstream << “start servie time[%d]” << time(0) << ” app name[%s]” << app_string.c_str() << endl;

4cZ v tRN5yg0

J7IUH N.z f0  ● Printf 格式如:logtrace(LOG_MODULE, “start servie time[%d] app name[%s]“, time(0), app_string.c_str());51Testing软件测试网S5x/n@;K

*| `,s0P J0  二者各有优缺点,流式是线程安全的,printf格式格式化字符串会更直接,但缺点是线程不安全,如果把app_string.c_str() 换成app_string (std::string),编译被通过,但是运行期会crash(如果运气好每次都crash,运气不好偶尔会crash)。我个人钟爱printf风 格,可以做如下改进:

h*VB'Gd-S051Testing软件测试网r,WYsn}+b$J

  ● 增加线程安全,利用C++模板的traits机制,可以实现线程安全。示例:51Testing软件测试网;~#V!xZ Eymf

51Testing软件测试网(p%Tw f I

"kB(d-]6b"g0
  1. template 
  2.  void logtrace(const char* module, const char* fmt, ARG1 arg1){  
  3.  boost::format s(fmt);  
  4.  f % arg1;  
  5.  }

&U;s9rO,ByE0  这样,除了标准类型+std::string 传入其他类型将编译不能通过。这里只列举了一个参数的例子,可以重载该版本支持更多参数,如果你愿意,可以支持9个参数或更多。51Testing软件测试网PK2B'A!F&O{P |3ll

c f#~iFVVy2X0  ● 为日志增加颜色,在printf中加入控制字符,可以再屏幕终端上显示颜色,Linux下示例:printf(“\033[32;49;1m [DONE] \033[39;49;0m")

O;w g'\'X } @0

@F&jxP4Q0J0  更多颜色方案参见:http://hi.baidu.com/jiemnij/blog/item/d95df8c28ac2815cb219a80e.html51Testing软件测试网 D3vTGcbb

51Testing软件测试网!k5H5f@`

  ● 每个线程启动时,都应该用日志打印该线程负责什么功能。这样,程序跑起来的时候通过top –H – p pid 可以得知那个功能使用cpu的多少。实际上,我的每行日志都会打印线程id,此线程id非pthread_id,而其实是线程对应的系统分配的进程id 号。51Testing软件测试网 N6?HJV(DV u

.ZZK/Fpohw3M"r0  3、性能监控51Testing软件测试网 [*Fq'~8oI1p%uG

"l f0u({m8C A0  尽管已经有很多工具可以分析c++程序运行性能,但是其大部分还是运行在程序debug阶段。我们需要一种手段在debug和release阶段都能监控程序,一方面得知程序瓶颈之所在,一方面尽早发现哪些组件在运行期出现了异常。51Testing软件测试网mKF S5q"RI

51Testing软件测试网 J.Z8q)Uwz8[b c

  通常都是使用gettimeofday 来计算某个函数开销,可以精确到微妙。可以利用C++的确定性析构,非常方便的实现获取函数开销的小工具,示例如下:51Testing软件测试网 U2_6s o$TPWt@Y

51Testing软件测试网-n+F*qkkl I(\!W

*h;Ex%WQb0}i yx*A0
  1. struct profiler{  
  2.  profiler(const char* func_name){  
  3.  gettimeofday(&tv, NULL);  
  4.  }  
  5.  ~profiler(){  
  6.  struct timeval tv2;  
  7.  gettimeofday(&tv2, NULL);  
  8.  long cost = (tv.tv_sec - tv.tv_sec) * 1000000 + (tv.tv_usec - tv.tv_usec);  
  9.  //! post to some manager 
  10.  }  
  11.  struct timeval tv;  
  12.  };  
  13.  #define PROFILER() profiler(__FUNCTION__)
51Testing软件测试网&\'}~E'i7h

  Cost 应该被投递到性能统计管理器中,该管理器定时讲性能统计数据输出到文件中。51Testing软件测试网}$~/i0^ go(_

51Testing软件测试网h;i!g f1g

  4、Lambda 编程51Testing软件测试网*|-CH@5k

51Testing软件测试网'[D8tDqQd&s

  使用foreach 代替迭代器51Testing软件测试网:G$_ArA*o,e

9? Pt_[[q?$d0  很多编程语言已经内建了foreach,但是c++还没有。所以建议自己在需要遍历容器的地方编写foreach函数。习惯函数式编程的人应该 会非常钟情使用foreach,使用foreach的好处多多少少有些,如:http://www.cnblogs.com/chsword /archive/2007/09/28/910011.html51Testing软件测试网w'VP*NbB

51Testing软件测试网K#vf#K4l&^+@!_"Mz

  但主要是编程哲学上层面的。51Testing软件测试网7@1n4S? L

51Testing软件测试网E*m$kS4s1^E

  示例:51Testing软件测试网)i.b4`8W}1yz Dc

'F;]+Q&F} ^[0

;Z2|b.c5SU0
  1. void user_mgr_t::foreach(boost::function func_){  
  2.  for (iterator it = m_users.begin(); it != m_users.end() ++it){  
  3.  func_(it->second);  
  4.  }  
  5.  }
51Testing软件测试网{ ~@:K"tmEY%s

  比如要实现dump 接口,不需要重写关于迭代器的代码51Testing软件测试网)I6{k'^)l h

51Testing软件测试网2N+Z'by Yq3A,|e

'I-WK U"`/V'P0
  1. void user_mgr_t:dump(){  
  2.  struct lambda {  
  3.  static void print(user_t& user){  
  4.  //! print(tostring(user); 
  5.  }  
  6.  };  
  7.  this->foreach(lambda::print);  
  8.  }
51Testing软件测试网 HvCOY1LN2N

  实际上,上面的代码变通的生成了匿名函数,如果是c++ 11 标准的编译器,本可以写的更简洁一些:51Testing软件测试网A x'J&Q+r4O@

51Testing软件测试网dQ2bcU

"IbaT!a%{0
this->foreach([](user_t& user) {} );
51Testing软件测试网5r$Dv z6}xX
51Testing软件测试网9D w2O/dg!d(@R

  但是我大部分时间编写的程序都要运行在centos 上,你知道吗它的gcc版本是gcc 4.1.2, 所以大部分时间我都是用变通的方式使用lambda函数。51Testing软件测试网"W ?TZ ~bV

51Testing软件测试网,q-|E5h"c!F0RB

  Lambda 函数结合任务队列实现异步

(S4]N_o5Y#~`051Testing软件测试网aP4dmjh t,Iv/k

  常见的使用任务队列实现异步的代码如下:

R_3I#w g@-e aB051Testing软件测试网}`6],~w:n+W

/M ?(P;EPV Jl;xU0
  1. void service_t:async_update_user(long uid){  
  2.  task_queue->post(boost::bind(&service_t:sync_update_user_impl, this, uid));  
  3.  }  
  4.  void service_t:sync_update_user_impl(long uid){  
  5.  user_t& user = get_user(uid);  
  6.  user.update()  
  7.  }
51Testing软件测试网 bft oAE4^D1s\

  这样做的缺点是,一个接口要响应的写两遍函数,如果一个函数的参数变了,那么另一个参数也要跟着改动。并且代码也不是很美观。使用lambda可以让异步看起来更直观,仿佛就是在接口函数中立刻完成一样。示例代码:

8c4W u&xK7~v9Hz051Testing软件测试网T*f*vLY9b z)m d

%Q0{^$~ r ak%C0
  1. void service_t:async_update_user(long uid){  
  2.  struct lambda {  
  3.  static void update_user_impl(service_t* servie, long uid){  
  4.  user_t& user = servie->get_user(uid);  
  5.  user.update();  
  6.  }  
  7.  };  
  8.  task_queue->post(boost::bind(&lambda:update_user_impl, this, uid));  
  9.  }

:@;ia$U:I&c*G0  这样当要改动该接口时,直接在该接口内修改代码,非常直观。51Testing软件测试网)`u.LG {4aZR} u

)Gv'| E([$fA[-M~+N*q a0  5、奇技淫巧51Testing软件测试网bjf SY

)JrXX^m5] R`/b!Kp0  利用shared_ptr 实现map/reduce

g r"I@7j+?+T%r0

,Ai5ibVRC E{0  Map/reduce的语义是先将任务划分为多个任务,投递到多个worker中并发执行,其产生的结果经reduce汇总后生成最终的结果。 Shared_ptr的语义是什么呢?当最后一个shared_ptr析构时,将会调用托管对象的析构函数。语义和map/reduce过程非常相近。我 们只需自己实现讲请求划分多个任务即可。示例过程如下:51Testing软件测试网``c9o1M5UB

~HE.v.r0  ● 定义请求托管对象,加入我们需要在10个文件中搜索“oh nice”字符串出现的次数,定义托管结构体如下:51Testing软件测试网__fnn%{

0v|3eOtB d Li1s0

5y)ft c9e#h*C4x0
  1. struct reducer{  
  2.  void set_result(int index, long result) {  
  3.  m_result[index] = result;  
  4.  }  
  5.  ~reducer(){  
  6.  long total = 0;  
  7.  for (int i = 0; i < sizeof(m_result); ++i){  
  8.  total += m_result[i];  
  9.  }  
  10.  //! post total to somewhere 
  11.  }  
  12.  long m_result[10];  
  13.  };

gb/l+rG;f0  ● 定义执行任务的 worker51Testing软件测试网O0aXM$t7^ K#C

pt8@#oA4qE051Testing软件测试网!ReF}'c u

  1. void worker_t:exe(int index_, shared_ptr ret) {  
  2.  ret->set_result(index, 100);  
  3.  }

,m9M8xn_-w6p5QK+OZ&g0  ● 将任务分割后,投递给不同的worker51Testing软件测试网NW.`?/WHu,m

M+i/]G9Vw&EY4w.q051Testing软件测试网 U3i$Hb&m0k7@(i

  1. shared_ptr ret(new reducer());  
  2.  for (int i = 0; i < 10; ++i) { task_queue[i]->post(boost::bind(&worker_t:exe, i, ret));  
  3.  }
31

TAG:

 

评分:0

我来说两句

Open Toolbar