Skip to content

Latest commit

 

History

History
834 lines (581 loc) · 59.2 KB

File metadata and controls

834 lines (581 loc) · 59.2 KB

十四、并发

并发是使计算机一次完成(或看起来可以完成)多项任务的技术。 从历史上看,这意味着邀请处理器每秒在不同任务之间切换多次。 在现代系统中,这实际上也意味着在单独的处理器内核上同时执行两项或多项操作。

并发本质上不是面向对象的话题,但是 Python 的并发系统是建立在我们在整个模块中介绍的面向对象的构造之上的。 本章将向您介绍以下主题:

  • 线程数
  • 多处理
  • 期货
  • 异步 IO

并发很复杂。 基本概念非常简单,但是众所周知,很难发现可能发生的错误。 但是,对于许多项目,并发是获得我们所需性能的唯一方法。 想象一下,在上一个请求完成之前,Web 服务器是否无法响应用户的请求! 我们不会详细讨论它有多难(需要另一个完整的模块)的所有细节,但是我们将看到如何在 Python 中进行基本的并发,以及避免的一些最常见的陷阱。

线程

通常,创建并发是为了在程序等待 I / O 发生时可以继续进行工作。 例如,服务器可以在等待来自先前请求的数据到达时开始处理新的网络请求。 交互式程序可能会在等待用户按下键的同时渲染动画或执行计算。 请记住,尽管一个人每分钟可以键入 500 个以上的字符,但计算机每秒可以执行数十亿条指令。 因此,即使快速键入,也可能在各个按键之间进行大量处理。

从理论上讲,有可能管理程序中活动之间的所有切换,但实际上不可能正确。 相反,我们可以依靠 Python 和操作系统来处理棘手的切换部分,同时创建似乎独立但同时运行的对象。 这些对象称为线程; 在 Python 中,它们具有非常简单的 API。 让我们看一个基本的例子:

from threading import Thread

class InputReader(Thread):
    def run(self):
        self.line_of_text = input()

print("Enter some text and press enter: ")
thread = InputReader()
thread.start()

count = result = 1
while thread.is_alive():
    result = count * count
    count += 1

print("calculated squares up to {0} * {0} = {1}".format(
    count, result))
print("while you typed '{}'".format(thread.line_of_text))

本示例运行两个线程。 你看得到他们吗? 每个程序都有一个线程,称为主线程。 从头开始执行的代码就发生在该线程中。 更明显地,第二个线程作为InputReader类存在。

要构造线程,我们必须扩展Thread类并实现run方法。 run方法内部的任何代码(或从该方法内部调用的代码)都在单独的线程中执行。

直到我们在对象上调用start()方法后,新线程才开始运行。 在这种情况下,线程立即暂停以等待键盘输入。 同时,原始线程在调用start时继续执行。 它开始计算while循环内的平方。 while循环中的条件检查InputReader线程是否已经退出其run方法; 完成后,它将一些摘要信息输出到屏幕。

如果运行示例并键入字符串“ hello world”,则输出如下所示:

Enter some text and press enter:
hello world
calculated squares up to 1044477 * 1044477 = 1090930114576
while you typed 'hello world'

当然,在键入字符串时,您将计算出更多或更少的平方,因为数字既与我们的相对键入速度有关,又与我们正在运行的计算机的处理器速度有关。

当我们调用start方法时,线程仅在并发模式下开始运行。 如果我们想进行并发调用以查看其比较,可以在最初称为thread.start()的地方调用thread.run()。 输出表明:

Enter some text and press enter:
hello world
calculated squares up to 1 * 1 = 1
while you typed 'hello world'

在这种情况下,线程永远不会存活,并且while循环也不会执行。 在打字时,我们浪费了很多 CPU 闲置时间。

有效使用线程有很多不同的模式。 我们不会涵盖所有内容,但是我们将介绍一种常见的方法,以便我们可以了解join方法。 让我们检查一下加拿大每个省的省会城市的当前温度:

from threading import Thread
import json
from urllib.request import urlopen
import time

CITIES = [
    'Edmonton', 'Victoria', 'Winnipeg', 'Fredericton',
    "St. John's", 'Halifax', 'Toronto', 'Charlottetown',
    'Quebec City', 'Regina'
]

class TempGetter(Thread):
 def __init__(self, city):
 super().__init__()
 self.city = city

    def run(self):
        url_template = (
            'http://api.openweathermap.org/data/2.5/'
            'weather?q={},CA&units=metric')
        response = urlopen(url_template.format(self.city))
        data = json.loads(response.read().decode())
        self.temperature = data['main']['temp']

threads = [TempGetter(c) for c in CITIES]
start = time.time()
for thread in threads:
    thread.start()

for thread in threads:
 thread.join()

for thread in threads:
    print(
        "it is {0.temperature:.0f}°C in {0.city}".format(thread))
print(
    "Got {} temps in {} seconds".format(
    len(threads), time.time() - start))

此代码在启动 10 个线程之前会先构建它们。 注意,我们如何重写构造函数以将它们传递给Thread对象,记住要调用super以确保Thread正确初始化。 请注意:新线程尚未运行,因此__init__方法仍在主线程内部执行。 我们在一个线程中构造的数据可从其他正在运行的线程访问。

在启动 10 个线程之后,我们再次遍历它们,在每个线程上调用join()方法。 该方法从本质上说“等待线程完成之后再做任何事情”。 我们称其为十次。 在所有十个线程完成之前,for 循环不会退出。

此时,我们可以打印存储在每个线程对象上的温度。 再次注意,我们可以从主线程访问在线程内构造的数据。 在线程中,默认情况下共享所有状态。

在我的 100 兆位连接上执行此代码大约需要十分之二秒的时间:

it is 5°C in Edmonton
it is 11°C in Victoria
it is 0°C in Winnipeg
it is -10°C in Fredericton
it is -12°C in St. John's
it is -8°C in Halifax
it is -6°C in Toronto
it is -13°C in Charlottetown
it is -12°C in Quebec City
it is 2°C in Regina
 Got 10 temps in 0.18970298767089844 seconds

如果我们在单个线程中运行此代码(通过将start()调用更改为run()并注释掉join()调用),则大约需要 2 秒钟的时间,因为每个 0.2 秒的请求都必须先完成 下一个开始。 10 倍的加速表明了并发编程的实用性。

线程的许多问题

线程可能有用,尤其是其他编程语言中的,但是现代 Python 程序员倾向于出于某些原因而避免使用它们。 我们将看到,还有其他进行并发编程的方法正受到 Python 开发人员的更多关注。 在进入更重要的主题之前,让我们讨论其中的一些陷阱。

共享内存

