Skip to content

Commit

Permalink
Add multi stream support (#6)
Browse files Browse the repository at this point in the history
* Add multi stream

* Remove XML support

* Formatting

* Add pls support

* Delete kingfmva.pls
  • Loading branch information
cidrblock authored May 28, 2024
1 parent d5b1dd0 commit 7b4578a
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 129 deletions.
122 changes: 73 additions & 49 deletions tunein/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from urllib.parse import urlparse, urlunparse

import requests
from tunein.xml_helper import xml2dict
from tunein.parse import fuzzy_match


Expand All @@ -15,6 +16,14 @@ def title(self):
def artist(self):
return self.raw.get("artist", "")

@property
def bit_rate(self):
return self.raw.get("bitrate")

@property
def media_type(self):
return self.raw.get("media_type")

@property
def image(self):
return self.raw.get("image")
Expand Down Expand Up @@ -44,72 +53,87 @@ def dict(self):
"""Return a dict representation of the station."""
return {
"artist": self.artist,
"bit_rate": self.bit_rate,
"description": self.description,
"image": self.image,
"match": self.match(),
"media_type": self.media_type,
"stream": self.stream,
"title": self.title,
}


class TuneIn:
search_url = "http://opml.radiotime.com/Search.ashx"
featured_url = "http://opml.radiotime.com/Browse.ashx?c=local" # local stations
search_url = "https://opml.radiotime.com/Search.ashx"
featured_url = "http://opml.radiotime.com/Browse.ashx" # local stations
stnd_query = {"formats": "mp3,aac,ogg,html,hls", "render": "json"}

@staticmethod
def get_stream_url(url):
res = requests.get(url)
for url in res.text.splitlines():
if (len(url) > 4):
if url[-3:] == 'm3u':
return url[:-4]
if url[-3:] == 'pls':
res2 = requests.get(url)
# Loop through the data looking for the first url
for line in res2.text.splitlines():
if line.startswith("File1="):
return line[6:]
else:
return url
def get_stream_urls(url):
_url = urlparse(url)
for scheme in ("http", "https"):
url_str = urlunparse(
_url._replace(scheme=scheme, query=_url.query + "&render=json")
)
res = requests.get(url_str)
try:
res.raise_for_status()
break
except requests.exceptions.RequestException:
continue
else:
return "Failed to get stream url"

stations = res.json().get("body", {})

for station in stations:
if station.get("url", "").endswith(".pls"):
res = requests.get(station["url"])
file1 = [line for line in res.text.split("\n") if line.startswith("File1=")]
if file1:
station["url"] = file1[0].split("File1=")[1]

return stations

@staticmethod
def featured():
res = requests.post(TuneIn.featured_url)
return list(TuneIn._get_stations(res))
res = requests.post(
TuneIn.featured_url,
data={**TuneIn.stnd_query, **{"c": "local"}}
)
stations = res.json().get("body", [{}])[0].get("children", [])
return list(TuneIn._get_stations(stations))

@staticmethod
def search(query):
res = requests.post(TuneIn.search_url, data={"query": query, "formats": "mp3,aac,ogg,html,hls"})
return list(TuneIn._get_stations(res, query))
res = requests.post(
TuneIn.search_url,
data={**TuneIn.stnd_query, **{"query": query}}
)
stations = res.json().get("body", [])
return list(TuneIn._get_stations(stations, query))

@staticmethod
def _get_stations(res: requests.Response, query: str = ""):
res = xml2dict(res.text)
if not res.get("opml"):
return
# stations might be nested based on Playlist/Search
outline = res['opml']['body']["outline"]

if not isinstance(outline, list):
return
if outline[0].get("outline"):
stations = outline[0]["outline"]
else:
stations = outline

def _get_stations(stations: requests.Response, query: str = ""):
for entry in stations:
try:
if not entry.get("key") == "unavailable" \
and entry.get("type") == "audio" \
and entry.get("item") == "station":
yield TuneInStation(
{"stream": TuneIn.get_stream_url(entry["URL"]),
"url": entry["URL"],
"title": entry.get("current_track") or entry.get("text"),
"artist": entry.get("text"),
"description": entry.get("subtext"),
"image": entry.get("image"),
"query": query
})
except:
if (
entry.get("key") == "unavailable"
or entry.get("type") != "audio"
or entry.get("item") != "station"
):
continue
streams = TuneIn.get_stream_urls(entry["URL"])
for stream in streams:
yield TuneInStation(
{
"stream": stream["url"],
"bitrate": stream["bitrate"],
"media_type": stream["media_type"],
"url": entry["URL"],
"title": entry.get("current_track") or entry.get("text"),
"artist": entry.get("text"),
"description": entry.get("subtext"),
"image": entry.get("image"),
"query": query,
}
)
57 changes: 44 additions & 13 deletions tunein/subcommands/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from tunein import TuneIn


class Ansi:
"""ANSI escape codes."""

Expand All @@ -26,17 +27,19 @@ class Ansi:
YELLOW = "\x1B[33m"
GREY = "\x1B[90m"


NOPRINT_TRANS_TABLE = {
i: None for i in range(0, sys.maxunicode + 1) if not chr(i).isprintable()
}


class Search:
"""The search subcommand for tunein."""

def __init__(self: Search, args: argparse.Namespace) -> None:
"""Initialize the search subcommand."""
self._args: argparse.Namespace = args

def run(self: Search) -> None:
"""Run the search subcommand."""
tunein = TuneIn()
Expand All @@ -45,7 +48,7 @@ def run(self: Search) -> None:
if not stations:
print(f"No results for {self._args.station}")
sys.exit(1)
stations.sort(key=lambda x: x["match"], reverse=True)
stations.sort(key=lambda x: (x["match"], x["bit_rate"]), reverse=True)
for station in stations:
station["title"] = self._printable(station["title"])
station["artist"] = self._printable(station["artist"])
Expand All @@ -55,28 +58,58 @@ def run(self: Search) -> None:
print(json.dumps(stations, indent=4))
elif self._args.format == "table":
max_widths = {}
columns = ["title", "artist", "description"]
columns = ["title", "bit_rate", "media_type", "artist", "description"]
for column in columns:
max_width = max(len(str(station[column])) for station in stations)
max_width = max(
[len(str(station[column])) for station in stations] + [len(column)]
)
if column == "description":
term_width = shutil.get_terminal_size().columns
remaining = term_width - max_widths["title"] - max_widths["artist"] - 2
max_width = min(max_width, remaining)
remaining = (
term_width
- sum(
[
max_widths[column]
for column in columns
if column != "description"
]
)
- len(columns)
- 1
)
max_width = min(max_width, remaining)
max_widths[column] = max_width

print(" ".join(column.ljust(max_widths[column]).capitalize() for column in columns))
print(
" ".join(
column.ljust(max_widths[column]).capitalize().replace("_", " ")
for column in columns
)
)
print(" ".join("-" * max_widths[column] for column in columns))
for station in stations:
line_parts = []
# title as link
link = self._term_link(station.get("stream"), station["title"])
line_parts.append(f"{link}{' '*(max_widths['title']-len(station['title']))}")
line_parts.append(
f"{link}{' '*(max_widths['title']-len(station['title']))}"
)
# bit rate
line_parts.append(
str(station["bit_rate"]).ljust(max_widths["bit_rate"])
)
# media type
line_parts.append(
str(station["media_type"]).ljust(max_widths["media_type"])
)
# artist
line_parts.append(str(station["artist"]).ljust(max_widths["artist"]))
# description clipped
line_parts.append(str(station["description"])[:max_widths["description"]])
line_parts.append(
str(station["description"])[: max_widths["description"]]
)
print(" ".join(line_parts))

@staticmethod
def _term_link(uri: str, label: str) -> str:
"""Return a link.
Expand All @@ -97,12 +130,10 @@ def _term_link(uri: str, label: str) -> str:
@staticmethod
def _printable(string: str) -> str:
"""Replace non-printable characters in a string.
Args:
string: The string to replace non-printable characters in.
Returns:
The string with non-printable characters replaced.
"""
return string.translate(NOPRINT_TRANS_TABLE)


67 changes: 0 additions & 67 deletions tunein/xml_helper.py

This file was deleted.

0 comments on commit 7b4578a

Please sign in to comment.