# anomalyOS / src/graph.py
# CaffeinatedCoding's picture
# Upload folder using huggingface_hub
# fadccb6 verified
# src/graph.py
# Loads the NetworkX knowledge graph and exposes 2-hop traversal
# Graph built in notebook 04, stored as node-link JSON on HF Dataset
# Loaded once at FastAPI startup, kept in memory
import os
import json
import networkx as nx
# Base directory for data files; the DATA_DIR environment variable, when set,
# overrides the "data" default.
DATA_DIR = os.getenv("DATA_DIR", "data")
class KnowledgeGraph:
    """
    Wraps the NetworkX DiGraph.
    Provides 2-hop context retrieval for the RAG orchestrator.

    The graph is read from ``<data_dir>/knowledge_graph.json`` (node-link
    JSON) by ``load()``. Until ``load()`` has run, ``self.graph`` is ``None``
    and every query method returns an empty result instead of raising.
    """

    def __init__(self, data_dir=DATA_DIR):
        """Remember where the graph file lives; nothing is read yet."""
        self.data_dir = data_dir
        # None means "load() has not been called"; load() always replaces it
        # with a graph object (possibly empty).
        self.graph = None

    def load(self):
        """
        Read ``knowledge_graph.json`` from ``self.data_dir`` into ``self.graph``.

        A missing file, unreadable JSON, or an unrecognised layout is reported
        with a printed warning and leaves an empty ``nx.DiGraph`` in place.
        """
        path = os.path.join(self.data_dir, "knowledge_graph.json")
        if not os.path.exists(path):
            print(f"Warning: Knowledge graph not found at {path}, using empty graph")
            self.graph = nx.DiGraph()
            return

        try:
            # JSON is UTF-8 by specification; do not depend on the platform's
            # locale encoding when decoding the file.
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            print(f"Warning: Failed to load knowledge graph JSON: {e}")
            self.graph = nx.DiGraph()
            return

        self.graph = self._graph_from_node_link(data)
        print(f"Knowledge graph loaded: "
              f"{self.graph.number_of_nodes()} nodes, "
              f"{self.graph.number_of_edges()} edges")

    @staticmethod
    def _graph_from_node_link(data):
        """
        Build a graph from parsed node-link JSON, tolerating either edge key.

        NetworkX releases differ in whether ``node_link_graph`` looks for the
        edge list under ``"links"`` or ``"edges"`` by default, so when the
        first attempt fails the edge list is renamed to the other key and the
        call is retried. Returns an empty ``nx.DiGraph`` (after printing a
        warning) when the data cannot be interpreted.
        """
        # node_link_graph calls data.get(...), so anything that is not a
        # mapping (e.g. a top-level JSON list) cannot be a node-link document.
        if not isinstance(data, dict):
            print("Warning: Knowledge graph format not recognized, using empty graph")
            return nx.DiGraph()

        try:
            return nx.node_link_graph(data, directed=True)
        except (KeyError, TypeError):
            pass  # fall through to the key-renaming retry below

        if "edges" in data:
            old_key, new_key = "edges", "links"
        elif "links" in data:
            old_key, new_key = "links", "edges"
        else:
            # Last resort: no edge list under either name.
            print("Warning: Knowledge graph format not recognized, using empty graph")
            return nx.DiGraph()

        renamed = dict(data)  # shallow copy; only the top-level key moves
        renamed[new_key] = renamed.pop(old_key)
        try:
            return nx.node_link_graph(renamed, directed=True)
        except Exception as e:
            print(f"Warning: Failed to load graph with {old_key}→{new_key} conversion: {e}")
            return nx.DiGraph()

    def _display_name(self, node, prefix):
        """Return the node's ``name`` attribute, else its id with *prefix* removed."""
        attrs = self.graph.nodes[node]
        if "name" in attrs:
            return attrs["name"]
        # str() keeps the fallback from raising on non-string node ids.
        return str(node).replace(prefix, "")

    def get_context(self, category: str, defect_type: str) -> dict:
        """
        2-hop traversal from a defect node.
        Returns: root causes, remediations, co-occurring defects.
        Path: defect → [caused_by] → root_cause
                     → [remediated_by] → remediation
              defect → [co_occurs_with] → related_defect

        The start node is ``defect_<category>_<defect_type>``; when that node
        does not exist, the first node whose id starts with
        ``defect_<category>_`` is used instead.

        Returns a dict with ``root_causes``, ``remediations`` and ``co_occurs``
        lists. When a start node was found the dict also carries
        ``defect_key`` (the node actually used). ``root_causes`` and
        ``remediations`` are de-duplicated in first-seen order.
        """
        if self.graph is None:
            return {"root_causes": [], "remediations": [], "co_occurs": []}

        defect_key = f"defect_{category}_{defect_type}"
        # Try exact match first, then fall back to category level.
        if defect_key not in self.graph:
            prefix = f"defect_{category}_"
            defect_key = next(
                (n for n in self.graph.nodes
                 if isinstance(n, str) and n.startswith(prefix)),
                None,
            )
            if defect_key is None:
                return {"root_causes": [], "remediations": [], "co_occurs": []}

        root_causes = []
        remediations = []
        co_occurs = []
        for nb1 in self.graph.successors(defect_key):
            edge1 = self.graph[defect_key][nb1].get("edge_type", "")
            if edge1 == "caused_by":
                root_causes.append(self._display_name(nb1, "root_cause_"))
                # Second hop: root_cause → remediation
                for nb2 in self.graph.successors(nb1):
                    edge2 = self.graph[nb1][nb2].get("edge_type", "")
                    if edge2 == "remediated_by":
                        remediations.append(
                            self._display_name(nb2, "remediation_"))
            elif edge1 == "co_occurs_with":
                co_occurs.append(str(nb1).replace("defect_", ""))

        return {
            "defect_key": defect_key,
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the result is stable across runs (set order is not).
            "root_causes": list(dict.fromkeys(root_causes)),
            "remediations": list(dict.fromkeys(remediations)),
            "co_occurs": co_occurs,
        }

    def get_all_defect_nodes(self) -> list:
        """Returns all defect nodes — used by Knowledge Base Explorer.

        Each entry is the node's attribute dict plus a ``node_id`` key; only
        nodes whose ``node_type`` attribute is ``"defect_instance"`` are
        included. Returns ``[]`` when no graph has been loaded.
        """
        if self.graph is None:
            return []
        return [
            {"node_id": node_id, **attrs}
            for node_id, attrs in self.graph.nodes(data=True)
            if attrs.get("node_type") == "defect_instance"
        ]

    def get_status(self) -> dict:
        """Report whether a graph is loaded and, if so, its node/edge counts."""
        if self.graph is None:
            return {"loaded": False}
        return {
            "loaded": True,
            "nodes": self.graph.number_of_nodes(),
            "edges": self.graph.number_of_edges(),
        }
# Module-level singleton shared by importers. Constructing it reads nothing
# from disk; the graph stays unset until its load() method is called.
knowledge_graph = KnowledgeGraph(data_dir=DATA_DIR)