线程的主要问题也是它们的主要优点。 线程可以访问所有内存,从而可以访问程序中的所有变量。 这很容易导致程序状态不一致。 您是否曾经遇到过一个房间,其中一个灯有两个开关,两个不同的人同时打开它们? 每个人(线程)都期望自己的动作打开灯(变量),但是结果值(灯熄灭)与这些期望不一致。 现在,假设这两个线程是在银行帐户之间转移资金还是在车辆中管理巡航控制系统。

在线程编程中,此问题的解决方案是“同步”对读取或写入共享变量的任何代码的访问。 有几种不同的方法可以做到这一点,但是我们这里不再赘述,因此我们可以专注于更多的 Pythonic 结构。 同步解决方案可以工作,但是忘记应用它太容易了。 更糟糕的是,由于线程执行操作的顺序不一致,由于同步使用不当而导致的错误确实很难找到。 我们无法轻易重现该错误。 通常,使用已经适当使用锁的轻量级数据结构来强制线程之间进行通信是最安全的。 Python 提供了queue.Queue类来执行此操作; 它的功能基本上与我们将在下一节中讨论的multiprocessing.Queue相同。

在某些情况下,的这些缺点可能会被允许共享内存的一个优点所抵消:它的速度很快。 如果多个线程需要访问巨大的数据结构,则共享内存可以快速提供该访问。 但是,这种优势通常由于以下事实而无效:在 Python 中,运行在不同 CPU 内核上的两个线程不可能完全同时执行计算。 这将我们带到线程的第二个问题。

全局解释器锁

为了使有效地管理内存,垃圾回收和对库中机器代码的调用,Python 提供了一个名为全局解释器锁GIL 的实用程序。 这是不可能关闭的,这意味着线程在 Python 中是无用的,因为它们在其他语言中擅长的一件事是:并行处理。 对于我们而言,GIL 的主要作用是防止两个线程即使在有工作要做的情况下也完全同时工作。 在这种情况下,“完成工作”意味着使用 CPU,因此多个线程访问磁盘或网络是完全可以的。 一旦线程开始等待某些操作,就会释放 GIL。

GIL 被高度贬低,主要是因为那些不了解 GIL 是什么或它给 Python 带来的所有好处的人。 如果我们的语言没有此限制,那肯定会很好,但是 Python 参考开发人员已经确定,至少到目前为止,它带来的价值超过成本。 它使参考实现更易于维护和开发,并且在最初开发 Python 的单核处理器时代,它实际上使解释器更快。 但是,GIL 的最终结果是它在不降低成本的情况下限制了线程带给我们的好处。

注意

尽管 GIL 是大多数人使用的 Python 参考实现中的问题,但已在 IronPython 和 Jython 等某些非标准实现中解决了它。 不幸的是,在发布之时,这些都不支持 Python 3。

线程开销

与异步系统相比,线程的最后一个限制是维护线程的成本。 每个线程占用一定数量的内存(在 Python 进程和操作系统内核中)以记录该线程的状态。 在线程之间切换还占用(少量)CPU 时间。 这项工作可以无缝进行而无需任何额外的编码(我们只需要调用start(),其余的工作就可以了),但是这项工作仍然必须在某个地方进行。

通过结构化我们的工作量,可以稍微减轻这种负担,以便可以重用线程来执行多个作业。 Python 提供了ThreadPool功能来处理此问题。 它作为多处理库的一部分提供,其行为与ProcessPool相同,我们将在稍后进行讨论,因此我们将讨论推迟到下一部分。

多处理

多处理 API 最初旨在模拟线程 API。 但是,它已经发展了,并且在 Python 3 的最新版本中,它更强大地支持更多功能。 当需要大量执行 CPU 密集型工作并可以使用多个内核时,可以设计多处理库(鉴于目前可以以 35 美元的价格购买四核 Raspberry Pi,通常有多个内核)。 当进程将大部分时间都花在等待 I / O(例如,网络,磁盘,数据库或键盘)上时,多处理没有用,但是它们是进行并行计算的方式。

多处理模块启动新的操作系统进程来完成工作。 在 Windows 计算机上,这是一个相对昂贵的操作。 在 Linux 上,进程在内核中的实现方式与线程相同,因此开销受限于在每个进程中运行单独的 Python 解释器的成本。

让我们尝试使用与threading API 提供的结构相似的结构并行化繁重的计算操作:

from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
 print(os.getpid())
        for i in range(200000000):
            pass

if __name__ == '__main__':
 procs =  [MuchCPU() for f in range(cpu_count())]
    t = time.time()
    for p in procs:
 p.start()
    for p in procs:
 p.join()
    print('work took {} seconds'.format(time.time() - t))

这个示例仅占用 CPU 2 亿次迭代。 您可能不认为这是有用的工作,但是今天天气很冷,我非常感谢笔记本电脑在这种负载下产生的热量。

该 API 应该很熟悉; 我们实现Process的子类(而不是Thread)并实现run方法。 在进行一些繁重的工作(如果被误导)之前,此方法会打印出进程 ID(操作系统分配给计算机上每个进程的唯一编号)。

要特别注意模块级别代码周围的if __name__ == '__main__':防护,以防在导入模块时该模块无法运行,而不是作为程序运行。 通常,这是一个好习惯,但是在某些操作系统上使用多处理时,这是必不可少的。 在后台,多处理可能必须在新进程内部导入模块才能执行run()方法。 如果我们允许整个模块在那时执行,它将开始递归地创建新进程,直到操作系统资源耗尽为止。

我们为计算机上的每个处理器核心构建一个进程,然后启动并加入每个进程。 在我的 2014 时代四核笔记本电脑上,输出如下所示:

6987
6988
6989
6990
work took 12.96659541130066 seconds

前四行是在每个MuchCPU实例内部打印的进程 ID。 最后一行显示 2 亿次迭代可以在我的计算机上运行约 13 秒。 在这 13 秒钟内,我的进程监视器表明我所有四个内核都以 100%的速度运行。

如果我们将threading.Thread而不是MuchCPU中的multiprocessing.Process子类化,则输出如下所示:

7235
7235
7235
7235
work took 28.577413082122803 seconds

这次,四个线程在同一个进程中运行,并且运行时间接近前者的三倍。 这就是全局解释器锁定的成本。 在其他语言或 Python 实现中,线程版本的运行速度至少与多处理版本的运行速度相同。我们可能希望它的运行时间是多处理版本的四倍,但请记住,笔记本电脑上还运行着许多其他程序。 在多处理版本中,这些程序还需要共享四个 CPU。 在线程版本中,这些程序可以使用其他三个 CPU 代替。

多处理池

通常,没有理由拥有比计算机上的处理器更多的进程。 这有几个原因:

  • 只有cpu_count()进程可以同时运行
  • 每个进程都会消耗 Python 解释器的完整副本的资源
  • 进程之间的通信很昂贵
  • 创建流程花费的时间不为零

