跳转至

整篇博文比较长,完整看完可能需要十五分钟

讲到python的并发编程,就不得不提到python的协程,多线程,多进程这些经典的并发模型。虽然都是用来解决并发问题的,但是不同场景下运用不同模型差别会很大。本文基于极客时间的python核心技术这个专栏来讲解下这三者的区别

协程,线程,进程的区别

首先这三者是有明显的大小关系

进程 > 线程 > 协程

换个说法就是一个进程可以有多个线程,而一个线程里面可以有多个协程。

进程是直接受==操作系统==控制的,是操作系统进行资源分配和调度的基本单位,进程之间相互==独立==,并且进程的稳定性好,一个进程崩了不影响另外一个进程

线程则是==CPU资源分配和调度的基本单位==, 一个进程下的多个线程可以共享资源,但缺点就是线程崩溃后,其对应的整个进程也跟着崩溃

协程则是一个更小的概念,它是位于线程内部,完全受==程序==所控制,它的特点就是运行效率极高

举个python中协程最常见的例子,生成器 我相信学过python的人肯定对此都不陌生 生成器不同于列表,它不会一下子把所有结果计算出来,它只有在程序需要的时候,弹出一个对应的结果。而列表则是将所有结果计算出来,存储在内存,因此开销也就大,接下来我们看一段较为简单的代码

def generator():
    for i in range(5):
        yield i**2

if __name__ == '__main__':
    # main()
    a = generator()
    print(next(a))
    print(next(a))
    print(next(a))
    print(next(a))

输出结果:
0
1
4
9
next是一个函数,接受可迭代对象,没调用一次next则会输出一次

本质上,协程是单线程的,它通过事件循环(event loop)这个概念来执行多个任务,eventloop维护两个任务列表,一个任务列表存放预备任务(即该任务目前空闲,随时可以被执行),另一个任务列表存放是等待任务(即该任务正在运行,但在等待外部的某些操作,如IO操作之类的)

eventloop会选取一个预备任务执行,直到控制权回到eventloop上,它回检查任务完成情况,然后再决定把这一任务放到合适的任务列表中。接着会==遍历一遍等待任务列表==,看这些任务是否完成,如果完成了,则放置在预备状态的列表如果未完成,则放置在等待任务列表, 接着开启新的一轮循环,直到所有任务完成

并发编程

首先我们需要区分==并发==和==并行==这两个概念。并发是指特定时刻执行一个操作,只不过线程和任务会进行切换。并行则是指同一时刻,同时执行多个操作。

在举个通俗点的例子,比如我的任务列表是这样: 爬取网页,等待打印机打印(相当于IO操作),写邮件

普通的顺序执行就是一步步来,先爬取网页,再等待打印,最后写邮件 而并发的思路则会是,等待打印的时候去执行其他两个任务 并行的思路则是三个任务同时执行

并发的思路常用于爬虫领域,如果我爬取网页后要等待打印,这时候大量的IO操作是很耗费时间的,因此效率底下

如果以并发形式,则会在执行IO这种耗费时间的操作同时,去爬取后续的网页,这样一来就会大大提升效率(同时也会提升被封IP概率,因为爬取速度过快)

python里如何实现并发编程?

在以前python中常用的并发编程库是Thread模块,这个模块并不太友好。所幸有concurrent模块和asyncio模块帮助我们能较为快速的实现并发编程 我们先看下concurrent的futures模块

import requests
import time
import concurrent.futures

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

