#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
# Built on top of torndb; wraps commonly used SQL commands.
#
import ast

import MySQLdb
import torndb

__author__ = "xxx_828@163.com"

IntegrityError = MySQLdb.IntegrityError
OperationalError = MySQLdb.OperationalError


class XTDB(torndb.Connection):
    """torndb connection with helpers that build and run common SQL statements.

    Every helper prepends ``self.prefix`` to the table name it is given.
    ``condition`` arguments are raw SQL fragments (for example
    ``"WHERE id=%s"``) that are appended to the statement verbatim, so they
    must never be built from untrusted input; pass values through the
    ``parameters`` instead.

    NOTE(review): ``get``, ``insert`` and ``update`` appear to shadow
    same-named ``torndb.Connection`` methods that take a raw query string --
    confirm no caller relies on the torndb signatures.
    """

    def __init__(self, host, database, user=None, password=None,
                 max_idle_time=7 * 3600, connect_timeout=0,
                 time_zone="+0:00", prefix="", prefix_sign="###"):
        """Open the connection and remember the table-prefix settings.

        :param prefix: string prepended to every table name.
        :param prefix_sign: placeholder token intended to stand for the table
            prefix; it is stored here but not referenced by any method
            visible in this class.

        The remaining arguments are forwarded to ``torndb.Connection``.
        """
        super(XTDB, self).__init__(host, database, user, password,
                                   max_idle_time, connect_timeout, time_zone)
        self.prefix = prefix  # table prefix
        self.prefix_sign = prefix_sign  # token to be replaced by the prefix

    def execute_both(self, query, *parameters, **kwparameters):
        """Execute ``query`` and return ``(lastrowid, rowcount)``."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.lastrowid, cursor.rowcount
        finally:
            cursor.close()

    def executemany_both(self, query, parameters):
        """Run ``query`` once per row of ``parameters``.

        :param parameters: sequence of parameter rows for ``executemany``.
        :return: ``(lastrowid, rowcount)`` as reported by the cursor.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.lastrowid, cursor.rowcount
        finally:
            cursor.close()

    @staticmethod
    def _split_fields(field):
        """Return ``field`` as a list of field specs.

        A string is split on commas and each piece is stripped; any other
        iterable is returned unchanged.  Because the split is a plain
        ``split(",")``, a dict-literal item whose expression itself contains
        a comma must be supplied via a list, not a comma-separated string.
        """
        if isinstance(field, str):
            return [item.strip() for item in field.split(",")]
        return field

    @staticmethod
    def _parse_field(item):
        """Return ``(column, value_sql)`` for a single field spec.

        A plain column name maps to the ``%s`` placeholder.  An item written
        as a dict literal, e.g. ``"{'ctime': 'NOW()'}"``, supplies a raw SQL
        expression (such as a MySQL function call) that is embedded in the
        statement instead of a placeholder.

        :raises ValueError: if a braced item is not a non-empty dict literal.
        """
        if not (item.startswith("{") and item.endswith("}")):
            return item, "%s"
        spec = ast.literal_eval(item)
        if not isinstance(spec, dict) or not spec:
            raise ValueError("invalid field expression: %r" % (item,))
        # Only the first pair is used; works on both Python 2 and Python 3.
        column, expression = next(iter(spec.items()))
        return column, str(expression)

    def mk_insert_query(self, table, field, many=False):
        """Build an INSERT statement.

        :param table: table name without the prefix.
        :param field: comma-separated string or list of field specs
            (see ``_parse_field``).
        :param many: ``True`` for a multi-row insert, which uses the
            ``VALUES`` keyword; otherwise ``VALUE`` is used.
        :return: the SQL string with ``%s`` placeholders.
        """
        pairs = [self._parse_field(item) for item in self._split_fields(field)]
        columns = ",".join(column for column, _ in pairs)
        values = ",".join(value for _, value in pairs)
        keyword = " VALUES " if many else " VALUE "
        return ("INSERT INTO " + self.prefix + table
                + " (" + columns + ") " + keyword + " (" + values + ") ")

    def mk_delete_query(self, table, condition):
        """Build a DELETE statement; ``condition`` is appended verbatim."""
        return "DELETE FROM " + self.prefix + table + " " + condition

    def mk_update_query(self, table, field, condition):
        """Build an UPDATE statement.

        :param table: table name without the prefix.
        :param field: comma-separated string or list of field specs
            (see ``_parse_field``).
        :param condition: raw SQL appended after the SET clause.
        :return: the SQL string with ``%s`` placeholders.
        """
        pairs = [self._parse_field(item) for item in self._split_fields(field)]
        assignments = ", ".join(
            column + "=" + value for column, value in pairs)
        return ("UPDATE " + self.prefix + table + " SET " + assignments
                + "  " + condition)

    def insert(self, table, field, *parameters, **kwparameters):
        """Insert a single row; return ``(lastrowid, rowcount)``."""
        query = self.mk_insert_query(table, field, many=False)
        return self.execute_both(query, *parameters, **kwparameters)

    def insert_many(self, table, field, *parameters, **kwparameters):
        """Insert several rows; return ``(lastrowid, rowcount)``.

        Exactly one positional argument -- the sequence of parameter rows --
        is supported, because it is forwarded to ``executemany_both``.
        """
        # Use the VALUES form: some MySQLdb releases reportedly only batch
        # executemany() when the statement contains the VALUES keyword --
        # verify against the installed driver.
        query = self.mk_insert_query(table, field, many=True)
        return self.executemany_both(query, *parameters, **kwparameters)

    def delete(self, table, condition, *parameters, **kwparameters):
        """Delete rows matching ``condition``; return ``(lastrowid, rowcount)``."""
        query = self.mk_delete_query(table, condition)
        return self.execute_both(query, *parameters, **kwparameters)

    def delete_many(self, table, condition, *parameters, **kwparameters):
        """Run the DELETE once per parameter row via ``executemany_both``.

        Exactly one positional argument (the sequence of rows) is supported.
        """
        query = self.mk_delete_query(table, condition)
        return self.executemany_both(query, *parameters, **kwparameters)

    def update(self, table, field, condition, *parameters, **kwparameters):
        """Update rows matching ``condition``; return ``(lastrowid, rowcount)``."""
        query = self.mk_update_query(table, field, condition)
        return self.execute_both(query, *parameters, **kwparameters)

    def update_many(self, table, field, condition, *parameters,
                    **kwparameters):
        """Run the UPDATE once per parameter row via ``executemany_both``.

        Exactly one positional argument (the sequence of rows) is supported.
        """
        query = self.mk_update_query(table, field, condition)
        return self.executemany_both(query, *parameters, **kwparameters)

    def select(self, table, field, condition, *parameters, **kwparameters):
        """Run a SELECT and return the result of ``self.query``.

        :param field: raw column list placed after ``SELECT``.
        :param condition: raw SQL appended after the table name.
        """
        query = ("SELECT " + field + " FROM " + self.prefix + table
                 + " " + condition)
        return self.query(query, *parameters, **kwparameters)

    def get(self, table, field, condition, *parameters, **kwparameters):
        """Return the first row of the SELECT, or ``[]`` when nothing matches."""
        data = self.select(table, field, condition,
                           *parameters, **kwparameters)
        return data[0] if data else []

    def count(self, table, field, condition, *parameters, **kwparameters):
        """Return ``COUNT(field)`` for the matching rows as an int.

        Works like ``select`` but wraps ``field`` in ``COUNT(...)``; returns
        0 when the query yields no rows.
        """
        query = ("SELECT COUNT(" + field + ") AS rows_count FROM "
                 + self.prefix + table + " " + condition)
        rows = self.query(query, *parameters, **kwparameters)
        if rows:
            return int(rows[0]["rows_count"])
        return 0

    def alter(self, table, condition, *parameters, **kwparameters):
        """Run ``ALTER TABLE <prefix+table> <condition>``.

        :return: whatever ``execute_lastrowid`` returns for the statement.
        """
        query = "ALTER TABLE " + self.prefix + table + " " + condition
        return self.execute_lastrowid(query, *parameters, **kwparameters)