考虑到这些限制,有必要在程序启动时最多创建cpu_count()进程,然后让它们根据需要执行任务。 实现一系列基本的通信过程并不难,但是调试,测试和正确调试可能会很棘手。 当然,Python 是 Python,我们不必做所有这些工作,因为 Python 开发人员已经以多处理池的形式为我们完成了它。

池的主要优点是,它们消除了确定主进程中正在执行什么代码以及子进程中正在运行哪些代码的开销。 与多处理模仿的线程 API 一样,通常很难记住谁在执行什么。 池抽象限制了不同进程中的代码彼此交互的位置的数量,从而更易于跟踪。

  • 池还无缝地隐藏了在流程之间传递数据的流程。 使用池看起来很像一个函数调用。 您将数据传递给一个函数,该数据将在另一个进程或多个进程中执行,当工作完成时,将返回一个值。 重要的是要了解,在后台进行了大量工作来支持此工作:对一个过程中的对象进行酸洗并将其传递到管道中。
  • 另一个过程从管道检索数据并将其释放。 在子流程中完成工作并产生结果。 将结果腌制并传递到管道中。 最终,原始进程对其进行处理,并将其返回。

所有这些腌制并将数据传递到管道中需要花费时间和内存。 因此,理想的是将传入池中和从池中返回的数据量和大小保持最小,并且只有在必须对相关数据进行大量处理的情况下使用池才是有利的。

有了这些知识,使所有这些机械都能工作的代码非常简单。 让我们看一下计算随机数列表的所有素数的问题。 这是各种密码算法的常见且昂贵的部分(更不用说对这些算法的攻击了!)。 它需要数年的处理能力才能破解用于保护银行帐户的极大数量。 以下实现虽然可读,但根本没有效率,但这没关系,因为我们希望看到它使用大量的 CPU 时间:

import random
from multiprocessing.pool import Pool

def prime_factor(value):
    factors = []
    for divisor in range(2, value-1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        factors = [value]
    return factors

if __name__ == '__main__':
 pool = Pool()

    to_factor = [
        random.randint(100000, 50000000) for i in range(20)
    ]
 results = pool.map(prime_factor, to_factor)
 for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))

让我们将集中在并行处理方面,因为用于计算因子的蛮力递归算法非常清晰。 我们首先构造一个多处理池实例。 默认情况下,该池为运行它的计算机中的每个 CPU 内核创建一个单独的进程。

map方法接受一个函数和一个可迭代的函数。 池对可迭代项中的每个值进行腌制,并将其传递到可用进程中,该进程在其上执行功能。 当该过程完成工作时,它将对结果列表进行腌制并将其传递回池中。 所有池完成处理工作后(可能需要一些时间),结果列表将传递回原始过程,该过程一直耐心等待所有这些工作完成。

使用类似的map_async方法通常更有用,即使进程仍在工作,该方法也会立即返回。 在那种情况下,结果变量将不是值列表,而是一个承诺,稍后将通过调用results.get()返回值列表。 这个 promise 对象还具有ready()wait()之类的方法,这些方法使我们可以检查是否所有结果都已经存在。

另外,如果我们不知道要预先获得结果的所有值,则可以使用apply_async方法将单个作业排队。 如果池中有尚未运行的进程,它将立即启动;否则,它将立即启动。 否则,它将一直执行任务,直到有可用的免费进程为止。

池也可以是close d,它拒绝执行任何其他任务,但是处理队列中当前的所有内容;也可以是terminate d,它进一步执行一步,并且拒绝启动仍在队列中的任何作业,尽管有任何作业 当前运行仍允许完成。

队列

如果需要对进程之间的通信进行更多控制,则可以使用QueueQueue数据结构可用于将消息从一个进程发送到一个或多个其他进程。 任何可腌制的对象都可以发送到Queue中,但是请记住,腌制可能是一项昂贵的操作,因此请使此类对象保持较小。 为了说明队列,让我们为文本内容构建一个小的搜索引擎,以将所有相关条目存储在内存中。

这不是构建基于文本的搜索引擎的最明智的方法,但是我使用了这种模式来查询数字数据,这些数据需要使用 CPU 密集型流程来构建然后呈现给用户的图表。

这个特定的搜索引擎并行扫描当前目录中的所有文件。 为 CPU 上的每个内核构建一个进程。 这些命令均指示将某些文件加载​​到内存中。 让我们看一下执行加载和搜索的函数:

def search(paths, query_q, results_q):
    lines = []
    for path in paths:
        lines.extend(l.strip() for l in path.open())

 query = query_q.get()
    while query:
 results_q.put([l for l in lines if query in l])
 query = query_q.get()

请记住,此函数在与主线程不同的进程中运行(实际上,在cpucount()不同的进程中运行)。 它传递path.path对象和两个multiprocessing.Queue对象的列表; 一种用于传入查询,另一种用于发送传出结果。 这些队列与我们在第 6 章,“Python 数据结构”中讨论的Queue类具有相似的接口。 但是,他们正在做额外的工作来对队列中的数据进行腌制并将其通过管道传递给子流程。 这两个队列是在主进程中设置的,并通过管道传递到子进程内部的搜索功能中。

无论从效率还是功能上来说,搜索代码都非常笨拙。 它遍历存储在内存中的每一行,并将匹配的行放入列表中。 该列表放在队列中,并传递回主流程。

让我们看一下设置这些队列的主要过程:

if __name__ == '__main__':
    from multiprocessing import Process, Queue, cpu_count
    from path import path
    cpus = cpu_count()
    pathnames = [f for f in path('.').listdir() if f.isfile()]
    paths = [pathnames[i::cpus] for i in range(cpus)]
 query_queues = [Queue() for p in range(cpus)]
 results_queue = Queue()

 search_procs = [
 Process(target=search, args=(p, q, results_queue))
 for p, q in zip(paths, query_queues)
 ]
    for proc in search_procs: proc.start()

为了简化的描述,我们假设cpu_count为 4。 注意导入语句如何放置在if保护内部? 这是一个很小的优化,可防止它们在某些操作系统上的每个子进程(不需要它们)中导入。 我们列出当前目录中的所有路径,然后将列表分为四个大致相等的部分。 我们还构造了四个Queue对象的列表,以将数据发送到每个子进程中。 最后,我们构造一个single结果队列; 这将传递到所有四个子流程中。 他们每个人都可以将数据放入队列中,并将在主进程中进行汇总。

现在,让我们看一下实际进行搜索的代码:

    for q in query_queues:
 q.put("def")
 q.put(None)  # Signal process termination

    for i in range(cpus):
 for match in results_queue.get():
            print(match)
    for proc in search_procs: proc.join()

此代码对"def"执行一次搜索(因为它是充满 Python 文件的目录中的常用短语!)。 在更适合生产的系统中,我们可能会将套接字连接到该搜索代码。 在这种情况下,我们必须更改进程间协议,以使返回队列中返回的消息包含足够的信息,以标识结果附加到许多查询中的哪一个。

