1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > 基于Python3多进程(多线程)+多协程的数据并发处理模版

基于Python3多进程(多线程)+多协程的数据并发处理模版

时间:2023-07-26 00:53:46

相关推荐

基于Python3多进程(多线程)+多协程的数据并发处理模版

图片来源:elenabsl/Shutterstock

上一篇文章《基于Python3单进程+多线程+多协程的生产者-消费者模型示例代码》介绍了如何使用Python在单进程的情况下利用协程并发地处理数据,由于Python的GIL,所有代码只利用到了一个CPU核心,无法发挥多核心优势,所以我又做了一个多进程+多协程的模板,这里的代码不涉及具体业务。

代码地址:/MacwinWin/multiprocessing_asyncio_data_processing

现状与挑战

以Cpython作为Python解释器时,由于GIL的存在导致:

多线程实际上始终在单核上进行运行,因而不适合进行计算密集型任务;而多线程之间可以共享数据,所以适合进行IO密集型任务;单线程间切换存在性能损耗,大规模并发并不适合多线程;多进程能有效利用多核的计算优势,因而适合计算密集型任务;但由于Python的多进程是通过启动多个解释器实现的,之间存在数据隔离,因而不适合进行IO密集型任务;协程是用户态的线程,用户可灵活操作其生命周期,且协程间切换及乎没有性能损耗,更适合大规模并发,但由于GIL限制,所以不适合进行计算密集型任务。

因而,使用Python进行实时大数据规模处理存在挑战。

总览

这里用到的库包括Python3自己的multiprocessing、threading、asyncio以及第三方库aioprocessing1。

整体架构如下图所示:

详细说明

为了方便介绍代码逻辑,我将项目抽象成了现实世界中的装配工厂。

组织架构如下图所示:

其中四部门为单独的进程,采购员、车间主任、调度员、运维员、副车间主任为线程,主任助理、工人、监工、搬运工、司机为协程。

各部门、岗位职责:

CEO: 根据公司资源合理分配给各部门,尤其是产品部,更多的车间意味着更大的产量,但也会消耗更多的资源(CPU 核心);创建车间,为车间分配生产所需的资源;为每一个车间创建一条生产流水线(线程安全的同步队列);创建一条各车间共享的运输传送带(线程安全的异步队列)管理四部门的正常运作 采购部: 采购员(一名): 购买原材料,并为原材料贴上所属的产品ID标签(一条生产流水线负责多个产品,一个产品可能由多种原材料组成)将原材料放在对应的生产流水线上 生产部: 0车间: 车间主任(一名): 管理主任助理和副主任;从生产流水线上取出原材料,根据上面的产品ID标签,在员工花名册上找到所属的员工,与该员工重新续签合同,并登记后交给副主任;如果在员工花名册上没有找到所属员工则新招聘一名工人,并与其签订合同;

(关于合同机制2) 副主任(一名): 占有车间一半资源以供其安排生产;从主任处获得原材料,放置在对应工人的传送带上;仅将资源分配给手中有原材料的工人; 工人(n名): 每名工人配有一条传送带,工人从传送带上获取原材料;根据产品装配图,对原材料进行处理;完成后给产品贴上产品类标签,然后放入运输传送带上;每名工人受合同约束; 监工(n名): 监工属于公司编制内,没有合同约束,每名监工定时监督一名工人; 主任助理(一名): 接受主任的管理,对主任负责;定时检查员工花名册 1车间…n车间 运输部: 调度员(一名): 管理搬运工和司机;管理仓库; 搬运工(一名): 从运输传送带上获取产品;对产品做一些处理,例如包装;将产品根据产品类型放在仓库中; 司机(一名): 每隔一段时间清空仓库中的产品 运维部: 运维员(一名): 每隔一段时间巡检一遍生产流水线

使用方式

# build image>>> docker build -t test:1.0# run the image as a container>>> docker run -i -t -d -v $(pwd):/app --name test test:1.0>>> docker exec -i -t test /bin/bash>>> python3 ceo.py

测试

在我的MacBook Pro 16, i7 2.6上测试,Docker分配8核8G内存,结果如下图。

可以看到每秒“采购”5000+原材料,在生产流水线上无积压,说明都消耗完,员工花名册总数稳定在5000左右,内存消耗也保持稳定。

但如果在‘采购’时不加限制(无延迟)则会导致原材料数量远超实际产能,原材料大量积压在生产流水线上,严重降低生产流水线性能,车间无法获得足够原材料,导致公司产能大幅下降。

所以最佳方案是根据资源数量(CPU核数)确定车间数量,再合理设置采购速度,使得原材料采购速度与实际产能匹配,生产流水线上不产生积压现象。

结论

通过结合使用Python多进程、多线程、协程的方式,实现了单机4核CPU8G内存下约5000条/s的数据处理速度。如果想实现更大规模的数据处理,可通过增加CPU核数内存数。

计划

持续优化代码,提高速度添加类型提示(type hints)

项目中使用aioprocessing主要是用它的进程间异步队列,该队列汇集各进程的输出结果,但由于整体是由Python实现的,队列性能较差,可考虑替换之。 ↩︎

合同机制:

该机制保证了工人在一段时间后仍然没有获得新原材料的情况下,判定为订单取消,解雇该名工人,以保证不会存在过多“划水”员工 ↩︎

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。