import json
import hashlib
from typing import Any, Optional
from datetime import datetime, timedelta
import pathlib
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)


class FileCache:
    def __init__(self, cache_dir: str = ".cache", ttl_days: int = 90):
        """Initialize the cache with a directory and TTL."""
        self.cache_dir = pathlib.Path(cache_dir)
        self.ttl = timedelta(days=ttl_days)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Cache initialized at: {self.cache_dir.absolute()}")

    def _get_cache_key(self, key_parts: dict) -> str:
        """Generate a unique cache key from the request parameters only."""
        # Create a copy of key_parts without function and ticker
        request_params = key_parts.copy()
        request_params.pop('function', None)
        request_params.pop('ticker', None)

        # Create a descriptive prefix based on the parameters
        prefix_parts = []
        for key, value in sorted(request_params.items()):
            if key in ['start_date', 'end_date', 'report_period']:
                prefix_parts.append(f"{value}")
            elif key == 'period':
                prefix_parts.append(f"p{value}")
            elif key == 'limit':
                prefix_parts.append(f"l{value}")
            elif key == 'line_items' and isinstance(value, list):
                prefix_parts.append(f"items{len(value)}")
        prefix = '_'.join(prefix_parts) if prefix_parts else 'default'

        # Create hash from all parameters for uniqueness
        sorted_items = sorted(request_params.items())
        key_str = json.dumps(sorted_items, sort_keys=True)
        hash_suffix = hashlib.md5(key_str.encode()).hexdigest()[:8]  # Use first 8 chars of hash

        return f"{prefix}_{hash_suffix}"

    def _get_cache_path(self, key_parts: dict, cache_key: str) -> pathlib.Path:
        """Get the full path for a cache file, organized by ticker and endpoint."""
        # Get ticker from key_parts, default to 'general' if not found
        ticker = key_parts.get('ticker', 'general').upper()
        # Get function name from key_parts, default to 'misc' if not found
        endpoint = key_parts.get('function', 'misc')

        # Create ticker- and endpoint-specific subfolders
        ticker_dir = self.cache_dir / ticker
        endpoint_dir = ticker_dir / endpoint
        endpoint_dir.mkdir(parents=True, exist_ok=True)

        return endpoint_dir / f"{cache_key}.json"

    def get(self, key_parts: dict) -> Optional[Any]:
        """Retrieve data from cache if it exists and is not expired."""
        cache_key = self._get_cache_key(key_parts)
        cache_path = self._get_cache_path(key_parts, cache_key)

        if not cache_path.exists():
            # logger.info(f"Cache miss for: {key_parts.get('function', 'unknown')} - {key_parts}")  # keep for debugging
            return None

        try:
            with open(cache_path, 'r') as f:
                cached_data = json.load(f)

            # Check if cache is expired
            cached_time = datetime.fromisoformat(cached_data['timestamp'])
            if datetime.now() - cached_time > self.ttl:
                logger.info(f"Cache expired for: {key_parts.get('function', 'unknown')} - {key_parts}")
                return None

            logger.info(f"Cache hit for: {key_parts.get('function', 'unknown')} - {key_parts}")
            return cached_data['data']
        except (OSError, json.JSONDecodeError, KeyError, ValueError):
            # Treat unreadable or malformed cache files as a miss
            logger.warning(f"Cache error for: {key_parts.get('function', 'unknown')} - {key_parts}")
            return None

    def set(self, key_parts: dict, data: Any) -> None:
        """Store data in cache."""
        cache_key = self._get_cache_key(key_parts)
        cache_path = self._get_cache_path(key_parts, cache_key)

        cache_data = {
            'timestamp': datetime.now().isoformat(),
            'data': data
        }
        with open(cache_path, 'w') as f:
            json.dump(cache_data, f)
        # logger.info(f"Cached data for: {key_parts.get('function', 'unknown')} - {key_parts}")  # keep for debugging
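

# --- Minimal usage sketch (illustrative addition, not part of the module) ---
# All key_parts values below ('get_prices', 'AAPL', the dates, the payload)
# are hypothetical placeholders; substitute whatever your API wrapper passes.
if __name__ == "__main__":
    cache = FileCache(cache_dir=".cache", ttl_days=90)

    key_parts = {
        'function': 'get_prices',    # hypothetical endpoint name
        'ticker': 'AAPL',            # hypothetical ticker symbol
        'start_date': '2024-01-01',
        'end_date': '2024-03-31',
        'limit': 100,
    }

    # First lookup misses, so "fetch" (here, a placeholder payload) and store it.
    data = cache.get(key_parts)
    if data is None:
        data = [{'date': '2024-01-02', 'close': 185.64}]
        cache.set(key_parts, data)

    # Second lookup hits the cache and returns the stored payload,
    # read back from .cache/AAPL/get_prices/<prefix>_<hash>.json.
    assert cache.get(key_parts) == data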