1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > Python实现微信公众号消息推送

Python实现微信公众号消息推送

时间:2023-07-07 07:52:38

相关推荐

Python实现微信公众号消息推送

本章内容主要阐述如何通过Python语言来实现微信公众号客服消息的推送,包括文本信息以及媒体信息(picture,voice,video等),其他方式可参考腾讯官方文档 https://developers./doc/offiaccount/Getting_Started/Overview.html

实现思路:

上微信公众号平台注册一个微信公众号测试号,链接:https://mp./debug/cgi-bin/sandboxinfo?action=showinfo&t=sandbox/index获取微信公众号的appID 和appsecret(在微信公众号平台上注册微信公众号后即可查看到)获取公众号的access_token值,access_token是公众号全局唯一接口调用凭据,公众号调用各接口时都需要使用access_token。该值目前有效期限是每2小时,超过2小时后需重新获取,且每天仅可获取2000次。

api接口:

接口调用请求说明

https请求方式: GET https://api./cgi-bin/token?grant_type=client_credential&appid=APPID&secret=APPSECRET

获取设备的opend_id。opend_id为关注公众号粉丝(有关注公众号的微信账号)的用户id,想要给粉丝发消息,就必须要获取到粉丝的open_id根据微信公众号平台上给的api接口使用方法,调用对应的api接口给粉丝发送不同类型的消息。(详情请见微信公众号官方文档)

API接口:

http请求方式: POST https://api./cgi-bin/message/custom/send?access_token=ACCESS_TOKEN

下列代码示例是通过“客服消息”接口来实现给粉丝发送消息。

# -*- encoding:utf-8 -*-import requestsimport jsonclass SendMessage():def __init__(self):self.appID = 'xxxx'self.appsecret = 'xxxx'self.access_token = self.get_access_token()self.opend_ids = self.get_openid()def get_access_token(self):"""获取微信公众号的access_token值"""url = 'https://api./cgi-bin/token?grant_type=client_credential&appid={}&secret={}'.\format(self.appID, self.appsecret)print(url)headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.67 Safari/537.36'}response = requests.get(url, headers=headers).json()access_token = response.get('access_token')# print(access_token)return access_tokendef get_openid(self):"""获取所有粉丝的openid"""next_openid = ''url_openid = 'https://api./cgi-bin/user/get?access_token=%s&next_openid=%s' % (self.access_token, next_openid)ans = requests.get(url_openid)print(ans.content)open_ids = json.loads(ans.content)['data']['openid']return open_idsdef sendmsg(self, msg):"""给所有粉丝发送文本消息"""url = "https://api./cgi-bin/message/custom/send?access_token={}".format(self.access_token)print(url)if self.opend_ids != '':for open_id in self.opend_ids:body = {"touser": open_id,"msgtype":"text","text":{"content": msg}}data = bytes(json.dumps(body, ensure_ascii=False).encode('utf-8'))print(data)response = requests.post(url, data=data)# 这里可根据回执code进行判定是否发送成功(也可以根据code根据错误信息)result = response.json()print(result)else:print("当前没有用户关注该公众号!")def upload_media(self, media_type, media_path):"""上传临时文件到微信服务器,并获取该文件到meida_id"""url = 'https://api./cgi-bin/media/upload?access_token={}&type={}'.format(self.access_token, media_type)print(url)meida = {'media': open(media_path, 'rb')}rsponse = requests.post(url, files=meida)parse_json = json.loads(rsponse.content.decode())print(parse_json)return parse_json.get('media_id')def send_media_to_user(self, media_type, media_path):"""给所有粉丝发送媒体文件,媒体文件以meida_id表示"""media_id = self.upload_media(media_type, media_path)url = 'https://api./cgi-bin/message/custom/send?access_token={}'.format(self.access_token)if self.opend_ids != '':for open_id in self.opend_ids:if media_type == "image":body = {"touser": open_id,"msgtype": "image","image":{"media_id": media_id}}if media_type == "voice":body = {"touser": open_id,"msgtype": "voice","voice":{"media_id": media_id}}data = bytes(json.dumps(body, ensure_ascii=False).encode('utf-8'))print(data)response = requests.post(url, data=data)# 这里可根据回执code进行判定是否发送成功(也可以根据code根据错误信息)result = response.json()print(result)else:print("当前没有用户关注该公众号!")if __name__ == "__main__":data = 'Hello,3Nod!'sends = SendMessage()sends.sendmsg(data)sends.send_media_to_user("image", './test.png')

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。