Python并发(二)

Stella981
• 阅读 819

并发是指一次处理多件事,而并行是指一次做多件事。二者不同,但互相有联系。打个比方:像Python的多线程,就是并发,因为Python的解释器GIL是线程不安全的,一次只允许执行一个线程的Python字节码,我们在使用多线程时,看上去像很多个任务同时进行,但实际上但一个线程在执行的时候,其他线程是处于休眠状态的。而在多CPU的服务器上,Java或Go的多线程,则是并行,因为他们的多线程会利用到服务器上的每个CPU,如果一个服务器上只有一个CPU,那么Java或者Go的多线程依旧是并发,而不是并行。

在上个章节,我们讨论了Python的多线程,在这个章节,我们将通过asyncio包来实现并发,这个包使用事件循环驱动的协程来实现并发

下面,我们看一下asyncio包的简单使用

import asyncio
from time import strftime


@asyncio.coroutine
def hello():
    print(strftime('[%H:%M:%S]'), "Hello world!")
    r = yield from asyncio.sleep(1)
    print(strftime('[%H:%M:%S]'), "Hello again!")


loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

运行结果:

[17:01:59] Hello world!
[17:02:00] Hello again!

@asyncio.coroutine把一个生成器标记为协程类型,然后,我们就把这个协程扔到EventLoop中执行

现在,我们封装两个协程扔进EventLoop中执行

import threading
import asyncio
from time import strftime


@asyncio.coroutine
def hello(id):
    print(strftime('[%H:%M:%S]'), 'coroutine_id:%s thread_id:%s' % (id, threading.currentThread()))
    yield from asyncio.sleep(1)
    print(strftime('[%H:%M:%S]'), 'coroutine_id:%s thread_id:%s' % (id, threading.currentThread()))


loop = asyncio.get_event_loop()
tasks = [hello(1), hello(2)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

运行结果:

[17:10:51] coroutine_id:1 thread_id:<_MainThread(MainThread, started 5100)>
[17:10:51] coroutine_id:2 thread_id:<_MainThread(MainThread, started 5100)>
[17:10:52] coroutine_id:1 thread_id:<_MainThread(MainThread, started 5100)>
[17:10:52] coroutine_id:2 thread_id:<_MainThread(MainThread, started 5100)>

由打印的当前线程名称可以看出,两个协程是由同一个线程并发执行的。
如果把asyncio.sleep()换成真正的IO操作,则多个协程就可以由一个线程并发执行。

async/await

我们可以用asyncio提供的@asyncio.coroutine可以把一个生成器标记为协程类型,然后在协程内部用yield from调用另一个协程实现异步操作。为了简化并更好地标识异步IO,从Python3.5开始引入了新的语法async和await,可以让协程的代码更简洁易读。async和await是针对协程的新语法,要使用新的语法,只需要做两步简单的替换:

import asyncio
from time import strftime


async def hello():
    print(strftime('[%H:%M:%S]'), "Hello world!")
    r = await asyncio.sleep(1)
    print(strftime('[%H:%M:%S]'), "Hello again!")


loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

运行结果:

[17:19:55] Hello world!
[17:19:56] Hello again!

下面,让我们用协程并发下载多张图片,这里需要用到aiohttp包,asyncio包只支持TCP和UDP,如果想要使用HTTP协议,需要使用第三方的包,而aiohttp包,则是支持HTTP协议的

import asyncio
import time
import aiohttp
import sys
import os
from time import strftime, sleep

POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710",
            "pms_1512614327.2483640", "pms_1525853341.8312102", "pms_1511228654.33099308"]

BASE_URL = 'https://i1.mifile.cn/a1'

DEST_DIR = 'downloads/'


async def get_flag(cc):  # <1>
    url = '{}/{cc}.jpg'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            image = await resp.read()
    return image


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


async def download_one(cc):  # <2>
    image = await get_flag(cc)
    sys.stdout.flush()
    save_flag(image, cc.lower() + '.jpg')
    return cc


def download_many(cc_list):  # <3>
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    wait_coro = asyncio.wait(to_do)
    res, _ = loop.run_until_complete(wait_coro)
    loop.close()
    return len(res)


def main(download_many):
    path = os.path.join(DEST_DIR)
    if not os.path.exists(path):
        os.mkdir(path)
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many)

运行结果:

6 flags downloaded in 0.25s

<1>处,我们通过async/await将这个生成器声明为协程类型,我们用aiohttp获取远程的图片资源,当发生网络请求的时候,主线程会切换到其他的协程执行

<2>处,当<1>处的网络请求发回响应时,将返回的图片存入本地

<3>处,我们在这个方法里生成多个协程,并提交到EventLoop中运行

上面的程序,还有几处值的修改的地方:

第一处是IO问题,程序员往往忽略一个事实,就是访问本地文件系统会阻塞,想当然的认为这种操作不会受网络访问高延迟的影响,而在上述示例中,save_flag()函数会阻塞客户端代码和asyncio事件循环共用的唯一线程,因此保存图片时,整个应用程序都会被冻结,而一旦受到I/O阻塞,则会浪费掉几百万个CPU周期,所以,就算是本地文件系统的访问,我们也应该把他提到另一个线程去执行,避免造成CPU周期的浪费。

第二处是管理协程的并发数,假设我们这里抓取的不再是仅仅几张图片,而是成千上百,可能我们的链接会断掉,甚至对方的网络因为我们的频繁访问禁止了我们的IP。

所以,我们还要对我们的图片下载代码进行修改

import asyncio
import collections
import contextlib
import time
import aiohttp
from aiohttp import web
import os
from collections import namedtuple
from enum import Enum

POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710",
            "pms_1512614327.2483640", "pms_1525853341.8312102", "pms_1511228654.33099308", "error"]

BASE_URL = 'https://i1.mifile.cn/a1'

DEST_DIR = 'downloads/'

DEFAULT_CONCUR_REQ = 3
VERBOSE = True
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')


class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


async def get_flag(base_url, cc):
    url = '{}/{cc}.jpg'.format(base_url, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            with contextlib.closing(resp):  # <1>
                if resp.status == 200:
                    image = await resp.read()
                    return image
                elif resp.status == 404:
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason,
                        headers=resp.headers)


async def download_one(cc, base_url, semaphore, verbose):
    try:
        with (await semaphore):  # <2>
            image = await get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'is not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.jpg')  # <3>
        status = HTTPStatus.ok
        msg = 'is OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)


async def downloader_coro(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    for future in to_do_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1

    return counter


def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    return counts


def main(download_many):
    path = os.path.join(DEST_DIR)
    if not os.path.exists(path):
        os.mkdir(path)
    t0 = time.time()
    counter = download_many(POP20_CC, BASE_URL, VERBOSE, DEFAULT_CONCUR_REQ)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(counter, elapsed))


if __name__ == '__main__':
    main(download_many)

运行结果:

error is not found
pms_1511228654.33099308 is OK
pms_1512614327.2483640 is OK
pms_1509723338.05097112 is OK
pms_1525853341.8312102 is OK
pms_1508125822.19716710 is OK
pms_1508850965.67096774 is OK

Counter({<Status.ok: 1>: 6, <Status.not_found: 2>: 1}) flags downloaded in 0.41s

<1>处,在网络请求完毕,我们要关闭网络,避免因为网络请求过多最后造成链接中断

<2>处,我们用asyncio.Semaphore(concur_req)设置协程最大并发数,这里我们设置是3,然后再用with (await semaphore)执行协程

<3>处,loop.run_in_executor()方法是用来传入需要执行的对象,以及执行参数,这个方法会维护一个ThreadPoolExecutor()线程池,如果我们第一个参数是None,run_in_executor()就会把我们的执行对象和参数提交给背后维护的ThreadPoolExecutor()执行,如果我们传入自己定义的一个线程池,则把执行对象和参数传给我们定义的线程池执行

使用aiohttp编写web服务器

asyncio可以实现单线程并发IO操作,但asyncio只实现了TCP、UDP、SSL等协议,而aiohttp则是基于asyncio上实现了HTTP协议,所以,我们可以基于这asyncio和aiohttp两个框架实现自己的一个web服务器,代码如下:

import asyncio

from aiohttp import web, web_runner

CONTENT_TYPE = "text/html;"


async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b"<h1>Index</h1>", content_type=CONTENT_TYPE)


async def hello(request):
    await asyncio.sleep(0.5)
    text = "<h1>hello, %s!</h1>" % request.match_info["name"]
    return web.Response(body=text, content_type=CONTENT_TYPE)


async def init(loop):
    app = web.Application(loop=loop)
    app = web_runner.AppRunner(app=app).app()
    app.router.add_route("GET", "/", index)
    app.router.add_route("GET", "/hello/{name}", hello)
    srv = await loop.create_server(app.make_handler(), "127.0.0.1", 8000)
    print("Server started at http://127.0.0.1:8000...")
    return srv


loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

运行脚本后,在浏览器输入:

http://127.0.0.1:8000/

Python并发(二)

如果输入:http://127.0.0.1:8000/hello/Lily,就可以看见如下页面,/hello/后面的name可以替换

Python并发(二)

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Python3:sqlalchemy对mysql数据库操作,非sql语句
Python3:sqlalchemy对mysql数据库操作,非sql语句python3authorlizmdatetime2018020110:00:00coding:utf8'''
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这