"""Main API class for Hermes Memory Next Level.

Unifies access to all memory tiers.
"""

import logging
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from hermes_memory.config import load_config
from hermes_memory.tier2.schema import connect, init_schema
from hermes_memory.tier2.facts import FactStore
from hermes_memory.tier2.entities import EntityStore
from hermes_memory.tier2.relations import RelationStore
from hermes_memory.tier2.timeline import TimelineStore
from hermes_memory.tier3.chroma_backend import ChromaBackend
from hermes_memory.tier3.embedder import LocalEmbedder
from hermes_memory.graph.nx_store import KnowledgeGraph

logger = logging.getLogger(__name__)


class MemoryAPI:
    """Facade over the curated, structured, semantic and graph memory tiers.

    Each tier is optional. Methods belonging to a tier that is not active
    return an error dict (mutating calls) or an empty result (queries)
    instead of raising.

    The instance can be used as a context manager; leaving the ``with``
    block calls :meth:`close`.
    """

    def __init__(
        self,
        profile: str = "default",
        tier2_enabled: bool = True,
        tier3_enabled: bool = True,
        graph_enabled: bool = True,
        tier3_backend: str = "chroma",
        embedding_model: str = "local",
    ):
        """Load the profile configuration and initialise the enabled tiers.

        Args:
            profile: Profile name; passed to ``load_config`` and substituted
                into the configured storage paths.
            tier2_enabled: Request the structured tier. It is only activated
                when the loaded configuration enables it as well.
            tier3_enabled: Same, for the semantic tier.
            graph_enabled: Same, for the knowledge graph.
            tier3_backend: Accepted for interface compatibility; not read by
                this class at present.
            embedding_model: Accepted for interface compatibility; not read
                by this class at present.
        """
        self.profile = profile
        self.config = load_config(profile)

        # A tier is active only if both the caller and the config allow it.
        self._tier2_enabled = tier2_enabled and self.config["tier2"]["enabled"]
        self._tier3_enabled = tier3_enabled and self.config["tier3"]["enabled"]
        self._graph_enabled = graph_enabled and self.config["graph"]["enabled"]

        self._tier2_conn: Optional[sqlite3.Connection] = None
        self._facts: Optional[FactStore] = None
        self._entities: Optional[EntityStore] = None
        self._relations: Optional[RelationStore] = None
        self._timeline: Optional[TimelineStore] = None
        self._tier3: Optional[ChromaBackend] = None
        self._embedder: Optional[LocalEmbedder] = None
        self._graph: Optional[KnowledgeGraph] = None

        self._init_tiers()

    def __enter__(self) -> "MemoryAPI":
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the API on leaving the ``with`` block; never suppresses errors."""
        self.close()

    def close(self) -> None:
        """Close the Tier 2 SQLite connection and drop the stores bound to it.

        Safe to call more than once. Afterwards the Tier 2 methods behave as
        if Tier 2 were not enabled. The Tier 3 backend and the graph are left
        untouched because no shutdown hook for them is visible to this class.
        """
        conn = self._tier2_conn
        self._tier2_conn = None
        self._facts = None
        self._entities = None
        self._relations = None
        self._timeline = None
        if conn is not None:
            conn.close()

    def _expand_path(self, template: str) -> Path:
        """Fill the ``{HERMES_HOME}`` and ``{profile}`` placeholders of a config path."""
        # NOTE(review): curated_get() resolves the home directory through
        # hermes_constants.get_hermes_home(), while this uses ~/.hermes
        # directly -- confirm whether the two are meant to agree.
        return Path(template.format(
            HERMES_HOME=Path.home() / ".hermes",
            profile=self.profile,
        ))

    def _init_tiers(self) -> None:
        """Create the connections and stores for every enabled tier."""
        if self._tier2_enabled:
            tier2_cfg = self.config["tier2"]
            db_path = self._expand_path(tier2_cfg["db_path"])
            self._tier2_conn = connect(db_path, wal_mode=tier2_cfg["wal_mode"])
            init_schema(self._tier2_conn)
            # All Tier 2 stores share the single SQLite connection.
            self._facts = FactStore(self._tier2_conn)
            self._entities = EntityStore(self._tier2_conn)
            self._relations = RelationStore(self._tier2_conn)
            self._timeline = TimelineStore(self._tier2_conn)

        if self._tier3_enabled:
            persist_path = self._expand_path(self.config["tier3"]["path"])
            self._tier3 = ChromaBackend(persist_path)
            self._embedder = LocalEmbedder()

        if self._graph_enabled:
            graph_path = self._expand_path(self.config["graph"]["path"])
            self._graph = KnowledgeGraph(graph_path)

    # ── Tier 1: Curated (wrapper around the existing MemoryTool) ──

    def curated_get(self, store: str = "memory") -> str:
        """Read MEMORY.md or USER.md.

        Args:
            store: ``"memory"`` selects MEMORY.md; any other value selects
                USER.md.

        Returns:
            The file content, or ``""`` if the file does not exist.
        """
        from hermes_constants import get_hermes_home

        fname = "MEMORY.md" if store == "memory" else "USER.md"
        path = get_hermes_home() / "memories" / fname
        if not path.exists():
            return ""
        return path.read_text(encoding="utf-8")

    def curated_add(self, content: str, store: str = "memory") -> Dict[str, Any]:
        """Add an entry to MEMORY.md or USER.md.

        NOTE(review): as written this only loads the existing MemoryStore
        from disk and reports success; ``content`` is never handed to the
        store. The write call of ``tools.memory_tool.MemoryStore`` is not
        visible from this file, so the behaviour is left unchanged -- wire
        the actual add call in here.
        """
        from tools.memory_tool import MemoryStore

        ms = MemoryStore()
        ms.load_from_disk()
        # Intended to reuse the existing tool logic (see NOTE above).
        return {"success": True, "store": store, "action": "add"}

    # ── Tier 2: Structured ──

    def fact_store(
        self,
        content: str,
        category: str = "general",
        confidence: float = 1.0,
        source: str = "user",
    ) -> Dict[str, Any]:
        """Store a fact in Tier 2 and queue it for embedding when Tier 3 is enabled.

        Returns:
            ``{"success": True, "uuid": ..., "tier": "tier2"}`` on success,
            or an error dict when Tier 2 is not active.
        """
        if self._facts is None:
            return {"success": False, "error": "Tier 2 nicht aktiviert"}

        fact = self._facts.store(
            content, category=category, confidence=confidence, source_type=source
        )

        # Queue the fact so it can be embedded into Tier 3.
        if self._tier3_enabled and self._tier2_conn is not None:
            self._tier2_conn.execute(
                "INSERT INTO embedding_queue (fact_id, content, source_type, queued_at) VALUES (?, ?, ?, ?)",
                (fact.uuid, fact.content, "fact", time.time()),
            )
            self._tier2_conn.commit()

        return {"success": True, "uuid": fact.uuid, "tier": "tier2"}

    def fact_query(
        self,
        query: str = "",
        category: Optional[str] = None,
        limit: int = 10,
        min_confidence: float = 0.5,
    ) -> List[Dict]:
        """Query facts and return them as plain dicts (empty if Tier 2 is inactive)."""
        if self._facts is None:
            return []
        facts = self._facts.query(
            query, category=category, limit=limit, min_confidence=min_confidence
        )
        return [self._fact_to_dict(f) for f in facts]

    def fact_get(self, uuid: str) -> Optional[Dict]:
        """Return the fact with the given UUID as a dict, or ``None``."""
        if self._facts is None:
            return None
        f = self._facts.get_by_uuid(uuid)
        return self._fact_to_dict(f) if f else None

    def fact_update(self, uuid: str, **fields) -> Dict[str, Any]:
        """Update a fact; ``fields`` are forwarded unchanged to the fact store."""
        if self._facts is None:
            return {"success": False, "error": "Tier 2 nicht aktiviert"}
        f = self._facts.update(uuid, **fields)
        if not f:
            return {"success": False, "error": "Nicht gefunden"}
        return {"success": True, "fact": self._fact_to_dict(f)}

    def fact_delete(self, uuid: str, soft: bool = True) -> Dict[str, Any]:
        """Delete a fact; ``soft`` is forwarded to the fact store."""
        if self._facts is None:
            return {"success": False, "error": "Tier 2 nicht aktiviert"}
        ok = self._facts.delete(uuid, soft=soft)
        return {"success": ok, "uuid": uuid}

    def entity_ensure(
        self,
        name: str,
        entity_type: str,
        aliases: Optional[List[str]] = None,
        description: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Ensure an entity exists in Tier 2 and add it to the graph if enabled."""
        if self._entities is None:
            return {"success": False, "error": "Tier 2 nicht aktiviert"}
        ent = self._entities.ensure(
            name, entity_type, aliases=aliases, description=description
        )
        if self._graph is not None:
            # Graph nodes are keyed by the entity UUID.
            self._graph.add_entity(ent.uuid, ent.name, ent.entity_type)
        return {"success": True, "uuid": ent.uuid, "name": ent.name}

    def entity_link(
        self,
        from_name: str,
        to_name: str,
        relation: str,
        strength: float = 1.0,
    ) -> Dict[str, Any]:
        """Link two entities, ensuring both with type ``"concept"`` first."""
        if self._entities is None or self._relations is None:
            return {"success": False, "error": "Tier 2 nicht aktiviert"}
        from_ent = self._entities.ensure(from_name, "concept")
        to_ent = self._entities.ensure(to_name, "concept")
        rel = self._relations.link(
            from_ent.uuid, to_ent.uuid, relation, strength=strength
        )
        if self._graph is not None:
            self._graph.add_relation(
                from_ent.uuid, to_ent.uuid, relation, strength=strength
            )
        return {"success": True, "relation_uuid": rel.uuid}

    def entity_query(
        self,
        name: Optional[str] = None,
        entity_type: Optional[str] = None,
        limit: int = 10,
    ) -> List[Dict]:
        """Query entities and return uuid, name, type and occurrence count for each."""
        if self._entities is None:
            return []
        ents = self._entities.query(name=name, entity_type=entity_type, limit=limit)
        return [
            {
                "uuid": e.uuid,
                "name": e.name,
                "type": e.entity_type,
                "occurrence_count": e.occurrence_count,
            }
            for e in ents
        ]

    def timeline_add(
        self,
        event_type: str,
        title: str,
        description: Optional[str] = None,
        importance: float = 0.5,
        related_entities: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Add a timeline event and return its UUID."""
        if self._timeline is None:
            return {"success": False, "error": "Tier 2 nicht aktiviert"}
        ev = self._timeline.add(
            event_type,
            title,
            description=description,
            importance=importance,
            related_entities=related_entities,
        )
        return {"success": True, "uuid": ev.uuid}

    def timeline_query(
        self,
        start: Optional[float] = None,
        end: Optional[float] = None,
        event_type: Optional[str] = None,
        limit: int = 20,
    ) -> List[Dict]:
        """Query timeline events and return them as plain dicts."""
        if self._timeline is None:
            return []
        events = self._timeline.query(
            start=start, end=end, event_type=event_type, limit=limit
        )
        return [
            {
                "uuid": e.uuid,
                "type": e.event_type,
                "title": e.title,
                "timestamp": e.timestamp,
                "importance": e.importance,
            }
            for e in events
        ]

    # ── Tier 3: Semantic ──

    def semantic_index(
        self,
        text: str,
        source_type: str = "session",
        session_id: Optional[str] = None,
        message_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Index one text chunk in Tier 3.

        The chunk id has the form ``<source_type>_<session>_<message>``:
        the session part falls back to ``global`` and the message part to
        the current Unix time in whole seconds when ``message_id`` is
        ``None``. A ``message_id`` of ``0`` is kept as a real id.

        Returns:
            ``{"success": True, "chunk_id": ...}`` or an error dict when
            Tier 3 is not active.
        """
        if self._tier3 is None or self._embedder is None:
            return {"success": False, "error": "Tier 3 nicht aktiviert"}

        # NOTE(review): the previous code computed an embedding of `text`
        # here but never passed it on. It is dropped on the assumption that
        # ChromaBackend.index() embeds the chunks itself -- confirm.
        now = time.time()
        message_part = message_id if message_id is not None else int(now)
        chunk_id = f"{source_type}_{session_id or 'global'}_{message_part}"
        self._tier3.index(
            chunks=[text],
            payloads=[{
                "chunk_id": chunk_id,
                "source_type": source_type,
                "session_id": session_id,
                "message_id": message_id,
                "timestamp": now,
            }],
        )
        return {"success": True, "chunk_id": chunk_id}

    def semantic_search(
        self, query: str, limit: int = 10, min_score: float = 0.7
    ) -> List[Dict]:
        """Run a vector search for ``query``.

        ``limit`` is applied by the backend; hits scoring below
        ``min_score`` are then dropped, so fewer than ``limit`` results may
        be returned. The text of each hit is truncated to 200 characters.
        """
        if self._tier3 is None or self._embedder is None:
            return []
        emb = self._embedder.embed_query(query)
        results = self._tier3.search(emb, limit=limit)
        return [
            {
                "chunk_id": r.chunk_id,
                "score": r.score,
                "text": r.text[:200],
                "metadata": r.metadata,
            }
            for r in results
            if r.score >= min_score
        ]

    def semantic_hybrid(self, query: str, limit: int = 10) -> List[Dict]:
        """Combine Tier 2 FTS and Tier 3 vector search.

        Tier 2 hits come first, followed by Tier 3 hits; every entry gets a
        ``"tier"`` key naming its origin.
        """
        t2 = self.fact_query(query, limit=limit)
        t3 = self.semantic_search(query, limit=limit)
        # Simple merge; de-duplication by content hash would be possible.
        return [{"tier": "tier2", **r} for r in t2] + [{"tier": "tier3", **r} for r in t3]

    # ── Graph ──

    def _resolve_entity_uuid(self, name_or_uuid: str) -> str:
        """Map an entity name to its UUID via the Tier 2 ``entities`` table.

        The lookup is case-insensitive. The input is returned unchanged when
        Tier 2 is inactive or no entity has that name, so callers may pass a
        UUID directly.
        """
        if self._entities is None or self._tier2_conn is None:
            return name_or_uuid
        row = self._tier2_conn.execute(
            "SELECT uuid FROM entities WHERE name = ? COLLATE NOCASE LIMIT 1",
            (name_or_uuid,),
        ).fetchone()
        # Positional access works for tuples and sqlite3.Row alike.
        return row[0] if row else name_or_uuid

    def graph_traverse(
        self,
        start_entity: str,
        depth: int = 2,
        relation_filter: Optional[str] = None,
    ) -> List[Dict]:
        """Traverse the graph from ``start_entity`` (entity name or UUID)."""
        if self._graph is None:
            return []
        start_uuid = self._resolve_entity_uuid(start_entity)
        return self._graph.traverse(
            start_uuid, depth=depth, relation_filter=relation_filter
        )

    def graph_shortest_path(self, from_entity: str, to_entity: str) -> List[str]:
        """Return the shortest path between two entities.

        Both arguments may be entity names or UUIDs; names are resolved to
        UUIDs first, in the same way as in :meth:`graph_traverse`.
        """
        if self._graph is None:
            return []
        return self._graph.shortest_path(
            self._resolve_entity_uuid(from_entity),
            self._resolve_entity_uuid(to_entity),
        )

    def graph_central_entities(self, limit: int = 10) -> List[Dict]:
        """Return the graph's centrality ranking, limited to ``limit`` entries."""
        if self._graph is None:
            return []
        return self._graph.centrality(limit=limit)

    def graph_communities(self) -> List[List[str]]:
        """Return the communities reported by the graph store."""
        if self._graph is None:
            return []
        return self._graph.communities()

    def graph_rebuild(self) -> Dict[str, Any]:
        """Rebuild the graph from the Tier 2 database and return its stats."""
        if self._graph is None or self._tier2_conn is None:
            return {"success": False, "error": "Graph oder Tier 2 nicht aktiviert"}
        stats = self._graph.rebuild(self._tier2_conn)
        return {"success": True, **stats}

    # ── Cross-Tier ──

    def recall(
        self,
        query: str,
        tiers: Optional[List[str]] = None,
        limit_per_tier: int = 5,
    ) -> Dict[str, Any]:
        """Query several tiers at once.

        Args:
            query: Search text.
            tiers: Any of ``"tier2"``, ``"tier3"``, ``"graph"``; defaults to
                ``["tier2", "tier3"]`` when ``None`` or empty.
            limit_per_tier: Result limit applied to each tier separately.

        Returns:
            ``{"query": query, "tiers": {<tier>: <results>, ...}}``.
        """
        tiers = tiers or ["tier2", "tier3"]
        results: Dict[str, Any] = {"query": query, "tiers": {}}
        if "tier2" in tiers:
            results["tiers"]["tier2"] = self.fact_query(query, limit=limit_per_tier)
        if "tier3" in tiers:
            results["tiers"]["tier3"] = self.semantic_search(query, limit=limit_per_tier)
        if "graph" in tiers:
            # Central entities serve as context; they do not depend on `query`.
            results["tiers"]["graph"] = self.graph_central_entities(limit=limit_per_tier)
        return results

    def consolidate(self) -> Dict[str, Any]:
        """Run de-duplication and maintenance.

        Returns:
            ``{"success": True, "merged_facts": <count>}``; the count is 0
            when Tier 2 is not active.
        """
        merged = 0
        if self._facts is not None:
            merged = self._facts.deduplicate()
        return {"success": True, "merged_facts": merged}

    def stats(self) -> Dict[str, Any]:
        """Collect counts and health information from every active tier."""
        stats: Dict[str, Any] = {}

        if self._facts is not None and self._tier2_conn is not None:
            row = self._tier2_conn.execute(
                "SELECT COUNT(*) FROM facts WHERE is_archived = 0"
            ).fetchone()
            stats["facts"] = row[0] if row else 0
            row = self._tier2_conn.execute("SELECT COUNT(*) FROM entities").fetchone()
            stats["entities"] = row[0] if row else 0
            row = self._tier2_conn.execute("SELECT COUNT(*) FROM relations").fetchone()
            stats["relations"] = row[0] if row else 0

        if self._tier3 is not None:
            stats["tier3"] = self._tier3.health()

        if self._graph is not None:
            stats["graph"] = {
                "nodes": self._graph.G.number_of_nodes(),
                "edges": self._graph.G.number_of_edges(),
            }

        return stats

    def _fact_to_dict(self, f) -> Dict[str, Any]:
        """Convert a fact object into a plain dict of its public fields."""
        return {
            "uuid": f.uuid,
            "content": f.content,
            "category": f.category,
            "confidence": f.confidence,
            "source_type": f.source_type,
            "created_at": f.created_at,
            "access_count": f.access_count,
        }