| """ |
| Test script for Langfuse v3 observability integration. |
| |
| This script tests the observability module and ensures that: |
| 1. Observability can be initialized properly |
| 2. Root spans are created correctly |
| 3. Agent spans are nested properly |
| 4. Tool spans work as expected |
| """ |
|
|
| import asyncio |
| import os |
| from dotenv import load_dotenv |
|
|
| |
| load_dotenv("env.local") |
|
|
| def test_observability_initialization(): |
| """Test that observability can be initialized""" |
| print("π§ͺ Testing observability initialization...") |
| |
| from observability import initialize_observability, get_callback_handler |
| |
| |
| success = initialize_observability() |
| if success: |
| print("β
Observability initialized successfully") |
| else: |
| print("β Observability initialization failed") |
| return False |
| |
| |
| handler = get_callback_handler() |
| if handler: |
| print("β
Callback handler obtained successfully") |
| else: |
| print("β Failed to get callback handler") |
| return False |
| |
| return True |
|
|
| def test_span_creation(): |
| """Test that spans can be created properly""" |
| print("\nπ§ͺ Testing span creation...") |
| |
| from observability import start_root_span, agent_span, tool_span |
| |
| try: |
| |
| with start_root_span( |
| name="test-request", |
| user_id="test_user", |
| session_id="test_session", |
| metadata={"test": True} |
| ) as root_span: |
| print("β
Root span created successfully") |
| |
| |
| with agent_span("test_agent", metadata={"agent_type": "test"}) as agent_span_ctx: |
| print("β
Agent span created successfully") |
| |
| |
| with tool_span("test_tool", metadata={"tool_type": "test"}) as tool_span_ctx: |
| print("β
Tool span created successfully") |
| |
| print("β
All spans created and closed successfully") |
| return True |
| |
| except Exception as e: |
| print(f"β Span creation failed: {e}") |
| return False |
|
|
| async def test_agent_system_integration(): |
| """Test the full agent system with observability""" |
| print("\nπ§ͺ Testing agent system integration...") |
| |
| try: |
| from langgraph_agent_system import run_agent_system |
| |
| |
| result = await run_agent_system( |
| query="What is 2 + 2?", |
| user_id="test_user_integration", |
| session_id="test_session_integration", |
| max_iterations=1 |
| ) |
| |
| if result and isinstance(result, str): |
| print(f"β
Agent system ran successfully") |
| print(f"π Result: {result[:100]}...") |
| return True |
| else: |
| print("β Agent system returned invalid result") |
| return False |
| |
| except Exception as e: |
| print(f"β Agent system integration test failed: {e}") |
| return False |
|
|
| def test_flush_and_cleanup(): |
| """Test flushing and cleanup functions""" |
| print("\nπ§ͺ Testing flush and cleanup...") |
| |
| try: |
| from observability import flush_traces, shutdown_observability |
| |
| |
| flush_traces(background=False) |
| print("β
Traces flushed successfully") |
| |
| |
| shutdown_observability() |
| print("β
Observability shutdown successfully") |
| |
| return True |
| |
| except Exception as e: |
| print(f"β Flush and cleanup test failed: {e}") |
| return False |
|
|
| async def main(): |
| """Run all tests""" |
| print("π Starting Langfuse v3 observability tests...\n") |
| |
| tests = [ |
| ("Observability Initialization", test_observability_initialization), |
| ("Span Creation", test_span_creation), |
| ("Agent System Integration", test_agent_system_integration), |
| ("Flush and Cleanup", test_flush_and_cleanup) |
| ] |
| |
| results = [] |
| |
| for test_name, test_func in tests: |
| print(f"\n{'='*50}") |
| print(f"Running: {test_name}") |
| print(f"{'='*50}") |
| |
| try: |
| if asyncio.iscoroutinefunction(test_func): |
| result = await test_func() |
| else: |
| result = test_func() |
| results.append((test_name, result)) |
| except Exception as e: |
| print(f"β Test {test_name} failed with exception: {e}") |
| results.append((test_name, False)) |
| |
| |
| print(f"\n{'='*50}") |
| print("TEST RESULTS SUMMARY") |
| print(f"{'='*50}") |
| |
| passed = 0 |
| total = len(results) |
| |
| for test_name, result in results: |
| status = "β
PASSED" if result else "β FAILED" |
| print(f"{test_name}: {status}") |
| if result: |
| passed += 1 |
| |
| print(f"\nOverall: {passed}/{total} tests passed") |
| |
| if passed == total: |
| print("π All tests passed! Langfuse v3 observability is working correctly.") |
| else: |
| print("β οΈ Some tests failed. Check the output above for details.") |
| |
| |
| print(f"\n{'='*50}") |
| print("ENVIRONMENT CHECK") |
| print(f"{'='*50}") |
| |
| required_env_vars = [ |
| "LANGFUSE_PUBLIC_KEY", |
| "LANGFUSE_SECRET_KEY", |
| "LANGFUSE_HOST" |
| ] |
| |
| for var in required_env_vars: |
| value = os.getenv(var) |
| if value: |
| print(f"β
{var}: {'*' * min(len(value), 10)}...") |
| else: |
| print(f"β {var}: Not set") |
| |
| print(f"\nπ If tests passed, check your Langfuse dashboard at: {os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')}") |
|
|
| if __name__ == "__main__": |
| asyncio.run(main()) |