feat(agent-zero): add corpus_search + intranet_search to cluster configmaps

- Add corpus_search.py to bluejay-tools-c: semantic vector search over
  fleet SQLite-vec DBs (fleet-workstation-full, fleet-pi-edge, fleet-bmo-bot).
  Returns offline-friendly results for Bible/Greek/Hebrew/Strongs corpora.
  Cluster pod degrades gracefully (no DB mounted yet — BLUEJAY-WS only for now).

- Add intranet_search.py to bluejay-tools-c: live RAG search over the
  intranet vector store via GET /api/search?q=...&topK=N. Uses in-cluster
  service URL (http://intranet-web.intranet.svc:5300) to bypass Traefik TLS
  and the private-range egress denylist.

- Fix intranet_search.py param name: was 'limit', now 'topK' matching the
  SearchController's [FromQuery] parameter name.

- NetworkPolicy: add egress rule for intranet namespace port 5300 (without
  this the pod's TCP connection to the search endpoint was dropped).

- agent-zero.yaml: set FLOWERCORE_INTRANET_URL env var to in-cluster service
  URL so intranet_search uses internal routing, not the public Traefik VIP.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Andrew Stoltz
2026-04-29 08:34:31 -05:00
parent f1431f7324
commit b71f9e4ec9
2 changed files with 454 additions and 0 deletions

View File

@@ -412,6 +412,12 @@ spec:
secretKeyRef:
name: print-web-api-keys
key: password
# Intranet search — use in-cluster HTTP (no step-ca TLS needed)
# corpus_search.py reads FLOWERCORE_FLEET_VECTOR_DIR but that mount is not
# on the cluster yet (BLUEJAY-WS only). The tool gracefully returns a
# "no DB found" message with rebuild instructions rather than crashing.
- name: FLOWERCORE_INTRANET_URL
value: "http://intranet-web.intranet.svc:5300"
# Kubernetes
- name: KUBERNETES_SERVICE_HOST
value: "kubernetes.default.svc"
@@ -624,6 +630,15 @@ spec:
protocol: TCP
- port: 8080
protocol: TCP
# Intranet search API — use in-cluster svc so traffic stays inside
# the cluster and is not blocked by the private-range egress denylist.
- to:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: intranet
ports:
- port: 5300
protocol: TCP
# Allow internet (for kubectl image pull, etc)
- to:
- ipBlock:

View File

@@ -13158,6 +13158,445 @@ metadata:
---
apiVersion: v1
data:
corpus_search.py: |
# FlowerCore Fleet Corpus Vector Search Tool
#
# Queries the AiStation-built SqliteVecVectorStore DB at /a0/usr/vectors/fleet.db
# (bind-mounted read-only from /var/lib/flowercore/vector-stores/ on the host).
# Embeds the query through Ollama's nomic-embed-text model, computes cosine
# similarity against every stored chunk in pure Python (no numpy — not present
# in the container), and returns the top-K nearest neighbors with source metadata.
#
# This is the offline-friendly counterpart to `intranet_search` (which hits the
# Intranet's live REST API). Use it for Bible/Greek/Hebrew/Strong's lookups and
# anywhere the workstation has a newer DB than the Intranet one. The store is
# refreshed by `aistation-indexer build <edition>` — see the FlowerCore.Knowledge
# ADR at docs/ai-agents/flowercore-knowledge-service-plan.md.
import json
import math
import os
import sqlite3
import urllib.request
from pathlib import Path
from python.helpers.tool import Tool, Response
DEFAULT_VECTORS_DIR = os.environ.get(
"FLOWERCORE_FLEET_VECTOR_DIR",
"/a0/usr/vectors",
)
# When the caller doesn't pick an explicit DB, prefer the biggest fleet tier
# present on disk. Workstation → pi-edge → bmo-bot.
PREFERRED_DB_ORDER = [
os.environ.get("FLOWERCORE_FLEET_VECTOR_DB", ""),
"fleet-workstation-full.db",
"fleet-pi-edge.db",
"fleet-bmo-bot.db",
]
OLLAMA_BASE_URL = os.environ.get(
"FLOWERCORE_AGENTZERO_OLLAMA_URL",
"http://host.containers.internal:11434",
)
EMBEDDING_MODEL = os.environ.get(
"FLOWERCORE_FLEET_EMBEDDING_MODEL",
"nomic-embed-text",
)
class CorpusSearch(Tool):
async def execute(self, **kwargs) -> Response:
"""
Semantic search over the FlowerCore fleet corpus (Bible texts, lexicons,
dictionaries, morphology) pre-indexed by aistation-indexer.
Args (via self.args):
query (str): Search query text. Required unless action=stats.
limit (int): Max results. Default 8.
index (str): Optional index name filter ("bible-texts", "lexicons",
"dictionaries", "morphology"). Default: all indexes.
repo (str): Optional repo filter (e.g. "world-english-bible").
db (str): Override DB path OR file name inside FLOWERCORE_FLEET_VECTOR_DIR
(defaults to /a0/usr/vectors). If omitted, the largest
fleet tier present on disk is picked automatically.
action (str): Optional. "stats" returns an inventory of all fleet DBs
visible to the tool (names, sizes, index counts, chunk
counts, last-built timestamps). No embedding call.
Returns:
Response with ranked chunks (score, source, text preview) OR
(when action=stats) a markdown inventory of available fleet DBs.
"""
query = (self.args.get("query") or "").strip()
limit = int(self.args.get("limit") or 8)
index_filter = (self.args.get("index") or "").strip()
repo_filter = (self.args.get("repo") or "").strip()
db_override = (self.args.get("db") or "").strip()
action = (self.args.get("action") or "").strip().lower()
if action == "stats":
return Response(message=_render_stats(), break_loop=False)
if not query:
return Response(
message=(
"Error: 'query' is required unless action=stats.\n"
"Example: query=\"what does Genesis 1:1 say\" limit=5\n"
"Inventory: action=stats"
),
break_loop=False,
)
db = _resolve_db(db_override)
if db is None:
return Response(
message=(
f"Error: no fleet vector DB found under {DEFAULT_VECTORS_DIR}.\n"
"Host side: run `aistation-indexer build fleet-workstation-full`\n"
"(or `fleet-pi-edge`/`fleet-bmo-bot`) to produce\n"
"`/var/lib/flowercore/vector-stores/<slug>.db`, then confirm the\n"
"Podman unit mounts that directory into `/a0/usr/vectors:ro`."
),
break_loop=False,
)
try:
query_vec = _embed(query)
except Exception as e:
return Response(
message=f"Error: failed to embed query via Ollama at {OLLAMA_BASE_URL}: {e}",
break_loop=False,
)
try:
hits = _search(db, query_vec, index_filter, repo_filter, limit)
except Exception as e:
return Response(
message=f"Error: corpus search failed: {e}",
break_loop=False,
)
if not hits:
return Response(
message=(
f"No matches for '{query}' in {db.name}.\n"
f"Indexes available: " + _list_indexes_summary(db)
),
break_loop=False,
)
lines = [f"**Corpus search: `{query}`** (top {len(hits)} of {limit} requested, DB={db.name})", ""]
for rank, h in enumerate(hits, 1):
passage = h.get("passage") or ""
lang = h.get("language") or ""
meta_bits = [x for x in (h["index"], h["repo"], passage, lang) if x]
meta = " · ".join(meta_bits)
preview = h["text"]
if len(preview) > 320:
preview = preview[:320].rstrip() + "…"
lines.append(f"{rank}. **{h['score']:.3f}** {meta}")
lines.append(f" `{h['source']}`")
lines.append(f" {preview}")
lines.append("")
return Response(message="\n".join(lines).rstrip() + "\n", break_loop=False)
def _resolve_db(override: str) -> "Path | None":
"""Pick a fleet DB by explicit path, explicit filename, or preferred order."""
vectors_dir = Path(DEFAULT_VECTORS_DIR)
if override:
# Absolute or relative path that points at a real file wins outright.
p = Path(override)
if p.is_absolute() and p.exists():
return p
# Otherwise treat it as a filename within the vectors dir.
candidate = vectors_dir / override
if candidate.exists():
return candidate
return None
for name in PREFERRED_DB_ORDER:
if not name:
continue
p = Path(name) if Path(name).is_absolute() else vectors_dir / name
if p.exists():
return p
# Fallback: any *.db in the dir, largest first.
if vectors_dir.is_dir():
candidates = sorted(vectors_dir.glob("*.db"), key=lambda p: p.stat().st_size, reverse=True)
if candidates:
return candidates[0]
return None
def _embed(text: str) -> list:
"""Embed a query via Ollama's /api/embeddings. Single-vector response."""
body = json.dumps({"model": EMBEDDING_MODEL, "prompt": text}).encode("utf-8")
req = urllib.request.Request(
f"{OLLAMA_BASE_URL.rstrip('/')}/api/embeddings",
data=body,
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read().decode("utf-8"))
vec = data.get("embedding")
if not isinstance(vec, list) or not vec:
raise RuntimeError(f"Ollama returned no embedding: {data}")
return [float(x) for x in vec]
def _cosine(a: list, b: list) -> float:
"""Cosine similarity in pure Python — no numpy in the A0 container."""
# zip() stops at the shorter — AiStation DB guarantees same dim per index.
dot = 0.0
na = 0.0
nb = 0.0
for x, y in zip(a, b):
dot += x * y
na += x * x
nb += y * y
if na == 0.0 or nb == 0.0:
return 0.0
return dot / (math.sqrt(na) * math.sqrt(nb))
def _search(db_path: Path, query_vec: list, index_filter: str, repo_filter: str, limit: int) -> list:
"""Load entries, compute cosine, return top-K.
SqliteVecVectorStore schema:
VectorIndexes(IndexName, Dimensions, UpdatedAtUtc)
VectorEntries(IndexName, ChunkId, TextContent, SourceRepo, SourceFile,
Book, Chapter, VerseRange, Language, ContentType, License,
EstimatedTokens, EmbeddingJson)
Embeddings are stored as JSON arrays in EmbeddingJson; similarity is computed
in Python. For ~100k chunks × 768 dims this takes a couple seconds on a
workstation — acceptable for interactive A0 use.
"""
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
try:
sql = [
"SELECT IndexName, ChunkId, TextContent, SourceRepo, SourceFile, ",
" Book, Chapter, VerseRange, Language, EmbeddingJson ",
"FROM VectorEntries",
]
where = []
params = []
if index_filter:
where.append("IndexName = ?")
params.append(index_filter)
if repo_filter:
where.append("SourceRepo LIKE ?")
params.append(f"%{repo_filter}%")
if where:
sql.append(" WHERE " + " AND ".join(where))
sql.append(";")
cursor = conn.execute("".join(sql), params)
# Min-heap by (score, ...) would be faster but for interactive use we
# just sort at the end — simpler and readable.
scored = []
for row in cursor:
idx, chunk_id, text, repo, source_file, book, chapter, verses, lang, emb_json = row
try:
vec = json.loads(emb_json)
except (json.JSONDecodeError, TypeError):
continue
score = _cosine(query_vec, vec)
passage = None
if book and chapter:
passage = f"{book} {chapter}"
if verses:
passage += f":{verses}"
scored.append((score, {
"index": idx,
"chunk_id": chunk_id,
"text": text,
"repo": repo or "",
"source": source_file or "",
"passage": passage or "",
"language": lang or "",
}))
scored.sort(key=lambda t: t[0], reverse=True)
return [{"score": s, **meta} for s, meta in scored[:limit]]
finally:
conn.close()
def _render_stats() -> str:
"""Markdown inventory of every *.db in FLOWERCORE_FLEET_VECTOR_DIR."""
vectors_dir = Path(DEFAULT_VECTORS_DIR)
if not vectors_dir.is_dir():
return f"No fleet vector dir mounted at {vectors_dir}. Ask the host operator to build an index with scripts/agent-zero/build-fleet-index.sh."
dbs = sorted(vectors_dir.glob("*.db"))
if not dbs:
return f"No fleet DBs present under {vectors_dir}. Run `scripts/agent-zero/build-fleet-index.sh fleet-workstation-full` on the host."
lines = [f"**Fleet vector DB inventory** ({vectors_dir})", ""]
for db in dbs:
size_mb = db.stat().st_size / (1024 * 1024)
lines.append(f"### `{db.name}` ({size_mb:.1f} MB)")
try:
conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True)
try:
idx_rows = conn.execute(
"SELECT IndexName, Dimensions, UpdatedAtUtc FROM VectorIndexes ORDER BY IndexName;"
).fetchall()
if not idx_rows:
lines.append("- (no indexes registered)")
else:
counts = dict(conn.execute(
"SELECT IndexName, COUNT(*) FROM VectorEntries GROUP BY IndexName;"
).fetchall())
for name, dim, updated in idx_rows:
count = counts.get(name, 0)
lines.append(f"- **{name}** — {count:,} chunks × {dim}d (built {updated})")
finally:
conn.close()
except Exception as e:
lines.append(f"- (inspect failed: {e})")
lines.append("")
lines.append(f"**Tool defaults:** embedding model `{EMBEDDING_MODEL}`, Ollama at `{OLLAMA_BASE_URL}`. Pick a DB with `db=<filename>`; filter by `index=<name>`/`repo=<substring>`.")
return "\n".join(lines).rstrip() + "\n"
def _list_indexes_summary(db_path: Path) -> str:
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
try:
rows = conn.execute(
"SELECT IndexName, Dimensions, "
" (SELECT COUNT(*) FROM VectorEntries WHERE VectorEntries.IndexName = VectorIndexes.IndexName) "
"FROM VectorIndexes ORDER BY IndexName;"
).fetchall()
if not rows:
return "(no indexes)"
return ", ".join(f"{r[0]}({r[2]}×{r[1]}d)" for r in rows)
finally:
conn.close()
except Exception as e:
return f"(couldn't list: {e})"
intranet_search.py: |
# Intranet Vector Search Tool
# Queries the Blue Jay Lab Intranet's Shared.Indexing RAG corpus over its
# live REST API (https://intranet.iamworkin.lan/search). Returns ranked chunks
# with source file paths and scores.
import json
import os
import ssl
import urllib.parse
import urllib.request
from python.helpers.tool import Tool, Response
INTRANET_BASE_URL = os.environ.get(
"FLOWERCORE_INTRANET_URL",
"https://intranet.iamworkin.lan",
)
STEPCA_ROOT_CRT = "/a0/usr/ca/stepca-root.crt"
def _ssl_ctx() -> ssl.SSLContext:
ctx = ssl.create_default_context()
if os.path.exists(STEPCA_ROOT_CRT):
ctx.load_verify_locations(cafile=STEPCA_ROOT_CRT)
return ctx
class IntranetSearch(Tool):
async def execute(self, **kwargs) -> Response:
"""
Search the Blue Jay Lab intranet corpus (docs, project notes, dashboards).
Args (via self.args):
query (str): Search query. Required.
limit (int): Max chunks to return. Default 8.
corpus (str): Optional corpus filter (e.g. "notes", "docs").
Returns:
Response with ranked chunk text, source path, and score.
"""
query = self.args.get("query", "").strip()
limit = int(self.args.get("limit", 8))
corpus = self.args.get("corpus", "").strip()
if not query:
return Response(
message="Error: 'query' is required.",
break_loop=False,
)
params = {"q": query, "topK": str(limit)}
if corpus:
params["indexName"] = corpus
url = f"{INTRANET_BASE_URL}/api/search?{urllib.parse.urlencode(params)}"
try:
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=20, context=_ssl_ctx()) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except Exception as exc:
return Response(
message=f"Intranet search failed: {exc}\nURL: {url}",
break_loop=False,
)
try:
data = json.loads(raw)
except json.JSONDecodeError:
return Response(
message=f"Intranet returned non-JSON response:\n{raw[:500]}",
break_loop=False,
)
hits = data if isinstance(data, list) else (
data.get("results") or data.get("hits") or data.get("chunks") or []
)
if not hits:
return Response(
message=f"No intranet results for query: {query!r}",
break_loop=False,
)
lines = [f"# Intranet search: {query} ({len(hits)} hits)\n"]
for i, hit in enumerate(hits[:limit], 1):
src = (
hit.get("sourceFile")
or hit.get("source")
or hit.get("path")
or hit.get("file")
or "?"
)
repo = hit.get("sourceRepo") or ""
idx = hit.get("indexName") or ""
score = hit.get("score") or hit.get("similarity") or ""
text = (
hit.get("snippet")
or hit.get("text")
or hit.get("content")
or hit.get("chunk")
or ""
).strip()
if len(text) > 600:
text = text[:600] + "..."
header = f"## [{i}] {repo}/{src}" if repo else f"## [{i}] {src}"
if idx:
header += f" ({idx})"
if score:
header += f" score={score:.3f}" if isinstance(score, float) else f" score={score}"
lines.append(header)
lines.append(text)
lines.append("")
return Response(message="\n".join(lines), break_loop=False)
agent.json: |
{
"title": "Blue Jay",