| """ |
| SPARKNET API Integration Tests - Phase 1B |
| |
| Comprehensive test suite for REST API endpoints: |
| - Document API (/api/documents) |
| - RAG API (/api/rag) |
| - Auth API (/api/auth) |
| - Health/Status endpoints |
| |
| Uses FastAPI TestClient for synchronous testing without running the server. |
| """ |
|
|
| import pytest |
| import json |
| import io |
| import os |
| import sys |
| from pathlib import Path |
| from typing import Dict, Any, Optional |
| from unittest.mock import patch, MagicMock, AsyncMock |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent)) |
|
|
| from fastapi.testclient import TestClient |
|
|
|
|
| |
| |
| |
|
|
@pytest.fixture(scope="module")
def mock_components():
    """Build module-scoped mocks standing in for the SPARKNET runtime pieces.

    Returns a dict keyed by component role so the ``client`` fixture can
    splice them into ``api.main.app_state`` without touching real models.
    """
    # Embedding model: fixed 1024-dim vectors, deterministic for assertions.
    embeddings = MagicMock()
    embeddings.embed_documents.return_value = [[0.1] * 1024]
    embeddings.embed_query.return_value = [0.1] * 1024

    # Vector store: reports 100 stored chunks, empty search hits, one doc id.
    store = MagicMock()
    store._collection = MagicMock()
    store._collection.count.return_value = 100
    store.search.return_value = []
    store.add_documents.return_value = ["doc_1"]

    # LLM client: canned text generation plus an opaque LLM handle.
    llm_client = MagicMock()
    llm_client.generate.return_value = "Mock response"
    llm_client.get_llm.return_value = MagicMock()

    # RAG workflow: async entry point returning a minimal successful answer.
    workflow = MagicMock()
    workflow.run = AsyncMock(return_value={
        "response": "Test response",
        "sources": [],
        "confidence": 0.9,
    })

    return {
        "embeddings": embeddings,
        "store": store,
        "llm_client": llm_client,
        "workflow": workflow,
    }
|
|
|
|
@pytest.fixture(scope="module")
def client(mock_components):
    """Yield a FastAPI TestClient whose app_state is fully mocked.

    ``patch.dict`` swaps ``api.main.app_state`` for the duration of the
    module, so importing ``app`` afterwards sees only mock components.
    """
    patched_state = {
        "start_time": 1000000,
        "embeddings": mock_components["embeddings"],
        "store": mock_components["store"],
        "llm_client": mock_components["llm_client"],
        "workflow": mock_components["workflow"],
        "rag_ready": True,
        "workflows": {},
        "patents": {},
        "planner": MagicMock(),
        "critic": MagicMock(),
        "memory": MagicMock(),
        "vision_ocr": None,
    }
    with patch.dict("api.main.app_state", patched_state):
        # Import inside the patch so app startup reads the mocked state.
        from api.main import app
        with TestClient(app) as test_client:
            yield test_client
|
|
|
|
@pytest.fixture
def auth_headers(client) -> Dict[str, str]:
    """Return Bearer-token headers for the default admin account.

    Yields an empty dict when the auth endpoint is unavailable so tests
    can detect the situation and skip themselves.
    """
    response = client.post(
        "/api/auth/token",
        data={"username": "admin", "password": "admin123"}
    )

    # Auth may be disabled in some deployments; signal that with {}.
    if response.status_code != 200:
        return {}

    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}
|
|
|
|
@pytest.fixture
def sample_pdf_file():
    """Return an in-memory minimal (hand-crafted) PDF for upload tests.

    The byte payload is a tiny but structurally plausible single-page PDF;
    it only needs to pass upload/content-type checks, not full rendering.
    """
    raw_pdf = b"""%PDF-1.4
1 0 obj << /Type /Catalog /Pages 2 0 R >> endobj
2 0 obj << /Type /Pages /Kids [3 0 R] /Count 1 >> endobj
3 0 obj << /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] /Contents 4 0 R >> endobj
4 0 obj << /Length 44 >> stream
BT /F1 12 Tf 100 700 Td (Test Document) Tj ET
endstream endobj
xref
0 5
0000000000 65535 f 
0000000009 00000 n 
0000000058 00000 n 
0000000115 00000 n 
0000000214 00000 n 
trailer << /Size 5 /Root 1 0 R >>
startxref
306
%%EOF"""
    return io.BytesIO(raw_pdf)
