1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > 利用python itchat给女朋友定时发信息

利用python itchat给女朋友定时发信息

时间:2021-03-25 16:49:33

相关推荐

利用python itchat给女朋友定时发信息

利用itchat给女朋友定时发信息

涉及到的技术有itchat,redis,mysql,最主要的还是mysql咯,当然咯,这么多东西,我就只介绍我代码需要用到的,其他的,如果需要了解的话,就需要看参考资料了哟

实现的功能:

1.可以保存微信的消息,包括群聊和好友(文字/视频/语音/图片)

2.在群里@自己,可以调用图灵机器人的API进行文字回复(类似于机器人)

3.调用定时任务,在指定时间发送消息至某人

需要了解的基础:

1.python基础

2.mysql基础

3.redis基础

实现效果如下:

只需要在数据库中填写相应的数据,包括,发送给谁(to_user),如果有多个,则用分号(;)分开,执行间隔分钟数(exe_time),如,1440则代表一天,下一次执行时间戳(next_time),主要是抓这个时间来进行发送,抓取的键值(redis_keys)

发送的效果是这样的:

项目基础

itchat模块

官方参考文档:https://itchat.readthedocs.io/zh/latest/

安装

pip install itchat / pip3 install itchat

最简单的测试给好友发送消息

#产生二维码itchat.auto_login()#定义用户的昵称send_userid='用户的昵称'#查找用户的useriditcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']#利用send_msg发送消息itchat.send_msg('这是一个测试',toUserName=itcaht_user_name)

测试小脚本

#!/usr/bin/env python3import itchat#产生二维码itchat.auto_login()#定义用户的昵称send_userid='亲爱的'#查找用户的useriditcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']#利用send_msg发送消息itchat.send_msg('这是一个测试',toUserName=itcaht_user_name)

在执行之后,会弹出一个二维码,然后会让你登录到网页微信,紧接着会去搜索用户信息,搜索到了之后会去取用户的ID,这个ID是微信随机给的,然后利用send_msg进行发送信息

mysql模块

参考文档:/python3/python3-mysql.html

安装

pip install pymysql / pip3 install pymysql

pymysql的基本使用

连接mysql

db = pymysql.connect(host='hostname',port=port,user='username',passwd='password',db='databasename',charset='charset')

填写正确的信息后就成功的连接了mysql,注意,port这里,端口号不能加'',否则会连接不上mysql

简单的操作mysql

insert

#连接DBdb = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')#编写SQLsql = "insert into test_message values (%d);" %(int(time.time()))#获取游标cursor = db.cursor()#数据库重连db.ping(reconnect=True)#执行SQLcursor.execute(sql)#commit mit()

执行完毕后,如果正确的话,就可以看到表中有数据了,当然,数据库和数据表得自己去建立才行,如果没有数据,则需要看看程序的输出结果了,还有一点,在做DML的时候,须要加上commit,这是事务的一个特性,否则数据会丢失的

select

#连接DBdb = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')#编写SQLsql = "select * from test_message"#获取游标cursor = db.cursor()#数据库重连db.ping(reconnect=True)#执行SQLcursor.execute(sql)#获取全部结果cursor.fetchall()#返回单个数据cursor.fetchone()

执行完毕后,如果正确的话,就可以看到结果了

redis模块

参考文档:/xiaoming279/p/6293583.html

安装

pip install redis / pip3 install redis

连接Redis

r = redis.Redis('host',port,db_port,'password')

判断键是否存在

r = redis.Redis('localhost',6379,0,'redis')keys = '123'if r.exists(keys):print ("存在")else:print ("不存在")

随机取出列表中的数据

r = redis.Redis('localhost',6379,0,'redis')len_keys = r.llen(keys)random_int = int(random.randint(0,len_keys-1))print (r.lindex(keys,random_int).decode('utf-8'))

基本上在下面代码中,几乎会用到如上基础

MySQL数据库/表建立

数据库建立

CREATE DATABASE `itchat` /*!40100 DEFAULT CHARACTER SET utf8mb4 */

数据表的建立

itchat_login_info

