python的sqlite返回完整字典

介绍

在python中使用sql时,如果只是返回row信息,对于程序开发并不方便,如果要返回完整字典可参考如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d

def dic2InsertSql(dic, tblName):
sql_key = ', '.join(dic.keys())
sql_val = ''
for key in dic:
if type(dic[key]) is str:
sql_val = sql_val + '\'' + dic[key] + '\','
else:
sql_val = sql_val + str(dic[key]) + ' ,'

# 移除 sql_val最后一个 ','
return "insert into {} ({}) values ({}) ".format(tblName, sql_key, sql_val[:-1])


# 调用者需要自己补充 where之后的条件语句
def dic2UpdateSql(dic, tblName):
sql_key = ''
for key in dic:
if type(dic[key]) is str:
sql_key = sql_key + key + ' = \'' + dic[key] + '\','
else:
sql_key = sql_key + key + ' = ' + str(dic[key]) + ' ,'

# 移除 sql_key 最后一个 ','
return "update {} set {} where ".format(tblName, sql_key[:-1])


class SqliteBase:
def __init__(self, dbname):
self.conn = sqlite3.connect(dbname, check_same_thread=False)

# 设置返回的数据库信息中携带 表字段名

# dict_factory 一定要设置为静态函数,否则会导致变量引用计数多+1,
# 最后导致局部变量永远无法删除,这个bug没找到具体原因
self.conn.row_factory = dict_factory

def __del__(self):
self.conn.close()

@staticmethod
def initdb(dbname, script):
"""
初始化数据库,执行脚本语句
"""
if os.path.exists(dbname):
return

conn = sqlite3.connect(dbname)
cursor = conn.cursor()

try:
cursor.executescript(script)
conn.commit()
except Exception as e:
log.logger.warning('error: {}'.format(e))
finally:
cursor.close()
conn.close()


def exec(self, sql):
"""
执行sql
返回错误
:param sql:
:return:
"""
ret = True
cursor = self.conn.cursor()
try:
cursor.execute(sql)
self.conn.commit()
except Exception as e:
log.logger.warning('sql {} , error: {}'.format(sql, e))
ret = False
finally:
pass
return ret

def exec_fetchall(self, sql):
"""
#执行sql任务
返回链表字典数据
:param sql:
:return:
"""
lnt = []
cursor = self.conn.cursor()
try:
cursor.execute(sql)
self.conn.commit()
except Exception as e:
# log.logger.warning('sql {} , error: {}'.format(sql, e))
pass
else:
lnt = cursor.fetchall()
finally:
pass
return lnt

def exec_fetchone(self, sql):
"""
执行sql任务
返回字典
:param sql:
:return:
"""
dic = {}
cursor = self.conn.cursor()
try:
cursor.execute(sql)
self.conn.commit()
except Exception as e:
# log.logger.warning('sql {} , error: {}'.format(sql, e))
pass
else:
dic = cursor.fetchone()
finally:
pass
return dic

def getLastRowId(self):
"""
插入数据后,并返回最后的id
:return:
"""
sql = 'select last_insert_rowid() '
dic = self.exec_fetchone(sql)
if 'last_insert_rowid()' in dic:
return dic['last_insert_rowid()']
else:
return 0

基于以上,然后创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
DB_NAME = initRes.SQLITEPATH + '/account.db'
TBL_NAME = 'tbl_account'

DB_SCRIPT = f"""
CREATE TABLE IF NOT EXISTS {TBL_NAME} (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"userName" VARCHAR(256) NOT NULL default '' unique, --用户名
"password" VARCHAR(256) NOT NULL default '' , --密码
"createTime" VARCHAR(64) NOT NULL default '' --创建时间
);
"""


class TblAccount:

def __init__(self):
self.sqlbase = SqliteBase(DB_NAME)

def __del__(self):
pass

# 增加记录
def add(self, dic):
dic['createTime'] = func.localtime()
sql = sqlbase.dic2InsertSql(dic, TBL_NAME)
if self.sqlbase.exec(sql):
return self.sqlbase.getLastRowId()
return 0

# 修改密码
def modify(self, ids, dic):
sql = sqlbase.dic2UpdateSql(dic, TBL_NAME) + " id = {} ".format(ids)
return self.sqlbase.exec(sql)

# 注销账号
def delete(self, ids):
sql = "delete from {} where id = {} ".format(TBL_NAME, ids)
return self.sqlbase.exec(sql)

# 获取所有服务器信息
def all(self):
sql = "select * from {} LIMIT 1000".format(TBL_NAME)
return self.sqlbase.exec_fetchall(sql)

# 通过用户名查询密码
def get(self, userName):
sql = "select * from {} where userName = '{}' ".format(TBL_NAME, userName)
return self.sqlbase.exec_fetchone(sql)


sqliteBase.SqliteBase.initdb(DB_NAME, DB_SCRIPT)

这样再和web交互的json定义中就可以方便的使用数据库信息