需求场景:

  实现一个flask服务,通过接口控制一个定时任务任务(对酒店订房情况进行检查)的开启和停止。要求定时任务完成后,可以通过飞书机器人推送任务完成的消息。

展现效果:

  1. 启动定时任务
    在这里插入图片描述

  2. 关闭定时任务
    在这里插入图片描述

  3. 飞书推送消息
    在这里插入图片描述

代码实现:

  1. 项目结构:
    在这里插入图片描述

  2. 业务代码:

    1. 定时任务。先通过schedule模块实现基础的定时任务业务代码。

      # -*- coding:UTF-8 -*-
      
      """
       @ProjectName  : 
       @FileName     : schedule_monitor_task
       @Description  : 
       @Time         : 2023/9/18 9:21
       @Author       : Qredsun
       """
      import os
      import time
      from datetime import datetime
      from datetime import timedelta
      
      import schedule
      import functools
      from utils.webhook import send_message
      
      
      def read_check_list_from_excel(file):
          try:
             logger.info("业务代码")
          except Exception as e:
              logger.error(f'程序异常:{e}')
          finally:
              prompt_message = f'**酒店订房检查已完成**\n' \
                               f'检查时间:{begin_time} - {end_time} \n' \
                               f'检查报告已生成: {save_path}'
              send_message(Environment.WEBHOOK_URL, Environment.WEBHOOK_SECRET, prompt_message)
      
      
      # todo  定时任务配置
      def catch_exceptions(cancel_on_failure = False):
          def catch_exceptions_decorator(job_func):
              @functools.wraps(job_func)
              def wrapper(*args, **kwargs):
                  try:
                      return job_func(*args, **kwargs)
                  except:
                      import traceback
                      traceback.format_exc()
                      if cancel_on_failure:
                          return schedule.CancelJob
      
              return wrapper
      
          return catch_exceptions_decorator
      
      
      # 异常捕获方法的使用
      @catch_exceptions(cancel_on_failure=False)
      def task_job(file_path):
          # 定时任务
          read_check_list_from_excel(file_path)
      
      file_path = r'../data/订房检查任务.xlsx'
      schedule.every().day.at("09:00").do(task_job, file_path)
      schedule.run_all()
      
      while True:
          schedule.run_pending()  # 运行所有可以运行的任务
          time.sleep(60 * 30)
      
    2. 飞书消息推送功能实现

      # -*- coding:UTF-8 -*-
      
      """
       @ProjectName  : 
       @FileName     : webhook
       @Description  : 飞书消息推送
       @Time         : 2023/9/17 13:36
       @Author       : Qredsun
       """
      import base64
      import hashlib
      import hmac
      import json
      from datetime import datetime
      
      import requests
      
      from utils.env_manager import Environment, logger
      
      WEBHOOK_URL = '机器人推送地址'
      WEBHOOK_SECRET = '机器人密码'
      
      
      def gen_sign(secret, timestamp):
          # 拼接时间戳以及签名校验
          string_to_sign = '{}\n{}'.format(timestamp, secret)
      
          # 使用 HMAC-SHA256 进行加密
          hmac_code = hmac.new(
                  string_to_sign.encode("utf-8"), digestmod=hashlib.sha256
          ).digest()
      
          # 对结果进行 base64 编码
          sign = base64.b64encode(hmac_code).decode('utf-8')
      
          return sign
      
      
      def send_message(WEBHOOK_URL, WEBHOOK_SECRET, MSG):
          timestamp = int(datetime.now().timestamp())
          sign = gen_sign(WEBHOOK_SECRET, timestamp)
          params = {
              "timestamp": timestamp,
              "sign"     : sign,
              "msg_type" : "interactive",
              "card"     : {
                  "config"   : {
                      "wide_screen_mode": True
                  },
                  "elements" : [
                      {
                          "tag"    : "markdown",
                          "content": f"<at id=all></at> \n "
                                     f"{MSG}"
                      },
                      {
                          "tag"    : "action",
                          "actions": [
                              {
                                  "tag"      : "button",
                                  "text"     : {
                                      "tag"    : "plain_text",
                                      "content": "跳转至订房检查"
                                  },
                                  "type"     : "primary",
                                  "multi_url": {
                                      "url"        : Environment.CALL_BACK_URL,
                                      "android_url": "",
                                      "ios_url"    : "",
                                      "pc_url"     : ""
                                  }
                              }
                          ]
                      }
                  ],
                  "header"   : {
                      "template": "blue",
                      "title"   : {
                          "content": "订房检查异常提示",
                          "tag"    : "plain_text"
                      }
                  },
                  "card_link": {
                      "url"        : "",
                      "pc_url"     : "",
                      "android_url": "",
                      "ios_url"    : ""
                  }
              },
          }
      
          resp = requests.post(WEBHOOK_URL, json=params)
          resp.raise_for_status()
          result = resp.json()
          if result.get("code") and result.get("code") != 0:
              logger.error(f'飞书机器人消息 : {MSG} 发送失败:{resp.text}')
          else:
              logger.debug(f'飞书机器人消息 : {MSG} 发送成功')
      
    3. flask_apscheduler替换schedule

      app.schedule_job.add_job(id="check_room", func='s_app:read_check_list_from_excel', **{
          "args"   : (Environment.SRC_FILE,),
          # 'trigger': 'interval',  # 指定 定时任务的类型
          # 'seconds': 5  # 运行的间隔时间
      
          # 每天九点开始执行
          'trigger': 'cron',
          'day'    : '*',
          'hour'   : '09',
          'minute' : '00',
          'second' : '00'
      })
      app.schedule_job.start()  # 启动任务列表
      
    4. flask服务中启动、停止接口实现

      from flask import Flask
      
      from utils.env_manager import Environment, logger
      from utils.schedule_monitor_task import read_check_list_from_excel
      from flask_apscheduler import APScheduler
      
      app = Flask(__name__)
      app.schedule_job = APScheduler()
      
      
      @app.route('/', methods=['get'])
      def hello_world():
          return '酒店订房检查服务!'
      
      
      @app.route('/stop', methods=['get'])
      def stop_job():
          if not app.schedule_job.get_jobs():
              return '没有正在执行的订房检查任务'
          else:
              app.schedule_job.remove_all_jobs()
              return '终止订房检查任务'
      
      
      class Config(object):
          DEBUG = True  # flask 调试模式
          """flask_apscheduler 配置"""
          SCHEDULER_API_ENABLED = True  # 开放API
          SCHEDULER_TIMEZONE = 'Asia/Shanghai'  # 使用上海时间
      
      
      @app.route('/start', methods=['get'])
      def start_job():
          if app.schedule_job.get_jobs():
              return '订房检查任务已启动'
          else:
              app.schedule_job.add_job(id="check_room", func='s_app:read_check_list_from_excel', **{
                  "args"   : (Environment.SRC_FILE,)
                  # 每天九点开始执行
                  'trigger': 'cron',
                  'day'    : '*',
                  'hour'   : '09',
                  'minute' : '00',
                  'second' : '00'
              })
              return '开始订房检查任务'
      
      
      app.config.from_object(Config)
      app.schedule_job.init_app(app)  # 把任务列表放入 flask
      app.schedule_job.start()  # 启动任务列表
      app.run()
      
      

