| """ |
| Main execution script for comment processing workflow. |
| Orchestrates data fetching, processing, and storage using agentic workflow. |
| Supports parallel processing with multiprocessing for improved performance. |
| Supports multiple data sources (social media and Musora internal comments). |
| """ |
|
|
| import json |
| import os |
| import logging |
| import argparse |
| from datetime import datetime |
| import pandas as pd |
| from dotenv import load_dotenv |
| from multiprocessing import Pool, cpu_count, Manager |
| from functools import partial |
| import traceback |
| from typing import Dict, Any, List |
|
|
| from SnowFlakeConnection import SnowFlakeConn |
| from workflow.comment_processor import CommentProcessingWorkflow |
|
|
| |
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Load environment variables (e.g. OPENAI_API_KEY) from the repo-root .env file.
ROOT_DIR = os.path.dirname(SCRIPT_DIR)
load_dotenv(os.path.join(ROOT_DIR, '.env'))

# The FileHandler below opens its log file at import time, so the logs
# directory must already exist; create it here rather than relying on a
# later os.makedirs call, otherwise import fails on a fresh checkout.
os.makedirs(os.path.join(SCRIPT_DIR, 'logs'), exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(SCRIPT_DIR, 'logs', f'comment_processing_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
|
|
|
|
def calculate_optimal_batch_size(total_comments: int, num_workers: int, min_batch: int = 20, max_batch: int = 100) -> int:
    """
    Calculate optimal batch size based on total comments and number of workers.

    Args:
        total_comments: Total number of comments to process
        num_workers: Number of parallel workers
        min_batch: Minimum batch size (default: 20)
        max_batch: Maximum batch size (default: 100)

    Returns:
        Optimal batch size (always >= 1, so it is safe to use as a
        range/slice step even when total_comments is 0)
    """
    # Small workloads: one batch holding everything. Clamp to 1 so callers
    # never receive a zero step for an empty input.
    if total_comments <= min_batch:
        return max(1, total_comments)

    # Start from an even split across workers...
    batch_size = total_comments // num_workers

    # ...then clamp into the [min_batch, max_batch] band so batches are
    # neither too chatty nor too large to checkpoint.
    batch_size = max(min_batch, min(max_batch, batch_size))

    return batch_size
|
|
|
|
def process_batch_worker(batch_data: tuple) -> dict:
    """
    Worker function to process a single batch of comments.
    This function runs in a separate process.

    Args:
        batch_data: Tuple containing (batch_num, batch_comments, config, api_key,
            overwrite_first_batch, data_source_config)

    Returns:
        Dictionary with batch statistics and results. On failure, 'success' is
        False and 'error' carries the message; the function never raises.
    """
    batch_num, batch_comments, config, api_key, overwrite_first_batch, data_source_config = batch_data

    # Per-batch logger name so interleaved multiprocessing output is attributable.
    worker_logger = logging.getLogger(f"Worker-{batch_num}")

    snowflake = None
    try:
        worker_logger.info(f"Batch {batch_num}: Starting processing of {len(batch_comments)} comments")

        # Each process opens its own connection; connections are not shared
        # across processes.
        snowflake = SnowFlakeConn()

        workflow = CommentProcessingWorkflow(config, api_key)

        results = workflow.process_batch(batch_comments)

        results_df = pd.DataFrame(results)

        # Keep only rows the workflow marked successful; the rest are counted
        # as failed but not stored.
        initial_count = len(results_df)
        df_successful = results_df[results_df['success'] == True].copy()
        filtered_count = initial_count - len(df_successful)

        worker_logger.info(f"Batch {batch_num}: Processed {initial_count} comments, {len(df_successful)} successful")

        # Mapping from workflow result keys -> Snowflake column names.
        output_columns = {
            'comment_sk': 'COMMENT_SK',
            'comment_id': 'COMMENT_ID',
            'comment_text': 'ORIGINAL_TEXT',
            'platform': 'PLATFORM',
            'comment_timestamp': 'COMMENT_TIMESTAMP',
            'author_name': 'AUTHOR_NAME',
            'author_id': 'AUTHOR_ID',
            'parent_comment_id': 'PARENT_COMMENT_ID',
            'parent_comment_text': 'PARENT_COMMENT_TEXT',
            'content_sk': 'CONTENT_SK',
            'content_id': 'CONTENT_ID',
            'content_description': 'CONTENT_DESCRIPTION',
            'channel_sk': 'CHANNEL_SK',
            'channel_name': 'CHANNEL_NAME',
            'channel_display_name': 'CHANNEL_DISPLAY_NAME',
            'language': 'DETECTED_LANGUAGE',
            'language_code': 'LANGUAGE_CODE',
            'is_english': 'IS_ENGLISH',
            'language_confidence': 'LANGUAGE_CONFIDENCE',
            'detection_method': 'DETECTION_METHOD',
            'has_text': 'HAS_TEXT',
            'translated_text': 'TRANSLATED_TEXT',
            'translation_performed': 'TRANSLATION_PERFORMED',
            'translation_confidence': 'TRANSLATION_CONFIDENCE',
            'translation_notes': 'TRANSLATION_NOTES',
            'sentiment_polarity': 'SENTIMENT_POLARITY',
            'intent': 'INTENT',
            'requires_reply': 'REQUIRES_REPLY',
            'sentiment_confidence': 'SENTIMENT_CONFIDENCE',
            'analysis_notes': 'ANALYSIS_NOTES',
            'success': 'PROCESSING_SUCCESS'
        }

        # Data-source-specific passthrough columns keep their configured
        # (upper-case) names in the output table.
        if 'additional_fields' in data_source_config:
            for field in data_source_config['additional_fields']:
                field_lower = field.lower()
                output_columns[field_lower] = field
            worker_logger.debug(f"Batch {batch_num}: Added {len(data_source_config['additional_fields'])} additional fields")

        # Build the output frame column-by-column; missing source columns are
        # filled with None so the table schema stays stable across sources.
        output_df = pd.DataFrame()
        for source_col, target_col in output_columns.items():
            if source_col in df_successful.columns:
                output_df[target_col] = df_successful[source_col]
            else:
                output_df[target_col] = None
                if source_col in ['permalink_url', 'thumbnail_url']:
                    worker_logger.warning(f"Batch {batch_num}: Column '{source_col}' not found in DataFrame. Available columns: {list(df_successful.columns)}")

        # Audit columns.
        output_df['PROCESSED_AT'] = datetime.now()
        output_df['WORKFLOW_VERSION'] = '1.0'

        if len(output_df) > 0:
            table_name = data_source_config['output_config']['table_name']
            database = data_source_config['output_config']['database']
            schema = data_source_config['output_config']['schema']

            # Only the first batch may truncate the table; all others append.
            overwrite = overwrite_first_batch and batch_num == 1

            snowflake.store_df_to_snowflake(
                table_name=table_name,
                dataframe=output_df,
                database=database,
                schema=schema,
                overwrite=overwrite
            )

            worker_logger.info(f"Batch {batch_num}: Stored {len(output_df)} records to Snowflake ({table_name})")
        else:
            worker_logger.warning(f"Batch {batch_num}: No successful records to store")

        # Per-batch summary counters (0 when the column is absent).
        translations = output_df['TRANSLATION_PERFORMED'].sum() if 'TRANSLATION_PERFORMED' in output_df.columns else 0
        non_english = (~output_df['IS_ENGLISH']).sum() if 'IS_ENGLISH' in output_df.columns else 0
        requires_reply = output_df['REQUIRES_REPLY'].sum() if 'REQUIRES_REPLY' in output_df.columns else 0

        return {
            'batch_num': batch_num,
            'success': True,
            'total_processed': initial_count,
            'total_stored': len(output_df),
            'failed_count': filtered_count,
            'translations': int(translations),
            'non_english': int(non_english),
            'requires_reply': int(requires_reply),
            'error': None
        }

    except Exception as e:
        error_msg = f"Batch {batch_num} failed: {str(e)}"
        worker_logger.error(error_msg)
        worker_logger.error(traceback.format_exc())

        return {
            'batch_num': batch_num,
            'success': False,
            'total_processed': len(batch_comments),
            'total_stored': 0,
            'failed_count': len(batch_comments),
            'translations': 0,
            'non_english': 0,
            'requires_reply': 0,
            'error': error_msg
        }

    finally:
        # Always release the per-worker connection — previously it leaked
        # whenever an exception fired after the connection was opened.
        if snowflake is not None:
            try:
                snowflake.close_connection()
            except Exception:
                worker_logger.warning(f"Batch {batch_num}: Failed to close Snowflake connection", exc_info=True)
|
|
|
|
class CommentProcessor:
    """
    Main processor class that orchestrates the entire workflow.
    Supports multiple data sources (social media and Musora internal comments).
    """

    def __init__(self, config_path: str = None, data_sources_config_path: str = None):
        """
        Initialize the comment processor.

        Args:
            config_path: Path to configuration file (default: config_files/sentiment_config.json relative to script)
            data_sources_config_path: Path to data sources config (default: config_files/data_sources_config.json)

        Raises:
            ValueError: If OPENAI_API_KEY is not set in the environment.
        """
        if config_path is None:
            config_path = os.path.join(SCRIPT_DIR, 'config_files', 'sentiment_config.json')

        if data_sources_config_path is None:
            data_sources_config_path = os.path.join(SCRIPT_DIR, 'config_files', 'data_sources_config.json')

        with open(config_path, 'r') as f:
            self.config = json.load(f)

        with open(data_sources_config_path, 'r') as f:
            self.data_sources_config = json.load(f)

        # Connection used for fetching; worker processes open their own.
        self.snowflake = SnowFlakeConn()

        self.api_key = os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY not found in environment variables")

        # NOTE(review): this instance appears unused by the class's own
        # methods (workers build their own workflows); kept for backward
        # compatibility with any external callers.
        self.workflow = CommentProcessingWorkflow(self.config, self.api_key)

        logger.info("CommentProcessor initialized successfully")

    def get_enabled_data_sources(self) -> List[Dict[str, Any]]:
        """
        Get list of enabled data sources from configuration.

        Sources default to enabled when the 'enabled' key is absent.

        Returns:
            List of dicts with 'key' (config key) and 'config' (source config)
        """
        enabled_sources = []
        for source_key, source_config in self.data_sources_config['data_sources'].items():
            if source_config.get('enabled', True):
                enabled_sources.append({
                    'key': source_key,
                    'config': source_config
                })
        return enabled_sources

    def fetch_comments(self, data_source_key: str, limit: int = None) -> pd.DataFrame:
        """
        Fetch comments from Snowflake using the SQL query for a specific data source.

        Args:
            data_source_key: Key identifying the data source (e.g., 'social_media', 'musora_comments')
            limit: Optional limit on number of comments to fetch

        Returns:
            DataFrame containing comment data (lower-cased columns, empty
            comment_text rows removed)
        """
        data_source_config = self.data_sources_config['data_sources'][data_source_key]
        source_name = data_source_config['name']

        logger.info(f"Fetching comments from {source_name}...")

        # Load the source's SQL from its configured file (path relative to script).
        sql_file = data_source_config['sql_query_file']
        sql_path = os.path.join(SCRIPT_DIR, sql_file)
        with open(sql_path, 'r') as f:
            query = f.read()

        # Append an optional LIMIT. Strip trailing whitespace BEFORE stripping
        # the semicolon — a SQL file ending in ";\n" would otherwise keep its
        # semicolon and produce "...; LIMIT n;", which is invalid SQL.
        if limit:
            query = query.rstrip().rstrip(';') + f"\nLIMIT {limit};"

        df = self.snowflake.run_read_query(query, f"{source_name} comments")

        logger.info(f"Fetched {len(df)} comments from {source_name}")

        # Normalize column names so downstream mapping is case-insensitive.
        df.columns = df.columns.str.lower()

        # Drop rows with missing/blank text; they cannot be analyzed.
        if 'comment_text' in df.columns:
            initial_count = len(df)
            df = df[df['comment_text'].notna() & (df['comment_text'].str.strip() != '')]
            filtered_count = initial_count - len(df)
            if filtered_count > 0:
                logger.info(f"Filtered out {filtered_count} empty comments in post-processing")

        logger.info(f"Final count: {len(df)} non-empty comments")
        return df

    def calculate_num_workers(self) -> int:
        """
        Calculate the number of parallel workers to use.
        Uses CPU count - 2, with a maximum of 5 workers (minimum 1).

        Returns:
            Number of workers
        """
        num_cpus = cpu_count()
        num_workers = max(1, min(5, num_cpus - 2))
        logger.info(f"Using {num_workers} parallel workers (CPU count: {num_cpus})")
        return num_workers

    def process_comments_parallel(self, df: pd.DataFrame, data_source_config: Dict[str, Any], overwrite: bool = False) -> dict:
        """
        Process comments through the agentic workflow using parallel processing.

        Args:
            df: DataFrame containing raw comment data
            data_source_config: Configuration for the data source being processed
            overwrite: Whether to overwrite existing Snowflake table (first batch only)

        Returns:
            Dictionary with aggregated statistics
        """
        comments = df.to_dict('records')
        total_comments = len(comments)

        logger.info(f"Processing {total_comments} comments using parallel processing...")

        num_workers = self.calculate_num_workers()

        batch_size = calculate_optimal_batch_size(total_comments, num_workers)
        logger.info(f"Batch size: {batch_size} (min: 20, max: 100)")

        # Each batch tuple carries everything the worker process needs, since
        # workers do not share state with this process.
        batches = []
        for i in range(0, total_comments, batch_size):
            batch = comments[i:i + batch_size]
            batch_num = (i // batch_size) + 1
            batches.append((batch_num, batch, self.config, self.api_key, overwrite, data_source_config))

        total_batches = len(batches)
        logger.info(f"Split into {total_batches} batches")

        with Pool(processes=num_workers) as pool:
            results = pool.map(process_batch_worker, batches)

        # Aggregate per-batch statistics.
        total_processed = sum(r['total_processed'] for r in results)
        total_stored = sum(r['total_stored'] for r in results)
        failed_count = sum(r['failed_count'] for r in results)
        translations = sum(r['translations'] for r in results)
        non_english = sum(r['non_english'] for r in results)
        requires_reply = sum(r['requires_reply'] for r in results)

        failed_batches = [r for r in results if not r['success']]
        if failed_batches:
            logger.error(f"{len(failed_batches)} batch(es) failed:")
            for fb in failed_batches:
                logger.error(f"  Batch {fb['batch_num']}: {fb['error']}")

        return {
            'total_processed': total_processed,
            'total_stored': total_stored,
            'failed_count': failed_count,
            'translations': translations,
            'non_english': non_english,
            'requires_reply': requires_reply,
            'failed_batches': len(failed_batches)
        }

    def process_comments_sequential(self, df: pd.DataFrame, data_source_config: Dict[str, Any], overwrite: bool = False) -> dict:
        """
        Process comments through the agentic workflow sequentially (for debugging).

        Args:
            df: DataFrame containing raw comment data
            data_source_config: Configuration for the data source being processed
            overwrite: Whether to overwrite existing Snowflake table

        Returns:
            Dictionary with aggregated statistics
        """
        logger.info(f"Processing {len(df)} comments using sequential processing (debug mode)...")

        comments = df.to_dict('records')

        # Run everything as a single batch in-process, reusing the worker
        # function so sequential and parallel paths share one code path.
        batch_data = (1, comments, self.config, self.api_key, overwrite, data_source_config)
        result = process_batch_worker(batch_data)

        return {
            'total_processed': result['total_processed'],
            'total_stored': result['total_stored'],
            'failed_count': result['failed_count'],
            'translations': result['translations'],
            'non_english': result['non_english'],
            'requires_reply': result['requires_reply'],
            'failed_batches': 0 if result['success'] else 1
        }

    def run(self, limit: int = None, overwrite: bool = False, sequential: bool = False, data_source_filter: str = None):
        """
        Run the complete processing pipeline for all enabled data sources.

        Args:
            limit: Optional limit on number of comments to process per data source
            overwrite: Whether to overwrite existing Snowflake table
            sequential: If True, use sequential processing instead of parallel (for debugging)
            data_source_filter: Optional filter to process only a specific data source
        """
        try:
            logger.info("=" * 80)
            logger.info("Starting Comment Processing Workflow")
            if sequential:
                logger.info("Mode: SEQUENTIAL (Debug Mode)")
            else:
                logger.info("Mode: PARALLEL")
            logger.info("=" * 80)

            enabled_sources = self.get_enabled_data_sources()

            if data_source_filter:
                enabled_sources = [s for s in enabled_sources if s['key'] == data_source_filter]
                if not enabled_sources:
                    logger.error(f"Data source '{data_source_filter}' not found or not enabled")
                    return

            logger.info(f"Processing {len(enabled_sources)} data source(s)")

            for source_info in enabled_sources:
                source_key = source_info['key']
                source_config = source_info['config']
                source_name = source_config['name']

                logger.info("=" * 80)
                logger.info(f"Processing Data Source: {source_name}")
                logger.info("=" * 80)

                df_comments = self.fetch_comments(data_source_key=source_key, limit=limit)

                if df_comments.empty:
                    logger.warning(f"No comments to process from {source_name}")
                    continue

                start_time = datetime.now()

                if sequential:
                    stats = self.process_comments_sequential(df_comments, source_config, overwrite=overwrite)
                else:
                    stats = self.process_comments_parallel(df_comments, source_config, overwrite=overwrite)

                end_time = datetime.now()
                processing_time = (end_time - start_time).total_seconds()

                logger.info("=" * 80)
                logger.info(f"Processing Summary for {source_name}:")
                logger.info(f"  Processing Mode: {'Sequential' if sequential else 'Parallel'}")
                logger.info(f"  Output Table: {source_config['output_config']['table_name']}")
                logger.info(f"  Total comments processed: {stats['total_processed']}")
                logger.info(f"  Successfully stored: {stats['total_stored']}")
                logger.info(f"  Failed sentiment analysis (not stored): {stats['failed_count']}")
                if stats.get('failed_batches', 0) > 0:
                    logger.info(f"  Failed batches: {stats['failed_batches']}")
                logger.info(f"  Non-English comments: {stats['non_english']}")
                logger.info(f"  Translations performed: {stats['translations']}")
                logger.info(f"  Comments requiring reply: {stats['requires_reply']}")
                logger.info(f"  Processing time: {processing_time:.2f} seconds")
                # Guard against a zero count so the summary never raises
                # ZeroDivisionError.
                if stats['total_processed'] > 0:
                    logger.info(f"  Average time per comment: {processing_time / stats['total_processed']:.2f} seconds")
                logger.info("=" * 80)

        except Exception as e:
            logger.error(f"Error in workflow execution: {str(e)}", exc_info=True)
            raise

        finally:
            self.snowflake.close_connection()
            logger.info("Snowflake connection closed")
|
|
|
|
def main():
    """
    Main entry point for the script: parse CLI arguments, ensure the logs
    directory exists, and run the processor.
    """
    parser = argparse.ArgumentParser(
        description="Process comments with language detection, translation, and sentiment analysis from multiple data sources"
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=5000,
        # Help text previously claimed 10000; keep it in sync with default=.
        help='Limit number of comments to process per data source (default: 5000)'
    )
    parser.add_argument(
        '--overwrite',
        action='store_true',
        default=False,
        help='Overwrite existing Snowflake table (default: False, appends new records)'
    )
    parser.add_argument(
        '--config',
        type=str,
        default=None,
        help='Path to configuration file (default: config_files/sentiment_config.json relative to script)'
    )
    parser.add_argument(
        '--sequential',
        action='store_true',
        default=False,
        help='Use sequential processing instead of parallel (for debugging)'
    )
    parser.add_argument(
        '--data-source',
        type=str,
        default=None,
        help='Process only a specific data source (e.g., social_media, musora_comments). If not specified, all enabled sources are processed.'
    )

    args = parser.parse_args()

    # Ensure the log directory exists before any further logging.
    logs_dir = os.path.join(SCRIPT_DIR, 'logs')
    os.makedirs(logs_dir, exist_ok=True)

    processor = CommentProcessor(config_path=args.config)
    processor.run(
        limit=args.limit,
        overwrite=args.overwrite,
        sequential=args.sequential,
        data_source_filter=args.data_source
    )
|
|
|
|
| if __name__ == "__main__": |
| main() |