CREATE TABLE `itchat_login_info` (`createdate` int(11) NOT NULL,`myid` varchar(128) DEFAULT NULL,`username` varchar(128) DEFAULT NULL,`mysignature` varchar(128) DEFAULT NULL,`mysex` varchar(4) DEFAULT NULL,`myuseruin` varchar(128) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

itchat_message

CREATE TABLE `itchat_message` (`createdate` int(11) NOT NULL,`msgid` varchar(128) NOT NULL,`nickname` varchar(128) DEFAULT NULL,`username` varchar(128) DEFAULT NULL,`actualnickname` varchar(128) DEFAULT NULL,`actualusername` varchar(128) DEFAULT NULL,`msgtext` varchar(5000) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

itchat_friend_message

CREATE TABLE `itchat_friend_message` (`createdate` int(11) NOT NULL,`msgid` varchar(128) NOT NULL,`fromuser` varchar(128) NOT NULL,`fromuserid` varchar(128) NOT NULL,`fromsex` varchar(4) NOT NULL,`touser` varchar(128) NOT NULL,`touserid` varchar(128) NOT NULL,`tousersex` varchar(4) NOT NULL,`msgtext` varchar(5000) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

auto_timer

CREATE TABLE `auto_timer` (`timer_id` int(11) NOT NULL,`createdate` int(11) NOT NULL,`content` varchar(200) DEFAULT NULL,`to_user` varchar(1000) NOT NULL,`exe_time` int(11) NOT NULL,`last_time` int(11) DEFAULT NULL,`next_time` int(11) NOT NULL,`redis_keys` varchar(100) NOT NULL,`timer_count` bigint(20) NOT NULL,PRIMARY KEY (`timer_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

代码

mian.py

#!/usr/bin/env python3# -*- coding: UTF-8 -*-import itchatfrom itchat.content import *import collections_logsimport save_dbimport liwang_redis_itchatimport time import threadingimport os#定义装饰器,用于监听图片,视频等消息,并且下载下来@itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO],isFriendChat=True, isGroupChat=True)def download_files(msg):logger.debug(msg['FromUserName'])logger.debug("发送的是语音/视频等,需要保存至本地")msg.download(msg.fileName)file_value = '127.0.0.1/' + msg.FileName + '_' + msg['Type']logger.debug(file_value)#将消息写入数据库if '@@' in msg['FromUserName'] :logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(file_value))save_db.exe_db(db,sql,logger)elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(file_value))save_db.exe_db(db,sql,logger)else :#将群聊设置为0,让后面的@抓不到isgroup = 0#判断是否是自己发送if msg['FromUserName'] == MyID :logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],file_value)save_db.exe_db(db,sql,logger)else:logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,file_value)save_db.exe_db(db,sql,logger)#定义装饰器,用于监听好友和群聊微信群聊内容@itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True) def simple_reply(msg):#定义群聊isgroup = 1#将消息写入数据库if '@@' in msg['FromUserName'] :logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(msg['Text']))save_db.exe_db(db,sql,logger)elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(msg['Text']))save_db.exe_db(db,sql,logger)else :#将群聊设置为0,让后面的@抓不到isgroup = 0#判断是否是自己发送if msg['FromUserName'] == MyID :logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],msg['Text'])save_db.exe_db(db,sql,logger)else:logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,msg['Text'])save_db.exe_db(db,sql,logger)#将所有的聊天信息都写入到mongodb中#暂时不写,mongodb还未掌握基本的使用#这里写聊天机器人端口#判断是否@了自己,且为群聊if isgroup == 1 :if msg['isAt'] :logger.debug("@了自己,移动到redis_itchat")from_message = msg['Text']send_msg = liwang_redis_itchat.auto_box(from_message,redis_db,logger)itchat.send_msg(send_msg,toUserName=msg['ToUserName'])itchat.send_msg(send_msg,toUserName=msg['FromUserName'])else:logger.debug("没有@自己")#空出两个空LOGlogger.debug("-------------------------------------------------------------------")logger.debug("-------------------------------------------------------------------")def auto_reply():logger.debug("============================================================================================")#获取全部ID sql sql = "select timer_id from auto_timer;"#获取结果userid_list = save_db.select_db(db,sql,logger)#遍历auto_timer IDfor userid in userid_list:#打印正在处理的信息sql = "select content from auto_timer where timer_id = '%s';" %(userid)id_content = save_db.select_db(db,sql,logger)[0]logger.debug("判断ID:%s" %(userid))logger.debug("判断内容:%s" %(id_content))#判断时间是否符合#获取当前的时间戳now_time = int(time.time())logger.debug("当前时间为:%s" %(now_time))sql = "select next_time from auto_timer where timer_id = '%s';" %(userid)next_time = save_db.select_db(db,sql,logger)[0]next_time = int(next_time[0])logger.debug("下次执行时间戳:%s" %(next_time))#判断,如果当前时间大于或等于下一次执行时间,就执行if now_time >= next_time :logger.debug("处理内容:%s" %(id_content))#获取发送人sql = "select to_user from auto_timer where timer_id = '%s';" %(userid)to_user = save_db.select_db(db,sql,logger)[0]to_user = to_user[0]logger.debug("查找用户为:%s" %(to_user))#单用户single_user = 1#判断字符串是否有分割if ';' in to_user :allocation_user = to_user.split(';')logger.debug("需要发送多人,信息为:%s" %(allocation_user))single_user = 0else:allocation_user = to_userlogger.debug("需要发送一人,信息为:%s" %(allocation_user))#判断为单用户,不需要做处理if single_user == 1 :to_user = to_user + ';'allocation_user = to_user.split(';')for send_userid in allocation_user :logger.debug("正在处理%s用户" %(send_userid))#查找微信是否存在此用户try:itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']if itcaht_user_name == " ":logger.debug("微信不存在此好友,请检查")else:#获取发送的内容:sql = "select redis_keys from auto_timer where timer_id = '%s';" %(userid)redis_keys = save_db.select_db(db,sql,logger)[0]redis_keys = str(redis_keys[0])return_values = liwang_redis_itchat.return_rand_keys(redis_db,logger,redis_keys)#获取其他信息sql = "select last_time from auto_timer where timer_id = '%s';" %(userid)last_time = save_db.select_db(db,sql,logger)[0]last_time = last_time[0]sql = "select timer_count from auto_timer where timer_id = '%s';" %(userid)timer_count = save_db.select_db(db,sql,logger)[0]timer_count = int(timer_count[0])sql = "select exe_time from auto_timer where timer_id = '%s';" %(userid)exe_time = save_db.select_db(db,sql,logger)[0]exe_time = int(exe_time[0]) * 60logger.debug("将要发送的信息如下")logger.debug("timer_ID:%s" %(userid))logger.debug("发送简介为:%s" %(id_content))logger.debug("上一次执行的时间戳是:%s" %(last_time))logger.debug("下一次执行的时间戳是:%s" %(next_time))logger.debug("获取Redis的值为:%s" %(redis_keys))logger.debug("间隔描述为:%s" %(exe_time))logger.debug("已经执行次数为:%s" %(timer_count))try:logger.debug("将要发送的消息: %s" %(return_values))itchat.send_msg(return_values,toUserName=itcaht_user_name)except Exception as e:logger.debug("信息发送失败,详细信息如下:")logger.debug(e)except Exception as e:logger.debug("未找到该用户信息,详细信息如下:")logger.debug(e)try:# 更新信息last_time = now_time + exe_timetimer_count = timer_count + 1userid = userid[0]sql = "update auto_timer set last_time = '%s' , next_time = '%s' , timer_count = '%s' where timer_id = '%s';" \%(now_time,last_time,timer_count,userid)save_db.exe_db(db,sql,logger)logger.debug("更新信息完毕")except Exception as e:logger.debug("消息已经发送,但是更新数据库失败,详细信息如下:")logger.debug(e)else:logger.debug("判断时间未到")#修整60秒logger.debug("============================================================================================")time.sleep(3)time.sleep(60)if __name__ == "__main__" :#设置登陆的PKloginpk = 'login_' + str(time.time())#定义微信自动登录itchat.auto_login(enableCmdQR=2,statusStorageDir=loginpk)# itchat.auto_login(statusStorageDir=loginpk)#获取自己的IDglobal MyIDMyID = itchat.get_friends(update=True)[0]["UserName"]#获取用户信息global myUserNameglobal mysexmyUserName = itchat.get_friends()[0]['NickName']myUserUin = itchat.get_friends()[0]['Uin']mysignature = itchat.get_friends()[0]['Signature']mysex = itchat.get_friends()[0]['Sex']#设置loggerglobal loggerlogname = str(myUserName) + '_' + str(myUserUin)logger = collections_logs.build_logs(logname)#定义数据库global dbdb = save_db.itchat_connect_mysql(logger)global redis_dbredis_db = liwang_redis_itchat.connect_redis(logger)#每登陆一次,记录一次loginlogger.debug("记录登陆Log")sql = "insert into itchat_login_info values (%d,'%s','%s','%s','%s','%s');" %(int(time.time()),MyID,myUserName,mysignature,mysex,myUserUin)save_db.exe_db(db,sql,logger)#启动线程 threading._start_new_thread(itchat.run,())#开始循环while 1:itchat.configured_reply()auto_reply()

collections_logs.py

#!/usr/bin/env python3# -*- coding: UTF-8 -*-import loggingimport timedef build_logs(file_name):#获取当前时间now_time = time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime())#设置log名称logname = 'itchat' + file_name + now_time#定义loggerlogger = logging.getLogger()#设置级别为debuglogger.setLevel(level = logging.DEBUG)#设置 logging文件名称handler = logging.FileHandler(logname)#设置级别为debug handler.setLevel(logging.DEBUG)#设置log的格式formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')#将格式压进logger handler.setFormatter(formatter)console = logging.StreamHandler()console.setLevel(logging.DEBUG)#写入logger logger.addHandler(handler)logger.addHandler(console)#将logger返回return logger

save_db.py

#!/usr/bin/env python3# -*- coding: UTF-8 -*-import pymysqldef itchat_connect_mysql(logger):try:db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='itchat',charset='utf8')logger.debug('MySQL数据库连接成功')logger.debug('数据库ID:%s' %(db))return dbexcept Exception as e:logger.debug('MySQL数据库连接失败')logger.debug(e)def exe_db(db,sql,logger):try:cursor = db.cursor()logger.debug('获取游标:%s' %(cursor))db.ping(reconnect=True)logger.debug("数据库重连")cursor.execute(sql)logger.debug("执行SQL")logger.debug(sql)mit()logger.debug("数据库commit")except Exception as e:logger.debug('SQL执行失败,请至日志查看该SQL记录')logger.debug(sql)logger.debug(e)def select_db(db,sql,logger):try:cursor = db.cursor()logger.debug('获取游标:%s' %(cursor))db.ping(reconnect=True)logger.debug("数据库重连")cursor.execute(sql)logger.debug("执行SQL")logger.debug(sql)return cursor.fetchall()except Exception as e:logger.debug('SQL执行失败,请至日志查看该SQL记录')logger.debug(sql)logger.debug(e)

liwang_redis_itchat.py

#!/usr/bin/env python3# -*- coding: UTF-8 -*-import redisimport jsonimport requestsimport randomdef connect_redis(logger):try:r = redis.Redis('127.0.0.1',6379,0,'redis')logger.debug("Redis数据库连接成功")return rexcept Exception as e:logger.debug("Redis数据库连接失败")logger.debug(e)def return_rand_keys(redis_db,logger,keys):#判断是否有这个keyif redis_db.exists(keys):len_keys = redis_db.llen(keys)random_int = int(random.randint(0,len_keys-1))logger.debug("Redis获取的随机值为:%s" %(random_int))return_redis_values = redis_db.lindex(keys,random_int).decode('utf-8')logger.debug("Redis将要返回的值为:%s" %(return_redis_values))return return_redis_valueselse:logger.debug("没有找到相关key")return 0def auto_box(message,redis_db,logger):#设置分隔符if '\u' in message :myid='@小子\u'else:myid = '@小子 '#获取分隔符之后的值if len(message.split(myid)) == 1 :return "@我了要说话哟"else:dealwith = message.split(myid)[1]if dealwith == " " :logger.debug("只是@了我,并没有输入内容")else:#图灵机器人API接口url_api = "/openapi/api/v2"#将问题赋值给box_questbox_quest = dealwith#定义json post request_json = json.dumps ({"perception":{"inputText":{"text": box_quest},"selfInfo":{"location":{"city": "成都","province": "成都","street": "天府三街"}}},"userInfo": {"apiKey": "APIkey","userId": "tuling"}})# 获取POST之后的值r = requests.post(url_api, data=request_json)r_test = r.textr_json = json.loads(r_test)#定义返回值return_str = (r_json['results'][0]['values']['text'])logger.debug("图灵机器人将要返回的值:")logger.debug(return_str)#需要自己建立自己的机器人仓库logger.debug("将返回的结果存入Redis数据库中")redis_db.rpush(box_quest,return_str)return return_str

实现的效果

如下以log展现

接收消息并且保存至数据库

图灵机器人回复

定时任务

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。