|
|
|
|
@pytest.fixture
def sample_text_file():
    """Return an in-memory markdown-ish text document for upload tests."""
    # Three short sections so chunking/processing has something to split.
    body = b"""SPARKNET Test Document

This is a sample document for testing the document processing pipeline.

## Section 1: Introduction
The SPARKNET framework provides AI-powered document intelligence.

## Section 2: Features
- Multi-agent RAG pipeline
- Table extraction
- Evidence grounding

## Section 3: Conclusion
This document tests the upload and processing functionality.
"""
    return io.BytesIO(body)
|
|
|
|
| |
| |
| |
|
|
class TestHealthEndpoints:
    """Exercise the unauthenticated health and status endpoints."""

    def test_root_endpoint(self, client):
        """Root endpoint advertises the service name, status, and version."""
        resp = client.get("/")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "operational"
        assert body["service"] == "SPARKNET API"
        assert "version" in body

    def test_health_endpoint(self, client):
        """Health endpoint reports per-component status and statistics."""
        resp = client.get("/api/health")
        assert resp.status_code == 200

        body = resp.json()
        for field in ("status", "components", "statistics", "uptime_seconds"):
            assert field in body

        # Every core component must be represented in the report.
        components = body["components"]
        for key in ("rag", "embeddings", "vector_store", "llm_client"):
            assert key in components

    def test_status_endpoint(self, client):
        """Status endpoint aggregates operational info, stats, and models."""
        resp = client.get("/api/status")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "operational"
        assert "statistics" in body
        assert "models" in body
|
|
|
|
| |
| |
| |
|
|
class TestAuthEndpoints:
    """Exercise token issuance and token-protected endpoints."""

    def test_get_token_valid_credentials(self, client):
        """A valid admin login yields a bearer access token."""
        resp = client.post(
            "/api/auth/token",
            data={"username": "admin", "password": "admin123"}
        )

        # Auth may be disabled; only assert on the success shape.
        if resp.status_code == 200:
            body = resp.json()
            assert "access_token" in body
            assert body["token_type"] == "bearer"

    def test_get_token_invalid_credentials(self, client):
        """Bad credentials must not yield a token."""
        resp = client.post(
            "/api/auth/token",
            data={"username": "invalid", "password": "wrong"}
        )
        assert resp.status_code in (401, 500)

    def test_get_current_user(self, client, auth_headers):
        """/auth/me returns the identity bound to the supplied token."""
        if not auth_headers:
            pytest.skip("Auth not available")

        resp = client.get("/api/auth/me", headers=auth_headers)
        assert resp.status_code == 200
        assert "username" in resp.json()

    def test_protected_endpoint_without_token(self, client):
        """/auth/me without a token must be rejected with 401."""
        resp = client.get("/api/auth/me")
        assert resp.status_code == 401
|
|
|
|
| |
| |
| |
|
|
class TestDocumentEndpoints:
    """Exercise document CRUD, upload, and lifecycle endpoints."""

    def test_list_documents_empty(self, client):
        """Listing documents always returns a JSON array."""
        resp = client.get("/api/documents")
        assert resp.status_code == 200
        assert isinstance(resp.json(), list)

    def test_upload_text_document(self, client, sample_text_file):
        """A plain-text upload is accepted and assigned a document id."""
        resp = client.post(
            "/api/documents/upload",
            files={"file": ("test.txt", sample_text_file, "text/plain")}
        )
        assert resp.status_code == 200

        body = resp.json()
        assert "document_id" in body
        assert body["filename"] == "test.txt"
        # Processing may be synchronous or deferred.
        assert body["status"] in ("uploaded", "processing", "processed")

    def test_upload_pdf_document(self, client, sample_pdf_file):
        """A PDF upload is accepted and assigned a document id."""
        resp = client.post(
            "/api/documents/upload",
            files={"file": ("test.pdf", sample_pdf_file, "application/pdf")}
        )
        assert resp.status_code == 200

        body = resp.json()
        assert "document_id" in body
        assert body["filename"] == "test.pdf"

    def test_upload_unsupported_format(self, client):
        """Executables and other unknown formats must be refused."""
        bogus = io.BytesIO(b"fake executable content")
        resp = client.post(
            "/api/documents/upload",
            files={"file": ("test.exe", bogus, "application/octet-stream")}
        )
        # Either Bad Request or Unsupported Media Type is acceptable.
        assert resp.status_code in (400, 415)

    def test_get_document_not_found(self, client):
        """Fetching an unknown document id yields 404."""
        resp = client.get("/api/documents/nonexistent_id")
        assert resp.status_code == 404

    def test_document_workflow(self, client, sample_text_file):
        """Full lifecycle: upload, inspect, chunk, index, delete."""
        uploaded = client.post(
            "/api/documents/upload",
            files={"file": ("workflow_test.txt", sample_text_file, "text/plain")}
        )
        assert uploaded.status_code == 200
        doc_id = uploaded.json()["document_id"]

        # Detail view must exist immediately after upload.
        assert client.get(f"/api/documents/{doc_id}/detail").status_code == 200

        # Chunk listing must be available (possibly empty).
        assert client.get(f"/api/documents/{doc_id}/chunks").status_code == 200

        # Indexing may fail validation if processing hasn't finished.
        indexed = client.post(f"/api/documents/{doc_id}/index")
        assert indexed.status_code in (200, 400, 422)

        # Cleanup: delete must succeed for an existing document.
        assert client.delete(f"/api/documents/{doc_id}").status_code == 200
