Refactor & Fix MDBList pagination and filtering logic.
Simplified and restructured the functions that fetch and process data from MDBList. Improved caching behavior, streamlined the handling of unfiltered vs. filtered results, and removed unnecessary complexity from the pagination logic. Updated the background tasks to fetch missing metadata more effectively.
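
For context, a minimal sketch (not taken from the commit; the helper name is made up) of the batch-and-slice arithmetic the refactored scraper uses for unfiltered pagination, assuming the same 200-item batch size introduced as batch_size below:

def page_slice(skip: int, limit: int, batch_size: int = 200) -> tuple[int, int, int]:
    """Map a skip/limit page onto a single MDBList API batch."""
    offset = (skip // batch_size) * batch_size  # offset parameter sent to the API
    start_idx = skip % batch_size               # where the page starts inside that batch
    end_idx = start_idx + limit                 # the caller clamps this to the batch length
    return offset, start_idx, end_idx

# e.g. skip=250, limit=25 -> offset=200, and the page is items 50..75 of that batch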
mhdzumair committed Jan 9, 2025
1 parent 8002690 commit 458fc0d
Showing 2 changed files with 87 additions and 130 deletions.
76 changes: 25 additions & 51 deletions db/crud.py
@@ -154,7 +154,7 @@ async def get_mdblist_meta_list(
skip: int = 0,
limit: int = 25,
) -> list[schemas.Meta]:
"""Get a list of metadata entries from MDBList with improved pagination handling"""
"""Get a list of metadata entries from MDBList"""
if not user_data.mdblist_config:
return []

@@ -191,24 +191,20 @@ async def get_mdblist_meta_list(
use_filters=False,
)

# For filtered results, we need to maintain a window of filtered results
# that's larger than the requested page to handle pagination properly
WINDOW_SIZE = max(500, skip + limit * 2) # Maintain a larger window of results

# Get a window of IMDb IDs from MDBList
# For filtered results, get all IMDb IDs first
imdb_ids = await mdblist_scraper.get_list_items(
list_id=list_id,
media_type=media_type,
skip=0, # Always start from beginning for filtered results
limit=WINDOW_SIZE,
skip=0,
limit=0, # Ignored for filtered results
genre=genre,
use_filters=True,
)

if not imdb_ids:
return []

# Prepare the filter pipeline
# Build filter pipeline
match_filter = {
"_id": {"$in": imdb_ids},
"type": catalog_type,
@@ -239,58 +235,36 @@ async def get_mdblist_meta_list(
if cert_filters:
match_filter["$or"] = cert_filters

# Use facet to get both filtered results and total count efficiently
# Get filtered results with pagination
poster_path = f"{settings.poster_host_url}/poster/{catalog_type}/"

pipeline = [
{"$match": match_filter},
{"$sort": {"last_stream_added": -1}},
{
"$facet": {
"results": [{"$skip": skip}, {"$limit": limit}],
"total": [{"$count": "count"}],
},
},
{"$skip": skip},
{"$limit": limit},
{"$set": {"poster": {"$concat": [poster_path, "$_id", ".jpg"]}}},
]

results = await meta_class.get_motor_collection().aggregate(pipeline).to_list()
if not results:
return []

filtered_results = [
schemas.Meta.model_validate(result) for result in results[0]["results"]
]
total_count = results[0]["total"][0]["count"] if results[0]["total"] else 0

# If we're close to the end of our window and there might be more results,
# trigger background fetch of next batch of metadata
if total_count >= WINDOW_SIZE - limit:
# Get the next batch of IMDb IDs for background processing
next_batch_ids = await mdblist_scraper.get_list_items(
list_id=list_id,
media_type=media_type,
skip=WINDOW_SIZE,
limit=100, # Fetch next batch
genre=genre,
use_filters=True,
# Check for missing metadata and trigger background fetch
existing_ids = set(
doc["_id"]
for doc in await meta_class.get_motor_collection()
.find({"_id": {"$in": imdb_ids}}, {"_id": 1})
.to_list(None)
)
missing_ids = list(set(imdb_ids) - existing_ids)

if next_batch_ids:
# Check which IDs are missing from our database
existing_meta_ids = set(
doc["_id"]
for doc in await meta_class.get_motor_collection()
.find({"_id": {"$in": next_batch_ids}}, {"_id": 1})
.to_list(None)
if missing_ids:
background_tasks.add_task(
fetch_metadata,
missing_ids,
catalog_type,
)
missing_ids = list(set(next_batch_ids) - existing_meta_ids)

if missing_ids:
background_tasks.add_task(
fetch_metadata,
missing_ids,
catalog_type,
)

return filtered_results
return []
return [schemas.Meta.model_validate(result) for result in results]

finally:
await mdblist_scraper.close()
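
As a condensed sketch of the filtered path above (the function and parameter names are assumed for illustration; the motor collection, IMDb IDs, and poster host come from the surrounding code), the query now pages with plain $skip/$limit instead of the earlier $facet stage:

async def filtered_page(meta_collection, imdb_ids, catalog_type, poster_host, skip=0, limit=25):
    # Match only the MDBList IMDb IDs, newest streams first, then page in the database.
    pipeline = [
        {"$match": {"_id": {"$in": imdb_ids}, "type": catalog_type}},
        {"$sort": {"last_stream_added": -1}},
        {"$skip": skip},
        {"$limit": limit},
        {"$set": {"poster": {"$concat": [f"{poster_host}/poster/{catalog_type}/", "$_id", ".jpg"]}}},
    ]
    return await meta_collection.aggregate(pipeline).to_list(None)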
141 changes: 62 additions & 79 deletions scrapers/mdblist.py
@@ -14,30 +14,28 @@ def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.mdblist.com"
self.client = httpx.AsyncClient(proxy=settings.requests_proxy_url, timeout=30.0)
self.batch_size = 200

async def close(self):
await self.client.aclose()

async def _fetch_list(
self,
list_id: str,
limit: int = 100,
offset: int = 0,
genre: Optional[str] = None,
) -> Optional[Dict]:
"""Fetch a list from MDBList API"""
params = {
"apikey": self.api_key,
"limit": limit,
"limit": self.batch_size,
"offset": offset,
"append_to_response": "genre",
}
if genre:
params["filter_genre"] = genre

cache_key = (
f"mdblist:list:{list_id}:limit_{limit}:offset_{offset}:{genre or 'all'}"
)
cache_key = f"mdblist:raw:{list_id}:offset_{offset}:{genre or 'all'}"
cached_data = await REDIS_ASYNC_CLIENT.get(cache_key)

if cached_data:
@@ -48,8 +46,8 @@ async def _fetch_list(
f"{self.base_url}/lists/{list_id}/items", params=params
)
if response.status_code == 200:
# Cache for 15 minutes
await REDIS_ASYNC_CLIENT.set(cache_key, response.text, ex=900)
# Cache raw API response for 60 minutes
await REDIS_ASYNC_CLIENT.set(cache_key, response.text, ex=3600)
return response.json()
logging.error(f"Failed to fetch MDBList data: {response.status_code}")
return None
Expand All @@ -61,7 +59,6 @@ def _convert_to_meta(self, item: Dict, media_type: str) -> schemas.Meta:
"""Convert MDBList item to Meta object"""
genres = item.get("genre", [])
if genres and genres[0] is None:
# clean up genre data
genres = None
return schemas.Meta.model_validate(
{
Expand All @@ -74,110 +71,96 @@ def _convert_to_meta(self, item: Dict, media_type: str) -> schemas.Meta:
}
)

async def _fetch_and_process_batch(
async def get_all_list_items(
self,
list_id: str,
media_type: str,
offset: int,
limit: int,
skip: int,
genre: Optional[str],
use_filters: bool = False,
) -> List[schemas.Meta] | List[str]:
"""Helper method to fetch and process a batch of results"""
data = await self._fetch_list(list_id, 100, offset, genre)
if not data:
return []

items = data.get("movies" if media_type == "movie" else "shows", [])

if not use_filters:
# Convert directly to Meta objects
meta_list = [
self._convert_to_meta(item, media_type)
for item in items
if item.get("imdb_id", "").startswith("tt")
]
# Calculate the slice we need from this batch
start_idx = skip % 100
end_idx = start_idx + limit
return meta_list[start_idx:end_idx]

return [
item["imdb_id"]
for item in items
if item.get("imdb_id", "").startswith("tt")
]

async def get_list_items(
self,
list_id: str,
media_type: str,
skip: int = 0,
limit: int = 25,
genre: Optional[str] = None,
use_filters: bool = True,
) -> List[schemas.Meta] | List[str]:
) -> List[str]:
"""
Get items from a MDBList list with pagination support.
For filtered results, keeps fetching until we have enough items after filtering.
Fetch all IMDb IDs from a list until no more results are available.
Used for filtered results to ensure complete dataset.
"""
if not use_filters:
# Direct return for unfiltered results
fetch_limit = 100
offset = (skip // fetch_limit) * fetch_limit
return await self._fetch_and_process_batch(
list_id, media_type, offset, limit, skip, genre, use_filters=False
)
cache_key = f"mdblist:all_ids:{list_id}:{media_type}:{genre or 'all'}"
cached_ids = await REDIS_ASYNC_CLIENT.lrange(cache_key, 0, -1)

# For filtered results, we need to handle pagination differently
cache_key = f"mdblist:filtered:{list_id}:genre_{genre or 'all'}"
cached_filtered_ids = await REDIS_ASYNC_CLIENT.lrange(cache_key, 0, -1)

if cached_filtered_ids:
# Use cached filtered results
start_idx = skip
end_idx = skip + limit
return [
cached_id.decode()
for cached_id in cached_filtered_ids[start_idx:end_idx]
]
if cached_ids:
return [cached_id.decode() for cached_id in cached_ids]

# No cache - need to fetch and filter
all_imdb_ids = []
offset = 0
batch_size = 100

while len(all_imdb_ids) < skip + limit:
batch = await self._fetch_list(list_id, batch_size, offset, genre)
while True:
batch = await self._fetch_list(list_id, offset, genre)
if not batch:
break

# Get the correct list based on media type
items = batch.get("movies" if media_type == "movie" else "shows", [])
if not items:
break

# Filter valid IMDb IDs for the specific media type
new_ids = [
item["imdb_id"]
for item in items
if item.get("imdb_id", "").startswith("tt")
]
all_imdb_ids.extend(new_ids)

if len(items) < batch_size: # No more results available
if not new_ids:
break

offset += batch_size
all_imdb_ids.extend(new_ids)
offset += self.batch_size

# If we got fewer items than batch_size, we've reached the end
if len(items) < self.batch_size:
break

# Cache the full result for 15 minutes
# Cache the complete list if we got any results
if all_imdb_ids:
pipeline = await REDIS_ASYNC_CLIENT.pipeline()
pipeline.delete(cache_key)
pipeline.rpush(cache_key, *all_imdb_ids)
pipeline.expire(cache_key, 900) # 15 minutes
pipeline.expire(cache_key, 3600) # Cache for 1 hour
await pipeline.execute()

return all_imdb_ids[skip : skip + limit]
return all_imdb_ids

async def get_list_items(
self,
list_id: str,
media_type: str,
skip: int = 0,
limit: int = 25,
genre: Optional[str] = None,
use_filters: bool = True,
) -> List[schemas.Meta] | List[str]:
"""
Get items from a MDBList list.
For filtered results (use_filters=True), fetches all available items.
For unfiltered results, uses regular pagination.
"""
if use_filters:
return await self.get_all_list_items(list_id, media_type, genre)

# For unfiltered results, use regular pagination
offset = (skip // self.batch_size) * self.batch_size
batch = await self._fetch_list(list_id, offset, genre)
if not batch:
return []

items = batch.get("movies" if media_type == "movie" else "shows", [])
meta_list = [
self._convert_to_meta(item, media_type)
for item in items
if item.get("imdb_id", "").startswith("tt")
]

# Calculate the slice we need from this batch
start_idx = skip % self.batch_size
end_idx = min(start_idx + limit, len(meta_list))
return meta_list[start_idx:end_idx]


async def initialize_mdblist_scraper(api_key: str) -> MDBListScraper:
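
To close, a hypothetical usage sketch (the list ID and calling code are assumed, not part of the commit) of the two paths get_list_items now exposes: a locally sliced page for unfiltered catalogs, and the complete, Redis-cached IMDb ID list for filtered ones.

from scrapers.mdblist import MDBListScraper  # module path as shown in this diff

async def example(api_key: str):
    scraper = MDBListScraper(api_key)
    try:
        # Unfiltered: fetch the API batch containing the requested page and slice it locally.
        metas = await scraper.get_list_items("12345", "movie", skip=50, limit=25, use_filters=False)
        # Filtered: fetch every IMDb ID in the list (cached in Redis for an hour).
        imdb_ids = await scraper.get_list_items("12345", "movie", genre="action", use_filters=True)
        return metas, imdb_ids
    finally:
        await scraper.close()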
