前言:

        工作需求,写一个接口,用Python来编写,我首先想到用flask小型框架来支撑,配置sqlalchemy来实现,但是在实现的过程中,发生循环导入问题

        我想到用蓝图来解决此问题,但是仍然会出死循环的问题,网上找了很多资料,许多都是轻描淡写,可能不是很明白,为了能够帮助更多踩坑的人,分享一下自己的经历!!

了解导入循环的机制

报错代码:

        cannot import name '......' from partially initialized module '.....'

Python的 死循环机制:

        当模块之间相互导入时,两个或多个模块之间相互依赖,它们通过import语句相互引用对方,而没有一个明确的导入顺序或者依赖管理来打破这种循环,形成了一个闭环,当尝试从任何一个模块中导入或执行函数时,Python 解释器将陷入无限循环的导入过程中

比如:A导入B,B导入A  

模块A:module_a.py

import module_b  
  
def func_a():  
    print("Function A")  
    module_b.func_b()

模块B:module_b.py

import module_a  
  
def func_b():  
    print("Function B")  
    module_a.func_a()

比如:A导入C,B导入A,C导入B

模块A:test1.py

import test3


def t1():
    print('test1')
    test3.t3()

t1()

模块B:test2.py

import test1


def t2():
    test1.t1()
    print("test2")

模块C:test3.py

import test2


def t3():
    test2.t2()
    print("test3")

解决sqlalchemy集成flask导入死循环的问题

        言归正传,要解决问题,要注意db实例化的位置,app导入的位置,蓝图导入的顺序!!!

解决方案:

   1. 在cab_api_test/views/__init__.py 文件下,实例化db,函数封装app   

   3. 导入蓝图时,如果蓝图有引用db,位置需要在db的下方  
   2. 在models中,main语句下,导入app,如果在全局还是会发生循环问题

        可能说的有些不清不楚,还是通过代码体现,就能明白啦,每步骤关键都有注释,注意好导入的顺序,和导入的位置,主要是app和db的位置,和模型创建用到的app导入问题!!

目录结构:

cab_api_test
├── app.py
├── gunicorn_config.py
├── logs
│   ├── cab_api_access.log
│   └── cab_api_error.log
├── requirements.txt
└── views
    ├── __init__.py
    ├── __pycache__
    │   ├── __init__.py
    │   ├── cab_api.py
    │   ├── filed_heck.py
    │   ├── logger.py
    │   ├── models.py
    │   └── session.py

cab_api_test/views/__init__.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

# 1. 实例化db时 要注意导入蓝图的顺序!!!
# 2. 如果蓝图中有引用db,在这导入的蓝图必须在db的下面,不然还是会报错
# 3. 我是用的蓝图是 cab_api.py
from views.cab_api import cab_api

host = '127.0.0.1'
port = 3306
user = 'root'
password = 'wq123456'
database = 'gf_test'


