Python Multicore Programming with mpi4py in Practice

Published: 2018-05-02 09:35


Author: Unknown    Source: 51testing.com (reposted)

3. Common Usage
3.1 Processing the Lines of a Single File in Parallel
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np

#
# Global variables for MPI
#
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

if __name__ == '__main__':
    if comm_rank == 0:
        sys.stderr.write("processor root starts reading data...\n")
        all_lines = sys.stdin.readlines()
    all_lines = comm.bcast(all_lines if comm_rank == 0 else None, root=0)
    num_lines = len(all_lines)
    local_lines_offset = np.linspace(0, num_lines, comm_size + 1).astype('int')
    local_lines = all_lines[local_lines_offset[comm_rank]:local_lines_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data\n" % (comm_rank, comm_size, len(local_lines), num_lines))
    cnt = 0
    for line in local_lines:
        fields = line.strip().split('\t')
        cnt += 1
        if cnt % 100 == 0:
            sys.stderr.write("processor %d has processed %d/%d lines\n" % (comm_rank, cnt, len(local_lines)))
        output = line.strip() + ' process every line here'
        print output
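To run this script, launch it with mpirun and feed the input file on stdin; Open MPI forwards stdin to rank 0, which is the only rank that reads it here. A minimal invocation sketch, assuming the script is saved as process_lines.py and the input is input.txt (both names are hypothetical):

#mpirun -np 4 python process_lines.py < input.txt

Each of the 4 processes receives the full line list via bcast and then works only on its own slice, so the per-process cost of the main loop drops roughly by a factor of comm_size.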
3.2 Processing Multiple Files in Parallel
If the file is too large, say tens of millions of lines, MPI cannot bcast that much data to every process. In that case, first split the large file into small files, and then let each process handle just a few of them, as sketched below.
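For the splitting step itself, the standard split utility is enough. A sketch, assuming a large big_file.txt and an output directory parts/ (both names are hypothetical):

#mkdir parts
#split -l 1000000 big_file.txt parts/chunk_

This writes parts/chunk_aa, parts/chunk_ab, ... with one million lines each; the script below then distributes these files across the processes.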
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np

#
# Global variables for MPI
#
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: python *.py directory_with_files\n")
        sys.exit(1)
    path = sys.argv[1]
    if comm_rank == 0:
        file_list = os.listdir(path)
        sys.stderr.write("%d files\n" % len(file_list))
    file_list = comm.bcast(file_list if comm_rank == 0 else None, root=0)
    num_files = len(file_list)
    local_files_offset = np.linspace(0, num_files, comm_size + 1).astype('int')
    local_files = file_list[local_files_offset[comm_rank]:local_files_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data\n" % (comm_rank, comm_size, len(local_files), num_files))
    cnt = 0
    for file_name in local_files:
        hd = open(os.path.join(path, file_name))
        for line in hd:
            output = line.strip() + ' process every line here'
            print output
        cnt += 1
        sys.stderr.write("processor %d has processed %d/%d files\n" % (comm_rank, cnt, len(local_files)))
        hd.close()
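Launching works as in 3.1, except that the directory is passed as a command-line argument instead of piping stdin. A sketch, with hypothetical names process_files.py and parts/:

#mpirun -np 4 python process_files.py parts/

Note that only rank 0 calls os.listdir; the resulting file list is then bcast so that every rank slices it identically.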
3.3 Processing Rows or Columns of a Matrix in Parallel with numpy
An excellent feature of mpi4py is its seamless support for numpy!
import os, sys, time
import numpy as np
import mpi4py.MPI as MPI

#
# Global variables for MPI
#
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

# test MPI
if __name__ == "__main__":
    # create a matrix
    if comm_rank == 0:
        all_data = np.arange(20).reshape(4, 5)
        print "************ data ******************"
        print all_data
    # broadcast the data to all processors
    all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)
    # divide the data to each processor
    num_samples = all_data.shape[0]
    local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')
    # get the local data which will be processed in this processor
    local_data = all_data[local_data_offset[comm_rank]:local_data_offset[comm_rank + 1]]
    print "****** %d/%d processor gets local data ****" % (comm_rank, comm_size)
    print local_data
    # reduce to get sum of elements
    local_sum = local_data.sum()
    all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)
    # process in local
    local_result = local_data ** 2
    # gather the result from all processors and broadcast it
    result = comm.allgather(local_result)
    result = np.vstack(result)
    if comm_rank == 0:
        print "*** sum: ", all_sum
        print "************ result ******************"
        print result
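Beyond pickling whole Python objects with the lowercase bcast/reduce/allgather used above, mpi4py also provides capitalized, buffer-based methods (Bcast, Reduce, Allgather, ...) that transfer numpy arrays directly without pickling, which is noticeably faster for large matrices. A minimal sketch of the buffer-based style (the array shape is chosen only for illustration):

import numpy as np
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

# every rank must pre-allocate a buffer with the same shape and dtype
data = np.zeros((4, 5), dtype='float64')
if comm_rank == 0:
    data[:] = np.arange(20).reshape(4, 5)
# capitalized Bcast fills the existing buffer in place, no pickling involved
comm.Bcast(data, root=0)
print "rank %d got:" % comm_rank
print data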
4. Setting Up MPI and mpi4py
This chapter is included here as an appendix. Our environment is Linux, and the packages to install are Python, OpenMPI, numpy, Cython, and mpi4py. The process is as follows:
4.1 Installing Python
#tar xzvf Python-2.7.tgz
#cd Python-2.7
#./configure --prefix=/home/work/vis/zouxiaoyi/my_tools
#make
#make install
First, add Python to the PATH environment variable, along with the directory for Python packages:
export PATH=/home/work/vis/zouxiaoyi/my_tools/bin:$PATH
export PYTHONPATH=/home/work/vis/zouxiaoyi/my_tools/lib/python2.7/site-packages:$PYTHONPATH
Run #python; if the friendly >>> prompt appears, the installation succeeded. Press Ctrl+D to exit.
4.2 Installing OpenMPI
#wget http://www.open-mpi.org/software/ompi/v1.4/downloads/openmpi-1.4.1.tar.gz
#tar xzvf openmpi-1.4.1.tar.gz
#cd openmpi-1.4.1
#./configure --prefix=/home/work/vis/zouxiaoyi/my_tools
#make -j 8
#make install
Then add the bin path to the environment variables:
export PATH=/home/work/vis/zouxiaoyi/my_tools/bin:$PATH
export LD_LIBRARY_PATH=/home/work/vis/zouxiaoyi/my_tools/lib:$LD_LIBRARY_PATH
Run #mpirun; if help text is printed, the installation is good. Note that several versions failed to build for me, and only version 1.4.1 finally worked, so your luck may vary.
4.3 Installing numpy and Cython
The procedure for installing Python libraries was covered in an earlier post; in general it goes like this:
#tar xzvf Cython-0.20.2.tar.gz
#cd Cython-0.20.2
#python setup.py install
Start Python and run import Cython; if no error is reported, the installation succeeded.
4.4 Installing mpi4py
#tar xzvf mpi4py_1.3.1.tar.gz
#cd mpi4py
#vi mpi.cfg
At line 68 of mpi.cfg, under the [openmpi] section, point mpi_dir at the OpenMPI directory installed above:
mpi_dir = /home/work/vis/zouxiaoyi/my_tools
#python setup.py install
Start Python and run import mpi4py.MPI as MPI; if no error is reported, the installation succeeded.
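As a final smoke test, a one-liner can confirm that the launched processes actually see distinct ranks (the -np count is arbitrary):

#mpirun -np 4 python -c "import mpi4py.MPI as MPI; print MPI.COMM_WORLD.Get_rank()"

If this prints the numbers 0 through 3 in some order, MPI and mpi4py are working together correctly.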
Now you can set off on your own parallel journey and bravely explore the fun of multicore programming.
