From b71f9e4ec945ea4bdee80b039ee0e029a5c6d0cd Mon Sep 17 00:00:00 2001 From: Andrew Stoltz Date: Wed, 29 Apr 2026 08:34:31 -0500 Subject: [PATCH] feat(agent-zero): add corpus_search + intranet_search to cluster configmaps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add corpus_search.py to bluejay-tools-c: semantic vector search over fleet SQLite-vec DBs (fleet-workstation-full, fleet-pi-edge, fleet-bmo-bot). Returns offline-friendly results for Bible/Greek/Hebrew/Strongs corpora. Cluster pod degrades gracefully (no DB mounted yet — BLUEJAY-WS only for now). - Add intranet_search.py to bluejay-tools-c: live RAG search over the intranet vector store via GET /api/search?q=...&topK=N. Uses in-cluster service URL (http://intranet-web.intranet.svc:5300) to bypass Traefik TLS and the private-range egress denylist. - Fix intranet_search.py param name: was 'limit', now 'topK' matching the SearchController's [FromQuery] parameter name. - NetworkPolicy: add egress rule for intranet namespace port 5300 (without this the pod's TCP connection to the search endpoint was dropped). - agent-zero.yaml: set FLOWERCORE_INTRANET_URL env var to in-cluster service URL so intranet_search uses internal routing, not the public Traefik VIP. Co-Authored-By: Claude Sonnet 4.6 --- apps/agent-zero/agent-zero.yaml | 15 + apps/agent-zero/configmaps-bluejay.yaml | 439 ++++++++++++++++++++++++ 2 files changed, 454 insertions(+) diff --git a/apps/agent-zero/agent-zero.yaml b/apps/agent-zero/agent-zero.yaml index 4302290..04ca1f9 100644 --- a/apps/agent-zero/agent-zero.yaml +++ b/apps/agent-zero/agent-zero.yaml @@ -412,6 +412,12 @@ spec: secretKeyRef: name: print-web-api-keys key: password + # Intranet search — use in-cluster HTTP (no step-ca TLS needed) + # corpus_search.py reads FLOWERCORE_FLEET_VECTOR_DIR but that mount is not + # on the cluster yet (BLUEJAY-WS only). The tool gracefully returns a + # "no DB found" message with rebuild instructions rather than crashing. + - name: FLOWERCORE_INTRANET_URL + value: "http://intranet-web.intranet.svc:5300" # Kubernetes - name: KUBERNETES_SERVICE_HOST value: "kubernetes.default.svc" @@ -624,6 +630,15 @@ spec: protocol: TCP - port: 8080 protocol: TCP + # Intranet search API — use in-cluster svc so traffic stays inside + # the cluster and is not blocked by the private-range egress denylist. + - to: + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: intranet + ports: + - port: 5300 + protocol: TCP # Allow internet (for kubectl image pull, etc) - to: - ipBlock: diff --git a/apps/agent-zero/configmaps-bluejay.yaml b/apps/agent-zero/configmaps-bluejay.yaml index 92ae7b5..513cd1c 100644 --- a/apps/agent-zero/configmaps-bluejay.yaml +++ b/apps/agent-zero/configmaps-bluejay.yaml @@ -13158,6 +13158,445 @@ metadata: --- apiVersion: v1 data: + corpus_search.py: | + # FlowerCore Fleet Corpus Vector Search Tool + # + # Queries the AiStation-built SqliteVecVectorStore DB at /a0/usr/vectors/fleet.db + # (bind-mounted read-only from /var/lib/flowercore/vector-stores/ on the host). + # Embeds the query through Ollama's nomic-embed-text model, computes cosine + # similarity against every stored chunk in pure Python (no numpy — not present + # in the container), and returns the top-K nearest neighbors with source metadata. + # + # This is the offline-friendly counterpart to `intranet_search` (which hits the + # Intranet's live REST API). Use it for Bible/Greek/Hebrew/Strong's lookups and + # anywhere the workstation has a newer DB than the Intranet one. The store is + # refreshed by `aistation-indexer build ` — see the FlowerCore.Knowledge + # ADR at docs/ai-agents/flowercore-knowledge-service-plan.md. + + import json + import math + import os + import sqlite3 + import urllib.request + from pathlib import Path + + from python.helpers.tool import Tool, Response + + + DEFAULT_VECTORS_DIR = os.environ.get( + "FLOWERCORE_FLEET_VECTOR_DIR", + "/a0/usr/vectors", + ) + # When the caller doesn't pick an explicit DB, prefer the biggest fleet tier + # present on disk. Workstation → pi-edge → bmo-bot. + PREFERRED_DB_ORDER = [ + os.environ.get("FLOWERCORE_FLEET_VECTOR_DB", ""), + "fleet-workstation-full.db", + "fleet-pi-edge.db", + "fleet-bmo-bot.db", + ] + OLLAMA_BASE_URL = os.environ.get( + "FLOWERCORE_AGENTZERO_OLLAMA_URL", + "http://host.containers.internal:11434", + ) + EMBEDDING_MODEL = os.environ.get( + "FLOWERCORE_FLEET_EMBEDDING_MODEL", + "nomic-embed-text", + ) + + + class CorpusSearch(Tool): + async def execute(self, **kwargs) -> Response: + """ + Semantic search over the FlowerCore fleet corpus (Bible texts, lexicons, + dictionaries, morphology) pre-indexed by aistation-indexer. + + Args (via self.args): + query (str): Search query text. Required unless action=stats. + limit (int): Max results. Default 8. + index (str): Optional index name filter ("bible-texts", "lexicons", + "dictionaries", "morphology"). Default: all indexes. + repo (str): Optional repo filter (e.g. "world-english-bible"). + db (str): Override DB path OR file name inside FLOWERCORE_FLEET_VECTOR_DIR + (defaults to /a0/usr/vectors). If omitted, the largest + fleet tier present on disk is picked automatically. + action (str): Optional. "stats" returns an inventory of all fleet DBs + visible to the tool (names, sizes, index counts, chunk + counts, last-built timestamps). No embedding call. + + Returns: + Response with ranked chunks (score, source, text preview) OR + (when action=stats) a markdown inventory of available fleet DBs. + """ + query = (self.args.get("query") or "").strip() + limit = int(self.args.get("limit") or 8) + index_filter = (self.args.get("index") or "").strip() + repo_filter = (self.args.get("repo") or "").strip() + db_override = (self.args.get("db") or "").strip() + action = (self.args.get("action") or "").strip().lower() + + if action == "stats": + return Response(message=_render_stats(), break_loop=False) + + if not query: + return Response( + message=( + "Error: 'query' is required unless action=stats.\n" + "Example: query=\"what does Genesis 1:1 say\" limit=5\n" + "Inventory: action=stats" + ), + break_loop=False, + ) + + db = _resolve_db(db_override) + if db is None: + return Response( + message=( + f"Error: no fleet vector DB found under {DEFAULT_VECTORS_DIR}.\n" + "Host side: run `aistation-indexer build fleet-workstation-full`\n" + "(or `fleet-pi-edge`/`fleet-bmo-bot`) to produce\n" + "`/var/lib/flowercore/vector-stores/.db`, then confirm the\n" + "Podman unit mounts that directory into `/a0/usr/vectors:ro`." + ), + break_loop=False, + ) + + try: + query_vec = _embed(query) + except Exception as e: + return Response( + message=f"Error: failed to embed query via Ollama at {OLLAMA_BASE_URL}: {e}", + break_loop=False, + ) + + try: + hits = _search(db, query_vec, index_filter, repo_filter, limit) + except Exception as e: + return Response( + message=f"Error: corpus search failed: {e}", + break_loop=False, + ) + + if not hits: + return Response( + message=( + f"No matches for '{query}' in {db.name}.\n" + f"Indexes available: " + _list_indexes_summary(db) + ), + break_loop=False, + ) + + lines = [f"**Corpus search: `{query}`** (top {len(hits)} of {limit} requested, DB={db.name})", ""] + for rank, h in enumerate(hits, 1): + passage = h.get("passage") or "" + lang = h.get("language") or "" + meta_bits = [x for x in (h["index"], h["repo"], passage, lang) if x] + meta = " · ".join(meta_bits) + preview = h["text"] + if len(preview) > 320: + preview = preview[:320].rstrip() + "…" + lines.append(f"{rank}. **{h['score']:.3f}** {meta}") + lines.append(f" `{h['source']}`") + lines.append(f" {preview}") + lines.append("") + + return Response(message="\n".join(lines).rstrip() + "\n", break_loop=False) + + + def _resolve_db(override: str) -> "Path | None": + """Pick a fleet DB by explicit path, explicit filename, or preferred order.""" + vectors_dir = Path(DEFAULT_VECTORS_DIR) + if override: + # Absolute or relative path that points at a real file wins outright. + p = Path(override) + if p.is_absolute() and p.exists(): + return p + # Otherwise treat it as a filename within the vectors dir. + candidate = vectors_dir / override + if candidate.exists(): + return candidate + return None + + for name in PREFERRED_DB_ORDER: + if not name: + continue + p = Path(name) if Path(name).is_absolute() else vectors_dir / name + if p.exists(): + return p + + # Fallback: any *.db in the dir, largest first. + if vectors_dir.is_dir(): + candidates = sorted(vectors_dir.glob("*.db"), key=lambda p: p.stat().st_size, reverse=True) + if candidates: + return candidates[0] + return None + + + def _embed(text: str) -> list: + """Embed a query via Ollama's /api/embeddings. Single-vector response.""" + body = json.dumps({"model": EMBEDDING_MODEL, "prompt": text}).encode("utf-8") + req = urllib.request.Request( + f"{OLLAMA_BASE_URL.rstrip('/')}/api/embeddings", + data=body, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=60) as resp: + data = json.loads(resp.read().decode("utf-8")) + vec = data.get("embedding") + if not isinstance(vec, list) or not vec: + raise RuntimeError(f"Ollama returned no embedding: {data}") + return [float(x) for x in vec] + + + def _cosine(a: list, b: list) -> float: + """Cosine similarity in pure Python — no numpy in the A0 container.""" + # zip() stops at the shorter — AiStation DB guarantees same dim per index. + dot = 0.0 + na = 0.0 + nb = 0.0 + for x, y in zip(a, b): + dot += x * y + na += x * x + nb += y * y + if na == 0.0 or nb == 0.0: + return 0.0 + return dot / (math.sqrt(na) * math.sqrt(nb)) + + + def _search(db_path: Path, query_vec: list, index_filter: str, repo_filter: str, limit: int) -> list: + """Load entries, compute cosine, return top-K. + + SqliteVecVectorStore schema: + VectorIndexes(IndexName, Dimensions, UpdatedAtUtc) + VectorEntries(IndexName, ChunkId, TextContent, SourceRepo, SourceFile, + Book, Chapter, VerseRange, Language, ContentType, License, + EstimatedTokens, EmbeddingJson) + + Embeddings are stored as JSON arrays in EmbeddingJson; similarity is computed + in Python. For ~100k chunks × 768 dims this takes a couple seconds on a + workstation — acceptable for interactive A0 use. + """ + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + try: + sql = [ + "SELECT IndexName, ChunkId, TextContent, SourceRepo, SourceFile, ", + " Book, Chapter, VerseRange, Language, EmbeddingJson ", + "FROM VectorEntries", + ] + where = [] + params = [] + if index_filter: + where.append("IndexName = ?") + params.append(index_filter) + if repo_filter: + where.append("SourceRepo LIKE ?") + params.append(f"%{repo_filter}%") + if where: + sql.append(" WHERE " + " AND ".join(where)) + sql.append(";") + + cursor = conn.execute("".join(sql), params) + + # Min-heap by (score, ...) would be faster but for interactive use we + # just sort at the end — simpler and readable. + scored = [] + for row in cursor: + idx, chunk_id, text, repo, source_file, book, chapter, verses, lang, emb_json = row + try: + vec = json.loads(emb_json) + except (json.JSONDecodeError, TypeError): + continue + score = _cosine(query_vec, vec) + passage = None + if book and chapter: + passage = f"{book} {chapter}" + if verses: + passage += f":{verses}" + scored.append((score, { + "index": idx, + "chunk_id": chunk_id, + "text": text, + "repo": repo or "", + "source": source_file or "", + "passage": passage or "", + "language": lang or "", + })) + scored.sort(key=lambda t: t[0], reverse=True) + return [{"score": s, **meta} for s, meta in scored[:limit]] + finally: + conn.close() + + + def _render_stats() -> str: + """Markdown inventory of every *.db in FLOWERCORE_FLEET_VECTOR_DIR.""" + vectors_dir = Path(DEFAULT_VECTORS_DIR) + if not vectors_dir.is_dir(): + return f"No fleet vector dir mounted at {vectors_dir}. Ask the host operator to build an index with scripts/agent-zero/build-fleet-index.sh." + + dbs = sorted(vectors_dir.glob("*.db")) + if not dbs: + return f"No fleet DBs present under {vectors_dir}. Run `scripts/agent-zero/build-fleet-index.sh fleet-workstation-full` on the host." + + lines = [f"**Fleet vector DB inventory** ({vectors_dir})", ""] + for db in dbs: + size_mb = db.stat().st_size / (1024 * 1024) + lines.append(f"### `{db.name}` ({size_mb:.1f} MB)") + try: + conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True) + try: + idx_rows = conn.execute( + "SELECT IndexName, Dimensions, UpdatedAtUtc FROM VectorIndexes ORDER BY IndexName;" + ).fetchall() + if not idx_rows: + lines.append("- (no indexes registered)") + else: + counts = dict(conn.execute( + "SELECT IndexName, COUNT(*) FROM VectorEntries GROUP BY IndexName;" + ).fetchall()) + for name, dim, updated in idx_rows: + count = counts.get(name, 0) + lines.append(f"- **{name}** — {count:,} chunks × {dim}d (built {updated})") + finally: + conn.close() + except Exception as e: + lines.append(f"- (inspect failed: {e})") + lines.append("") + + lines.append(f"**Tool defaults:** embedding model `{EMBEDDING_MODEL}`, Ollama at `{OLLAMA_BASE_URL}`. Pick a DB with `db=`; filter by `index=`/`repo=`.") + return "\n".join(lines).rstrip() + "\n" + + + def _list_indexes_summary(db_path: Path) -> str: + try: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + try: + rows = conn.execute( + "SELECT IndexName, Dimensions, " + " (SELECT COUNT(*) FROM VectorEntries WHERE VectorEntries.IndexName = VectorIndexes.IndexName) " + "FROM VectorIndexes ORDER BY IndexName;" + ).fetchall() + if not rows: + return "(no indexes)" + return ", ".join(f"{r[0]}({r[2]}×{r[1]}d)" for r in rows) + finally: + conn.close() + except Exception as e: + return f"(couldn't list: {e})" + intranet_search.py: | + # Intranet Vector Search Tool + # Queries the Blue Jay Lab Intranet's Shared.Indexing RAG corpus over its + # live REST API (https://intranet.iamworkin.lan/search). Returns ranked chunks + # with source file paths and scores. + + import json + import os + import ssl + import urllib.parse + import urllib.request + + from python.helpers.tool import Tool, Response + + + INTRANET_BASE_URL = os.environ.get( + "FLOWERCORE_INTRANET_URL", + "https://intranet.iamworkin.lan", + ) + STEPCA_ROOT_CRT = "/a0/usr/ca/stepca-root.crt" + + + def _ssl_ctx() -> ssl.SSLContext: + ctx = ssl.create_default_context() + if os.path.exists(STEPCA_ROOT_CRT): + ctx.load_verify_locations(cafile=STEPCA_ROOT_CRT) + return ctx + + + class IntranetSearch(Tool): + async def execute(self, **kwargs) -> Response: + """ + Search the Blue Jay Lab intranet corpus (docs, project notes, dashboards). + + Args (via self.args): + query (str): Search query. Required. + limit (int): Max chunks to return. Default 8. + corpus (str): Optional corpus filter (e.g. "notes", "docs"). + + Returns: + Response with ranked chunk text, source path, and score. + """ + query = self.args.get("query", "").strip() + limit = int(self.args.get("limit", 8)) + corpus = self.args.get("corpus", "").strip() + + if not query: + return Response( + message="Error: 'query' is required.", + break_loop=False, + ) + + params = {"q": query, "topK": str(limit)} + if corpus: + params["indexName"] = corpus + url = f"{INTRANET_BASE_URL}/api/search?{urllib.parse.urlencode(params)}" + + try: + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=20, context=_ssl_ctx()) as resp: + raw = resp.read().decode("utf-8", errors="replace") + except Exception as exc: + return Response( + message=f"Intranet search failed: {exc}\nURL: {url}", + break_loop=False, + ) + + try: + data = json.loads(raw) + except json.JSONDecodeError: + return Response( + message=f"Intranet returned non-JSON response:\n{raw[:500]}", + break_loop=False, + ) + + hits = data if isinstance(data, list) else ( + data.get("results") or data.get("hits") or data.get("chunks") or [] + ) + if not hits: + return Response( + message=f"No intranet results for query: {query!r}", + break_loop=False, + ) + + lines = [f"# Intranet search: {query} ({len(hits)} hits)\n"] + for i, hit in enumerate(hits[:limit], 1): + src = ( + hit.get("sourceFile") + or hit.get("source") + or hit.get("path") + or hit.get("file") + or "?" + ) + repo = hit.get("sourceRepo") or "" + idx = hit.get("indexName") or "" + score = hit.get("score") or hit.get("similarity") or "" + text = ( + hit.get("snippet") + or hit.get("text") + or hit.get("content") + or hit.get("chunk") + or "" + ).strip() + if len(text) > 600: + text = text[:600] + "..." + header = f"## [{i}] {repo}/{src}" if repo else f"## [{i}] {src}" + if idx: + header += f" ({idx})" + if score: + header += f" score={score:.3f}" if isinstance(score, float) else f" score={score}" + lines.append(header) + lines.append(text) + lines.append("") + + return Response(message="\n".join(lines), break_loop=False) agent.json: | { "title": "Blue Jay",