队列的使用实际上是可能成为分布式系统的本地版本。 想象一下,是否将搜索发送到多台计算机,然后重新组合。 我们不在这里讨论它,但是多处理模块包括一个管理器类,该类可以从前面的代码中提取很多样板。 甚至还有multiprocessing.Manager版本,可以管理远程系统上的子进程以构建基本的分布式应用。 如果您有兴趣进一步进行此操作,请查看 Python 多处理文档。

多处理的问题

和线程一样,多处理也有问题,我们已经讨论了其中的一些问题。 没有最好的并发方法。 在 Python 中尤其如此。 我们始终需要检查并行问题,以找出许多可用的解决方案中最适合该问题的解决方案。 有时,没有最佳解决方案。

在多处理的情况下,主要缺点是在进程之间共享数据非常昂贵。 正如我们已经讨论的,进程之间的所有通信,无论是通过队列,管道还是更隐式的机制,都需要对对象进行腌制。 过多的酸洗会很快占据加工时间。 当在过程之间传递相对较小的对象并且每个过程需要完成大量工作时,多处理效果最佳。 另一方面,如果不需要进程之间的通信,那么使用该模块可能根本没有意义。 我们可以启动四个独立的 Python 进程并独立使用它们。

多重处理的另一个主要问题是,与线程一样,很难确定正在访问哪个进程或变量的方法。在多重处理中,如果您从另一个进程访问变量,则通常会覆盖当前正在运行的变量 流程,而另一个流程保留了旧值。 维护起来确实很混乱,所以不要这样做。

期货

让我们开始以一种更异步的并发方式研究。 期货根据我们需要的并发类型包装多处理或线程(趋向于 I / O 与趋向于 CPU)。 它们不能完全解决意外更改共享状态的问题,但可以使我们对代码进行结构化,以便在执行代码时更容易进行跟踪。 期货在不同线程或进程之间提供了明确的界限。 与多处理池类似,它们对于“呼叫和应答”类型的交互很有用,在这种交互中,处理可以在另一个线程中进行,然后在将来的某个时刻(毕竟它们被恰当地命名),您可以要求它提供结果 。 它实际上只是多处理池和线程池的包装,但是它提供了更简洁的 API 并鼓励了更好的代码。

将来是一个基本上包装函数调用的对象。 该函数调用在线程或进程的后台运行。 未来对象具有检查未来是否已完成并在完成后获得结果的方法。

让我们做另一个文件搜索示例。 在上一节中,我们实现了unix grep命令的版本。 这次,让我们做一个find命令的简单版本。 示例将在整个文件系统中搜索包含给定字符串的路径:

from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from os.path import sep as pathsep
from collections import deque

def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
        full_path = str(p.absolute())
        if p.is_dir() and not p.is_symlink():
            subdirs.append(p)
        if query_string in full_path:
                print(full_path)

    return subdirs

query = '.py'
futures = deque()
basedir = Path(pathsep).absolute()

with ThreadPoolExecutor(max_workers=10) as executor:
    futures.append(
        executor.submit(find_files, basedir, query))
    while futures:
        future = futures.popleft()
        if future.exception():
            continue
        elif future.done():
            subdirs = future.result()
            for subdir in subdirs:
                futures.append(executor.submit(
                    find_files, subdir, query))
        else:
            futures.append(future)

该代码包含一个名为find_files的函数,该函数在单独的线程(或进程,如果使用ProcessPoolExecutor)中运行。 此函数没有什么特别的地方,但是请注意它不访问任何全局变量。 与外部环境的所有交互都将传递到函数中或从中返回。 这不是技术要求,而是在进行期货编程时将大脑保持在头骨内的最佳方法。

注意

在没有适当同步的情况下访问外部变量会导致出现竞争条件。 例如,假设有两个并发写入尝试递增整数计数器。 它们同时开始,并且都读取值为 5。然后,它们都将值递增,并将结果写回 6。但是,如果两个进程试图递增变量,则预期结果将是变量递增 2。 ,因此结果应为 7。现代人认为,避免这种情况的最简单方法是保持尽可能多的状态为私有,并通过已知安全的构造(例如队列)共享它们。

在开始之前,我们先设置了几个变量。 在此示例中,我们将搜索所有包含字符'.py'的文件。 我们有一排即将要讨论的期货。 basedir变量指向文件系统的根目录; 在 Unix 机器上为'/',在 Windows 上可能为C:\

首先,让我们学习搜索理论的短期课程。 该算法并行实现广度优先搜索。 它不是使用深度优先搜索来递归搜索每个目录,而是将当前文件夹中的所有子目录添加到队列中,然后将这些文件夹中的每一个的所有子目录添加到队列中,依此类推。

该程序的本质被称为事件循环。 我们可以将ThreadPoolExecutor构造为上下文管理器,以便在完成后自动清除它并关闭其线程。 它需要一个max_workers参数来指示一次运行的线程数。 如果提交的作业超过此数目,它将把其余的作业排入队列,直到工作线程可用。 使用ProcessPoolExecutor时,通常将其限制为计算机上的 CPU 数量,但对于线程而言,它可能更高,具体取决于一次要等待 I / O 的数量。 每个线程都占用一定数量的内存,因此它不应过高。 瓶颈是磁盘速度,而不是并行请求的数量,并不需要那么多线程。

构造执行程序后,我们将使用根目录向其提交作业。 submit()方法立即返回一个Future对象,该对象有望最终为我们提供结果。 未来排在前面。 然后,循环反复从队列中删除第一个 Future,并对其进行检查。 如果它仍在运行,则将其添加回队列末尾。 否则,我们通过调用future.exception()来检查函数是否引发异常。 如果是这样,我们将忽略它(这通常是一个权限错误,尽管真正的应用需要更加小心该异常是什么)。 如果我们不在此处检查此异常,则在调用result()时会引发该异常,并且可以通过常规try ... except机制进行处理。

假设没有发生异常,我们可以调用result()来获取函数调用的返回值。 由于函数返回的不是符号链接的子目录列表(我的防止无限循环的惰性方式),因此result()返回相同的内容。 这些新的子目录被提交给执行者,并且将生成的期货扔到队列中,以便在以后的迭代中搜索其内容。

这就是开发基于将来的 I / O 绑定应用所需要的全部。 在后台,它使用与我们已经讨论过的相同的线程或流程 API,但是它提供了更易理解的界面,并使得查看并发运行的函数之间的边界更加容易(只是不要尝试从内部访问全局变量)。 未来!)。

AsyncIO

AsyncIO 是在 Python 并发编程中的最新技术。 它结合了期货和事件循环的概念以及我们在第 9 章,“迭代器模式”中讨论的协程。 其结果与编写并发代码时获得的结果一样优雅且易于理解,尽管说明并不多!

