🔥 Add deduplicated list
awtkns committed Aug 7, 2024
1 parent 9c33120 commit c258d99
Showing 7 changed files with 609 additions and 478 deletions.
4 changes: 2 additions & 2 deletions harambe/core.py
@@ -34,12 +34,12 @@
 from harambe.normalize_url import normalize_url
 from harambe.observer import (
     DownloadMeta,
-    DuplicateHandler,
     LocalStorageObserver,
     LoggingObserver,
     ObservationTrigger,
     OutputObserver,
 )
+from harambe.pagination import DuplicateHandler
 from harambe.parser.parser import PydanticSchemaParser
 from harambe.tracker import FileDataTracker
 from harambe.types import (
@@ -288,7 +288,7 @@ async def _notify_observers(
         """
         duplicated = False
         if check_duplication:
-            duplicated = await getattr(self._deduper, method)(*args, **kwargs)
+            duplicated = getattr(self._deduper, method)(*args, **kwargs)

         if not duplicated:
             return await asyncio.gather(
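Because `DuplicateHandler` now exposes plain synchronous methods, the caller in `core.py` drops the `await` on the duplicate check. A minimal sketch of that call pattern, assuming illustrative names (`notify`, `observers`, `deduper`, `method`) rather than the actual `core.py` signature:

```python
import asyncio
from typing import Any

from harambe.pagination import DuplicateHandler  # new home of DuplicateHandler


async def notify(observers: list[Any], deduper: DuplicateHandler, method: str, *args: Any) -> None:
    # The dedup check is a plain (non-async) call; it returns True for already-seen payloads.
    duplicated = getattr(deduper, method)(*args)
    if not duplicated:
        # Only previously unseen payloads are fanned out to the observers.
        await asyncio.gather(*(getattr(o, method)(*args) for o in observers))
```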
51 changes: 0 additions & 51 deletions harambe/observer.py
@@ -1,11 +1,8 @@
import hashlib
import json
from abc import abstractmethod
from typing import (
    Any,
    List,
    Literal,
    Optional,
    Protocol,
    Tuple,
    TypedDict,
@@ -126,51 +123,3 @@ def urls(self) -> List[Tuple[URL, Context, Options]]:
    @property
    def files(self) -> List[Tuple[str, bytes]]:
        return self._files


class DuplicateHandler:
    def __init__(self) -> None:
        self._saved_data: set[bytes] = set()
        self.rows_on_page = 0
        self.previously_saved_rows_on_page = 0

    async def on_save_data(self, data: dict[str, Any]) -> bool:
        return self._add_data(data)

    async def on_queue_url(
        self, url: URL, _: Optional[Context], __: Optional[Options]
    ) -> bool:
        return self._add_data(url)

    # noinspection PyTypeChecker
    async def on_download(
        self, download_url: str, filename: str, content: bytes
    ) -> bool:
        return self._add_data((download_url, filename))

    async def on_paginate(self, next_url: str) -> bool:
        if self.rows_on_page == self.previously_saved_rows_on_page:
            raise StopAsyncIteration()

        self.rows_on_page = 0
        self.previously_saved_rows_on_page = 0
        return False

    def _add_data(self, data: Any) -> bool:
        self.rows_on_page += 1

        hash_value = self.compute_hash(data)
        if hash_value in self._saved_data:
            self.previously_saved_rows_on_page += 1
            return True  # return True if data is duplicated
        else:
            self._saved_data.add(hash_value)
            return False

    @staticmethod
    def compute_hash(data: Any) -> bytes:
        if isinstance(data, dict):
            data = {k: v for k, v in data.items() if not k.startswith("__")}

        data_str = json.dumps(data, separators=(",", ":"), sort_keys=True)
        return hashlib.md5(data_str.encode()).digest()
87 changes: 87 additions & 0 deletions harambe/pagination.py
@@ -0,0 +1,87 @@
import hashlib
import json
from typing import Any, Optional, Sequence, Iterable

from harambe.types import URL, Context, Options


class DuplicateHandler:
    def __init__(self) -> None:
        self._saved_data: set[bytes] = set()
        self.rows_on_page = 0
        self.previously_saved_rows_on_page = 0

    def on_save_data(self, data: dict[str, Any]) -> bool:
        """
        Save data and check if it is duplicated
        :param data: data to be saved
        :return: bool indicating if the data is duplicated, true if it is duplicated
        """

        return self._add_data(data)

    def on_queue_url(
        self, url: URL, _: Optional[Context], __: Optional[Options]
    ) -> bool:
        return self._add_data(url)

    # noinspection PyTypeChecker
    def on_download(self, download_url: str, filename: str, content: bytes) -> bool:
        return self._add_data((download_url, filename))

    def should_continue(self, strict: bool = False) -> bool:
        """
        Check if the current page should continue to be processed
        :param strict: if True, return True only when no rows on the page were duplicates;
            otherwise, return True only when every row on the page was a duplicate
        :return: bool indicating if the page should be continued
        """

        if strict:
            return self.previously_saved_rows_on_page == 0

        return self.rows_on_page == self.previously_saved_rows_on_page

    def on_paginate(self, next_url: str) -> bool:
        if self.should_continue():
            raise StopAsyncIteration()

        self.rows_on_page = 0
        self.previously_saved_rows_on_page = 0
        return False

    def _add_data(self, data: Any) -> bool:
        self.rows_on_page += 1

        hash_value = self.compute_hash(data)
        if hash_value in self._saved_data:
            self.previously_saved_rows_on_page += 1
            return True  # return True if data is duplicated
        else:
            self._saved_data.add(hash_value)
            return False

    @staticmethod
    def compute_hash(data: Any) -> bytes:
        if isinstance(data, dict):
            data = {k: v for k, v in data.items() if not k.startswith("__")}

        data_str = json.dumps(data, separators=(",", ":"), sort_keys=True)
        return hashlib.md5(data_str.encode()).digest()


class PaginatedList(list[Any]):
    def __init__(self) -> None:
        super().__init__()
        self._handler = DuplicateHandler()

    def append(self, item: Any) -> None:
        if not self._handler.on_save_data(item):
            super().append(item)

    def extend(self, items: Iterable[Any]) -> None:
        for item in items:
            self.append(item)

    def should_continue(self) -> bool:
        return self._handler.should_continue(strict=True)
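A short usage sketch of the new deduplicated list; the sample rows are illustrative, and `PaginatedList` is assumed importable from `harambe.pagination` as added above:

```python
from harambe.pagination import PaginatedList

page_one = [{"id": 1}, {"id": 2}]
page_two = [{"id": 2}, {"id": 3}]  # overlaps with page one

items = PaginatedList()
items.extend(page_one)
items.extend(page_two)

print(list(items))              # [{'id': 1}, {'id': 2}, {'id': 3}] -- the repeated row is appended only once
print(items.should_continue())  # False: at least one appended row was already seen (strict check)
```

In strict mode a single previously-seen row flips the flag, whereas the handler's non-strict `on_paginate` path stops (via `StopAsyncIteration`) only when an entire page consisted of duplicates.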