Skip to content

Commit

Permalink
General fixes to make deepsource happier
Browse files Browse the repository at this point in the history
  • Loading branch information
dmunozv04 committed Nov 5, 2023
1 parent 138d8bd commit 28b6ac3
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 54 deletions.
23 changes: 13 additions & 10 deletions iSponsorBlockTV/api_helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from cache import AsyncTTL, AsyncLRU
from cache import AsyncLRU
from .conditional_ttl_cache import AsyncConditionalTTL
from . import constants, dial_client
from hashlib import sha256
from asyncio import create_task
from aiohttp import ClientSession
import html

Expand Down Expand Up @@ -39,17 +38,17 @@ async def get_vid_id(self, title, artist, api_key, web_session):
return

for i in data["items"]:
if (i["id"]["kind"] != "youtube#video"):
if i["id"]["kind"] != "youtube#video":
continue
title_api = html.unescape(i["snippet"]["title"])
artist_api = html.unescape(i["snippet"]["channelTitle"])
if title_api == title and artist_api == artist:
return (i["id"]["videoId"], i["snippet"]["channelId"])
return i["id"]["videoId"], i["snippet"]["channelId"]
return

@AsyncLRU(maxsize=100)
async def is_whitelisted(self, vid_id):
if (self.apikey and self.channel_whitelist):
if self.apikey and self.channel_whitelist:
channel_id = await self.__get_channel_id(vid_id)
# check if channel id is in whitelist
for i in self.channel_whitelist:
Expand All @@ -66,7 +65,7 @@ async def __get_channel_id(self, vid_id):
if "error" in data:
return
data = data["items"][0]
if (data["kind"] != "youtube#video"):
if data["kind"] != "youtube#video":
return
return data["snippet"]["channelId"]

Expand Down Expand Up @@ -114,17 +113,20 @@ async def get_segments(self, vid_id):
url = constants.SponsorBlock_api + "skipSegments/" + vid_id_hashed
async with self.web_session.get(url, headers=headers, params=params) as response:
response_json = await response.json()
if(response.status != 200):
if response.status != 200:
response_text = await response.text()
print(f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}. Code: {response.status} - {response_text}")
print(
f"Error getting segments for video {vid_id}, hashed as {vid_id_hashed}. "
f"Code: {response.status} - {response_text}")
return ([], True)
for i in response_json:
if str(i["videoID"]) == str(vid_id):
response_json = i
break
return self.process_segments(response_json)

def process_segments(self, response):
@staticmethod
def process_segments(response):
segments = []
ignore_ttl = True
try:
Expand Down Expand Up @@ -153,7 +155,8 @@ def process_segments(self, response):
return (segments, ignore_ttl)

async def mark_viewed_segments(self, UUID):
"""Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled. Lets the contributor know that someone skipped the segment (thanks)"""
"""Marks the segments as viewed in the SponsorBlock API, if skip_count_tracking is enabled.
Lets the contributor know that someone skipped the segment (thanks)"""
if self.skip_count_tracking:
for i in UUID:
url = constants.SponsorBlock_api + "viewedVideoSponsorTime/"
Expand Down
46 changes: 27 additions & 19 deletions iSponsorBlockTV/config_setup.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import json
import asyncio
import sys
import aiohttp
from . import api_helpers, ytlounge

async def pair_device(loop):

async def pair_device():
try:
lounge_controller = ytlounge.YtLoungeApi("iSponsorBlockTV")
pairing_code = input("Enter pairing code (found in Settings - Link with TV code): ")
pairing_code = int(pairing_code.replace("-", "").replace(" ","")) # remove dashes and spaces
pairing_code = int(pairing_code.replace("-", "").replace(" ", "")) # remove dashes and spaces
print("Pairing...")
paired = await lounge_controller.pair(pairing_code)
if not paired:
Expand All @@ -23,20 +22,23 @@ async def pair_device(loop):
except Exception as e:
print(f"Failed to pair device: {e}")
return



