-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredis_utils.py
64 lines (45 loc) · 1.76 KB
/
redis_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import redis
import json
from dotenv import load_dotenv
import os
load_dotenv()
HOST = os.getenv("REDIS_HOST") if os.getenv("REDIS_HOST") else 'localhost'
redis_client = redis.Redis(host=HOST, port='6379', db=0)
auth_store = redis.Redis(host=HOST, port='6379', db=1)
blacklist_sync_client = redis.Redis(host=HOST, port='6379', db=2)
def check_spamhaus_token() -> str:
if (auth_store.get("spamhaus_token")):
spamhaus_token = auth_store.get("spamhaus_token")
return spamhaus_token
return None
def add_spamhaus_token(token: str):
response = auth_store.set("spamhaus_token", token)
auth_store.expire("spamhaus_token", 60 * 60 * 12) # 12 Hours Expiry Time
return response
def check_ip_report(ip: str):
key = f"{ip}"
return redis_client.get(key)
def add_ip_report(ip: str, port: int, package_name: str, report: str):
key = f"{ip}"
json_report = report
response = redis_client.set(key, json_report)
redis_client.expire(key, 60 * 60 * 24 * 7) # Expire after 7 days
return response
def check_domain_report(domain: str):
key = f"{domain}"
return redis_client.get(key)
def add_domain_report(domain: str, package_name: str, report: str):
key = f"{domain}"
json_report = report
response = redis_client.set(key, json_report)
redis_client.expire(key, 60 * 60 * 24 * 7) # Expire after 7 days
return response
# Utils for Blacklists Sync (SETS)
def add_ip_to_blacklist(ip: str):
blacklist_sync_client.sadd("blacklist:ips", ip)
def add_domain_to_blacklist(domain: str):
blacklist_sync_client.sadd("blacklist:domains", domain)
def get_blacklisted_ips():
return blacklist_sync_client.smembers("blacklist:ips")
def get_blacklisted_domains():
return blacklist_sync_client.smembers("blacklist:domains")