-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCommandAndControl.py
130 lines (93 loc) · 4.14 KB
/
CommandAndControl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#===========================#
# I M P O R T S #
#===========================#
import os
import platform
import socket
import threading
#===========================#
# C O L O R S #
#===========================#
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
BACKGROUND_MAGENTA = '\033[105m'
BACKGROUND_WHITE = '\033[47m'
CYAN = '\033[36m'
WHITE = '\033[37m'
ORANGE = YELLOW
os.system("color") # Comment out on Linux
#===========================#
client_socket = None
#===========================#
def handle_client(client_socket, client_address):
try:
print(f"| [{bcolors.OKGREEN}>{bcolors.ENDC}] Connection from [{bcolors.OKCYAN}{client_address}{bcolors.ENDC}].")
while True:
data = client_socket.recv(1024) # Buffer size is 1024 bytes
if not data:
break
print(f"| [{bcolors.OKGREEN}>{bcolors.ENDC}] Received: [{bcolors.WARNING}{data.decode('utf-8', errors='replace')}{bcolors.ENDC}]")
except ConnectionError:
print(f"| [{bcolors.WARNING}x{bcolors.ENDC}] Connection error with [{bcolors.OKCYAN}{client_address}{bcolors.ENDC}].")
finally:
client_socket.close()
print(f"| [{bcolors.WARNING}x{bcolors.ENDC}] Connection with [{bcolors.OKCYAN}{client_address}{bcolors.ENDC}] closed.")
#===========================#
def connect_to_server(host='127.0.0.1', port=12345, message="Hello!"):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
try:
# Connect to the server
client_socket.connect((host, port))
print(f"| [{bcolors.OKGREEN}>{bcolors.ENDC}] Connected to [{bcolors.WARNING}{host}:{port}{bcolors.ENDC}]")
# Send the message
client_socket.sendall(message.encode('utf-8'))
print(f"| [{bcolors.OKGREEN}>{bcolors.ENDC}] Sent: [{bcolors.OKCYAN}{message}{bcolors.ENDC}]")
# Close the connection
client_socket.close()
print(f"| [{bcolors.FAIL}x{bcolors.ENDC}] Connection closed.")
except Exception as e:
print(f"| [{bcolors.WARNING}x{bcolors.ENDC}] Error: {e}")
#===========================#
def start_server(host='0.0.0.0', port=12345):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
# Bind the server to the host and port
server_socket.bind((host, port))
# Listen for incoming connections
server_socket.listen()
print(f"| [{bcolors.OKGREEN}>{bcolors.ENDC}] Listening on [{bcolors.WARNING}{host}:{port}{bcolors.ENDC}]...")
while True:
try:
client_socket, client_address = server_socket.accept()
client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print(f"| [{bcolors.WARNING}x{bcolors.ENDC}] Server shutting down...")
break
except Exception as e:
print(f"| [{bcolors.WARNING}x{bcolors.ENDC}] Error: {e}")
#===========================#
if __name__ == "__main__":
# Get host information.
hostname = socket.gethostname()
hostIP = socket.gethostbyname(hostname)
systemInfo = platform.system()
processorInfo = platform.processor()
alive_host = False
# Print host information.
print(f"| [{bcolors.OKGREEN}>{bcolors.ENDC}] [{bcolors.WARNING}" + hostIP + f"{bcolors.ENDC}] [{bcolors.OKGREEN}" + hostname + f"{bcolors.ENDC}] [{bcolors.OKGREEN}" + systemInfo + f"{bcolors.ENDC}] [{bcolors.OKGREEN}" + processorInfo + f"{bcolors.ENDC}]")
start_server(port=12345)