forked from BoYanZh/QQ-Group-Repeater
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRepeater.py
590 lines (551 loc) · 21.3 KB
/
Repeater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
from Bot import Bot
import datetime
import requests
import aiohttp
import random
import json
import six
import os
import re
import math
import majsoul
import tenhou
import tenhou2
import spider
import wg
import majsoul2
async def aioGet(url):
try:
async with aiohttp.ClientSession() as sess:
async with sess.get(url) as response:
if response.status != 200:
return ""
return await response.text()
except aiohttp.ClientError:
return ""
def Repeater():
bot = Bot()
def getReply(key):
re = Bot.REPLY.get(key)
return random.choice(re) if re else ''
# recursively solve 24 points
def solve24(num):
def cleanBracketsFor24(equ):
equ = list(equ)
brackets, re = [], []
for i in range(len(equ)):
if equ[i] == '(':
re.append(i)
if equ[i] == ')':
brackets.append([re.pop(), i])
for pair in brackets:
tmpEqu = equ.copy()
tmpEqu[pair[0]] = ''
tmpEqu[pair[1]] = ''
if eval(''.join(tmpEqu)) == 24:
equ = tmpEqu
return ''.join(equ)
if len(num) == 1:
try:
if abs(eval(num[0]) - 24) < 0.00001:
re = cleanBracketsFor24(num[0][1:-1])
return f'{re} = 24'
except ZeroDivisionError:
return None
for k in '+-*/':
for i in range(0, len(num)):
for j in range(0, len(num)):
if (i != j):
tmp = solve24(num[0:min(i, j)] +
num[min(i, j) + 1:max(i, j)] +
num[max(i, j) + 1:len(num)] +
[f'({num[i]} {k} {num[j]})'])
if tmp: return tmp
"""
@bot.onCommand(r'算\s*((\d{1,5}\s+){3}\d{1,5}\s*)$')
async def reply24(self):
tmp_reg = re.search(r'算\s*((\d{1,5}\s+){3}\d{1,5}\s*)$', self.msg)
res = solve24(tmp_reg.group(1).split())
return res if res else getReply("24_failed")
"""
#@bot.onCommand(r'Tu')
#async def testTu(self):
#print("Sending...")
#url = "dijian.png"
#ans = f"[CQ:image,file={url}]"
#print(ans)
#return ans
@bot.onCommand(r'help|Help')
async def dispHelp(self):
str = "Help message:\n\n"
str = str+"#Help||#help: 命令列表\n"
str = str+"@bot 帮助: 获取直播姬命令列表\n"
str = str+"#查雀魂 + 用户名||#查四麻 + 用户名||#查四麻 + 用户名 + 场次: 雀魂四麻水表\n"
str = str+"#查三麻 + 用户名: 雀魂三麻水表\n"
str = str+"#查天凤 + 用户名: 天凤四麻水表\n"
str = str+"#围观 + 用户名||#取关 + 用户名||#围观列表: 天凤观战\n"
str = str+"#猫猫||#猫图: 来点猫片\n"
str = str+"#狗图||#狗狗: 来点汪汪\n"
#str = str+"#draw||#draw + 正整数: OP来抽卡\n"
str = str+"\n还想要其它的?等DLC吧!\n"
return str
"""
@bot.onCommand(r'扔(.*)')
async def replyThrow(self):
tmp_reg = re.search(r'扔(.*)', self.msg)
keyword = tmp_reg.group(1)
res = ''
if keyword in ['骰子', '色子']:
res = str(random.randint(1, 6))
elif keyword in ['硬币']:
coin_re = ['正', '反']
res = coin_re[random.randint(0, 1)]
elif '复读' in keyword or 'bot' in keyword:
res = self.getReply('throw_bot')
if not res:
res = f'#扔[CQ:at,qq={self.context["user_id"]}]'
elif not keyword:
res = self.getReply('throw_nothing')
else:
tmp_re = Bot.TRASHES.get(keyword)
if tmp_re is not None:
res = f'{keyword}:{tmp_re}\n'
tmp_dict = dict()
for key, value in Bot.NEW_TRASHES.items():
if keyword.lower() in key.lower():
tmp_dict[key] = value
for key, value in sorted(tmp_dict.items(),
key=lambda d: len(d[0])):
res = f'{res}{key}:{value}\n'
res = res.strip('\n')
return res if res else self.getReply('throw_failed')
"""
"""
@bot.onCommand(r'(([A-Za-z]{2}|)\d{3})是什么')
async def replyCourseInfo(self):
tmp_reg = re.search(r'(([A-Za-z]{2}|)\d{3})是什么', self.msg)
keyword = tmp_reg.group(1)
resDict = {}
for item in Bot.COURSES:
courseCode = item['courseCode']
if keyword in courseCode:
if resDict.get(courseCode) is None:
resDict[courseCode] = item.copy()
resDict[courseCode]['termName'] = []
resDict[courseCode]['teams'] = []
resDict[courseCode]['termName'].append(item['termName'])
resDict[courseCode]['teams'].extend(item['teams'])
res = ''
for item in resDict.values():
res += f"课程代码:{item['courseCode']}\n"
res += f"课程名称:{item['courseName']} {item['courseNameEn']}\n"
res += f"学分:{item['credit']}\n"
res += f"开设时间:{', '.join(set(item['termName']))}\n"
res += f"教师:{', '.join(set(item['teams']))}\n\n"
res = res.strip()
return res if res else self.getReply("course_failed")
"""
""""
@bot.onCommand(r'查([\s\S]{2,})|([\s\S]{2,})是谁')
async def replyContacts(self):
tmp_reg = re.search(r'查([\s\S]{2,})|([\s\S]{2,})是谁', self.msg.lstrip('#'))
keyword = tmp_reg.group(1)
if not keyword:
keyword = tmp_reg.group(2)
keyword = keyword.lower()
res = ""
for item in Bot.CONTACTS:
if keyword in item['name'].lower() or keyword in ''.join(
[word[0] for word in item['name'].lower().split() if word]):
res += f"姓名:{item['name']}\n职称:{item['title']}\n办公室:{item['office']}\n电话:{item['tel']}\n邮箱:{item['email']}\n照片:[CQ:image,file={item['imageUrl']}]\n\n"
return res.strip() if res else self.getReply("contacts_failed")
"""
@bot.onCommand(r'\b查雀魂\b|\b查四麻\b|\bmspt\b|\bmspt4\b')
async def searchQueHun(self):
#return "功能正在维护中,明天再来吧~"
name = str((self.msg)).replace("#查雀魂","")
name = name.replace("#mspt4","")
name = name.replace("#查四麻","")
name = name.replace("#mspt","")[1:]
if (name == ''):
return "你叫什么名字?"
res = majsoul2.searchQueHun2(name,4)
return res
@bot.onCommand(r'\b查三麻\b|\bmspt3\b')
async def searchQueHunThree(self):
#return "功能正在维护中,明天再来吧~"
name = str((self.msg)).replace("#查三麻","")
name = name.replace("#mspt3","")[1:]
if (name == ''):
return "你叫什么名字?"
res = majsoul2.searchQueHun2(name,3)
return res
@bot.onCommand(r'\b查天凤\b|\bthpt\b')
async def searchTenhou(self):
name = str((self.msg)).replace("#查天凤","")
name = name.replace("#thpt","")[1:]
if (name == ''):
return "你叫什么名字?"
#print(name)
#res = tenhou.getinfo(name)
res = tenhou2.getinfo(name)
return res
@bot.onCommand('语录')
async def yulu(self):
res = spider.getapic()
return f"[CQ:image,file={str(res)}]"
@bot.onCommand(r'\bwgadd\b')
async def wgadd(self):
id = str((self.msg)).replace("#wgadd","")[1:]
res = wg.wgadd(id,self.fromGroup)
return res
@bot.onCommand(r'\bwgdel\b')
async def wgdel(self):
id = str((self.msg)).replace("#wgdel","")[1:]
res = wg.wgdel(id,self.fromGroup)
return res
@bot.onCommand(r'\bwglist\b')
async def wglist(self):
res = wg.wglist(self.fromGroup)
return res
@bot.onCommand(r'\bwg\b')
async def wgp(self):
htag = str((self.msg)).replace("#wg","")[1:]
print("Htag is:"+str(htag))
if htag[0] != "#":
return "无效的观战码"
else:
return wg.wg(self.fromGroup,htag)
@bot.onCommand(r'\bwg-\b')
async def wgd(self):
htag = str((self.msg)).replace("#wg-","")[1:]
if htag[0] != "#":
return "无效的观战码"
else:
return wg.wgminus(self.fromGroup,htag)
@bot.onCommand(r'\bwginfo\b')
async def wginfo(self):
htag = str((self.msg)).replace("#wginfo","")[1:]
if htag[0] != "#":
return "无效的观战码"
else:
res = await wg.wginfo(self.fromGroup,htag)
if res != "":
return res
@bot.on(r'呆呆|呆呆兽|呆哥')
async def daidai(self):
if random.random() <= 0.02:
res = spider.getapic()
return f"[CQ:image,file={str(res)}]"
else:
print("No daidai")
@bot.onCommand('weapon')
async def weapon(self):
res = ""
size = len(Bot.WEAPONS)
item = Bot.WEAPONS[random.randint(0,size-1)]
res += f"你抽到了{item['level']}星武器{item['name']}!\n[CQ:image,file={item['pic']},cache=0]\n"
#print(res)
return res.strip()
"""
@bot.onCommand(r'([\s\S]{2,})教什么')
async def replyTeaching(self):
tmp_reg = re.search(r'([\s\S]{2,})教什么', self.msg.lstrip('#'))
keyword = tmp_reg.group(1)
keyword = keyword.lower()
res = ""
reDict = dict()
for item in Bot.COURSES:
if keyword in [name.lower() for name in item['teams']]:
if reDict.get(item['courseCode']) is None: reDict[item['courseCode']] = []
reDict[item['courseCode']].append(item['termName'])
for key, value in reDict.items():
res += f"{key} ({', '.join(value)})\n"
return res.strip() if res else self.getReply("teaching_failed")
"""
"""
@bot.onCommand(r'第几周')
async def replyWeek(self):
d1 = datetime.now()
d2 = datetime(2020, 5, 11)
return f"今天 {d1.strftime('%m / %d')} 第 {(d1 - d2).days // 7 + 1} 周"
"""
@bot.onCommand(r'\b猫图\b|\b猫猫\b')
async def replyKitty(self):
url = "https://api.thecatapi.com/v1/images/search"
tmpJson = json.loads(await aioGet(url))
imgUrl = tmpJson[0]["url"]
return f"[CQ:image,file={imgUrl}]"
@bot.onCommand(r'\b狗图\b|\b狗狗\b')
async def replyKitty(self):
url = "https://dog.ceo/api/breeds/image/random"
tmpJson = json.loads(await aioGet(url))
imgUrl = tmpJson["message"]
return f"[CQ:image,file={imgUrl}]"
@bot.onCommand(r'狐狸图|大白猫|fubuki|小狐狸')
async def replyFubuki(self):
path = random.choice(self.fbkImgs)
imgPath = six.moves.urllib_parse.urljoin(
"file://",
six.moves.urllib.request.pathname2url(os.path.abspath(path)))
return f"[CQ:image,file={imgPath}]"
"""
@bot.onCommand(r'深度抽象\s*(.+)')
async def replyDeepAbstract(self):
tmp_reg = re.search(r'深度抽象\s*(.+)', self.msg)
return self.eh.transform(tmp_reg.group(1), isDeepMode=True)
"""
"""
@bot.onCommand(r'抽象\s*(.+)')
async def replyDeepAbstract(self):
tmp_reg = re.search(r'抽象\s*(.+)', self.msg)
return self.eh.transform(tmp_reg.group(1))
"""
"""
@bot.on(r'^xm|^羡慕')
async def checkXM(self):
myrand = random.random()
if myrand <= Bot.SETTINGS['XM_PR']:
if '呸,老子才不羡慕' + re.sub(r'^xm|^羡慕', '',
self.msg) not in self.selfArr:
return self.msg
elif myrand >= 1 - Bot.SETTINGS['NOT_XM_PR']: # 避免循环羡慕
if self.msg not in self.selfArr and \
'呸,老子才不羡慕' + re.sub(r'^xm|^羡慕', '', self.msg) \
not in self.selfArr:
return '呸,老子才不羡慕' + re.sub(r'^xm|^羡慕', '', self.msg)
"""
"""
@bot.on(r"问:([\s\S]+)\s+答:([\s\S]+)")
async def study(self):
reg = re.search("问:([\s\S]+)\s+答:([\s\S]+)", self.msg)
if not reg or len(reg.groups()) != 2:
return
ask = reg.groups()[0]
ans = reg.groups()[1]
if len(ask) <= 2:
return self.getReply("question_too_short")
if len(ans) >= 500:
return self.getReply("answer_too_long")
if Bot.STUDIED_REPLY.get(ask) is None:
Bot.STUDIED_REPLY[ask] = {
"answers": list(),
"adders": list(),
"from": list()
}
for i, answer in enumerate(Bot.STUDIED_REPLY[ask]["answers"]):
if ans == answer and Bot.STUDIED_REPLY[ask]["from"][
i] == self.context['group_id']:
return self.getReply("study_failed")
Bot.STUDIED_REPLY[ask]["answers"].append(ans)
Bot.STUDIED_REPLY[ask]["adders"].append(self.context['user_id'])
Bot.STUDIED_REPLY[ask]["from"].append(self.context['group_id'])
with open("data/study.json", 'w', encoding='UTF-8') as f:
json.dump(Bot.STUDIED_REPLY, f, ensure_ascii=False, indent=4)
return self.getReply("study_successful")
"""
# check keywords
@bot.on(r'tql|nb|ydl|ddw')
async def checkKeywords(self):
if random.random() <= Bot.SETTINGS['KW_REPEAT_PR']:
return self.msg
"""
@bot.onCommand(r'draw ([1-9][0-9]{0,3})|draw')
async def drawCards(self):
totaldraw = 0
if(self.msg == '#draw'):
totaldraw = 10
else:
tempmsg = re.search(r"draw ([1-9][0-9]{0,3})",self.msg)
totaldraw = int(tempmsg.group(1))
if(totaldraw > 90):
return "你真当自己是亿万富翁啦?"
string = "原神抽卡"+str(totaldraw)+"连 "+'[CQ:at,qq='+str(self.sender)+']'+"\n"
result = []
last4count = 0
last5count = 0
flag5 = 0
for count in range(1,totaldraw+1):
randnum = random.randint(1,1000)
if randnum <= 6 + 1/(1/994 + math.exp(-(last5count-72)/1.5)):
result.append(2)
if(last5count <= 20):
flag5 = 1
elif(last5count > 70 and flag5==0):
flag5 = 2
last5count = 0
last4count = 0
elif randnum <= 106 + 1/(1/994 + math.exp(-(last5count-72)/1.5)):
result.append(1)
last4count = 0
last5count += 1
else:
result.append(0)
last4count += 1
last5count += 1
if (last4count == 10):
if result[count-1] != 2:
result[count-1] = 1
last4count = 0
count = 0
if(self.msg == '#draw'):
for item in result:
string += getWeapon(item+3)
return string
for item in result:
if item == 0:
string += "3 ★ "
elif item == 1:
string += "4 ★ "
else:
string += "5 ★!!! "
count += 1
if(count %10 == 0):
string += "\n"
if flag5 == 2:
string += "吃保底了,小伙子挺非嗷\n"
elif flag5 == 1:
string += "?有欧狗\n"
return string
"""
""""
@bot.onCommand(r'色图|涩图')
async def getSetu(self):
try:
if self.fromGroup not in Bot.SETTINGS['ADMIN_GROUP'] and \
self.context['user_id'] not in Bot.SETTINGS['ADMIN']:
return
res = ""
if re.search(r'他的|她的|它的', self.msg):
url = "https://yande.re/post.json?limit=1&" + \
f"tags=uncensored&page={random.randint(1, 1000)}"
tmpJson = json.loads(await aioGet(url))
res = tmpJson[0]['file_url']
elif re.search(r'色图', self.msg):
tag = random.choice(
['breasts', 'stockings', 'thighhighs', 'cleavage'])
url = "https://konachan.net/post.json?" + \
f"tags={tag}&page={random.randint(1, 100)}"
tmpJson = json.loads(await aioGet(url))
urls = [
item['file_url'] for item in tmpJson
if item['rating'] == 's'
]
res = random.choice(urls)
elif re.search(r'涩图', self.msg):
tag = random.choice(['uncensored'])
url = "https://konachan.net/post.json?" + \
f"tags={tag}&page={random.randint(1, 100)}"
tmpJson = json.loads(await aioGet(url))
urls = [
item['file_url'] for item in tmpJson
if item['rating'] != 's'
]
res = random.choice(urls)
return f"[CQ:image,file={res}]"
except:
return getReply("get_image_failed")
"""
# reply call
@bot.on()
async def replyAT(self):
if (re.search(r'\[CQ:at,qq={}\]'.format(self.context['self_id']),
self.msg)):
if(not((re.search("帮助",self.msg))
or(re.search("关注",self.msg))
or(re.search("开启全体",self.msg))
or(re.search("关闭全体",self.msg))
or(re.search("开启动态",self.msg))
or(re.search("关闭动态",self.msg))
or(re.search("关闭直播",self.msg))
or(re.search("开启直播",self.msg))
or(re.search("关闭权限",self.msg))
or(re.search("开启权限",self.msg))
or(re.search("关注列表",self.msg))
or(re.search("取关",self.msg))
or(re.search(r'CQ:reply',self.msg))
)):
return random.choice(Bot.FIXED_REPLY_DICT['AT'])
# random repeat
@bot.on()
async def rndRepeat(self):
if self.lastMsgInvl > Bot.SETTINGS['MIN_MSG_INVL'] and len(
self.msg) <= Bot.SETTINGS['MAX_RND_RE_LEN']:
if not(re.search('[CQ:image]',self.msg) or re.search('[CQ:face]',self.msg)):
myrand = random.random()
if (myrand <= Bot.SETTINGS['RND_REPEAT_PR']):
self.lastMsgInvl = 0
return self.msg
"""
# random XM
@bot.on()
async def rndXM(self):
if len(self.msg) > 2 and not re.search(r'^xm|^羡慕|\?$|?$', self.msg) and (
not re.search(r'\[CQ:image,file=.+\]',self.msg)) and(
not re.search(r'\[CQ:face,id=+\]',self.msg)):
if (self.lastMsgInvl > Bot.SETTINGS['MIN_MSG_INVL']
and len(self.msg) <= Bot.SETTINGS['MAX_RND_XM_LEN']):
myrand = random.random()
if (myrand <= Bot.SETTINGS['RND_XM_PR']):
self.lastMsgInvl = 0
self.msg = re.sub(r'^我的|^我', '', self.msg)
return '羡慕' + self.msg
"""
# check meme & regex replys
@bot.on()
async def checkMeme(self):
for regex, words in Bot.REG_REPLY_DICT.items():
if re.search(regex, self.msg):
if(random.random()>0.9):
return random.choice(words)
@bot.on()
async def replyStudy(self):
msg = re.sub(r'\[CQ:image,file=.+\]', '', self.msg)
for key, value in Bot.STUDIED_REPLY.items():
if key in msg:
reList = []
for i, answer in enumerate(value['answers']):
if value["from"][i] in [self.context['group_id']
] + Bot.SETTINGS['ADMIN_GROUP']:
reList.append(answer)
return random.choice(reList)
# followd repeat
@bot.on()
async def followRepeat(self):
tmpMsg = self.msg
if '[CQ:image' in self.msg and 'url' in self.msg and 'file' in self.msg:
tmpMsg = ','.join(self.msg.split(',')[:2]) + ']'
if self.mbrArr.count(tmpMsg) >= 3 and (
not re.search(r'\[CQ:image,file=.+\]',self.msg)) and(
random.random() >= 0.6
):
self.mbrArr = [''] * 10
return self.msg
return bot
async def test():
repeater = Repeater()
re = await repeater.responseMsg({
'message': "#manuel教什么",
'self_id': 123456,
'user_id': 1623464502,
'group_id': 925787157
})
print(re)
def getWeapon(a):
res = ""
if a == 5:
item = Bot.WEAPONS[random.randint(0,9)]
elif a == 4:
item = Bot.WEAPONS[random.randint(10,27)]
elif a == 3:
item = Bot.WEAPONS[random.randint(28,len(Bot.WEAPONS)-1)]
else:
return "No image!\n"
res += f"[CQ:image,file={item['pic']},cache=0]"
return res.strip()
if __name__ == "__main__":
import asyncio
loop = asyncio.get_event_loop()
tasks = [test(), test()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()