def main(config, debug: bool) -> None:
print("Welcome to the iSponsorBlockTV cli setup wizard")
loop = asyncio.get_event_loop_policy().get_event_loop()
if debug:
loop.set_debug(True)
asyncio.set_event_loop(loop)
if hasattr(config, "atvs"):
print("The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2")
print(
"The atvs config option is deprecated and has stopped working. Please read this for more information on "
"how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2")
if input("Do you want to remove the legacy 'atvs' entry (the app won't start with it present)? (y/n) ") == "y":
del config["atvs"]
devices = config.devices
while not input(f"Paired with {len(devices)} Device(s). Add more? (y/n) ") == "n":
task = loop.create_task(pair_device(loop))
task = loop.create_task(pair_device())
loop.run_until_complete(task)
device = task.result()
if device:
Expand All @@ -56,41 +58,46 @@ def main(config, debug: bool) -> None:
apikey = input("Enter your API key: ")
config["apikey"] = apikey
config.apikey = apikey

skip_categories = config.skip_categories
if skip_categories:
if input("Skip categories already specified. Change them? (y/n) ") == "y":
categories = input(
"Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access interaction poi_highlight intro outro preview filler music_offtopic]:\n"
"Enter skip categories (space or comma sepparated) Options: [sponsor selfpromo exclusive_access "
"interaction poi_highlight intro outro preview filler music_offtopic]:\n"
)
skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
else:
categories = input(
"Enter skip categories (space or comma sepparated) Options: [sponsor, selfpromo, exclusive_access, interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n"
"Enter skip categories (space or comma sepparated) Options: [sponsor, selfpromo, exclusive_access, "
"interaction, poi_highlight, intro, outro, preview, filler, music_offtopic:\n"
)
skip_categories = categories.replace(",", " ").split(" ")
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
skip_categories = [x for x in skip_categories if x != ''] # Remove empty strings
config.skip_categories = skip_categories

channel_whitelist = config.channel_whitelist
if input("Do you want to whitelist any channels from being ad-blocked? (y/n) ") == "y":
if(not apikey):
print("WARNING: You need to specify an API key to use this function, otherwise the program will fail to start.\nYou can add one by re-running this setup wizard.")
if not apikey:
print(
"WARNING: You need to specify an API key to use this function, otherwise the program will fail to "
"start.\nYou can add one by re-running this setup wizard.")
web_session = aiohttp.ClientSession()
api_helper = api_helpers.ApiHelper(config, web_session)
while True:
channel_info = {}
channel = input("Enter a channel name or \"/exit\" to exit: ")
if channel == "/exit":
break

task = loop.create_task(api_helpers.search_channels(channel, apikey, web_session))
task = loop.create_task(api_helper.search_channels(channel, apikey, web_session))
loop.run_until_complete(task)
results = task.result()
if len(results) == 0:
print("No channels found")
continue

for i in range(len(results)):
print(f"{i}: {results[i][1]} - Subs: {results[i][2]}")
print("5: Enter a custom channel ID")
Expand All @@ -115,9 +122,10 @@ def main(config, debug: bool) -> None:
channel_whitelist.append(channel_info)
# Close web session asynchronously
loop.run_until_complete(web_session.close())

config.channel_whitelist = channel_whitelist

