"""
HelpScout data loader — mirrors SentimentDataLoader architecture.

Three loading modes:
  - load_dashboard_data()     : lightweight (no long text), cached 24 h
  - load_analysis_data(...)   : filtered with SUMMARY + notes, on-demand, cached 24 h
  - load_demographics_data()  : email-keyed user demographics, cached 24 h
"""
import json
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import streamlit as st
from dateutil.relativedelta import relativedelta

# Put the project root on sys.path so the ``visualization`` imports below resolve.
root_dir = Path(__file__).resolve().parent.parent.parent
sys.path.append(str(root_dir))

from visualization.SnowFlakeConnection import SnowFlakeConn  # noqa: E402
from visualization.utils.helpscout_utils import (  # noqa: E402
    load_topic_taxonomy,
    parse_topics,
    compute_escalation_flag,
)


class HelpScoutDataLoader:
    """
    Loads HelpScout conversation features from Snowflake with caching.

    The ``@st.cache_data`` methods take ``_self`` instead of ``self``: Streamlit
    does not hash parameters whose name starts with an underscore, so the cache
    key is built from the remaining arguments only.
    """

    # Cache lifetime of every Snowflake-backed loader: 24 hours, in seconds.
    _CACHE_TTL = 86400
    _TIMESTAMP_COLUMNS = ("first_message_at", "last_message_at", "processed_at")
    _BOOL_FLAG_COLUMNS = ("is_refund_request", "is_cancellation", "is_membership")
    _LOWERCASE_CATEGORY_COLUMNS = ("status", "state", "source_type")
    # Demographic columns attached to conversations, with the value used when
    # no demographic data is available for them.
    _DEMOGRAPHIC_DEFAULTS = (
        ("age", None),
        ("age_group", "Unknown"),
        ("timezone", None),
        ("timezone_region", "Unknown"),
        ("experience_level", None),
        ("experience_group", "Unknown"),
    )
    _DEMOGRAPHIC_LABEL_COLUMNS = ("age_group", "timezone_region", "experience_group")
    # Maximum length of ``summary_short`` before an ellipsis is appended.
    _SUMMARY_SHORT_CHARS = 120
    _DATE_ONLY_RE = re.compile(r"\d{4}-\d{2}-\d{2}")

    def __init__(self, config_path=None):
        """
        Read the visualization config and keep the HelpScout-related settings.

        Args:
            config_path: Path to the JSON config. Defaults to
                ``config/viz_config.json`` under the parent of this file's directory.
        """
        if config_path is None:
            config_path = Path(__file__).parent.parent / "config" / "viz_config.json"
        with open(config_path, "r", encoding="utf-8") as f:
            self.config = json.load(f)

        self.hs_config = self.config.get("helpscout", {})
        self.dashboard_query = self.hs_config.get("dashboard_query", "")
        self.demographics_query = self.hs_config.get("demographics_query", "")
        self.escalation_sentiments = self.hs_config.get(
            "escalation_sentiments", ["negative", "very_negative"]
        )
        self.default_date_range_days = self.hs_config.get("default_date_range_days", 60)
        self.max_summary_conversations = self.hs_config.get("max_summary_conversations", 300)

        hs_colors = self.config.get("color_schemes_helpscout", {})
        self.topic_colors = hs_colors.get("topics", {})
        self.status_colors = hs_colors.get("status", {})
        self.flag_colors = hs_colors.get("boolean_flags", {})
        self.sentiment_colors = self.config.get("color_schemes", {}).get("sentiment_polarity", {})
        self.demographics_config = self.config.get("demographics", {})

        self.taxonomy = load_topic_taxonomy()

    # ─────────────────────────────────────────────────────────────
    # Snowflake access
    # ─────────────────────────────────────────────────────────────

    @staticmethod
    def _run_query(query: str, description: str):
        """
        Run a read query on a fresh Snowflake connection and return its result.

        The connection is closed in ``finally`` so it is released even when the
        query raises; the exception still propagates to the caller.
        """
        conn = SnowFlakeConn()
        try:
            return conn.run_read_query(query, description)
        finally:
            conn.close_connection()

    # ─────────────────────────────────────────────────────────────
    # Dashboard data (lightweight, 24-hour cache)
    # ─────────────────────────────────────────────────────────────

    @st.cache_data(ttl=_CACHE_TTL)
    def load_dashboard_data(_self) -> pd.DataFrame:
        """
        Load lightweight HelpScout dashboard data — no long-form text columns.

        Demographics are merged in when a demographics query is configured and
        returns rows. Returns an empty DataFrame (after showing ``st.error``)
        when no data comes back or loading fails.
        """
        try:
            df = _self._run_query(_self.dashboard_query, "HelpScout dashboard data")
            if df is None or df.empty:
                st.error("No HelpScout data returned from Snowflake")
                return pd.DataFrame()

            df = _self._process_dashboard_df(df)

            if _self.demographics_query:
                demo_df = _self.load_demographics_data()
                if not demo_df.empty:
                    df = _self.merge_demographics(df, demo_df)
            return df
        except Exception as e:
            st.error(f"Error loading HelpScout dashboard data: {e}")
            return pd.DataFrame()

    def _normalize_common(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Apply the normalization shared by the dashboard and analysis frames.

        Lower-cases column names, parses timestamps as UTC and drops the tz info,
        fills missing categorical values with ``"unknown"`` (lower-casing the
        status/state/source columns), coerces flag columns to bool, and derives
        ``topics_list`` and ``is_escalation``. Mutates and returns ``df``.
        """
        df.columns = df.columns.str.lower()
        for ts_col in self._TIMESTAMP_COLUMNS:
            if ts_col in df.columns:
                df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce", utc=True).dt.tz_localize(None)

        df["sentiment_polarity"] = df["sentiment_polarity"].fillna("unknown")
        for cat_col in self._LOWERCASE_CATEGORY_COLUMNS:
            if cat_col in df.columns:
                df[cat_col] = df[cat_col].fillna("unknown").str.lower()

        for bool_col in self._BOOL_FLAG_COLUMNS:
            if bool_col in df.columns:
                df[bool_col] = df[bool_col].fillna(False).astype(bool)

        if "emotions" not in df.columns:
            df["emotions"] = None

        # topics_list feeds the topic filter options.
        df["topics_list"] = df["topics"].apply(parse_topics)
        df["is_escalation"] = compute_escalation_flag(df, self.escalation_sentiments)
        return df

    def _process_dashboard_df(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize a raw dashboard result set (see ``_normalize_common``)."""
        return self._normalize_common(df)

    # ─────────────────────────────────────────────────────────────
    # Analysis page data (on-demand, 24-hour cache)
    # ─────────────────────────────────────────────────────────────

    def load_analysis_data(self, sentiments=None, topics=None, refund_only=False,
                           cancel_only=False, membership_only=False, statuses=None,
                           sources=None, date_range=None, top_n=None):
        """
        Load filtered HelpScout conversations with full text for the Analysis page.

        Filter arguments are converted to sorted tuples / plain scalars so that
        ``_fetch_analysis_data`` gets a stable, hashable cache key.

        Args:
            sentiments, topics, statuses, sources: Iterables of allowed values;
                empty or None means "no filter".
            refund_only, cancel_only, membership_only: Restrict to flagged rows.
            date_range: ``(start, end)``; ignored unless it has exactly two items.
                A date-only ``end`` includes that whole day.
            top_n: Row limit; None or 0 means no limit.
        """
        sentiments_key = tuple(sorted(sentiments)) if sentiments else ()
        topics_key = tuple(sorted(topics)) if topics else ()
        statuses_key = tuple(sorted(statuses)) if statuses else ()
        sources_key = tuple(sorted(sources)) if sources else ()
        date_key = (
            (str(date_range[0]), str(date_range[1]))
            if date_range and len(date_range) == 2
            else ()
        )
        return self._fetch_analysis_data(
            sentiments_key, topics_key,
            bool(refund_only), bool(cancel_only), bool(membership_only),
            statuses_key, sources_key, date_key, top_n or 0,
        )

    @st.cache_data(ttl=_CACHE_TTL)
    def _fetch_analysis_data(_self, sentiments, topics, refund_only, cancel_only,
                             membership_only, statuses, sources, date_range, top_n):
        """
        Cached analysis data fetch — returns the full-detail conversation frame.

        Returns an empty DataFrame when nothing matches, or (after ``st.error``)
        when building or running the query fails.
        """
        try:
            query = _self._build_analysis_query(
                sentiments, topics, refund_only, cancel_only, membership_only,
                statuses, sources, date_range, top_n,
            )
            df = _self._run_query(query, "HelpScout analysis data")
            if df is None or df.empty:
                return pd.DataFrame()
            return _self._process_analysis_df(df)
        except Exception as e:
            st.error(f"Error loading HelpScout analysis data: {e}")
            return pd.DataFrame()

    def _build_analysis_query(self, sentiments, topics, refund_only, cancel_only,
                              membership_only, statuses, sources, date_range, top_n):
        """
        Build dynamic SQL for the analysis page with all filters pushed to Snowflake.

        Every user-supplied value is emitted through ``_sql_literal`` (escaped,
        never stripped), and the date bounds are parsed before use.

        Raises:
            ValueError: If a date-range value cannot be parsed as a timestamp.
        """
        where_clauses = []

        if date_range and len(date_range) == 2:
            where_clauses.append(self._date_range_clause(date_range))

        if sentiments:
            where_clauses.append(f"SENTIMENT_POLARITY IN ({self._sql_in_list(sentiments)})")

        if topics:
            # NOTE(review): SPLIT does not trim whitespace, so this assumes TOPICS is
            # stored without spaces after the commas — confirm against the table.
            topic_conditions = [
                f"ARRAY_CONTAINS({self._sql_literal(t)}::VARIANT, SPLIT(TOPICS, ','))"
                for t in topics
            ]
            where_clauses.append("(" + " OR ".join(topic_conditions) + ")")

        if statuses:
            lowered_statuses = (str(s).lower() for s in statuses)
            where_clauses.append(f"LOWER(STATUS) IN ({self._sql_in_list(lowered_statuses)})")

        if sources:
            lowered_sources = (str(s).lower() for s in sources)
            where_clauses.append(f"LOWER(SOURCE_TYPE) IN ({self._sql_in_list(lowered_sources)})")

        flag_predicates = (
            (refund_only, "IS_REFUND_REQUEST = TRUE"),
            (cancel_only, "IS_CANCELLATION = TRUE"),
            (membership_only, "IS_MEMBERSHIP = TRUE"),
        )
        where_clauses.extend(predicate for enabled, predicate in flag_predicates if enabled)

        where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
        limit_sql = f"LIMIT {int(top_n)}" if top_n and top_n > 0 else ""

        return f"""
            SELECT
                CONVERSATION_ID,
                LOWER(CUSTOMER_EMAIL) AS CUSTOMER_EMAIL,
                CUSTOMER_FIRST, CUSTOMER_LAST,
                THREAD_COUNT, FIRST_MESSAGE_AT, LAST_MESSAGE_AT, DURATION_HOURS,
                STATUS, STATE, SOURCE_TYPE, SOURCE_VIA,
                SENTIMENT_POLARITY, EMOTIONS, SENTIMENT_CONFIDENCE, SENTIMENT_NOTES,
                TOPICS, IS_REFUND_REQUEST, IS_CANCELLATION, IS_MEMBERSHIP,
                TOPIC_CONFIDENCE, TOPIC_NOTES, SUMMARY,
                PROCESSED_AT
            FROM SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES
            {where_sql}
            ORDER BY FIRST_MESSAGE_AT DESC
            {limit_sql}
        """

    @classmethod
    def _date_range_clause(cls, date_range) -> str:
        """
        Return the FIRST_MESSAGE_AT predicate for a ``(start, end)`` pair.

        A date-only end value (``YYYY-MM-DD``) is treated as inclusive of that
        whole day via a half-open ``< end + 1 day`` bound; comparing a timestamp
        with ``<= 'YYYY-MM-DD'`` would only match up to midnight at the start of
        the end day. Any other end value keeps the original ``<=`` comparison.

        Raises:
            ValueError: If either bound cannot be parsed as a timestamp.
        """
        start_raw, end_raw = (str(v).strip() for v in date_range)
        start = pd.Timestamp(start_raw)
        end = pd.Timestamp(end_raw)
        if pd.isna(start) or pd.isna(end):
            raise ValueError(f"Invalid date range: {date_range!r}")

        lower = f"FIRST_MESSAGE_AT >= {cls._sql_literal(start.isoformat(sep=' '))}"
        if cls._DATE_ONLY_RE.fullmatch(end_raw):
            upper_bound = end + pd.Timedelta(days=1)
            upper = f"FIRST_MESSAGE_AT < {cls._sql_literal(upper_bound.isoformat(sep=' '))}"
        else:
            upper = f"FIRST_MESSAGE_AT <= {cls._sql_literal(end.isoformat(sep=' '))}"
        return f"{lower} AND {upper}"

    def _process_analysis_df(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize a raw analysis result set and add ``summary_short``.

        Uses the same normalization as the dashboard frame so that both pages
        agree on categorical values and escalation flags.
        """
        df = self._normalize_common(df)

        # Short summary for cards: truncated to _SUMMARY_SHORT_CHARS (120) characters plus "…".
        if "summary" in df.columns:
            limit = self._SUMMARY_SHORT_CHARS
            text = df["summary"].fillna("").astype(str)
            df["summary_short"] = text.where(text.str.len() <= limit, text.str[:limit] + "…")
        return df

    # ─────────────────────────────────────────────────────────────
    # Demographics (email-keyed, 24-hour cache)
    # ─────────────────────────────────────────────────────────────

    @st.cache_data(ttl=_CACHE_TTL)
    def load_demographics_data(_self) -> pd.DataFrame:
        """
        Load user demographics keyed by email.

        Returns an empty DataFrame when no demographics query is configured,
        when the query returns nothing, or (after ``st.warning``) on failure.
        """
        if not _self.demographics_query:
            return pd.DataFrame()
        try:
            df = _self._run_query(_self.demographics_query, "HelpScout user demographics")
            if df is None or df.empty:
                return pd.DataFrame()
            return _self._process_demographics_df(df)
        except Exception as e:
            st.warning(f"Could not load HelpScout demographics: {e}")
            return pd.DataFrame()

    def _process_demographics_df(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Derive age/timezone/experience groupings and normalize ``customer_email``.

        Rows without an email are dropped and emails are lower-cased.
        """
        df.columns = df.columns.str.lower()

        if "birthday" in df.columns:
            # Stringify first so mixed-type values parse uniformly; unparsable ones become NaT.
            birthdays = pd.to_datetime(df["birthday"].astype(str), errors="coerce", utc=True)
            df["birthday"] = birthdays.dt.tz_localize(None)
            df["age"] = df["birthday"].apply(self._calculate_age)
            df["age_group"] = df["age"].apply(self._categorize_age)

        if "timezone" in df.columns:
            df["timezone_region"] = df["timezone"].apply(self._extract_timezone_region)

        if "experience_level" in df.columns:
            df["experience_group"] = df["experience_level"].apply(self._categorize_experience)

        if "customer_email" in df.columns:
            # .copy() so the assignment below targets a real frame, not a view of a slice.
            df = df[df["customer_email"].notna()].copy()
            df["customer_email"] = df["customer_email"].str.lower()

        return df

    def merge_demographics(self, df: pd.DataFrame, demo_df: pd.DataFrame) -> pd.DataFrame:
        """
        Merge demographic data with HelpScout conversations on ``customer_email``.

        The result always contains the demographic columns; conversations with
        no demographic match get ``"Unknown"`` labels (and None for raw values).
        Demographic rows are de-duplicated by email first so a left join cannot
        multiply conversation rows.
        """
        if (
            demo_df.empty
            or "customer_email" not in df.columns
            or "customer_email" not in demo_df.columns
        ):
            return self._add_missing_demographic_columns(df)

        merge_cols = ["customer_email"] + [
            col for col, _ in self._DEMOGRAPHIC_DEFAULTS if col in demo_df.columns
        ]
        demo_unique = demo_df[merge_cols].drop_duplicates(subset="customer_email", keep="first")
        merged = df.merge(demo_unique, on="customer_email", how="left")

        for col in self._DEMOGRAPHIC_LABEL_COLUMNS:
            if col in merged.columns:
                merged[col] = merged[col].fillna("Unknown")
        return self._add_missing_demographic_columns(merged)

    def _add_missing_demographic_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with any absent demographic column set to its default."""
        missing = {col: default for col, default in self._DEMOGRAPHIC_DEFAULTS if col not in df.columns}
        return df.assign(**missing)

    # ─────────────────────────────────────────────────────────────
    # Filter helpers
    # ─────────────────────────────────────────────────────────────

    def get_filter_options(self, df: pd.DataFrame) -> dict:
        """
        Return sorted unique values for all in-page filters from the dashboard df.

        Missing columns (e.g. the empty DataFrame returned when loading fails)
        yield empty option lists instead of raising ``KeyError``.
        """
        def unique_sorted(col):
            if col not in df.columns:
                return []
            return sorted(df[col].dropna().unique().tolist())

        if "topics_list" in df.columns:
            topics_flat = df["topics_list"].explode().dropna().unique().tolist()
        else:
            topics_flat = []

        return {
            "sentiments": unique_sorted("sentiment_polarity"),
            "topics": sorted(t for t in topics_flat if t),
            "statuses": unique_sorted("status"),
            "states": unique_sorted("state"),
            "sources": unique_sorted("source_type"),
        }

    # ─────────────────────────────────────────────────────────────
    # Demographics calculation helpers (mirrors SentimentDataLoader)
    # ─────────────────────────────────────────────────────────────

    @staticmethod
    def _calculate_age(birthday):
        """Return whole years since ``birthday``, or None if missing or outside 0–120."""
        if pd.isna(birthday):
            return None
        try:
            age = relativedelta(datetime.now(), birthday).years
            return age if 0 <= age <= 120 else None
        except Exception:
            # Best effort: any value relativedelta cannot handle counts as unknown.
            return None

    def _categorize_age(self, age):
        """Map an age to the first matching ``demographics.age_groups`` name, else "Unknown"."""
        if pd.isna(age):
            return "Unknown"
        for group_name, (min_age, max_age) in self.demographics_config.get("age_groups", {}).items():
            if min_age <= age <= max_age:
                return group_name
        return "Unknown"

    @staticmethod
    def _extract_timezone_region(timezone):
        """Return the part of a timezone name before the first "/" (e.g. "America")."""
        if pd.isna(timezone) or not isinstance(timezone, str):
            return "Unknown"
        return timezone.split("/")[0]

    def _categorize_experience(self, experience_level):
        """Map a numeric experience level to a ``demographics.experience_groups`` name."""
        if pd.isna(experience_level):
            return "Unknown"
        try:
            exp_level = float(experience_level)
        except (TypeError, ValueError, OverflowError):
            return "Unknown"
        for group_name, (min_exp, max_exp) in self.demographics_config.get("experience_groups", {}).items():
            if min_exp <= exp_level <= max_exp:
                return group_name
        return "Unknown"

    # ─────────────────────────────────────────────────────────────
    # Internal helpers
    # ─────────────────────────────────────────────────────────────

    @staticmethod
    def _sql_literal(value) -> str:
        """
        Render ``value`` as a single-quoted Snowflake string literal.

        Backslashes are doubled first (Snowflake treats ``\\`` as an escape
        character in single-quoted strings), then single quotes are doubled,
        so the value cannot terminate the literal early.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    @classmethod
    def _sql_in_list(cls, values) -> str:
        """Render ``values`` as a comma-separated list of escaped SQL literals."""
        return ", ".join(cls._sql_literal(v) for v in values)

    @staticmethod
    def _sanitize(value: str) -> str:
        """
        Strip quotes, semicolons and backslashes from ``value``.

        No longer used by the query builder in this class (``_sql_literal``
        escapes instead of deleting characters); kept for any other callers.
        """
        return re.sub(r"['\";\\]", "", str(value))