flask celery 定时任务 flask定时更新数据库
本文旨在探讨在Flask应用中实现CSV文件刷新数据的策略。针对Web服务器无法执行时钟阻塞任务的原则,核心思想以数据抓取和CSV更新逻辑从Flask主应用中解耦,通过独立的后台进程或任务调度工具(如Cron、APS) cheduler、Celery)来定时执行。文章将详细介绍各种实现方案及其优缺点,并提供关键的并发访问和数据一致性处理建议,确保Web应用能稳定、高效地读取最新数据。理解核心问题:Web服务器与后台任务的分离
在web开发中,尤其在使用flask这样的微服务框架时,一个基本原则是web服务器应重点关注处理http请求并快速响应,而不是执行运行或阻塞性的后台任务,如数据抓取(scraping)或文件i/o操作。将此类任务直接嵌入到flask应用的主线程中,会导致请求响应延迟因此,对于“每10分钟自动刷新csv文件”的需求,最佳实践是将数据更新逻辑与flask应用本身解耦,让其在独立的进程中运行。flask应用只需读取负责已更新的csv文件。解决方案一:利用网络级任务调度(Cron)工作原理:编写一个独立的Python脚本,该脚本负责执行数据映射、处理并更新CSV文件的逻辑。使用crontab命令配置,让该脚本每10分钟执行一次。
示例(update_csv.py):# update_csv.pyimport pandas as pdimport datetimeimport osdef scrape_and_update_csv(): # 模拟数据抓取和处理 print(fquot;[{datetime.datetime.now()}] 开始抓取数据并更新CSV...quot;) data = { 'game': ['Game A', 'Game B', 'Game C'], 'stake': [1.5, 2.0, 1.8], 'timestamp': [日期时间.日期时间.now()] * 3 } df = pd.DataFrame(data) # 定义CSV文件路径 #注意:这里的路径应是绝对路径,以便cron正确找到 csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv') # 将数据保存到CSV df.to_csv(csv_file_path, index=False) print(fquot;[{datetime.datetime.now()}] CSV文件已更新:{csv_file_path}quot;)if __name__ == quot;__main__quot;: scrape_and_update_csv()登录后复制
配置 Cron Job:
打开终端,输入 crontab -e。
在打开的文件中添加一行(Python确保环境脚本和路径正确):*/10 * * * * /usr/bin/python3 /path/to/your/update_csv.py gt;gt; /path/to/your/cron.log 2gt;amp;1登录后复制*/10 * * * * 表示每10分钟执行一次。/usr/bin/python3是Python解释器的路径。/path/to/your/update_csv.py 是你编写的Python脚本的绝对路径。gt;gt;/path/to/your/cron.log 2gt;amp;1将脚本的输出脚本到日志文件,即可调试。
优点:简单、可靠,系统资源占用低。平台依赖(主要用于类Unix系统),任务管理不够灵活(例如,难以从Python代码中动态调度或取消任务)。解决方案二:使用Python任务调度库(APScheduler)
APScheduler(高级Python) Scheduler)是一个轻量级的Python库,允许你在Python应用内部(或独立脚本中)安排任务。它支持多种调度器类型,如BlockingScheduler(用于独立脚本)和BackgroundScheduler(用于在应用内部以非阻塞方式运行)。
工作原理:创建一个独立的Python脚本,使用BlockingScheduler来定时执行CSV更新函数。在Flask应用中,一般正常读取该CSV文件。
示例(scheduler_app.py):#scheduler_app.pyfrom apscheduler.schedulers.blocking import BlockingSchedulerimport pandas as pdimport datetimeimport osimportlogging#配置日志,方便调试logging.basicConfig(level=logging.INFO,format='(asctime)s - (levelname)s - (message)s')def scrape_and_update_csv(): logging.info(quot;开始数据提取并更新CSV...quot;) try: # 模拟数据提取和处理 data = { 'game': [f'Game {i}' for i in range(1, 4)], 'stake': [1.5 i * 0.1 for i in range(3)], 'timestamp': [datetime.datetime.now()] * 3 } df = pd.DataFrame(data) #定义CSV文件路径#同样建议使用绝对路径 csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv') # 将数据保存到CSV df.to_csv(csv_file_path,index=False)logging.info(fquot;CSV文件已更新:{csv_file_path}quot;) except Exception as e:logging.error(fquot;更新CSV文件失败:{e}quot;)if __name__ == '__main__':scheduler = BlockingScheduler() # 每10分钟执行一次scrape_and_update_csv 函数scheduler.add_job(scrape_and_update_csv, 'interval', 分钟=10)logging.info(quot;APScheduler已启动,等待执行...quot;) try:scheduler.start() except (KeyboardInterrupt, SystemExit): logging.info(quot;APScheduler已停止。quot;)登录后复制
运行方式:将scheduler_app.py作为一个独立的Python脚本运行:python3 Scheduler_app.py。Flask应用和这个调度器脚本将作为两个独立的进程运行。
优点:跨平台,纯Python实现,任务管理更灵活。
缺点:仍需独立进程运行,不适合多个任务。解决方案三:使用任务队列(Celery)
对于更复杂、需要循环处理、任务或者执行时间可能密集的场景,Celery是一个强大的选择。它是一个异步任务队列/基于可以多个消息传递的作业队列,处理大量操作。
工作原理:Broker(消息代理): Celery 使用消息代理来协调任务。常见的有 Redis 或 RabbitMQ。Worker(工作者): Celery Worker 是独立的进程,它们监听 Broker,接收任务并执行。Client(客户端): Flask 应用作为客户端,将任务发送到 Broker。Scheduler(调度器,如 Celery Beat): Celery Beat 可以作为独立的进程运行,根据预设的调度将任务发送到 Broker。
示例概述:
安装: pip install celery redis (如果使用 Redis 作为 Broker)。
创建Celery应用(celery_app.py):# celery_app.pyfrom celery import Celeryimport pandas as pdimport datetimeimport osimportlogging.basicConfig(level=logging.INFO,format='(asctime)s - (levelname)s - (message)s')celery_app = Celery( 'csv_updater', Broker='redis://localhost:6379/0', # 替换为你的Redis地址 backend='redis://localhost:6379/0')@celery_app.taskdef scrape_and_update_csv_task():logging.info(quot;Celery任务:开始抓取数据并更新CSV...quot;) try: data = { 'game': [f'Game {i}' for i in range(1, 4)], '赌注': [1.5 i * 0.1 for i in range(3)], '时间戳': [datetime.datetime.now()] * 3 } df = pd.DataFrame(data) csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv') df.to_csv(csv_file_path, index=False) logging.info(fquot;Celery任务:CSV文件已更新:{csv_file_path}quot;) except Exception as e:logging.error(fquot;Celery任务:更新CSV文件失败: {e}quot;)登录后复制
启动Celery Worker:在终端中运行:celery -A celery_app worker --loglevel=info
使用Celery Beat进行定时调度:创建celeryconfig.py文件:# celeryconfig.pyfrom datetime import timedeltaCELERY_BEAT_SCHEDULE = { 'update-csv-every-10-minutes': { 'task': 'celery_app.scrape_and_update_csv_task', 'schedule': timedelta(分钟=10), 'args': (), },}CELERY_TIMEZONE = 'Asia/Shanghai' # 根据需要设置时区登录后复制
启动Celery Beat:celery -A c
elery_appbeat -s celerybeat-schedule --loglevel=info
优点:强大、可扩展、支持多元化、任务重试、结果存储等高级功能,适用于生产环境复杂任务。缺点:配置相对复杂,引入了额外的组件(Broker、Worker、Beat)。数据一致性与文件锁定注意事项
当一个后台进程定时更新CSV文件,而Flask应用同时尝试读取该文件时,可能会出现数据不一致或文件锁定问题。
原子性写入:最佳实践是:将新数据写入一个临时文件(例如data.csv.tmp)。当写入完成后,原子性交换临时文件重命名为目标文件(data.csv)。大多数操作系统对文件重命名操作是原子的。# 修改 scrape_and_update_csv 函数 def scrape_and_update_csv(): # ... 数据抓取逻辑 ... csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv') temp_file_path = csv_file_path '.tmp' df.to_csv(temp_file_path,index=False) # 读取临时文件 os.replace(temp_file_path, csv_file_path) # 原子性替换logging.info(fquot;CSV文件已更新:{csv_file_path}quot;)登录后复制
数据库替代方案:数据量增大或对数据库访问有更高要求,将数据存储在数据库中(如SQLite,因为Flask应用已配置SQLAlchemy)而不是CSV文件是更健壮的选择。优点:如果数据库本身就提供了事务和数据库控制机制,避免了文件锁定问题。实现: 后台任务将读取数据后读取数据库,Flask 应用则通过 SQLAlchemy 从数据库中查询数据。这使得数据访问更加高效和可靠。Flask 应用中的数据读取
无论采用哪种后台更新策略,Flask 应用在用户请求时处理,只需从固定的 CSV 文件路径读取数据即可。
# from .views import views (假设在views.py中)fromflask import Blueprint, render_templateimport pandas as pdimport osviews = Blueprint('views', __name__)@views.route('/')def home(): csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv') #确定文件存在,并处理可能的文件不存在或读取错误 if os.path.exists(csv_file_path): try: df = pd.read_csv(csv_file_path) # 假设你的 CSV 有 'game' 和 'stake' 列 games_data = df.to_dict(orient='records') return render_template(quot;home.htmlquot;, games=games_data) except Exception as e: print(fquot;读取CSV文件失败: {e}quot;) return render_template(quot;home.htmlquot;, games=[], error=quot;无法加载数据quot;) else: return render_template(quot;home.htmlquot;, games=[], error=quot;数据文件不存在,请稍后再试quot;)登录复制
请注意,os.path.dirname(os.path.abspath(__file__))只需在当前文件直接运行时有效。在Flask中,你需要确保data.csv的路径相对于你的项目根目录或Flask应用实例可访问的路径。总结
在Flask等Web应用中实现定时数据刷新,核心原则是Tom操作从Web服务器的请求响应循环中分离出来。这篇文章简单介绍了第三个路径策略:Cron Job:适用于的、配置在Linux环境下的定时任务,配置直接,最新。APScheduler:提供Python运行的任务调度能力,跨平台,适合作为独立脚本。Celery: 目标复杂性、整数、高并发任务的强解决方案,功能丰富但配置相对复杂性。
无论选择哪种方案,都应注意数据一致性和文件访问的原子性。对于更严谨的数据管理,将数据存储在数据库中是比CSV文件更推荐的方案,因为它控制提供了更完善的一致性通过合理的选择和实施这些策略,可以确保Flask应用能够稳定、地提供最新数据,同时保持其相关响应性。
以上就是Flask应用中定时刷新CSV数据高效策略的详细内容,更多请关注乐哥常识网其他文章!