ps:

  • 机器人推送接口的配置:
    在这里插入图片描述

  • 在flask的配置中将SCHEDULER_API_ENABLED设置为True,服务启动后自动加载flask_apscheduler提供的API接口:
    1. /scheduler [GET] > 获取服务基本信息
    2. /scheduler/jobs [POST json job data] > 添加新的任务
    3. /scheduler/jobs/<job_id> [GET] > 根据job_id返回任务的详细信息
    4. /scheduler/jobs [GET] > 返回所有任务的信息
    5. /scheduler/jobs/<job_id> [DELETE] > 删除任务
    6. /scheduler/jobs/<job_id> [PATCH json job data] > 更新一个已经存在的任务
    7. /scheduler/jobs/<job_id>/pause [POST] > 暂停一个任务并返回任务的信息
    8. /scheduler/jobs/<job_id>/resume [POST] > 重新启动一个任务并返回任务信息
    9. /scheduler/jobs/<job_id>/run [POST] > 启动一个任务并返回任务的信息
    在这里插入图片描述

  • 在实现定时任务的启动和关闭时,并没有直接flask_apscheduler使用自带的接口,而是通过flask_apscheduler提供的定时任务管理方法实现。还有下面一些方法可参考使用:

    1. scheduler.start() 开始任务
    2. scheduler.shutdown() 停止任务
    3. scheduler.pause() 暂停所有任务
    4. scheduler.resume() 开启任务
    5. scheduler.add_listener(<callback function>,<event>) 添加监听事件
    6. scheduler.remove_listener(<callback function>) 去除监听事件
    7. scheduler.add_job(<id>,<function>, **kwargs) 添加job
    8. scheduler.remove_job(<id>, **<jobstore>) 删除job
    9. scheduler.remove_all_jobs(**<jobstore>) 删除所有定时任务
    10. scheduler.get_job(<id>,**<jobstore>) 获取job信息
    11. scheduler.modify_job(<id>,**<jobstore>, **kwargs) 修改job
    12. scheduler.pause_job(<id>, **<jobstore>) 暂停job
    13. scheduler.resume_job(<id>, **<jobstore>) 恢复job
    14. scheduler.run_job(<id>, **<jobstore>) 启动job
    15. scheduler.authenticate(<function>) 验证

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部