From 28b6ac32180d7ce8243ab261e927c79beff1c7b6 Mon Sep 17 00:00:00 2001 From: dmunozv04 <39565245+dmunozv04@users.noreply.github.com> Date: Sun, 5 Nov 2023 17:56:51 +0100 Subject: [PATCH] General fixes to make deepsource happier --- iSponsorBlockTV/api_helpers.py | 23 ++++++++++------- iSponsorBlockTV/config_setup.py | 46 +++++++++++++++++++-------------- iSponsorBlockTV/dial_client.py | 25 ++++++++---------- iSponsorBlockTV/helpers.py | 3 ++- iSponsorBlockTV/main.py | 6 ++--- iSponsorBlockTV/ytlounge.py | 11 ++++---- main_tui.py | 2 +- 7 files changed, 62 insertions(+), 54 deletions(-) diff --git a/iSponsorBlockTV/api_helpers.py b/iSponsorBlockTV/api_helpers.py index 0d83a5d..b462aa1 100644 --- a/iSponsorBlockTV/api_helpers.py +++ b/iSponsorBlockTV/api_helpers.py @@ -1,8 +1,7 @@ -from cache import AsyncTTL, AsyncLRU +from cache import AsyncLRU from .conditional_ttl_cache import AsyncConditionalTTL from . import constants, dial_client from hashlib import sha256 -from asyncio import create_task from aiohttp import ClientSession import html @@ -39,17 +38,17 @@ async def get_vid_id(self, title, artist, api_key, web_session): return for i in data["items"]: - if (i["id"]["kind"] != "youtube#video"): + if i["id"]["kind"] != "youtube#video": continue title_api = html.unescape(i["snippet"]["title"]) artist_api = html.unescape(i["snippet"]["channelTitle"]) if title_api == title and artist_api == artist: - return (i["id"]["videoId"], i["snippet"]["channelId"]) + return i["id"]["videoId"], i["snippet"]["channelId"] return @AsyncLRU(maxsize=100) async def is_whitelisted(self, vid_id): - if (self.apikey and self.channel_whitelist): + if self.apikey and self.channel_whitelist: channel_id = await self.__get_channel_id(vid_id) # check if channel id is in whitelist for i in self.channel_whitelist: @@ -66,7 +65,7 @@ async def __get_channel_id(self, vid_id): if "error" in data: return data = data["items"][0] - if (data["kind"] != "youtube#video"): + if data["kind"] != "youtube#video": return return data["snippet"]["channelId"] @@ -114,9 +113,11 @@ async def get_segments(self, vid_id): url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed async with self.web_session.get(url, headers=headers, params=params) as response: response_json = await response.json() - if(response.status != 200): + if response.status != 200: response_text = await response.text() - print(f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}. Code: {response.status} - {response_text}") + print( + f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}. " + f"Code: {response.status} - {response_text}") return ([], True) for i in response_json: if str(i["videoID"]) == str(vid_id): @@ -124,7 +125,8 @@ async def get_segments(self, vid_id): break return self.process_segments(response_json) - def process_segments(self, response): + @staticmethod + def process_segments(response): segments = [] ignore_ttl = True try: @@ -153,7 +155,8 @@ def process_segments(self, response): return (segments, ignore_ttl) async def mark_viewed_segments(self, UUID): - """Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled. Lets the contributor know that someone skipped the segment (thanks)""" + """Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled. + Lets the contributor know that someone skipped the segment (thanks)""" if self.skip_count_tracking: for i in UUID: url = constants.SponsorBlock_api + "viewedVideoSponsorTime/" diff --git a/iSponsorBlockTV/config_setup.py b/iSponsorBlockTV/config_setup.py index f9ae79d..8a034af 100644 --- a/iSponsorBlockTV/config_setup.py +++ b/iSponsorBlockTV/config_setup.py @@ -1,14 +1,13 @@ -import json import asyncio -import sys import aiohttp from . import api_helpers, ytlounge -async def pair_device(loop): + +async def pair_device(): try: lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV") pairing_code = input("Enter pairing code (found in Settings - Link with TV code): ") - pairing_code = int(pairing_code.replace("-", "").replace(" ","")) # remove dashes and spaces + pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces print("Pairing...") paired = await lounge_controller.pair(pairing_code) if not paired: @@ -23,7 +22,8 @@ async def pair_device(loop): except Exception as e: print(f"Failed to pair device: {e}") return - + + def main(config, debug: bool) -> None: print("Welcome to the iSponsorBlockTV cli setup wizard") loop = asyncio.get_event_loop_policy().get_event_loop() @@ -31,12 +31,14 @@ def main(config, debug: bool) -> None: loop.set_debug(True) asyncio.set_event_loop(loop) if hasattr(config, "atvs"): - print("The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2") + print( + "The atvs config option is deprecated and has stopped working. Please read this for more information on " + "how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2") if input("Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/n) ") == "y": del config["atvs"] devices = config.devices while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n": - task = loop.create_task(pair_device(loop)) + task = loop.create_task(pair_device()) loop.run_until_complete(task) device = task.result() if device: @@ -56,41 +58,46 @@ def main(config, debug: bool) -> None: apikey = input("Enter your API key: ") config["apikey"] = apikey config.apikey = apikey - + skip_categories = config.skip_categories if skip_categories: if input("Skip categories already specified. Change them? (y/n) ") == "y": categories = input( - "Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access interaction poi_highlight intro outro preview filler music_offtopic]:\n" + "Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access " + "interaction poi_highlight intro outro preview filler music_offtopic]:\n" ) skip_categories = categories.replace(",", " ").split(" ") - skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings + skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings else: categories = input( - "Enter skip categories (space or comma sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n" + "Enter skip categories (space or comma sepparated) Options: [sponsor, selfpromo, exclusive_access, " + "interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n" ) skip_categories = categories.replace(",", " ").split(" ") - skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings + skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings config.skip_categories = skip_categories channel_whitelist = config.channel_whitelist if input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") == "y": - if(not apikey): - print("WARNING: You need to specify an API key to use this function, otherwise the program will fail to start.\nYou can add one by re-running this setup wizard.") + if not apikey: + print( + "WARNING: You need to specify an API key to use this function, otherwise the program will fail to " + "start.\nYou can add one by re-running this setup wizard.") web_session = aiohttp.ClientSession() + api_helper = api_helpers.ApiHelper(config, web_session) while True: channel_info = {} channel = input("Enter a channel name or \"/exit\" to exit: ") if channel == "/exit": break - task = loop.create_task(api_helpers.search_channels(channel, apikey, web_session)) + task = loop.create_task(api_helper.search_channels(channel, apikey, web_session)) loop.run_until_complete(task) results = task.result() if len(results) == 0: print("No channels found") continue - + for i in range(len(results)): print(f"{i}: {results[i][1]} - Subs: {results[i][2]}") print("5: Enter a custom channel ID") @@ -115,9 +122,10 @@ def main(config, debug: bool) -> None: channel_whitelist.append(channel_info) # Close web session asynchronously loop.run_until_complete(web_session.close()) - + config.channel_whitelist = channel_whitelist - - config.skip_count_tracking = not input("Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n" + + config.skip_count_tracking = not input( + "Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n" print("Config finished") config.save() diff --git a/iSponsorBlockTV/dial_client.py b/iSponsorBlockTV/dial_client.py index aa553cd..8c0bd74 100644 --- a/iSponsorBlockTV/dial_client.py +++ b/iSponsorBlockTV/dial_client.py @@ -1,24 +1,19 @@ """Send out a M-SEARCH request and listening for responses.""" import asyncio import socket - -import aiohttp import ssdp from ssdp import network import xmltodict -''' -Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”), with or without modification, -are permitted provided that the following conditions are met: -● Redistributions of the DIAL Specification must retain the above copyright notice, this list of conditions and the following -disclaimer. -● Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright notice, this -list of conditions and the following disclaimer. -● Redistributions of implementations of the DIAL Specification in binary form must include the above copyright notice. -● The DIAL mark, the NETFLIX mark and the names of contributors to the DIAL Specification may not be used to endorse or -promote specifications, software, products, or any other materials derived from the DIAL Specification without specific prior -written permission. The DIAL mark is owned by Netflix and information on licensing the DIAL mark is available at -www.dial-multiscreen.org.''' +'''Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”), +with or without modification, are permitted provided that the following conditions are met: ● Redistributions of the +DIAL Specification must retain the above copyright notice, this list of conditions and the following disclaimer. ● +Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright +notice, this list of conditions and the following disclaimer. ● Redistributions of implementations of the DIAL +Specification in binary form must include the above copyright notice. ● The DIAL mark, the NETFLIX mark and the names +of contributors to the DIAL Specification may not be used to endorse or promote specifications, software, products, +or any other materials derived from the DIAL Specification without specific prior written permission. The DIAL mark +is owned by Netflix and information on licensing the DIAL mark is available at www.dial-multiscreen.org.''' ''' MIT License @@ -44,6 +39,7 @@ SOFTWARE.''' '''Modified code from https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py''' + def get_ip(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(0) @@ -69,6 +65,7 @@ def clear(self): def __call__(self): return self + def response_received(self, response: ssdp.messages.SSDPResponse, addr): headers = response.headers headers = {k.lower(): v for k, v in headers} diff --git a/iSponsorBlockTV/helpers.py b/iSponsorBlockTV/helpers.py index 9649bea..481e45d 100644 --- a/iSponsorBlockTV/helpers.py +++ b/iSponsorBlockTV/helpers.py @@ -43,7 +43,8 @@ def __init__(self, data_dir): def validate(self): if hasattr(self, "atvs"): print( - "The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2", + "The atvs config option is deprecated and has stopped working. Please read this for more information " + "on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2", ) print("Exiting in 10 seconds...") time.sleep(10) diff --git a/iSponsorBlockTV/main.py b/iSponsorBlockTV/main.py index 98c65ac..a059c26 100644 --- a/iSponsorBlockTV/main.py +++ b/iSponsorBlockTV/main.py @@ -3,8 +3,6 @@ import time import logging from . import api_helpers, ytlounge -from .constants import youtube_client_blacklist -import traceback class DeviceListener: @@ -115,7 +113,7 @@ async def cancel(self): self.cancelled = True try: self.task.cancel() - except Exception as e: + except Exception: pass @@ -141,7 +139,7 @@ def main(config, debug): tasks.append(loop.create_task(device.refresh_auth_loop())) try: loop.run_forever() - except KeyboardInterrupt as e: + except KeyboardInterrupt: print("Keyboard interrupt detected, cancelling tasks and exiting...") loop.run_until_complete(finish(devices)) finally: diff --git a/iSponsorBlockTV/ytlounge.py b/iSponsorBlockTV/ytlounge.py index e465f43..ebedbcc 100644 --- a/iSponsorBlockTV/ytlounge.py +++ b/iSponsorBlockTV/ytlounge.py @@ -4,10 +4,10 @@ import pyytlounge from .constants import youtube_client_blacklist - # Temporary imports from pyytlounge.api import api_base from pyytlounge.wrapper import NotLinkedException, desync + create_task = asyncio.create_task @@ -30,7 +30,7 @@ async def _watchdog(self): await asyncio.sleep(35) # YouTube sends at least a message every 30 seconds (no-op or any other) try: self.subscribe_task.cancel() - except Exception as e: + except Exception: pass # Subscribe to the lounge and start the watchdog @@ -46,7 +46,7 @@ async def subscribe_monitored(self, callback): # Process a lounge subscription event def _process_event(self, event_id: int, event_type: str, args): - #print(f"YtLoungeApi.__process_event({event_id}, {event_type}, {args})") + # print(f"YtLoungeApi.__process_event({event_id}, {event_type}, {args})") # (Re)start the watchdog try: self.subscribe_task_watchdog.cancel() @@ -54,7 +54,8 @@ def _process_event(self, event_id: int, event_type: str, args): pass finally: self.subscribe_task_watchdog = asyncio.create_task(self._watchdog()) - # A bunch of events useful to detect ads playing, and the next video before it starts playing (that way we can get the segments) + # A bunch of events useful to detect ads playing, and the next video before it starts playing (that way we + # can get the segments) if event_type == "onStateChange": data = args[0] # print(data) @@ -111,7 +112,7 @@ def _process_event(self, event_id: int, event_type: str, args): device_info = json.loads(device.get("deviceInfo", "")) if device_info.get("clientName", "") in youtube_client_blacklist: self._sid = None - self._gsession = None # Force disconnect + self._gsession = None # Force disconnect # elif event_type == "onAutoplayModeChanged": # data = args[0] # create_task(self.set_auto_play_mode(data["autoplayMode"] == "ENABLED")) diff --git a/main_tui.py b/main_tui.py index 4b7a9af..0b78dbe 100644 --- a/main_tui.py +++ b/main_tui.py @@ -2,4 +2,4 @@ from iSponsorBlockTV.helpers import Config config = Config("data/config.json") -setup_wizard.main(config) \ No newline at end of file +setup_wizard.main(config)