|
|
|
|
| |
| |
| |
|
|
class TestRAGEndpoints:
    """Exercise RAG query, semantic search, and store/cache endpoints."""

    def test_rag_query_basic(self, client):
        """A simple query either answers or fails with a server error."""
        resp = client.post(
            "/api/rag/query",
            json={
                "query": "What is SPARKNET?",
                "max_sources": 5
            }
        )
        # 5xx tolerated: mocked backend may not satisfy the pipeline.
        assert resp.status_code in (200, 500, 503)

        if resp.status_code == 200:
            body = resp.json()
            assert "response" in body or "error" in body

    def test_rag_query_with_filters(self, client):
        """Queries scoped to specific documents are accepted."""
        resp = client.post(
            "/api/rag/query",
            json={
                "query": "Test query",
                "document_ids": ["doc_1", "doc_2"],
                "max_sources": 3,
                "min_confidence": 0.5
            }
        )
        assert resp.status_code in (200, 500, 503)

    def test_rag_search_semantic(self, client):
        """Raw semantic search (no answer synthesis) returns results."""
        resp = client.post(
            "/api/rag/search",
            json={
                "query": "document processing",
                "top_k": 10
            }
        )
        assert resp.status_code in (200, 500, 503)

        if resp.status_code == 200:
            body = resp.json()
            assert "results" in body or "error" in body

    def test_rag_store_status(self, client):
        """Vector-store status endpoint reports its state."""
        resp = client.get("/api/rag/store/status")
        assert resp.status_code in (200, 500)

        if resp.status_code == 200:
            assert "status" in resp.json()

    def test_rag_cache_stats(self, client):
        """Cache statistics endpoint exists or is explicitly absent."""
        resp = client.get("/api/rag/cache/stats")
        assert resp.status_code in (200, 404, 500)

    def test_rag_query_empty_query(self, client):
        """An empty query string fails request validation."""
        resp = client.post(
            "/api/rag/query",
            json={"query": ""}
        )
        assert resp.status_code == 422
|
|
|
|
| |
| |
| |
|
|
class TestDocumentProcessing:
    """Exercise explicit processing and batch-indexing endpoints."""

    def test_process_document_endpoint(self, client, sample_text_file):
        """Triggering processing on a freshly uploaded doc is accepted."""
        uploaded = client.post(
            "/api/documents/upload",
            files={"file": ("process_test.txt", sample_text_file, "text/plain")}
        )
        if uploaded.status_code != 200:
            pytest.skip("Upload failed")

        doc_id = uploaded.json()["document_id"]

        # Processing may complete (200), be queued (202), or be invalid (400).
        processed = client.post(f"/api/documents/{doc_id}/process")
        assert processed.status_code in (200, 202, 400)

    def test_batch_index_documents(self, client):
        """Batch indexing of unknown ids is handled gracefully."""
        resp = client.post(
            "/api/documents/batch-index",
            json={"document_ids": ["doc_1", "doc_2", "doc_3"]}
        )
        # Ids don't exist, so rejection is also acceptable.
        assert resp.status_code in (200, 400, 404)