AsyncIO 可以用于一些不同的并发任务,但是它是专门为网络 I / O 设计的。 大多数网络应用(尤其是在服务器端)花费大量时间等待数据从网络进入。 可以通过在单独的线程中处理每个客户端来解决此问题,但是线程会占用内存和其他资源。 AsyncIO 使用协程而不是线程。

该库还提供了自己的事件循环,从而避免了前面示例中的多行 while 循环。 但是,事件循环是有代价的。 当我们在事件循环上的异步任务中运行代码时,该代码必须立即返回,既不会阻止 I / O,也不会阻止长时间运行的计算。 在编写我们自己的代码时,这是一件小事,但这意味着在 I / O 上阻塞的任何标准库或第三方函数都必须创建非阻塞版本。

AsyncIO 通过创建一组协程来解决此问题,这些协程使用yield from语法立即将控制权返回给事件循环。 事件循环负责检查阻塞调用是否已完成并执行任何后续任务,就像我们在上一节中手动完成的一样。

AsyncIO 运行中

阻塞功能的典型示例是time.sleep调用。 让我们使用此调用的异步版本来说明 AsyncIO 事件循环的基础:

import asyncio
import random

@asyncio.coroutine
def random_sleep(counter):
    delay = random.random() * 5
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
 yield from asyncio.sleep(delay)
    print("{} awakens".format(counter))

@asyncio.coroutine
def five_sleepers():
    print("Creating five tasks")
 tasks = [
 asyncio.async(random_sleep(i)) for i in range(5)]
    print("Sleeping after starting five tasks")
 yield from asyncio.sleep(2)
    print("Waking and waiting for five tasks")
 yield from asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(five_sleepers())
print("Done five tasks")

这是一个相当基本的示例,但是它涵盖了 AsyncIO 编程的一些功能。 从执行的顺序(从上到下或多或少)最容易理解。

最后一行获取事件循环,并指示它运行将来直到完成。 有问题的未来被命名为five_sleepers。 一旦将来完成工作,循环将退出,我们的代码将终止。 作为异步程序员,我们不需要对run_until_complete调用内部发生的事情了解太多,但是要知道发生了很多事情。 这是我们在上一章中编写的 Futures 循环的协程版本,它知道如何处理迭代,异常,函数返回,并行调用等。

现在再仔细看看five_sleepers的未来。 忽略装饰器几段; 我们会回到它。 协程首先构建了random_sleep未来的五个实例。 生成的期货包装在asyncio.async任务中,该任务将它们添加到循环的任务队列中,以便在控制权返回事件循环时可以并行执行。

每当我们调用yield from时,都会返回该控件。 在这种情况下,我们调用yield from asyncio.sleep将协程暂停执行两秒钟。 在此休息期间,事件循环执行它排队的任务; 即五个random_sleep期货。 这些协程分别打印一条开始消息,然后在特定时间段内将控制发送回事件循环。 如果random_sleep内部的任何睡眠调用都短于两秒钟,则事件循环会将控制权传递回相关的 Future,后者在返回之前将其唤醒消息打印出来。 当five_sleepers内部的睡眠调用唤醒时,它将执行直到调用的下一个收益,该收益等待其余的random_sleep任务完成。 当所有睡眠调用均完成执行后,random_sleep任务返回,将其从事件队列中删除。 一旦完成所有这五个操作,就将返回asyncio.wait调用,然后返回five_sleepers方法。 最后,由于事件队列现在为空,因此run_until_complete调用可以终止并且程序结束。

asyncio.coroutine装饰器主要只是证明该协程打算用作事件循环中的未来。 在这种情况下,如果没有装饰器,程序将正常运行。 但是,asyncio.coroutine装饰器也可以用于包装常规函数(一个不产生任何结果的函数),以便可以将其视为将来的函数。 在这种情况下,整个函数将在控制权返回事件循环之前执行。 装饰器只是强制该函数满足协程 API,因此事件循环知道如何处理它。

阅读 AsyncIO 的未来

AsyncIO 协程按顺序执行每一行,直到遇到yield from语句为止,这时它将控制权返回给事件循环。 然后,事件循环将执行所有其他准备运行的任务,包括原始协程正在等待的任务。 每当该子任务完成时,事件循环就会将结果发送回协程,以便它可以继续执行,直到遇到另一个yield from语句或返回为止。

这使我们能够编写同步执行的代码,直到我们明确需要等待某些内容为止。 这消除了线程的不确定行为,因此我们不必过多担心共享状态。

注意

避免从协程内部访问共享状态仍然是一个好主意。 它使您的代码更容易推理。 更重要的是,即使理想世界中所有异步执行都可能发生在协程内部,但实际情况是,某些期货是在线程或进程内部幕后执行的。 坚持“不分享任何东西”的哲学,避免大量困难的错误。

另外,即使我们正在其他地方等待其他工作,AsyncIO 仍使我们可以在一个协程中一起收集代码的逻辑部分。 作为一个特定的实例,即使random_sleep协程中的yield from asyncio.sleep调用允许事件循环内发生大量事情,协程本身看起来也像是按顺序进行了所有操作。 这种读取相关异步代码段而无需担心等待任务完成的机器的能力是 AsyncIO 模块的主要优点。

用于网络的 AsyncIO

AsyncIO 是为与网络套接字一起使用而专门设计的,因此让我们实现一个 DNS 服务器。 更准确地说,让我们实现 DNS 服务器的一项极其基本的功能。

域名系统的基本目的是将域名(例如 www.amazon.com)转换为 IP 地址(例如 72.21.206.6)。 它必须能够执行多种类型的查询,并且在没有所需答案的情况下知道如何与其他 DNS 服务器联系。 我们将不会实施任何此类操作,但是以下示例能够直接响应标准 DNS 查询,以查找我的三个最近雇主的 IP:

import asyncio
from contextlib import suppress

ip_map = {
    b'facebook.com.': '173.252.120.6',
    b'yougov.com.': '213.52.133.246',
    b'wipo.int.': '193.5.93.80'
}

def lookup_dns(data):
    domain = b''
    pointer, part_length = 13, data[12]
    while part_length:
        domain += data[pointer:pointer+part_length] + b'.'
        pointer += part_length + 1
        part_length = data[pointer - 1]

    ip = ip_map.get(domain, '127.0.0.1')

    return domain, ip

def create_response(data, ip):
    ba = bytearray
    packet = ba(data[:2]) + ba([129, 128]) + data[4:6] * 2
    packet += ba(4) + data[12:]
    packet += ba([192, 12, 0, 1, 0, 1, 0, 0, 0, 60, 0, 4])
    for x in ip.split('.'): packet.append(int(x))
    return packet

class DNSProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

 def datagram_received(self, data, addr):
        print("Received request from {}".format(addr[0]))
        domain, ip = lookup_dns(data)
        print("Sending IP {} for {} to {}".format(
            domain.decode(), ip, addr[0]))
 self.transport.sendto(
 create_response(data, ip), addr)

