| | """ |
| | Evidence Verifier |
| | |
| | Verifies that claims are supported by document evidence. |
| | Cross-references extracted information with source documents. |
| | """ |
| |
|
| | from typing import List, Optional, Dict, Any, Tuple |
| | from enum import Enum |
| | from pydantic import BaseModel, Field |
| | from loguru import logger |
| | import re |
| |
|
| |
|
class EvidenceStrength(str, Enum):
    """Ordered labels for how strongly evidence supports a claim."""

    STRONG = "strong"      # near-exact support found
    MODERATE = "moderate"  # good partial support
    WEAK = "weak"          # marginal support
    NONE = "none"          # no usable support
| |
|
| |
|
class VerifierConfig(BaseModel):
    """Configuration for evidence verifier."""

    # --- Matching behaviour ---
    fuzzy_match: bool = Field(default=True, description="Enable fuzzy matching")
    case_sensitive: bool = Field(default=False, description="Case-sensitive matching")
    min_match_ratio: float = Field(
        default=0.6,
        ge=0.0,
        le=1.0,
        description="Minimum match ratio for fuzzy matching"
    )

    # --- Strength thresholds ---
    # A combined match score is bucketed into EvidenceStrength by comparing
    # against these cutoffs (strong >= 0.9, moderate >= 0.7, weak >= 0.5).
    strong_threshold: float = Field(default=0.9, ge=0.0, le=1.0)
    moderate_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
    weak_threshold: float = Field(default=0.5, ge=0.0, le=1.0)

    # --- Output shaping ---
    # max_evidence_per_claim caps how many matches a VerificationResult keeps;
    # context_window is the number of characters captured around each match.
    max_evidence_per_claim: int = Field(default=5, ge=1)
    context_window: int = Field(default=100, description="Characters around match")
| |
|
| |
|
class EvidenceMatch(BaseModel):
    """A match between claim and evidence."""

    # Core match result: the matched span, its score in [0, 1], and the
    # strength bucket derived from the verifier's thresholds.
    evidence_text: str
    match_score: float
    strength: EvidenceStrength

    # Source location, when the originating chunk provides it.
    chunk_id: Optional[str] = None
    page: Optional[int] = None
    position: Optional[int] = None

    # Text immediately surrounding the match (for display/inspection).
    context_before: Optional[str] = None
    context_after: Optional[str] = None
| |
|
| |
|
class VerificationResult(BaseModel):
    """Result of evidence verification."""

    # The claim that was checked, whether it passed, and at what strength
    # and confidence (confidence mirrors the best match's score).
    claim: str
    verified: bool
    strength: EvidenceStrength
    confidence: float

    # All retained matches (capped by config) and the single best one.
    evidence_matches: List[EvidenceMatch]
    best_match: Optional[EvidenceMatch] = None

    # Fraction of claim terms covered by evidence, whether negated
    # language was found near claim terms, and free-form reviewer notes.
    coverage_score: float
    contradiction_found: bool = False
    notes: Optional[str] = None