|
|
|
|
| |
| |
| |
|
|
class TestErrorHandling:
    """Verify the API degrades cleanly on malformed or hostile input."""

    def test_invalid_json_body(self, client):
        """A non-JSON body with a JSON content type fails validation."""
        resp = client.post(
            "/api/rag/query",
            content="not valid json",
            headers={"Content-Type": "application/json"}
        )
        assert resp.status_code == 422

    def test_missing_required_fields(self, client):
        """An empty JSON object is rejected by schema validation."""
        resp = client.post(
            "/api/rag/query",
            json={}
        )
        assert resp.status_code == 422

    def test_invalid_document_id_format(self, client):
        """Hostile or degenerate document ids never resolve to a resource."""
        # Path-traversal attempt must not escape the documents namespace.
        resp = client.get("/api/documents/../../etc/passwd")
        assert resp.status_code in (400, 404)

        # Pathologically long ids are rejected or simply not found.
        long_id = "a" * 1000
        resp = client.get(f"/api/documents/{long_id}")
        assert resp.status_code in (400, 404)
|
|
|
|
| |
| |
| |
|
|
class TestConcurrency:
    """Verify the app survives overlapping requests from worker threads."""

    def test_multiple_health_checks(self, client):
        """Ten overlapping health checks all succeed."""
        import concurrent.futures

        def hit_health():
            return client.get("/api/health")

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
            pending = [pool.submit(hit_health) for _ in range(10)]
            responses = [task.result() for task in pending]

        assert all(resp.status_code == 200 for resp in responses)

    def test_multiple_document_uploads(self, client):
        """Five overlapping uploads each get a definitive response."""
        import concurrent.futures

        def push_file(i):
            payload = io.BytesIO(f"Test content {i}".encode())
            return client.post(
                "/api/documents/upload",
                files={"file": (f"test_{i}.txt", payload, "text/plain")}
            )

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
            pending = [pool.submit(push_file, i) for i in range(5)]
            responses = [task.result() for task in pending]

        # Server errors under contention are tolerated; hangs are not.
        assert all(resp.status_code in (200, 500) for resp in responses)
|
|
|
|
| |
| |
| |
|
|
class TestIntegrationWorkflows:
    """End-to-end flows spanning several endpoints."""

    def test_document_to_rag_query_workflow(self, client, sample_text_file):
        """Upload a document, index it, query over it, then clean up."""
        uploaded = client.post(
            "/api/documents/upload",
            files={"file": ("integration_test.txt", sample_text_file, "text/plain")}
        )
        if uploaded.status_code != 200:
            pytest.skip("Upload failed, skipping workflow test")

        doc_id = uploaded.json()["document_id"]

        # The document must be retrievable by id.
        fetched = client.get(f"/api/documents/{doc_id}")
        assert fetched.status_code == 200

        # Indexing may be unavailable with mocked components.
        indexed = client.post(f"/api/documents/{doc_id}/index")
        if indexed.status_code != 200:
            pytest.skip("Indexing not available")

        # Query scoped to the freshly indexed document.
        queried = client.post(
            "/api/rag/query",
            json={
                "query": "What does this document contain?",
                "document_ids": [doc_id]
            }
        )
        assert queried.status_code in (200, 500, 503)

        # Best-effort cleanup; outcome intentionally unchecked.
        client.delete(f"/api/documents/{doc_id}")
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.slow
class TestPerformance:
    """Slow-marked load and throughput smoke tests."""

    def test_large_document_upload(self, client):
        """A ~1 MB text upload is accepted or explicitly size-limited."""
        big_payload = io.BytesIO(b"Test content line\n" * 60000)

        resp = client.post(
            "/api/documents/upload",
            files={"file": ("large_test.txt", big_payload, "text/plain")}
        )
        # 413 = server enforces an upload size limit.
        assert resp.status_code in (200, 413)

    def test_rapid_query_requests(self, client):
        """Twenty back-to-back queries finish within a sane time budget."""
        import time

        started = time.time()
        collected = []
        for i in range(20):
            collected.append(client.post(
                "/api/rag/query",
                json={"query": f"Test query {i}"}
            ))
        elapsed = time.time() - started

        # Hard ceiling: the whole burst must finish in under 30 seconds.
        assert elapsed < 30

        # At least 80% of requests must return a definitive status.
        definitive = sum(
            1 for resp in collected if resp.status_code in (200, 500, 503)
        )
        assert definitive >= len(collected) * 0.8
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # pytest.main returns the exit code; propagate it so `python thisfile.py`
    # reports failure to the shell/CI instead of always exiting 0.
    sys.exit(pytest.main([__file__, "-v", "--tb=short"]))
|
|