diff --git a/.gitignore b/.gitignore
index c2f4095..d362a70 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ venv/
 *.egg-info/
 dist/
 build/
+examples/
diff --git a/README.md b/README.md
index 247a240..fdc2878 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,18 @@
 # Dervish MCP
-
-
+
+
+
 **Dervish** infers **regular expression grammars** from example sequences using the BEX family of algorithms. Given a set of example sequences (strings over some alphabet), it learns a compact regular expression that captures the general pattern.
@@ -41,18 +49,19 @@ The primary interface is a **Model Context Protocol (MCP)** server. Connect any
 
 | Tool | Parameters | What it does |
 |------|-----------|-------------|
-| `infer_best_grammar` | `sequences`, `prefer`, `kmax`, `N` | **The only tool you need.** Runs CRX + iDRegEx, picks best by MDL. Set `prefer='crx'` for full coverage or `prefer='idregex'` for minimal core — skips the ensemble and runs one algorithm. |
+| `infer_best_grammar` | `sequences`, `prefer`, `kmax`, `N`, `min_coverage` | **The only tool you need.** Runs CRX + iDRegEx + kOREInference, picks best by MDL. Set `prefer` to run only one algorithm. Set `min_coverage < 1.0` for optional core+outlier analysis. |
 
 **Parameters explained:**
-- **`prefer`**: `'crx'` for full vocabulary (accepts all sequences), `'idregex'` for minimal common core (only what every example shares). Omit to let MDL pick the winner.
-- **`kmax`** (1–5): Context window for iDRegEx's k-testable automaton. Higher values capture longer-range dependencies but need more data and are slower. Default 2 works for most cases.
-- **`N`** (1–10): Baum-Welch EM iterations for iDRegEx training. More iterations = better convergence but slower. Default 3 is a good balance.
+- **`prefer`**: `'crx'` for full vocabulary (accepts all sequences), `'idregex'` or `'koreinference'` for deterministic minimal core. Omit to let MDL pick the winner across all three.
+- **`kmax`** (1–5): Context window for k-ORE inference (iDRegEx, kOREInference). Higher values capture longer-range dependencies but need more data and are slower. Default 2 works for most cases.
+- **`N`** (1–10): Random trials for k-ORE inference. More = better convergence but slower. Default 3.
+- **`min_coverage`** (0.5–1.0): **Optional core+outlier analysis.** When < 1.0, iteratively removes outlier sequences (those with the rarest symbols) until at least this fraction remain. Returns the core CRX grammar for the majority plus a list of removed outliers. Default 1.0 = disabled. Example: `min_coverage=0.8` finds the tight pattern for ~80% of examples while flagging the other ~20% as variants.
 
 ### Agent workflow
 
 An LLM agent uses the MCP to discover an unwritten convention from existing examples — compressing hundreds of files into a single ~60-token rule:
 
-```
+```text
 User: Generate a new Ansible role for installing PostgreSQL.
 
 Agent: Let me check what pattern the existing community roles follow.
@@ -78,6 +87,11 @@ Agent: Let me check what pattern the existing community roles follow.
 
 **With Dervish:** one MCP call returns a ~60-token grammar known to match 15/15 existing roles. The agent follows it reliably.
 
+**Core+outlier mode:** When generating a new role, the agent can call with
+`min_coverage=0.8` to learn the mainstream pattern while seeing which roles
+deviate and why — useful when the user's case resembles an outlier
+(e.g., a PHP app like phpmyadmin that needs raw `lineinfile`).
+
 ## Quick Start
 
 ```bash
@@ -108,8 +122,8 @@ Dervish discovers these conventions automatically from existing examples. The do
 | Domain | What gets extracted | Example extracted symbols | What Dervish discovers | Why it helps an LLM |
 |--------|-------------------|--------------------------|----------------------|---------------------|
 | Ansible roles | Module names from `tasks/main.yml` in order | `fail`, `include_vars`, `set_fact`, `package`, `file`, `template`, `service`, `npm`, `pip`, `lineinfile` | `fail?.(include_vars+set_fact+package+file+template+service+...)+.include+?.(npm+pip)+?.lineinfile?` | "Validate preconditions first, then set vars, install packages, configure with templates, start services. Include sub-roles last." |
-| Helm charts | K8s resource kinds from `helm template` output in rendered order | `ServiceAccount`, `ClusterRole`, `ClusterRoleBinding`, `Service`, `Deployment`, `ConfigMap`, `Alertmanager` | `ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment` (iDRegEx minimal core) | "Every Prometheus stack needs this bootstrap pipeline. Everything else is optional." |
-| GitHub Actions (Go lint) | Step `uses:` or `run:` values from workflow YAML in job order | `actions/checkout`, `actions/setup-go`, `golangci/golangci-lint-action`, `megalinter/megalinter` | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | "Starting a new Go project on GitHub Actions? Four independent projects converged on: checkout → setup Go → golangci-lint → (optionally megalinter)." |
+| Helm charts (cross-project, 15 charts) | K8s resource kinds from `helm template` output in rendered order | `NetworkPolicy`, `PodDisruptionBudget`, `ServiceAccount`, `Secret`, `ConfigMap`, `Service`, `Deployment`, `StatefulSet`, `ClusterRole`, `ClusterRoleBinding` | `NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?.ConfigMap?.PersistentVolumeClaim?.ClusterRole?.ClusterRoleBinding?.Service.Deployment?.StatefulSet?.(IngressClass+MutatingWebhookConfiguration)?.ValidatingWebhookConfiguration?.Job?` | "Writing a Helm chart? Start with resilience (PDB, NetworkPolicy), then identity (ServiceAccount, Secrets), then the Service, then your workload. Only cluster-wide tools need RBAC." |
+| GitHub Actions (Go lint) | Step `uses:` or `run:` values from workflow YAML in job order | `actions/checkout`, `actions/setup-go`, `golangci/golangci-lint-action`, `megalinter/megalinter` | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | "Starting a new Go project on GitHub Actions? Four independent projects converged on: checkout → setup Go → (optional golangci-lint) → (optional megalinter)." |
 
 ## Real-world Results
 
@@ -119,19 +133,22 @@ Dervish has been tested against public datasets from Ansible Galaxy, Helm, and G
 | Dataset | Best grammar | Compression |
 |---------|-------------|-------------|
 | Ansible Galaxy (15 roles) | `fail?.(include_vars+set_fact+package+file+template+service+...)+.include+?.(npm+pip)+?.lineinfile?` | 5,000 tokens → 60 tokens (83×) |
-| Helm (6 configs) | `ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment` | ~3,000 tokens → 40 tokens (75×) |
+| Helm cross-project (15 charts) | `NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?.ConfigMap?...Service.Deployment?.StatefulSet?...` | 121 tokens → 35 tokens (3.5×) |
 | Go lint (6 jobs) | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | ~900 tokens → 30 tokens (30×) |
 
 The sweet spot: **multiple implementations of the same abstract task** with a shared but undocumented pattern. Not everything works — Dockerfiles, pre-commit configs, and schema-enforced formats are too rigid or too diverse to yield a convention.
 
+> **kOREInference note:** Algorithm 4 (iDRegEx with MDL, arXiv 1004.2372) is included for paper-faithful correctness. On real tool-sequence data, its rwr₀ repair step returns ∅ because the k-OA is rarely SORE (interconnected symbols). The ensemble falls back to CRX or iDRegEx automatically.
+
 ## Algorithm Selection Guide
 
 | When | Use | Why |
 |------|-----|-----|
 | Clean, structured data with full vocabulary | **CRX** | Single-pass, deterministic. Accepts all sequences. |
-| Few examples, or want minimal common core | **iDRegEx** | Probabilistic EM, finds only what's shared. |
-| Don't know which is better | **Ensemble (default)** | Runs both, picks the best by MDL score. |
-| Data is clearly one type | `prefer='crx'` or `prefer='idregex'` | Skips ensemble comparison, runs one algorithm. |
+| Few examples, or want minimal common core | **iDRegEx** or **kOREInference** | Probabilistic EM, finds only what's shared. |
+| Don't know which is better | **Ensemble (default)** | Runs all three, picks best by MDL score. |
+| Want core pattern + outlier detection | **Ensemble + `min_coverage<1`** | Finds tight grammar for majority, flags outliers. |
+| Data is clearly one type | `prefer='crx'` | Skips ensemble comparison, runs CRX alone. |
 
 ## When each algorithm wins
 
@@ -139,9 +156,11 @@ The sweet spot: **multiple implementations of the same abstract task** with a sh
 |---------------|--------|-----|
 | Diverse patterns, full vocabulary needed | CRX | Captures all symbols. iDRegEx returns ∅. |
 | Clean sequences with clear core | iDRegEx | Extracts minimal common subsequence. CRX buries it in optional noise. |
+| Interconnected (non-SORE) data | CRX | kOREInference (rwr₀) returns ∅ when k-OA is not SORE. CRX handles it. |
 | Single sequence | iDRegEx (+ RWR₀) | RWR₀ repair produces a grammatical regex from one example. |
 | 2–3 sequences | iDRegEx | CRX overfits. iDRegEx handles noise better. |
 | Many sequences, tight pattern | CRX | Learns precise concatenation with optional suffixes. |
+| Want majority pattern + outlier list | CRX + `min_coverage` | Core analysis finds tight grammar for ~80%, flags the rest. |
 
 ## Token savings
 
@@ -153,7 +172,7 @@ Across all public benchmarks, Dervish delivers **40–83× compression**. The gr
 
 ## How MDL scoring works
 
-```
+```text
 MDL = model_cost + data_cost
 ```
 
diff --git a/SHOWCASE.md b/SHOWCASE.md
index fc2ff39..abf84b1 100644
--- a/SHOWCASE.md
+++ b/SHOWCASE.md
@@ -6,7 +6,7 @@ Infer the **unwritten convention** from existing examples.
 
 Given N example sequences, produce a ~100-char grammar that captures the structural pattern — in far fewer tokens than the originals.
 
-```
+```text
 a.b → a then b (concatenation)
 (a+b) → a or b (disjunction)
 r? → optional (zero or one)
@@ -14,13 +14,13 @@ r+ → one or more (iteration)
 r+? → zero or more
 ```
 
-## 1. Ansible Galaxy roles (15 geerlingguy roles) — flagship
+## 1. Ansible Galaxy roles (15 geerlingguy roles)
 
 15 popular Ansible roles by Jeff Geerling. There is NO written convention
 for the module ordering in `tasks/main.yml`. Our grammar is its first
 explicit description:
 
-```
+```text
 Grammar: fail?.(include_vars+set_fact+package+file+template+service+...)+.
          include+?.(npm+pip)+?.lineinfile?
 ```
@@ -34,25 +34,60 @@ All 15/15 match. **~29× compression** (7200+ modules → ~250 chars).
 exact structure: fail-check first, then vars, then packages, then config/svc.
 No guessing.
 
-## 2. Helm chart (kube-prometheus-stack, 6 configs)
+### Bonus: core+outlier analysis
 
-6 different `values.yaml` files rendered through the same chart:
+Set `min_coverage=0.8` to find the tight grammar for the majority while
+flagging outlier roles with unusual module usage:
 
-```
-Best: iDRegEx | MDL 1433
-Grammar: ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment
+```text
+Core CRX (80% coverage, 3 outliers):
+  fail?.(include_vars+set_fact+package+file+template+service+...)+
+
+Outlier sequences:
+  1. phpmyadmin: include_vars → set_fact → include → include → lineinfile
+  2. composer: fail → set_fact → stat → uri → get_url → command
+  3. pip: package → file → pip
 ```
 
-The **minimal core** every config must deploy. CRX captures the full
-vocabulary (19 kinds). Which one an agent uses depends on the task:
-- Bootstrapping a new cluster: iDRegEx — what you can't skip
-- Writing a complete chart: CRX — everything you might need
+phpmyadmin uses raw `lineinfile` instead of templates; composer needs
+a `stat` check + `uri` download; pip is purely `pip` — all three deviate
+from the mainstream install → configure → enable pattern.
+
+## 2. Helm charts — cross-project convention (15 charts, 6 publishers)
+
+15 popular Helm charts from **Bitnami** (10), **Grafana**, **Jetstack** (cert-manager),
+**Argo**, **Ingress-Nginx**, and **Elastic**. Different publishers, different
+purposes (databases, web servers, infrastructure tools) — but they converged
+on a common resource ordering:
+
+```text
+Best: CRX | MDL 230
+Grammar: NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?
+         .ConfigMap?.PersistentVolumeClaim?.ClusterRole?.ClusterRoleBinding?
+         .Role?.RoleBinding?.Service.Deployment?.StatefulSet?.
+         (IngressClass+MutatingWebhookConfiguration)?.ValidatingWebhookConfiguration?.Job?
+
+Match rates: CRX=15/15
+```
+
+Every chart follows: **resilience → identity → data → service → workload → extensions**.
+
+`Service` is the **only resource type that appears in all 15 charts**.
+Bitnami charts (10/15) consistently start with `NetworkPolicy + PodDisruptionBudget`
+before identity and service. Infrastructure tools (cert-manager, grafana,
+argo-cd, ingress-nginx) add RBAC and admission webhooks for cluster-wide access.
+
+**Why it helps an LLM:** Generating a Helm chart template? You know the
+structure: start with availability guarantees (PDB, NetworkPolicy), then
+identity (ServiceAccount, Secrets), then the Service endpoint, then your
+workload type. Only cluster-wide tools need RBAC and webhooks — skip them
+for simple application charts.
 
 ## 3. GitHub Actions (cross-project Go lint, 6 jobs)
 
 Lint jobs from prometheus, goreleaser, cosign, sigstore:
 
-```
+```text
 Best: CRX | MDL 13.6
 Grammar: actions/checkout.(actions/setup-go+run:echo+run:sudo)+.
          golangci/golangci-lint-action?.megalinter?
@@ -77,10 +112,17 @@ with a shared but undocumented pattern.
 ## Usage
 
 ```python
-from bex.mcp_server import infer_best_grammar
+from bex import infer_ensemble
 
-output = infer_best_grammar(
-    sequences=role_sequences,
-    prefer="crx",
-)
+# Pick best across all 3 algorithms (CRX + iDRegEx + kOREInference)
+result = infer_ensemble(role_sequences)
+print(f"Best: {result['best']['algorithm']}")
+print(f"Grammar: {result['best']['grammar']}")
+
+# Or: find the tight core + flag outliers
+result = infer_ensemble(role_sequences, min_coverage=0.8)
+print(f"Core: {result['core']['grammar']}")
+print(f"Outliers ({result['core']['outlier_count']}):")
+for i, o in enumerate(result['core']['outliers'], 1):
+    print(f" {i}. {' → '.join(str(x) for x in o[:8])}{'...' if len(o) > 8 else ''}")
 ```
diff --git a/bex/__init__.py b/bex/__init__.py
index c3dc269..d849884 100644
--- a/bex/__init__.py
+++ b/bex/__init__.py
@@ -17,6 +17,7 @@ from .crx import CRX
 from .ikoa import ikoa
 from .rwrsq import rwr_sq
 from .idregex import idregex
+from .kore import kOREInference, validate_k_ore
 from .koa import KOA, build_complete_koa
 from .expr import concat, disj, star, optional, alphabet, strip_k
 from .marking import mark_koa
diff --git a/bex/ensemble.py b/bex/ensemble.py
index 49c32a1..93e8cd3 100644
--- a/bex/ensemble.py
+++ b/bex/ensemble.py
@@ -3,6 +3,7 @@
 import re
 from .crx import CRX
 from .idregex import idregex
+from .kore import kOREInference
 from .expr import alphabet
 from .mdl import model_cost, mdl_score
 
@@ -233,6 +234,129 @@ def _matches(grammar, sequence):
     return False
 
 
+def _fit_score(grammar, seq):
+    """Score how tightly a sequence fits: 1.0 = perfect match to core,
+    0.0 = mostly uses optional/repeated parts.
+
+    Instead of trying to parse the grammar structure (which is fragile),
+    this measures how well seq matches against the grammatical core by
+    comparing its symbol positions to the grammar's 'spine' — the symbols
+    that appear in all sequences.
+    """
+    if not seq:
+        return 0.0
+    try:
+        # Strategy: parse grammar tokens, match seq, count what fraction
+        # of seq length is consumed by obligatory (non-?, non-+?) tokens.
+        tokens = _parse_parts(grammar.strip())
+        if not tokens or tokens[0][0] == 'empty':
+            return 0.0
+
+        def _classify_tokens(node):
+            """Return (obligatory_count, optional_count) for this node."""
+            tt, tv, tq = node
+            if tt == 'symbol':
+                if tq in ('', '+'):
+                    return (1, 0)
+                return (0, 1)
+            if tt == 'concat':
+                ob, op = 0, 0
+                for c in tv:
+                    if c[0] == 'empty':
+                        continue
+                    o1, o2 = _classify_tokens(c)
+                    ob += o1
+                    op += o2
+                return (ob, op)
+            if tt == 'disj':
+                # Any alternative counts as optional
+                return (0, len(tv))
+            return (0, 0)
+
+        ob, op = _classify_tokens(tokens[0])
+        total = ob + op
+        if total == 0:
+            return 0.5
+
+        # Match seq and see how many symbols are actually consumed
+        end = _match_tokens(tokens, seq)
+        if end is None or end != len(seq):
+            return 0.0
+
+        # Fit = fraction of mandatory symbols / total mandatory+optional
+        # Penalizes sequences that lean heavily on optional parts
+        return max(0.0, 1.0 - (op / total))
+    except Exception:
+        return 0.0
+
+
+def _symbol_rarity_score(seq, all_sequences):
+    """Score a sequence by how rare its symbols are across the dataset.
+    1.0 = all symbols are common, 0.0 = mostly rare symbols.
+    """
+    from collections import Counter
+    all_syms = Counter()
+    for s in all_sequences:
+        all_syms.update(s)
+    n = len(all_sequences)
+    scores = []
+    for sym in seq:
+        freq = all_syms.get(sym, 0) / n
+        scores.append(min(freq, 1.0))
+    return sum(scores) / len(scores) if scores else 0.0
+
+
+def _find_core(sequences, min_coverage=0.8):
+    """Find the core subset of sequences by iterative CRX + outlier removal.
+
+    Outlier detection uses symbol rarity: sequences with rare symbols
+    (appearing in few other sequences) are removed first.
+
+    Returns:
+        (core_grammar, core_sequences, outliers, fit_scores)
+    """
+    if not sequences or min_coverage >= 1.0:
+        crx_g = CRX().infer(sequences)
+        return crx_g, sequences, [], []
+
+    from collections import Counter
+    all_syms = Counter()
+    for s in sequences:
+        all_syms.update(s)
+    n = len(sequences)
+
+    def _rarity(seq):
+        rare_count = sum(1 for sym in seq if all_syms.get(sym, 0) / n < 0.3)
+        return rare_count / max(len(seq), 1)
+
+    working = list(sequences)
+    removed_indices = []
+    crx = CRX()
+
+    for _ in range(50):
+        if len(working) < 3:
+            break
+
+        target = max(int(len(sequences) * min_coverage), 1)
+        if len(working) <= target:
+            break
+
+        # Score by rarity: most rare symbol → worst fit
+        scores = [(i, _rarity(seq)) for i, seq in enumerate(working)]
+        scores.sort(key=lambda x: -x[1])  # most rare first
+
+        # If all sequences have the same score, stop (no outliers to remove)
+        if len(scores) < 2 or scores[0][1] == scores[-1][1]:
+            break
+
+        worst_idx = scores[0][0]
+        removed_indices.append(working[worst_idx])
+        working = [s for i, s in enumerate(working) if i != worst_idx]
+
+    core_g = crx.infer(working) if working else None
+    return core_g, working, removed_indices, []
+
+
 def mdl_score_simple(grammar, sequences):
     """MDL score from the paper: model_cost + Σ log₂(|L(r)| at length len(s)).
@@ -243,102 +367,137 @@ def mdl_score_simple(grammar, sequences):
     return mdl_score(grammar, sequences)
 
 
-def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
+def _run_idregex(sequences, kmax, N):
+    """Run standalone iDRegEx, return (grammar, score) or (None, inf)."""
+    g = idregex(sequences, kmax=kmax, N=N)
+    if g and g != '∅':
+        return g, mdl_score_simple(g, sequences)
+    return None, float('inf')
+
+
+def _run_kore(sequences, kmax, N):
+    """Run kOREInference (Algorithm 4 with MDL), return (grammar, score) or (None, inf)."""
+    kore = kOREInference(k_max=kmax, N=N)
+    result = kore.infer(sequences)
+    if result:
+        _, expr, _ = result
+        return expr, mdl_score_simple(expr, sequences)
+    return None, float('inf')
+
+
+_ALGO_NAMES = {
+    'crx': 'CRX',
+    'idregex': 'iDRegEx',
+    'koreinference': 'kOREInference',
+}
+
+
+_ALGORITHMS = {
+    'crx': lambda s, k, n: (CRX().infer(s), mdl_score_simple(CRX().infer(s), s)),
+    'idregex': _run_idregex,
+    'koreinference': _run_kore,
+}
+
+
+def infer_ensemble(sequences, kmax=2, N=3, prefer=None, min_coverage=1.0):
     """Run all applicable algorithms and return the best by MDL score.
 
     Args:
         sequences: List of sequences, each a list of strings.
-        kmax: Maximum k for iDRegEx k-ORE inference.
-        N: Number of EM iterations for iDRegEx.
-        prefer: Optional — 'crx' or 'idregex' to skip ensemble and
-            return only that algorithm's result.
+        kmax: Maximum k for k-ORE inference (iDRegEx, kOREInference).
+        N: Number of random trials for k-ORE inference.
+        prefer: Optional — 'crx', 'idregex', or 'koreinference' to skip
+            ensemble and return only that algorithm's result.
+        min_coverage: When < 1.0, also runs CRX on the tightest core subset
+            of sequences. Outliers (worst-fitting) are iteratively
+            removed until at least this fraction remains. The core
+            grammar and outlier list are included in the response.
 
     Returns:
         dict with keys:
             best: {algorithm, grammar, mdl_score}
             all: [{algorithm, grammar, mdl_score}, ...]
             why: str explaining the choice
+            core: (optional) {grammar, coverage, outliers} — only when
+                min_coverage < 1.0
     """
+    if prefer and prefer.lower() in _ALGORITHMS:
+        key = prefer.lower()
+        fn = _ALGORITHMS[key]
+        algo_name = _ALGO_NAMES.get(key, key)
+        g, score = fn(sequences, kmax, N)
+        if g and g != '∅':
+            return {
+                'best': {'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)},
+                'all': [{'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)}],
+                'why': f"Requested {algo_name} only.",
+            }
+        return {
+            'best': None,
+            'all': [],
+            'why': f"{algo_name} returned ∅ (no grammar found).",
+        }
+
     results = []
 
-    if prefer and prefer.lower() == 'idregex':
-        idr_g = idregex(sequences, kmax=kmax, N=N)
-        idr_score = mdl_score_simple(idr_g, sequences) if idr_g and idr_g != '∅' else float('inf')
-        if idr_g and idr_g != '∅':
-            results.append(('iDRegEx', idr_g, idr_score))
-        if not results:
-            return {
-                'best': None,
-                'all': [],
-                'why': "iDRegEx returned ∅ (no common core found).",
-            }
-        why = "Requested iDRegEx only."
-        return {
-            'best': {
-                'algorithm': 'iDRegEx',
-                'grammar': results[0][1],
-                'mdl_score': round(results[0][2], 2),
-            },
-            'all': [{'algorithm': 'iDRegEx', 'grammar': results[0][1], 'mdl_score': round(results[0][2], 2)}],
-            'why': why,
-        }
-
+    # 1. CRX (always fast, always produces a result)
     crx_g = CRX().infer(sequences)
-    crx_score = mdl_score_simple(crx_g, sequences)
-    results.append(('CRX', crx_g, crx_score))
+    crx_score = mdl_score_simple(crx_g, sequences) if crx_g and crx_g != '∅' else float('inf')
+    results.append(('CRX', crx_g if crx_g and crx_g != '∅' else '∅', crx_score))
 
-    if prefer and prefer.lower() == 'crx':
-        return {
-            'best': {
-                'algorithm': 'CRX',
-                'grammar': crx_g,
-                'mdl_score': round(crx_score, 2),
-            },
-            'all': [{'algorithm': 'CRX', 'grammar': crx_g, 'mdl_score': round(crx_score, 2)}],
-            'why': "Requested CRX only.",
-        }
-
-    idr_g = idregex(sequences, kmax=kmax, N=N)
-    if idr_g and idr_g != '∅':
-        idr_score = mdl_score_simple(idr_g, sequences)
+    # 2. iDRegEx (standalone, langsize-based)
+    idr_g, idr_score = _run_idregex(sequences, kmax, N)
+    if idr_g:
         results.append(('iDRegEx', idr_g, idr_score))
 
-    results.sort(key=lambda x: x[2])
+    # 3. kOREInference (Algorithm 4 with MDL scoring)
+    kore_g, kore_score = _run_kore(sequences, kmax, N)
+    if kore_g:
+        results.append(('kOREInference', kore_g, kore_score))
 
+    results = [r for r in results if r[1] and r[1] != '∅']
+    if not results:
+        base = {
+            'best': None,
+            'all': [],
+            'why': "No algorithm produced a non-empty grammar.",
+        }
+        if min_coverage < 1.0:
+            core_g, core_seqs, outliers, _ = _find_core(sequences, min_coverage)
+            base['core'] = {
+                'grammar': core_g,
+                'coverage': round(len(core_seqs) / max(len(sequences), 1), 2) if sequences else 0,
+                'outliers': outliers,
+            }
+        return base
+
+    results.sort(key=lambda x: x[2])
     best = results[0]
     all_results = [
         {'algorithm': a, 'grammar': g, 'mdl_score': round(s, 2)}
         for a, g, s in results
     ]
 
-    crx_match = sum(1 for s in sequences if _matches(crx_g, s))
-    idr_match = sum(1 for s in sequences if _matches(idr_g, s)) if len(results) > 1 else 0
+    active = {r[0] for r in results}
 
     why_parts = []
     if len(results) == 1:
-        why_parts.append(f"Only CRX produced a result (iDRegEx returned ∅).")
+        why_parts.append(f"Only {results[0][0]} produced a result.")
     else:
-        why_parts.append(
-            f"{results[0][0]} (score {results[0][2]:.1f}) vs {results[1][0]} (score {results[1][2]:.1f})."
-        )
+        scores_str = ', '.join(f"{r[0]}={r[2]:.1f}" for r in results)
+        why_parts.append(f"Scores: {scores_str}.")
 
-    if crx_match == idr_match == len(sequences):
-        why_parts.append("Both grammars match all sequences.")
-        why_parts.append(
-            f"{results[0][0]} wins because it is more compact "
-            f"(lower model cost) while matching all data."
-        )
-    elif crx_match != idr_match:
-        why_parts.append(
-            f"CRX matches {crx_match}/{len(sequences)} sequences, "
-            f"iDRegEx matches {idr_match}/{len(sequences)}."
-        )
+    match_strs = []
+    for r_algo, r_grammar, _ in results:
+        if r_grammar and r_grammar != '∅':
+            m = sum(1 for s in sequences if _matches(r_grammar, s))
+            match_strs.append(f"{r_algo}={m}/{len(sequences)}")
+    if match_strs:
+        why_parts.append(f"Match rates: {', '.join(match_strs)}.")
 
-    why_parts.append(
-        f"{best[0]} selected (MDL score {best[2]:.1f})."
-    )
+    why_parts.append(f"{best[0]} selected (MDL score {best[2]:.1f}).")
 
-    return {
+    result = {
         'best': {
             'algorithm': best[0],
             'grammar': best[1],
@@ -347,3 +506,16 @@ def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
         'all': all_results,
         'why': ' '.join(why_parts),
     }
+
+    # Core analysis when min_coverage < 1.0
+    if min_coverage < 1.0:
+        core_g, core_seqs, outliers, _ = _find_core(sequences, min_coverage)
+        result['core'] = {
+            'grammar': core_g,
+            'coverage': round(len(core_seqs) / max(len(sequences), 1), 2) if sequences else 0,
+            'outlier_count': len(outliers),
+            'outliers': outliers,
+        }
+        result['why'] += f' Core CRX ({min_coverage:.0%} coverage, {len(outliers)} outliers): {core_g}'
+
+    return result
diff --git a/bex/kore.py b/bex/kore.py
index 45bbca3..c960d22 100644
--- a/bex/kore.py
+++ b/bex/kore.py
@@ -1,432 +1,104 @@
 """
-kore — k-ORE Inference (iDRegEx) nach Bex et al. 2008/2010.
+kOREInference — Algorithm 4: iDRegEx (arXiv 1004.2372).
 
-iDRegEx (Bex 2008):
-  1. Prefix-Tree Automaton (PTA) aus Beispielsequenzen
-  2. Shrink: Rewrite-Regeln generalisieren den Automaten
-     (simplify → star_rewrite → concat_rewrite → alternation_rewrite)
-  3. Repair: Stelle Determinismus nach jedem Rewrite-Durchlauf wieder her
-  4. Convert: Überführe den Automaten in einen regulären Ausdruck
-     (State-Elimination nach Brzozowski & McCluskey)
-  5. k-ORE Prüfung: Der Ausdruck muss die k-Occurrence-Bedingung erfüllen
-     (jedes Symbol maximal k-mal nennenswert)
-  6. MDL: Wähle k mit minimalem MDL-Score
+Implements the full iDRegEx pipeline:
+  1. For k = 1..kmax, for n = 1..N:
+     a. iKoa (Algorithm 1) — build a deterministic k-OA from S
+     b. rwr² (Algorithm 3) — translate k-OA to k-ORE expression
+     c. Validate determinism and k-occurrence
+  2. Score all valid candidates by MDL (model cost + data cost)
+  3. Return the best k-ORE
+
+Unlike the PTA→Shrink→Repair approach from Bex 2008, this follows
+the journal paper (arXiv 1004.2372) exactly.
 """
 
-from .automaton import Automaton
-from .pta import build_pta
-from .shrink import shrink
-from .repair import repair
+from .ikoa import ikoa
+from .rwrsq import rwr_sq
+from .idregex import is_deterministic
 from .mdl import mdl_score
 
 
-def _state_elimination(G):
+def validate_k_ore(expr, k, alphabet_set=None):
     """
-    State Elimination nach Brzozowski & McCluskey.
+    Check if a k-ORE satisfies the k-occurrence condition.
 
-    Entfernt nacheinander alle Nicht-Start/Accept-Zustände.
-    Für jeden eliminierten Zustand q:
-      - Für jedes Paar (p, r) mit p→q (Label A) und q→r (Label B):
-        - R_self_q = disjunktion aller Selbst-Schleifen auf q
-        - Neues Label = A · (R_self_q)* · B
-        - Füge Kante p → r mit dem neuen Label hinzu (oder merge mit existierender)
+    The k-occurrence condition: for every subexpression (r|s),
+    each alphabet symbol appears at most k times across all
+    alternatives combined.
 
-    Nach Elimination: Nur Start- und Accept-Zustände bleiben.
-    Der Ausdruck ist: summe aller Pfade von Start zu Accept.
-    """
-    G = G.copy()
-    eliminated = set()
-
-    # Wiederhole bis nur Start + Accepts übrig sind
-    changed = True
-    while changed:
-        changed = False
-        # Wähle einen Zustand zur Elimination (nicht Start, nicht Accept)
-        for q in list(G.nodes):
-            if q == G.start or q in G.accepts:
-                continue
-            if q in eliminated:
-                continue
-
-            reachable = _is_reachable_to_accept(G, q)
-            if not reachable:
-                G.nodes.discard(q)
-                G.accepts.discard(q)
-                G.edges = [e for e in G.edges if e['from'] != q and e['to'] != q]
-                eliminated.add(q)
-                changed = True
-                continue
-
-            incoming = G.incoming(q)
-            outgoing = G.outgoing(q)
-
-            # R_self_q = (a1 | a2 | ...)* für alle Selbst-Schleifen auf q
-            self_loops = [e for e in outgoing if e['to'] == q]
-            outgoing_no_self = [e for e in outgoing if e['to'] != q]
-
-            if not outgoing_no_self:
-                # Sackgasse, keine Outgoing-Kanten (außer self-loop)
-                # Entferne eingehende Kanten + q
-                for e in incoming:
-                    G.remove_edge(e['from'], e['to'], e['label'])
-                G.nodes.discard(q)
-                G.accepts.discard(q)
-                eliminated.add(q)
-                changed = True
-                continue
-
-            if self_loops:
-                self_labels = list(set(e['label'] for e in self_loops))
-                if len(self_labels) == 1:
-                    R_self_q = f"({self_labels[0]})*"
-                else:
-                    R_self_q = f"({'|'.join(self_labels)})*"
-            else:
-                R_self_q = ""
-
-            # Für jedes Paar (p, r): p→q (incoming), q→r (outgoing, r != q)
-            for e_in in incoming:
-                p = e_in['from']
-                if p == q:
-                    continue
-                A = e_in['label']
-
-                for e_out in outgoing_no_self:
-                    r = e_out['to']
-                    B = e_out['label']
-
-                    if R_self_q:
-                        new_label = f"({A}.{R_self_q}.{B})"
-                    else:
-                        new_label = f"({A}.{B})"
-
-                    # Merge mit existierender Kante p→r wenn vorhanden
-                    existing = [e for e in G.edges if e['from'] == p and e['to'] == r]
-                    existing_labels = [e['label'] for e in existing]
-
-                    if new_label not in existing_labels and f"({new_label})" not in existing_labels:
-                        # Vereinige mit existierenden Labels via |
-                        if existing:
-                            old_label = existing[0]['label']
-                            merged = f"({old_label}|{new_label})"
-                            G.remove_edge(p, r, old_label)
-                            G.add_edge(p, r, merged)
-                        else:
-                            G.add_edge(p, r, new_label)
-
-            # Lösche q und alle seine Kanten
-            for e in incoming:
-                G.remove_edge(e['from'], e['to'], e['label'])
-            for e in self_loops:
-                G.remove_edge(e['from'], e['to'], e['label'])
-            for e in outgoing_no_self:
-                G.remove_edge(e['from'], e['to'], e['label'])
-
-            G.nodes.discard(q)
-            G.accepts.discard(q)
-            eliminated.add(q)
-            changed = True
-            break
-
-    return G
-
-
-def _is_reachable_to_accept(G, q):
-    """Prüft ob von q aus ein Accept-Zustand erreichbar ist."""
-    visited = set()
-    stack = [q]
-    while stack:
-        n = stack.pop()
-        if n in visited:
-            continue
-        visited.add(n)
-        if n in G.accepts:
-            return True
-        for e in G.outgoing(n):
-            stack.append(e['to'])
-    return False
-
-
-def _extract_expression(G):
-    """
-    Extrahiert den regulären Ausdruck aus dem eliminierten Automaten.
-    Nach Elimination gibt es nur Startzustand und Accept-Zustände.
-    Der Ausdruck ist die Disjunktion aller Pfade von Start zu Accept.
-    """
-    if G.start is None:
-        return "∅"
-
-    # Phase 1: State Elimination
-    G_elim = _state_elimination(G)
-    start = G_elim.start
-
-    if not G_elim.accepts:
-        return "∅"
-
-    paths = []
-    outgoing = G_elim.outgoing(start)
-
-    # Spezialfall: Start ist selbst Accept
-    if start in G_elim.accepts:
-        # Prüfe auf Selbst-Schleife
-        self_edges = [e for e in outgoing if e['to'] == start]
-        non_self = [e for e in outgoing if e['to'] != start]
-
-        if not non_self and not self_edges:
-            return "ε"
-
-        if self_edges:
-            self_labels = '|'.join(set(e['label'] for e in self_edges))
-            paths.append(f"({self_labels})*")
-
-        # Außer Start → Accept → andere Accepts
-        for e in non_self:
-            target = e['to']
-            if target in G_elim.accepts:
-                paths.append(e['label'])
-
-    # Pfade von Start zu Accept-Zuständen
-    for acc in G_elim.accepts:
-        if acc == start:
-            continue
-        # Kante start → acc
-        direct = [e for e in outgoing if e['to'] == acc]
-        for e in direct:
-            paths.append(e['label'])
-
-    self_loops_start = [e for e in G_elim.outgoing(start) if e['to'] == start]
-
-    # Weitere Kanten: start → x (wo x != accept)
-    intermediate = [e for e in outgoing if e['to'] not in G_elim.accepts and e['to'] != start]
-    for e in intermediate:
-        # Folge Pfad von intermediate zu accept
-        suffix = _follow_path(G_elim, e['to'], G_elim.accepts, set())
-        if suffix:
-            paths.append(f"({e['label']}.{suffix})")
-
-    # Entferne Duplikate
-    paths = list(set(paths))
-
-    if not paths:
-        return "ε"
-
-    if len(paths) == 1:
-        expr = paths[0]
-    else:
-        expr = f"({'|'.join(paths)})"
-
-    # Vereinfache: Entferne überflüssige Klammern
-    expr = _simplify_expression(expr)
-
-    return expr
-
-
-def _follow_path(G, start, accepts, visited):
-    """Findet den Pfad von start zu einem Accept."""
-    if start in accepts:
-        return "ε"
-    if start in visited:
-        return None
-    visited.add(start)
-
-    outgoing = G.outgoing(start)
-    for e in outgoing:
-        if e['to'] == start:
-            continue
-        suffix = _follow_path(G, e['to'], accepts, visited)
-        if suffix is not None:
-            if suffix == "ε":
-                return e['label']
-            else:
-                return f"({e['label']}.{suffix})"
-    return None
-
-
-def _simplify_expression(expr):
-    """
-    Vereinfacht einen regulären Ausdruck.
-    Entfernt überflüssige Klammern, doppelte Operatoren, etc.
-    """
-    if not expr or expr in ('ε', '∅'):
-        return expr
-
-    # (ε. X ) → X
-    # (X . ε) → X
-    # ((X)) → X
-    # (a|a) → a
-
-    simplified = expr
-
-    while True:
-        prev = simplified
-        simplified = _simplify_once(simplified)
-        if simplified == prev:
-            break
-
-    return simplified
-
-
-def _simplify_once(expr):
-    """Ein Reduktionsschritt."""
-    # (ε.X) → X
-    # (X.ε) → X
-    # ((X)) → X
-    # (a|a) → a
-
-    result = expr
-
-    # ((X)) → X (doppelte Klammern)
-    import re
-    result = re.sub(r'\(\(([^()]+)\)\)', r'(\1)', result)
-
-    return result
-
-
-def validate_k_ore(expr, k_index):
-    """
-    Prüft ob ein Ausdruck die k-Occurrence-Bedingung erfüllt.
-    Ein k-ORE erlaubt jedes Symbol maximal einmal pro k-Indikator,
-    d.h. in jedem Konjunkt (Teilausdruck ohne |) darf jedes Symbol
-    höchstens k-mal vorkommen.
-
-    Vereinfacht: Zähle Vorkommen jedes eindeutigen Token-Namens
-    im Ausdruck. Wenn ein Token mehr als k-mal vorkommt, ist
-    die Bedingung verletzt.
+    Simplified implementation: count raw alphabet symbol
+    occurrences in the expression string. A symbol appearing
+    more than k times violates the condition.
Returns: - bool, str: (erfüllt, Grund) + (bool, str): (passes, explanation) """ - # Extrahiere alle Token-Namen aus dem Ausdruck - tokens = set() - for c in '*+?()|.': - pass + if not expr or expr in ('∅', 'ε'): + return True, "OK" - token_names = set() - i = 0 - while i < len(expr): - if expr[i].isalnum() or expr[i] in '/_-': - j = i - while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'): - j += 1 - token_names.add(expr[i:j]) - i = j - else: - i += 1 + from .expr import alphabet + syms = alphabet_set or alphabet(expr) - # Zähle Vorkommen - token_counts = {} - i = 0 - while i < len(expr): - if expr[i].isalnum() or expr[i] in '/_-': - j = i - while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'): - j += 1 - token = expr[i:j] - token_counts[token] = token_counts.get(token, 0) + 1 - i = j - else: - i += 1 + counts = {} + for sym in syms: + import re + count = len(re.findall(rf'(? 0: + counts[sym] = count - violations = [t for t, c in token_counts.items() if c > k_index] + violations = [f"{s}:{c}" for s, c in sorted(counts.items()) if c > k] if violations: - return False, f"Token {violations} erscheint > {k_index}-mal" + return False, f"k={k} violations: {', '.join(violations)}" return True, "OK" class kOREInference: """ - iDRegEx: k-ORE Inferenz via PTA → Shrink → Repair → Expression. + |———— Algorithm 4: iDRegEx ————| + Require: sample S, kmax + Ensure: k-ORE r - Nach Bex et al. 2008: - - Baue PTA aus Sequenzen - - Shrink: Rewrite-Regeln generalisieren - - Repair: Stelle Determinismus wieder her - - Convert: Extrahiere regulären Ausdruck via State Elimination - - Prüfe k-Occurrence - - Wähle k mit MDL + 1: C ← ∅ + 2: for k = 1 to kmax do + 3: for n = 1 to N do + 4: G ← iKoa(S, k) + 5: if rwr²(G) is deterministic then + 6: add rwr²(G) to C + 7: return best(C) by MDL """ - def __init__(self, k_max=5): + def __init__(self, k_max=5, N=5): self.k_max = k_max + self.N = N def infer(self, sequences): """ - Inferiere den besten k-ORE. 
+ Infer the best k-ORE for the given sequences. Returns: - (Automaton, expression_string, best_k) oder None + (koa_automaton, expression_string, best_k) or None if no valid + k-ORE can be inferred. """ sequences = [s for s in sequences if s] if not sequences: - return None, "∅", 0 + return None - best_score = float('inf') - best_result = None + candidates = [] for k in range(1, self.k_max + 1): - try: - auto, expr = self._infer_k_expression(sequences, k) - if auto is None: + for _ in range(self.N): + G = ikoa(sequences, k, num_trials=1) + if G is None: continue - score = mdl_score(auto, sequences) - if score < best_score: - best_score = score - best_result = (auto, expr, k) - except Exception: - continue + expr = rwr_sq(G) + if expr and expr not in ('∅', 'ε'): + if is_deterministic(expr): + valid, _ = validate_k_ore(expr, k) + if valid: + candidates.append((G, expr, k)) - return best_result + if not candidates: + return None - def _infer_k_expression(self, sequences, k): - """Führe iDRegEx für ein spezifisches k durch.""" - # 1. PTA bauen - pta = build_pta(sequences) - - # 2. Shrink - shrunk = shrink(pta, max_iterations=20) - - # 3. Repair - repaired = repair(shrunk) - - # 4. Expression extrahieren - expr = _extract_expression(repaired) - - # 5. k-ORE Prüfung - valid, _ = validate_k_ore(expr, k) - if not valid: - expr = self._generalize_to_k_ore(expr, k) - - return repaired, expr - - def _generalize_to_k_ore(self, expr, k): - """ - Generalisiere den Ausdruck zur k-ORE. 
- - Wenn Token t mehr als k-mal vorkommt: - - Ersetze Wiederholungen durch t+ oder t* - """ - # Einfache Heuristik: Extrahiere Token, zähle, ersetze - result = expr - token_counts = {} - i = 0 - while i < len(result): - if result[i].isalnum() or result[i] in '/_-': - j = i - while j < len(result) and (result[j].isalnum() or result[j] in '/_-'): - j += 1 - token = result[i:j] - token_counts[token] = token_counts.get(token, 0) + 1 - i = j - else: - i += 1 - - for token, count in token_counts.items(): - if count > k: - # Ersetze token.token durch token+ - import re - pattern = re.escape(token) + r'\..' + re.escape(token) - replacement = f"{token}+" - result = re.sub(pattern, replacement, result, count=1) - break - - return result + return min(candidates, key=lambda c: mdl_score(c[1], sequences)) diff --git a/bex/mcp_server.py b/bex/mcp_server.py index df7b034..226ff5a 100644 --- a/bex/mcp_server.py +++ b/bex/mcp_server.py @@ -17,6 +17,7 @@ def infer_best_grammar( prefer: str = "", kmax: int = 2, N: int = 3, + min_coverage: float = 1.0, ) -> str: """Infer a compact grammar from example sequences. Use this when you have examples of sequential data and want to learn the pattern. @@ -29,19 +30,26 @@ def infer_best_grammar( sequences: List of sequences, each a list of strings (symbols in the order they appear). Example: [["file","copy","command"], ["file","template","command"]]. - prefer: Optional — 'crx' for full coverage (accepts all examples), - 'idregex' for minimal core (only what every example shares). - Default: runs both and picks best by MDL score. - kmax: Maximum k for iDRegEx k-ORE inference. - N: Number of EM iterations for iDRegEx. + prefer: Optional — 'crx' for full vocabulary (accepts all examples), + 'idregex' for deterministic minimal core. Omit to auto-pick by MDL. + kmax: Context depth for k-ORE inference. Default 2. + N: Random trials for k-ORE inference (higher = better, slower). 
+ min_coverage: (Expert) When < 1.0, also runs a **core+outlier analysis**: + iteratively removes outlier sequences (those with rarest symbols) + until at least this fraction remain. Returns the core grammar + for the majority, plus a list of which sequences were removed and why. + Default 1.0 = no core analysis. Set to 0.8 to find the tight + pattern shared by ~80% of examples while flagging the other ~20% + as variations. Returns: A formatted string with the best grammar, scores, and explanation. + When min_coverage < 1.0, includes the core grammar and outlier info. Grammar notation: a.b = a then b, (a+b) = a or b, r? = optional, r+ = one or more, r+? = zero or more. """ pref = prefer if prefer else None - result = infer_ensemble(sequences, kmax=kmax, N=N, prefer=pref) + result = infer_ensemble(sequences, kmax=kmax, N=N, prefer=pref, min_coverage=min_coverage) if result['best'] is None: return f"No grammar found. {result['why']}" lines = [f"Best: {result['best']['algorithm']} (MDL {result['best']['mdl_score']})", @@ -53,6 +61,13 @@ def infer_best_grammar( lines.append(f" {r['algorithm']:10s} MDL={r['mdl_score']:>8.2f} match={m}/{len(sequences)}") lines.append("") lines.append(f"Why: {result['why']}") + if 'core' in result and result['core']: + c = result['core'] + lines.append(f"\nCore CRX ({c['coverage']:.0%} coverage, {c['outlier_count']} outliers): {c['grammar']}") + if c['outliers']: + lines.append(f" Outlier sequences:") + for i, o in enumerate(c['outliers'], 1): + lines.append(f" {i}. {' → '.join(str(x) for x in o[:8])}{'...' 
if len(o) > 8 else ''}") return "\n".join(lines) diff --git a/chart_token_savings.png b/chart_token_savings.png index ec7b081..ab2fd0a 100644 Binary files a/chart_token_savings.png and b/chart_token_savings.png differ diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py new file mode 100644 index 0000000..db15627 --- /dev/null +++ b/tests/test_ensemble.py @@ -0,0 +1,265 @@ +"""Tests for infer_ensemble — runs CRX, iDRegEx, and kOREInference, picks best by MDL.""" + +from bex.ensemble import infer_ensemble +from bex.idregex import is_deterministic +from bex.kore import kOREInference + + +# ── Basic ensemble runs ── + +def test_ensemble_returns_dict(): + seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']] + result = infer_ensemble(seqs, kmax=2, N=3) + assert isinstance(result, dict) + assert 'best' in result + assert 'all' in result + assert 'why' in result + + +def test_ensemble_best_not_none(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, kmax=2, N=3) + assert result['best'] is not None + assert result['best']['grammar'] is not None + assert result['best']['algorithm'] in ('CRX', 'iDRegEx', 'kOREInference') + assert result['best']['mdl_score'] is not None + + +def test_ensemble_runs_all_three(): + seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']] + result = infer_ensemble(seqs, kmax=2, N=3) + algos = {a['algorithm'] for a in result['all']} + assert 'CRX' in algos + # iDRegEx and kOREInference may fail stochastically, so at least CRX + assert len(result['all']) >= 1 + + +def test_ensemble_all_results_have_scores(): + seqs = [['a', 'b'], ['a', 'b', 'b']] + result = infer_ensemble(seqs, kmax=2, N=3) + for entry in result['all']: + assert 'algorithm' in entry + assert 'grammar' in entry + assert 'mdl_score' in entry + assert isinstance(entry['mdl_score'], (int, float)) + + +def test_ensemble_deterministic_results(): + seqs = [['x', 'y'], ['x', 'z']] + result = infer_ensemble(seqs, kmax=2, N=3) + if result['best']: + assert 
is_deterministic(result['best']['grammar']) + + +# ── prefer parameter tests ── + +def test_prefer_crx(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, prefer='crx') + assert result['best']['algorithm'] == 'CRX' + assert len(result['all']) == 1 + + +def test_prefer_idregex(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, prefer='idregex', kmax=2, N=5) + assert result['best']['algorithm'] == 'iDRegEx' + assert len(result['all']) == 1 + + +def test_prefer_koreinference(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, prefer='koreinference', kmax=2, N=5) + assert result['best']['algorithm'] == 'kOREInference' + assert len(result['all']) == 1 + + +def test_prefer_case_insensitive(): + seqs = [['a', 'b']] + r1 = infer_ensemble(seqs, prefer='CRX') + r2 = infer_ensemble(seqs, prefer='Crx') + assert r1['best']['algorithm'] == r2['best']['algorithm'] + + +def test_prefer_unknown_falls_back(): + seqs = [['a', 'b']] + result = infer_ensemble(seqs, prefer='unknown') + assert result['best'] is not None + assert len(result['all']) >= 1 + + +# ── Edge cases ── + +def test_ensemble_empty_input(): + result = infer_ensemble([], kmax=2, N=3) + assert result['best'] is None or result['best']['grammar'] is not None + + +def test_ensemble_single_sequence(): + result = infer_ensemble([['a', 'b', 'c']], kmax=2, N=3) + assert result['best'] is not None + assert result['best']['grammar'] is not None + + +def test_ensemble_many_identical(): + seqs = [['a', 'b', 'c']] * 10 + result = infer_ensemble(seqs, kmax=2, N=3) + assert result['best'] is not None + + +def test_ensemble_linear_data(): + seqs = [ + ['file', 'template', 'command', 'set_fact', 'shell'], + ['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'], + ] + result = infer_ensemble(seqs, kmax=2, N=3) + if result['best']: + g = result['best']['grammar'] + assert 'file' in g and 'template' in g and 'shell' in g + + +def test_ensemble_branching_data(): 
+ seqs = [ + ['file', 'template', 'setup', 'shell'], + ['file', 'template', 'deploy', 'shell'], + ] + result = infer_ensemble(seqs, kmax=2, N=5) + if result['best']: + g = result['best']['grammar'] + assert is_deterministic(g) + assert 'file' in g and 'template' in g and 'shell' in g + + +def test_ensemble_why_includes_scores(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, kmax=2, N=3) + assert 'CRX' in result['why'] + assert 'selected' in result['why'] + assert 'MDL' in result['why'] or 'score' in result['why'].lower() + + +def test_ensemble_ordering_best_first(): + seqs = [['a', 'b', 'c'], ['a', 'b']] + result = infer_ensemble(seqs, kmax=2, N=3) + if result['best']: + assert result['all'][0]['algorithm'] == result['best']['algorithm'] + assert result['all'][0]['mdl_score'] <= result['all'][-1]['mdl_score'] + + +# ── Stochastic stability tests ── + +def test_ensemble_stable_on_simple_data(): + for _ in range(3): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, kmax=2, N=3) + if result['best']: + assert 'a' in result['best']['grammar'] + assert 'b' in result['best']['grammar'] + + +def test_ensemble_crx_always_present(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, kmax=2, N=3) + crx_results = [a for a in result['all'] if a['algorithm'] == 'CRX'] + assert len(crx_results) == 1 + + +# ── min_coverage / core analysis tests ── + +def test_core_not_included_when_coverage_1(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, min_coverage=1.0) + assert 'core' not in result + + +def test_core_included_when_coverage_lt_1(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + result = infer_ensemble(seqs, min_coverage=0.8) + assert 'core' in result + assert 'grammar' in result['core'] + assert 'coverage' in result['core'] + assert 'outliers' in result['core'] + assert 'outlier_count' in result['core'] + + +def test_core_outlier_detection(): + seqs = [ + ['fail', 'package', 'file', 
'service'], + ['fail', 'package', 'file', 'service'], + ['fail', 'package', 'file', 'service', 'npm'], + ['fail', 'package', 'file', 'service', 'npm', 'pip'], + ] + result = infer_ensemble(seqs, min_coverage=0.7) + assert 'core' in result + c = result['core'] + assert c['outlier_count'] >= 1 + assert 'npm' in c['grammar'] or 'service' in c['grammar'] + + +def test_core_all_identical(): + seqs = [['a', 'b', 'c']] * 10 + result = infer_ensemble(seqs, min_coverage=0.8) + assert 'core' in result + assert result['core']['outlier_count'] == 0 + assert 'a' in result['core']['grammar'] + + +def test_core_coverage_ratio(): + seqs = [ + ['a', 'b', 'c'], + ['a', 'b', 'c'], + ['a', 'b', 'c', 'd'], + ['a', 'b', 'c', 'd', 'e'], + ] + result = infer_ensemble(seqs, min_coverage=0.7) + if 'core' in result: + c = result['core'] + assert c['outlier_count'] >= 1 + assert len(c['outliers']) >= 1 + assert c['coverage'] >= 0.5 + + +def test_core_empty_sequences(): + result = infer_ensemble([], min_coverage=0.8) + assert 'core' in result + assert result['core']['grammar'] is not None + + +def run_all(): + tests = [ + test_ensemble_returns_dict, + test_ensemble_best_not_none, + test_ensemble_runs_all_three, + test_ensemble_all_results_have_scores, + test_ensemble_deterministic_results, + test_prefer_crx, + test_prefer_idregex, + test_prefer_koreinference, + test_prefer_case_insensitive, + test_prefer_unknown_falls_back, + test_ensemble_empty_input, + test_ensemble_single_sequence, + test_ensemble_many_identical, + test_ensemble_linear_data, + test_ensemble_branching_data, + test_ensemble_why_includes_scores, + test_ensemble_ordering_best_first, + test_ensemble_stable_on_simple_data, + test_ensemble_crx_always_present, + ] + passed = 0 + failed = 0 + for t in tests: + try: + t() + passed += 1 + except Exception as e: + import traceback + print(f" FAIL {t.__name__}: {e}") + traceback.print_exc() + failed += 1 + print(f"\n{passed} passed, {failed} failed") + + +if __name__ == '__main__': + 
run_all() diff --git a/tests/test_kore.py b/tests/test_kore.py new file mode 100644 index 0000000..144d381 --- /dev/null +++ b/tests/test_kore.py @@ -0,0 +1,375 @@ +"""Tests for kOREInference (Algorithm 4: iDRegEx from arXiv 1004.2372).""" + +from bex.kore import kOREInference, validate_k_ore +from bex.idregex import is_deterministic +from bex.mdl import mdl_score, model_cost, data_cost + + +# ── Core inference tests ── + +def test_linear_sequence(): + seqs = [ + ['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'], + ['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'], + ] + kore = kOREInference(k_max=3, N=3) + result = kore.infer(seqs) + assert result is not None, "Should infer a k-ORE" + auto, expr, best_k = result + assert expr is not None + assert all(t in expr for t in ['file', 'template', 'command', 'set_fact', 'shell', 'wait_for']) + assert is_deterministic(expr), f"Expression must be deterministic: {expr}" + + +def test_branching_paths(): + seqs = [ + ['file', 'template', 'setup', 'set_fact', 'shell'], + ['file', 'template', 'deploy', 'set_fact', 'shell'], + ] + kore = kOREInference(k_max=3, N=3) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr), f"Expression must be deterministic: {expr}" + assert 'file' in expr and 'template' in expr and 'shell' in expr + + +def test_optional_element(): + seqs = [ + ['file', 'template', 'shell'], + ['file', 'template', 'exec', 'shell'], + ['file', 'template', 'exec', 'exec', 'shell'], + ] + kore = kOREInference(k_max=4, N=15) + result = kore.infer(seqs) + if result is None: + return # stochastic failure + auto, expr, best_k = result + assert is_deterministic(expr), f"Expression must be deterministic: {expr}" + + +def test_looping_element(): + seqs = [ + ['package', 'file', 'template', 'systemd'], + ['package', 'file', 'template', 'template', 'systemd', 'systemd'], + ['package', 'file', 'template', 'template', 'template', 
'systemd'], + ] + kore = kOREInference(k_max=3, N=5) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr), f"Expression must be deterministic: {expr}" + + +def test_multiple_alternatives(): + seqs = [ + ['install', 'configure', 'start'], + ['install', 'configure', 'enable'], + ['install', 'configure', 'restart'], + ] + kore = kOREInference(k_max=3, N=5) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr), f"Expression must be deterministic: {expr}" + + +def test_rejects_non_deterministic(): + seqs = [['a'], ['a']] + kore = kOREInference(k_max=2, N=2) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr), f"Non-deterministic: {expr}" + + +def test_empty_input(): + kore = kOREInference(k_max=2, N=2) + result = kore.infer([]) + assert result is None + result = kore.infer([[], []]) + assert result is None + + +def test_single_element_sequences(): + seqs = [['a'], ['b'], ['a'], ['b']] + kore = kOREInference(k_max=2, N=3) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr) + + +def test_infer_returns_best_k(): + seqs = [ + ['a', 'b', 'c'], + ['a', 'b', 'c', 'd'], + ['a', 'b', 'd'], + ] + kore = kOREInference(k_max=4, N=3) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert 1 <= best_k <= 4 + assert is_deterministic(expr) + + +def test_tool_sequences(): + seqs = [ + ['read', 'grep', 'read'], + ['read', 'glob', 'grep', 'read'], + ['read', 'bash', 'read'], + ['glob', 'grep', 'read', 'edit', 'bash'], + ['read', 'edit', 'bash', 'bash'], + ['bash', 'read', 'bash'], + ] + kore = kOREInference(k_max=3, N=5) + result = kore.infer(seqs) + if result is not None: + auto, expr, best_k = result + assert is_deterministic(expr) + + +# ── Edge case tests ── + +def test_single_sequence(): + 
kore = kOREInference(k_max=2, N=3) + result = kore.infer([['a', 'b', 'c']]) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr) + + +def test_many_identical_sequences(): + seqs = [['a', 'b', 'c']] * 20 + kore = kOREInference(k_max=2, N=3) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr) + assert 'a' in expr and 'b' in expr and 'c' in expr + + +def test_xml_like_structured(): + seqs = [ + ['header', 'body', 'footer'], + ['header', 'body', 'body', 'footer'], + ['header', 'body', 'body', 'body', 'footer'], + ['header', 'footer'], + ] + kore = kOREInference(k_max=3, N=10) + result = kore.infer(seqs) + if result is not None: + auto, expr, best_k = result + assert is_deterministic(expr) + assert 'header' in expr and 'footer' in expr + + +def test_disjoint_symbols(): + seqs = [ + ['alpha', 'beta'], + ['gamma', 'delta'], + ] + kore = kOREInference(k_max=2, N=3) + result = kore.infer(seqs) + if result is not None: + auto, expr, best_k = result + assert is_deterministic(expr) + + +def test_k1_vs_k2_selection(): + seqs = [ + ['a', 'a', 'b'], + ['a', 'b'], + ['a', 'a', 'a', 'b'], + ] + kore = kOREInference(k_max=3, N=5) + result = kore.infer(seqs) + assert result is not None + auto, expr, best_k = result + assert is_deterministic(expr) + + +def test_all_same_symbol(): + seqs = [ + ['a', 'a'], + ['a', 'a', 'a'], + ['a'], + ] + kore = kOREInference(k_max=2, N=5) + result = kore.infer(seqs) + if result is not None: + auto, expr, best_k = result + assert is_deterministic(expr) + + +def test_long_sequence(): + seqs = [ + ['a', 'b', 'c', 'd', 'e', 'f', 'g'], + ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], + ] + kore = kOREInference(k_max=2, N=5) + result = kore.infer(seqs) + if result is not None: + auto, expr, best_k = result + assert is_deterministic(expr) + + +def test_infer_returns_koa(): + kore = kOREInference(k_max=2, N=3) + result = kore.infer([['a', 'b'], ['a', 'b', 'c']]) 
+ assert result is not None + auto, expr, best_k = result + assert hasattr(auto, '_succ'), "Should return a KOA automaton" + assert hasattr(auto, 'src') + assert hasattr(auto, 'sink') + + +def test_different_kmax(): + seqs = [['a', 'b', 'c', 'd', 'e'], ['a', 'b', 'c']] + kore1 = kOREInference(k_max=1, N=5) + kore2 = kOREInference(k_max=3, N=5) + r1 = kore1.infer(seqs) + r2 = kore2.infer(seqs) + assert r1 is not None or r2 is not None + + +# ── validate_k_ore tests ── + +def test_validate_k_ore_basic(): + valid, reason = validate_k_ore('a.b.c', 2) + assert valid, f"a.b.c should be valid for k=2: {reason}" + + +def test_validate_k_ore_exceeds_k(): + valid, reason = validate_k_ore('a.a.a', 1) + assert not valid, "a.a.a should fail for k=1" + + +def test_validate_k_ore_epsilon(): + valid, reason = validate_k_ore('ε', 1) + assert valid + + +def test_validate_k_ore_empty(): + valid, reason = validate_k_ore('', 1) + assert valid + + +def test_validate_k_ore_disjunction(): + valid, reason = validate_k_ore('(a|b|c)', 2) + assert valid, f"(a|b|c) should be valid for k=2: {reason}" + + +def test_validate_k_ore_loop(): + valid, reason = validate_k_ore('a+', 1) + assert valid, "a+ should be valid for k=1" + + +def test_validate_k_ore_k0(): + valid, reason = validate_k_ore('a', 0) + assert not valid, "a should fail for k=0" + + +# ── MDL scoring tests ── + +def test_mdl_model_cost(): + assert model_cost('a.b.c') == 3 + assert model_cost('(a|b)+.c') >= 2 + assert model_cost('ε') >= 0 + + +def test_mdl_data_cost(): + # General expression (a|b)+ has multiple words of length 1+: non-zero cost + dc = data_cost('(a|b)+', [['a', 'b'], ['b', 'a'], ['a']]) + assert dc > 0, f"data_cost should be > 0 for general expression, got {dc}" + # Exact expression has cost 0 (log2(1) = 0) + dc_exact = data_cost('a.b.c', [['a', 'b', 'c']]) + assert dc_exact == 0.0, f"data_cost for exact match should be 0, got {dc_exact}" + + +def test_mdl_score_lower_is_better(): + score_specific = mdl_score('a.b.c', 
[['a', 'b', 'c']]) + score_general = mdl_score('(a|b|c)+?', [['a', 'b', 'c']]) + assert score_specific > 0 and score_general > 0 + + +def test_mdl_empty_sequences(): + score = mdl_score('a.b.c', []) + assert score == model_cost('a.b.c') + + +# ── Algorithm 4 paper-faithful tests ── + +def test_infer_returns_deterministic(): + for _ in range(5): + seqs = [['x', 'y'], ['x', 'z']] + kore = kOREInference(k_max=2, N=2) + result = kore.infer(seqs) + if result: + _, expr, _ = result + assert is_deterministic(expr), f"Non-deterministic: {expr}" + + +def test_infer_obeys_k_occurrence(): + seqs = [['a', 'b'], ['a', 'b', 'c']] + for k in range(1, 4): + kore = kOREInference(k_max=k, N=5) + result = kore.infer(seqs) + if result: + _, expr, best_k = result + valid, _ = validate_k_ore(expr, best_k) + assert valid, f"k={best_k} expression {expr} violates k-occurrence" + + +def run_all(): + tests = [ + test_linear_sequence, + test_branching_paths, + test_optional_element, + test_looping_element, + test_multiple_alternatives, + test_rejects_non_deterministic, + test_empty_input, + test_single_element_sequences, + test_infer_returns_best_k, + test_tool_sequences, + test_single_sequence, + test_many_identical_sequences, + test_xml_like_structured, + test_disjoint_symbols, + test_k1_vs_k2_selection, + test_all_same_symbol, + test_long_sequence, + test_infer_returns_koa, + test_different_kmax, + test_validate_k_ore_basic, + test_validate_k_ore_exceeds_k, + test_validate_k_ore_epsilon, + test_validate_k_ore_empty, + test_validate_k_ore_disjunction, + test_validate_k_ore_loop, + test_validate_k_ore_k0, + test_mdl_model_cost, + test_mdl_data_cost, + test_mdl_score_lower_is_better, + test_mdl_empty_sequences, + test_infer_returns_deterministic, + test_infer_obeys_k_occurrence, + ] + passed = 0 + failed = 0 + for t in tests: + try: + t() + passed += 1 + except Exception as e: + import traceback + print(f" FAIL {t.__name__}: {e}") + traceback.print_exc() + failed += 1 + print(f"\n{passed} 
passed, {failed} failed") + + +if __name__ == '__main__': + run_all()
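
Reviewer note: the k-occurrence check exercised by the `validate_k_ore` tests above can be sketched on its own. This is a minimal illustration and not the code in `bex/kore.py`. It assumes tokens are runs of ASCII alphanumerics plus `/`, `_` and `-` (the character class used by the removed tokenizer loop), and the names `TOKEN_RE` and `check_k_occurrence` are hypothetical.

```python
import re
from collections import Counter

# Assumption: a symbol is a run of ASCII alphanumerics plus '/', '_' and '-'.
# Operators such as '.', '|', '+', '?', '*' and parentheses separate symbols.
TOKEN_RE = re.compile(r"[A-Za-z0-9/_-]+")


def check_k_occurrence(expr, k):
    """Return (passes, explanation) for the simplified k-occurrence rule.

    A symbol that appears more than k times in the expression string
    counts as a violation. Empty, '∅' and 'ε' expressions pass trivially.
    """
    if not expr or expr in ("∅", "ε"):
        return True, "OK"

    # Count how often each symbol name appears in the expression string.
    counts = Counter(TOKEN_RE.findall(expr))

    violations = [f"{s}:{c}" for s, c in sorted(counts.items()) if c > k]
    if violations:
        return False, f"k={k} violations: {', '.join(violations)}"
    return True, "OK"


if __name__ == "__main__":
    print(check_k_occurrence("a.b.c", 2))
    print(check_k_occurrence("a.a.a", 1))
```

Tokenizing once and counting with `Counter` avoids one regex scan per alphabet symbol. It also avoids substring matches, where a short symbol name would otherwise be counted inside a longer one.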