test.py 5.69 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
# Built on top of torndb; wraps commonly used SQL commands.
#
import ast
import MySQLdb
import torndb

__author__ = "xxx_828@163.com"

# Module-level aliases for the MySQLdb exception classes; presumably exposed
# so that callers can catch them through this module -- confirm with callers.
IntegrityError = MySQLdb.IntegrityError
OperationalError = MySQLdb.OperationalError


class XTDB(torndb.Connection):
    """torndb connection with helpers that build and run common SQL.

    Every helper that takes a ``table`` argument prepends ``self.prefix`` to
    it.  Table names, field names, raw SQL expressions and ``condition``
    fragments are concatenated into the statement text verbatim, so they must
    come from trusted code; pass user-supplied values through ``parameters``
    so they travel as placeholder arguments instead.
    """

    # Text types accepted for a comma-separated field list.  ``type(u"")`` is
    # ``unicode`` on Python 2 and ``str`` on Python 3.
    _STRING_TYPES = (str, type(u""))

    def __init__(self, host, database, user=None, password=None,
                 max_idle_time=7 * 3600, connect_timeout=0, time_zone="+0:00",
                 prefix="", prefix_sign="###"):
        """Open the connection and remember the table prefix settings.

        All arguments up to ``time_zone`` are forwarded positionally to
        ``torndb.Connection.__init__``.

        :param prefix: string prepended to every table name.
        :param prefix_sign: placeholder text meant to stand for the prefix;
            it is stored on the instance but not referenced by any method of
            this class.
        """
        super(XTDB, self).__init__(host, database, user, password,
                                   max_idle_time, connect_timeout, time_zone)
        self.prefix = prefix              # table-name prefix
        self.prefix_sign = prefix_sign    # placeholder for the prefix

    @staticmethod
    def _parse_fields(field):
        """Turn a field spec into a list of ``(column, value_sql)`` pairs.

        :param field: either a comma-separated string (items are stripped)
            or an iterable of strings.  A plain item names a column whose
            value is supplied through a ``%s`` placeholder.  An item written
            as a one-entry dict literal, e.g. ``"{'ctime': 'NOW()'}"``, maps
            a column to a raw SQL expression that is inlined as-is.  Because
            a string spec is split on every comma, a dict item whose
            expression contains a comma must be passed in a list instead.
        :raises ValueError: if no field is given or a ``{...}`` item is not
            a non-empty dict literal.
        """
        if isinstance(field, XTDB._STRING_TYPES):
            field = [name.strip() for name in field.split(",")]
        pairs = []
        for item in field:
            if item.startswith("{") and item.endswith("}"):
                spec = ast.literal_eval(item)
                if not isinstance(spec, dict) or not spec:
                    raise ValueError(
                        "expected a one-entry dict literal, got %r" % (item,))
                # Only the first entry is used, as before.
                column, expression = next(iter(spec.items()))
                if not isinstance(expression, XTDB._STRING_TYPES):
                    expression = str(expression)
                pairs.append((column, expression))
            else:
                pairs.append((item, "%s"))
        if not pairs:
            raise ValueError("field must name at least one column")
        return pairs

    def execute_both(self, query, *parameters, **kwparameters):
        """Run ``query`` and return ``(lastrowid, rowcount)``."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.lastrowid, cursor.rowcount
        finally:
            cursor.close()

    def executemany_both(self, query, parameters):
        """Run ``query`` once per row of ``parameters``.

        Calls ``cursor.executemany`` and returns ``(lastrowid, rowcount)``.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.lastrowid, cursor.rowcount
        finally:
            cursor.close()

    def mk_insert_query(self, table, field, many=False):
        """Build an INSERT statement for ``table``.

        :param field: field spec, see ``_parse_fields``.
        :param many: when true the statement uses ``VALUES`` (multi-row
            form), otherwise ``VALUE``.
        :returns: the statement text with one ``%s`` per plain field.
        """
        pairs = self._parse_fields(field)
        columns = ",".join(column for column, _ in pairs)
        values = ",".join(value for _, value in pairs)
        keyword = " VALUES " if many else " VALUE "
        return ("INSERT INTO " + self.prefix + table + " (" + columns + ") "
                + keyword + " (" + values + ") ")

    def mk_delete_query(self, table, condition):
        """Build ``DELETE FROM <prefix><table> <condition>``."""
        return "DELETE FROM " + self.prefix + table + " " + condition

    def mk_update_query(self, table, field, condition):
        """Build an UPDATE statement for ``table``.

        :param field: field spec, see ``_parse_fields``; plain fields become
            ``name=%s`` and dict items become ``name=<expression>``.
        :param condition: SQL appended after the SET list (e.g. a WHERE
            clause).
        """
        assignments = ", ".join(
            column + "=" + value
            for column, value in self._parse_fields(field))
        return ("UPDATE " + self.prefix + table + " SET " + assignments
                + "  " + condition)

    def insert(self, table, field, *parameters, **kwparameters):
        """Insert one row; returns ``(lastrowid, rowcount)``."""
        query = self.mk_insert_query(table, field)
        return self.execute_both(query, *parameters, **kwparameters)

    def insert_many(self, table, field, *parameters, **kwparameters):
        """Insert several rows; returns ``(lastrowid, rowcount)``.

        Arguments after ``field`` are forwarded to ``executemany_both``,
        which accepts exactly one positional argument: the sequence of rows.

        The statement is built with ``VALUES`` so that a driver which batches
        ``INSERT ... VALUES`` in ``executemany`` can do so.
        NOTE(review): if the driver sends one multi-row statement,
        ``lastrowid`` may refer to the first inserted row rather than the
        last one -- confirm against the driver in use.
        """
        query = self.mk_insert_query(table, field, many=True)
        return self.executemany_both(query, *parameters, **kwparameters)

    def delete(self, table, condition, *parameters, **kwparameters):
        """Delete rows matching ``condition``; returns
        ``(lastrowid, rowcount)``."""
        query = self.mk_delete_query(table, condition)
        return self.execute_both(query, *parameters, **kwparameters)

    def delete_many(self, table, condition, *parameters, **kwparameters):
        """Run the DELETE once per parameter row via ``executemany_both``.

        Pass exactly one positional argument after ``condition``: the
        sequence of parameter rows.
        """
        query = self.mk_delete_query(table, condition)
        return self.executemany_both(query, *parameters, **kwparameters)

    def update(self, table, field, condition, *parameters, **kwparameters):
        """Update rows matching ``condition``; returns
        ``(lastrowid, rowcount)``."""
        query = self.mk_update_query(table, field, condition)
        return self.execute_both(query, *parameters, **kwparameters)

    def update_many(self, table, field, condition, *parameters,
                    **kwparameters):
        """Run the UPDATE once per parameter row via ``executemany_both``.

        Pass exactly one positional argument after ``condition``: the
        sequence of parameter rows.
        """
        query = self.mk_update_query(table, field, condition)
        return self.executemany_both(query, *parameters, **kwparameters)

    def select(self, table, field, condition, *parameters, **kwparameters):
        """Run ``SELECT <field> FROM <prefix><table> <condition>``.

        ``field`` is the raw select list.  Returns whatever ``self.query``
        returns for the statement.
        """
        query = ("SELECT " + field + " FROM " + self.prefix + table + " "
                 + condition)
        return self.query(query, *parameters, **kwparameters)

    def get(self, table, field, condition, *parameters, **kwparameters):
        """Return the first row of ``select``.

        An empty list (not ``None``) is returned when nothing matched.
        """
        data = self.select(table, field, condition, *parameters,
                           **kwparameters)
        return data[0] if data else []

    def count(self, table, field, condition, *parameters, **kwparameters):
        """Return ``COUNT(<field>)`` for rows matching ``condition``.

        The value is read from the first result row and converted to int;
        0 is returned when the query yields no rows.
        """
        rows = self.select(table, 'COUNT(' + field + ') AS rows_count',
                           condition, *parameters, **kwparameters)
        return int(rows[0]["rows_count"]) if rows else 0

    def alter(self, table, condition, *parameters, **kwparameters):
        """Run ``ALTER TABLE <prefix><table> <condition>``.

        Returns the result of ``self.execute_lastrowid``.
        """
        query = "ALTER TABLE " + self.prefix + table + " " + condition
        return self.execute_lastrowid(query, *parameters, **kwparameters)