三、常见用法
3.1、对一个文件的多个行并行处理
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Process the lines of a file in parallel with MPI.

Rank 0 reads every line from stdin and broadcasts the full list to all
ranks; each rank then processes its own contiguous slice of the lines.

Run with, e.g.:  mpirun -np 4 python this_script.py < input.txt
"""
import sys

import mpi4py.MPI as MPI
import numpy as np

# --- Global MPI state -------------------------------------------------
# Communicator spanning every process started by mpirun.
comm = MPI.COMM_WORLD
# This process's rank (0 .. comm_size-1) within the communicator.
comm_rank = comm.Get_rank()
# Total number of MPI processes in the cluster job.
comm_size = comm.Get_size()

if __name__ == '__main__':
    if comm_rank == 0:
        sys.stderr.write("processor root starts reading data...\n")
        all_lines = sys.stdin.readlines()
    # Broadcast the whole line list from rank 0; on non-root ranks the
    # conditional evaluates lazily to None, so all_lines need not exist there.
    all_lines = comm.bcast(all_lines if comm_rank == 0 else None, root=0)
    num_lines = len(all_lines)
    # Split [0, num_lines) into comm_size nearly-equal contiguous chunks;
    # entry k is the start offset of rank k's slice.
    local_lines_offset = np.linspace(0, num_lines, comm_size + 1).astype('int')
    local_lines = all_lines[local_lines_offset[comm_rank]:
                            local_lines_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data\n"
                     % (comm_rank, comm_size, len(local_lines), num_lines))
    cnt = 0
    for line in local_lines:
        # Placeholder parse step — replace with real per-line processing.
        fields = line.strip().split('\t')
        cnt += 1
        if cnt % 100 == 0:
            # Periodic progress report so long jobs are observable.
            sys.stderr.write("processor %d has processed %d/%d lines\n"
                             % (comm_rank, cnt, len(local_lines)))
        output = line.strip() + ' process every line here'
        print(output)
3.2、对多个文件并行处理
如果我们的文件太大，例如几千万行，MPI 是没办法把这么大的数据 bcast 给所有进程的，所以我们可以先把大文件 split 成多个小文件，再让每个进程各自处理少数几个文件。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Process many files under one directory in parallel with MPI.

Rank 0 lists the directory and broadcasts the file list; each rank then
opens and processes its own contiguous slice of the files. This avoids
broadcasting huge data: only file NAMES travel over MPI.

Run with, e.g.:  mpirun -np 4 python this_script.py /path/to/dir
"""
import os
import sys

import mpi4py.MPI as MPI
import numpy as np

# --- Global MPI state -------------------------------------------------
# Communicator spanning every process started by mpirun.
comm = MPI.COMM_WORLD
# This process's rank (0 .. comm_size-1) within the communicator.
comm_rank = comm.Get_rank()
# Total number of MPI processes in the cluster job.
comm_size = comm.Get_size()

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: python *.py directory_with_files\n")
        sys.exit(1)
    path = sys.argv[1]
    if comm_rank == 0:
        file_list = os.listdir(path)
        sys.stderr.write("%d files\n" % len(file_list))
    # Only rank 0 listed the directory; broadcast the (small) name list.
    file_list = comm.bcast(file_list if comm_rank == 0 else None, root=0)
    num_files = len(file_list)
    # Split [0, num_files) into comm_size nearly-equal contiguous chunks.
    local_files_offset = np.linspace(0, num_files, comm_size + 1).astype('int')
    local_files = file_list[local_files_offset[comm_rank]:
                            local_files_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data\n"
                     % (comm_rank, comm_size, len(local_files), num_files))
    cnt = 0
    for file_name in local_files:
        # 'with' guarantees the handle is closed even if processing raises.
        with open(os.path.join(path, file_name)) as hd:
            for line in hd:
                output = line.strip() + ' process every line here'
                print(output)
        cnt += 1
        sys.stderr.write("processor %d has processed %d/%d files\n"
                         % (comm_rank, cnt, len(local_files)))
mpi4py 的一个非常优秀的特性是对 numpy 的完美支持！
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Demonstrate mpi4py's numpy support: bcast, reduce, and allgather.

