你好,我是尹会生。

在我为运营工作提供技术咨询的时候,遇到过这样一个场景:这场运营活动,需要在电脑和手机端的多个不同应用程序,同时推送产品宣传图片和视频。这些大量的图片需要有不同的格式和尺寸,视频也需要根据不同的App截取不同的时长。

如果这类需要大量计算的多个任务成为你的日常工作,会花费你不少的时间和精力。不过别担心,我们可以通过程序并行计算,来提升任务效率。

不过你可能会说,用Python自动化执行,也可以提高计算效率啊,那为什么还要学习并行计算呢?

要知道,Python默认的自动化只能利用CPU的一个逻辑核心,如果采用并行计算,那就能够最大化地利用CPU资源,从而成倍提升大量计算的任务效率。接下来我就详细分析一下并行计算的高效之处。

为什么要进行并行计算

还是我在开头提出的运营工作的场景。如果你从这类任务消耗计算机主要资源的角度去考虑,会发现这类需求有两个共同的特点。

第一,它们都需要进行大量的计算,而计算主要是通过CPU来实现的。CPU的硬件指标上有两个和计算效率最相关的概念,分别是主频和多核。

主频决定CPU处理任务的快慢,多核决定处理的时候是否可以并行运行。这和生活中超市的收银一样,收银员的工作效率和超市开放了多少个收银台的通道,都决定了你能否以最快的速度购买到你想要买的商品。

第二,这些任务往往都需要运行相同的程序,但是程序的参数却需要根据不同的需求进行调整。

虽然咱们可以使用Python自动化执行这些程序,从而减少手动操作时间,但是我们还可以利用CPU的多核特性,让程序尽可能并行执行,发挥CPU的全部计算能力,提高运行效率。

那么接下来,我就来教你怎样利用Python的多进程库,来实现程序的并行计算,以及怎么提高并行计算的效率。

怎样实现并行计算

要想实现程序的并行计算,需要使用到标准库中的multiprocessing多进程库。你可能会问,进程是什么呢?

进程,是计算机用来描述程序运行状态的名词。一个进程在运行时需要消耗一定的资源,包括CPU的时间、内存、设备I/O等。如果两个进程互相独立,在同一个任务处理过程中,没有前后依赖关系,那你可以利用multiprocessing库同时运行多个进程,这样就能成倍地减少多个任务执行的总时间。

接下来,我就以计算1-100的平方为例,看看怎么使用multiprocessing实现并行计算。代码如下:

from multiprocessing import Pool

# 计算平方
def f(x):
    return x*x

with Pool(8) as p:
    # 并行计算
    res = p.map(f, range(1, 101))
    print(f'计算平方的结果是:{res}')

在这段代码中,我通过Pool包的map()函数来求1到100平方计算,由于每次计算平方的过程和下一次计算没有直接关联,我就可以使用并行的方式进行计算,提高计算效率。

为了让map()函数能够实现并行计算,我们必须在使用它之前,通过Pool()包为它指定并行计算的进程数量,设置要执行的函数名称f,以及f()函数所需参数。那么接下来,我就带你学习一下我是怎样使用with语句来设置函数的参数,并正确执行map()函数的。

首先来看最关键的map()函数,它是Pool包实现并行计算的函数之一。在代码中我为map()函数赋值了f和range()函数两个参数。

第一个参数是函数对象。

函数对象会作为map()函数创建进程以后,即将执行的主要任务。因此,由于这里的含义是指定f对象将要被创建的进程执行,而不是将f()函数执行的结果作为新的进程执行,所以第一个参数必须使用函数对象f,而不能使用f()函数。

第二个参数要求必须是可迭代的对象。

例如我在代码中需要为f函数传递参数为1-100的整数,就可以使用range()函数产生1到100的整数并直接返回,因为它的返回值就是可迭代对象。

如果参数不是数字,就可以采用列表、元组、字典等支持迭代的数据类型,代替range()函数,作为f()函数的参数。举个例子,如果你需要并行调整多个视频的时长,就可以采用字典存储路径和要调整的视频时长,并把这个字典作为map()函数的第二个参数,map()函数会为字典的每个键值对创建一个进程来并行处理它们。

接下来是map()函数中的三个主要部分,我来分析一下它们各自在并行计算中的功能。

第一,with语句。这是我们在第七讲学习怎么使用Python打开文件之后,第二次用到with语句了。

和文件操作类似,进程打开后也需要妥善关闭,但是进程关闭属于较为底层的操作,如果你不进行操作系统层面的程序设计,是不需要对关闭进程的函数进行修改的,因为使用默认关闭进程的行为,就能满足编写并行计算的需求。

