# -*- encoding: utf-8 -*-
# -----------------------------------------------------------------------------
# @File Name  : app.py
# @Time       : 2020/11/18 12:45 PM
# @Author     : X. Peng
# @Email      : acepengxiong@163.com
# @Software   : PyCharm
# -----------------------------------------------------------------------------
import logging
import redis
import os
import sys
import yaml
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import NullPool

# The last command-line argument selects the top-level section of the config
# file (used below as ``config[env]``).
env = sys.argv[-1]
work_dir = os.getcwd()
CFG_FILEPATH = work_dir + '/app/config/config.yaml'

# Use a context manager so the config file handle is closed after parsing.
with open(CFG_FILEPATH, 'r') as _cfg_file:
    config = yaml.load(_cfg_file, Loader=yaml.FullLoader)

_MYSQL_URL_TEMPLATE = (
    'mysql+pymysql://{user}:{password}@{host}:{port}/{db}?charset={charset}'
)


def _create_mysql_engine(db_key):
    """Build a SQLAlchemy engine for one of the configured MySQL databases.

    Args:
        db_key: Key under ``config[env]['MySQL']`` whose value is the
            database name (for example ``'tamp_user_db'``).

    Returns:
        The engine returned by ``create_engine`` for that database, created
        with ``poolclass=NullPool``.

    Raises:
        KeyError: If ``env``, the ``MySQL`` section or any required key is
            missing from the loaded configuration.
    """
    mysql_cfg = config[env]['MySQL']
    url = _MYSQL_URL_TEMPLATE.format(
        db=mysql_cfg[db_key],
        host=mysql_cfg['host'],
        port=mysql_cfg['port'],
        user=mysql_cfg['user'],
        password=mysql_cfg['password'],
        charset="utf8",
    )
    # Pool tuning options previously left disabled next to each engine:
    #   max_overflow=0   (max connections created beyond the pool size)
    #   pool_size=10     (connection pool size)
    #   pool_timeout=5
    #   pool_recycle=60
    return create_engine(url, poolclass=NullPool)


tamp_user_engine = _create_mysql_engine('tamp_user_db')
tamp_pay_engine = _create_mysql_engine('tamp_pay_db')
tamp_zhibo_engine = _create_mysql_engine('tamp_zhibo_db')
tamp_course_engine = _create_mysql_engine('tamp_course_db')

# NOTE: this rebinds the module-level name ``redis`` from the imported package
# to the client instance. The name is kept as-is because it is part of this
# module's public surface.
redis = redis.Redis(
    host=config[env]['redis']['host'],
    port=config[env]['redis']['port'],
    db=config[env]['redis']['db'],
    password=config[env]['redis']['password'],
)

# File output (``filename`` / ``filemode`` from config[env]['log']) is
# currently disabled; only the format settings are applied.
logging.basicConfig(
    level=logging.INFO,
    format=config[env]['log']['format'],
    datefmt=config[env]['log']['datefmt'],
)


class TAMP_SQL(object):
    """Context-manager wrapper around a SQLAlchemy session.

    Usage::

        with TAMP_SQL(tamp_user_engine) as tamp_user:
            tamp_user.session.execute(...)

    On leaving the ``with`` block the session is committed if the body
    finished normally and rolled back if the body raised. The session is
    closed in both cases. An exception raised by the body propagates to the
    caller. A failure during commit or rollback is logged and not re-raised.

    Attributes:
        DBSession: The ``sessionmaker`` factory bound to ``db_engine``.
        session: The session created from ``DBSession`` for this instance.
    """

    def __init__(self, db_engine):
        """Create a session factory bound to ``db_engine`` and open a session.

        Args:
            db_engine: Engine passed to ``sessionmaker(bind=...)``.
        """
        # A plain sessionmaker is used; the scoped_session variant is
        # intentionally not enabled.
        self.DBSession = sessionmaker(bind=db_engine)
        self.session = self.DBSession()

    def __enter__(self):
        """Return this wrapper; the session is available as ``.session``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Commit or roll back the session, then close it.

        Args:
            exc_type: Exception class raised in the ``with`` body, or None.
            exc_val: Exception instance raised in the body, or None.
            exc_tb: Traceback of that exception, or None.

        Returns:
            None, so an exception raised in the body is never suppressed.
        """
        try:
            if exc_type is None:
                try:
                    self.session.commit()
                except Exception:
                    # Best-effort semantics: record the failure and undo the
                    # transaction instead of raising out of __exit__.
                    logging.exception("TAMP_SQL: commit failed, rolling back")
                    self._rollback_quietly()
            else:
                # The body failed: do not persist partially applied changes.
                self._rollback_quietly()
        finally:
            self.session.close()

    def _rollback_quietly(self):
        """Roll back the session, logging (not raising) any failure."""
        try:
            self.session.rollback()
        except Exception:
            logging.exception("TAMP_SQL: rollback failed")