Integration with FastAPI and APScheduler

2021-07-23 23:00:07  Source: Internet

Tags: scanner rate FastAPI Integration cpu APScheduler job import id


API Server with Scheduler

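A typical API server only exposes API endpoints, and each request runs a piece of business logic once.

To have the API server also run business logic on a schedule in the background, add the APScheduler library.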

 

FastAPI 

https://fastapi.tiangolo.com

A modern, fast web framework for building APIs.

FastAPI is a modern, fast (high-performance), web framework for building APIs with Python 3.6+ based on standard Python type hints.

The key features are:

  • Fast: Very high performance, on par with NodeJS and Go (thanks to Starlette and Pydantic). One of the fastest Python frameworks available.
  • Fast to code: Increase the speed to develop features by about 200% to 300%. *
  • Fewer bugs: Reduce about 40% of human (developer) induced errors. *
  • Intuitive: Great editor support. Completion everywhere. Less time debugging.
  • Easy: Designed to be easy to use and learn. Less time reading docs.
  • Short: Minimize code duplication. Multiple features from each parameter declaration. Fewer bugs.
  • Robust: Get production-ready code. With automatic interactive documentation.
  • Standards-based: Based on (and fully compatible with) the open standards for APIs: OpenAPI (previously known as Swagger) and JSON Schema.

 

APScheduler

https://apscheduler.readthedocs.io/en/latest/index.html

It supports adding and removing jobs.

Jobs can be persisted, so they can be restored after a restart.

Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please. If you store your jobs in a database, they will also survive scheduler restarts and maintain their state. When the scheduler is restarted, it will then run all the jobs it should have run while it was offline.

 

pydantic

https://pydantic-docs.helpmanual.io/

Used for data validation and settings management.

Data validation and settings management using python type annotations.

pydantic enforces type hints at runtime, and provides user friendly errors when data is invalid.

Define how data should be in pure, canonical python; validate it with pydantic.

 

Define a data model and populate it with data.

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:22',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
print(user.id)
#> 123
print(repr(user.signup_ts))
#> datetime.datetime(2019, 6, 1, 12, 22)
print(user.friends)
#> [1, 2, 3]
print(user.dict())
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""

 

If errors occur while converting data into the model, all of them are collected into a single validation error.

from pydantic import ValidationError

try:
    User(signup_ts='broken', friends=[1, 2, 'not number'])
except ValidationError as e:
    print(e.json())

output

[
  {
    "loc": [
      "id"
    ],
    "msg": "field required",
    "type": "value_error.missing"
  },
  {
    "loc": [
      "signup_ts"
    ],
    "msg": "invalid datetime format",
    "type": "value_error.datetime"
  },
  {
    "loc": [
      "friends",
      2
    ],
    "msg": "value is not a valid integer",
    "type": "type_error.integer"
  }
]
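
Besides printing the JSON report, the collected errors can be inspected in code. The sketch below assumes pydantic is installed and uses the errors() method of ValidationError, which returns one dictionary per failure; the Signup model and the failed_fields helper are hypothetical names that mirror the User fields above.

```python
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, ValidationError


class Signup(BaseModel):
    # Hypothetical model mirroring the annotated User fields above
    id: int
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


def failed_fields(payload):
    """Return the set of top-level field names that failed validation."""
    try:
        Signup(**payload)
    except ValidationError as exc:
        # Each error dict has a "loc" sequence; its first item is the field name
        return {error["loc"][0] for error in exc.errors()}
    return set()


bad = failed_fields({"signup_ts": "broken", "friends": [1, 2, "not number"]})
good = failed_fields({"id": "123", "friends": [1, 2, "3"]})
```

Here bad holds the three failing field names and good is empty, since the strings '123' and '3' are coerced to integers.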

 

Demo

https://github.com/fanqingsong/fastapi_apscheduler

Use psutil to get the CPU usage percentage.

Provide an API to fetch it directly, and also log it periodically.

cpu scanner

uvicorn cpu_scanner:app --reload

Description: Demonstrates how to use FastAPI and APScheduler together

Requirements: provide an API to get the CPU rate, and also get it periodically

(1) get_cpu_rate -- get current cpu rate by this call

(2) set_cpu_scanner_job -- set one scheduled job to scan cpu rate periodically

(3) del_cpu_scanner_job -- delete the scheduled job

 

code

# FastAPI and Pydantic related libraries
from fastapi import FastAPI
from pydantic import BaseModel, Field

# APScheduler related libraries
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

import uuid

import logging
import psutil


# Global Variables
app = FastAPI(title="App demonstrating integration of FastAPI and APScheduler", version="2020.11.1",
              description="An example of scheduling a periodic CPU scanner")
Schedule = None
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


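def scan_cpu_rate(job_id):
    """Job body: log the CPU usage sampled over a one-second window."""
    logger.info(f'Tick! Called by job {job_id}')

    # interval=1 blocks for one second while psutil samples CPU usage
    cpu_rate = psutil.cpu_percent(interval=1)

    logger.info(f"cpu_rate = {cpu_rate}")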


class CPURateResponse(BaseModel):
    cpu_rate: float = Field(title="CPU Rate", description="The current CPU rate")


class SetCPUScannerJobResponse(BaseModel):
    job_id: str = Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")


class DelCPUScannerJobResponse(BaseModel):
    job_id: str = Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")



@app.on_event("startup")
async def load_schedule_or_create_blank():
    """
    Instantiate the Schedule object as a global and load existing schedules from SQLite.
    This allows for persistent schedules across server restarts.
    """
    global Schedule
    try:
        jobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        Schedule = AsyncIOScheduler(jobstores=jobstores)
        Schedule.start()
        logger.info("Created Schedule Object")
    except Exception:
        # Log the full traceback; the job endpoints will not work if this fails
        logger.exception("Unable to Create Schedule Object")


@app.on_event("shutdown")
async def pickle_schedule():
    """
    An Attempt at Shutting down the schedule to avoid orphan jobs
    """
    global Schedule
    Schedule.shutdown()
    logger.info("Disabled Schedule")


@app.post("/get_cpu_rate/", response_model=CPURateResponse, tags=["API"])
def get_cpu_rate():
    cpu_rate = psutil.cpu_percent(interval=1)

    logging.info(f"cpu_rate = {cpu_rate}")

    return {"cpu_rate": cpu_rate}


@app.post("/set_cpu_scanner_job/", response_model=SetCPUScannerJobResponse, tags=["API"])
def set_cpu_scanner_job():
    random_suffix = uuid.uuid1()
    job_id = str(random_suffix)

    cpu_scanner_job = Schedule.add_job(scan_cpu_rate, 'interval', seconds=30, id=job_id, args=[job_id])

    job_id = cpu_scanner_job.id
    logging.info(f"set cpu scanner job, id = {job_id}")

    return {"job_id": job_id}


@app.post("/del_cpu_scanner_job/", response_model=DelCPUScannerJobResponse, tags=["API"])
def del_cpu_scanner_job(job_id:str):

    Schedule.remove_job(job_id)

    logging.info(f"deleted cpu scanner job, id = {job_id}")

    return {"job_id": job_id}
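
The demo builds job ids with uuid.uuid1(), which derives the value from the host's network address and the current time. Where that is undesirable, a random UUID is one alternative. The sketch below uses only the standard library; the new_job_id helper and the "cpu-scan" prefix are hypothetical and not part of the demo repository.

```python
import uuid


def new_job_id(prefix="cpu-scan"):
    """Build a job id of the form '<prefix>-<32 hex chars>' from a random UUID."""
    # uuid4() is random, so the id reveals neither the host nor the creation time
    return f"{prefix}-{uuid.uuid4().hex}"


# Generate a batch of ids to show that they do not collide in practice
sample_ids = {new_job_id() for _ in range(1000)}
```

A readable prefix makes it easier to tell scanner jobs apart from other jobs when several kinds share one job store.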

 

Reference

Scheduled Jobs with FastAPI and APScheduler

https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e

Adding Job

https://apscheduler.readthedocs.io/en/latest/userguide.html#adding-jobs

Removing Job

https://apscheduler.readthedocs.io/en/latest/userguide.html#removing-jobs

Generating a UUID to use as the job id

https://docs.python.org/3/library/uuid.html#example

Getting the CPU rate with psutil

https://psutil.readthedocs.io/en/latest/#psutil.cpu_percent

 

Source: https://www.cnblogs.com/lightsong/p/15054120.html