loop = asyncio.get_event_loop()
transport, protocol = loop.run_until_complete(
 loop.create_datagram_endpoint(
 DNSProtocol, local_addr=('127.0.0.1', 4343)))
print("DNS Server running")

with suppress(KeyboardInterrupt):
 loop.run_forever()
transport.close()
loop.close()

本示例设置了一个字典,该字典将一些域愚蠢地映射到 IPv4 地址。 其后是两个函数,这些函数从二进制 DNS 查询数据包中提取信息并构建响应。 我们不会讨论这些; 如果您想了解有关 DNS 的更多信息,请阅读 RFC(“请求注释”,用于定义大多数 Internet 协议的格式)1034 和 1035。

您可以通过在另一个终端上运行以下命令来测试此服务:

nslookup -port=4343 facebook.com localhost

让我们继续吧。 AsyncIO 网络围绕着传输和协议的紧密联系的概念。 协议是具有特定方法的类,当相关事件发生时会调用该方法。 由于 DNS 在 UDP用户数据报协议)的顶部运行; 我们将协议类构建为DatagramProtocol的子类。 此类有多种事件可以响应。 我们对发生的初始连接(仅是为了保存传输以供将来使用)和datagram_received事件特别感兴趣。 对于 DNS,必须解析并响应每个收到的数据报,此时交互结束。

因此,当接收到数据报时,我们将使用我们不谈论的功能(它们是家族中的败类)来处理数据包,查找 IP 并构造响应。 然后,我们指示基础传输使用其sendto方法将结果包发送回请求客户端。

传输实质上代表了通信流。 在这种情况下,它将抽象化事件循环上 UDP 套接字上发送和接收数据的所有麻烦。 例如,有类似的传输方式可以与 TCP 套接字和子进程进行交互。

通过调用循环的create_datagram_endpoint协程来构造 UDP 传输。 这将构造适当的 UDP 套接字并开始对其进行侦听。 我们将套接字需要侦听的地址传递给它,更重要的是,传递给我们创建的协议类,以便传输器在接收到数据时知道要调用的内容。

由于初始化套接字的过程会花费很短的时间,并且会阻塞事件循环,因此create_datagram_endpoint函数是协程。 在我们的示例中,等待初始化时我们实际上不需要执行任何操作,因此将调用包装在loop.run_until_complete中。 事件循环负责管理未来,完成后将返回两个值的元组:新初始化的传输和从我们传入的类构造的协议对象。

在后台,传输已在事件循环中设置了一个任务,该任务正在侦听传入的 UDP 连接。 然后,我们要做的就是通过调用loop.run_forever()启动事件循环,以便任务可以处理这些数据包。 数据包到达时,将按照协议进行处理,一切正常。

唯一要注意的主要问题是,传输(以及实际上是事件循环)在完成后应该被关闭。 在这种情况下,无需两次调用close(),代码就可以很好地运行,但是如果我们正在动态地构建传输(或只是进行适当的错误处理!),我们需要更加注意它。 。

您可能很沮丧地看到在设置协议类和基础传输时需要多少样板文件。 AsyncIO 在称为流的这两个关键概念之上提供了抽象。 在下一个示例中,我们将看到 TCP 服务器中的流示例。

使用执行程序包装阻塞代码

AsyncIO 提供自己的期货库版本,以允许我们在没有进行适当的非阻塞调用时在单独的线程或进程中运行代码。 这实质上使我们能够将线程和进程与异步模型结合在一起。 此功能更有用的应用之一是当应用具有大量的 I / O 绑定和 CPU 绑定活动时,两者兼得。 I / O 绑定部分可以在事件循环中发生,而 CPU 密集型工作可以分解为其他进程。 为了说明这一点,让我们使用 AsyncIO 实现“作为服务排序”:

import asyncio
import json
from concurrent.futures import ProcessPoolExecutor

def sort_in_process(data):
    nums = json.loads(data.decode())
    curr = 1
    while curr < len(nums):
        if nums[curr] >= nums[curr-1]:
            curr += 1
        else:
            nums[curr], nums[curr-1] = \
                nums[curr-1], nums[curr]
            if curr > 1:
                curr -= 1

    return json.dumps(nums).encode()

@asyncio.coroutine
def sort_request(reader, writer):
    print("Received connection")
 length = yield from reader.read(8)
 data = yield from reader.readexactly(
 int.from_bytes(length, 'big'))
 result = yield from asyncio.get_event_loop().run_in_executor(
 None, sort_in_process, data)
    print("Sorted list")
    writer.write(result)
    writer.close()   
    print("Connection closed")     

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
server = loop.run_until_complete(
 asyncio.start_server(sort_request, '127.0.0.1', 2015))
print("Sort Service running")

loop.run_forever()
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

这是的好代码示例,实现了一些非常愚蠢的想法。 作为服务排序的整个想法是非常荒谬的。 使用我们自己的排序算法而不是调用 Python 的sorted更糟。 我们使用的算法称为 gnome 排序,在某些情况下称为“愚蠢排序”。 这是在纯 Python 中实现的慢速排序算法。 我们定义了自己的协议,而不是使用野外存在的许多非常合适的应用协议之一。 甚至在这里使用多处理来并行处理的想法都可能令人怀疑。 我们仍然最终将所有数据传入和传出子流程。 有时候,重要的是要退出正在编写的程序,并问自己是否正在努力实现正确的目标。

但是,让我们看一下该设计的一些智能功能。 首先,我们将字节传入和传出子进程。 这比在主流程中解码 JSON 聪明得多。 这意味着(相对昂贵的)解码可以在不同的 CPU 上进行。 而且,腌制的 JSON 字符串通常比腌制的列表小,因此在进程之间传递的数据更少。

其次,这两种方法非常线性。 看起来代码正在一行接一行地执行。 当然,在 AsyncIO 中,这只是一种幻想,但是我们不必担心共享内存或并发原语。

前面的示例现在应该看起来很熟悉,因为它与其他 AsyncIO 程序具有类似的样板。 但是,有一些差异。 您会注意到我们叫start_server而不是create_server。 此方法挂接到 AsyncIO 的流中,而不使用底层的传输/协议代码。 除了传递协议类之外,我们还可以传递普通协程,该协程接收读取器和写入器参数。 两者都代表可以像文件或套接字一样读取和写入的字节流。 其次,因为这是 TCP 服务器而不是 UDP,所以在程序完成时需要进行一些套接字清理。 此清理是一个阻塞调用,因此我们必须在事件循环上运行wait_closed协程。

流很容易理解。 阅读是一个潜在的阻塞调用,因此我们必须使用yield from进行调用。 写作不会阻塞; 它只是将数据放在队列中,AsyncIO 在后台将其发送出去。

