首页app软件axios异步 asyncio异步编程实例 pythonasyncio用法

axios异步 asyncio异步编程实例 pythonasyncio用法

圆圆2025-07-15 22:00:34次浏览条评论

Python Asyncio:确保后台任务顺序执行的策略Python 中的论文讨论asyncio应用中,如何高效管理大量数据收集与顺序数据保存的场景。针对需要任务按后台顺序完成特定的需求,文章提出了两种核心策略:通过显式等待前一个任务完成启动再下一个,以及利用asyncio.Queue构建生产者-消费者模型。这两种方法各具有优劣性,旨在帮助开发者在保持异步优势的同时,确保关键操作的顺序性,避免数据混乱。

在开发高性能的异步应用时,我们经常会遇到需要并行执行任务以提高效率,但某些特定操作又必须严格按顺序进行的情况。一个典型的例子是数据收集与数据保存:应用可以持续、异步地收集数据批次,为了避免数据缺陷和竞态条件,数据保存操作(例如写入文件或数据库)通常需要按批次顺序进行,即前一个批次的数据保存完成,但下一个批次才能开始保存。

考虑场景如下:一个应用持续收集数据批次,并尝试在后台保存它们。如果收集速度快于保存速度,或者不同批次的数据大小不一致导致保存时间不一,就可能出现后收集的小批次数据先于前收集的大批次数据保存完成,从而导致数据混乱。

以下是一个简化的示例,展示了这种潜在的问题:导入asyncioimport randomasync秒: quot;quot;quot;模拟数据保存操作,运行2。quot;quot;quot;print(quot;我正在保存一批数据...quot;)await asyncio.sleep(2) print(quot;一批数据保存完毕。quot;)async defcollect_data():quot;quot;quot;模拟数据收集操作,同步时钟。quot;quot;quot;event_loop = asyncio.get_event_loop() while True: print(quot;我正在收集数据...quot;) wait asyncio.sleep(random.randint(1, 5)) # 模拟收集运行 # 直接创建后台任务保存数据,可能导致多个保存任务同时运行 event_loop.create_task(save_data())# 运行主程序# asyncio.run(collect_data()) # 如果运行此代码,会发现quot;我正在保存一批数据...quot;可能完成重复出现,且连续不确定登录后复制

上述代码的问题提出,event_loop.create_task(save_data())会立即启动一个新的后台任务,不会而等待上一个save_data()任务完成。这导致多个save_data()实例可能同时运行,结束了“一次只保存一批” ”的完成需求。策略一:显式等待前一个保存任务的完成。

解决此问题的一种直接方法是,在启动新的保存任务之前,先等待前一个保存任务的完成。这相当于双重缓冲机制,保证了保存操作的原子性和顺次性序性。

立即学习“Python免费学习笔记(深入)”;

实现原理:通过维护一个对上一个保存任务的引用。在每次准备启动新的保存任务时,检查是否存在未完成的旧任务。

如果存在完成,则使用await等待其,然后再启动新的保存任务。

示例代码:import asyncioimport randomasync def save_data_batch(): quot;quot;quot;模拟数据保存操作,运行2秒。quot;quot;quot;print(quot;我正在保存一批的数据...quot;)await asyncio.sleep(2) print(quot;一批的数据保存完毕。quot;)async def collect_data_sequential_save(): quot;quot;quot;数据收集,确保保存任务顺序执行。quot;quot;quot; event_loop = asyncio.get_event_loop() last_save_task = None # 用于存储上一个保存任务的引用 while True: print(quot;我正在收集数据...quot;) wait asyncio.sleep(random.randint(1, 5)) # 模拟收集运行#如果存在上一个未完成的保存任务,则等待其完成如果last_save_task: print(quot完成;等待上一个批次保存...quot;)await last_save_task print(quot;上一个批次已保存完成,准备启动新的保存任务。quot;)#启动新的保存任务,并保存其引用last_save_task = event_loop.create_task(save_data_batch())#为了演示,可以设置一个退出条件#if random.random()lt;0.1:#break#循环结束后,确保最后一个保存任务也完成 if last_save_task:await last_save_task#运行示例#asyncio.run(collect_data_sequential_save())登录后复制

优缺点:优点:实现简单插入,直接控制保存任务的顺序。缺点: 这种方法在某种程度上会“阻止”数据收集流程的。如果一个批次的保存特别长,而同时有多个小批次数据被收集等待,那么这些小批次将不必大批次保存完成才能开始自己的保存,这可能会导致收集到的数据在内存中完成,甚至影响整体吞吐量。它确保下一个保存任务的启动在上一个保存完成任务上,是真正适合之后的收集和保存完全时间。策略二:使用 asyncio.Queue实现生产者-消费者模型

为了更好地解耦数据收集高效和数据保存过程,并实现更大的并发,我们可以采用生产者-消费者模型,利用 asyncio.Queue 来缓冲待保存的数据批次。

实现原理:生产者(收集数据器):负责收集数据,将收集到的数据(或指示需要保存的信号)放入一个 asyncio.Queue 中。

消费者(保存数据器):启动一个独立的后台任务,持续从队列中取出数据,并执行保存操作。由于只有一个消费者任务,它自然会按顺序处理队列中的数据。asyncio.Queue的maxsize参数可以队列中允许的最大项目数,从而防止收集速度过快导致内存陷入。当队列满时,生产者(put操作)会自动等待,直到队列中有空间。