因此,multiprocessing库对Pool包,支持了比较友好的进程打开和关闭方式,即with语句。也就是说,multiprocessing库把对进程的操作写在with语句块中,而with语句就会自动处理进程的打开和关闭,这样在实现并行计算的代码中,你就不用学习进程的基本操作,也能减轻你学习并发程序的负担。

在了解了with语句可以操作进程的打开和关闭后,我们来看代码中我是怎么使用with语句的。

我在代码中使用了“ with Pool(8) as p ”这条语句,这里的Pool()类是多进程库支持的进程池功能,它的作用是指定一个多进程的程序,最多能够并行执行的进程数量。它的参数“8”,表示map()函数最多同时运行8个进程。

剩下两个部分是range()函数和f()函数。

range()函数的作用是产生1-100的整数,这些整数会在每次创建新的进程时,依次作为f()函数的参数并赋值。而f()函数得到参数后,会把计算结果返回给map()函数。当f()函数处理完所有的参数后,map()函数还会返回一个列表作为运行的结果,并进行输出。

以上就是实现并行计算的主要过程。

如何提高并行计算的效率

我们除了需要掌握并行计算的基本方法外,还可以继续提升并行计算的效率。所以在程序中还有两个地方需要优化。

一个是为并行程序自动指定并行度。在并行计算的基本方法中,我使用了手动指定并行度的方式,来指定进程最多能够运行多少个。不过手动指定的并行度并不能适合所有的电脑,因此就需要根据计算机的CPU核数设置合理的并行度。而且,每台计算机的CPU资源是固定不变的,那么设置合理的进程数量能让你的并行计算任务充分利用CPU资源。

另一个是统计程序运行的时间。当你对并行计算的数量做了修改后,那程序是否对计算效率起到了提升效果呢?就还需要更精确的测量,这样才能得到更准确的结果。所以我们还需要使用Python统计出程序执行过程一共消耗了多长的时间。

我们先来看怎么自动设置适合你的电脑的并行度。

为并行程序自动指定并行度

计算类的任务包括数字计算、数据统计、视频的编解码等,都属于计算密集型应用,它们最大的资源开销就是CPU时间的消耗,设置的并行度过大或过小都不能达到最好的运行效率。

那并行度该怎么设置才合理呢?通常情况下,我们会把并行度设置为逻辑CPU数量的两倍。不过假如计算任务达到小时级别(这类任务需要长时间占用CPU资源),为了减少切换任务时的开销,我建议计算的并行度和逻辑CPU数量保持相等。

这就又有一个问题了,该怎么获得计算机的逻辑CPU个数呢?Windows可以通过任务管理器获得,MacOS可以通过活动监视器获得。如果你希望取得逻辑CPU的个数之后,可以根据它的数量自动设置创建进程的数量,那么可以通过安装第三方包psutils,利用其中的cpu_count()函数取得逻辑CPU个数。

我把并行度自动设置为当前逻辑CPU两倍的代码写在下面,供你参考。

from multiprocessing import Pool,Queue
import os
import psutil

# 逻辑cpu个数
count = psutil.cpu_count()

# 定义一个队列用于存储进程id
queue = Queue()

# 用于计算平方和将运行函数的进程id写入队列
def f(x):
    queue.put(os.getpid())
    return x*x

with Pool(count*2) as p:
    # 并行计算
    res = p.map(f, range(1, 101))
    print(f'计算平方的结果是:{res}')

# 并行计算用到的进程id
pids = set()
while not queue.empty():
    pids.add(queue.get())
    
print(f'用到的进程id是: {pids}')

在代码中,我使用了 psutil.cpu_count() 函数来获取逻辑CPU的个数,它把“count*2”作为参数传递给Pool()类,并以逻辑CPU两倍作为最大创建进程数量,从而计算1-100的平方。

这里有两点需要你注意。第一,psutils是process and system utilities的缩写,所以它除了获取逻辑CPU数量外,还可以获取内存、硬盘、网络连接等和操作系统相关的信息。如果你在工作中需要取得操作系统的运行状态,就可以采用psutils包。

第二,psutils是第三方库,因此,在Windows上你需要通过cmd命令行执行pip3 install psutil安装后,才能释放psutils包,否则会出现模块无法找到的错误。

由于map()函数的第二个参数可能会被传入不可迭代对象,这时有可能会导致只运行了一个进程,因此我就在多进程执行过程中,增加了记录进程ID的功能。而在这一功能中,我使用的是os库、队列库和集合数据类型按照下面三个步骤来实现对所有创建的进程ID的统计。

首先,使用os库的getpid()函数获取进程ID。