def main():
    sites = [
        'https://baike.baidu.com/item/Python/407313?fr=aladdin',
        'https://baike.baidu.com/item/Java/85979?fr=aladdin',
        'https://baike.baidu.com/item/C++/99272',
        'https://baike.baidu.com/item/Ruby/11419?fr=aladdin',
        'https://baike.baidu.com/item/go/953521?fromtitle=Golang&fromid=2215139&fr=aladdin',
        'https://baike.baidu.com/item/https/285356?fr=aladdin',
        'https://baike.baidu.com/item/c%E8%AF%AD%E8%A8%80/105958?fromtitle=c&fromid=7252092&fr=aladdin'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

首先我们写了个方法download_one,它有request的get方法来去访问一个url,接着是download_all,我们在一个==上下文环境==,使用concurrent.futures.ThreadPoolExecutor

它实质上开启了一个线程池,max_workers参数是指最大线程数目,然后调用executor自带的map函数,这个map函数跟python标准库的map函数很相似,就是传入一个函数名,再传入一个可迭代对象,可迭代对象里的每个元素都会作为参数传进这个函数内部执行。这里我的网站是爬取百度百科的一些词条url,然后用time库的函数来记录时间

另外关于max_workers的值==并不是设的越高越好==。因为你使用多线程那必然会有线程的==切换,创建,删除==,而这些操作也是会耗费一定时间和资源的。在实际中我们通常需要根据运行情况进行调试

下面的代码是为了介绍futures模块的其他方法而改写的,尽管看上去不如用map来的优雅

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executer:
        to_do = []
        for site in sites:
            future = executer.submit(download_one, site)
            to_do.append(future)

        for future in concurrent.futures.as_completed(to_do):
            future.result()
def main():
    sites = [
        'https://baike.baidu.com/item/Python/407313?fr=aladdin',
        'https://baike.baidu.com/item/Java/85979?fr=aladdin',
        'https://baike.baidu.com/item/C++/99272',
        'https://baike.baidu.com/item/Ruby/11419?fr=aladdin',
        'https://baike.baidu.com/item/go/953521?fromtitle=Golang&fromid=2215139&fr=aladdin',
        'https://baike.baidu.com/item/https/285356?fr=aladdin',
        'https://baike.baidu.com/item/c%E8%AF%AD%E8%A8%80/105958?fromtitle=c&fromid=7252092&fr=aladdin'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
主要的改动是在download_all这个函数里面 submit方法接受一个函数名和对应参数,返回一个futures对象 然后我们添加到to_do这个列表里面 as_completed()这个方法是读入一个由futures对象组成的迭代器,直到整个futures对象完成其任务。可以理解为一个任务列表,当子任务完成后,直接用result()方法返回

接下来我们看怎么实现==多进程== 在futures模块下,我们只需要替换上下文那一句就行 就是 将

 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)
替换为
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(download_one, sites)
通常我们使用多进程不传参数给Executor,它会根据CPU返回合适的进程数目

最后我们来看下协程 虽然协程效率高,但是它最大的问题是其兼容性,并不是所有标准库都支持asyncio的协程编程,而request也不支持,所以我们这次换用aiohttp这个库来执行网页访问的操作

import asyncio
import aiohttp
import time


async def download_one(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print("Read {} from {}".format(resp.content_length, url))


async def download_all(sites):
    tasks = [asyncio.create_task(download_one(site)) for site in sites]
    await asyncio.gather(*tasks)

def main():
    sites = [
            'https://baike.baidu.com/item/Python/407313?fr=aladdin',
            'https://baike.baidu.com/item/Java/85979?fr=aladdin',
            'https://baike.baidu.com/item/C++/99272',
            'https://baike.baidu.com/item/Ruby/11419?fr=aladdin',
            'https://baike.baidu.com/item/go/953521?fromtitle=Golang&fromid=2215139&fr=aladdin',
            'https://baike.baidu.com/item/https/285356?fr=aladdin',
            'https://baike.baidu.com/item/c%E8%AF%AD%E8%A8%80/105958?fromtitle=c&fromid=7252092&fr=aladdin'
        ]
    start_time = time.perf_counter()
    asyncio.run(download_all(sites))
    end_time = time.perf_counter()
    print("Download {} sites in {} seconds".format(len(sites), end_time-start_time))

if __name__ == "__main__":
    main()
其中async和await表示这个函数/语句是non-block,也就是可以等待的,这就是之前讲的事件循环部分,它可以放到等待任务列表,然后再执行预备任务列表内的任务。

比如我们的download_one有个print的io操作, 加上async后我们就可以在等待打印的时候执行其他任务

在download_all中我们用一个列表推导式并调用了create_task函数来创建一个任务列表 await asyncio.gather中用星号*解引用任务列表,为了确保每个任务都完成。

三种并发模型该用哪个?

如果是 I/O操作数量大,并且 I/O 操作==很慢==,需要很多任务 / 线程协同实现,那么==使用 Asyncio 更合适==。

如果是 I/O操作数量大,但是 I/O 操作==很快==,只需要有限数量的任务 / 线程,那么使用==多线程==就可以了。

如果是 CPU操作量大,则需要使用==多进程==来提高程序运行效率,但这也会耗费大量的系统资源

接着我们看一个例子,用三种模型写出来的运行效率区别 首先来看一个原始版本的代码

def cpu_bound(number):
    print(sum(i * i for i in range(number)))


def calculate_sums(numbers):
    for number in numbers:
        cpu_bound(number)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))
    # 23.64s
cpu_bound这个函数是计算一个整数平方和,由于我们在main函数里设置数字很大,所以它是一个对cpu耗费资源较大的一个任务,此时我们运行它,我本地得到的结果是23.64秒

接着我们用协程来重写

async def cpu_bound(number):
    print(sum(i*i for i in range(number)))

async def calculate_sums(numbers):
    task = [asyncio.create_task(cpu_bound(number)) for number in numbers]
    await asyncio.gather(*task)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    asyncio.run(calculate_sums(numbers))
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))
    # 25.33秒
我本地跑出来是25秒,甚至效果更差了!

接着我们用多线程来实现 这里我用的还是futures模块

def cpu_bound(number):
    print(sum(i * i for i in range(number)))


def calculate_sums(numbers):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(cpu_bound, numbers)


def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))
    # 24.03s
这里我本地跑出来的是24s,略微好一点

因为IO操作并不密集,所以多线程和协程写出来的效果并没有那么好

最后我们来看下多进程

def cpu_bound(number):
    print(sum(i * i for i in range(number)))


def calculate_sums(numbers):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(cpu_bound, numbers)


def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))
    # 5.9s

多进程能缩短到6s,是原来执行时间的四分之一!

总结

这篇博文总结了比较常用的三种并发模型,当然并发这个问题并不是那么简单,只是python在更新过程中开发了那么多库供开发者更方便地进行开发,对于本质,原理,仍然需要我们不断地学习,探索。最后祝各位小伙伴学习进步!