我们在sort_request方法中的代码发出两个读取请求。 首先,它从导线中读取 8 个字节,并使用大端符号将其转换为整数。 该整数表示客户端打算发送的数据字节数。 因此,在下一次调用readexactly时,它将读取那么多字节。 readreadexactly之间的区别在于,前者将读取最多请求的字节数,而后者将缓冲读取直到接收到所有字节,或者直到连接关闭。

执行器

现在让我们看一下执行器代码。 我们导入与上一节中使用的完全相同的ProcessPoolExecutor。 注意,我们不需要特殊的 AsyncIO 版本。 事件循环有一个方便的run_in_executor协程,可用于运行期货。 默认情况下,循环在ThreadPoolExecutor中运行代码,但是如果愿意,我们可以传入其他执行程序。 或者,就像在本示例中所做的那样,可以在通过调用loop.set_default_executor()设置事件循环时设置其他默认值。

您可能在上一节中回忆过,没有多少样板可用于与执行人一起使用期货。 但是,当我们将它们与 AsyncIO 一起使用时,根本没有! 协程程序会在将来自动包装函数调用,并将其提交给执行程序。 我们的代码将阻塞,直到将来完成为止,而事件循环将继续处理其他连接,任务或将来。 将来完成后,协程将唤醒并继续将数据写回到客户端。

您可能想知道,与其在一个事件循环中运行多个进程,不如在一个不同的进程中运行多个事件循环会更好。 答案是:“也许”。 但是,根据确切的问题空间,我们可能最好不要使用单个事件循环来运行程序的独立副本,而是尝试将所有内容与主多处理流程进行协调。

在本节中,我们已经达到 AsyncIO 的大部分要点,并且本章还介绍了许多其他并发原语。 并发是一个很难解决的问题,没有一种解决方案适合所有用例。 设计并发系统的最重要部分是确定哪种可用工具是解决问题的正确工具。 我们已经看到了几种并发系统的优缺点,现在已经有了一些见识,可以看出哪些是针对不同类型需求的更好选择。

案例研究

为了使结束本章以及该模块,让我们构建一个基本的图像压缩工具。 它将拍摄黑白图像(每像素 1 位,打开或关闭),并尝试使用一种非常基本的压缩形式(称为行程编码)对其进行压缩。 您可能会发现黑白图像有些牵强。 如果是这样,您在这个页面上的工作时间不够!

我在本章的示例代码中包含了一些黑白 BMP 图像样本(易于将数据读入,并留有很多机会来改善文件大小)。

我们将使用一种称为游程长度编码的简单技术来压缩图像。 该技术基本上采用一系列位,并用重复的位数替换任何重复的位字符串。 例如,可以将字符串 000011000 替换为 04 12 03,以指示 4 个零之后是 2 个,再是 3 个零。 为了使事情更有趣,我们将每一行分成 127 位的块。

我没有任意选择 127 位。 可以将 127 个不同的值编码为 7 位,这意味着如果一行包含全 1 或全 0,则可以将其存储在单个字节中; 第一位指示是 0 行还是 1 行,其余 7 位指示存在多少位。

将图像分解为的另一个好处是; 我们可以并行处理各个块,而无需彼此依赖。 但是,也有一个主要的缺点。 如果运行中仅有几个或零,则它将占用压缩文件中的more空间。 当我们将长期运行分解为多个块时,我们可能最终会创建更多此类次运行并膨胀文件的大小。

处理文件时,我们必须考虑压缩文件中字节的确切布局。 我们的文件将在文件的开头存储两个字节的 little-endian 整数,代表完整文件的宽度和高度。 然后,它将写入代表每一行的 127 位块的字节。

现在,在开始设计并发系统以构建此类压缩映像之前,我们应该问一个基本问题:该应用受 I / O 约束还是受 CPU 约束?

老实说,我的回答是“我不知道”。 我不确定该应用是否会花费更多时间从磁盘加载数据并将其写回或在内存中进行压缩。 我怀疑它原则上是受 CPU 限制的应用,但是一旦我们开始将图像字符串传递到子进程中,我们可能会失去并行性的任何好处。 解决此问题的最佳方法可能是编写 C 或 Cython 扩展,但让我们看看使用纯 Python 可以获得多远的效果。

我们将使用自下而上的设计来构建此应用。 这样,我们将有一些构建基块,可以将它们组合成不同的并发模式,以了解它们之间的比较。 让我们从使用游程长度编码压缩 127 位块的代码开始:

from bitarray import bitarray
def compress_chunk(chunk):
    compressed = bytearray()
    count = 1
    last = chunk[0]
    for bit in chunk[1:]:
        if bit != last:
            compressed.append(count | (128 * last))
            count = 0
            last = bit
        count += 1
    compressed.append(count | (128 * last))
    return compressed

该代码使用bitarray类来处理各个零和一。 它作为第三方模块分发,您可以使用命令pip install bitarray进行安装。 传递到compress_chunks的块是此类的一个实例(尽管该示例对于布尔值列表同样适用)。 在这种情况下,位数组的主要好处是在进程之间进行腌制时,它们占据了布尔列表或 1s 和 0s 字节串的空间的八分之一。 因此,它们的腌制速度更快。 与进行大量按位运算相比,它们也更容易使用(双关语意)。

该方法使用行程编码对数据进行压缩,并返回包含打包数据的字节数组。 如果位数组就像一个一和零的列表,那么字节数组就像一个字节对象的列表(每个字节当然包含 8 个一或零)。

执行压缩的算法非常简单(尽管我想指出,我花了两天时间来实现和调试它。简单易懂并不一定意味着易于编写!)。 它首先将last变量设置为当前运行中的位类型(TrueFalse)。 然后,它循环遍历这些位,对每个位进行计数,直到找到不同的位为止。 当这样做时,它会根据last变量所包含的内容,通过将字节的最左位(128 位置)设为零或一来构造一个新的字节。 然后它重置计数器并重复该操作。 循环完成后,它将为最后一次运行创建最后一个字节,并返回结果。

在创建构建块时,让我们创建一个压缩一行图像数据的函数:

def compress_row(row):
    compressed = bytearray()
    chunks = split_bits(row, 127)
    for chunk in chunks:
        compressed.extend(compress_chunk(chunk))
    return compressed

该函数接受一个名为 row 的位数组。 使用我们将很快定义的函数,它将它分成每个 127 位宽的块。 然后,它使用先前定义的compress_chunk压缩这些块中的每个块,将结果连接到bytearray中,并返回。

我们将split_bits定义为一个简单的生成器:

def split_bits(bits, width):
    for i in range(0, len(bits), width):
        yield bits[i:i+width]

现在,由于我们尚不确定这是否可以在线程或进程中更有效地运行,因此让我们将这些函数包装在一种方法中,该方法可以在提供的执行程序中运行所有操作:

def compress_in_executor(executor, bits, width):
    row_compressors = []
    for row in split_bits(bits, width):
 compressor = executor.submit(compress_row, row)
        row_compressors.append(compressor)

    compressed = bytearray()
    for compressor in row_compressors:
 compressed.extend(compressor.result())
    return compressed

这个例子几乎不需要解释; 它使用我们已经定义的相同split_bits函数根据图像的宽度将输入的位分成几行(自下而上的设计!)。

请注意,此代码将压缩任何位序列,尽管它会膨胀,而不是压缩位值频繁变化的二进制数据。 黑白图像绝对是有关压缩算法的良好候选者。 现在,让我们创建一个函数,该函数使用第三方枕头模块加载图像文件,然后将其转换为位并进行压缩。 我们可以使用古老的注释语句轻松地在执行程序之间切换:

from PIL import Image
def compress_image(in_filename, out_filename, executor=None):
    executor = executor if executor else ProcessPoolExecutor()
    with Image.open(in_filename) as image:
        bits = bitarray(image.convert('1').getdata())
        width, height = image.size

    compressed = compress_in_executor(executor, bits, width)

    with open(out_filename, 'wb') as file:
        file.write(width.to_bytes(2, 'little'))
        file.write(height.to_bytes(2, 'little'))
        file.write(compressed)

def single_image_main():
    in_filename, out_filename = sys.argv[1:3]
    #executor = ThreadPoolExecutor(4)
 executor = ProcessPoolExecutor()
    compress_image(in_filename, out_filename, executor)

image.convert()调用将图像更改为黑白(一位)模式,而getdata()返回这些值的迭代器。 我们将结果打包到一个位数组中,以便它们更快地通过导线传输。 当输出压缩文件时,我们首先写入图像的宽度和高度,然后写入压缩数据,压缩数据以字节数组的形式出现,可以直接写入二进制文件。

编写完所有这些代码后,我们终于可以测试线程池或进程池是否为我们提供了更好的性能。 我创建了一个大的(7200 x 5600 像素)黑白图像,并将其运行在两个池中。 ProcessPool大约需要 7.5 秒来处理系统上的映像,而ThreadPool始终需要大约 9 秒钟。因此,正如我们所怀疑的那样,在进程之间来回腌制位和字节的成本几乎消耗了所有效率。 通过在多个处理器上运行可以获得好处(尽管看我的 CPU 监视器,它确实充分利用了我计算机上的所有四个内核)。

因此,看起来压缩单个图像是在单独的过程中最有效的方法,但这仅仅是勉强完成的工作,因为我们在父过程和子过程之间来回传递了这么多数据。 当进程之间传递的数据量很少时,多处理会更有效。

因此,让我们扩展应用以并行压缩目录中的所有位图。 我们唯一要传递给子进程的是文件名,因此与使用线程相比,我们应该获得速度上的提高。 另外,要发疯,我们将使用现有代码来压缩单个图像。 这意味着我们将在每个子流程中运行ProcessPoolExecutor以创建更多子流程。 我不建议在现实生活中这样做!

from pathlib import Path
def compress_dir(in_dir, out_dir):
    if not out_dir.exists():
        out_dir.mkdir()

 executor = ProcessPoolExecutor()
    for file in (
            f for f in in_dir.iterdir() if f.suffix == '.bmp'):
        out_file = (out_dir / file.name).with_suffix('.rle')
 executor.submit(
 compress_image, str(file), str(out_file))

def dir_images_main():
    in_dir, out_dir = (Path(p) for p in sys.argv[1:3])
    compress_dir(in_dir, out_dir)

这段代码使用了我们先前定义的compress_image函数,但是对每个图像在单独的进程中运行它。 它不会将执行程序传递给该函数,因此,一旦新进程开始运行,compress_image就会创建一个ProcessPoolExecutor

现在我们正在执行器内部运行执行器,可以使用四种线程和进程池组合来压缩图像。 它们各自具有完全不同的时序配置文件:

|   |

每个映像的处理池

|

每个映像的线程池

| | --- | --- | --- | | 每行的处理池 | 42 秒 | 53 秒 | | 每行线程池 | 34 秒 | 64 秒 |

如我们所料,对每个图像使用线程,对每行再次使用线程是最慢的,因为 GIL 阻止我们并行执行任何工作。 鉴于我们在使用单个图像时对每行使用单独的进程时速度稍快,如果您在单独的进程中处理每个图像,您可能会惊讶地发现对行使用ThreadPool功能更快 。 花一些时间来了解为什么会这样。

我的机器仅包含四个处理器核心。 每个图像中的每一行都在单独的池中进行处理,这意味着所有这些行都在争夺处理能力。 当只有一张图像时,我们通过并行运行每一行来获得(非常适度的)加速。 但是,当我们一次增加要处理的图像数量时,将所有行数据传入和传出子流程的成本正在积极地浪费其他每个图像的处理时间。 因此,如果我们可以在单独的处理器上处理每个图像,那么只需将几个文件名添加到子进程管道中,就可以使速度得到大幅提升。

因此,我们看到不同的工作负载需要不同的并发范例。 即使我们只是使用期货,我们也必须就使用哪种执行人做出明智的决定。

还要注意,对于通常大小的图像,该程序运行得足够快,以至于我们使用哪种并发结构都没关系。 实际上,即使我们根本不使用任何并发性,我们最终也可能会获得相同的用户体验。

这个问题也可以直接使用线程和/或多处理模块来解决,尽管要编写的样板代码要多得多。 您可能想知道 AsyncIO 在这里是否有用。 答案是:“可能不会”。 大多数操作系统没有很好的方法来进行非阻塞的文件系统读取,因此无论如何,该库最终都会将所有调用包装在期货中。

为了完整起见,这是我用来解压缩 RLE 图像的代码,以确认算法是否正常工作(实际上,直到我修复压缩和解压缩中的错误时,我仍然不确定是否 完美。我应该使用测试驱动的开发!):

from PIL import Image
import sys

def decompress(width, height, bytes):
    image = Image.new('1', (width, height))

    col = 0
    row = 0
    for byte in bytes:
        color = (byte & 128) >> 7
        count = byte & ~128
        for i in range(count):
            image.putpixel((row, col), color)
            row += 1
        if not row % width:
            col += 1
            row = 0
    return image

with open(sys.argv[1], 'rb') as file:
    width = int.from_bytes(file.read(2), 'little')
    height = int.from_bytes(file.read(2), 'little')

    image = decompress(width, height, file.read())
    image.save(sys.argv[2], 'bmp')

此代码非常简单。 每次运行都编码为一个字节。 它使用一些按位数学运算来提取像素的颜色和行程的长度。 然后,它根据图像中运行的每个像素设置像素,增加下一个像素的行和列,以适当的间隔进行检查。

Case study

Case study

Case study

Case study