config.skip_count_tracking = not input("Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n"

config.skip_count_tracking = not input(
"Do you want to report skipped segments to sponsorblock. Only the segment UUID will be sent? (y/n) ") == "n"
print("Config finished")
config.save()
25 changes: 11 additions & 14 deletions iSponsorBlockTV/dial_client.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
"""Send out a M-SEARCH request and listening for responses."""
import asyncio
import socket

import aiohttp
import ssdp
from ssdp import network
import xmltodict

'''
Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”), with or without modification,
are permitted provided that the following conditions are met:
● Redistributions of the DIAL Specification must retain the above copyright notice, this list of conditions and the following
disclaimer.
● Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright notice, this
list of conditions and the following disclaimer.
● Redistributions of implementations of the DIAL Specification in binary form must include the above copyright notice.
● The DIAL mark, the NETFLIX mark and the names of contributors to the DIAL Specification may not be used to endorse or
promote specifications, software, products, or any other materials derived from the DIAL Specification without specific prior
written permission. The DIAL mark is owned by Netflix and information on licensing the DIAL mark is available at
www.dial-multiscreen.org.'''
'''Redistribution and use of the DIAL DIscovery And Launch protocol specification (the “DIAL Specification”),
with or without modification, are permitted provided that the following conditions are met: ● Redistributions of the
DIAL Specification must retain the above copyright notice, this list of conditions and the following disclaimer. ●
Redistributions of implementations of the DIAL Specification in source code form must retain the above copyright
notice, this list of conditions and the following disclaimer. ● Redistributions of implementations of the DIAL
Specification in binary form must include the above copyright notice. ● The DIAL mark, the NETFLIX mark and the names
of contributors to the DIAL Specification may not be used to endorse or promote specifications, software, products,
or any other materials derived from the DIAL Specification without specific prior written permission. The DIAL mark
is owned by Netflix and information on licensing the DIAL mark is available at www.dial-multiscreen.org.'''

'''
MIT License
Expand All @@ -44,6 +39,7 @@
SOFTWARE.'''
'''Modified code from https://github.com/codingjoe/ssdp/blob/main/ssdp/__main__.py'''


def get_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
Expand All @@ -69,6 +65,7 @@ def clear(self):

def __call__(self):
return self

def response_received(self, response: ssdp.messages.SSDPResponse, addr):
headers = response.headers
headers = {k.lower(): v for k, v in headers}
Expand Down
3 changes: 2 additions & 1 deletion iSponsorBlockTV/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def __init__(self, data_dir):
def validate(self):
if hasattr(self, "atvs"):
print(
"The atvs config option is deprecated and has stopped working. Please read this for more information on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2",
"The atvs config option is deprecated and has stopped working. Please read this for more information "
"on how to upgrade to V2: \nhttps://github.com/dmunozv04/iSponsorBlockTV/wiki/Migrate-from-V1-to-V2",
)
print("Exiting in 10 seconds...")
time.sleep(10)
Expand Down
6 changes: 2 additions & 4 deletions iSponsorBlockTV/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import time
import logging
from . import api_helpers, ytlounge
from .constants import youtube_client_blacklist
import traceback


class DeviceListener:
Expand Down Expand Up @@ -115,7 +113,7 @@ async def cancel(self):
self.cancelled = True
try:
self.task.cancel()
except Exception as e:
except Exception:
pass


Expand All @@ -141,7 +139,7 @@ def main(config, debug):
tasks.append(loop.create_task(device.refresh_auth_loop()))
try:
loop.run_forever()
except KeyboardInterrupt as e:
except KeyboardInterrupt:
print("Keyboard interrupt detected, cancelling tasks and exiting...")
loop.run_until_complete(finish(devices))
finally:
Expand Down
11 changes: 6 additions & 5 deletions iSponsorBlockTV/ytlounge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import pyytlounge
from .constants import youtube_client_blacklist


# Temporary imports
from pyytlounge.api import api_base
from pyytlounge.wrapper import NotLinkedException, desync

create_task = asyncio.create_task


Expand All @@ -30,7 +30,7 @@ async def _watchdog(self):
await asyncio.sleep(35) # YouTube sends at least a message every 30 seconds (no-op or any other)
try:
self.subscribe_task.cancel()
except Exception as e:
except Exception:
pass

# Subscribe to the lounge and start the watchdog
Expand All @@ -46,15 +46,16 @@ async def subscribe_monitored(self, callback):

# Process a lounge subscription event
def _process_event(self, event_id: int, event_type: str, args):
#print(f"YtLoungeApi.__process_event({event_id}, {event_type}, {args})")
# print(f"YtLoungeApi.__process_event({event_id}, {event_type}, {args})")
# (Re)start the watchdog
try:
self.subscribe_task_watchdog.cancel()
except:
pass
finally:
self.subscribe_task_watchdog = asyncio.create_task(self._watchdog())
# A bunch of events useful to detect ads playing, and the next video before it starts playing (that way we can get the segments)
# A bunch of events useful to detect ads playing, and the next video before it starts playing (that way we
# can get the segments)
if event_type == "onStateChange":
data = args[0]
# print(data)
Expand Down Expand Up @@ -111,7 +112,7 @@ def _process_event(self, event_id: int, event_type: str, args):
device_info = json.loads(device.get("deviceInfo", ""))
if device_info.get("clientName", "") in youtube_client_blacklist:
self._sid = None
self._gsession = None # Force disconnect
self._gsession = None # Force disconnect
# elif event_type == "onAutoplayModeChanged":
# data = args[0]
# create_task(self.set_auto_play_mode(data["autoplayMode"] == "ENABLED"))
Expand Down
2 changes: 1 addition & 1 deletion main_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
from iSponsorBlockTV.helpers import Config

config = Config("data/config.json")
setup_wizard.main(config)
setup_wizard.main(config)

0 comments on commit 28b6ac3

Please sign in to comment.