# Source code for src.processing.transformer

"""Data transformation and normalization."""

from typing import Dict, Any, List
from datetime import datetime
import pandas as pd

from ..utils.errors import TransformError
from ..utils.logging import get_logger

logger = get_logger(__name__)


class DataTransformer:
    """Transform raw API data into structured DataFrames."""

    def transform_interest_data(
        self,
        api_response: Dict[str, Any],
        query_labels: Dict[str, str],
    ) -> pd.DataFrame:
        """
        Transform Google Trends interest_over_time data into a DataFrame.

        Args:
            api_response: Raw API response.
            query_labels: Mapping of raw query strings to display labels.

        Returns:
            DataFrame with columns: date, query, value, sorted by date.

        Raises:
            TransformError: If the response contains no timeline data or
                transformation fails for any other reason.
        """
        try:
            interest_data = api_response.get("interest_over_time", {})
            timeline_data = interest_data.get("timeline_data", [])

            if not timeline_data:
                raise TransformError("No timeline data found in API response")

            rows = []
            for time_point in timeline_data:
                # Prefer the numeric epoch timestamp; fall back to parsing
                # the human-readable date string.
                # NOTE(review): fromtimestamp() yields naive local time —
                # confirm whether UTC is intended here.
                timestamp = time_point.get("timestamp")
                if timestamp:
                    date = datetime.fromtimestamp(int(timestamp))
                else:
                    date = self._parse_date_string(time_point.get("date", ""))

                # One row per (time point, query) pair.
                for value_data in time_point.get("values", []):
                    query = value_data.get("query", "")
                    rows.append({
                        "date": date,
                        # Display label if one was supplied, else raw query.
                        "query": query_labels.get(query, query),
                        "value": self._parse_interest_value(value_data),
                    })

            df = pd.DataFrame(rows)

            # Ensure proper types; unparseable values become NaN.
            df["date"] = pd.to_datetime(df["date"])
            df["value"] = pd.to_numeric(df["value"], errors="coerce")

            df = df.sort_values("date").reset_index(drop=True)

            logger.info(
                "Transformed %d data points for %d queries",
                len(df),
                df["query"].nunique(),
            )
            return df
        except TransformError:
            # Bug fix: previously the broad handler below caught our own
            # "No timeline data" error and double-wrapped its message.
            raise
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise TransformError(f"Failed to transform interest data: {e}") from e

    def _parse_date_string(self, date_str: str) -> datetime:
        """
        Parse various date string formats from Google Trends.

        Args:
            date_str: Date string (e.g., "Nov 1 – 7, 2024" or "Nov 2024").

        Returns:
            datetime object; falls back to the current time when the string
            cannot be parsed (deliberate best-effort behavior).
        """
        try:
            # Date ranges ("Nov 1 – 7, 2024"): keep the start date and
            # re-join it with the trailing year.
            if "–" in date_str or "-" in date_str:
                parts = date_str.replace("–", "-").split(",")
                if len(parts) > 1:
                    month_day = parts[0].split("-")[0].strip()
                    year = parts[1].strip()
                    date_str = f"{month_day} {year}"

            # Try the formats Google Trends is known to emit.
            for fmt in ["%b %d %Y", "%b %Y", "%Y-%m-%d"]:
                try:
                    return datetime.strptime(date_str.strip(), fmt)
                except ValueError:
                    continue

            # Fallback: use current date rather than failing the whole
            # transform over a single bad label.
            logger.warning(f"Could not parse date: {date_str}, using current date")
            return datetime.now()
        except Exception:
            # Best-effort by design: never let date parsing abort the caller.
            return datetime.now()

    def _parse_interest_value(self, value_data: Dict[str, Any]) -> float:
        """
        Parse an interest value from the API response.

        Args:
            value_data: Value data dictionary.

        Returns:
            Numeric interest value; 0.0 when nothing usable is present.
        """
        # Prefer the pre-extracted numeric value when present.
        extracted = value_data.get("extracted_value")
        if extracted is not None:
            try:
                return float(extracted)
            except (ValueError, TypeError):
                pass  # malformed — fall through to the raw "value" field

        value = value_data.get("value", 0)

        # Handle Google Trends' special string values.
        if isinstance(value, str):
            value = value.strip()
            if value == "<1":
                return 0.5  # "<1" means "present but below 1"
            if value == "":
                return 0.0

        try:
            return float(value)
        except (ValueError, TypeError):
            return 0.0

    def add_moving_average(
        self,
        df: pd.DataFrame,
        window: int = 3,
        column: str = "value",
    ) -> pd.DataFrame:
        """
        Add a moving-average column to the DataFrame.

        Args:
            df: Input DataFrame (must contain a "query" column).
            window: Window size for the moving average.
            column: Column to smooth.

        Returns:
            Copy of the DataFrame with an added "value_ma" column.
            NOTE(review): the output column name is hard-coded to
            "value_ma" even when `column` != "value" — confirm intended.
        """
        df = df.copy()

        # Smooth each query's series independently; min_periods=1 keeps the
        # leading edge defined instead of NaN.
        df["value_ma"] = df.groupby("query")[column].transform(
            lambda x: x.rolling(window=window, min_periods=1, center=False).mean()
        )

        logger.debug(f"Added moving average with window={window}")
        return df

    def normalize_to_100(self, df: pd.DataFrame, column: str = "value") -> pd.DataFrame:
        """
        Normalize values to a 0-100 scale against the global maximum.

        Args:
            df: Input DataFrame.
            column: Column to normalize.

        Returns:
            Copy of the DataFrame with an added "<column>_normalized" column.
        """
        df = df.copy()

        max_value = df[column].max()
        if max_value > 0:
            df[f"{column}_normalized"] = (df[column] / max_value) * 100
        else:
            # Avoid division by zero (also covers empty/NaN-only columns).
            df[f"{column}_normalized"] = 0

        return df