| """ |
| HelpScout data loader — mirrors SentimentDataLoader architecture. |
| |
| Three loading modes: |
| - load_dashboard_data() : lightweight (no long text), cached 24 h |
| - load_analysis_data(...) : filtered with SUMMARY + notes, on-demand, cached 24 h |
| - load_demographics_data() : email-keyed user demographics, cached 24 h |
| """ |
| import re |
| import sys |
| from datetime import datetime, timedelta |
| from pathlib import Path |
|
|
| import pandas as pd |
| import streamlit as st |
| from dateutil.relativedelta import relativedelta |
|
|
| root_dir = Path(__file__).resolve().parent.parent.parent |
| sys.path.append(str(root_dir)) |
|
|
| from visualization.SnowFlakeConnection import SnowFlakeConn |
| from visualization.utils.helpscout_utils import ( |
| load_topic_taxonomy, parse_topics, compute_escalation_flag |
| ) |
| import json |
|
|
|
|
class HelpScoutDataLoader:
    """
    Loads HelpScout conversation features from Snowflake with caching.

    Settings come from the ``helpscout`` section of ``viz_config.json``
    (queries, escalation sentiments, limits), plus the ``color_schemes*`` and
    ``demographics`` sections consumed by the HelpScout pages.
    """

    # Timestamp columns converted to naive UTC datetimes after loading.
    _TIMESTAMP_COLUMNS = ("first_message_at", "last_message_at", "processed_at")
    # Flag columns whose NULLs are treated as False.
    _BOOL_COLUMNS = ("is_refund_request", "is_cancellation", "is_membership")
    # Demographic columns carried over by merge_demographics().
    _DEMOGRAPHIC_COLUMNS = (
        "age", "age_group", "timezone", "timezone_region",
        "experience_level", "experience_group",
    )
    # Values assigned when no demographics can be merged.
    _DEMOGRAPHIC_DEFAULTS = (
        ("age", None), ("age_group", "Unknown"),
        ("timezone", None), ("timezone_region", "Unknown"),
        ("experience_level", None), ("experience_group", "Unknown"),
    )
    # Number of SUMMARY characters kept in the ``summary_short`` preview.
    _SUMMARY_PREVIEW_CHARS = 120

    def __init__(self, config_path=None):
        """
        Read the visualization config and keep the HelpScout settings.

        Args:
            config_path: Path to the JSON config file. Defaults to
                ``config/viz_config.json`` two directories above this file.
        """
        if config_path is None:
            config_path = Path(__file__).parent.parent / "config" / "viz_config.json"
        # Explicit UTF-8: the platform default (e.g. cp1252 on Windows) would
        # mis-decode non-ASCII text in the JSON config.
        with open(config_path, "r", encoding="utf-8") as f:
            self.config = json.load(f)

        self.hs_config = self.config.get("helpscout", {})
        self.dashboard_query = self.hs_config.get("dashboard_query", "")
        self.demographics_query = self.hs_config.get("demographics_query", "")
        self.escalation_sentiments = self.hs_config.get("escalation_sentiments", ["negative", "very_negative"])
        self.default_date_range_days = self.hs_config.get("default_date_range_days", 60)
        self.max_summary_conversations = self.hs_config.get("max_summary_conversations", 300)

        hs_colors = self.config.get("color_schemes_helpscout", {})
        self.topic_colors = hs_colors.get("topics", {})
        self.status_colors = hs_colors.get("status", {})
        self.flag_colors = hs_colors.get("boolean_flags", {})
        self.sentiment_colors = self.config.get("color_schemes", {}).get("sentiment_polarity", {})
        self.demographics_config = self.config.get("demographics", {})

        self.taxonomy = load_topic_taxonomy()

    # ------------------------------------------------------------------
    # Snowflake access
    # ------------------------------------------------------------------

    @staticmethod
    def _run_query(query, description):
        """
        Run a read query on a fresh Snowflake connection, always closing it.

        Returns whatever ``SnowFlakeConn.run_read_query`` returns; callers
        treat ``None`` or an empty frame as "no data". Exceptions propagate.
        """
        conn = SnowFlakeConn()
        try:
            return conn.run_read_query(query, description)
        finally:
            # Close even when the query raises so failures don't leak connections.
            conn.close_connection()

    # ------------------------------------------------------------------
    # Dashboard data
    # ------------------------------------------------------------------

    @st.cache_data(ttl=86400)
    def load_dashboard_data(_self):
        """
        Load lightweight HelpScout dashboard data — no long-form text columns.

        Runs the configured ``dashboard_query``, normalizes the result and,
        when a demographics query is configured, left-joins demographics on
        ``customer_email``. Cached for 24 h (ttl=86400 s); the instance is
        passed as ``_self`` so Streamlit leaves it out of the cache key.

        Returns:
            pd.DataFrame: processed conversations, or an empty frame when the
            query returns nothing or fails (an error is shown in the app).
        """
        # NOTE(review): the whole method is cached, so the empty frame returned
        # on failure is also cached for the TTL; presumably clear it via
        # ``load_dashboard_data.clear()`` to retry sooner.
        try:
            df = _self._run_query(_self.dashboard_query, "HelpScout dashboard data")
            if df is None or df.empty:
                st.error("No HelpScout data returned from Snowflake")
                return pd.DataFrame()

            df = _self._process_dashboard_df(df)

            if _self.demographics_query:
                demo_df = _self.load_demographics_data()
                if not demo_df.empty:
                    df = _self.merge_demographics(df, demo_df)

            return df
        except Exception as e:
            st.error(f"Error loading HelpScout dashboard data: {e}")
            return pd.DataFrame()

    def _normalize_common(self, df):
        """
        Apply the normalization shared by the dashboard and analysis frames.

        Lower-cases column names, converts timestamp columns to naive UTC,
        fills/lower-cases categorical columns, coerces flag columns to bool,
        parses ``topics`` into ``topics_list`` and computes ``is_escalation``.
        Mutates and returns ``df``.
        """
        df.columns = df.columns.str.lower()

        for ts_col in self._TIMESTAMP_COLUMNS:
            if ts_col in df.columns:
                df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce", utc=True).dt.tz_localize(None)

        df["sentiment_polarity"] = df["sentiment_polarity"].fillna("unknown")
        df["status"] = df["status"].fillna("unknown").str.lower()
        df["source_type"] = df["source_type"].fillna("unknown").str.lower()
        # ``state`` is optional (get_filter_options treats it the same way).
        if "state" in df.columns:
            df["state"] = df["state"].fillna("unknown").str.lower()

        for bool_col in self._BOOL_COLUMNS:
            if bool_col in df.columns:
                df[bool_col] = df[bool_col].fillna(False).astype(bool)

        if "emotions" not in df.columns:
            df["emotions"] = None

        df["topics_list"] = df["topics"].apply(parse_topics)
        df["is_escalation"] = compute_escalation_flag(df, self.escalation_sentiments)
        return df

    def _process_dashboard_df(self, df):
        """Normalize a raw dashboard query result (see ``_normalize_common``)."""
        return self._normalize_common(df)

    # ------------------------------------------------------------------
    # Analysis data
    # ------------------------------------------------------------------

    def load_analysis_data(self, sentiments=None, topics=None,
                           refund_only=False, cancel_only=False,
                           membership_only=False, statuses=None,
                           sources=None, date_range=None, top_n=None):
        """
        Load filtered HelpScout conversations with full text for the Analysis page.

        Filter collections are turned into sorted tuples (and ``date_range``
        into a pair of strings) so equivalent filter selections share one
        cache entry in ``_fetch_analysis_data``.

        Returns:
            pd.DataFrame: matching conversations, or an empty frame when none
            match or the fetch fails (an error is shown in the app).
        """
        def as_key(values):
            return tuple(sorted(values)) if values else ()

        date_key = ((str(date_range[0]), str(date_range[1]))
                    if date_range and len(date_range) == 2 else ())
        # Errors are handled here, outside the cached function, because
        # Streamlit does not cache a call that raises: a transient Snowflake
        # failure is retried next time instead of pinning an empty frame.
        try:
            return self._fetch_analysis_data(
                as_key(sentiments), as_key(topics), bool(refund_only), bool(cancel_only),
                bool(membership_only), as_key(statuses), as_key(sources), date_key, top_n or 0,
            )
        except Exception as e:
            st.error(f"Error loading HelpScout analysis data: {e}")
            return pd.DataFrame()

    @st.cache_data(ttl=86400)
    def _fetch_analysis_data(_self, sentiments, topics, refund_only, cancel_only,
                             membership_only, statuses, sources, date_range, top_n):
        """
        Cached analysis fetch — returns the full-detail conversation frame.

        Cached for 24 h keyed on the (hashable) filter arguments; ``_self`` is
        excluded from the key. Exceptions propagate to ``load_analysis_data``.
        """
        query = _self._build_analysis_query(
            sentiments, topics, refund_only, cancel_only,
            membership_only, statuses, sources, date_range, top_n
        )
        df = _self._run_query(query, "HelpScout analysis data")
        if df is None or df.empty:
            return pd.DataFrame()
        return _self._process_analysis_df(df)

    @staticmethod
    def _date_range_condition(date_range):
        """
        Build the FIRST_MESSAGE_AT predicate for a ``(start, end)`` pair.

        Both bounds are parsed with ``pd.Timestamp`` and re-rendered, so no
        caller-supplied text reaches the SQL verbatim; malformed bounds raise
        ``ValueError``. An end bound at midnight (e.g. a plain date from a
        date picker) covers that whole day via ``< end + 1 day``; comparing
        ``<= 'YYYY-MM-DD'`` would exclude everything after 00:00 on it.
        """
        start = pd.Timestamp(str(date_range[0]))
        end = pd.Timestamp(str(date_range[1]))
        if pd.isna(start) or pd.isna(end):
            raise ValueError(f"Invalid date range: {date_range!r}")

        start_sql = start.isoformat(sep=" ")
        if end == end.normalize():
            next_day_sql = (end + pd.Timedelta(days=1)).isoformat(sep=" ")
            upper = f"FIRST_MESSAGE_AT < '{next_day_sql}'"
        else:
            upper = f"FIRST_MESSAGE_AT <= '{end.isoformat(sep=' ')}'"
        return f"FIRST_MESSAGE_AT >= '{start_sql}' AND {upper}"

    def _build_analysis_query(self, sentiments, topics, refund_only, cancel_only,
                              membership_only, statuses, sources, date_range, top_n):
        """Build dynamic SQL for the analysis page with all filters pushed to Snowflake."""
        where_clauses = []

        if date_range and len(date_range) == 2:
            where_clauses.append(self._date_range_condition(date_range))

        if sentiments:
            safe = "', '".join(self._sanitize(s) for s in sentiments)
            where_clauses.append(f"SENTIMENT_POLARITY IN ('{safe}')")

        if topics:
            # NOTE(review): SPLIT does not trim, so this assumes TOPICS is
            # stored comma-separated without spaces — confirm against the table.
            topic_conditions = [
                f"ARRAY_CONTAINS('{self._sanitize(t)}'::VARIANT, SPLIT(TOPICS, ','))"
                for t in topics
            ]
            where_clauses.append("(" + " OR ".join(topic_conditions) + ")")

        if statuses:
            safe = "', '".join(self._sanitize(s.lower()) for s in statuses)
            where_clauses.append(f"LOWER(STATUS) IN ('{safe}')")

        if sources:
            safe = "', '".join(self._sanitize(s.lower()) for s in sources)
            where_clauses.append(f"LOWER(SOURCE_TYPE) IN ('{safe}')")

        if refund_only:
            where_clauses.append("IS_REFUND_REQUEST = TRUE")
        if cancel_only:
            where_clauses.append("IS_CANCELLATION = TRUE")
        if membership_only:
            where_clauses.append("IS_MEMBERSHIP = TRUE")

        where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
        limit_sql = f"LIMIT {int(top_n)}" if top_n and top_n > 0 else ""

        return f"""
        SELECT
            CONVERSATION_ID,
            LOWER(CUSTOMER_EMAIL) AS CUSTOMER_EMAIL,
            CUSTOMER_FIRST,
            CUSTOMER_LAST,
            THREAD_COUNT,
            FIRST_MESSAGE_AT,
            LAST_MESSAGE_AT,
            DURATION_HOURS,
            STATUS,
            STATE,
            SOURCE_TYPE,
            SOURCE_VIA,
            SENTIMENT_POLARITY,
            EMOTIONS,
            SENTIMENT_CONFIDENCE,
            SENTIMENT_NOTES,
            TOPICS,
            IS_REFUND_REQUEST,
            IS_CANCELLATION,
            IS_MEMBERSHIP,
            TOPIC_CONFIDENCE,
            TOPIC_NOTES,
            SUMMARY,
            PROCESSED_AT
        FROM SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES
        {where_sql}
        ORDER BY FIRST_MESSAGE_AT DESC
        {limit_sql}
        """

    def _process_analysis_df(self, df):
        """
        Normalize an analysis query result and add ``summary_short``.

        ``summary_short`` is SUMMARY truncated to ``_SUMMARY_PREVIEW_CHARS``
        characters with a trailing ellipsis when it was longer.
        """
        df = self._normalize_common(df)

        if "summary" in df.columns:
            limit = self._SUMMARY_PREVIEW_CHARS
            text = df["summary"].fillna("").astype(str)
            df["summary_short"] = text.where(text.str.len() <= limit, text.str[:limit] + "…")

        return df

    # ------------------------------------------------------------------
    # Demographics
    # ------------------------------------------------------------------

    @st.cache_data(ttl=86400)
    def load_demographics_data(_self):
        """
        Load user demographics keyed by email.

        Returns an empty frame when no demographics query is configured, the
        query returns nothing, or it fails (a warning is shown in the app).
        Cached for 24 h.
        """
        if not _self.demographics_query:
            return pd.DataFrame()
        try:
            df = _self._run_query(_self.demographics_query, "HelpScout user demographics")
            if df is None or df.empty:
                return pd.DataFrame()
            return _self._process_demographics_df(df)
        except Exception as e:
            st.warning(f"Could not load HelpScout demographics: {e}")
            return pd.DataFrame()

    def _process_demographics_df(self, df):
        """
        Derive age/timezone/experience groups and normalize emails.

        Adds ``age``/``age_group`` (from ``birthday``), ``timezone_region``
        and ``experience_group`` when the source columns exist, drops rows
        without ``customer_email`` and lower-cases the remaining emails.
        """
        df.columns = df.columns.str.lower()

        if "birthday" in df.columns:
            # Stringify first so mixed date/None values parse uniformly;
            # unparseable values (e.g. "None") coerce to NaT.
            birthdays = pd.to_datetime(df["birthday"].astype(str), errors="coerce", utc=True)
            df["birthday"] = birthdays.dt.tz_localize(None)
            df["age"] = df["birthday"].apply(self._calculate_age)
            df["age_group"] = df["age"].apply(self._categorize_age)

        if "timezone" in df.columns:
            df["timezone_region"] = df["timezone"].apply(self._extract_timezone_region)

        if "experience_level" in df.columns:
            df["experience_group"] = df["experience_level"].apply(self._categorize_experience)

        if "customer_email" in df.columns:
            # .copy(): write to an independent frame, not a view of the filtered
            # slice (avoids SettingWithCopyWarning / lost assignment).
            df = df[df["customer_email"].notna()].copy()
            df["customer_email"] = df["customer_email"].str.lower()

        return df

    def merge_demographics(self, df, demo_df):
        """
        Merge demographic data with HelpScout conversations on customer_email.

        When no merge is possible (empty ``demo_df`` or either side lacks
        ``customer_email``) the demographic columns are set to their defaults
        ("Unknown" for the group columns, None otherwise) on ``df`` in place.

        Returns:
            pd.DataFrame: one row per input conversation with demographic
            columns added; unmatched group columns are "Unknown".
        """
        if (demo_df.empty or "customer_email" not in df.columns
                or "customer_email" not in demo_df.columns):
            for col, val in self._DEMOGRAPHIC_DEFAULTS:
                df[col] = val
            return df

        merge_cols = ["customer_email"] + [c for c in self._DEMOGRAPHIC_COLUMNS if c in demo_df.columns]
        # Keep one demographics row per email: duplicates (e.g. case variants
        # that collapse after lower-casing) would fan out the left join and
        # duplicate conversations.
        demo_unique = demo_df[merge_cols].drop_duplicates(subset="customer_email")
        merged = df.merge(demo_unique, on="customer_email", how="left")

        for col in ("age_group", "timezone_region", "experience_group"):
            if col in merged.columns:
                merged[col] = merged[col].fillna("Unknown")

        return merged

    # ------------------------------------------------------------------
    # Filter options
    # ------------------------------------------------------------------

    def get_filter_options(self, df):
        """
        Return sorted unique values for all in-page filters from the dashboard df.

        Missing columns (including the empty frame returned on load failure)
        yield empty lists instead of raising ``KeyError``.
        """
        def uniques(col):
            if col not in df.columns:
                return []
            return sorted(df[col].dropna().unique().tolist())

        topics = []
        if "topics_list" in df.columns:
            flat = df["topics_list"].explode().dropna().unique().tolist()
            topics = sorted(t for t in flat if t)

        return {
            "sentiments": uniques("sentiment_polarity"),
            "topics": topics,
            "statuses": uniques("status"),
            "states": uniques("state"),
            "sources": uniques("source_type"),
        }

    # ------------------------------------------------------------------
    # Demographic helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _calculate_age(birthday):
        """Return whole years since ``birthday``, or None if missing or outside 0-120."""
        if pd.isna(birthday):
            return None
        try:
            age = relativedelta(datetime.now(), birthday).years
        except Exception:
            # Best effort: any value relativedelta cannot handle means "unknown".
            return None
        return age if 0 <= age <= 120 else None

    def _categorize_age(self, age):
        """Map an age to the first configured ``age_groups`` range containing it."""
        if age is None or pd.isna(age):
            return "Unknown"
        for group_name, (min_age, max_age) in self.demographics_config.get("age_groups", {}).items():
            if min_age <= age <= max_age:
                return group_name
        return "Unknown"

    @staticmethod
    def _extract_timezone_region(timezone):
        """Return the part before the first "/" (e.g. "America/New_York" -> "America")."""
        if not isinstance(timezone, str):
            return "Unknown"
        region = timezone.strip().split("/", 1)[0]
        # Blank names (or a leading "/") previously produced "" instead of "Unknown".
        return region or "Unknown"

    def _categorize_experience(self, experience_level):
        """Map a numeric experience level to the first matching ``experience_groups`` range."""
        if pd.isna(experience_level):
            return "Unknown"
        try:
            exp_level = float(experience_level)
        except (TypeError, ValueError, OverflowError):
            return "Unknown"
        for group_name, (min_exp, max_exp) in self.demographics_config.get("experience_groups", {}).items():
            if min_exp <= exp_level <= max_exp:
                return group_name
        return "Unknown"

    # ------------------------------------------------------------------
    # SQL helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _sanitize(value: str) -> str:
        """
        Strip characters that could break out of a single-quoted SQL literal.

        NOTE(review): values are stripped, not escaped, so a filter value that
        genuinely contains a quote (e.g. "can't") will not match its row.
        """
        return re.sub(r"['\";\\]", "", str(value))