Skip to content

Commit

Permalink
Integrate logging to application/features/.
Browse files Browse the repository at this point in the history
  • Loading branch information
IreneLime committed Sep 27, 2024
1 parent dba5f6c commit 49c74e1
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 16 deletions.
28 changes: 24 additions & 4 deletions application/features/Audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from .Connection import Connection
from .. import app
from ..utils import find_free_port, get_headers_dict_from_str, local_auth
import logging.config

logger = logging.getLogger(__name__)

AUDIO_CONNECTIONS = {}

Expand All @@ -57,13 +60,16 @@ def __del__(self):
super().__del__()

def connect(self, *args, **kwargs):
logger.debug("Audio: Establishing Audio connection")
return super().connect(*args, **kwargs)

def launch_audio(self):
try:
logger.debug("Audio: Launching Audio connection. Forwarding request to 127.0.0.1, port 0.")
self.transport = self.client.get_transport()
self.remote_port = self.transport.request_port_forward('127.0.0.1', 0)
except Exception as e:
logger.warning("Audio: exception raised during launch audio: {}".format(e))
return False, str(e)

self.id = uuid.uuid4().hex
Expand All @@ -83,11 +89,12 @@ def handleConnected(self):
headers = get_headers_dict_from_str(headers)
if not local_auth(headers=headers, abort_func=self.close):
# local auth failure
logger.warning("AudioWebSocket: Local Authentication Failure")
return

audio_id = self.request.path[1:]
if audio_id not in AUDIO_CONNECTIONS:
print(f'AudioWebSocket: Requested audio_id={audio_id} does not exist.')
logger.warning(f'AudioWebSocket: Requested audio_id={audio_id} does not exist.')
self.close()
return

Expand All @@ -103,26 +110,31 @@ def handleConnected(self):
f'module-null-sink sink_name={sink_name} '
exit_status, _, stdout, _ = self.audio.exec_command_blocking(load_module_command)
if exit_status != 0:
print(f'AudioWebSocket: audio_id={audio_id}: unable to load pactl module-null-sink sink_name={sink_name}')
logger.warning(f'AudioWebSocket: audio_id={audio_id}: unable to load pactl module-null-sink sink_name={sink_name}')
return
load_module_stdout_lines = stdout.readlines()
logger.debug("AudioWebSocket: Load Module: {}".format(load_module_stdout_lines))
self.module_id = int(load_module_stdout_lines[0])

keep_launching_ffmpeg = True

def ffmpeg_launcher():
logger.debug("AudioWebSocket: ffmpeg_launcher thread started")
# TODO: support requesting audio format from the client
launch_ffmpeg_command = f'killall ffmpeg; ffmpeg -f pulse -i "{sink_name}.monitor" ' \
f'-ac 2 -acodec pcm_s16le -ar 44100 -f s16le "tcp://127.0.0.1:{self.audio.remote_port}"'
# keep launching if the connection is not accepted in the writer() below
while keep_launching_ffmpeg:
logger.debug(f"AudioWebSocket: Launch ffmpeg: {launch_ffmpeg_command}")
_, ffmpeg_stdout, _ = self.audio.client.exec_command(launch_ffmpeg_command)
ffmpeg_stdout.channel.recv_exit_status()
# if `ffmpeg` launches successfully, `ffmpeg_stdout.channel.recv_exit_status` should not return
logger.debug("AudioWebSocket: ffmpeg_launcher thread ended")

ffmpeg_launcher_thread = threading.Thread(target=ffmpeg_launcher)

def writer():
logger.debug("AudioWebSocket: writer thread started")
channel = self.audio.transport.accept(FFMPEG_LOAD_TIME * TRY_FFMPEG_MAX_COUNT)