示例代码:import asyncioimport randomasync def save_data_batch(): quot;quot;quot;模拟数据保存操作,运行2秒。quot;quot;;print(quot;我正在保存一批的数据...quot;)await asyncio.sleep(2) print(quot;一批的数据保存完毕。quot;)async def save_all_batches(queue: asyncio.Queue):quot;quot;quot;从消费者:队列中提取数据并保存。quot;quot; while True: try: # 从队列中获取一个批次,如果队列为空则等待 batch = wait queue.get() print(fquot;消费者:从队列中中断组件 {batch},准备保存。quot;) wait save_data_batch() # 执行保存操作queue.task_done() # 标记此任务已完成 except asyncio.CancelledError: # 当消费者任务被取消时,优雅退出 print(quot;消费者任务被取消,退出。quot;) break except Exception as e: print(fquot;消费者:保存过程中发生错误: {e}quot;) queue.task_done() #即使出错也要标记完成,避免死锁async def Collect_data_with_queue(): quot;quot;quot;生产者:收集数据并排列。

quot;quot;quot; event_loop = asyncio.get_event_loop() # 有最大容量的队列,防止创建内存溢出队列 = asyncio.Queue(maxsize=4) # 启动消费者任务 saving_task = event_loop.create_task(save_all_batches(queue)) batch_id = 0 try: while True: print(fquot;生产者:我正在收集数据 (一批 {batch_id})...quot;) wait;) asyncio.sleep(random.randint(1, 3)) # 模拟收集运行 # 将收集到的数据(这里用队列ID代替)插入队列 # 如果队列已满,此put操作会等待直到有空间 wait queue.put(fquot;Batch-{batch_id}quot;) print(fquot;生产者:队列 {batch_id} 已插入队列。队列当前大小: {queue.qsize()}quot;) batch_id = 1 #为了演示,在收集一定数量批次后停止 if batch_id gt;= 10: print(quot;生产者:已收集足够批次,停止收集。quot;)break finally:#收集完成后,等待队列中的所有任务完成 print(quot;生产者:等待所有队列中的批次已保存完成...quot;)awaitqueue.join() print(quot;生产者:所有队列中的批次已保存完成。quot;) # 取消消费者任务,从而成功退出 saving_task.cancel() # 防止消费者任务被取消并完成清理 wait saving_task# 运行结束# asyncio.run(collect_data_with_queue())登录后复制

优缺点:优点:真正的并发:因此收集数据和数据保存可以同时进行,最大化利用CPU和I/O资源。顺序保证:队列天然保证了数据的先进先出(FIFO)顺序,保存操作始终按收集顺序进行。流量控制:maxsize参数提供了内置的背压机制,防止生产者速度过快压垮消费者或队列内存。解耦:收集逻辑和保存逻辑完全分离,易于维护和扩展。缺点:复杂度增加: 需要管理队列、消费者任务的周期、取消和异常处理。死锁风险:如果消费者任务因未处理的异常而意外终止,而队列中终止未处理的项,queue.join()可能会永远等待,导致生命死锁。因此,健壮的异常处理关键。

关于异常处理的注意事项:在生产环境中,确保消费者任务(如save_all_batches)的健壮性关键。

如果消费者任务因未捕获的异常而退出,那么queue.join()可能会永远阻塞,因为task_done()不会被调用。一种更健壮的方法是,在生产者尝试任务queue.put()时,也同时监控消费者的状态。如果消费者任务意外完成(例如,因为它崩溃了),生产者应该停止生产并读取处理。 # 生产者在put监控时消费者状态的更健壮代码片段 # (这只是概念性,实际上可能更复杂) # put_task = event_loop.create_task(queue.put(fquot;Batch-{batch_id}quot;)) # did,pending = wait asyncio.wait({putting_task, saving_task}, # return_when=asyncio.FIRST_COMPLETED) # if saving_task in did: # # 消费者任务已完成(可能因错误),取消put操作并退出 # put_task.cancel() # print(quot;消费者已意外退出,停止生产。quot;) # break # else: # # put操作完成,继续生产 #await put_task #确保put操作的异常被捕获后复制总结与选择如果对性能要求不是极限,或者保存任务的同步相对稳定且不长,策略一(显等待)是一个简单有效的选择。它易于理解和实现,适合快速开发和。如果你需要积累吞吐量,并且数据收集和保存之间存在显着的性能差异,或者你需要精细控制数据流,那么策略二(asyncio.Queue)是更优的选择。它提供了更强的解耦合和流量控制能力,但需要更仔细地处理任务周期和异常情况,以保证系统的稳定性和健壮性。

在实际应用中,根据具体的业务需求、性能目标以及对代码复杂度的接受程度选择哪种策略。理解这两种模式的优缺点,将帮助你在 asyncio 应用中既构建出可靠的后台任务处理机制。

以上就是Python Asyncio:确保后台任务顺序执行的策略内容详细,更多请关注乐哥常识网相关文章!

Python Asy
python递归遍历文件夹 python递归读取文件
相关内容
发表评论

游客 回复需填写必要信息