1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# -*- encoding: utf-8 -*-
# -----------------------------------------------------------------------------
# @File Name : app.py
# @Time : 2020/11/18 12:45 PM
# @Author : X. Peng
# @Email : acepengxiong@163.com
# @Software : PyCharm
# -----------------------------------------------------------------------------
import logging
import redis
import os
import sys
import yaml
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# Name of the active environment section in the config file; taken from the
# last command-line argument (this is the script path itself when no
# arguments are given).
env = sys.argv[-1]
# The config path is resolved against the current working directory, so the
# process is expected to be started from the project root.
work_dir = os.getcwd()
CFG_FILEPATH = work_dir + '/app/config/config.yaml'
# Read the config inside a context manager so the file handle is closed
# deterministically instead of being left to the garbage collector.
# NOTE(review): FullLoader is not meant for untrusted input; if this file only
# ever holds plain mappings/scalars, yaml.safe_load would be the safer choice.
with open(CFG_FILEPATH, 'r') as _cfg_file:
    config = yaml.load(_cfg_file, Loader=yaml.FullLoader)
# Engine for the database named by the `tamp_user_db` config key. The
# remaining connection settings are read from the MySQL section of the active
# environment. echo=True asks SQLAlchemy to log the SQL statements it emits.
tamp_user_engine = create_engine(
    'mysql+pymysql://{user}:{password}@{host}:{port}/{db}?charset={charset}'.format(
        charset="utf8",
        db=config[env]['MySQL']['tamp_user_db'],
        **{
            key: config[env]['MySQL'][key]
            for key in ('host', 'port', 'user', 'password')
        }
    ),
    echo=True
)
# Engine for the database named by the `tamp_pay_db` config key. It uses the
# same host, port and credentials from the MySQL section of the active
# environment; statement echoing is left at SQLAlchemy's default here.
tamp_pay_engine = create_engine(
    'mysql+pymysql://{user}:{password}@{host}:{port}/{db}?charset={charset}'.format(
        charset="utf8",
        db=config[env]['MySQL']['tamp_pay_db'],
        **{
            key: config[env]['MySQL'][key]
            for key in ('host', 'port', 'user', 'password')
        }
    )
)
# redis = redis.StrictRedis(
# host=config[env]['redis']['host'],
# port=config[env]['redis']['port'],
# db=config[env]['redis']['db']
# )
# Configure the root logger from the `log` section of the active environment.
# The log file path is the working directory with the configured filename
# appended by plain string concatenation, so the configured value has to
# supply its own leading path separator.
logging.basicConfig(
    datefmt=config[env]['log']['datefmt'],
    format=config[env]['log']['format'],
    filemode=config[env]['log']['filemode'],
    filename=work_dir + config[env]['log']['filename'],
    level=logging.INFO
)
class TAMP_SQL(object):
    """Context-manager wrapper around a SQLAlchemy session.

    Typical use::

        with TAMP_SQL(engine) as db:
            db.session.query(...)

    On leaving the ``with`` block the session is committed if the body
    finished normally, rolled back if the body raised, and closed in
    every case.

    Args:
        db_engine: Engine handed to ``sessionmaker(bind=...)``.

    Attributes:
        DBSession: ``scoped_session`` factory bound to ``db_engine``.
        session: Session obtained from ``DBSession``; callers run their
            queries through it.
    """

    def __init__(self, db_engine):
        # Create the DBSession factory, then obtain a session from it.
        self.DBSession = scoped_session(sessionmaker(bind=db_engine))
        self.session = self.DBSession()

    def __enter__(self):
        """Return the wrapper itself so the caller can reach ``session``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the transaction and release the session.

        Args:
            exc_type: Exception class raised inside the ``with`` body, or
                ``None`` when the body completed normally.
            exc_val: The exception instance, or ``None``.
            exc_tb: The traceback, or ``None``.

        Returns:
            None, so an exception raised by the ``with`` body keeps
            propagating to the caller.
        """
        try:
            if exc_type is not None:
                # The body failed part-way: committing here would persist a
                # half-finished unit of work, so discard it instead.
                self.session.rollback()
            else:
                try:
                    self.session.commit()
                except Exception:
                    # Commit failures are still not re-raised (callers have
                    # never had to handle them), but they are now logged
                    # instead of vanishing silently.
                    logging.exception("TAMP_SQL: commit failed; rolling back")
                    self.session.rollback()
        finally:
            # Always release the session, whatever happened above.
            self.session.close()