forked from ma6174/acmjudger
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprotect.py
executable file
·87 lines (74 loc) · 1.96 KB
/
protect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/bin/env python
# coding=utf-8
"""
为了服务器安全,隐藏部分sql语句。
程序执行需要相关数据库和测试数据。
"""
import os
import sys
import shutil
import subprocess
import codecs
import logging
import shlex
import time
import config
import lorun
import threading
import MySQLdb
from db import run_sql
from Queue import Queue
def low_level():
try:
os.setuid(int(os.popen("id -u %s" % "nobody").read()))
except:
pass
try:
# 降低程序运行权限,防止恶意代码
os.setuid(int(os.popen("id -u %s" % "nobody").read()))
except:
logging.error("please run this program as root!")
sys.exit(-1)
# 初始化队列
q = Queue(config.queue_size)
# 创建数据库锁,保证一个时间只能一个程序都写数据库
dblock = threading.Lock()
def start_work_thread():
'''开启工作线程'''
for i in range(config.count_thread):
t = threading.Thread(target=worker)
t.deamon = True
t.start()
def start_get_task():
'''开启获取任务线程'''
t = threading.Thread(target=put_task_into_queue, name="get_task")
t.deamon = True
t.start()
def check_thread():
low_level()
'''检测评测程序是否存在,小于config规定数目则启动新的'''
while True:
try:
if threading.active_count() < config.count_thread + 2:
logging.info("start new thread")
t = threading.Thread(target=worker)
t.deamon = True
t.start()
time.sleep(1)
except:
pass
def start_protect():
'''开启守护进程'''
low_level()
t = threading.Thread(target=check_thread, name="check_thread")
t.deamon = True
t.start()
def main():
low_level()
logging.basicConfig(level=logging.INFO,
format='%(asctime)s --- %(message)s',)
start_get_task()
start_work_thread()
start_protect()
if __name__ == '__main__':
main()