# -*- coding: utf-8 -*-
"""Small helpers for running SQL against MySQL through PyMySQL.

Every helper opens its own connection via ``get_connection()``, runs a single
statement (or one ``executemany`` batch), commits, and closes the connection.
If the statement or the commit fails, the transaction is rolled back and the
connection is still closed before the exception propagates to the caller.
"""
import logging
import sys  # noqa: F401  (unused here; retained so the module's import surface is unchanged)
from contextlib import contextmanager
from sqlite3 import Row  # noqa: F401  (unused here; retained so the module's import surface is unchanged)

import pymysql

from config.config import Settings

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                    level=logging.INFO)


def get_connection():
    """Open and return a new PyMySQL connection configured from ``Settings.MYSQL``.

    Reads the ``host``, ``port``, ``user``, ``password`` and ``database`` keys.
    The connection uses the ``utf8mb4`` charset and passes ``local_infile=1``.
    The caller owns the returned connection and is responsible for closing it.
    """
    return pymysql.connect(
        host=Settings.MYSQL['host'],
        port=Settings.MYSQL['port'],
        user=Settings.MYSQL['user'],
        password=Settings.MYSQL['password'],
        database=Settings.MYSQL['database'],
        charset='utf8mb4',
        local_infile=1
    )


@contextmanager
def _managed_cursor():
    """Yield a cursor on a fresh connection; commit on success, roll back on error.

    The cursor and the connection are always closed when the ``with`` block
    exits, so a failing statement no longer leaks the connection.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        try:
            yield cursor
            conn.commit()
        except Exception:
            # Only roll back while the socket is still usable; otherwise the
            # rollback error would mask the exception that brought us here.
            if conn.open:
                conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        # Connection.close() raises on an already-closed connection.
        if conn.open:
            conn.close()


def _execute(sql, args):
    """Run one statement and return the value of ``cursor.execute``."""
    with _managed_cursor() as cursor:
        affected = cursor.execute(sql, args)
    return affected


def _execute_many(sql, args):
    """Run one ``executemany`` batch and return the value it reports."""
    with _managed_cursor() as cursor:
        affected = cursor.executemany(sql, args)
    return affected


def fetch_one(sql, args):
    """Run a query and return its rows as a list of ``{column: value}`` dicts.

    NOTE: despite the name, this returns *every* row of the result set, exactly
    like ``fetch_all``. The behavior is kept because existing callers may
    depend on receiving a list.

    Args:
        sql: Statement text with PyMySQL ``%s`` placeholders.
        args: Parameters bound to the placeholders.

    Returns:
        A list with one dict per result row (empty when no row matched).
    """
    return fetch_all(sql, args)


def fetch_all(sql, args):
    """Run a query and return all rows as a list of ``{column: value}`` dicts.

    Args:
        sql: Statement text with PyMySQL ``%s`` placeholders.
        args: Parameters bound to the placeholders.

    Returns:
        A list with one dict per result row (empty when no row matched).
    """
    with _managed_cursor() as cursor:
        cursor.execute(sql, args)
        records = rows_to_dict_list(cursor)
    return records


def fetch_one_list(sql, args):
    """Run a query and return the first row as returned by ``cursor.fetchone()``.

    Args:
        sql: Statement text with PyMySQL ``%s`` placeholders.
        args: Parameters bound to the placeholders.

    Returns:
        The first result row, or ``None`` when the result set is empty.
    """
    with _managed_cursor() as cursor:
        cursor.execute(sql, args)
        first_row = cursor.fetchone()
    return first_row


def fetch_all_list(sql, args):
    """Run a query and return all rows as returned by ``cursor.fetchall()``.

    Args:
        sql: Statement text with PyMySQL ``%s`` placeholders.
        args: Parameters bound to the placeholders.

    Returns:
        The sequence of result rows (empty when no row matched).
    """
    with _managed_cursor() as cursor:
        cursor.execute(sql, args)
        all_rows = cursor.fetchall()
    return all_rows


def insert(sql, args):
    """Execute a single INSERT-style statement and commit it.

    Returns:
        The affected-row count reported by ``cursor.execute``.
    """
    return _execute(sql, args)


def insert_batch(sql, args):
    """Execute one statement for every parameter set in ``args`` and commit.

    The whole batch is rolled back if any part of it fails.

    Returns:
        The affected-row count reported by ``cursor.executemany``.
    """
    return _execute_many(sql, args)


def update(sql, args):
    """Execute a single UPDATE-style statement and commit it.

    Returns:
        The affected-row count reported by ``cursor.execute``.
    """
    return _execute(sql, args)


# Batch update
def update_batch(sql, args):
    """Execute one statement for every parameter set in ``args`` and commit.

    The whole batch is rolled back if any part of it fails.

    Returns:
        The affected-row count reported by ``cursor.executemany``.
    """
    return _execute_many(sql, args)


def connect_close(cursor, conn):
    """Close ``cursor`` first and then ``conn``."""
    cursor.close()
    conn.close()


# Save to MySQL
def save_result(database, table, result_dict, file_name):
    """Write ``result_dict`` into ``database.table`` with one batched REPLACE.

    Does nothing when ``result_dict`` is empty. The outcome is only logged;
    nothing is returned.

    Args:
        database: Schema name; interpolated into the SQL text as-is.
        table: Table name; interpolated into the SQL text as-is.
        result_dict: Iterable of dicts mapping column name to value. The
            column list is taken from the first dict and every other dict is
            read in that same column order.
        file_name: Name of the calling program, included in the error log line.

    Raises:
        KeyError: If a row lacks one of the columns present in the first row.
    """
    records = list(result_dict) if result_dict else []
    if not records:
        return

    result_table = database + '.' + table
    logging.info(f'save {result_table} start')

    # Fix the column order once so every value tuple lines up with the column
    # list, even when individual dicts were built with a different key order.
    columns = list(records[0])
    fields = ','.join([f"`{k}`" for k in columns])
    place_holder = ','.join(["%s" for _ in columns])
    values = [tuple(record[column] for column in columns) for record in records]
    row = len(values)

    sql = f'''replace into {result_table} ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)

    # REPLACE counts deleted rows plus inserted rows, so a successful batch
    # reports at least one affected row per input row (more for each row that
    # displaced existing data).
    if rs is not None and rs >= row:
        logging.info(f'save {result_table} success {row}')
    else:
        logging.error(f'save {result_table} error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')


def rows_to_dict_list(cursor):
    """Convert the remaining rows of an executed cursor into a list of dicts.

    Column names are read from ``cursor.description``; rows are obtained by
    iterating the cursor.

    Args:
        cursor: A DB-API cursor on which a result-returning statement has
            already been executed.

    Returns:
        A list with one ``{column: value}`` dict per row.
    """
    columns = [i[0] for i in cursor.description]
    return [dict(zip(columns, row)) for row in cursor]