forked from ZKeeer/WeChatAssistant
-
Notifications
You must be signed in to change notification settings - Fork 0
/
signin.py
139 lines (123 loc) · 4.66 KB
/
signin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# -*-encoding:utf-8-*-
import os
import sqlite3
import time
import itchat
import config
class SignInMPS(object):
    """Send a daily "sign-in" message to WeChat official accounts (MPS).

    Commands are stored in the SQLite table ``config.signin_table`` of the
    database ``config.database`` as (MPS, CONTENT) rows: the official account
    name and the text to send to it.  The date of the last completed sign-in
    is kept in the flag file ``self.Date_path`` as ``YYYY-MM-DD``.

    Older flag files held only the day of the month; such a value never
    equals a full date, so it is simply treated as "not signed in today".
    """

    def __init__(self):
        """Read settings from ``config`` and make sure table and flag file exist."""
        self.db = config.database
        self.table = config.signin_table
        self.signin_list = {}  # not used inside this class; kept for callers
        self.Date_path = "./signflag.txt"

        db_connect = sqlite3.connect(self.db)
        try:
            db_connect.execute(
                """CREATE TABLE IF NOT EXISTS {} (MPS TEXT NOT NULL, CONTENT NOT NULL);""".format(self.table))
            db_connect.commit()
        except sqlite3.Error:
            # Best effort: a failure here surfaces later when the table is used.
            db_connect.rollback()
        finally:
            db_connect.close()

        if not os.path.exists(self.Date_path):
            # An empty flag file means "never signed in".
            with open(self.Date_path, "w"):
                pass

    @staticmethod
    def _today():
        """Return the local date as ``YYYY-MM-DD`` (the flag-file format)."""
        return time.strftime("%Y-%m-%d")

    def GetCommand(self):
        """Load every stored sign-in command from the database.

        :return: dict mapping official account name (column 0) to the command
                 text (column 1); empty when the table has no rows.
        :raises sqlite3.Error: if the query fails (e.g. the table is missing).
        """
        db_connect = sqlite3.connect(self.db)
        try:
            rows = db_connect.execute(
                """SELECT * FROM {};""".format(self.table)).fetchall()
        finally:
            # Close the connection even when the query raises.
            db_connect.close()
        return {row[0]: row[1] for row in rows}

    def IsSigned(self):
        """Tell whether the flag file records a sign-in for today.

        This is a pure check: it never writes the flag file, so calling it
        repeatedly cannot mark the day as signed.

        :return: True if the stored date equals today's date, False otherwise
                 (including when the flag file is missing or unreadable).
        """
        try:
            with open(self.Date_path, "r") as fr:
                last_date = fr.read().strip()
        except IOError:
            return False
        return last_date == self._today()

    def SignIn(self):
        """Send each stored command to its official account, once per day.

        Does nothing when today's sign-in is already recorded or when no
        commands are stored.  After the messages have been sent, today's date
        is written to the flag file.

        :return: None
        """
        if self.IsSigned():
            return

        sign_list = self.GetCommand()
        if not sign_list:
            # NOTE(review): the original indentation was lost, so it is unclear
            # whether the flag was also written when there was nothing to send.
            # Leaving it unwritten lets commands added later today still run.
            return

        for mps_name, command in sign_list.items():
            for account in itchat.search_mps(name=mps_name) or []:
                if account:
                    itchat.send(command, toUserName=account['UserName'])

        with open(self.Date_path, "w") as fw:
            fw.write(self._today())

    def ShowComd(self):
        """Render all stored commands as text, one ``name:command`` per line.

        :return: the formatted list, or a fixed notice when nothing is stored.
        """
        commands = self.GetCommand()
        if not commands:
            return "暂无签到口令"
        return "".join(
            "{}:{}、{}".format(mps, comd, '\n') for mps, comd in commands.items())

    def ClearComd(self):
        """Delete every stored command.

        :return: a success or failure message for the user.
        """
        db_connect = sqlite3.connect(self.db)
        try:
            db_connect.execute("""DELETE FROM {};""".format(self.table))
            db_connect.commit()
            return "清空签到口令成功"
        except sqlite3.Error:
            db_connect.rollback()
            return "清空签到口令失败"
        finally:
            db_connect.close()

    def AddComd(self, mps, comd):
        """Store a sign-in command for an official account.

        :param mps: official account name.
        :param comd: text to send to that account when signing in.
        :return: a success or failure message for the user.
        """
        db_connect = sqlite3.connect(self.db)
        try:
            # Values are bound as parameters so quotes in user text are safe;
            # only the table name (from config) is interpolated.
            db_connect.execute(
                """INSERT INTO {} VALUES (?, ?)""".format(self.table), (mps, comd))
            db_connect.commit()
            return "添加签到口令【{}:{}】成功".format(mps, comd)
        except sqlite3.Error:
            db_connect.rollback()
            return "添加签到口令【{}:{}】失败".format(mps, comd)
        finally:
            db_connect.close()

    def DeleteComd(self, mps):
        """Delete the stored command(s) of one official account.

        :param mps: official account name whose rows are removed.
        :return: a message saying the rows were deleted, that no such account
                 was stored, or that the deletion failed.
        """
        db_connect = sqlite3.connect(self.db)
        try:
            cursor = db_connect.execute(
                """DELETE FROM {} WHERE MPS = ?;""".format(self.table), (mps,))
            db_connect.commit()
            deleted = cursor.rowcount
        except sqlite3.Error:
            db_connect.rollback()
            return "删除口令【{}】失败".format(mps)
        finally:
            db_connect.close()

        # rowcount tells whether any row matched, so no separate SELECT is needed.
        if deleted > 0:
            return "删除口令【{}】成功".format(mps)
        return "口令【{}】不存在,请重试".format(mps)