nonlocal keep_launching_ffmpeg
Expand All @@ -138,14 +150,17 @@ def writer():
while True:
data = channel.recv(AUDIO_BUFFER_SIZE)
if not data:
logger.debug("AudioWebSocket: Close audio socket connection")
self.close()
break
buffer += data
if len(buffer) >= AUDIO_BUFFER_SIZE:
compressed = zlib.compress(buffer, level=4)
logger.debug(f"AudioWebSocket: Send compressed message of size {len(compressed)}")
self.sendMessage(compressed)
# print(len(compressed) / len(buffer) * 100)
buffer = b''
logger.debug("AudioWebSocket: write thread ended")

writer_thread = threading.Thread(target=writer)

Expand All @@ -155,8 +170,10 @@ def writer():
def handleClose(self):
if self.module_id is not None:
# unload the module before leaving
logger.debug(f"AudioWebSocket: Unload module {self.module_id}")
self.audio.client.exec_command(f'pactl unload-module {self.module_id}')

logger.debug(f"AudioWebSocket: End audio socket {self.audio.id} connection")
del AUDIO_CONNECTIONS[self.audio.id]
del self.audio

Expand All @@ -166,18 +183,21 @@ def handleClose(self):
# if we are in debug mode, run the server in the second round
if not app.debug or os.environ.get("WERKZEUG_RUN_MAIN") == "true":
AUDIO_PORT = find_free_port()
print("AUDIO_PORT =", AUDIO_PORT)
# print("AUDIO_PORT =", AUDIO_PORT)
logger.debug("Audio: Audio port {}".format(AUDIO_PORT))

if os.environ.get('SSL_CERT_PATH') is None:
logger.debug("Audio: SSL Certification Path not set. Generating self-signing certificate")
# no certificate provided, generate self-signing certificate
audio_server = SimpleSSLWebSocketServer('127.0.0.1', AUDIO_PORT, AudioWebSocket,
ssl_context=generate_adhoc_ssl_context())
else:
logger.debug("Audio: SSL Certification Path exists")
import ssl

audio_server = SimpleSSLWebSocketServer('0.0.0.0', AUDIO_PORT, AudioWebSocket,
certfile=os.environ.get('SSL_CERT_PATH'),
keyfile=os.environ.get('SSL_KEY_PATH'),
version=ssl.PROTOCOL_TLS)

threading.Thread(target=audio_server.serveforever, daemon=True).start()
threading.Thread(target=audio_server.serveforever, daemon=True).start()
34 changes: 33 additions & 1 deletion application/features/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
import paramiko
import select

import logging.config

logger = logging.getLogger(__name__)

class ForwardServerHandler(socketserver.BaseRequestHandler):
def handle(self):
self.server: ForwardServer
try:
logger.debug("Connection: Open forward server channel")
chan = self.server.ssh_transport.open_channel(
"direct-tcpip",
("127.0.0.1", self.server.chain_port),
Expand All @@ -49,6 +53,14 @@ def handle(self):
("127.0.0.1", self.server.chain_port),
)
)
logger.debug(
"Connected! Tunnel open %r -> %r -> %r"
% (
self.request.getpeername(),
chan.getpeername(),
("127.0.0.1", self.server.chain_port),
)
)

try:
while True:
Expand All @@ -67,6 +79,7 @@ def handle(self):
print(e)

try:
logger.debug("Connection: Close forward server channel")
chan.close()
self.server.shutdown()
except Exception as e:
Expand Down Expand Up @@ -102,6 +115,9 @@ def __del__(self):
def _client_connect(self, client: paramiko.SSHClient,
host, username,
password=None, key_filename=None, private_key_str=None):
if self._jump_channel != None:
logger.debug("Connection: Connection initialized through Jump Channel")
logger.debug(f"Connection: Connecting to {username}@{host}")
if password is not None:
client.connect(host, username=username, password=password, timeout=15, sock=self._jump_channel)
elif key_filename is not None:
Expand All @@ -128,13 +144,16 @@ def _init_jump_channel(self, host, username, **auth_methods):

