"""KnowledgeGraph — NetworkX-based graph store."""

import json
import logging
from collections import deque
from pathlib import Path
from typing import Dict, List, Optional

import networkx as nx

logger = logging.getLogger(__name__)


class KnowledgeGraph:
    """Directed entity/relation graph persisted as a GraphML file.

    Nodes are keyed by entity UUID and carry ``node_type``, ``name``,
    ``entity_type``, ``weight`` and ``occurrence_count`` attributes. Edges
    carry ``relation_type`` and ``strength``. Every mutating method writes
    the whole graph back to ``<path>/knowledge_graph.graphml``.
    """

    def __init__(self, path: Path):
        """Create the storage directory if needed and load any saved graph.

        Args:
            path: Directory that holds ``knowledge_graph.graphml``.
        """
        self.path = path
        self.path.mkdir(parents=True, exist_ok=True)
        self.graphml_path = self.path / "knowledge_graph.graphml"
        self.G = nx.DiGraph()
        self._load()

    def _load(self) -> None:
        """Load the graph from disk; fall back to an empty graph on failure."""
        if not self.graphml_path.exists():
            return
        try:
            self.G = nx.read_graphml(self.graphml_path)
            # Normalise the numeric attributes after the GraphML round trip
            # and fill in defaults for nodes/edges that lack them.
            for node in self.G.nodes:
                data = self.G.nodes[node]
                data["weight"] = float(data.get("weight", 1.0))
                data["occurrence_count"] = int(data.get("occurrence_count", 1))
            for u, v in self.G.edges:
                edge = self.G.edges[u, v]
                edge["strength"] = float(edge.get("strength", 1.0))
        except Exception as e:
            # Deliberate best effort: an unreadable file must not prevent
            # start-up, so log it and continue with an empty graph.
            logger.warning("GraphML-Laden fehlgeschlagen: %s", e)
            self.G = nx.DiGraph()

    def save(self) -> None:
        """Write the graph to disk without ever truncating the live file.

        The graph is serialised to a sibling temporary file which then
        replaces the real file, so a failed write (for example an
        unsupported attribute value) leaves the previous file intact.

        Raises:
            Exception: Whatever ``nx.write_graphml`` or the file system
                raised; the temporary file is removed first.
        """
        tmp_path = self.graphml_path.with_name(self.graphml_path.name + ".tmp")
        try:
            nx.write_graphml(self.G, tmp_path)
            tmp_path.replace(self.graphml_path)
        except Exception:
            if tmp_path.exists():
                tmp_path.unlink()
            raise

    def add_entity(
        self,
        uuid: str,
        name: str,
        entity_type: str,
        **attrs,
    ) -> Dict:
        """Insert an entity node, or bump the counter of an existing one.

        For an existing node only ``occurrence_count`` and ``last_seen`` are
        touched; ``name``, ``entity_type`` and other ``attrs`` are ignored.

        Args:
            uuid: Node identifier.
            name: Display name stored on a newly created node.
            entity_type: Entity category stored on a newly created node.
            **attrs: Extra node attributes; ``last_seen`` is also applied
                on update.

        Returns:
            ``{"uuid": uuid, "action": "created" | "updated"}``.
        """
        if uuid in self.G.nodes:
            node = self.G.nodes[uuid]
            node["occurrence_count"] = node.get("occurrence_count", 1) + 1
            # Keep the stored timestamp when the caller does not supply one
            # instead of resetting it to 0.
            node["last_seen"] = attrs.get("last_seen", node.get("last_seen", 0))
            self.save()
            return {"uuid": uuid, "action": "updated"}

        self.G.add_node(
            uuid,
            node_type="entity",
            name=name,
            entity_type=entity_type,
            weight=1.0,
            occurrence_count=1,
            **attrs,
        )
        self.save()
        return {"uuid": uuid, "action": "created"}

    def add_relation(
        self,
        from_uuid: str,
        to_uuid: str,
        relation_type: str,
        strength: float = 1.0,
        **attrs,
    ) -> Dict:
        """Insert a directed edge, or strengthen an existing one.

        An existing edge keeps its ``relation_type``; its ``strength``
        becomes the maximum of the old and new value.

        Args:
            from_uuid: Source node id; must already exist.
            to_uuid: Target node id; must already exist.
            relation_type: Label stored on a newly created edge.
            strength: Edge strength.
            **attrs: Extra edge attributes; ``updated_at`` is also applied
                on update.

        Returns:
            ``{"from", "to", "action"}`` on success, or the error dict
            ``{"error": ..., "success": False}`` if either node is missing.
        """
        if not self.G.has_node(from_uuid) or not self.G.has_node(to_uuid):
            return {"error": "Node nicht gefunden", "success": False}

        if self.G.has_edge(from_uuid, to_uuid):
            existing = self.G.edges[from_uuid, to_uuid]
            existing["strength"] = max(existing.get("strength", 0), strength)
            # Keep the stored timestamp when the caller does not supply one
            # instead of resetting it to 0.
            existing["updated_at"] = attrs.get(
                "updated_at", existing.get("updated_at", 0)
            )
            self.save()
            return {"from": from_uuid, "to": to_uuid, "action": "updated"}

        self.G.add_edge(
            from_uuid,
            to_uuid,
            relation_type=relation_type,
            strength=strength,
            **attrs,
        )
        self.save()
        return {"from": from_uuid, "to": to_uuid, "action": "created"}

    def traverse(
        self,
        start_uuid: str,
        depth: int = 2,
        relation_filter: Optional[str] = None,
    ) -> List[Dict]:
        """Breadth-first walk along outgoing edges from ``start_uuid``.

        Args:
            start_uuid: Node to start from.
            depth: Maximum number of hops to follow.
            relation_filter: If given, only edges whose ``relation_type``
                equals this value are followed.

        Returns:
            One dict per newly discovered node (``from``, ``to``,
            ``relation``, ``strength``, ``depth``) in discovery order;
            an empty list if the start node is unknown.
        """
        if start_uuid not in self.G.nodes:
            return []

        results: List[Dict] = []
        visited = {start_uuid}
        # deque gives O(1) pops from the left, unlike list.pop(0).
        queue = deque([(start_uuid, 0)])

        while queue:
            current, level = queue.popleft()
            if level >= depth:
                continue
            for neighbor in self.G.successors(current):
                edge_data = self.G.edges[current, neighbor]
                if relation_filter and edge_data.get("relation_type") != relation_filter:
                    continue
                if neighbor in visited:
                    continue
                visited.add(neighbor)
                results.append({
                    "from": current,
                    "to": neighbor,
                    "relation": edge_data.get("relation_type"),
                    "strength": edge_data.get("strength"),
                    "depth": level + 1,
                })
                queue.append((neighbor, level + 1))

        return results

    def shortest_path(self, from_uuid: str, to_uuid: str) -> List[str]:
        """Return the node ids on a shortest directed path.

        Returns:
            The path including both endpoints, or an empty list when no
            path exists or either node is not in the graph.
        """
        try:
            return nx.shortest_path(self.G, source=from_uuid, target=to_uuid)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            # An unknown endpoint is reported like "no path" instead of
            # raising to the caller.
            return []

    def centrality(self, algorithm: str = "betweenness", limit: int = 10) -> List[Dict]:
        """Rank nodes by a centrality score.

        Args:
            algorithm: ``"betweenness"``, ``"pagerank"`` or ``"degree"``
                (raw degree); any other value uses degree centrality.
            limit: Maximum number of entries to return.

        Returns:
            Dicts with ``uuid``, ``name``, ``score`` and ``type``, sorted by
            descending score.
        """
        if algorithm == "betweenness":
            scores = nx.betweenness_centrality(self.G)
        elif algorithm == "pagerank":
            scores = nx.pagerank(self.G)
        elif algorithm == "degree":
            scores = dict(self.G.degree())
        else:
            scores = nx.degree_centrality(self.G)

        sorted_nodes = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]
        return [
            {
                "uuid": n,
                "name": self.G.nodes[n].get("name", n),
                "score": s,
                "type": self.G.nodes[n].get("entity_type", "unknown"),
            }
            for n, s in sorted_nodes
        ]

    def communities(self, algorithm: str = "louvain") -> List[List[str]]:
        """Group nodes into communities on the undirected view of the graph.

        Args:
            algorithm: ``"louvain"`` uses the optional ``community`` module
                (python-louvain) when it can be imported; any other value,
                or a failed import, uses connected components.

        Returns:
            A list of node-id lists, one per community.
        """
        undirected = self.G.to_undirected()

        if algorithm == "louvain":
            try:
                import community as community_louvain
            except ImportError:
                logger.warning("python-louvain nicht installiert, fallback auf connected_components")
            else:
                partition = community_louvain.best_partition(undirected)
                groups: Dict[int, List[str]] = {}
                for node, comm_id in partition.items():
                    groups.setdefault(comm_id, []).append(node)
                return list(groups.values())

        return [list(c) for c in nx.connected_components(undirected)]

    def rebuild(self, tier2_conn) -> Dict:
        """Rebuild the graph from the Tier 2 database and save it.

        Args:
            tier2_conn: Connection object exposing ``execute(sql).fetchall()``
                whose rows support access by column name — presumably a
                ``sqlite3`` connection with ``sqlite3.Row`` rows; verify
                against the caller.

        Returns:
            ``{"nodes": <node count>, "edges": <edge count>}``.
        """
        # Run both queries before touching the graph so that a failing
        # query does not leave the in-memory graph empty.
        entity_rows = tier2_conn.execute("SELECT uuid, name, entity_type, occurrence_count FROM entities").fetchall()
        relation_rows = tier2_conn.execute("SELECT from_entity_id, to_entity_id, relation_type, strength FROM relations").fetchall()

        self.G.clear()

        # Load entities.
        for r in entity_rows:
            count = r["occurrence_count"]
            self.G.add_node(
                r["uuid"],
                node_type="entity",
                name=r["name"],
                entity_type=r["entity_type"],
                # Same default as add_entity() and _load().
                weight=1.0,
                # GraphML cannot store None, so NULL falls back to the default.
                occurrence_count=int(count) if count is not None else 1,
            )

        # Load relations, skipping any that reference an unknown entity.
        for rel in relation_rows:
            source, target = rel["from_entity_id"], rel["to_entity_id"]
            if source in self.G.nodes and target in self.G.nodes:
                strength = rel["strength"]
                self.G.add_edge(
                    source,
                    target,
                    relation_type=rel["relation_type"],
                    strength=float(strength) if strength is not None else 1.0,
                )

        self.save()
        return {"nodes": self.G.number_of_nodes(), "edges": self.G.number_of_edges()}