| |
|
| |
|
class EvidenceVerifier:
    """
    Verifies claims against document evidence.

    Features:
    - Text matching (exact and fuzzy)
    - Evidence strength scoring
    - Contradiction detection
    - Context extraction
    """

    # Common English function words skipped during key-term extraction.
    # Hoisted to the class so the set is built once instead of per call.
    _STOP_WORDS = frozenset({
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "being", "have", "has", "had", "do", "does", "did", "will",
        "would", "could", "should", "may", "might", "must", "shall",
        "can", "need", "dare", "ought", "used", "to", "of", "in",
        "for", "on", "with", "at", "by", "from", "as", "into", "through",
        "during", "before", "after", "above", "below", "between",
        "and", "but", "if", "or", "because", "until", "while",
    })

    # Pre-compiled negation markers used by contradiction detection.
    _NEGATION_PATTERNS = [
        re.compile(p)
        for p in (
            r'\bnot\b', r'\bno\b', r'\bnever\b', r'\bnone\b',
            r'\bwithout\b', r'\bfailed\b', r'\bdenied\b',
        )
    ]

    # Maximum character distance between a negation word and a claim-term
    # occurrence for the pair to count as a potential contradiction.
    _NEGATION_WINDOW = 30

    def __init__(self, config: Optional[VerifierConfig] = None):
        """Initialize evidence verifier.

        Args:
            config: Optional configuration; defaults to VerifierConfig().
        """
        self.config = config or VerifierConfig()

    def verify_claim(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> VerificationResult:
        """
        Verify a claim against evidence.

        Args:
            claim: The claim to verify
            evidence_chunks: List of evidence chunks with text

        Returns:
            VerificationResult
        """
        if not claim or not evidence_chunks:
            # Guard: claim may be empty or None; normalize to "" so the
            # pydantic str field validates instead of raising.
            return VerificationResult(
                claim=claim or "",
                verified=False,
                strength=EvidenceStrength.NONE,
                confidence=0.0,
                evidence_matches=[],
                coverage_score=0.0,
            )

        # Collect matches from every chunk that actually carries text.
        matches: List[EvidenceMatch] = []
        for chunk in evidence_chunks:
            chunk_text = chunk.get("text", "")
            if not chunk_text:
                continue
            matches.extend(self._find_matches(claim, chunk_text, chunk))

        # Keep only the highest-scoring matches, capped by config.
        matches.sort(key=lambda m: m.match_score, reverse=True)
        top_matches = matches[:self.config.max_evidence_per_claim]

        if top_matches:
            best_match = top_matches[0]
            overall_strength = best_match.strength
            confidence = best_match.match_score
            coverage_score = self._calculate_coverage(claim, top_matches)
        else:
            best_match = None
            overall_strength = EvidenceStrength.NONE
            confidence = 0.0
            coverage_score = 0.0

        # A claim is verified when evidence is at least moderate AND the
        # confidence clears the moderate threshold.
        verified = (
            overall_strength in (EvidenceStrength.STRONG, EvidenceStrength.MODERATE)
            and confidence >= self.config.moderate_threshold
        )

        contradiction_found = self._check_contradictions(claim, evidence_chunks)

        return VerificationResult(
            claim=claim,
            verified=verified and not contradiction_found,
            strength=overall_strength,
            confidence=confidence,
            evidence_matches=top_matches,
            best_match=best_match,
            coverage_score=coverage_score,
            contradiction_found=contradiction_found,
        )

    def verify_multiple(
        self,
        claims: List[str],
        evidence_chunks: List[Dict[str, Any]],
    ) -> List[VerificationResult]:
        """
        Verify multiple claims against evidence.

        Args:
            claims: List of claims to verify
            evidence_chunks: Evidence chunks

        Returns:
            List of VerificationResult
        """
        return [self.verify_claim(claim, evidence_chunks) for claim in claims]

    def verify_extraction(
        self,
        extraction: Dict[str, Any],
        evidence_chunks: List[Dict[str, Any]],
    ) -> Dict[str, VerificationResult]:
        """
        Verify extracted fields as claims.

        Args:
            extraction: Dictionary of field -> value
            evidence_chunks: Evidence chunks

        Returns:
            Dictionary of field -> VerificationResult
        """
        results: Dict[str, VerificationResult] = {}

        for field, value in extraction.items():
            # None means "not extracted"; nothing to verify.
            if value is None:
                continue

            # Render the field/value pair as a textual claim.
            claim = f"{field}: {value}"
            results[field] = self.verify_claim(claim, evidence_chunks)

        return results

    def _find_matches(
        self,
        claim: str,
        text: str,
        chunk: Dict[str, Any],
    ) -> List[EvidenceMatch]:
        """Find matches for claim in a single chunk's text.

        Produces at most two matches per chunk: an exact-substring match
        (always STRONG, score 1.0), plus a term-level match whose score
        blends average per-term score (70%) with term coverage (30%).
        """
        matches: List[EvidenceMatch] = []

        # Normalize case unless configured to match case-sensitively.
        claim_normalized = claim if self.config.case_sensitive else claim.lower()
        text_normalized = text if self.config.case_sensitive else text.lower()

        terms = self._extract_terms(claim_normalized)

        # Exact substring match: strongest possible evidence.
        if claim_normalized in text_normalized:
            pos = text_normalized.find(claim_normalized)
            matches.append(self._create_match(
                text, pos, len(claim), chunk,
                score=1.0, strength=EvidenceStrength.STRONG,
            ))

        # Term-level matching: score each key term individually.
        # NOTE(review): `term in text` matches inside longer words
        # ("cat" in "category"); kept as-is to preserve existing scoring.
        term_scores: List[Tuple[str, int, float]] = []
        for term in terms:
            if term in text_normalized:
                term_scores.append((term, text_normalized.find(term), 1.0))
            elif self.config.fuzzy_match:
                fuzzy_score, fuzzy_pos = self._fuzzy_find(term, text_normalized)
                if fuzzy_score >= self.config.min_match_ratio:
                    term_scores.append((term, fuzzy_pos, fuzzy_score))

        if term_scores:
            # Average is deliberately over len(terms), so missing terms
            # drag the score down; coverage rewards breadth separately.
            avg_score = sum(s[2] for s in term_scores) / len(terms) if terms else 0
            coverage = len(term_scores) / len(terms) if terms else 0
            combined_score = (avg_score * 0.7) + (coverage * 0.3)

            # Bucket the combined score into a strength level.
            if combined_score >= self.config.strong_threshold:
                strength = EvidenceStrength.STRONG
            elif combined_score >= self.config.moderate_threshold:
                strength = EvidenceStrength.MODERATE
            elif combined_score >= self.config.weak_threshold:
                strength = EvidenceStrength.WEAK
            else:
                strength = EvidenceStrength.NONE

            if strength != EvidenceStrength.NONE:
                # Anchor the match at the best-scoring term's position.
                best_term = max(term_scores, key=lambda t: t[2])
                matches.append(self._create_match(
                    text, best_term[1], len(best_term[0]), chunk,
                    score=combined_score, strength=strength,
                ))

        return matches

    def _create_match(
        self,
        text: str,
        position: int,
        length: int,
        chunk: Dict[str, Any],
        score: float,
        strength: EvidenceStrength,
    ) -> EvidenceMatch:
        """Create an evidence match, capturing surrounding context.

        Context is first windowed by config.context_window, then trimmed
        to at most 50 characters on each side for compact storage.
        """
        window = self.config.context_window
        start = max(0, position - window)
        end = min(len(text), position + length + window)

        context_before = text[start:position] if position > 0 else ""
        evidence_text = text[position:position + length]
        context_after = text[position + length:end] if position + length < len(text) else ""

        return EvidenceMatch(
            evidence_text=evidence_text,
            match_score=score,
            strength=strength,
            chunk_id=chunk.get("chunk_id"),
            page=chunk.get("page"),
            position=position,
            context_before=context_before[-50:] if context_before else None,
            context_after=context_after[:50] if context_after else None,
        )

    def _extract_terms(self, text: str) -> List[str]:
        """Extract key terms from text.

        Returns lowercase words longer than 2 characters that are not
        stop words, in order of appearance (duplicates preserved).
        """
        words = re.findall(r'\b\w+\b', text.lower())
        return [w for w in words if w not in self._STOP_WORDS and len(w) > 2]

    def _fuzzy_find(self, term: str, text: str) -> Tuple[float, int]:
        """Find term in text with fuzzy matching.

        Slides a term-length window over text and scores each window by
        the fraction of aligned characters that match exactly.

        Returns:
            (best_score, best_position); (0.0, 0) when term is empty or
            text is shorter than the term.
        """
        term_len = len(term)
        if term_len == 0:
            # Guard against division by zero on a degenerate term.
            return 0.0, 0

        best_score = 0.0
        best_pos = 0
        for i in range(len(text) - term_len + 1):
            window = text[i:i + term_len]
            score = sum(1 for a, b in zip(term, window) if a == b) / term_len
            if score > best_score:
                best_score = score
                best_pos = i

        return best_score, best_pos

    def _calculate_coverage(
        self,
        claim: str,
        matches: List[EvidenceMatch],
    ) -> float:
        """Calculate how much of the claim is covered by evidence.

        Returns the fraction of distinct claim terms that appear in at
        least one match's evidence text (0.0 when the claim has no terms).
        """
        claim_terms = set(self._extract_terms(claim.lower()))
        if not claim_terms:
            return 0.0

        covered_terms: set = set()
        for match in matches:
            match_terms = set(self._extract_terms(match.evidence_text.lower()))
            covered_terms.update(match_terms.intersection(claim_terms))

        return len(covered_terms) / len(claim_terms)

    def _check_contradictions(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> bool:
        """Check if evidence contains contradictions to the claim.

        Heuristic: a negation word within _NEGATION_WINDOW characters of
        ANY occurrence of a claim term counts as a potential contradiction.
        (Fixes the earlier version, which only compared against the first
        occurrence of each term and so missed later ones.)
        """
        claim_terms = set(self._extract_terms(claim.lower()))
        if not claim_terms:
            return False

        for chunk in evidence_chunks:
            text = chunk.get("text", "").lower()
            if not text:
                continue

            # Positions of every negation word in this chunk.
            negation_positions = [
                m.start()
                for pattern in self._NEGATION_PATTERNS
                for m in pattern.finditer(text)
            ]
            if not negation_positions:
                continue

            for term in claim_terms:
                # Compare every occurrence of the term, not just the first.
                for occurrence in re.finditer(re.escape(term), text):
                    term_pos = occurrence.start()
                    if any(
                        abs(neg_pos - term_pos) < self._NEGATION_WINDOW
                        for neg_pos in negation_positions
                    ):
                        return True

        return False
| |
|
| |
|
| | |
# Module-level singleton; created lazily by get_evidence_verifier() and
# cleared by reset_evidence_verifier().
_evidence_verifier: Optional[EvidenceVerifier] = None
| |
|
| |
|
def get_evidence_verifier(
    config: Optional[VerifierConfig] = None,
) -> EvidenceVerifier:
    """Get or create singleton evidence verifier.

    Note: `config` is only applied on first creation; later calls return
    the existing instance unchanged.
    """
    global _evidence_verifier
    if _evidence_verifier is not None:
        return _evidence_verifier
    _evidence_verifier = EvidenceVerifier(config)
    return _evidence_verifier
| |
|
| |
|
def reset_evidence_verifier() -> None:
    """Reset the global verifier instance so the next get_evidence_verifier()
    call builds a fresh one."""
    global _evidence_verifier
    _evidence_verifier = None
| |
|