-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy pathplatform.py
96 lines (78 loc) · 2.68 KB
/
platform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# coding=utf-8
import traceback
import socket
import json
import yaml
import sys
import os
# Process-wide settings handed to plugins and cores. ROOTPATH is the directory
# that contains the launched script (sys.argv[0]), always ending with "/".
ENVIRON = {"ROOTPATH": os.path.dirname(os.path.realpath(sys.argv[0])) + "/"}


def _report_failure(headline, error):
    """Print a failure headline, the error message and the active traceback.

    Must be called from inside an ``except`` block, because
    ``traceback.format_exc()`` reports the exception currently being handled.
    """
    print(headline)
    print("\033[31m[ERROR] %s\033[0m" % (error,))
    print("\033[31m%s\033[0m" % traceback.format_exc())


class CMDProcessor(object):
    """Command dispatcher with decorator-based plugin and core registries.

    Plugins are classes registered under a command name; ``process`` builds
    the plugin with this processor as its only argument and calls its
    ``pRun(cmd)`` method. Cores are objects attached to the class itself as
    attributes, so they are reachable from every processor instance.
    """

    # Command name -> plugin class. Defined on the class, so the registry is
    # shared by all instances.
    PLUGINS = {}

    def __init__(self):
        # Expose the module-level settings on the instance; plugins receive
        # the instance in their constructor and can read them from there.
        self.ENVIRON = ENVIRON

    def process(self, cmd):
        """Run the plugin registered for ``cmd`` and return its result.

        Returns ``{"code": 0, "msg": "no method"}`` when no plugin is
        registered under ``cmd``, and ``{"code": 0, "msg": "plugin error"}``
        when constructing or running the plugin raises. In the latter case
        the error and traceback are printed instead of propagating.
        """
        if cmd not in self.PLUGINS:
            return {"code": 0, "msg": "no method"}
        try:
            return self.PLUGINS[cmd](self).pRun(cmd)
        except Exception as e:
            # Format with a one-element tuple so a tuple-valued cmd cannot
            # break the error report itself.
            _report_failure(
                "[system] Plugin \033[1;37;46m %s \033[0m failed to run" % (cmd,),
                e,
            )
            return {"code": 0, "msg": "plugin error"}

    @classmethod
    def plugin_register(cls, plugin_name):
        """Return a class decorator that registers a plugin as ``plugin_name``.

        The decorated class is stored in ``PLUGINS`` and returned unchanged.
        Registering the same name again replaces the previous entry.
        """

        def wrapper(plugin):
            cls.PLUGINS[plugin_name] = plugin
            return plugin

        return wrapper

    @classmethod
    def core_register(cls, core_name):
        """Return a class decorator that attaches a core instance to the class.

        The decorated class is instantiated with no arguments and the
        instance is stored as the class attribute ``core_name``. The
        decorated class itself is returned unchanged.
        """

        def wrapper(core):
            setattr(cls, core_name, core())
            return core

        return wrapper

    @classmethod
    def core_register_auto(cls, core_name, loads=None):
        """Return a class decorator that builds a core from loaded settings.

        ``loads`` maps a key to a settings file URI. Each URI is read with
        ``loadSet`` when this method is called (before decoration), and the
        results are passed to the core constructor in one dict together with
        ``"ENVIRON"``. The value returned by the instance's ``auto()`` method
        is stored as the class attribute ``core_name``.

        If construction or ``auto()`` raises, the failure is printed, the
        attribute is left unset, and the decorated class is still returned.
        """
        info = {"ENVIRON": ENVIRON}
        # ``None`` stands in for "nothing to load"; this avoids a mutable
        # default argument shared between calls.
        for key, uri in (loads or {}).items():
            info[key] = cls.loadSet(uri)

        def wrapper(core):
            try:
                setattr(cls, core_name, core(info).auto())
            except Exception as e:
                _report_failure(
                    "[system] Core \033[1;37;46m %s \033[0m failed to load"
                    % (core_name,),
                    e,
                )
            return core

        return wrapper

    @staticmethod
    def getEnv():
        """Return the shared module-level ``ENVIRON`` dict (not a copy)."""
        return ENVIRON

    @staticmethod
    def loadSet(uri):
        """Load a settings file and return its parsed content.

        ``{ROOTPATH}`` inside ``uri`` is replaced by ``ENVIRON["ROOTPATH"]``.
        Files ending in ``.json`` are parsed with ``json.load``, files ending
        in ``.yml``/``.yaml`` with ``yaml.safe_load`` (suffix compared
        case-insensitively). Any other file is returned as its text content.

        Returns ``None`` when the file cannot be opened or parsed; the error
        and traceback are printed instead of propagating.
        """
        uri = uri.replace("{ROOTPATH}", ENVIRON["ROOTPATH"])
        try:
            with open(uri, "r", encoding="UTF-8") as handle:
                suffix = uri.rsplit(".", 1)[-1].lower()
                if suffix == "json":
                    return json.load(handle)
                if suffix in ("yml", "yaml"):
                    return yaml.safe_load(handle)
                # Read before leaving the ``with`` block: returning the
                # handle itself would give the caller a closed file.
                return handle.read()
        except Exception as e:
            _report_failure(
                "[system] \033[1;37;46m %s \033[0m failed to load" % (uri,),
                e,
            )
            return None

    @staticmethod
    def isPortInUse(port):
        """Return True when a TCP connection to ``localhost:port`` succeeds.

        ``port`` is converted with ``int()``. Values outside 0..65535 return
        False without attempting a connection.
        """
        port = int(port)
        if not 0 <= port <= 65535:
            return False
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # connect_ex returns 0 on success instead of raising on failure.
            return s.connect_ex(("localhost", port)) == 0