self._jump_client = paramiko.SSHClient()
self._jump_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
logger.debug(f"Connection: Initialize Jump Client for connection to {username}@remote.ecf.utoronto.ca")
self._client_connect(self._jump_client, 'remote.ecf.utoronto.ca', username, **auth_methods)
logger.debug(f"Connection: Open Jump channel connection to {host} at port 22")
self._jump_channel = self._jump_client.get_transport().open_channel('direct-tcpip',
(host, 22),
('127.0.0.1', 22))

def connect(self, host: str, username: str, **auth_methods):
try:
logger.debug(f"Connection: Connection attempt to {username}@{host}")
self._init_jump_channel(host, username, **auth_methods)
self._client_connect(self.client, host, username, **auth_methods)
except Exception as e:
Expand All @@ -145,6 +164,7 @@ def connect(self, host: str, username: str, **auth_methods):
self.host = host
self.username = username

logger.debug(f"Connection: Successfully connected to {username}@{host}")
return True, ''

@staticmethod
Expand All @@ -160,9 +180,11 @@ def ssh_keygen(key_filename=None, key_file_obj=None, public_key_comment=''):

# save the private key
if key_filename is not None:
logger.debug(f"Connection: RSA SSH private key written to {key_filename}")
rsa_key.write_private_key_file(key_filename)
elif key_file_obj is not None:
rsa_key.write_private_key(key_file_obj)
logger.debug(f"Connection: RSA SSH private key written to {key_file_obj}")
else:
raise ValueError('Neither key_filename nor key_file_obj is provided.')

Expand Down Expand Up @@ -192,6 +214,7 @@ def save_keys(self, key_filename=None, key_file_obj=None, public_key_comment='')
"mkdir -p ~/.ssh && chmod 700 ~/.ssh && echo '%s' >> ~/.ssh/authorized_keys" % pub_key)
if exit_status != 0:
return False, "Connection::save_keys: unable to save public key; Check for disk quota and permissions with any conventional SSH clients. "
logger.debug("Connection: Public ssh key saved to remove server ~/.ssh/authorized_keys")

return True, ""

Expand All @@ -217,22 +240,28 @@ def exec_command_blocking_large(self, command):
return '\n'.join(stdout) + '\n' + '\n'.join(stderr)

def _port_forward_thread(self, local_port, remote_port):
logger.debug("Connection: Port forward thread started")
forward_server = ForwardServer(("", local_port), ForwardServerHandler)

forward_server.ssh_transport = self.client.get_transport()
forward_server.chain_port = remote_port

forward_server.serve_forever()
forward_server.server_close()
logger.debug("Connection: Port forward thread ended")

def port_forward(self, *args):
forwarding_thread = threading.Thread(target=self._port_forward_thread, args=args)
forwarding_thread.start()

def is_eecg(self):
if 'eecg' in self.host:
logger.debug("Connection: Target host is eecg")
return 'eecg' in self.host

def is_ecf(self):
if 'ecf' in self.host:
logger.debug("Connection: Target host is ecf")
return 'ecf' in self.host

def is_uoft(self):
Expand All @@ -256,6 +285,9 @@ def is_load_high(self):

my_pts_count = len(output) - 1 # -1: excluding the `uptime` output

logger.debug(f"Connection: pts count: {pts_count}; my pts count: {my_pts_count}")
logger.debug(f"Connection: load sum: {load_sum}")

if pts_count > my_pts_count: # there are more terminals than mine
return True
elif load_sum > 1.0:
Expand All @@ -265,4 +297,4 @@ def is_load_high(self):
# it is considered a high load
return True

return False
return False
19 changes: 17 additions & 2 deletions application/features/SFTP.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
from paramiko.sftp_client import SFTPClient

from .Connection import Connection
import logging.config

logger = logging.getLogger(__name__)

class SFTP(Connection):
def __init__(self):
Expand All @@ -41,11 +43,13 @@ def __del__(self):
super().__del__()

