# Sentiment_analysis/visualization/data/helpscout_data_loader.py
"""
HelpScout data loader; mirrors the SentimentDataLoader architecture.
Three loading modes:
- load_dashboard_data() : lightweight (no long text), cached 24 h
- load_analysis_data(...) : filtered with SUMMARY + notes, on-demand, cached 24 h
- load_demographics_data() : email-keyed user demographics, cached 24 h
"""
import json
import re
import sys
from datetime import datetime
from pathlib import Path
import pandas as pd
import streamlit as st
from dateutil.relativedelta import relativedelta
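# Make the repo root importable so the `visualization.*` imports below resolve.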
root_dir = Path(__file__).resolve().parent.parent.parent
sys.path.append(str(root_dir))
from visualization.SnowFlakeConnection import SnowFlakeConn
from visualization.utils.helpscout_utils import (
load_topic_taxonomy, parse_topics, compute_escalation_flag
)
class HelpScoutDataLoader:
"""
Loads HelpScout conversation features from Snowflake with caching.
"""
def __init__(self, config_path=None):
if config_path is None:
config_path = Path(__file__).parent.parent / "config" / "viz_config.json"
with open(config_path, "r") as f:
self.config = json.load(f)
self.hs_config = self.config.get("helpscout", {})
self.dashboard_query = self.hs_config.get("dashboard_query", "")
self.demographics_query = self.hs_config.get("demographics_query", "")
self.escalation_sentiments = self.hs_config.get("escalation_sentiments", ["negative", "very_negative"])
self.default_date_range_days = self.hs_config.get("default_date_range_days", 60)
self.max_summary_conversations = self.hs_config.get("max_summary_conversations", 300)
self.topic_colors = self.config.get("color_schemes_helpscout", {}).get("topics", {})
self.status_colors = self.config.get("color_schemes_helpscout", {}).get("status", {})
self.flag_colors = self.config.get("color_schemes_helpscout", {}).get("boolean_flags", {})
self.sentiment_colors = self.config.get("color_schemes", {}).get("sentiment_polarity", {})
self.demographics_config = self.config.get("demographics", {})
self.taxonomy = load_topic_taxonomy()
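    # Illustrative `helpscout` block in viz_config.json (queries elided; key
    # names match the lookups above, values here are hypothetical examples):
    #   "helpscout": {
    #       "dashboard_query": "SELECT ... FROM ... HELPSCOUT_CONVERSATION_FEATURES",
    #       "demographics_query": "SELECT ... FROM ...",
    #       "escalation_sentiments": ["negative", "very_negative"],
    #       "default_date_range_days": 60,
    #       "max_summary_conversations": 300
    #   }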
# ─────────────────────────────────────────────────────────────
# Dashboard data (lightweight, 24-hour cache)
# ─────────────────────────────────────────────────────────────
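    # Methods cached with st.cache_data take `_self` so Streamlit excludes the
    # instance from the cache key (leading-underscore args are not hashed).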
@st.cache_data(ttl=86400)
def load_dashboard_data(_self):
"""Load lightweight HelpScout dashboard data β€” no long-form text columns."""
try:
conn = SnowFlakeConn()
df = conn.run_read_query(_self.dashboard_query, "HelpScout dashboard data")
conn.close_connection()
if df is None or df.empty:
st.error("No HelpScout data returned from Snowflake")
return pd.DataFrame()
df = _self._process_dashboard_df(df)
if _self.demographics_query:
demo_df = _self.load_demographics_data()
if not demo_df.empty:
df = _self.merge_demographics(df, demo_df)
return df
except Exception as e:
st.error(f"Error loading HelpScout dashboard data: {e}")
return pd.DataFrame()
    def _process_dashboard_df(self, df):
        """Normalize dtypes, fill defaults, and derive topics_list / is_escalation."""
df.columns = df.columns.str.lower()
for ts_col in ("first_message_at", "last_message_at", "processed_at"):
if ts_col in df.columns:
df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce", utc=True).dt.tz_localize(None)
df["sentiment_polarity"] = df["sentiment_polarity"].fillna("unknown")
df["status"] = df["status"].fillna("unknown").str.lower()
df["state"] = df["state"].fillna("unknown").str.lower()
df["source_type"] = df["source_type"].fillna("unknown").str.lower()
for bool_col in ("is_refund_request", "is_cancellation", "is_membership"):
if bool_col in df.columns:
df[bool_col] = df[bool_col].fillna(False).astype(bool)
if "emotions" not in df.columns:
df["emotions"] = None
# topics_list for filter options
df["topics_list"] = df["topics"].apply(parse_topics)
# escalation flag
df["is_escalation"] = compute_escalation_flag(df, self.escalation_sentiments)
return df
# ─────────────────────────────────────────────────────────────
# Analysis page data (on-demand, 24-hour cache)
# ─────────────────────────────────────────────────────────────
def load_analysis_data(self, sentiments=None, topics=None,
refund_only=False, cancel_only=False,
membership_only=False, statuses=None,
sources=None, date_range=None, top_n=None):
"""
Load filtered HelpScout conversations with full text for the Analysis page.
Caches based on argument tuple.
"""
sentiments_key = tuple(sorted(sentiments)) if sentiments else ()
topics_key = tuple(sorted(topics)) if topics else ()
statuses_key = tuple(sorted(statuses)) if statuses else ()
sources_key = tuple(sorted(sources)) if sources else ()
date_key = (str(date_range[0]), str(date_range[1])) if date_range and len(date_range) == 2 else ()
return self._fetch_analysis_data(
sentiments_key, topics_key, bool(refund_only), bool(cancel_only),
bool(membership_only), statuses_key, sources_key, date_key, top_n or 0
)
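    # Example (illustrative): these two calls resolve to the same cache entry,
    # because list filters are normalized to sorted tuples above:
    #   loader.load_analysis_data(sentiments=["negative", "positive"])
    #   loader.load_analysis_data(sentiments=["positive", "negative"])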
@st.cache_data(ttl=86400)
def _fetch_analysis_data(_self, sentiments, topics, refund_only, cancel_only,
membership_only, statuses, sources, date_range, top_n):
"""Cached analysis data fetch β€” returns full-detail conversation df."""
try:
query = _self._build_analysis_query(
sentiments, topics, refund_only, cancel_only,
membership_only, statuses, sources, date_range, top_n
)
conn = SnowFlakeConn()
df = conn.run_read_query(query, "HelpScout analysis data")
conn.close_connection()
if df is None or df.empty:
return pd.DataFrame()
df = _self._process_analysis_df(df)
return df
except Exception as e:
st.error(f"Error loading HelpScout analysis data: {e}")
return pd.DataFrame()
def _build_analysis_query(self, sentiments, topics, refund_only, cancel_only,
membership_only, statuses, sources, date_range, top_n):
"""Build dynamic SQL for the analysis page with all filters pushed to Snowflake."""
where_clauses = []
        if date_range and len(date_range) == 2:
            # Sanitize like the other filters so interpolated SQL stays safe.
            start, end = self._sanitize(date_range[0]), self._sanitize(date_range[1])
            where_clauses.append(f"FIRST_MESSAGE_AT >= '{start}' AND FIRST_MESSAGE_AT <= '{end}'")
if sentiments:
safe = "', '".join(self._sanitize(s) for s in sentiments)
where_clauses.append(f"SENTIMENT_POLARITY IN ('{safe}')")
if topics:
topic_conditions = []
for t in topics:
safe_t = self._sanitize(t)
topic_conditions.append(
f"ARRAY_CONTAINS('{safe_t}'::VARIANT, SPLIT(TOPICS, ','))"
)
where_clauses.append("(" + " OR ".join(topic_conditions) + ")")
if statuses:
safe = "', '".join(self._sanitize(s.lower()) for s in statuses)
where_clauses.append(f"LOWER(STATUS) IN ('{safe}')")
if sources:
safe = "', '".join(self._sanitize(s.lower()) for s in sources)
where_clauses.append(f"LOWER(SOURCE_TYPE) IN ('{safe}')")
if refund_only:
where_clauses.append("IS_REFUND_REQUEST = TRUE")
if cancel_only:
where_clauses.append("IS_CANCELLATION = TRUE")
if membership_only:
where_clauses.append("IS_MEMBERSHIP = TRUE")
where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
limit_sql = f"LIMIT {int(top_n)}" if top_n and top_n > 0 else ""
return f"""
SELECT
CONVERSATION_ID,
LOWER(CUSTOMER_EMAIL) AS CUSTOMER_EMAIL,
CUSTOMER_FIRST,
CUSTOMER_LAST,
THREAD_COUNT,
FIRST_MESSAGE_AT,
LAST_MESSAGE_AT,
DURATION_HOURS,
STATUS,
STATE,
SOURCE_TYPE,
SOURCE_VIA,
SENTIMENT_POLARITY,
EMOTIONS,
SENTIMENT_CONFIDENCE,
SENTIMENT_NOTES,
TOPICS,
IS_REFUND_REQUEST,
IS_CANCELLATION,
IS_MEMBERSHIP,
TOPIC_CONFIDENCE,
TOPIC_NOTES,
SUMMARY,
PROCESSED_AT
FROM SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES
{where_sql}
ORDER BY FIRST_MESSAGE_AT DESC
{limit_sql}
"""
    def _process_analysis_df(self, df):
        """Normalize the analysis frame and derive topics_list, is_escalation, summary_short."""
df.columns = df.columns.str.lower()
for ts_col in ("first_message_at", "last_message_at", "processed_at"):
if ts_col in df.columns:
df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce", utc=True).dt.tz_localize(None)
df["sentiment_polarity"] = df["sentiment_polarity"].fillna("unknown")
df["status"] = df["status"].fillna("unknown").str.lower()
df["source_type"] = df["source_type"].fillna("unknown").str.lower()
for bool_col in ("is_refund_request", "is_cancellation", "is_membership"):
if bool_col in df.columns:
df[bool_col] = df[bool_col].fillna(False).astype(bool)
if "emotions" not in df.columns:
df["emotions"] = None
df["topics_list"] = df["topics"].apply(parse_topics)
df["is_escalation"] = compute_escalation_flag(df, self.escalation_sentiments)
        # Short summary for cards (truncated at 120 chars)
if "summary" in df.columns:
text = df["summary"].fillna("").astype(str)
df["summary_short"] = text.where(text.str.len() <= 120, text.str[:120] + "…")
return df
# ─────────────────────────────────────────────────────────────
# Demographics (email-keyed, 24-hour cache)
# ─────────────────────────────────────────────────────────────
@st.cache_data(ttl=86400)
def load_demographics_data(_self):
"""Load user demographics keyed by email."""
if not _self.demographics_query:
return pd.DataFrame()
try:
conn = SnowFlakeConn()
df = conn.run_read_query(_self.demographics_query, "HelpScout user demographics")
conn.close_connection()
if df is None or df.empty:
return pd.DataFrame()
return _self._process_demographics_df(df)
except Exception as e:
st.warning(f"Could not load HelpScout demographics: {e}")
return pd.DataFrame()
    def _process_demographics_df(self, df):
        """Derive age, age_group, timezone_region, and experience_group; key rows by lowercased email."""
df.columns = df.columns.str.lower()
if "birthday" in df.columns:
df["birthday"] = df["birthday"].astype(str)
df["birthday"] = pd.to_datetime(df["birthday"], errors="coerce", utc=True)
df["birthday"] = df["birthday"].dt.tz_localize(None)
df["age"] = df["birthday"].apply(self._calculate_age)
df["age_group"] = df["age"].apply(self._categorize_age)
if "timezone" in df.columns:
df["timezone_region"] = df["timezone"].apply(self._extract_timezone_region)
if "experience_level" in df.columns:
df["experience_group"] = df["experience_level"].apply(self._categorize_experience)
if "customer_email" in df.columns:
df = df[df["customer_email"].notna()]
df["customer_email"] = df["customer_email"].str.lower()
return df
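    # The demographics query is expected to return CUSTOMER_EMAIL plus any of
    # BIRTHDAY, TIMEZONE, and EXPERIENCE_LEVEL; the derived age_group,
    # timezone_region, and experience_group columns are what
    # merge_demographics() joins onto the conversation frame.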
def merge_demographics(self, df, demo_df):
"""Merge demographic data with HelpScout conversations on customer_email."""
if demo_df.empty or "customer_email" not in df.columns:
for col, val in [("age", None), ("age_group", "Unknown"),
("timezone", None), ("timezone_region", "Unknown"),
("experience_level", None), ("experience_group", "Unknown")]:
df[col] = val
return df
if "customer_email" not in demo_df.columns:
return df
merge_cols = ["customer_email"]
for c in ["age", "age_group", "timezone", "timezone_region", "experience_level", "experience_group"]:
if c in demo_df.columns:
merge_cols.append(c)
merged = df.merge(demo_df[merge_cols], on="customer_email", how="left")
for col in ["age_group", "timezone_region", "experience_group"]:
if col in merged.columns:
merged[col] = merged[col].fillna("Unknown")
return merged
# ─────────────────────────────────────────────────────────────
# Filter helpers
# ─────────────────────────────────────────────────────────────
def get_filter_options(self, df):
"""Return unique values for all in-page filters from the dashboard df."""
topics_flat = df["topics_list"].explode().dropna().unique().tolist() if "topics_list" in df.columns else []
return {
"sentiments": sorted(df["sentiment_polarity"].dropna().unique().tolist()),
"topics": sorted(t for t in topics_flat if t),
"statuses": sorted(df["status"].dropna().unique().tolist()),
"states": sorted(df["state"].dropna().unique().tolist()) if "state" in df.columns else [],
"sources": sorted(df["source_type"].dropna().unique().tolist()),
}
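    # Example (illustrative) of the returned shape; values are hypothetical:
    #   {"sentiments": ["negative", "positive"], "topics": ["billing", "bugs"],
    #    "statuses": ["active", "closed"], "states": ["published"],
    #    "sources": ["email"]}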
# ─────────────────────────────────────────────────────────────
# Demographics calculation helpers (mirrors SentimentDataLoader)
# ─────────────────────────────────────────────────────────────
@staticmethod
def _calculate_age(birthday):
if pd.isna(birthday):
return None
try:
age = relativedelta(datetime.now(), birthday).years
return age if 0 <= age <= 120 else None
except Exception:
return None
def _categorize_age(self, age):
        if pd.isna(age):
return "Unknown"
for group_name, (min_age, max_age) in self.demographics_config.get("age_groups", {}).items():
if min_age <= age <= max_age:
return group_name
return "Unknown"
@staticmethod
def _extract_timezone_region(timezone):
if pd.isna(timezone) or not isinstance(timezone, str):
return "Unknown"
parts = timezone.split("/")
return parts[0] if parts else "Unknown"
def _categorize_experience(self, experience_level):
if pd.isna(experience_level):
return "Unknown"
try:
exp_level = float(experience_level)
except Exception:
return "Unknown"
for group_name, (min_exp, max_exp) in self.demographics_config.get("experience_groups", {}).items():
if min_exp <= exp_level <= max_exp:
return group_name
return "Unknown"
# ─────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────
    @staticmethod
    def _sanitize(value: str) -> str:
        """Strip quotes, semicolons, and backslashes from values interpolated into SQL."""
        return re.sub(r"['\";\\]", "", str(value))
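# Illustrative usage sketch (assumes valid Snowflake credentials and a
# Streamlit context; outside `streamlit run`, st.cache_data simply executes
# uncached). The filter values below are hypothetical.
if __name__ == "__main__":
    loader = HelpScoutDataLoader()
    dashboard_df = loader.load_dashboard_data()
    if not dashboard_df.empty:
        print(f"Dashboard rows: {len(dashboard_df)}")
        print(loader.get_filter_options(dashboard_df))
    analysis_df = loader.load_analysis_data(
        sentiments=["negative", "very_negative"],
        refund_only=True,
        top_n=loader.max_summary_conversations,
    )
    print(f"Analysis rows: {len(analysis_df)}")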