def create_app():
    app = Flask(__name__)
    app.config['DEBUG'] = True
    # sqlalchemy 连接池配置
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config[
        'SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4&autocommit=true'
    app.config['SQLALCHEMY_POOL_SIZE'] = 10
    app.config['SQLALCHEMY_POOL_TIMEOUT'] = 10
    app.config['SQLALCHEMY_MAX_OVERFLOW'] = 5
    app.config['SQLALCHEMY_POOL_PRE_PING'] = True
    app.config['SQLALCHEMY_ECHO'] = False

    # 把app对象初始化到db中
    db.init_app(app=app)

    app.register_blueprint(cab_api, url_prefix='/api')
    return app


cab_api_test/views/models.py

import datetime
from uuid import uuid4
from sqlalchemy import Column, String, Text, DateTime

# 这个db是全新的SQLAlchemy对象
from views import db


class Cab_review(db.Model):
    __tablename__ = 'cab_review'  # 表名
    # 写字段
    uuid = Column(String(100), nullable=False, primary_key=True, default=str(uuid4()).replace('-', ''))
    cab_number = Column(String(64), nullable=True, comment='工单号')
    review_item = Column(String(100), comment='项目名')  # String(32) == varchar(32)
    review_info = Column(Text, comment='项目信息')
    review_link = Column(String(200), nullable=True, comment='项目链接')
    caller_id = Column(String(100), nullable=True, comment='调用方标识ID')
    caller_ip = Column(String(100), nullable=True, comment='调用方IP')
    updated_time = Column(DateTime, default=datetime.datetime.now, nullable=False, comment='更新时间')
    test_number = Column(String(64), nullable=True, comment='测试号')


if __name__ == '__main__':  # 导入时不执行下面的语句,只有执行当前文件的时候才会执行下面的代码

    # 注意要在main下面导入create_app,如果在全局导入,还是会发生循环导入的问题
    from views import create_app

    app = create_app()

    # 先清空再创建
    db.drop_all()
    db.create_all(app=app)


# 做一个简单的封装 db,使用with 语句
class Session:
    def __init__(self):
        self.db = None

    def __enter__(self):
        self.db = db.session
        return self.db

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.db:
            self.db.close()

 cab_api_test/views/cab_api.py

from flask import jsonify, request
import json, datetime
from uuid import uuid4
from views.logger import flask_logger
from views.filed_check import field_check
from flask import Blueprint
# Session 是我封装好的 db
from views.models import Cab_review, Session

cab_api = Blueprint('cab_api', __name__)


@cab_api.route('/', methods=['POST', ])
def main():
    # 传入的参数
    data = request.get_json()
    cno_test = data.get('cno_test')
    review_pro = data.get('review_pro')
    review_result = data.get('review_result')
    review_advice = data.get('review_advice')

    flask_logger.info(f'请求体:{data}')

    data['cno'] = '已废弃' if not data.get('cno') else data.get('cno')
    data['caller_ip'] = request.remote_addr
    data['url'] = data.get('url') if data.get('url') else ''
    data['caller_id'] = data.get('caller_id') if data.get('caller_id') else '调用方未提供caller_id'

    # 校验字段
    ok, msg = field_check(cno_test, review_pro, review_result, review_advice)
    if not ok:
        flask_logger.info(
            '-------------------------------------------------------------------------------------------------------')
        return jsonify(msg)

    # 查询cab_review中的数据
    with Session() as session:
        select_data = session.query(Cab_review).filter_by(test_number=cno_test,
                                                          review_item=review_pro).all()  # [queryset,queryset],数据为空则 []
        session.begin()  # 开启事务
        if not select_data:
            try:
                cab_review = Cab_review(uuid=str(uuid4()).replace('-', ''),
                                        cab_number=data.get('cno'),
                                        review_item=data.get('review_pro').strip(),
                                        review_info=json.dumps(
                                            {'review_result': data.get('review_result'),
                                             'review_advice': data.get('review_advice')},
                                            ensure_ascii=False),
                                        review_link=data.get('url'),
                                        caller_id=data.get('caller_id'),
                                        caller_ip=data.get('caller_ip'),
                                        test_number=data.get('cno_test'))
                session.add(cab_review)
                session.commit()
                flask_logger.info('数据插入成功')
                msg = '请求成功,数据已添加或更新'
                is_success = True
            except Exception as e:
                session.rollback()
                flask_logger.error(f'添加失败:{e}', exc_info=False)
                msg = '数据添加或更新失败,请联系管理员'
                is_success = False
        else:
            try:
                session.query(Cab_review).filter_by(test_number=data.get('cno_test'),
                                                    review_item=data.get('review_pro').strip()).update(
                    {'review_info': json.dumps(
                        {'review_result': data.get('review_result'), 'review_advice': data.get('review_advice')},
                        ensure_ascii=False), 'review_link': data.get('url'),
                        'caller_id': data.get('caller_id'),
                        'caller_ip': data.get('caller_ip'), 'updated_time': datetime.datetime.now()})

                session.commit()
                flask_logger.info('数据更新成功')
                msg = '请求成功,数据已添加或更新'
                is_success = True
            except Exception as e:
                session.rollback()
                flask_logger.error(f'更新失败:{e}', exc_info=False)
                msg = '数据添加或更新失败,请联系管理员'
                is_success = False

    review_name = ['评审项:' + review_pro, '评审结果:' + review_result, '评审意见:' + review_advice,
                   '评审链接:' + data.get('url')]
    flask_logger.info(
        '-------------------------------------------------------------------------------------------------------')
    return {'code': 200, 'success': is_success, 'msg': msg, 'data': '\n'.join(review_name)}


@cab_api.route('/', methods=['GET', ])
def health_check():
    return 'ok'

这样就不会在发生导入死循环的问题啦!!! 

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部