Compare commits

...

18 commits

Author SHA1 Message Date
d2d57bc431 Merge pull request 'feat: kOREInference — Algorithm 4 iDRegEx with MDL scoring + ensemble integration' (#1) from feature/kore-inference into main
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2026-07-01 14:08:18 +00:00
tobjend
90d8c69aa7 fix: broken y-axis bar chart for readability
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
2026-07-01 16:05:49 +02:00
tobjend
2562519718 fix: grouped bar chart with Without Dervish vs With Dervish
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 16:04:55 +02:00
tobjend
ce4088e705 fix: chart bars use Dervish purple #853E91
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 16:03:20 +02:00
tobjend
d96e99d84f docs: xkcd-style chart
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 16:02:14 +02:00
tobjend
b34e39d4b9 feat: replace single-chart Helm with cross-project convention (15 charts, 6 publishers)
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
2026-07-01 16:00:04 +02:00
tobjend
fbdb39bf7c docs: syntax highlighting tags in SHOWCASE.md
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:48:36 +02:00
tobjend
bc81bbdce1 docs: extra blank line above coffee button
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:42:14 +02:00
tobjend
bb193d000f docs: bump logo 180→216px
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:41:55 +02:00
tobjend
4969819dbb docs: center logo, move coffee button below nav links
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:41:35 +02:00
tobjend
a70024397f chore: ignore examples/
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
2026-07-01 15:40:02 +02:00
tobjend
929a50c95d docs: add Buy Me a Coffee button
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:37:09 +02:00
tobjend
0be1a7fd79 docs: badges row, nav links, language tags on code blocks
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:24:00 +02:00
tobjend
5e0674bf77 docs: fix Go lint description (both optional), format outliers in SHOWCASE usage
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:21:55 +02:00
tobjend
0886e5f3bc docs: update README and SHOWCASE for kOREInference + core/outlier analysis
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:18:32 +02:00
tobjend
036a84cc76 docs: add min_coverage to MCP tool + README, include core in output
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:16:24 +02:00
tobjend
9045769d57 feat: core+outlier analysis via min_coverage parameter, 6 new tests
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 15:09:10 +02:00
tobjend
edd6d9d4dd feat: implement kOREInference (Algorithm 4) with MDL scoring, add to ensemble, 79 tests
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2026-07-01 14:50:09 +02:00
10 changed files with 1058 additions and 496 deletions

1
.gitignore vendored

@ -6,3 +6,4 @@ venv/
*.egg-info/
dist/
build/
examples/


@ -1,10 +1,18 @@
# Dervish MCP
<p align="left">
<img src="dervish-logo.png" alt="Dervish" width="180">
<p align="center">
<img src="dervish-logo.png" alt="Dervish" width="216">
</p>
<p align="left">
<p align="center">
<img src="https://img.shields.io/badge/license-MIT-blue" alt="License">
<img src="https://img.shields.io/badge/python-3.10%2B-blue" alt="Python 3.10+">
<img src="https://ci.corentic.eu/api/badges/7/status.svg" alt="CI Pipeline Status">
<br>
<a href="SHOWCASE.md">Showcase</a> ·
<a href="#quick-start">Usage</a> ·
<a href="#papers">Papers</a>
<br><br>
<a href="https://www.buymeacoffee.com/IjonTichy85"><img src="https://cdn.buymeacoffee.com/buttons/v2/default-yellow.png" alt="Buy me a coffee" width="140"></a>
</p>
**Dervish** infers **regular expression grammars** from example sequences using the BEX family of algorithms. Given a set of example sequences (strings over some alphabet), it learns a compact regular expression that captures the general pattern.
@ -41,18 +49,19 @@ The primary interface is a **Model Context Protocol (MCP)** server. Connect any
| Tool | Parameters | What it does |
|------|-----------|-------------|
| `infer_best_grammar` | `sequences`, `prefer`, `kmax`, `N` | **The only tool you need.** Runs CRX + iDRegEx, picks best by MDL. Set `prefer='crx'` for full coverage or `prefer='idregex'` for minimal core — skips the ensemble and runs one algorithm. |
| `infer_best_grammar` | `sequences`, `prefer`, `kmax`, `N`, `min_coverage` | **The only tool you need.** Runs CRX + iDRegEx + kOREInference, picks best by MDL. Set `prefer` to run only one algorithm. Set `min_coverage < 1.0` for optional core+outlier analysis. |
**Parameters explained:**
- **`prefer`**: `'crx'` for full vocabulary (accepts all sequences), `'idregex'` for minimal common core (only what every example shares). Omit to let MDL pick the winner.
- **`kmax`** (1–5): Context window for iDRegEx's k-testable automaton. Higher values capture longer-range dependencies but need more data and are slower. Default 2 works for most cases.
- **`N`** (1–10): Baum-Welch EM iterations for iDRegEx training. More iterations = better convergence but slower. Default 3 is a good balance.
- **`prefer`**: `'crx'` for full vocabulary (accepts all sequences), `'idregex'` or `'koreinference'` for deterministic minimal core. Omit to let MDL pick the winner across all three.
- **`kmax`** (1–5): Context window for k-ORE inference (iDRegEx, kOREInference). Higher values capture longer-range dependencies but need more data and are slower. Default 2 works for most cases.
- **`N`** (1–10): Random trials for k-ORE inference. More = better convergence but slower. Default 3.
- **`min_coverage`** (0.5–1.0): **Optional core+outlier analysis.** When < 1.0, iteratively removes outlier sequences (those with the rarest symbols) until at least this fraction remain. Returns the core CRX grammar for the majority plus a list of removed outliers. Default 1.0 = disabled. Example: `min_coverage=0.8` finds the tight pattern for ~80% of examples while flagging the other ~20% as variants.
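
The outlier removal behind `min_coverage` can be pictured with a small standalone sketch. It is loosely modeled on the `_find_core` helper in this change, but it is not that code: the names `split_core_outliers` and `rare_below` are hypothetical, and counting each symbol once per sequence is a simplifying assumption made here.

```python
from collections import Counter


def split_core_outliers(sequences, min_coverage=0.8, rare_below=0.3):
    """Drop the sequences with the largest share of rare symbols until
    roughly `min_coverage` of the input remains (illustrative only)."""
    n = len(sequences)
    # Assumption: a symbol's frequency is the fraction of sequences containing it.
    doc_freq = Counter(sym for seq in sequences for sym in set(seq))

    def rarity(seq):
        rare = sum(1 for sym in seq if doc_freq[sym] / n < rare_below)
        return rare / max(len(seq), 1)

    target = max(int(n * min_coverage), 1)
    core = list(sequences)
    outliers = []
    while len(core) > target:
        scores = [rarity(seq) for seq in core]
        worst = max(range(len(core)), key=scores.__getitem__)
        if scores[worst] == min(scores):
            break  # every sequence looks equally typical, nothing to remove
        outliers.append(core.pop(worst))
    return core, outliers


roles = [["package", "file", "service"] for _ in range(4)]
roles.append(["stat", "uri", "command"])
core, outliers = split_core_outliers(roles, min_coverage=0.8)
print(len(core), outliers)
```

In the real tool the remaining core is then passed to CRX; this sketch stops at the split.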
### Agent workflow
An LLM agent uses the MCP to discover an unwritten convention from existing examples — compressing hundreds of files into a single ~60-token rule:
```
```text
User: Generate a new Ansible role for installing PostgreSQL.
Agent: Let me check what pattern the existing community roles follow.
@ -78,6 +87,11 @@ Agent: Let me check what pattern the existing community roles follow.
**With Dervish:** one MCP call returns a ~60-token grammar known to match 15/15 existing roles. The agent follows it reliably.
**Core+outlier mode:** When generating a new role, the agent can call with
`min_coverage=0.8` to learn the mainstream pattern while seeing which roles
deviate and why — useful when the user's case resembles an outlier
(e.g., a PHP app like phpmyadmin that needs raw `lineinfile`).
## Quick Start
```bash
@ -108,8 +122,8 @@ Dervish discovers these conventions automatically from existing examples. The do
| Domain | What gets extracted | Example extracted symbols | What Dervish discovers | Why it helps an LLM |
|--------|-------------------|--------------------------|----------------------|---------------------|
| Ansible roles | Module names from `tasks/main.yml` in order | `fail`, `include_vars`, `set_fact`, `package`, `file`, `template`, `service`, `npm`, `pip`, `lineinfile` | `fail?.(include_vars+set_fact+package+file+template+service+...)+.include+?.(npm+pip)+?.lineinfile?` | "Validate preconditions first, then set vars, install packages, configure with templates, start services. Include sub-roles last." |
| Helm charts | K8s resource kinds from `helm template` output in rendered order | `ServiceAccount`, `ClusterRole`, `ClusterRoleBinding`, `Service`, `Deployment`, `ConfigMap`, `Alertmanager` | `ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment` (iDRegEx minimal core) | "Every Prometheus stack needs this bootstrap pipeline. Everything else is optional." |
| GitHub Actions (Go lint) | Step `uses:` or `run:` values from workflow YAML in job order | `actions/checkout`, `actions/setup-go`, `golangci/golangci-lint-action`, `megalinter/megalinter` | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | "Starting a new Go project on GitHub Actions? Four independent projects converged on: checkout → setup Go → golangci-lint → (optionally megalinter)." |
| Helm charts (cross-project, 15 charts) | K8s resource kinds from `helm template` output in rendered order | `NetworkPolicy`, `PodDisruptionBudget`, `ServiceAccount`, `Secret`, `ConfigMap`, `Service`, `Deployment`, `StatefulSet`, `ClusterRole`, `ClusterRoleBinding` | `NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?.ConfigMap?.PersistentVolumeClaim?.ClusterRole?.ClusterRoleBinding?.Service.Deployment?.StatefulSet?.(IngressClass+MutatingWebhookConfiguration)?.ValidatingWebhookConfiguration?.Job?` | "Writing a Helm chart? Start with resilience (PDB, NetworkPolicy), then identity (ServiceAccount, Secrets), then the Service, then your workload. Only cluster-wide tools need RBAC." |
| GitHub Actions (Go lint) | Step `uses:` or `run:` values from workflow YAML in job order | `actions/checkout`, `actions/setup-go`, `golangci/golangci-lint-action`, `megalinter/megalinter` | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | "Starting a new Go project on GitHub Actions? Four independent projects converged on: checkout → setup Go → (optional golangci-lint) → (optional megalinter)." |
## Real-world Results
@ -119,19 +133,22 @@ Dervish has been tested against public datasets from Ansible Galaxy, Helm, and G
| Dataset | Best grammar | Compression |
|---------|-------------|-------------|
| Ansible Galaxy (15 roles) | `fail?.(include_vars+set_fact+package+file+template+service+...)+.include+?.(npm+pip)+?.lineinfile?` | 5,000 tokens → 60 tokens (83×) |
| Helm (6 configs) | `ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment` | ~3,000 tokens → 40 tokens (75×) |
| Helm cross-project (15 charts) | `NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?.ConfigMap?...Service.Deployment?.StatefulSet?...` | 121 tokens → 35 tokens (3.5×) |
| Go lint (6 jobs) | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | ~900 tokens → 30 tokens (30×) |
The sweet spot: **multiple implementations of the same abstract task** with a shared but undocumented pattern. Not everything works — Dockerfiles, pre-commit configs, and schema-enforced formats are too rigid or too diverse to yield a convention.
> **kOREInference note:** Algorithm 4 (iDRegEx with MDL, arXiv 1004.2372) is included for paper-faithful correctness. On real tool-sequence data, its rwr₀ repair step returns ∅ because the k-OA is rarely SORE (interconnected symbols). The ensemble falls back to CRX or iDRegEx automatically.
## Algorithm Selection Guide
| When | Use | Why |
|------|-----|-----|
| Clean, structured data with full vocabulary | **CRX** | Single-pass, deterministic. Accepts all sequences. |
| Few examples, or want minimal common core | **iDRegEx** | Probabilistic EM, finds only what's shared. |
| Don't know which is better | **Ensemble (default)** | Runs both, picks the best by MDL score. |
| Data is clearly one type | `prefer='crx'` or `prefer='idregex'` | Skips ensemble comparison, runs one algorithm. |
| Few examples, or want minimal common core | **iDRegEx** or **kOREInference** | Probabilistic EM, finds only what's shared. |
| Don't know which is better | **Ensemble (default)** | Runs all three, picks best by MDL score. |
| Want core pattern + outlier detection | **Ensemble + `min_coverage<1`** | Finds tight grammar for majority, flags outliers. |
| Data is clearly one type | `prefer='crx'` | Skips ensemble comparison, runs CRX alone. |
## When each algorithm wins
@ -139,9 +156,11 @@ The sweet spot: **multiple implementations of the same abstract task** with a sh
|---------------|--------|-----|
| Diverse patterns, full vocabulary needed | CRX | Captures all symbols. iDRegEx returns ∅. |
| Clean sequences with clear core | iDRegEx | Extracts minimal common subsequence. CRX buries it in optional noise. |
| Interconnected (non-SORE) data | CRX | kOREInference (rwr₀) returns ∅ when k-OA is not SORE. CRX handles it. |
| Single sequence | iDRegEx (+ RWR₀) | RWR₀ repair produces a grammatical regex from one example. |
| 2–3 sequences | iDRegEx | CRX overfits. iDRegEx handles noise better. |
| Many sequences, tight pattern | CRX | Learns precise concatenation with optional suffixes. |
| Want majority pattern + outlier list | CRX + `min_coverage` | Core analysis finds tight grammar for ~80%, flags the rest. |
## Token savings
@ -153,7 +172,7 @@ Across all public benchmarks, Dervish delivers **40–83× compression**. The gr
## How MDL scoring works
```
```text
MDL = model_cost + data_cost
```
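
To make the two terms concrete, here is a toy sketch of the formula above. It is not the project's `mdl_score`: symbols are single characters, grammars are written in Python `re` syntax, the model cost is simply the pattern length, and the data cost counts accepted strings by brute force. All of these are assumptions chosen to keep the example small, and `data_cost` and `toy_mdl` are hypothetical names.

```python
import itertools
import math
import re


def data_cost(pattern, sequences, alphabet):
    """Sum of log2(number of strings the pattern accepts at each example's length)."""
    compiled = re.compile(pattern)
    total = 0.0
    for seq in sequences:
        if not compiled.fullmatch(seq):
            return math.inf  # the grammar does not cover this example at all
        accepted = sum(
            1
            for candidate in itertools.product(alphabet, repeat=len(seq))
            if compiled.fullmatch("".join(candidate))
        )
        total += math.log2(accepted)
    return total


def toy_mdl(pattern, sequences, alphabet):
    model_cost = float(len(pattern))  # assumption: one unit per pattern character
    return model_cost + data_cost(pattern, sequences, alphabet)


examples = ["ab", "abb"]
tight = toy_mdl("ab+", examples, "abc")
loose = toy_mdl("[abc]+", examples, "abc")
print(tight < loose)  # the tighter grammar gets the lower score
```

A permissive grammar accepts many strings of each length, so it pays a high data cost even when its pattern is short. That trade-off is what lets MDL choose between candidates.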


@ -6,7 +6,7 @@ Infer the **unwritten convention** from existing examples. Given N example
sequences, produce a ~100-char grammar that captures the structural
pattern — in far fewer tokens than the originals.
```
```text
a.b → a then b (concatenation)
(a+b) → a or b (disjunction)
r? → optional (zero or one)
@ -14,13 +14,13 @@ r+ → one or more (iteration)
r+? → zero or more
```
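
The notation in the legend above maps closely onto ordinary regular expressions. The sketch below translates a grammar into a Python `re` pattern and checks a sequence against it. It is an illustration only, not the project's `_matches`: `to_python_regex` and `matches` are hypothetical names, symbols are assumed to contain none of `( ) . ? +` or whitespace, and disjunctions are assumed to be parenthesized as in the examples in this document.

```python
import re

_TOKEN = re.compile(r"[().?+]|[^().?+\s]+")


def to_python_regex(grammar):
    """Translate `a.b`, `(a+b)`, `r?`, `r+`, `r+?` into Python regex syntax."""
    tokens = _TOKEN.findall(grammar)
    out = []
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        nxt = tokens[i + 1] if i + 1 < len(tokens) else None
        if tok == ".":
            pass  # concatenation is implicit in Python regexes
        elif tok == "(":
            out.append("(?:")
        elif tok in (")", "?"):
            out.append(tok)
        elif tok == "+":
            if nxt is None or nxt in {".", ")", "?"}:
                if nxt == "?":
                    out.append("*")  # r+? means zero or more
                    i += 1           # consume the '?'
                else:
                    out.append("+")  # postfix: one or more
            else:
                out.append("|")      # infix: disjunction
        else:
            # Each symbol is encoded as "<symbol> " so symbols cannot run together.
            out.append("(?:" + re.escape(tok) + " )")
        i += 1
    return "".join(out)


def matches(grammar, sequence):
    encoded = "".join(sym + " " for sym in sequence)
    return re.fullmatch(to_python_regex(grammar), encoded) is not None


grammar = "fail?.(package+file)+.include+?"
print(matches(grammar, ["fail", "package", "file"]))
print(matches(grammar, ["include"]))
```

The one ambiguous character is `+`, which is both the disjunction and the iteration operator. The sketch treats it as iteration only when it is followed by `.`, `)`, `?` or the end of the grammar.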
## 1. Ansible Galaxy roles (15 geerlingguy roles) — flagship
## 1. Ansible Galaxy roles (15 geerlingguy roles)
15 popular Ansible roles by Jeff Geerling. There is NO written convention
for the module ordering in `tasks/main.yml`. Our grammar is its first
explicit description:
```
```text
Grammar: fail?.(include_vars+set_fact+package+file+template+service+...)+.
include+?.(npm+pip)+?.lineinfile?
```
@ -34,25 +34,60 @@ All 15/15 match. **~29× compression** (7200+ modules → ~250 chars).
exact structure: fail-check first, then vars, then packages, then config/svc.
No guessing.
## 2. Helm chart (kube-prometheus-stack, 6 configs)
### Bonus: core+outlier analysis
6 different `values.yaml` files rendered through the same chart:
Set `min_coverage=0.8` to find the tight grammar for the majority while
flagging outlier roles with unusual module usage:
```
Best: iDRegEx | MDL 1433
Grammar: ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment
```text
Core CRX (80% coverage, 3 outliers):
fail?.(include_vars+set_fact+package+file+template+service+...)+
Outlier sequences:
1. phpmyadmin: include_vars → set_fact → include → include → lineinfile
2. composer: fail → set_fact → stat → uri → get_url → command
3. pip: package → file → pip
```
The **minimal core** every config must deploy. CRX captures the full
vocabulary (19 kinds). Which one an agent uses depends on the task:
- Bootstrapping a new cluster: iDRegEx — what you can't skip
- Writing a complete chart: CRX — everything you might need
phpmyadmin uses raw `lineinfile` instead of templates; composer needs
a `stat` check + `uri` download; pip is purely `pip` — all three deviate
from the mainstream install → configure → enable pattern.
## 2. Helm charts — cross-project convention (15 charts, 6 publishers)
15 popular Helm charts from **Bitnami** (10), **Grafana**, **Jetstack** (cert-manager),
**Argo**, **Ingress-Nginx**, and **Elastic**. Different publishers, different
purposes (databases, web servers, infrastructure tools) — but they converged
on a common resource ordering:
```text
Best: CRX | MDL 230
Grammar: NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?
.ConfigMap?.PersistentVolumeClaim?.ClusterRole?.ClusterRoleBinding?
.Role?.RoleBinding?.Service.Deployment?.StatefulSet?.
(IngressClass+MutatingWebhookConfiguration)?.ValidatingWebhookConfiguration?.Job?
Match rates: CRX=15/15
```
Every chart follows: **resilience → identity → data → service → workload → extensions**.
`Service` is the **only resource type that appears in all 15 charts**.
Bitnami charts (10/15) consistently start with `NetworkPolicy + PodDisruptionBudget`
before identity and service. Infrastructure tools (cert-manager, grafana,
argo-cd, ingress-nginx) add RBAC and admission webhooks for cluster-wide access.
**Why it helps an LLM:** Generating a Helm chart template? You know the
structure: start with availability guarantees (PDB, NetworkPolicy), then
identity (ServiceAccount, Secrets), then the Service endpoint, then your
workload type. Only cluster-wide tools need RBAC and webhooks — skip them
for simple application charts.
## 3. GitHub Actions (cross-project Go lint, 6 jobs)
Lint jobs from prometheus, goreleaser, cosign, sigstore:
```
```text
Best: CRX | MDL 13.6
Grammar: actions/checkout.(actions/setup-go+run:echo+run:sudo)+.
golangci/golangci-lint-action?.megalinter?
@ -77,10 +112,17 @@ with a shared but undocumented pattern.
## Usage
```python
from bex.mcp_server import infer_best_grammar
from bex import infer_ensemble
output = infer_best_grammar(
sequences=role_sequences,
prefer="crx",
)
# Pick best across all 3 algorithms (CRX + iDRegEx + kOREInference)
result = infer_ensemble(role_sequences)
print(f"Best: {result['best']['algorithm']}")
print(f"Grammar: {result['best']['grammar']}")
# Or: find the tight core + flag outliers
result = infer_ensemble(role_sequences, min_coverage=0.8)
print(f"Core: {result['core']['grammar']}")
print(f"Outliers ({result['core']['outlier_count']}):")
for i, o in enumerate(result['core']['outliers'], 1):
    print(f" {i}. {' → '.join(str(x) for x in o[:8])}{'...' if len(o) > 8 else ''}")
```


@ -17,6 +17,7 @@ from .crx import CRX
from .ikoa import ikoa
from .rwrsq import rwr_sq
from .idregex import idregex
from .kore import kOREInference, validate_k_ore
from .koa import KOA, build_complete_koa
from .expr import concat, disj, star, optional, alphabet, strip_k
from .marking import mark_koa


@ -3,6 +3,7 @@
import re
from .crx import CRX
from .idregex import idregex
from .kore import kOREInference
from .expr import alphabet
from .mdl import model_cost, mdl_score
@ -233,6 +234,129 @@ def _matches(grammar, sequence):
return False
def _fit_score(grammar, seq):
    """Score how tightly a sequence fits: 1.0 = perfect match to core,
    0.0 = mostly uses optional/repeated parts.

    The score is derived from the grammar's token structure: the share of
    obligatory symbols among all obligatory and optional symbols, provided
    seq matches the grammar completely. A sequence that does not match
    scores 0.0.
    """
    if not seq:
        return 0.0
    try:
        # Strategy: parse grammar tokens, then count which symbols are
        # obligatory (no quantifier or '+') and which are optional.
        tokens = _parse_parts(grammar.strip())
        if not tokens or tokens[0][0] == 'empty':
            return 0.0

        def _classify_tokens(node):
            """Return (obligatory_count, optional_count) for this node."""
            tt, tv, tq = node
            if tt == 'symbol':
                if tq in ('', '+'):
                    return (1, 0)
                return (0, 1)
            if tt == 'concat':
                ob, op = 0, 0
                for c in tv:
                    if c[0] == 'empty':
                        continue
                    o1, o2 = _classify_tokens(c)
                    ob += o1
                    op += o2
                return (ob, op)
            if tt == 'disj':
                # Any alternative counts as optional
                return (0, len(tv))
            return (0, 0)

        ob, op = _classify_tokens(tokens[0])
        total = ob + op
        if total == 0:
            return 0.5
        # The sequence must be consumed completely by the grammar
        end = _match_tokens(tokens, seq)
        if end is None or end != len(seq):
            return 0.0
        # Fit = obligatory symbols / (obligatory + optional symbols);
        # grammars that lean heavily on optional parts score lower
        return max(0.0, 1.0 - (op / total))
    except Exception:
        return 0.0
def _symbol_rarity_score(seq, all_sequences):
    """Score a sequence by how rare its symbols are across the dataset.
    1.0 = all symbols are common, 0.0 = mostly rare symbols.
    """
    from collections import Counter
    all_syms = Counter()
    for s in all_sequences:
        all_syms.update(s)
    n = len(all_sequences)
    scores = []
    for sym in seq:
        # Counts are total occurrences, so the ratio can exceed 1; cap it at 1.0
        freq = all_syms.get(sym, 0) / n
        scores.append(min(freq, 1.0))
    return sum(scores) / len(scores) if scores else 0.0
def _find_core(sequences, min_coverage=0.8):
    """Find the core subset of sequences by iterative outlier removal,
    then run CRX on the sequences that remain.

    Outlier detection uses symbol rarity: sequences with rare symbols
    (appearing in few other sequences) are removed first.

    Returns:
        (core_grammar, core_sequences, outliers, fit_scores)
        fit_scores is currently always an empty list.
    """
    if not sequences or min_coverage >= 1.0:
        crx_g = CRX().infer(sequences)
        return crx_g, sequences, [], []
    from collections import Counter
    all_syms = Counter()
    for s in sequences:
        all_syms.update(s)
    n = len(sequences)

    def _rarity(seq):
        # Share of symbols whose occurrence count per sequence is below 0.3
        rare_count = sum(1 for sym in seq if all_syms.get(sym, 0) / n < 0.3)
        return rare_count / max(len(seq), 1)

    working = list(sequences)
    outliers = []  # the removed sequences themselves, not their indices
    crx = CRX()
    for _ in range(50):
        if len(working) < 3:
            break
        target = max(int(len(sequences) * min_coverage), 1)
        if len(working) <= target:
            break
        # Score by rarity: the largest share of rare symbols is the worst fit
        scores = [(i, _rarity(seq)) for i, seq in enumerate(working)]
        scores.sort(key=lambda x: -x[1])  # most rare first
        # If all sequences have the same score, stop (no outliers to remove)
        if len(scores) < 2 or scores[0][1] == scores[-1][1]:
            break
        worst_idx = scores[0][0]
        outliers.append(working[worst_idx])
        working = [s for i, s in enumerate(working) if i != worst_idx]
    core_g = crx.infer(working) if working else None
    return core_g, working, outliers, []
def mdl_score_simple(grammar, sequences):
"""MDL score from the paper: model_cost + Σ log₂(|L(r)| at length len(s)).
@ -243,102 +367,137 @@ def mdl_score_simple(grammar, sequences):
return mdl_score(grammar, sequences)
def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
def _run_idregex(sequences, kmax, N):
    """Run standalone iDRegEx, return (grammar, score) or (None, inf)."""
    g = idregex(sequences, kmax=kmax, N=N)
    if g and g != '':
        return g, mdl_score_simple(g, sequences)
    return None, float('inf')


def _run_kore(sequences, kmax, N):
    """Run kOREInference (Algorithm 4 with MDL), return (grammar, score) or (None, inf)."""
    kore = kOREInference(k_max=kmax, N=N)
    result = kore.infer(sequences)
    if result:
        _, expr, _ = result
        return expr, mdl_score_simple(expr, sequences)
    return None, float('inf')


_ALGO_NAMES = {
    'crx': 'CRX',
    'idregex': 'iDRegEx',
    'koreinference': 'kOREInference',
}
_ALGORITHMS = {
    'crx': lambda s, k, n: (CRX().infer(s), mdl_score_simple(CRX().infer(s), s)),
    'idregex': _run_idregex,
    'koreinference': _run_kore,
}
def infer_ensemble(sequences, kmax=2, N=3, prefer=None, min_coverage=1.0):
"""Run all applicable algorithms and return the best by MDL score.
Args:
sequences: List of sequences, each a list of strings.
kmax: Maximum k for iDRegEx k-ORE inference.
N: Number of EM iterations for iDRegEx.
prefer: Optional 'crx' or 'idregex' to skip ensemble and
return only that algorithm's result.
kmax: Maximum k for k-ORE inference (iDRegEx, kOREInference).
N: Number of random trials for k-ORE inference.
prefer: Optional 'crx', 'idregex', or 'koreinference' to skip
ensemble and return only that algorithm's result.
min_coverage: When < 1.0, also runs CRX on the tightest core subset
of sequences. Outliers (worst-fitting) are iteratively
removed until at least this fraction remains. The core
grammar and outlier list are included in the response.
Returns:
dict with keys:
best: {algorithm, grammar, mdl_score}
all: [{algorithm, grammar, mdl_score}, ...]
why: str explaining the choice
core: (optional) {grammar, coverage, outlier_count, outliers} only when
min_coverage < 1.0
"""
if prefer and prefer.lower() in _ALGORITHMS:
key = prefer.lower()
fn = _ALGORITHMS[key]
algo_name = _ALGO_NAMES.get(key, key)
g, score = fn(sequences, kmax, N)
if g and g != '':
return {
'best': {'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)},
'all': [{'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)}],
'why': f"Requested {algo_name} only.",
}
return {
'best': None,
'all': [],
'why': f"{algo_name} returned ∅ (no grammar found).",
}
results = []
if prefer and prefer.lower() == 'idregex':
idr_g = idregex(sequences, kmax=kmax, N=N)
idr_score = mdl_score_simple(idr_g, sequences) if idr_g and idr_g != '' else float('inf')
if idr_g and idr_g != '':
results.append(('iDRegEx', idr_g, idr_score))
if not results:
return {
'best': None,
'all': [],
'why': "iDRegEx returned ∅ (no common core found).",
}
why = "Requested iDRegEx only."
return {
'best': {
'algorithm': 'iDRegEx',
'grammar': results[0][1],
'mdl_score': round(results[0][2], 2),
},
'all': [{'algorithm': 'iDRegEx', 'grammar': results[0][1], 'mdl_score': round(results[0][2], 2)}],
'why': why,
}
# 1. CRX (always fast, always produces a result)
crx_g = CRX().infer(sequences)
crx_score = mdl_score_simple(crx_g, sequences)
results.append(('CRX', crx_g, crx_score))
crx_score = mdl_score_simple(crx_g, sequences) if crx_g and crx_g != '' else float('inf')
results.append(('CRX', crx_g if crx_g and crx_g != '' else '', crx_score))
if prefer and prefer.lower() == 'crx':
return {
'best': {
'algorithm': 'CRX',
'grammar': crx_g,
'mdl_score': round(crx_score, 2),
},
'all': [{'algorithm': 'CRX', 'grammar': crx_g, 'mdl_score': round(crx_score, 2)}],
'why': "Requested CRX only.",
}
idr_g = idregex(sequences, kmax=kmax, N=N)
if idr_g and idr_g != '':
idr_score = mdl_score_simple(idr_g, sequences)
# 2. iDRegEx (standalone, langsize-based)
idr_g, idr_score = _run_idregex(sequences, kmax, N)
if idr_g:
results.append(('iDRegEx', idr_g, idr_score))
results.sort(key=lambda x: x[2])
# 3. kOREInference (Algorithm 4 with MDL scoring)
kore_g, kore_score = _run_kore(sequences, kmax, N)
if kore_g:
results.append(('kOREInference', kore_g, kore_score))
results = [r for r in results if r[1] and r[1] != '']
if not results:
base = {
'best': None,
'all': [],
'why': "No algorithm produced a non-empty grammar.",
}
if min_coverage < 1.0:
core_g, core_seqs, outliers, _ = _find_core(sequences, min_coverage)
base['core'] = {
'grammar': core_g,
'coverage': round(len(core_seqs) / max(len(sequences), 1), 2) if sequences else 0,
'outlier_count': len(outliers),
'outliers': outliers,
}
return base
results.sort(key=lambda x: x[2])
best = results[0]
all_results = [
{'algorithm': a, 'grammar': g, 'mdl_score': round(s, 2)}
for a, g, s in results
]
crx_match = sum(1 for s in sequences if _matches(crx_g, s))
idr_match = sum(1 for s in sequences if _matches(idr_g, s)) if len(results) > 1 else 0
active = {r[0] for r in results}
why_parts = []
if len(results) == 1:
why_parts.append(f"Only CRX produced a result (iDRegEx returned ∅).")
why_parts.append(f"Only {results[0][0]} produced a result.")
else:
why_parts.append(
f"{results[0][0]} (score {results[0][2]:.1f}) vs {results[1][0]} (score {results[1][2]:.1f})."
)
scores_str = ', '.join(f"{r[0]}={r[2]:.1f}" for r in results)
why_parts.append(f"Scores: {scores_str}.")
if crx_match == idr_match == len(sequences):
why_parts.append("Both grammars match all sequences.")
why_parts.append(
f"{results[0][0]} wins because it is more compact "
f"(lower model cost) while matching all data."
)
elif crx_match != idr_match:
why_parts.append(
f"CRX matches {crx_match}/{len(sequences)} sequences, "
f"iDRegEx matches {idr_match}/{len(sequences)}."
)
match_strs = []
for r_algo, r_grammar, _ in results:
if r_grammar and r_grammar != '':
m = sum(1 for s in sequences if _matches(r_grammar, s))
match_strs.append(f"{r_algo}={m}/{len(sequences)}")
if match_strs:
why_parts.append(f"Match rates: {', '.join(match_strs)}.")
why_parts.append(
f"{best[0]} selected (MDL score {best[2]:.1f})."
)
why_parts.append(f"{best[0]} selected (MDL score {best[2]:.1f}).")
return {
result = {
'best': {
'algorithm': best[0],
'grammar': best[1],
@ -347,3 +506,16 @@ def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
'all': all_results,
'why': ' '.join(why_parts),
}
# Core analysis when min_coverage < 1.0
if min_coverage < 1.0:
core_g, core_seqs, outliers, _ = _find_core(sequences, min_coverage)
result['core'] = {
'grammar': core_g,
'coverage': round(len(core_seqs) / max(len(sequences), 1), 2) if sequences else 0,
'outlier_count': len(outliers),
'outliers': outliers,
}
result['why'] += f' Core CRX ({min_coverage:.0%} coverage, {len(outliers)} outliers): {core_g}'
return result


@ -1,432 +1,104 @@
"""
kore: k-ORE inference (iDRegEx) after Bex et al. 2008/2010.
kOREInference Algorithm 4: iDRegEx (arXiv 1004.2372).
iDRegEx (Bex 2008):
1. Build a prefix-tree automaton (PTA) from the example sequences
2. Shrink: rewrite rules generalize the automaton
(simplify → star_rewrite → concat_rewrite → alternation_rewrite)
3. Repair: restore determinism after each rewrite pass
4. Convert: turn the automaton into a regular expression
(state elimination after Brzozowski & McCluskey)
5. k-ORE check: the expression must satisfy the k-occurrence condition
(each symbol occurs at most k times)
6. MDL: choose the k with the minimal MDL score
Implements the full iDRegEx pipeline:
1. For k = 1..kmax, for n = 1..N:
   a. iKoa (Algorithm 1): build a deterministic k-OA from S
   b. rwr² (Algorithm 3): translate the k-OA to a k-ORE expression
   c. Validate determinism and k-occurrence
2. Score all valid candidates by MDL (model cost + data cost)
3. Return the best k-ORE
Unlike the PTA → Shrink → Repair approach from Bex 2008, this follows
the journal paper (arXiv 1004.2372) exactly.
"""
from .automaton import Automaton
from .pta import build_pta
from .shrink import shrink
from .repair import repair
from .ikoa import ikoa
from .rwrsq import rwr_sq
from .idregex import is_deterministic
from .mdl import mdl_score
def _state_elimination(G):
def validate_k_ore(expr, k, alphabet_set=None):
"""
State elimination after Brzozowski & McCluskey.
Check if a k-ORE satisfies the k-occurrence condition.
Eliminates all non-start, non-accept states one by one.
For each eliminated state q:
- For each pair (p, r) with p→q (label A) and q→r (label B):
- R_self_q = disjunction of all self-loops on q
- New label = A · (R_self_q)* · B
- Add an edge p→r with the new label (or merge it with an existing one)
The k-occurrence condition: for every subexpression (r|s),
each alphabet symbol appears at most k times across all
alternatives combined.
After elimination, only the start and accept states remain.
The expression is the sum of all paths from start to accept.
"""
G = G.copy()
eliminated = set()
# Repeat until only the start and accept states remain
changed = True
while changed:
changed = False
# Pick a state to eliminate (neither start nor accept)
for q in list(G.nodes):
if q == G.start or q in G.accepts:
continue
if q in eliminated:
continue
reachable = _is_reachable_to_accept(G, q)
if not reachable:
G.nodes.discard(q)
G.accepts.discard(q)
G.edges = [e for e in G.edges if e['from'] != q and e['to'] != q]
eliminated.add(q)
changed = True
continue
incoming = G.incoming(q)
outgoing = G.outgoing(q)
# R_self_q = (a1 | a2 | ...)* for all self-loops on q
self_loops = [e for e in outgoing if e['to'] == q]
outgoing_no_self = [e for e in outgoing if e['to'] != q]
if not outgoing_no_self:
# Dead end: no outgoing edges (apart from self-loops)
# Remove the incoming edges and q itself
for e in incoming:
G.remove_edge(e['from'], e['to'], e['label'])
G.nodes.discard(q)
G.accepts.discard(q)
eliminated.add(q)
changed = True
continue
if self_loops:
self_labels = list(set(e['label'] for e in self_loops))
if len(self_labels) == 1:
R_self_q = f"({self_labels[0]})*"
else:
R_self_q = f"({'|'.join(self_labels)})*"
else:
R_self_q = ""
# For each pair (p, r): p→q (incoming), q→r (outgoing, r != q)
for e_in in incoming:
p = e_in['from']
if p == q:
continue
A = e_in['label']
for e_out in outgoing_no_self:
r = e_out['to']
B = e_out['label']
if R_self_q:
new_label = f"({A}.{R_self_q}.{B})"
else:
new_label = f"({A}.{B})"
# Merge with an existing edge p→r if there is one
existing = [e for e in G.edges if e['from'] == p and e['to'] == r]
existing_labels = [e['label'] for e in existing]
if new_label not in existing_labels and f"({new_label})" not in existing_labels:
# Combine with the existing labels via |
if existing:
old_label = existing[0]['label']
merged = f"({old_label}|{new_label})"
G.remove_edge(p, r, old_label)
G.add_edge(p, r, merged)
else:
G.add_edge(p, r, new_label)
# Delete q and all of its edges
for e in incoming:
G.remove_edge(e['from'], e['to'], e['label'])
for e in self_loops:
G.remove_edge(e['from'], e['to'], e['label'])
for e in outgoing_no_self:
G.remove_edge(e['from'], e['to'], e['label'])
G.nodes.discard(q)
G.accepts.discard(q)
eliminated.add(q)
changed = True
break
return G
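The relabelling rule stated in the docstring of `_state_elimination` (new label = A · (R_self_q)* · B) can be illustrated in isolation. This is a minimal sketch only: `bypass_label` is a hypothetical helper that does not exist in this module, and it mirrors just the string formatting used in the loop above, not the graph bookkeeping.

```python
def bypass_label(a, self_loop_labels, b):
    """Label of the edge p->r that replaces p -a-> q -b-> r once q is eliminated.

    Hypothetical helper for illustration; self_loop_labels are the labels of
    all self-loops on the eliminated state q.
    """
    loops = sorted(set(self_loop_labels))  # deduplicate, fixed order
    if not loops:
        # No self-loop on q: plain concatenation A . B
        return f"({a}.{b})"
    # Self-loops on q may be taken any number of times: A . (loops)* . B
    return f"({a}.({'|'.join(loops)})*.{b})"


print(bypass_label("x", [], "y"))
print(bypass_label("x", ["s"], "y"))
print(bypass_label("x", ["t", "s"], "y"))
```

Sorting the self-loop labels is a choice made here for reproducible output; the function above iterates over a `set`, so its label order is not guaranteed.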
def _is_reachable_to_accept(G, q):
"""Checks whether an accept state is reachable from q."""
visited = set()
stack = [q]
while stack:
n = stack.pop()
if n in visited:
continue
visited.add(n)
if n in G.accepts:
return True
for e in G.outgoing(n):
stack.append(e['to'])
return False
def _extract_expression(G):
"""
Extracts the regular expression from the eliminated automaton.
After elimination, only the start state and accept states remain.
The expression is the disjunction of all paths from start to accept.
"""
if G.start is None:
return ""
# Phase 1: State Elimination
G_elim = _state_elimination(G)
start = G_elim.start
if not G_elim.accepts:
return ""
paths = []
outgoing = G_elim.outgoing(start)
# Special case: start is itself an accept state
if start in G_elim.accepts:
# Check for a self-loop
self_edges = [e for e in outgoing if e['to'] == start]
non_self = [e for e in outgoing if e['to'] != start]
if not non_self and not self_edges:
return "ε"
if self_edges:
self_labels = '|'.join(set(e['label'] for e in self_edges))
paths.append(f"({self_labels})*")
# Besides start → accept → other accepts
for e in non_self:
target = e['to']
if target in G_elim.accepts:
paths.append(e['label'])
# Paths from start to accept states
for acc in G_elim.accepts:
if acc == start:
continue
# Edge start → acc
direct = [e for e in outgoing if e['to'] == acc]
for e in direct:
paths.append(e['label'])
self_loops_start = [e for e in G_elim.outgoing(start) if e['to'] == start]
# Other edges: start → x (where x is not an accept state)
intermediate = [e for e in outgoing if e['to'] not in G_elim.accepts and e['to'] != start]
for e in intermediate:
# Follow the path from the intermediate state to an accept state
suffix = _follow_path(G_elim, e['to'], G_elim.accepts, set())
if suffix:
paths.append(f"({e['label']}.{suffix})")
# Remove duplicates
paths = list(set(paths))
if not paths:
return "ε"
if len(paths) == 1:
expr = paths[0]
else:
expr = f"({'|'.join(paths)})"
# Simplify: remove redundant parentheses
expr = _simplify_expression(expr)
return expr
def _follow_path(G, start, accepts, visited):
"""Finds a path from start to an accept state."""
if start in accepts:
return "ε"
if start in visited:
return None
visited.add(start)
outgoing = G.outgoing(start)
for e in outgoing:
if e['to'] == start:
continue
suffix = _follow_path(G, e['to'], accepts, visited)
if suffix is not None:
if suffix == "ε":
return e['label']
else:
return f"({e['label']}.{suffix})"
return None
def _simplify_expression(expr):
"""
Simplifies a regular expression.
Removes redundant parentheses, duplicate operators, etc.
"""
if not expr or expr in ('ε', ''):
return expr
# (ε. X ) → X
# (X . ε) → X
# ((X)) → X
# (a|a) → a
simplified = expr
while True:
prev = simplified
simplified = _simplify_once(simplified)
if simplified == prev:
break
return simplified
def _simplify_once(expr):
"""One reduction step."""
# (ε.X) → X
# (X.ε) → X
# ((X)) → X
# (a|a) → a
result = expr
# ((X)) → X (double parentheses)
import re
result = re.sub(r'\(\(([^()]+)\)\)', r'(\1)', result)
return result
def validate_k_ore(expr, k_index):
"""
Checks whether an expression satisfies the k-occurrence condition.
A k-ORE allows each symbol at most once per k-indicator,
i.e. in each conjunct (subexpression without |) each symbol may
occur at most k times.
Simplified: count the occurrences of each unique token name
in the expression. If a token occurs more than k times, the
condition is violated.
Simplified implementation: count raw alphabet symbol
occurrences in the expression string. A symbol appearing
more than k times violates the condition.
Returns:
bool, str: (satisfied, reason)
(bool, str): (passes, explanation)
"""
# Extract all token names from the expression
tokens = set()
for c in '*+?()|.':
pass
if not expr or expr in ('', 'ε'):
return True, "OK"
token_names = set()
i = 0
while i < len(expr):
if expr[i].isalnum() or expr[i] in '/_-':
j = i
while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'):
j += 1
token_names.add(expr[i:j])
i = j
else:
i += 1
from .expr import alphabet
syms = alphabet_set or alphabet(expr)
# Count occurrences
token_counts = {}
i = 0
while i < len(expr):
if expr[i].isalnum() or expr[i] in '/_-':
j = i
while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'):
j += 1
token = expr[i:j]
token_counts[token] = token_counts.get(token, 0) + 1
i = j
else:
i += 1
counts = {}
for sym in syms:
import re
count = len(re.findall(rf'(?<![a-zA-Z_/]){re.escape(sym)}(?![a-zA-Z_/])', expr))
if count > 0:
counts[sym] = count
violations = [t for t, c in token_counts.items() if c > k_index]
violations = [f"{s}:{c}" for s, c in sorted(counts.items()) if c > k]
if violations:
return False, f"Token {violations} erscheint > {k_index}-mal"
return False, f"k={k} violations: {', '.join(violations)}"
return True, "OK"
class kOREInference:
"""
iDRegEx: k-ORE inference via PTA → Shrink → Repair → Expression.
| Algorithm 4: iDRegEx |
Require: sample S, kmax
Ensure: k-ORE r
Following Bex et al. 2008:
- Build a PTA from the sequences
- Shrink: generalize via rewrite rules
- Repair: restore determinism
- Convert: extract a regular expression via state elimination
- Check k-occurrence
- Choose k by MDL
1: C ← ∅
2: for k = 1 to kmax do
3: for n = 1 to N do
4: G ← iKoa(S, k)
5: if rwr²(G) is deterministic then
6: add rwr²(G) to C
7: return best(C) by MDL
"""
def __init__(self, k_max=5):
def __init__(self, k_max=5, N=5):
self.k_max = k_max
self.N = N
def infer(self, sequences):
"""
Infer the best k-ORE.
Infer the best k-ORE for the given sequences.
Returns:
(Automaton, expression_string, best_k) or None
(koa_automaton, expression_string, best_k) or None if no valid
k-ORE can be inferred.
"""
sequences = [s for s in sequences if s]
if not sequences:
return None, "", 0
return None
best_score = float('inf')
best_result = None
candidates = []
for k in range(1, self.k_max + 1):
try:
auto, expr = self._infer_k_expression(sequences, k)
if auto is None:
for _ in range(self.N):
G = ikoa(sequences, k, num_trials=1)
if G is None:
continue
score = mdl_score(auto, sequences)
if score < best_score:
best_score = score
best_result = (auto, expr, k)
except Exception:
continue
expr = rwr_sq(G)
if expr and expr not in ('', 'ε'):
if is_deterministic(expr):
valid, _ = validate_k_ore(expr, k)
if valid:
candidates.append((G, expr, k))
return best_result
if not candidates:
return None
def _infer_k_expression(self, sequences, k):
"""Run iDRegEx for one specific k."""
# 1. Build the PTA
pta = build_pta(sequences)
# 2. Shrink
shrunk = shrink(pta, max_iterations=20)
# 3. Repair
repaired = repair(shrunk)
# 4. Extract the expression
expr = _extract_expression(repaired)
# 5. k-ORE check
valid, _ = validate_k_ore(expr, k)
if not valid:
expr = self._generalize_to_k_ore(expr, k)
return repaired, expr
def _generalize_to_k_ore(self, expr, k):
"""
Generalize the expression to a k-ORE.
If token t occurs more than k times:
- Replace repetitions with t+ or t*
"""
# Simple heuristic: extract tokens, count them, replace
result = expr
token_counts = {}
i = 0
while i < len(result):
if result[i].isalnum() or result[i] in '/_-':
j = i
while j < len(result) and (result[j].isalnum() or result[j] in '/_-'):
j += 1
token = result[i:j]
token_counts[token] = token_counts.get(token, 0) + 1
i = j
else:
i += 1
for token, count in token_counts.items():
if count > k:
# Replace token.token with token+
import re
pattern = re.escape(token) + r'\.' + re.escape(token)
replacement = f"{token}+"
result = re.sub(pattern, replacement, result, count=1)
break
return result
return min(candidates, key=lambda c: mdl_score(c[1], sequences))
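The simplified k-occurrence check described in the `validate_k_ore` docstring (count raw symbol occurrences in the expression string, reject any symbol seen more than k times) can be sketched on its own. The helper names `count_symbol_occurrences` and `check_k_occurrence` are hypothetical, and the tokenizer is an assumption: a symbol is taken to be any run of letters, digits, `/`, `_` or `-`, as in the character loops above.

```python
import re
from collections import Counter


def count_symbol_occurrences(expr):
    """Count how often each symbol name occurs in an expression string."""
    # Operators such as . | + ? * ( ) separate symbols and are never counted.
    return Counter(re.findall(r"[A-Za-z0-9_/\-]+", expr))


def check_k_occurrence(expr, k):
    """Return (passes, explanation) for the simplified k-occurrence check."""
    counts = count_symbol_occurrences(expr)
    violations = sorted(f"{s}:{c}" for s, c in counts.items() if c > k)
    if violations:
        return False, f"k={k} violations: {', '.join(violations)}"
    return True, "OK"


print(check_k_occurrence("a.b.c", 2))
print(check_k_occurrence("a.a.a", 1))
print(check_k_occurrence("(a|b|c)", 2))
```

Because this counts occurrences over the whole string, it is stricter than a per-subexpression check would be; it matches the "simplified implementation" wording of the docstring rather than the formal definition.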


@ -17,6 +17,7 @@ def infer_best_grammar(
prefer: str = "",
kmax: int = 2,
N: int = 3,
min_coverage: float = 1.0,
) -> str:
"""Infer a compact grammar from example sequences. Use this when you
have examples of sequential data and want to learn the pattern.
@ -29,19 +30,26 @@ def infer_best_grammar(
sequences: List of sequences, each a list of strings (symbols in
the order they appear). Example: [["file","copy","command"],
["file","template","command"]].
prefer: Optional 'crx' for full coverage (accepts all examples),
'idregex' for minimal core (only what every example shares).
Default: runs both and picks best by MDL score.
kmax: Maximum k for iDRegEx k-ORE inference.
N: Number of EM iterations for iDRegEx.
prefer: Optional 'crx' for full vocabulary (accepts all examples),
'idregex' for deterministic minimal core. Omit to auto-pick by MDL.
kmax: Context depth for k-ORE inference. Default 2.
N: Random trials for k-ORE inference (higher = better, slower).
min_coverage: (Expert) When < 1.0, also runs a **core+outlier analysis**:
iteratively removes outlier sequences (those with rarest symbols)
until at least this fraction remain. Returns the core grammar
for the majority, plus a list of which sequences were removed and why.
Default 1.0 = no core analysis. Set to 0.8 to find the tight
pattern shared by ~80% of examples while flagging the other ~20%
as variations.
Returns:
A formatted string with the best grammar, scores, and explanation.
When min_coverage < 1.0, includes the core grammar and outlier info.
Grammar notation: a.b = a then b, (a+b) = a or b, r? = optional,
r+ = one or more, r+? = zero or more.
"""
pref = prefer if prefer else None
result = infer_ensemble(sequences, kmax=kmax, N=N, prefer=pref)
result = infer_ensemble(sequences, kmax=kmax, N=N, prefer=pref, min_coverage=min_coverage)
if result['best'] is None:
return f"No grammar found. {result['why']}"
lines = [f"Best: {result['best']['algorithm']} (MDL {result['best']['mdl_score']})",
@ -53,6 +61,13 @@ def infer_best_grammar(
lines.append(f" {r['algorithm']:10s} MDL={r['mdl_score']:>8.2f} match={m}/{len(sequences)}")
lines.append("")
lines.append(f"Why: {result['why']}")
if 'core' in result and result['core']:
c = result['core']
lines.append(f"\nCore CRX ({c['coverage']:.0%} coverage, {c['outlier_count']} outliers): {c['grammar']}")
if c['outliers']:
lines.append(f" Outlier sequences:")
for i, o in enumerate(c['outliers'], 1):
lines.append(f" {i}. {''.join(str(x) for x in o[:8])}{'...' if len(o) > 8 else ''}")
return "\n".join(lines)
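The `min_coverage` docstring describes the core+outlier analysis only in words (iteratively remove the sequences with the rarest symbols until at least the requested fraction remains). One possible reading of that description is sketched below. The implementation inside `infer_ensemble` is not shown in this diff, so everything here is an assumption: the function name `split_core_and_outliers`, the rarity measure (number of remaining sequences that contain a symbol), and the stopping rule.

```python
import math
from collections import Counter


def split_core_and_outliers(sequences, min_coverage=0.8):
    """Split sequences into a core set and a list of removed outliers.

    Hypothetical sketch: repeatedly drops the sequence whose rarest symbol
    appears in the fewest remaining sequences, and never keeps fewer than
    ceil(min_coverage * len(sequences)) sequences.
    """
    core = list(sequences)
    outliers = []
    min_keep = math.ceil(min_coverage * len(sequences))
    while len(core) > min_keep:
        # Document frequency: in how many core sequences each symbol occurs.
        freq = Counter(sym for seq in core for sym in set(seq))
        if all(n == len(core) for n in freq.values()):
            break  # every symbol is shared by all sequences: nothing is rare
        rarest = min(
            range(len(core)),
            key=lambda i: min((freq[s] for s in core[i]), default=len(core)),
        )
        outliers.append(core.pop(rarest))
    return core, outliers


seqs = [
    ["fail", "package", "file", "service"],
    ["fail", "package", "file", "service"],
    ["fail", "package", "file", "service", "npm"],
    ["fail", "package", "file", "service", "npm", "pip"],
]
core, outliers = split_core_and_outliers(seqs, min_coverage=0.7)
print(len(core), outliers)
```

With four sequences and `min_coverage=0.7`, at least three must stay, so only the sequence containing `pip` (a symbol seen once) is removed in this sketch.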

Binary file not shown. Before: 137 KiB. After: 124 KiB.

tests/test_ensemble.py Normal file

@ -0,0 +1,265 @@
"""Tests for infer_ensemble — runs CRX, iDRegEx, and kOREInference, picks best by MDL."""
from bex.ensemble import infer_ensemble
from bex.idregex import is_deterministic
from bex.kore import kOREInference
# ── Basic ensemble runs ──
def test_ensemble_returns_dict():
seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']]
result = infer_ensemble(seqs, kmax=2, N=3)
assert isinstance(result, dict)
assert 'best' in result
assert 'all' in result
assert 'why' in result
def test_ensemble_best_not_none():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
assert result['best'] is not None
assert result['best']['grammar'] is not None
assert result['best']['algorithm'] in ('CRX', 'iDRegEx', 'kOREInference')
assert result['best']['mdl_score'] is not None
def test_ensemble_runs_all_three():
seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']]
result = infer_ensemble(seqs, kmax=2, N=3)
algos = {a['algorithm'] for a in result['all']}
assert 'CRX' in algos
# iDRegEx and kOREInference may fail stochastically, so at least CRX
assert len(result['all']) >= 1
def test_ensemble_all_results_have_scores():
seqs = [['a', 'b'], ['a', 'b', 'b']]
result = infer_ensemble(seqs, kmax=2, N=3)
for entry in result['all']:
assert 'algorithm' in entry
assert 'grammar' in entry
assert 'mdl_score' in entry
assert isinstance(entry['mdl_score'], (int, float))
def test_ensemble_deterministic_results():
seqs = [['x', 'y'], ['x', 'z']]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
assert is_deterministic(result['best']['grammar'])
# ── prefer parameter tests ──
def test_prefer_crx():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, prefer='crx')
assert result['best']['algorithm'] == 'CRX'
assert len(result['all']) == 1
def test_prefer_idregex():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, prefer='idregex', kmax=2, N=5)
assert result['best']['algorithm'] == 'iDRegEx'
assert len(result['all']) == 1
def test_prefer_koreinference():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, prefer='koreinference', kmax=2, N=5)
assert result['best']['algorithm'] == 'kOREInference'
assert len(result['all']) == 1
def test_prefer_case_insensitive():
seqs = [['a', 'b']]
r1 = infer_ensemble(seqs, prefer='CRX')
r2 = infer_ensemble(seqs, prefer='Crx')
assert r1['best']['algorithm'] == r2['best']['algorithm']
def test_prefer_unknown_falls_back():
seqs = [['a', 'b']]
result = infer_ensemble(seqs, prefer='unknown')
assert result['best'] is not None
assert len(result['all']) >= 1
# ── Edge cases ──
def test_ensemble_empty_input():
result = infer_ensemble([], kmax=2, N=3)
assert result['best'] is None or result['best']['grammar'] is not None
def test_ensemble_single_sequence():
result = infer_ensemble([['a', 'b', 'c']], kmax=2, N=3)
assert result['best'] is not None
assert result['best']['grammar'] is not None
def test_ensemble_many_identical():
seqs = [['a', 'b', 'c']] * 10
result = infer_ensemble(seqs, kmax=2, N=3)
assert result['best'] is not None
def test_ensemble_linear_data():
seqs = [
['file', 'template', 'command', 'set_fact', 'shell'],
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
g = result['best']['grammar']
assert 'file' in g and 'template' in g and 'shell' in g
def test_ensemble_branching_data():
seqs = [
['file', 'template', 'setup', 'shell'],
['file', 'template', 'deploy', 'shell'],
]
result = infer_ensemble(seqs, kmax=2, N=5)
if result['best']:
g = result['best']['grammar']
assert is_deterministic(g)
assert 'file' in g and 'template' in g and 'shell' in g
def test_ensemble_why_includes_scores():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
assert 'CRX' in result['why']
assert 'selected' in result['why']
assert 'MDL' in result['why'] or 'score' in result['why'].lower()
def test_ensemble_ordering_best_first():
seqs = [['a', 'b', 'c'], ['a', 'b']]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
assert result['all'][0]['algorithm'] == result['best']['algorithm']
assert result['all'][0]['mdl_score'] <= result['all'][-1]['mdl_score']
# ── Stochastic stability tests ──
def test_ensemble_stable_on_simple_data():
for _ in range(3):
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
assert 'a' in result['best']['grammar']
assert 'b' in result['best']['grammar']
def test_ensemble_crx_always_present():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
crx_results = [a for a in result['all'] if a['algorithm'] == 'CRX']
assert len(crx_results) == 1
# ── min_coverage / core analysis tests ──
def test_core_not_included_when_coverage_1():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, min_coverage=1.0)
assert 'core' not in result
def test_core_included_when_coverage_lt_1():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, min_coverage=0.8)
assert 'core' in result
assert 'grammar' in result['core']
assert 'coverage' in result['core']
assert 'outliers' in result['core']
assert 'outlier_count' in result['core']
def test_core_outlier_detection():
seqs = [
['fail', 'package', 'file', 'service'],
['fail', 'package', 'file', 'service'],
['fail', 'package', 'file', 'service', 'npm'],
['fail', 'package', 'file', 'service', 'npm', 'pip'],
]
result = infer_ensemble(seqs, min_coverage=0.7)
assert 'core' in result
c = result['core']
assert c['outlier_count'] >= 1
assert 'npm' in c['grammar'] or 'service' in c['grammar']
def test_core_all_identical():
seqs = [['a', 'b', 'c']] * 10
result = infer_ensemble(seqs, min_coverage=0.8)
assert 'core' in result
assert result['core']['outlier_count'] == 0
assert 'a' in result['core']['grammar']
def test_core_coverage_ratio():
seqs = [
['a', 'b', 'c'],
['a', 'b', 'c'],
['a', 'b', 'c', 'd'],
['a', 'b', 'c', 'd', 'e'],
]
result = infer_ensemble(seqs, min_coverage=0.7)
if 'core' in result:
c = result['core']
assert c['outlier_count'] >= 1
assert len(c['outliers']) >= 1
assert c['coverage'] >= 0.5
def test_core_empty_sequences():
result = infer_ensemble([], min_coverage=0.8)
assert 'core' in result
assert result['core']['grammar'] is not None
def run_all():
tests = [
test_ensemble_returns_dict,
test_ensemble_best_not_none,
test_ensemble_runs_all_three,
test_ensemble_all_results_have_scores,
test_ensemble_deterministic_results,
test_prefer_crx,
test_prefer_idregex,
test_prefer_koreinference,
test_prefer_case_insensitive,
test_prefer_unknown_falls_back,
test_ensemble_empty_input,
test_ensemble_single_sequence,
test_ensemble_many_identical,
test_ensemble_linear_data,
test_ensemble_branching_data,
test_ensemble_why_includes_scores,
test_ensemble_ordering_best_first,
test_ensemble_stable_on_simple_data,
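test_ensemble_crx_always_present,
test_core_not_included_when_coverage_1,
test_core_included_when_coverage_lt_1,
test_core_outlier_detection,
test_core_all_identical,
test_core_coverage_ratio,
test_core_empty_sequences,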
]
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
import traceback
print(f" FAIL {t.__name__}: {e}")
traceback.print_exc()
failed += 1
print(f"\n{passed} passed, {failed} failed")
if __name__ == '__main__':
run_all()

tests/test_kore.py Normal file

@ -0,0 +1,375 @@
"""Tests for kOREInference (Algorithm 4: iDRegEx from arXiv 1004.2372)."""
from bex.kore import kOREInference, validate_k_ore
from bex.idregex import is_deterministic
from bex.mdl import mdl_score, model_cost, data_cost
# ── Core inference tests ──
def test_linear_sequence():
seqs = [
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
]
kore = kOREInference(k_max=3, N=3)
result = kore.infer(seqs)
assert result is not None, "Should infer a k-ORE"
auto, expr, best_k = result
assert expr is not None
assert all(t in expr for t in ['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'])
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_branching_paths():
seqs = [
['file', 'template', 'setup', 'set_fact', 'shell'],
['file', 'template', 'deploy', 'set_fact', 'shell'],
]
kore = kOREInference(k_max=3, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
assert 'file' in expr and 'template' in expr and 'shell' in expr
def test_optional_element():
seqs = [
['file', 'template', 'shell'],
['file', 'template', 'exec', 'shell'],
['file', 'template', 'exec', 'exec', 'shell'],
]
kore = kOREInference(k_max=4, N=15)
result = kore.infer(seqs)
if result is None:
return # stochastic failure
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_looping_element():
seqs = [
['package', 'file', 'template', 'systemd'],
['package', 'file', 'template', 'template', 'systemd', 'systemd'],
['package', 'file', 'template', 'template', 'template', 'systemd'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_multiple_alternatives():
seqs = [
['install', 'configure', 'start'],
['install', 'configure', 'enable'],
['install', 'configure', 'restart'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_rejects_non_deterministic():
seqs = [['a'], ['a']]
kore = kOREInference(k_max=2, N=2)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Non-deterministic: {expr}"
def test_empty_input():
kore = kOREInference(k_max=2, N=2)
result = kore.infer([])
assert result is None
result = kore.infer([[], []])
assert result is None
def test_single_element_sequences():
seqs = [['a'], ['b'], ['a'], ['b']]
kore = kOREInference(k_max=2, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
def test_infer_returns_best_k():
seqs = [
['a', 'b', 'c'],
['a', 'b', 'c', 'd'],
['a', 'b', 'd'],
]
kore = kOREInference(k_max=4, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert 1 <= best_k <= 4
assert is_deterministic(expr)
def test_tool_sequences():
seqs = [
['read', 'grep', 'read'],
['read', 'glob', 'grep', 'read'],
['read', 'bash', 'read'],
['glob', 'grep', 'read', 'edit', 'bash'],
['read', 'edit', 'bash', 'bash'],
['bash', 'read', 'bash'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
# ── Edge case tests ──
def test_single_sequence():
kore = kOREInference(k_max=2, N=3)
result = kore.infer([['a', 'b', 'c']])
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
def test_many_identical_sequences():
seqs = [['a', 'b', 'c']] * 20
kore = kOREInference(k_max=2, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
assert 'a' in expr and 'b' in expr and 'c' in expr
def test_xml_like_structured():
seqs = [
['header', 'body', 'footer'],
['header', 'body', 'body', 'footer'],
['header', 'body', 'body', 'body', 'footer'],
['header', 'footer'],
]
kore = kOREInference(k_max=3, N=10)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
assert 'header' in expr and 'footer' in expr
def test_disjoint_symbols():
seqs = [
['alpha', 'beta'],
['gamma', 'delta'],
]
kore = kOREInference(k_max=2, N=3)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
def test_k1_vs_k2_selection():
seqs = [
['a', 'a', 'b'],
['a', 'b'],
['a', 'a', 'a', 'b'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
def test_all_same_symbol():
seqs = [
['a', 'a'],
['a', 'a', 'a'],
['a'],
]
kore = kOREInference(k_max=2, N=5)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
def test_long_sequence():
seqs = [
['a', 'b', 'c', 'd', 'e', 'f', 'g'],
['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'],
]
kore = kOREInference(k_max=2, N=5)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
def test_infer_returns_koa():
kore = kOREInference(k_max=2, N=3)
result = kore.infer([['a', 'b'], ['a', 'b', 'c']])
assert result is not None
auto, expr, best_k = result
assert hasattr(auto, '_succ'), "Should return a KOA automaton"
assert hasattr(auto, 'src')
assert hasattr(auto, 'sink')
def test_different_kmax():
seqs = [['a', 'b', 'c', 'd', 'e'], ['a', 'b', 'c']]
kore1 = kOREInference(k_max=1, N=5)
kore2 = kOREInference(k_max=3, N=5)
r1 = kore1.infer(seqs)
r2 = kore2.infer(seqs)
assert r1 is not None or r2 is not None
# ── validate_k_ore tests ──
def test_validate_k_ore_basic():
valid, reason = validate_k_ore('a.b.c', 2)
assert valid, f"a.b.c should be valid for k=2: {reason}"
def test_validate_k_ore_exceeds_k():
valid, reason = validate_k_ore('a.a.a', 1)
assert not valid, "a.a.a should fail for k=1"
def test_validate_k_ore_epsilon():
valid, reason = validate_k_ore('ε', 1)
assert valid
def test_validate_k_ore_empty():
valid, reason = validate_k_ore('', 1)
assert valid
def test_validate_k_ore_disjunction():
valid, reason = validate_k_ore('(a|b|c)', 2)
assert valid, f"(a|b|c) should be valid for k=2: {reason}"
def test_validate_k_ore_loop():
valid, reason = validate_k_ore('a+', 1)
assert valid, "a+ should be valid for k=1"
def test_validate_k_ore_k0():
valid, reason = validate_k_ore('a', 0)
assert not valid, "a should fail for k=0"
# ── MDL scoring tests ──
def test_mdl_model_cost():
assert model_cost('a.b.c') == 3
assert model_cost('(a|b)+.c') >= 2
assert model_cost('ε') >= 0
def test_mdl_data_cost():
# General expression (a|b)+ has multiple words of length 1+: non-zero cost
dc = data_cost('(a|b)+', [['a', 'b'], ['b', 'a'], ['a']])
assert dc > 0, f"data_cost should be > 0 for general expression, got {dc}"
# Exact expression has cost 0 (log2(1) = 0)
dc_exact = data_cost('a.b.c', [['a', 'b', 'c']])
assert dc_exact == 0.0, f"data_cost for exact match should be 0, got {dc_exact}"
def test_mdl_score_lower_is_better():
score_specific = mdl_score('a.b.c', [['a', 'b', 'c']])
score_general = mdl_score('(a|b|c)+?', [['a', 'b', 'c']])
assert score_specific > 0 and score_general > 0
def test_mdl_empty_sequences():
score = mdl_score('a.b.c', [])
assert score == model_cost('a.b.c')
# ── Algorithm 4 paper-faithful tests ──
def test_infer_returns_deterministic():
for _ in range(5):
seqs = [['x', 'y'], ['x', 'z']]
kore = kOREInference(k_max=2, N=2)
result = kore.infer(seqs)
if result:
_, expr, _ = result
assert is_deterministic(expr), f"Non-deterministic: {expr}"
def test_infer_obeys_k_occurrence():
seqs = [['a', 'b'], ['a', 'b', 'c']]
for k in range(1, 4):
kore = kOREInference(k_max=k, N=5)
result = kore.infer(seqs)
if result:
_, expr, best_k = result
valid, _ = validate_k_ore(expr, best_k)
assert valid, f"k={best_k} expression {expr} violates k-occurrence"
def run_all():
tests = [
test_linear_sequence,
test_branching_paths,
test_optional_element,
test_looping_element,
test_multiple_alternatives,
test_rejects_non_deterministic,
test_empty_input,
test_single_element_sequences,
test_infer_returns_best_k,
test_tool_sequences,
test_single_sequence,
test_many_identical_sequences,
test_xml_like_structured,
test_disjoint_symbols,
test_k1_vs_k2_selection,
test_all_same_symbol,
test_long_sequence,
test_infer_returns_koa,
test_different_kmax,
test_validate_k_ore_basic,
test_validate_k_ore_exceeds_k,
test_validate_k_ore_epsilon,
test_validate_k_ore_empty,
test_validate_k_ore_disjunction,
test_validate_k_ore_loop,
test_validate_k_ore_k0,
test_mdl_model_cost,
test_mdl_data_cost,
test_mdl_score_lower_is_better,
test_mdl_empty_sequences,
test_infer_returns_deterministic,
test_infer_obeys_k_occurrence,
]
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
import traceback
print(f" FAIL {t.__name__}: {e}")
traceback.print_exc()
failed += 1
print(f"\n{passed} passed, {failed} failed")
if __name__ == '__main__':
run_all()
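The MDL tests above pin down only a few properties of `bex.mdl`: `model_cost('a.b.c') == 3`, a data cost of 0 for an exact match (log2(1) = 0), a positive data cost for a general expression, and `mdl_score == model_cost` on empty input. A toy scorer with those properties can be sketched as follows. It is not the module's implementation: the names `toy_model_cost`, `toy_data_cost` and `toy_mdl` are hypothetical, symbols are assumed to be single lowercase letters, and the data cost is assumed to be the sum of log2(number of accepted words of the same length) over the sample.

```python
import itertools
import math
import re


def toy_model_cost(expr):
    """Model cost: number of symbol occurrences in the expression."""
    return len(re.findall(r"[a-z]", expr))


def toy_data_cost(expr, sequences):
    """Data cost: bits needed to pick each sequence among accepted words of its length."""
    # Dropping the '.' concatenation operator yields a Python regex for this
    # restricted notation (single-letter symbols, |, +, parentheses only).
    pattern = re.compile(expr.replace(".", ""))
    alphabet = sorted(set(re.findall(r"[a-z]", expr)))
    cost = 0.0
    for seq in sequences:
        # Brute force: enumerate every word of this length over the alphabet.
        n_words = sum(
            1
            for w in itertools.product(alphabet, repeat=len(seq))
            if pattern.fullmatch("".join(w))
        )
        if n_words > 0:
            cost += math.log2(n_words)
    return cost


def toy_mdl(expr, sequences):
    """Total description length: model cost plus data cost (lower is better)."""
    return toy_model_cost(expr) + toy_data_cost(expr, sequences)


print(toy_mdl("a.b.c", [["a", "b", "c"]]))
print(toy_mdl("(a|b)+", [["a", "b"], ["b", "a"], ["a"]]))
```

The enumeration is exponential in sequence length, so this only works for tiny examples. The grammar notation `r+?` (zero or more) is not supported here, because `+?` means a lazy quantifier in Python's `re` syntax.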