本文仅供测试使用,后续可能对接应用平台。
#!/bin/python3
########################################################
#  This script is to send messages to Lijuan regularly #
#  Date: 2020-2-17                                     #
#  Author: cuijianzhe                                  #
#  Email: 598941324@qq.com                             #
########################################################
#
# Usage: <script> <mobile> <message>
# Sends <message> as a Feishu text message to the user registered under
# <mobile>, followed by an @-mention of that same user.
import json
import os
import sys

import requests

# Seconds to wait for each Feishu API call; without a timeout, requests can
# block indefinitely on a stalled connection.
REQUEST_TIMEOUT = 10

# Fail with a usage hint instead of a bare IndexError when arguments are missing.
if len(sys.argv) < 3:
    sys.exit("usage: %s <mobile> <message>" % sys.argv[0])

mobiles = sys.argv[1]
messages = sys.argv[2]


def get_token():
    """Request a tenant access token from the Feishu auth endpoint.

    The app credentials are read from the FEISHU_APP_ID / FEISHU_APP_SECRET
    environment variables when set, falling back to the values embedded in
    this script.

    Returns:
        The ``tenant_access_token`` value from the JSON response.

    Raises:
        RuntimeError: if the request fails, the HTTP status is not 200, the
            body is not a JSON object, or it carries no token. Continuing
            without a token would only send ``Bearer None`` to every later call.
    """
    data = {
        "app_id": os.environ.get("FEISHU_APP_ID", "cli_xxxxxxxxxxx9d"),
        "app_secret": os.environ.get("FEISHU_APP_SECRET", "YJJxxxxxxxxxxxxxxxxxxxxxxxxxxxxYUi"),
    }
    headers = {"Content-Type": "application/json"}
    url_token = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    try:
        res = requests.post(url_token, json=data, headers=headers, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as err:
        print('请求失败')
        raise RuntimeError("token request failed: %s" % err) from err
    if res.status_code != 200:
        raise RuntimeError("token request returned HTTP %s" % res.status_code)
    try:
        token = json.loads(res.text).get('tenant_access_token')
    except (ValueError, AttributeError) as err:
        raise RuntimeError("token response is not a JSON object") from err
    if not token:
        raise RuntimeError("token response carries no tenant_access_token")
    return token


# Shared headers for the JSON API calls below; the token is fetched once here.
headers_group = {
    "Authorization": "Bearer %s" % (get_token()),
    "Content-Type": "application/json",
}


def getuserid():
    """Return the Feishu user id for the mobile number given on the command line.

    Raises:
        requests.HTTPError: if the lookup returns an HTTP error status.
        LookupError: if the response contains no user for that mobile number.
    """
    userurl = "https://open.feishu.cn/open-apis/user/v1/batch_get_id"
    # Pass the number via params so requests URL-encodes it (e.g. a leading '+').
    res_data = requests.get(
        url=userurl,
        params={"mobiles": mobiles},
        headers=headers_group,
        timeout=REQUEST_TIMEOUT,
    )
    res_data.raise_for_status()
    body = json.loads(res_data.text)
    try:
        return body['data']['mobile_users'][mobiles][0]['user_id']
    except (KeyError, IndexError, TypeError) as err:
        raise LookupError(
            "no Feishu user found for mobile %s; response: %r" % (mobiles, body)
        ) from err


def send_messages(userID):
    """Send the command-line message to ``userID`` and @-mention that user.

    Raises:
        requests.HTTPError: if the send endpoint returns an HTTP error status.
    """
    data1 = {
        "user_id": userID,
        "msg_type": "text",
        "content": {
            "text": "%s <at user_id=\"%s\">test</at>" % (messages, userID)
        },
    }
    url_mess = "https://open.feishu.cn/open-apis/message/v4/send/"
    res_mess = requests.post(url_mess, json=data1, headers=headers_group, timeout=REQUEST_TIMEOUT)
    # Surface HTTP failures instead of silently dropping the response.
    res_mess.raise_for_status()


if __name__ == "__main__":
    # The token is already embedded in headers_group; no second request needed.
    user_ID = getuserid()
    send_messages(user_ID)
# Sends a random line from the local file `word` to the first chat returned
# by the Feishu chat list, @-mentioning one user looked up by mobile number.
import json
import linecache
import os
import random

import requests

# Seconds to wait for each Feishu API call; without a timeout, requests can
# block indefinitely on a stalled connection.
REQUEST_TIMEOUT = 10

# Text file holding one candidate message per line.
file = 'word'


def get_token():
    """Request a tenant access token from the Feishu auth endpoint.

    The app credentials are read from the FEISHU_APP_ID / FEISHU_APP_SECRET
    environment variables when set, falling back to the values embedded in
    this script.

    Returns:
        The ``tenant_access_token`` value from the JSON response.

    Raises:
        RuntimeError: if the request fails, the HTTP status is not 200, the
            body is not a JSON object, or it carries no token. Continuing
            without a token would only send ``Bearer None`` to every later call.
    """
    data = {
        "app_id": os.environ.get("FEISHU_APP_ID", "cli_xxxxxxxxxxxxxxxd"),
        "app_secret": os.environ.get("FEISHU_APP_SECRET", "YJJxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxYUi"),
    }
    headers = {"Content-Type": "application/json"}
    url_token = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    try:
        res = requests.post(url_token, json=data, headers=headers, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as err:
        print('请求失败')
        raise RuntimeError("token request failed: %s" % err) from err
    if res.status_code != 200:
        raise RuntimeError("token request returned HTTP %s" % res.status_code)
    try:
        token = json.loads(res.text).get('tenant_access_token')
    except (ValueError, AttributeError) as err:
        raise RuntimeError("token response is not a JSON object") from err
    if not token:
        raise RuntimeError("token response carries no tenant_access_token")
    return token


# Shared headers for the JSON API calls below; the token is fetched once here.
headers_group = {
    "Authorization": "Bearer %s" % (get_token()),
    "Content-Type": "application/json",
}


def get_chatid():
    """Return the ``chat_id`` of the first group in the chat list.

    Returns:
        The chat id, or None when the request fails, the status is not 200,
        or the response has no groups (the original best-effort behaviour).
    """
    url_group = "https://open.feishu.cn/open-apis/chat/v4/list?"
    try:
        res_group = requests.get(url_group, headers=headers_group, timeout=REQUEST_TIMEOUT)
        if res_group.status_code != 200:
            return None
        groups = json.loads(res_group.text).get('data').get('groups')
        return groups[0].get('chat_id')
    except (requests.RequestException, ValueError, AttributeError,
            TypeError, IndexError, KeyError):
        # Narrowed from a bare except so Ctrl-C / SystemExit are not swallowed.
        print('请求失败')
        return None


def getuserid(mobiles="18600796142"):
    """Return the Feishu user id registered under a mobile number.

    Args:
        mobiles: mobile number to look up. Defaults to the test number the
            script originally hard-coded (original note: "can @ everyone").

    Raises:
        requests.HTTPError: if the lookup returns an HTTP error status.
        LookupError: if the response contains no user for that mobile number.
    """
    userurl = "https://open.feishu.cn/open-apis/user/v1/batch_get_id"
    # Pass the number via params so requests URL-encodes it (e.g. a leading '+').
    res_data = requests.get(
        url=userurl,
        params={"mobiles": mobiles},
        headers=headers_group,
        timeout=REQUEST_TIMEOUT,
    )
    res_data.raise_for_status()
    body = json.loads(res_data.text)
    try:
        return body['data']['mobile_users'][mobiles][0]['user_id']
    except (KeyError, IndexError, TypeError) as err:
        raise LookupError(
            "no Feishu user found for mobile %s; response: %r" % (mobiles, body)
        ) from err


def _pick_random_line(path):
    """Return one randomly chosen line of ``path`` (as linecache yields it)."""
    with open(path, 'r', encoding='utf-8') as good:
        total = sum(1 for _ in good)
    if total == 0:
        # random.randint(1, 0) would fail with an unhelpful "empty range" error.
        raise ValueError("%s contains no lines to send" % path)
    return linecache.getline(path, random.randint(1, total))


def send_messages(userID, chatID):
    """Send a random line from ``file`` to ``chatID`` and @-mention ``userID``.

    Full-width commas in the chosen line are replaced with spaces.

    Raises:
        ValueError: if ``file`` is empty.
        requests.HTTPError: if the send endpoint returns an HTTP error status.
    """
    word = _pick_random_line(file).replace(',', ' ')
    data1 = {
        "chat_id": chatID,
        "user_id": userID,
        "msg_type": "text",
        "content": {
            "text": "%s <at user_id=\"%s\">love</at>" % (word, userID)
        },
    }
    url_mess = "https://open.feishu.cn/open-apis/message/v4/send/"
    res_mess = requests.post(url_mess, json=data1, headers=headers_group, timeout=REQUEST_TIMEOUT)
    # Surface HTTP failures instead of silently dropping the response.
    res_mess.raise_for_status()


if __name__ == "__main__":
    # The token is already embedded in headers_group; no second request needed.
    chatid = get_chatid()
    user_ID = getuserid()
    send_messages(user_ID, chatid)
#!/bin/env python3
########################################################
#  This script is to send messages to Lijuan regularly #
#  Date: 2020-2-24                                     #
#  Author: cuijianzhe                                  #
#  Email: 598941324@qq.com                             #
########################################################
#
# For each configured mobile number: look up the Feishu user, upload one
# random image from IMAGE_DIR, and send a rich-text post containing a random
# sentence, an @-mention and that image.
import datetime
import json
import linecache
import logging
import os
import random

import requests

logging.basicConfig(filename='/scripts/feishu/log',
                    level=logging.DEBUG,
                    datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s')
logger = logging.getLogger(__name__)

# Seconds to wait for each Feishu API call; without a timeout, requests can
# block indefinitely on a stalled connection.
REQUEST_TIMEOUT = 10

# Directory the images are taken from; each image is removed once uploaded.
IMAGE_DIR = '/scripts/feishu/images'


def get_token():
    """Request a tenant access token from the Feishu auth endpoint.

    The app credentials are read from the FEISHU_APP_ID / FEISHU_APP_SECRET
    environment variables when set, falling back to the values embedded in
    this script.

    Returns:
        The ``tenant_access_token`` value from the JSON response.

    Raises:
        RuntimeError: if the request fails, the HTTP status is not 200, the
            body is not a JSON object, or it carries no token. Continuing
            without a token would only send ``Bearer None`` to every later call.
    """
    data = {
        "app_id": os.environ.get("FEISHU_APP_ID", "cli_xxxxxxxxxxx"),
        "app_secret": os.environ.get("FEISHU_APP_SECRET", "YJJ7xxxxxxxxxxxxxxxxxxxxYUi"),
    }
    headers = {"Content-Type": "application/json"}
    url_token = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    try:
        res = requests.post(url_token, json=data, headers=headers, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as err:
        # Written to the log file rather than stdout.
        logger.exception('请求失败')
        raise RuntimeError("token request failed: %s" % err) from err
    if res.status_code != 200:
        logger.error("token request returned HTTP %s", res.status_code)
        raise RuntimeError("token request returned HTTP %s" % res.status_code)
    try:
        token = json.loads(res.text).get('tenant_access_token')
    except (ValueError, AttributeError) as err:
        logger.exception("token response is not a JSON object")
        raise RuntimeError("token response is not a JSON object") from err
    if not token:
        logger.error("token response carries no tenant_access_token")
        raise RuntimeError("token response carries no tenant_access_token")
    return token


# Shared headers for the JSON API calls below; the token is fetched once here.
headers_group = {
    "Authorization": "Bearer %s" % (get_token()),
    "Content-Type": "application/json",
}


def getuserid(mobile):
    """Return the Feishu user id registered under ``mobile``.

    Raises:
        requests.HTTPError: if the lookup returns an HTTP error status.
        LookupError: if the response contains no user for that mobile number.
    """
    userurl = "https://open.feishu.cn/open-apis/user/v1/batch_get_id"
    # Pass the number via params so requests URL-encodes it (e.g. a leading '+').
    res_data = requests.get(
        url=userurl,
        params={"mobiles": mobile},
        headers=headers_group,
        timeout=REQUEST_TIMEOUT,
    )
    res_data.raise_for_status()
    body = json.loads(res_data.text)
    try:
        return body['data']['mobile_users'][mobile][0]['user_id']
    except (KeyError, IndexError, TypeError) as err:
        raise LookupError(
            "no Feishu user found for mobile %s; response: %r" % (mobile, body)
        ) from err


def uploadimg():
    """Upload one random image from IMAGE_DIR and return its ``image_key``.

    The local file is removed only after the upload succeeded and the key was
    read, so a failed upload no longer loses the image.

    Raises:
        FileNotFoundError: if IMAGE_DIR contains no regular files.
        requests.HTTPError: if the upload returns an HTTP error status.
    """
    candidates = [
        name for name in os.listdir(IMAGE_DIR)
        if os.path.isfile(os.path.join(IMAGE_DIR, name))
    ]
    if not candidates:
        raise FileNotFoundError("no images left in %s" % IMAGE_DIR)
    imgname = random.choice(candidates)
    imgpath = os.path.join(IMAGE_DIR, imgname)
    with open(imgpath, 'rb') as p:
        image = p.read()
    imgurl = "https://open.feishu.cn/open-apis/image/v4/put/"
    # Reuse the token already fetched for headers_group instead of requesting
    # a new one per upload. Content-Type is left out on purpose so requests
    # can set the multipart boundary itself.
    headers = {"Authorization": headers_group["Authorization"]}
    files = {'image': image}
    imgdata = {"image_type": "message"}
    resp = requests.post(url=imgurl, headers=headers, files=files, data=imgdata,
                         timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    image_key = resp.json()['data']['image_key']
    os.remove(imgpath)
    return image_key


def _random_sentence(path):
    """Return the text of one randomly chosen line of ``path``.

    A line is split on the enumeration comma and the second field is used; a
    line without that separator is used whole instead of raising IndexError.
    """
    with open(path, encoding='utf-8') as yuju:
        total = sum(1 for _ in yuju)
    if total == 0:
        # random.randint(1, 0) would fail with an unhelpful "empty range" error.
        raise ValueError("%s contains no lines to send" % path)
    line = linecache.getline(path, random.randint(1, total))
    parts = line.split('、')
    sentence = parts[1] if len(parts) > 1 else line
    # NOTE(review): as written this replaces a space with a space; the first
    # argument presumably was a full-width or non-breaking space - confirm.
    return sentence.strip().replace(' ',' ')


def sendmess(path, user_id, image_key=None):
    """Send a rich-text post with a random sentence from ``path`` to ``user_id``.

    Args:
        path: text file to draw the sentence from.
        user_id: recipient, also @-mentioned after the sentence.
        image_key: key of an uploaded image to append; when None the image
            row is left out instead of sending an ``img`` tag with a null key.

    Raises:
        ValueError: if ``path`` is empty.
        requests.HTTPError: if the send endpoint returns an HTTP error status.
    """
    qinghua = _random_sentence(path)
    message_url = "https://open.feishu.cn/open-apis/message/v4/send/"
    rows = [
        [
            {
                "tag": "text",
                "un_escape": True,
                "text": "%s :" % qinghua
            },
            {
                "tag": "at",
                "user_id": user_id
            }
        ]
    ]
    if image_key is not None:
        rows.append([
            {
                "tag": "img",
                "image_key": image_key,
                "width": 1080,
                "height": 1080
            }
        ])
    # Rich-text ("post") message payload.
    data = {
        "user_id": user_id,
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": "表情包来了",
                    "content": rows
                }
            }
        }
    }
    request = requests.post(url=message_url, headers=headers_group, json=data,
                            timeout=REQUEST_TIMEOUT)
    # Surface HTTP failures instead of silently dropping the response.
    request.raise_for_status()


if __name__ == '__main__':
    logger.info('Started..')
    # The token is already embedded in headers_group; no second request needed.
    mobiles = ["18xxxxx42","17xxxxxxx3"]
    failures = 0
    for iphone in mobiles:
        # Top-level boundary: record the failure in the log file and continue
        # with the next recipient instead of aborting the whole run.
        try:
            user_ID = getuserid(iphone)
            imgkey = uploadimg()
            sendmess('/scripts/feishu/wenben', user_ID, imgkey)
        except Exception:
            failures += 1
            logger.exception("failed to send to %s", iphone)
    logger.info("Finished!\n")
    if failures:
        # Keep a non-zero exit status so a scheduler can notice the failure.
        raise SystemExit(1)