Rank 0 builds a 4x5 matrix and broadcasts it; each rank processes its
row slice, the element sum is reduced to rank 0, and the squared rows
are gathered back on every rank.

Run with, e.g.:  mpirun -np 2 python this_script.py
"""
import numpy as np
import mpi4py.MPI as MPI

# --- Global MPI state -------------------------------------------------
# Communicator spanning every process started by mpirun.
comm = MPI.COMM_WORLD
# This process's rank (0 .. comm_size-1) within the communicator.
comm_rank = comm.Get_rank()
# Total number of MPI processes in the cluster job.
comm_size = comm.Get_size()

if __name__ == "__main__":
    # Create the matrix on rank 0 only; other ranks start with None.
    all_data = None
    if comm_rank == 0:
        all_data = np.arange(20).reshape(4, 5)
        print("************ data ******************")
        print(all_data)

    # Broadcast the whole matrix from rank 0 to every processor.
    all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)

    # Divide the rows among the processors: entry k of the offset array
    # is the first row index of rank k's slice.
    num_samples = all_data.shape[0]
    local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')
    local_data = all_data[local_data_offset[comm_rank]:
                          local_data_offset[comm_rank + 1]]
    print("****** %d/%d processor gets local data ****" % (comm_rank, comm_size))
    print(local_data)

    # Reduce: sum of all elements, delivered only to rank 0.
    local_sum = local_data.sum()
    all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)

    # Process locally, then gather every rank's result onto all ranks.
    local_result = local_data ** 2
    result = comm.allgather(local_result)
    result = np.vstack(result)

    if comm_rank == 0:
        print("*** sum: ", all_sum)
        print("************ result ******************")
        print(result)
四、MPI和mpi4py的环境搭建
这章放到这里是作为一个附录。我们的环境是linux,需要安装的包有python、openmpi、numpy、cpython和mpi4py,过程如下:
4.1、安装Python
#tar xzvf Python-2.7.tgz
#cd Python-2.7
#./configure --prefix=/home/work/vis/zouxiaoyi/my_tools
#make
#make install
先将Python放到环境变量里面,还有Python的插件库
export PATH=/home/work/vis/zouxiaoyi/my_tools/bin:$PATH
export PYTHONPATH=/home/work/vis/zouxiaoyi/my_tools/lib/python2.7/site-packages:$PYTHONPATH
执行 #python，如果看到可爱的 >>> 提示符出来，就表示成功了。按 Ctrl+D 退出。
4.2、安装openmpi
#wget http://www.open-mpi.org/software/ompi/v1.4/downloads/openmpi-1.4.1.tar.gz
#tar xzvf openmpi-1.4.1.tar.gz
#cd openmpi-1.4.1
#./configure --prefix=/home/work/vis/zouxiaoyi/my_tools
#make -j 8
#make install
然后把bin路径加到环境变量里面:
export PATH=/home/work/vis/zouxiaoyi/my_tools/bin:$PATH
export LD_LIBRARY_PATH=/home/work/vis/zouxiaoyi/my_tools/lib:$LD_LIBRARY_PATH
执行#mpirun,如果有帮助信息打印出来,就表示安装好了。需要注意的是,我安装了几个版本都没有成功,最后安装了1.4.1这个版本才能成功,因此就看你的人品了。
4.3、安装numpy和Cython
安装python库的方法可以参考之前的博客。过程一般如下:
#tar xzvf Cython-0.20.2.tar.gz
#cd Cython-0.20.2
#python setup.py install
打开Python,import Cython,如果没有报错,就表示安装成功了
4.4、安装mpi4py
#tar xzvf mpi4py-1.3.1.tar.gz
#cd mpi4py-1.3.1
#vi mpi.cfg
在68行,[openmpi]下面,将刚才已经安装好的openmpi的目录给改上。
mpi_dir = /home/work/vis/zouxiaoyi/my_tools

改好后保存，再执行：
#python setup.py install
打开 Python，执行 from mpi4py import MPI，如果没有报错，就表示安装成功了。
下面就可以开始属于你的并行之旅了,勇敢探索多核的乐趣吧。