由于map()函数会根据Pool()类的参数,事先创建好指定数量的进程,而每次运行f()函数都在创建好的进程中执行,所以我就采用os库的getpid()函数取得运行f()函数进程的唯一标识,这就是使用os库的用途。

接下来,使用队列库存储每次运行进程的ID

为了把每次运行的进程ID存到一个对象中,我使用了multiprocessing库的队列包。因为在多进程的程序中,不能采用标准数据类型来传递数据,所以multiprocessing库还提供了方便进程间通信的对象——Queue队列。

map()函数每执行一次f()函数,我就把进程ID作为队列的put()函数的参数,并把进程ID放入队Queue中,直到所有的f()函数执行完成,队列里就会记录每次执行的进程ID信息。

最后,使用集合数据类型存储本次f()函数运行的所有进程ID

为了实现这一功能,我需要通过while循环结构,根据队列不为空的条件,把队列中的进程ID使用get()函数取出来,放入pids变量中。

pids变量是集合数据类型,集合是一个无序的不重复元素序列,需要使用set()创建。你可以把集合当作一个只有键没有值的字典来记忆,它的特点是集合里的元素不能重复。

由于f()函数会多次在一个进程中执行,因此在队列中会记录重复的进程ID,我把进程ID从队列中取出后,放入集合数据类型中,自己就不用编写程序,自动把重复的进程ID去掉了。而且通过对集合pids中的进程ID进行输出,可以看到进程ID的数量刚好和Pool()类指定的并行进程数量相等。

这种用法是我经常在进行多进程程序调试的一种简单用法,我还会把它们的结果写入文件保存,以便程序出现异常执行结果时,可以根据调试的信息进行问题的定位。

统计程序运行的时间

我们除了需要掌握判断程序的并行度外,还可以统计并行计算比顺序计算节省了多少时间。那么再遇到相同场景的时候,你可以选择并行方式来运行程序,提高工作效率。接下来我来教你怎样统计Python程序运行的时间。

在Python中我们可以利用time库的time()函数,来记录当前时间的功能。

我把核心实现代码写在下面供你参考。

# 并行计算时间统计 
with Pool(4) as p:
    # 并行计算
    time1 = time.time()
    res = p.map(f, range(1, 10001))
    time2 = time.time()
    # print(f'计算平方的结果是:{res}')

print(str(time2-time1))


# 串行计算时间统计
list1 = []

time1 = time.time()
for i in range(1, 10001):
    list1.append(f(i))
time2 = time.time()

print(str(time2-time1))

在这段代码中,通过time1和time2的时间差就可以得到程序运行的时间了,那么根据运行时间,我们可以把并行程序和串行程序执行时间的性能进行对比。

这里你需要注意,由于计算平方的CPU开销较小,比较难体现并行计算的优势,你就可以采用并行访问网页,或其他CPU开销较高的程序,这样会让两个程序的时间差别更加明显。

总结

在最后,我来为你总结一下实现并行计算的基本方法和三个注意事项。

通过multiprocessing的Pool包可以实现基于进程的并行计算功能,Pool包的map()函数会根据Pool包指定的进程数量实现并行运行。这里还有三点需要你注意:

  1. 作为map()函数的第一个参数你需要传递函数对象f,不能传递函数的调用f()形式,这是初学者实现并行任务最容易出现的错误。
  2. 为了让并行度更适合你的电脑,应该根据逻辑CPU的个数设置并行度,并根据运行时间来对并行数量进一步优化。
  3. 实现并行计算任务的程序除了使用多进程模型外还可以使用多线程模型。多进程的并行计算更适用于计算密集型应用,即程序运行过程中主要为计算类CPU开销大的程序,多线程模型适合I/O密集型的应用,例如:通过互联网进行批量网页访问和下载。如果你想将多进程的并发模型改为多线程的并发模型只需在导入库的时候将“multiprocessing”改为“multiprocessing.dummy”就能实现多线程并行访问网页。我将多进程和多线程两种方式导入库的代码贴在下方供你参考。
# 多进程模型
from multiprocessing import Pool

# 多线程模型
from multiprocessing.dummy import Pool

# multiprocessing.dummy的Pool用法和multiprocessing库相同

我把这节课的相关代码放在了GitHub上,你可以自行查找、学习。

思考题

我为你留一道思考题,有一个软件包requests,可以通过requests.get('http://www.baidu.com').text 方式访问一个网站,并能够得到网页的源代码。假设我为你提供了几十个需要访问的网站,你是如何实现这些网站的并行访问的,你又能否通过Python对比出逐个访问网页的时间是并行访问的几倍吗?