def connect(self, *args, **kwargs):
logger.debug("SFTP: Establishing SFTP connection")
status, reason = super().connect(*args, **kwargs)
if not status:
return status, reason

try:
logger.debug("SFTP: Open SFTP client connection")
self.sftp = self.client.open_sftp()
self.sftp.chdir(".")
except Exception as e:
Expand All @@ -59,6 +63,7 @@ def ls(self, path=""):
self.sftp.chdir(path)
cwd = self.sftp.getcwd()
attrs = self.sftp.listdir_attr(cwd)
logger.debug(f"SFTP: ls {cwd}: {attrs}")

file_list = []
# TODO: should support uid and gid later
Expand Down Expand Up @@ -100,9 +105,11 @@ def _zip_dir_recurse(self, z, parent, file):
mode = self.sftp.stat(fullpath).st_mode
if stat.S_ISREG(mode):
# print(fullpath, 'is file')
logger.debug(f"SFTP: {fullpath} is a file")
z.write_iter(fullpath, self.dl_generator(fullpath))
elif stat.S_ISDIR(mode):
# print(fullpath, 'is dir')
logger.debug(f"SFTP: {fullpath} is a directory")
# TODO: support writing an empty directory if len(dir_ls)==0
# That will involve modifying the zipstream library
dir_ls = self.sftp.listdir(fullpath)
Expand All @@ -116,10 +123,12 @@ def _zip_dir_recurse(self, z, parent, file):
return

def zip_generator(self, cwd, file_list):
logger.debug(f"SFTP: zip_generator on directory: {cwd}")
self.sftp.chdir(cwd)
z = zipstream.ZipFile(compression=zipstream.ZIP_DEFLATED, allowZip64=True)

for file in file_list:
logger.debug(f"SFTP: zip_generator on file: {file}")
self._zip_dir_recurse(z, '', file)

return z
Expand All @@ -128,6 +137,7 @@ def rename(self, cwd, old, new):
try:
self.sftp.chdir(cwd)
self.sftp.rename(old, new)
logger.debug(f"SFTP: Rename {old} in directory {cwd} to {new}")
except Exception as e:
return False, repr(e)

Expand All @@ -136,9 +146,11 @@ def rename(self, cwd, old, new):
def chmod(self, path, mode, recursive):
_, _, _, stderr = self.exec_command_blocking(
f'chmod {"-R" if recursive else ""} {"{0:0{1}o}".format(mode, 3)} "{path}"')
logger.debug("SFTP: Change permission on " + path + " to '{0:0{1}o}'".format(mode, 3))
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
print(stderr_lines)
logger.warning(f"SFTP: chmod failed due to {stderr_lines}")
# print(stderr_lines)
return False, 'Some files were not applied with the request mode due to permission issues.'

return True, ''
Expand All @@ -159,6 +171,7 @@ def rm(self, cwd, file_list):

counter += 1
if counter == 50:
logger.debug(f"SFTP: Execute Command {' '.join(cmd_list)}")
_, _, stderr = self.client.exec_command(" ".join(cmd_list))
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
Expand All @@ -169,6 +182,7 @@ def rm(self, cwd, file_list):
counter = 0
cmd_list = [f'cd "{cwd}" && rm -rf']

logger.debug(f"SFTP: Execute Command {' '.join(cmd_list)}")
_, _, stderr = self.client.exec_command(" ".join(cmd_list))
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
Expand All @@ -180,8 +194,9 @@ def rm(self, cwd, file_list):
return True, ''

def mkdir(self, cwd, name):
logger.debug(f"SFTP: Make directory {name} at {cwd}")
_, _, _, stderr = self.exec_command_blocking(f'cd "{cwd}"&& mkdir "{name}"')
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
return False, stderr_lines[0]
return True, ''
return True, ''
Loading

0 comments on commit 49c74e1

Please sign in to comment.