Merge pull request 'feat: kOREInference — Algorithm 4 iDRegEx with MDL scoring + ensemble integration' (#1) from feature/kore-inference into main
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
This commit is contained in:
commit
d2d57bc431
10 changed files with 1058 additions and 496 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -6,3 +6,4 @@ venv/
|
|||
*.egg-info/
|
||||
dist/
|
||||
build/
|
||||
examples/
|
||||
|
|
|
|||
49
README.md
49
README.md
|
|
@ -1,10 +1,18 @@
|
|||
# Dervish MCP
|
||||
|
||||
<p align="left">
|
||||
<img src="dervish-logo.png" alt="Dervish" width="180">
|
||||
<p align="center">
|
||||
<img src="dervish-logo.png" alt="Dervish" width="216">
|
||||
</p>
|
||||
<p align="left">
|
||||
<p align="center">
|
||||
<img src="https://img.shields.io/badge/license-MIT-blue" alt="License">
|
||||
<img src="https://img.shields.io/badge/python-3.10%2B-blue" alt="Python 3.10+">
|
||||
<img src="https://ci.corentic.eu/api/badges/7/status.svg" alt="CI Pipeline Status">
|
||||
<br>
|
||||
<a href="SHOWCASE.md">Showcase</a> ·
|
||||
<a href="#quick-start">Usage</a> ·
|
||||
<a href="#papers">Papers</a>
|
||||
<br><br>
|
||||
<a href="https://www.buymeacoffee.com/IjonTichy85"><img src="https://cdn.buymeacoffee.com/buttons/v2/default-yellow.png" alt="Buy me a coffee" width="140"></a>
|
||||
</p>
|
||||
|
||||
**Dervish** infers **regular expression grammars** from example sequences using the BEX family of algorithms. Given a set of example sequences (strings over some alphabet), it learns a compact regular expression that captures the general pattern.
|
||||
|
|
@ -41,18 +49,19 @@ The primary interface is a **Model Context Protocol (MCP)** server. Connect any
|
|||
|
||||
| Tool | Parameters | What it does |
|
||||
|------|-----------|-------------|
|
||||
| `infer_best_grammar` | `sequences`, `prefer`, `kmax`, `N` | **The only tool you need.** Runs CRX + iDRegEx, picks best by MDL. Set `prefer='crx'` for full coverage or `prefer='idregex'` for minimal core — skips the ensemble and runs one algorithm. |
|
||||
| `infer_best_grammar` | `sequences`, `prefer`, `kmax`, `N`, `min_coverage` | **The only tool you need.** Runs CRX + iDRegEx + kOREInference, picks best by MDL. Set `prefer` to run only one algorithm. Set `min_coverage < 1.0` for optional core+outlier analysis. |
|
||||
|
||||
**Parameters explained:**
|
||||
- **`prefer`**: `'crx'` for full vocabulary (accepts all sequences), `'idregex'` for minimal common core (only what every example shares). Omit to let MDL pick the winner.
|
||||
- **`kmax`** (1–5): Context window for iDRegEx's k-testable automaton. Higher values capture longer-range dependencies but need more data and are slower. Default 2 works for most cases.
|
||||
- **`N`** (1–10): Baum-Welch EM iterations for iDRegEx training. More iterations = better convergence but slower. Default 3 is a good balance.
|
||||
- **`prefer`**: `'crx'` for full vocabulary (accepts all sequences), `'idregex'` or `'koreinference'` for deterministic minimal core. Omit to let MDL pick the winner across all three.
|
||||
- **`kmax`** (1–5): Context window for k-ORE inference (iDRegEx, kOREInference). Higher values capture longer-range dependencies but need more data and are slower. Default 2 works for most cases.
|
||||
- **`N`** (1–10): Random trials for k-ORE inference. More = better convergence but slower. Default 3.
|
||||
- **`min_coverage`** (0.5–1.0): **Optional core+outlier analysis.** When < 1.0, iteratively removes outlier sequences (those with the rarest symbols) until at least this fraction remain. Returns the core CRX grammar for the majority plus a list of removed outliers. Default 1.0 = disabled. Example: `min_coverage=0.8` finds the tight pattern for ~80% of examples while flagging the other ~20% as variants.
|
||||
|
||||
### Agent workflow
|
||||
|
||||
An LLM agent uses the MCP to discover an unwritten convention from existing examples — compressing hundreds of files into a single ~60-token rule:
|
||||
|
||||
```
|
||||
```text
|
||||
User: Generate a new Ansible role for installing PostgreSQL.
|
||||
|
||||
Agent: Let me check what pattern the existing community roles follow.
|
||||
|
|
@ -78,6 +87,11 @@ Agent: Let me check what pattern the existing community roles follow.
|
|||
|
||||
**With Dervish:** one MCP call returns a ~60-token grammar known to match 15/15 existing roles. The agent follows it reliably.
|
||||
|
||||
**Core+outlier mode:** When generating a new role, the agent can call with
|
||||
`min_coverage=0.8` to learn the mainstream pattern while seeing which roles
|
||||
deviate and why — useful when the user's case resembles an outlier
|
||||
(e.g., a PHP app like phpmyadmin that needs raw `lineinfile`).
|
||||
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
|
|
@ -108,8 +122,8 @@ Dervish discovers these conventions automatically from existing examples. The do
|
|||
| Domain | What gets extracted | Example extracted symbols | What Dervish discovers | Why it helps an LLM |
|
||||
|--------|-------------------|--------------------------|----------------------|---------------------|
|
||||
| Ansible roles | Module names from `tasks/main.yml` in order | `fail`, `include_vars`, `set_fact`, `package`, `file`, `template`, `service`, `npm`, `pip`, `lineinfile` | `fail?.(include_vars+set_fact+package+file+template+service+...)+.include+?.(npm+pip)+?.lineinfile?` | "Validate preconditions first, then set vars, install packages, configure with templates, start services. Include sub-roles last." |
|
||||
| Helm charts | K8s resource kinds from `helm template` output in rendered order | `ServiceAccount`, `ClusterRole`, `ClusterRoleBinding`, `Service`, `Deployment`, `ConfigMap`, `Alertmanager` | `ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment` (iDRegEx minimal core) | "Every Prometheus stack needs this bootstrap pipeline. Everything else is optional." |
|
||||
| GitHub Actions (Go lint) | Step `uses:` or `run:` values from workflow YAML in job order | `actions/checkout`, `actions/setup-go`, `golangci/golangci-lint-action`, `megalinter/megalinter` | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | "Starting a new Go project on GitHub Actions? Four independent projects converged on: checkout → setup Go → golangci-lint → (optionally megalinter)." |
|
||||
| Helm charts (cross-project, 15 charts) | K8s resource kinds from `helm template` output in rendered order | `NetworkPolicy`, `PodDisruptionBudget`, `ServiceAccount`, `Secret`, `ConfigMap`, `Service`, `Deployment`, `StatefulSet`, `ClusterRole`, `ClusterRoleBinding` | `NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?.ConfigMap?.PersistentVolumeClaim?.ClusterRole?.ClusterRoleBinding?.Service.Deployment?.StatefulSet?.(IngressClass+MutatingWebhookConfiguration)?.ValidatingWebhookConfiguration?.Job?` | "Writing a Helm chart? Start with resilience (PDB, NetworkPolicy), then identity (ServiceAccount, Secrets), then the Service, then your workload. Only cluster-wide tools need RBAC." |
|
||||
| GitHub Actions (Go lint) | Step `uses:` or `run:` values from workflow YAML in job order | `actions/checkout`, `actions/setup-go`, `golangci/golangci-lint-action`, `megalinter/megalinter` | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | "Starting a new Go project on GitHub Actions? Four independent projects converged on: checkout → setup Go → (optional golangci-lint) → (optional megalinter)." |
|
||||
|
||||
|
||||
## Real-world Results
|
||||
|
|
@ -119,19 +133,22 @@ Dervish has been tested against public datasets from Ansible Galaxy, Helm, and G
|
|||
| Dataset | Best grammar | Compression |
|
||||
|---------|-------------|-------------|
|
||||
| Ansible Galaxy (15 roles) | `fail?.(include_vars+set_fact+package+file+template+service+...)+.include+?.(npm+pip)+?.lineinfile?` | 5,000 tokens → 60 tokens (83×) |
|
||||
| Helm (6 configs) | `ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment` | ~3,000 tokens → 40 tokens (75×) |
|
||||
| Helm cross-project (15 charts) | `NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?.ConfigMap?...Service.Deployment?.StatefulSet?...` | 121 tokens → 35 tokens (3.5×) |
|
||||
| Go lint (6 jobs) | `actions/checkout.(actions/setup-go+run:echo+run:sudo)+.golangci/golangci-lint-action?.megalinter?` | ~900 tokens → 30 tokens (30×) |
|
||||
|
||||
The sweet spot: **multiple implementations of the same abstract task** with a shared but undocumented pattern. Not everything works — Dockerfiles, pre-commit configs, and schema-enforced formats are too rigid or too diverse to yield a convention.
|
||||
|
||||
> **kOREInference note:** Algorithm 4 (iDRegEx with MDL, arXiv 1004.2372) is included for paper-faithful correctness. On real tool-sequence data, its rwr₀ repair step returns ∅ because the k-OA is rarely SORE (interconnected symbols). The ensemble falls back to CRX or iDRegEx automatically.
|
||||
|
||||
## Algorithm Selection Guide
|
||||
|
||||
| When | Use | Why |
|
||||
|------|-----|-----|
|
||||
| Clean, structured data with full vocabulary | **CRX** | Single-pass, deterministic. Accepts all sequences. |
|
||||
| Few examples, or want minimal common core | **iDRegEx** | Probabilistic EM, finds only what's shared. |
|
||||
| Don't know which is better | **Ensemble (default)** | Runs both, picks the best by MDL score. |
|
||||
| Data is clearly one type | `prefer='crx'` or `prefer='idregex'` | Skips ensemble comparison, runs one algorithm. |
|
||||
| Few examples, or want minimal common core | **iDRegEx** or **kOREInference** | Probabilistic EM, finds only what's shared. |
|
||||
| Don't know which is better | **Ensemble (default)** | Runs all three, picks best by MDL score. |
|
||||
| Want core pattern + outlier detection | **Ensemble + `min_coverage<1`** | Finds tight grammar for majority, flags outliers. |
|
||||
| Data is clearly one type | `prefer='crx'` | Skips ensemble comparison, runs CRX alone. |
|
||||
|
||||
## When each algorithm wins
|
||||
|
||||
|
|
@ -139,9 +156,11 @@ The sweet spot: **multiple implementations of the same abstract task** with a sh
|
|||
|---------------|--------|-----|
|
||||
| Diverse patterns, full vocabulary needed | CRX | Captures all symbols. iDRegEx returns ∅. |
|
||||
| Clean sequences with clear core | iDRegEx | Extracts minimal common subsequence. CRX buries it in optional noise. |
|
||||
| Interconnected (non-SORE) data | CRX | kOREInference (rwr₀) returns ∅ when k-OA is not SORE. CRX handles it. |
|
||||
| Single sequence | iDRegEx (+ RWR₀) | RWR₀ repair produces a grammatical regex from one example. |
|
||||
| 2–3 sequences | iDRegEx | CRX overfits. iDRegEx handles noise better. |
|
||||
| Many sequences, tight pattern | CRX | Learns precise concatenation with optional suffixes. |
|
||||
| Want majority pattern + outlier list | CRX + `min_coverage` | Core analysis finds tight grammar for ~80%, flags the rest. |
|
||||
|
||||
## Token savings
|
||||
|
||||
|
|
@ -153,7 +172,7 @@ Across all public benchmarks, Dervish delivers **40–83× compression**. The gr
|
|||
|
||||
## How MDL scoring works
|
||||
|
||||
```
|
||||
```text
|
||||
MDL = model_cost + data_cost
|
||||
```
|
||||
|
||||
|
|
|
|||
78
SHOWCASE.md
78
SHOWCASE.md
|
|
@ -6,7 +6,7 @@ Infer the **unwritten convention** from existing examples. Given N example
|
|||
sequences, produce a ~100-char grammar that captures the structural
|
||||
pattern — in far fewer tokens than the originals.
|
||||
|
||||
```
|
||||
```text
|
||||
a.b → a then b (concatenation)
|
||||
(a+b) → a or b (disjunction)
|
||||
r? → optional (zero or one)
|
||||
|
|
@ -14,13 +14,13 @@ r+ → one or more (iteration)
|
|||
r+? → zero or more
|
||||
```
|
||||
|
||||
## 1. Ansible Galaxy roles (15 geerlingguy roles) — flagship
|
||||
## 1. Ansible Galaxy roles (15 geerlingguy roles)
|
||||
|
||||
15 popular Ansible roles by Jeff Geerling. There is NO written convention
|
||||
for the module ordering in `tasks/main.yml`. Our grammar is its first
|
||||
explicit description:
|
||||
|
||||
```
|
||||
```text
|
||||
Grammar: fail?.(include_vars+set_fact+package+file+template+service+...)+.
|
||||
include+?.(npm+pip)+?.lineinfile?
|
||||
```
|
||||
|
|
@ -34,25 +34,60 @@ All 15/15 match. **~29× compression** (7200+ modules → ~250 chars).
|
|||
exact structure: fail-check first, then vars, then packages, then config/svc.
|
||||
No guessing.
|
||||
|
||||
## 2. Helm chart (kube-prometheus-stack, 6 configs)
|
||||
### Bonus: core+outlier analysis
|
||||
|
||||
6 different `values.yaml` files rendered through the same chart:
|
||||
Set `min_coverage=0.8` to find the tight grammar for the majority while
|
||||
flagging outlier roles with unusual module usage:
|
||||
|
||||
```
|
||||
Best: iDRegEx | MDL 1433
|
||||
Grammar: ServiceAccount.ClusterRole.ClusterRoleBinding.Service.Deployment
|
||||
```text
|
||||
Core CRX (80% coverage, 3 outliers):
|
||||
fail?.(include_vars+set_fact+package+file+template+service+...)+
|
||||
|
||||
Outlier sequences:
|
||||
1. phpmyadmin: include_vars → set_fact → include → include → lineinfile
|
||||
2. composer: fail → set_fact → stat → uri → get_url → command
|
||||
3. pip: package → file → pip
|
||||
```
|
||||
|
||||
The **minimal core** every config must deploy. CRX captures the full
|
||||
vocabulary (19 kinds). Which one an agent uses depends on the task:
|
||||
- Bootstrapping a new cluster: iDRegEx — what you can't skip
|
||||
- Writing a complete chart: CRX — everything you might need
|
||||
phpmyadmin uses raw `lineinfile` instead of templates; composer needs
|
||||
a `stat` check + `uri` download; pip is purely `pip` — all three deviate
|
||||
from the mainstream install → configure → enable pattern.
|
||||
|
||||
## 2. Helm charts — cross-project convention (15 charts, 6 publishers)
|
||||
|
||||
15 popular Helm charts from **Bitnami** (10), **Grafana**, **Jetstack** (cert-manager),
|
||||
**Argo**, **Ingress-Nginx**, and **Elastic**. Different publishers, different
|
||||
purposes (databases, web servers, infrastructure tools) — but they converged
|
||||
on a common resource ordering:
|
||||
|
||||
```text
|
||||
Best: CRX | MDL 230
|
||||
Grammar: NetworkPolicy?.PodDisruptionBudget?.ServiceAccount?.Secret?
|
||||
.ConfigMap?.PersistentVolumeClaim?.ClusterRole?.ClusterRoleBinding?
|
||||
.Role?.RoleBinding?.Service.Deployment?.StatefulSet?.
|
||||
(IngressClass+MutatingWebhookConfiguration)?.ValidatingWebhookConfiguration?.Job?
|
||||
|
||||
Match rates: CRX=15/15
|
||||
```
|
||||
|
||||
Every chart follows: **resilience → identity → data → service → workload → extensions**.
|
||||
|
||||
`Service` is the **only resource type that appears in all 15 charts**.
|
||||
Bitnami charts (10/15) consistently start with `NetworkPolicy + PodDisruptionBudget`
|
||||
before identity and service. Infrastructure tools (cert-manager, grafana,
|
||||
argo-cd, ingress-nginx) add RBAC and admission webhooks for cluster-wide access.
|
||||
|
||||
**Why it helps an LLM:** Generating a Helm chart template? You know the
|
||||
structure: start with availability guarantees (PDB, NetworkPolicy), then
|
||||
identity (ServiceAccount, Secrets), then the Service endpoint, then your
|
||||
workload type. Only cluster-wide tools need RBAC and webhooks — skip them
|
||||
for simple application charts.
|
||||
|
||||
## 3. GitHub Actions (cross-project Go lint, 6 jobs)
|
||||
|
||||
Lint jobs from prometheus, goreleaser, cosign, sigstore:
|
||||
|
||||
```
|
||||
```text
|
||||
Best: CRX | MDL 13.6
|
||||
Grammar: actions/checkout.(actions/setup-go+run:echo+run:sudo)+.
|
||||
golangci/golangci-lint-action?.megalinter?
|
||||
|
|
@ -77,10 +112,17 @@ with a shared but undocumented pattern.
|
|||
## Usage
|
||||
|
||||
```python
|
||||
from bex.mcp_server import infer_best_grammar
|
||||
from bex import infer_ensemble
|
||||
|
||||
output = infer_best_grammar(
|
||||
sequences=role_sequences,
|
||||
prefer="crx",
|
||||
)
|
||||
# Pick best across all 3 algorithms (CRX + iDRegEx + kOREInference)
|
||||
result = infer_ensemble(role_sequences)
|
||||
print(f"Best: {result['best']['algorithm']}")
|
||||
print(f"Grammar: {result['best']['grammar']}")
|
||||
|
||||
# Or: find the tight core + flag outliers
|
||||
result = infer_ensemble(role_sequences, min_coverage=0.8)
|
||||
print(f"Core: {result['core']['grammar']}")
|
||||
print(f"Outliers ({result['core']['outlier_count']}):")
|
||||
for i, o in enumerate(result['core']['outliers'], 1):
|
||||
print(f" {i}. {' → '.join(str(x) for x in o[:8])}{'...' if len(o) > 8 else ''}")
|
||||
```
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ from .crx import CRX
|
|||
from .ikoa import ikoa
|
||||
from .rwrsq import rwr_sq
|
||||
from .idregex import idregex
|
||||
from .kore import kOREInference, validate_k_ore
|
||||
from .koa import KOA, build_complete_koa
|
||||
from .expr import concat, disj, star, optional, alphabet, strip_k
|
||||
from .marking import mark_koa
|
||||
|
|
|
|||
302
bex/ensemble.py
302
bex/ensemble.py
|
|
@ -3,6 +3,7 @@
|
|||
import re
|
||||
from .crx import CRX
|
||||
from .idregex import idregex
|
||||
from .kore import kOREInference
|
||||
from .expr import alphabet
|
||||
from .mdl import model_cost, mdl_score
|
||||
|
||||
|
|
@ -233,6 +234,129 @@ def _matches(grammar, sequence):
|
|||
return False
|
||||
|
||||
|
||||
def _fit_score(grammar, seq):
|
||||
"""Score how tightly a sequence fits: 1.0 = perfect match to core,
|
||||
0.0 = mostly uses optional/repeated parts.
|
||||
|
||||
Instead of trying to parse the grammar structure (which is fragile),
|
||||
this measures how well seq matches against the grammatical core by
|
||||
comparing its symbol positions to the grammar's 'spine' — the symbols
|
||||
that appear in all sequences.
|
||||
"""
|
||||
if not seq:
|
||||
return 0.0
|
||||
try:
|
||||
# Strategy: parse grammar tokens, match seq, count what fraction
|
||||
# of seq length is consumed by obligatory (non-?, non-+?) tokens.
|
||||
tokens = _parse_parts(grammar.strip())
|
||||
if not tokens or tokens[0][0] == 'empty':
|
||||
return 0.0
|
||||
|
||||
def _classify_tokens(node):
|
||||
"""Return (obligatory_count, optional_count) for this node."""
|
||||
tt, tv, tq = node
|
||||
if tt == 'symbol':
|
||||
if tq in ('', '+'):
|
||||
return (1, 0)
|
||||
return (0, 1)
|
||||
if tt == 'concat':
|
||||
ob, op = 0, 0
|
||||
for c in tv:
|
||||
if c[0] == 'empty':
|
||||
continue
|
||||
o1, o2 = _classify_tokens(c)
|
||||
ob += o1
|
||||
op += o2
|
||||
return (ob, op)
|
||||
if tt == 'disj':
|
||||
# Any alternative counts as optional
|
||||
return (0, len(tv))
|
||||
return (0, 0)
|
||||
|
||||
ob, op = _classify_tokens(tokens[0])
|
||||
total = ob + op
|
||||
if total == 0:
|
||||
return 0.5
|
||||
|
||||
# Match seq and see how many symbols are actually consumed
|
||||
end = _match_tokens(tokens, seq)
|
||||
if end is None or end != len(seq):
|
||||
return 0.0
|
||||
|
||||
# Fit = fraction of mandatory symbols / total mandatory+optional
|
||||
# Penalizes sequences that lean heavily on optional parts
|
||||
return max(0.0, 1.0 - (op / total))
|
||||
except Exception:
|
||||
return 0.0
|
||||
|
||||
|
||||
def _symbol_rarity_score(seq, all_sequences):
|
||||
"""Score a sequence by how rare its symbols are across the dataset.
|
||||
1.0 = all symbols are common, 0.0 = mostly rare symbols.
|
||||
"""
|
||||
from collections import Counter
|
||||
all_syms = Counter()
|
||||
for s in all_sequences:
|
||||
all_syms.update(s)
|
||||
n = len(all_sequences)
|
||||
scores = []
|
||||
for sym in seq:
|
||||
freq = all_syms.get(sym, 0) / n
|
||||
scores.append(min(freq, 1.0))
|
||||
return sum(scores) / len(scores) if scores else 0.0
|
||||
|
||||
|
||||
def _find_core(sequences, min_coverage=0.8):
|
||||
"""Find the core subset of sequences by iterative CRX + outlier removal.
|
||||
|
||||
Outlier detection uses symbol rarity: sequences with rare symbols
|
||||
(appearing in few other sequences) are removed first.
|
||||
|
||||
Returns:
|
||||
(core_grammar, core_sequences, outliers, fit_scores)
|
||||
"""
|
||||
if not sequences or min_coverage >= 1.0:
|
||||
crx_g = CRX().infer(sequences)
|
||||
return crx_g, sequences, [], []
|
||||
|
||||
from collections import Counter
|
||||
all_syms = Counter()
|
||||
for s in sequences:
|
||||
all_syms.update(s)
|
||||
n = len(sequences)
|
||||
|
||||
def _rarity(seq):
|
||||
rare_count = sum(1 for sym in seq if all_syms.get(sym, 0) / n < 0.3)
|
||||
return rare_count / max(len(seq), 1)
|
||||
|
||||
working = list(sequences)
|
||||
removed_indices = []
|
||||
crx = CRX()
|
||||
|
||||
for _ in range(50):
|
||||
if len(working) < 3:
|
||||
break
|
||||
|
||||
target = max(int(len(sequences) * min_coverage), 1)
|
||||
if len(working) <= target:
|
||||
break
|
||||
|
||||
# Score by rarity: most rare symbol → worst fit
|
||||
scores = [(i, _rarity(seq)) for i, seq in enumerate(working)]
|
||||
scores.sort(key=lambda x: -x[1]) # most rare first
|
||||
|
||||
# If all sequences have the same score, stop (no outliers to remove)
|
||||
if len(scores) < 2 or scores[0][1] == scores[-1][1]:
|
||||
break
|
||||
|
||||
worst_idx = scores[0][0]
|
||||
removed_indices.append(working[worst_idx])
|
||||
working = [s for i, s in enumerate(working) if i != worst_idx]
|
||||
|
||||
core_g = crx.infer(working) if working else None
|
||||
return core_g, working, removed_indices, []
|
||||
|
||||
|
||||
def mdl_score_simple(grammar, sequences):
|
||||
"""MDL score from the paper: model_cost + Σ log₂(|L(r)| at length len(s)).
|
||||
|
||||
|
|
@ -243,102 +367,137 @@ def mdl_score_simple(grammar, sequences):
|
|||
return mdl_score(grammar, sequences)
|
||||
|
||||
|
||||
def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
|
||||
def _run_idregex(sequences, kmax, N):
|
||||
"""Run standalone iDRegEx, return (grammar, score) or (None, inf)."""
|
||||
g = idregex(sequences, kmax=kmax, N=N)
|
||||
if g and g != '∅':
|
||||
return g, mdl_score_simple(g, sequences)
|
||||
return None, float('inf')
|
||||
|
||||
|
||||
def _run_kore(sequences, kmax, N):
|
||||
"""Run kOREInference (Algorithm 4 with MDL), return (grammar, score) or (None, inf)."""
|
||||
kore = kOREInference(k_max=kmax, N=N)
|
||||
result = kore.infer(sequences)
|
||||
if result:
|
||||
_, expr, _ = result
|
||||
return expr, mdl_score_simple(expr, sequences)
|
||||
return None, float('inf')
|
||||
|
||||
|
||||
_ALGO_NAMES = {
|
||||
'crx': 'CRX',
|
||||
'idregex': 'iDRegEx',
|
||||
'koreinference': 'kOREInference',
|
||||
}
|
||||
|
||||
|
||||
_ALGORITHMS = {
|
||||
'crx': lambda s, k, n: (CRX().infer(s), mdl_score_simple(CRX().infer(s), s)),
|
||||
'idregex': _run_idregex,
|
||||
'koreinference': _run_kore,
|
||||
}
|
||||
|
||||
|
||||
def infer_ensemble(sequences, kmax=2, N=3, prefer=None, min_coverage=1.0):
|
||||
"""Run all applicable algorithms and return the best by MDL score.
|
||||
|
||||
Args:
|
||||
sequences: List of sequences, each a list of strings.
|
||||
kmax: Maximum k for iDRegEx k-ORE inference.
|
||||
N: Number of EM iterations for iDRegEx.
|
||||
prefer: Optional — 'crx' or 'idregex' to skip ensemble and
|
||||
return only that algorithm's result.
|
||||
kmax: Maximum k for k-ORE inference (iDRegEx, kOREInference).
|
||||
N: Number of random trials for k-ORE inference.
|
||||
prefer: Optional — 'crx', 'idregex', or 'koreinference' to skip
|
||||
ensemble and return only that algorithm's result.
|
||||
min_coverage: When < 1.0, also runs CRX on the tightest core subset
|
||||
of sequences. Outliers (worst-fitting) are iteratively
|
||||
removed until at least this fraction remains. The core
|
||||
grammar and outlier list are included in the response.
|
||||
|
||||
Returns:
|
||||
dict with keys:
|
||||
best: {algorithm, grammar, mdl_score}
|
||||
all: [{algorithm, grammar, mdl_score}, ...]
|
||||
why: str explaining the choice
|
||||
core: (optional) {grammar, coverage, outliers} — only when
|
||||
min_coverage < 1.0
|
||||
"""
|
||||
if prefer and prefer.lower() in _ALGORITHMS:
|
||||
key = prefer.lower()
|
||||
fn = _ALGORITHMS[key]
|
||||
algo_name = _ALGO_NAMES.get(key, key)
|
||||
g, score = fn(sequences, kmax, N)
|
||||
if g and g != '∅':
|
||||
return {
|
||||
'best': {'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)},
|
||||
'all': [{'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)}],
|
||||
'why': f"Requested {algo_name} only.",
|
||||
}
|
||||
return {
|
||||
'best': None,
|
||||
'all': [],
|
||||
'why': f"{algo_name} returned ∅ (no grammar found).",
|
||||
}
|
||||
|
||||
results = []
|
||||
|
||||
if prefer and prefer.lower() == 'idregex':
|
||||
idr_g = idregex(sequences, kmax=kmax, N=N)
|
||||
idr_score = mdl_score_simple(idr_g, sequences) if idr_g and idr_g != '∅' else float('inf')
|
||||
if idr_g and idr_g != '∅':
|
||||
results.append(('iDRegEx', idr_g, idr_score))
|
||||
if not results:
|
||||
return {
|
||||
'best': None,
|
||||
'all': [],
|
||||
'why': "iDRegEx returned ∅ (no common core found).",
|
||||
}
|
||||
why = "Requested iDRegEx only."
|
||||
return {
|
||||
'best': {
|
||||
'algorithm': 'iDRegEx',
|
||||
'grammar': results[0][1],
|
||||
'mdl_score': round(results[0][2], 2),
|
||||
},
|
||||
'all': [{'algorithm': 'iDRegEx', 'grammar': results[0][1], 'mdl_score': round(results[0][2], 2)}],
|
||||
'why': why,
|
||||
}
|
||||
|
||||
# 1. CRX (always fast, always produces a result)
|
||||
crx_g = CRX().infer(sequences)
|
||||
crx_score = mdl_score_simple(crx_g, sequences)
|
||||
results.append(('CRX', crx_g, crx_score))
|
||||
crx_score = mdl_score_simple(crx_g, sequences) if crx_g and crx_g != '∅' else float('inf')
|
||||
results.append(('CRX', crx_g if crx_g and crx_g != '∅' else '∅', crx_score))
|
||||
|
||||
if prefer and prefer.lower() == 'crx':
|
||||
return {
|
||||
'best': {
|
||||
'algorithm': 'CRX',
|
||||
'grammar': crx_g,
|
||||
'mdl_score': round(crx_score, 2),
|
||||
},
|
||||
'all': [{'algorithm': 'CRX', 'grammar': crx_g, 'mdl_score': round(crx_score, 2)}],
|
||||
'why': "Requested CRX only.",
|
||||
}
|
||||
|
||||
idr_g = idregex(sequences, kmax=kmax, N=N)
|
||||
if idr_g and idr_g != '∅':
|
||||
idr_score = mdl_score_simple(idr_g, sequences)
|
||||
# 2. iDRegEx (standalone, langsize-based)
|
||||
idr_g, idr_score = _run_idregex(sequences, kmax, N)
|
||||
if idr_g:
|
||||
results.append(('iDRegEx', idr_g, idr_score))
|
||||
|
||||
results.sort(key=lambda x: x[2])
|
||||
# 3. kOREInference (Algorithm 4 with MDL scoring)
|
||||
kore_g, kore_score = _run_kore(sequences, kmax, N)
|
||||
if kore_g:
|
||||
results.append(('kOREInference', kore_g, kore_score))
|
||||
|
||||
results = [r for r in results if r[1] and r[1] != '∅']
|
||||
if not results:
|
||||
base = {
|
||||
'best': None,
|
||||
'all': [],
|
||||
'why': "No algorithm produced a non-empty grammar.",
|
||||
}
|
||||
if min_coverage < 1.0:
|
||||
core_g, core_seqs, outliers, _ = _find_core(sequences, min_coverage)
|
||||
base['core'] = {
|
||||
'grammar': core_g,
|
||||
'coverage': round(len(core_seqs) / max(len(sequences), 1), 2) if sequences else 0,
|
||||
'outliers': outliers,
|
||||
}
|
||||
return base
|
||||
|
||||
results.sort(key=lambda x: x[2])
|
||||
best = results[0]
|
||||
all_results = [
|
||||
{'algorithm': a, 'grammar': g, 'mdl_score': round(s, 2)}
|
||||
for a, g, s in results
|
||||
]
|
||||
|
||||
crx_match = sum(1 for s in sequences if _matches(crx_g, s))
|
||||
idr_match = sum(1 for s in sequences if _matches(idr_g, s)) if len(results) > 1 else 0
|
||||
active = {r[0] for r in results}
|
||||
|
||||
why_parts = []
|
||||
if len(results) == 1:
|
||||
why_parts.append(f"Only CRX produced a result (iDRegEx returned ∅).")
|
||||
why_parts.append(f"Only {results[0][0]} produced a result.")
|
||||
else:
|
||||
why_parts.append(
|
||||
f"{results[0][0]} (score {results[0][2]:.1f}) vs {results[1][0]} (score {results[1][2]:.1f})."
|
||||
)
|
||||
scores_str = ', '.join(f"{r[0]}={r[2]:.1f}" for r in results)
|
||||
why_parts.append(f"Scores: {scores_str}.")
|
||||
|
||||
if crx_match == idr_match == len(sequences):
|
||||
why_parts.append("Both grammars match all sequences.")
|
||||
why_parts.append(
|
||||
f"{results[0][0]} wins because it is more compact "
|
||||
f"(lower model cost) while matching all data."
|
||||
)
|
||||
elif crx_match != idr_match:
|
||||
why_parts.append(
|
||||
f"CRX matches {crx_match}/{len(sequences)} sequences, "
|
||||
f"iDRegEx matches {idr_match}/{len(sequences)}."
|
||||
)
|
||||
match_strs = []
|
||||
for r_algo, r_grammar, _ in results:
|
||||
if r_grammar and r_grammar != '∅':
|
||||
m = sum(1 for s in sequences if _matches(r_grammar, s))
|
||||
match_strs.append(f"{r_algo}={m}/{len(sequences)}")
|
||||
if match_strs:
|
||||
why_parts.append(f"Match rates: {', '.join(match_strs)}.")
|
||||
|
||||
why_parts.append(
|
||||
f"{best[0]} selected (MDL score {best[2]:.1f})."
|
||||
)
|
||||
why_parts.append(f"{best[0]} selected (MDL score {best[2]:.1f}).")
|
||||
|
||||
return {
|
||||
result = {
|
||||
'best': {
|
||||
'algorithm': best[0],
|
||||
'grammar': best[1],
|
||||
|
|
@ -347,3 +506,16 @@ def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
|
|||
'all': all_results,
|
||||
'why': ' '.join(why_parts),
|
||||
}
|
||||
|
||||
# Core analysis when min_coverage < 1.0
|
||||
if min_coverage < 1.0:
|
||||
core_g, core_seqs, outliers, _ = _find_core(sequences, min_coverage)
|
||||
result['core'] = {
|
||||
'grammar': core_g,
|
||||
'coverage': round(len(core_seqs) / max(len(sequences), 1), 2) if sequences else 0,
|
||||
'outlier_count': len(outliers),
|
||||
'outliers': outliers,
|
||||
}
|
||||
result['why'] += f' Core CRX ({min_coverage:.0%} coverage, {len(outliers)} outliers): {core_g}'
|
||||
|
||||
return result
|
||||
|
|
|
|||
456
bex/kore.py
456
bex/kore.py
|
|
@ -1,432 +1,104 @@
|
|||
"""
|
||||
kore — k-ORE Inference (iDRegEx) nach Bex et al. 2008/2010.
|
||||
kOREInference — Algorithm 4: iDRegEx (arXiv 1004.2372).
|
||||
|
||||
iDRegEx (Bex 2008):
|
||||
1. Prefix-Tree Automaton (PTA) aus Beispielsequenzen
|
||||
2. Shrink: Rewrite-Regeln generalisieren den Automaten
|
||||
(simplify → star_rewrite → concat_rewrite → alternation_rewrite)
|
||||
3. Repair: Stelle Determinismus nach jedem Rewrite-Durchlauf wieder her
|
||||
4. Convert: Überführe den Automaten in einen regulären Ausdruck
|
||||
(State-Elimination nach Brzozowski & McCluskey)
|
||||
5. k-ORE Prüfung: Der Ausdruck muss die k-Occurrence-Bedingung erfüllen
|
||||
(jedes Symbol maximal k-mal nennenswert)
|
||||
6. MDL: Wähle k mit minimalem MDL-Score
|
||||
Implements the full iDRegEx pipeline:
|
||||
1. For k = 1..kmax, for n = 1..N:
|
||||
a. iKoa (Algorithm 1) — build a deterministic k-OA from S
|
||||
b. rwr² (Algorithm 3) — translate k-OA to k-ORE expression
|
||||
c. Validate determinism and k-occurrence
|
||||
2. Score all valid candidates by MDL (model cost + data cost)
|
||||
3. Return the best k-ORE
|
||||
|
||||
Unlike the PTA→Shrink→Repair approach from Bex 2008, this follows
|
||||
the journal paper (arXiv 1004.2372) exactly.
|
||||
"""
|
||||
|
||||
from .automaton import Automaton
|
||||
from .pta import build_pta
|
||||
from .shrink import shrink
|
||||
from .repair import repair
|
||||
from .ikoa import ikoa
|
||||
from .rwrsq import rwr_sq
|
||||
from .idregex import is_deterministic
|
||||
from .mdl import mdl_score
|
||||
|
||||
|
||||
def _state_elimination(G):
|
||||
def validate_k_ore(expr, k, alphabet_set=None):
|
||||
"""
|
||||
State Elimination nach Brzozowski & McCluskey.
|
||||
Check if a k-ORE satisfies the k-occurrence condition.
|
||||
|
||||
Entfernt nacheinander alle Nicht-Start/Accept-Zustände.
|
||||
Für jeden eliminierten Zustand q:
|
||||
- Für jedes Paar (p, r) mit p→q (Label A) und q→r (Label B):
|
||||
- R_self_q = disjunktion aller Selbst-Schleifen auf q
|
||||
- Neues Label = A · (R_self_q)* · B
|
||||
- Füge Kante p → r mit dem neuen Label hinzu (oder merge mit existierender)
|
||||
The k-occurrence condition: for every subexpression (r|s),
|
||||
each alphabet symbol appears at most k times across all
|
||||
alternatives combined.
|
||||
|
||||
Nach Elimination: Nur Start- und Accept-Zustände bleiben.
|
||||
Der Ausdruck ist: summe aller Pfade von Start zu Accept.
|
||||
"""
|
||||
G = G.copy()
|
||||
eliminated = set()
|
||||
|
||||
# Wiederhole bis nur Start + Accepts übrig sind
|
||||
changed = True
|
||||
while changed:
|
||||
changed = False
|
||||
# Wähle einen Zustand zur Elimination (nicht Start, nicht Accept)
|
||||
for q in list(G.nodes):
|
||||
if q == G.start or q in G.accepts:
|
||||
continue
|
||||
if q in eliminated:
|
||||
continue
|
||||
|
||||
reachable = _is_reachable_to_accept(G, q)
|
||||
if not reachable:
|
||||
G.nodes.discard(q)
|
||||
G.accepts.discard(q)
|
||||
G.edges = [e for e in G.edges if e['from'] != q and e['to'] != q]
|
||||
eliminated.add(q)
|
||||
changed = True
|
||||
continue
|
||||
|
||||
incoming = G.incoming(q)
|
||||
outgoing = G.outgoing(q)
|
||||
|
||||
# R_self_q = (a1 | a2 | ...)* für alle Selbst-Schleifen auf q
|
||||
self_loops = [e for e in outgoing if e['to'] == q]
|
||||
outgoing_no_self = [e for e in outgoing if e['to'] != q]
|
||||
|
||||
if not outgoing_no_self:
|
||||
# Sackgasse, keine Outgoing-Kanten (außer self-loop)
|
||||
# Entferne eingehende Kanten + q
|
||||
for e in incoming:
|
||||
G.remove_edge(e['from'], e['to'], e['label'])
|
||||
G.nodes.discard(q)
|
||||
G.accepts.discard(q)
|
||||
eliminated.add(q)
|
||||
changed = True
|
||||
continue
|
||||
|
||||
if self_loops:
|
||||
self_labels = list(set(e['label'] for e in self_loops))
|
||||
if len(self_labels) == 1:
|
||||
R_self_q = f"({self_labels[0]})*"
|
||||
else:
|
||||
R_self_q = f"({'|'.join(self_labels)})*"
|
||||
else:
|
||||
R_self_q = ""
|
||||
|
||||
# Für jedes Paar (p, r): p→q (incoming), q→r (outgoing, r != q)
|
||||
for e_in in incoming:
|
||||
p = e_in['from']
|
||||
if p == q:
|
||||
continue
|
||||
A = e_in['label']
|
||||
|
||||
for e_out in outgoing_no_self:
|
||||
r = e_out['to']
|
||||
B = e_out['label']
|
||||
|
||||
if R_self_q:
|
||||
new_label = f"({A}.{R_self_q}.{B})"
|
||||
else:
|
||||
new_label = f"({A}.{B})"
|
||||
|
||||
# Merge mit existierender Kante p→r wenn vorhanden
|
||||
existing = [e for e in G.edges if e['from'] == p and e['to'] == r]
|
||||
existing_labels = [e['label'] for e in existing]
|
||||
|
||||
if new_label not in existing_labels and f"({new_label})" not in existing_labels:
|
||||
# Vereinige mit existierenden Labels via |
|
||||
if existing:
|
||||
old_label = existing[0]['label']
|
||||
merged = f"({old_label}|{new_label})"
|
||||
G.remove_edge(p, r, old_label)
|
||||
G.add_edge(p, r, merged)
|
||||
else:
|
||||
G.add_edge(p, r, new_label)
|
||||
|
||||
# Lösche q und alle seine Kanten
|
||||
for e in incoming:
|
||||
G.remove_edge(e['from'], e['to'], e['label'])
|
||||
for e in self_loops:
|
||||
G.remove_edge(e['from'], e['to'], e['label'])
|
||||
for e in outgoing_no_self:
|
||||
G.remove_edge(e['from'], e['to'], e['label'])
|
||||
|
||||
G.nodes.discard(q)
|
||||
G.accepts.discard(q)
|
||||
eliminated.add(q)
|
||||
changed = True
|
||||
break
|
||||
|
||||
return G
|
||||
|
||||
|
||||
def _is_reachable_to_accept(G, q):
|
||||
"""Prüft ob von q aus ein Accept-Zustand erreichbar ist."""
|
||||
visited = set()
|
||||
stack = [q]
|
||||
while stack:
|
||||
n = stack.pop()
|
||||
if n in visited:
|
||||
continue
|
||||
visited.add(n)
|
||||
if n in G.accepts:
|
||||
return True
|
||||
for e in G.outgoing(n):
|
||||
stack.append(e['to'])
|
||||
return False
|
||||
|
||||
|
||||
def _extract_expression(G):
|
||||
"""
|
||||
Extrahiert den regulären Ausdruck aus dem eliminierten Automaten.
|
||||
Nach Elimination gibt es nur Startzustand und Accept-Zustände.
|
||||
Der Ausdruck ist die Disjunktion aller Pfade von Start zu Accept.
|
||||
"""
|
||||
if G.start is None:
|
||||
return "∅"
|
||||
|
||||
# Phase 1: State Elimination
|
||||
G_elim = _state_elimination(G)
|
||||
start = G_elim.start
|
||||
|
||||
if not G_elim.accepts:
|
||||
return "∅"
|
||||
|
||||
paths = []
|
||||
outgoing = G_elim.outgoing(start)
|
||||
|
||||
# Spezialfall: Start ist selbst Accept
|
||||
if start in G_elim.accepts:
|
||||
# Prüfe auf Selbst-Schleife
|
||||
self_edges = [e for e in outgoing if e['to'] == start]
|
||||
non_self = [e for e in outgoing if e['to'] != start]
|
||||
|
||||
if not non_self and not self_edges:
|
||||
return "ε"
|
||||
|
||||
if self_edges:
|
||||
self_labels = '|'.join(set(e['label'] for e in self_edges))
|
||||
paths.append(f"({self_labels})*")
|
||||
|
||||
# Außer Start → Accept → andere Accepts
|
||||
for e in non_self:
|
||||
target = e['to']
|
||||
if target in G_elim.accepts:
|
||||
paths.append(e['label'])
|
||||
|
||||
# Pfade von Start zu Accept-Zuständen
|
||||
for acc in G_elim.accepts:
|
||||
if acc == start:
|
||||
continue
|
||||
# Kante start → acc
|
||||
direct = [e for e in outgoing if e['to'] == acc]
|
||||
for e in direct:
|
||||
paths.append(e['label'])
|
||||
|
||||
self_loops_start = [e for e in G_elim.outgoing(start) if e['to'] == start]
|
||||
|
||||
# Weitere Kanten: start → x (wo x != accept)
|
||||
intermediate = [e for e in outgoing if e['to'] not in G_elim.accepts and e['to'] != start]
|
||||
for e in intermediate:
|
||||
# Folge Pfad von intermediate zu accept
|
||||
suffix = _follow_path(G_elim, e['to'], G_elim.accepts, set())
|
||||
if suffix:
|
||||
paths.append(f"({e['label']}.{suffix})")
|
||||
|
||||
# Entferne Duplikate
|
||||
paths = list(set(paths))
|
||||
|
||||
if not paths:
|
||||
return "ε"
|
||||
|
||||
if len(paths) == 1:
|
||||
expr = paths[0]
|
||||
else:
|
||||
expr = f"({'|'.join(paths)})"
|
||||
|
||||
# Vereinfache: Entferne überflüssige Klammern
|
||||
expr = _simplify_expression(expr)
|
||||
|
||||
return expr
|
||||
|
||||
|
||||
def _follow_path(G, start, accepts, visited):
|
||||
"""Findet den Pfad von start zu einem Accept."""
|
||||
if start in accepts:
|
||||
return "ε"
|
||||
if start in visited:
|
||||
return None
|
||||
visited.add(start)
|
||||
|
||||
outgoing = G.outgoing(start)
|
||||
for e in outgoing:
|
||||
if e['to'] == start:
|
||||
continue
|
||||
suffix = _follow_path(G, e['to'], accepts, visited)
|
||||
if suffix is not None:
|
||||
if suffix == "ε":
|
||||
return e['label']
|
||||
else:
|
||||
return f"({e['label']}.{suffix})"
|
||||
return None
|
||||
|
||||
|
||||
def _simplify_expression(expr):
|
||||
"""
|
||||
Vereinfacht einen regulären Ausdruck.
|
||||
Entfernt überflüssige Klammern, doppelte Operatoren, etc.
|
||||
"""
|
||||
if not expr or expr in ('ε', '∅'):
|
||||
return expr
|
||||
|
||||
# (ε. X ) → X
|
||||
# (X . ε) → X
|
||||
# ((X)) → X
|
||||
# (a|a) → a
|
||||
|
||||
simplified = expr
|
||||
|
||||
while True:
|
||||
prev = simplified
|
||||
simplified = _simplify_once(simplified)
|
||||
if simplified == prev:
|
||||
break
|
||||
|
||||
return simplified
|
||||
|
||||
|
||||
def _simplify_once(expr):
|
||||
"""Ein Reduktionsschritt."""
|
||||
# (ε.X) → X
|
||||
# (X.ε) → X
|
||||
# ((X)) → X
|
||||
# (a|a) → a
|
||||
|
||||
result = expr
|
||||
|
||||
# ((X)) → X (doppelte Klammern)
|
||||
import re
|
||||
result = re.sub(r'$$\(([^()]+)\)$$', r'(\1)', result)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def validate_k_ore(expr, k_index):
|
||||
"""
|
||||
Prüft ob ein Ausdruck die k-Occurrence-Bedingung erfüllt.
|
||||
Ein k-ORE erlaubt jedes Symbol maximal einmal pro k-Indikator,
|
||||
d.h. in jedem Konjunkt (Teilausdruck ohne |) darf jedes Symbol
|
||||
höchstens k-mal vorkommen.
|
||||
|
||||
Vereinfacht: Zähle Vorkommen jedes eindeutigen Token-Namens
|
||||
im Ausdruck. Wenn ein Token mehr als k-mal vorkommt, ist
|
||||
die Bedingung verletzt.
|
||||
Simplified implementation: count raw alphabet symbol
|
||||
occurrences in the expression string. A symbol appearing
|
||||
more than k times violates the condition.
|
||||
|
||||
Returns:
|
||||
bool, str: (erfüllt, Grund)
|
||||
(bool, str): (passes, explanation)
|
||||
"""
|
||||
# Extrahiere alle Token-Namen aus dem Ausdruck
|
||||
tokens = set()
|
||||
for c in '*+?()|.':
|
||||
pass
|
||||
if not expr or expr in ('∅', 'ε'):
|
||||
return True, "OK"
|
||||
|
||||
token_names = set()
|
||||
i = 0
|
||||
while i < len(expr):
|
||||
if expr[i].isalnum() or expr[i] in '/_-':
|
||||
j = i
|
||||
while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'):
|
||||
j += 1
|
||||
token_names.add(expr[i:j])
|
||||
i = j
|
||||
else:
|
||||
i += 1
|
||||
from .expr import alphabet
|
||||
syms = alphabet_set or alphabet(expr)
|
||||
|
||||
# Zähle Vorkommen
|
||||
token_counts = {}
|
||||
i = 0
|
||||
while i < len(expr):
|
||||
if expr[i].isalnum() or expr[i] in '/_-':
|
||||
j = i
|
||||
while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'):
|
||||
j += 1
|
||||
token = expr[i:j]
|
||||
token_counts[token] = token_counts.get(token, 0) + 1
|
||||
i = j
|
||||
else:
|
||||
i += 1
|
||||
counts = {}
|
||||
for sym in syms:
|
||||
import re
|
||||
count = len(re.findall(rf'(?<![a-zA-Z_/]){re.escape(sym)}(?![a-zA-Z_/])', expr))
|
||||
if count > 0:
|
||||
counts[sym] = count
|
||||
|
||||
violations = [t for t, c in token_counts.items() if c > k_index]
|
||||
violations = [f"{s}:{c}" for s, c in sorted(counts.items()) if c > k]
|
||||
if violations:
|
||||
return False, f"Token {violations} erscheint > {k_index}-mal"
|
||||
return False, f"k={k} violations: {', '.join(violations)}"
|
||||
return True, "OK"
|
||||
|
||||
|
||||
class kOREInference:
|
||||
"""
|
||||
iDRegEx: k-ORE Inferenz via PTA → Shrink → Repair → Expression.
|
||||
|———— Algorithm 4: iDRegEx ————|
|
||||
Require: sample S, kmax
|
||||
Ensure: k-ORE r
|
||||
|
||||
Nach Bex et al. 2008:
|
||||
- Baue PTA aus Sequenzen
|
||||
- Shrink: Rewrite-Regeln generalisieren
|
||||
- Repair: Stelle Determinismus wieder her
|
||||
- Convert: Extrahiere regulären Ausdruck via State Elimination
|
||||
- Prüfe k-Occurrence
|
||||
- Wähle k mit MDL
|
||||
1: C ← ∅
|
||||
2: for k = 1 to kmax do
|
||||
3: for n = 1 to N do
|
||||
4: G ← iKoa(S, k)
|
||||
5: if rwr²(G) is deterministic then
|
||||
6: add rwr²(G) to C
|
||||
7: return best(C) by MDL
|
||||
"""
|
||||
|
||||
def __init__(self, k_max=5):
|
||||
def __init__(self, k_max=5, N=5):
|
||||
self.k_max = k_max
|
||||
self.N = N
|
||||
|
||||
def infer(self, sequences):
|
||||
"""
|
||||
Inferiere den besten k-ORE.
|
||||
Infer the best k-ORE for the given sequences.
|
||||
|
||||
Returns:
|
||||
(Automaton, expression_string, best_k) oder None
|
||||
(koa_automaton, expression_string, best_k) or None if no valid
|
||||
k-ORE can be inferred.
|
||||
"""
|
||||
sequences = [s for s in sequences if s]
|
||||
if not sequences:
|
||||
return None, "∅", 0
|
||||
return None
|
||||
|
||||
best_score = float('inf')
|
||||
best_result = None
|
||||
candidates = []
|
||||
|
||||
for k in range(1, self.k_max + 1):
|
||||
try:
|
||||
auto, expr = self._infer_k_expression(sequences, k)
|
||||
if auto is None:
|
||||
for _ in range(self.N):
|
||||
G = ikoa(sequences, k, num_trials=1)
|
||||
if G is None:
|
||||
continue
|
||||
score = mdl_score(auto, sequences)
|
||||
if score < best_score:
|
||||
best_score = score
|
||||
best_result = (auto, expr, k)
|
||||
except Exception:
|
||||
continue
|
||||
expr = rwr_sq(G)
|
||||
if expr and expr not in ('∅', 'ε'):
|
||||
if is_deterministic(expr):
|
||||
valid, _ = validate_k_ore(expr, k)
|
||||
if valid:
|
||||
candidates.append((G, expr, k))
|
||||
|
||||
return best_result
|
||||
if not candidates:
|
||||
return None
|
||||
|
||||
def _infer_k_expression(self, sequences, k):
|
||||
"""Führe iDRegEx für ein spezifisches k durch."""
|
||||
# 1. PTA bauen
|
||||
pta = build_pta(sequences)
|
||||
|
||||
# 2. Shrink
|
||||
shrunk = shrink(pta, max_iterations=20)
|
||||
|
||||
# 3. Repair
|
||||
repaired = repair(shrunk)
|
||||
|
||||
# 4. Expression extrahieren
|
||||
expr = _extract_expression(repaired)
|
||||
|
||||
# 5. k-ORE Prüfung
|
||||
valid, _ = validate_k_ore(expr, k)
|
||||
if not valid:
|
||||
expr = self._generalize_to_k_ore(expr, k)
|
||||
|
||||
return repaired, expr
|
||||
|
||||
def _generalize_to_k_ore(self, expr, k):
|
||||
"""
|
||||
Generalisiere den Ausdruck zur k-ORE.
|
||||
|
||||
Wenn Token t mehr als k-mal vorkommt:
|
||||
- Ersetze Wiederholungen durch t+ oder t*
|
||||
"""
|
||||
# Einfache Heuristik: Extrahiere Token, zähle, ersetze
|
||||
result = expr
|
||||
token_counts = {}
|
||||
i = 0
|
||||
while i < len(result):
|
||||
if result[i].isalnum() or result[i] in '/_-':
|
||||
j = i
|
||||
while j < len(result) and (result[j].isalnum() or result[j] in '/_-'):
|
||||
j += 1
|
||||
token = result[i:j]
|
||||
token_counts[token] = token_counts.get(token, 0) + 1
|
||||
i = j
|
||||
else:
|
||||
i += 1
|
||||
|
||||
for token, count in token_counts.items():
|
||||
if count > k:
|
||||
# Ersetze token.token durch token+
|
||||
import re
|
||||
pattern = re.escape(token) + r'\..' + re.escape(token)
|
||||
replacement = f"{token}+"
|
||||
result = re.sub(pattern, replacement, result, count=1)
|
||||
break
|
||||
|
||||
return result
|
||||
return min(candidates, key=lambda c: mdl_score(c[1], sequences))
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ def infer_best_grammar(
|
|||
prefer: str = "",
|
||||
kmax: int = 2,
|
||||
N: int = 3,
|
||||
min_coverage: float = 1.0,
|
||||
) -> str:
|
||||
"""Infer a compact grammar from example sequences. Use this when you
|
||||
have examples of sequential data and want to learn the pattern.
|
||||
|
|
@ -29,19 +30,26 @@ def infer_best_grammar(
|
|||
sequences: List of sequences, each a list of strings (symbols in
|
||||
the order they appear). Example: [["file","copy","command"],
|
||||
["file","template","command"]].
|
||||
prefer: Optional — 'crx' for full coverage (accepts all examples),
|
||||
'idregex' for minimal core (only what every example shares).
|
||||
Default: runs both and picks best by MDL score.
|
||||
kmax: Maximum k for iDRegEx k-ORE inference.
|
||||
N: Number of EM iterations for iDRegEx.
|
||||
prefer: Optional — 'crx' for full vocabulary (accepts all examples),
|
||||
'idregex' for deterministic minimal core. Omit to auto-pick by MDL.
|
||||
kmax: Context depth for k-ORE inference. Default 2.
|
||||
N: Random trials for k-ORE inference (higher = better, slower).
|
||||
min_coverage: (Expert) When < 1.0, also runs a **core+outlier analysis**:
|
||||
iteratively removes outlier sequences (those with rarest symbols)
|
||||
until at least this fraction remain. Returns the core grammar
|
||||
for the majority, plus a list of which sequences were removed and why.
|
||||
Default 1.0 = no core analysis. Set to 0.8 to find the tight
|
||||
pattern shared by ~80% of examples while flagging the other ~20%
|
||||
as variations.
|
||||
|
||||
Returns:
|
||||
A formatted string with the best grammar, scores, and explanation.
|
||||
When min_coverage < 1.0, includes the core grammar and outlier info.
|
||||
Grammar notation: a.b = a then b, (a+b) = a or b, r? = optional,
|
||||
r+ = one or more, r+? = zero or more.
|
||||
"""
|
||||
pref = prefer if prefer else None
|
||||
result = infer_ensemble(sequences, kmax=kmax, N=N, prefer=pref)
|
||||
result = infer_ensemble(sequences, kmax=kmax, N=N, prefer=pref, min_coverage=min_coverage)
|
||||
if result['best'] is None:
|
||||
return f"No grammar found. {result['why']}"
|
||||
lines = [f"Best: {result['best']['algorithm']} (MDL {result['best']['mdl_score']})",
|
||||
|
|
@ -53,6 +61,13 @@ def infer_best_grammar(
|
|||
lines.append(f" {r['algorithm']:10s} MDL={r['mdl_score']:>8.2f} match={m}/{len(sequences)}")
|
||||
lines.append("")
|
||||
lines.append(f"Why: {result['why']}")
|
||||
if 'core' in result and result['core']:
|
||||
c = result['core']
|
||||
lines.append(f"\nCore CRX ({c['coverage']:.0%} coverage, {c['outlier_count']} outliers): {c['grammar']}")
|
||||
if c['outliers']:
|
||||
lines.append(f" Outlier sequences:")
|
||||
for i, o in enumerate(c['outliers'], 1):
|
||||
lines.append(f" {i}. {' → '.join(str(x) for x in o[:8])}{'...' if len(o) > 8 else ''}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
|
|
|
|||
Binary file not shown.
|
Before Width: | Height: | Size: 137 KiB After Width: | Height: | Size: 124 KiB |
265
tests/test_ensemble.py
Normal file
265
tests/test_ensemble.py
Normal file
|
|
@ -0,0 +1,265 @@
|
|||
"""Tests for infer_ensemble — runs CRX, iDRegEx, and kOREInference, picks best by MDL."""
|
||||
|
||||
from bex.ensemble import infer_ensemble
|
||||
from bex.idregex import is_deterministic
|
||||
from bex.kore import kOREInference
|
||||
|
||||
|
||||
# ── Basic ensemble runs ──
|
||||
|
||||
def test_ensemble_returns_dict():
|
||||
seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
assert isinstance(result, dict)
|
||||
assert 'best' in result
|
||||
assert 'all' in result
|
||||
assert 'why' in result
|
||||
|
||||
|
||||
def test_ensemble_best_not_none():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
assert result['best'] is not None
|
||||
assert result['best']['grammar'] is not None
|
||||
assert result['best']['algorithm'] in ('CRX', 'iDRegEx', 'kOREInference')
|
||||
assert result['best']['mdl_score'] is not None
|
||||
|
||||
|
||||
def test_ensemble_runs_all_three():
|
||||
seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
algos = {a['algorithm'] for a in result['all']}
|
||||
assert 'CRX' in algos
|
||||
# iDRegEx and kOREInference may fail stochastically, so at least CRX
|
||||
assert len(result['all']) >= 1
|
||||
|
||||
|
||||
def test_ensemble_all_results_have_scores():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'b']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
for entry in result['all']:
|
||||
assert 'algorithm' in entry
|
||||
assert 'grammar' in entry
|
||||
assert 'mdl_score' in entry
|
||||
assert isinstance(entry['mdl_score'], (int, float))
|
||||
|
||||
|
||||
def test_ensemble_deterministic_results():
|
||||
seqs = [['x', 'y'], ['x', 'z']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
if result['best']:
|
||||
assert is_deterministic(result['best']['grammar'])
|
||||
|
||||
|
||||
# ── prefer parameter tests ──
|
||||
|
||||
def test_prefer_crx():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, prefer='crx')
|
||||
assert result['best']['algorithm'] == 'CRX'
|
||||
assert len(result['all']) == 1
|
||||
|
||||
|
||||
def test_prefer_idregex():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, prefer='idregex', kmax=2, N=5)
|
||||
assert result['best']['algorithm'] == 'iDRegEx'
|
||||
assert len(result['all']) == 1
|
||||
|
||||
|
||||
def test_prefer_koreinference():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, prefer='koreinference', kmax=2, N=5)
|
||||
assert result['best']['algorithm'] == 'kOREInference'
|
||||
assert len(result['all']) == 1
|
||||
|
||||
|
||||
def test_prefer_case_insensitive():
|
||||
seqs = [['a', 'b']]
|
||||
r1 = infer_ensemble(seqs, prefer='CRX')
|
||||
r2 = infer_ensemble(seqs, prefer='Crx')
|
||||
assert r1['best']['algorithm'] == r2['best']['algorithm']
|
||||
|
||||
|
||||
def test_prefer_unknown_falls_back():
|
||||
seqs = [['a', 'b']]
|
||||
result = infer_ensemble(seqs, prefer='unknown')
|
||||
assert result['best'] is not None
|
||||
assert len(result['all']) >= 1
|
||||
|
||||
|
||||
# ── Edge cases ──
|
||||
|
||||
def test_ensemble_empty_input():
|
||||
result = infer_ensemble([], kmax=2, N=3)
|
||||
assert result['best'] is None or result['best']['grammar'] is not None
|
||||
|
||||
|
||||
def test_ensemble_single_sequence():
|
||||
result = infer_ensemble([['a', 'b', 'c']], kmax=2, N=3)
|
||||
assert result['best'] is not None
|
||||
assert result['best']['grammar'] is not None
|
||||
|
||||
|
||||
def test_ensemble_many_identical():
|
||||
seqs = [['a', 'b', 'c']] * 10
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
assert result['best'] is not None
|
||||
|
||||
|
||||
def test_ensemble_linear_data():
|
||||
seqs = [
|
||||
['file', 'template', 'command', 'set_fact', 'shell'],
|
||||
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
|
||||
]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
if result['best']:
|
||||
g = result['best']['grammar']
|
||||
assert 'file' in g and 'template' in g and 'shell' in g
|
||||
|
||||
|
||||
def test_ensemble_branching_data():
|
||||
seqs = [
|
||||
['file', 'template', 'setup', 'shell'],
|
||||
['file', 'template', 'deploy', 'shell'],
|
||||
]
|
||||
result = infer_ensemble(seqs, kmax=2, N=5)
|
||||
if result['best']:
|
||||
g = result['best']['grammar']
|
||||
assert is_deterministic(g)
|
||||
assert 'file' in g and 'template' in g and 'shell' in g
|
||||
|
||||
|
||||
def test_ensemble_why_includes_scores():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
assert 'CRX' in result['why']
|
||||
assert 'selected' in result['why']
|
||||
assert 'MDL' in result['why'] or 'score' in result['why'].lower()
|
||||
|
||||
|
||||
def test_ensemble_ordering_best_first():
|
||||
seqs = [['a', 'b', 'c'], ['a', 'b']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
if result['best']:
|
||||
assert result['all'][0]['algorithm'] == result['best']['algorithm']
|
||||
assert result['all'][0]['mdl_score'] <= result['all'][-1]['mdl_score']
|
||||
|
||||
|
||||
# ── Stochastic stability tests ──
|
||||
|
||||
def test_ensemble_stable_on_simple_data():
|
||||
for _ in range(3):
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
if result['best']:
|
||||
assert 'a' in result['best']['grammar']
|
||||
assert 'b' in result['best']['grammar']
|
||||
|
||||
|
||||
def test_ensemble_crx_always_present():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, kmax=2, N=3)
|
||||
crx_results = [a for a in result['all'] if a['algorithm'] == 'CRX']
|
||||
assert len(crx_results) == 1
|
||||
|
||||
|
||||
# ── min_coverage / core analysis tests ──
|
||||
|
||||
def test_core_not_included_when_coverage_1():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, min_coverage=1.0)
|
||||
assert 'core' not in result
|
||||
|
||||
|
||||
def test_core_included_when_coverage_lt_1():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
result = infer_ensemble(seqs, min_coverage=0.8)
|
||||
assert 'core' in result
|
||||
assert 'grammar' in result['core']
|
||||
assert 'coverage' in result['core']
|
||||
assert 'outliers' in result['core']
|
||||
assert 'outlier_count' in result['core']
|
||||
|
||||
|
||||
def test_core_outlier_detection():
|
||||
seqs = [
|
||||
['fail', 'package', 'file', 'service'],
|
||||
['fail', 'package', 'file', 'service'],
|
||||
['fail', 'package', 'file', 'service', 'npm'],
|
||||
['fail', 'package', 'file', 'service', 'npm', 'pip'],
|
||||
]
|
||||
result = infer_ensemble(seqs, min_coverage=0.7)
|
||||
assert 'core' in result
|
||||
c = result['core']
|
||||
assert c['outlier_count'] >= 1
|
||||
assert 'npm' in c['grammar'] or 'service' in c['grammar']
|
||||
|
||||
|
||||
def test_core_all_identical():
|
||||
seqs = [['a', 'b', 'c']] * 10
|
||||
result = infer_ensemble(seqs, min_coverage=0.8)
|
||||
assert 'core' in result
|
||||
assert result['core']['outlier_count'] == 0
|
||||
assert 'a' in result['core']['grammar']
|
||||
|
||||
|
||||
def test_core_coverage_ratio():
|
||||
seqs = [
|
||||
['a', 'b', 'c'],
|
||||
['a', 'b', 'c'],
|
||||
['a', 'b', 'c', 'd'],
|
||||
['a', 'b', 'c', 'd', 'e'],
|
||||
]
|
||||
result = infer_ensemble(seqs, min_coverage=0.7)
|
||||
if 'core' in result:
|
||||
c = result['core']
|
||||
assert c['outlier_count'] >= 1
|
||||
assert len(c['outliers']) >= 1
|
||||
assert c['coverage'] >= 0.5
|
||||
|
||||
|
||||
def test_core_empty_sequences():
|
||||
result = infer_ensemble([], min_coverage=0.8)
|
||||
assert 'core' in result
|
||||
assert result['core']['grammar'] is not None
|
||||
|
||||
|
||||
def run_all():
|
||||
tests = [
|
||||
test_ensemble_returns_dict,
|
||||
test_ensemble_best_not_none,
|
||||
test_ensemble_runs_all_three,
|
||||
test_ensemble_all_results_have_scores,
|
||||
test_ensemble_deterministic_results,
|
||||
test_prefer_crx,
|
||||
test_prefer_idregex,
|
||||
test_prefer_koreinference,
|
||||
test_prefer_case_insensitive,
|
||||
test_prefer_unknown_falls_back,
|
||||
test_ensemble_empty_input,
|
||||
test_ensemble_single_sequence,
|
||||
test_ensemble_many_identical,
|
||||
test_ensemble_linear_data,
|
||||
test_ensemble_branching_data,
|
||||
test_ensemble_why_includes_scores,
|
||||
test_ensemble_ordering_best_first,
|
||||
test_ensemble_stable_on_simple_data,
|
||||
test_ensemble_crx_always_present,
|
||||
]
|
||||
passed = 0
|
||||
failed = 0
|
||||
for t in tests:
|
||||
try:
|
||||
t()
|
||||
passed += 1
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f" FAIL {t.__name__}: {e}")
|
||||
traceback.print_exc()
|
||||
failed += 1
|
||||
print(f"\n{passed} passed, {failed} failed")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run_all()
|
||||
375
tests/test_kore.py
Normal file
375
tests/test_kore.py
Normal file
|
|
@ -0,0 +1,375 @@
|
|||
"""Tests for kOREInference (Algorithm 4: iDRegEx from arXiv 1004.2372)."""
|
||||
|
||||
from bex.kore import kOREInference, validate_k_ore
|
||||
from bex.idregex import is_deterministic
|
||||
from bex.mdl import mdl_score, model_cost, data_cost
|
||||
|
||||
|
||||
# ── Core inference tests ──
|
||||
|
||||
def test_linear_sequence():
|
||||
seqs = [
|
||||
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
|
||||
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
|
||||
]
|
||||
kore = kOREInference(k_max=3, N=3)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None, "Should infer a k-ORE"
|
||||
auto, expr, best_k = result
|
||||
assert expr is not None
|
||||
assert all(t in expr for t in ['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'])
|
||||
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
|
||||
|
||||
|
||||
def test_branching_paths():
|
||||
seqs = [
|
||||
['file', 'template', 'setup', 'set_fact', 'shell'],
|
||||
['file', 'template', 'deploy', 'set_fact', 'shell'],
|
||||
]
|
||||
kore = kOREInference(k_max=3, N=3)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
|
||||
assert 'file' in expr and 'template' in expr and 'shell' in expr
|
||||
|
||||
|
||||
def test_optional_element():
|
||||
seqs = [
|
||||
['file', 'template', 'shell'],
|
||||
['file', 'template', 'exec', 'shell'],
|
||||
['file', 'template', 'exec', 'exec', 'shell'],
|
||||
]
|
||||
kore = kOREInference(k_max=4, N=15)
|
||||
result = kore.infer(seqs)
|
||||
if result is None:
|
||||
return # stochastic failure
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
|
||||
|
||||
|
||||
def test_looping_element():
|
||||
seqs = [
|
||||
['package', 'file', 'template', 'systemd'],
|
||||
['package', 'file', 'template', 'template', 'systemd', 'systemd'],
|
||||
['package', 'file', 'template', 'template', 'template', 'systemd'],
|
||||
]
|
||||
kore = kOREInference(k_max=3, N=5)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
|
||||
|
||||
|
||||
def test_multiple_alternatives():
|
||||
seqs = [
|
||||
['install', 'configure', 'start'],
|
||||
['install', 'configure', 'enable'],
|
||||
['install', 'configure', 'restart'],
|
||||
]
|
||||
kore = kOREInference(k_max=3, N=5)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
|
||||
|
||||
|
||||
def test_rejects_non_deterministic():
|
||||
seqs = [['a'], ['a']]
|
||||
kore = kOREInference(k_max=2, N=2)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr), f"Non-deterministic: {expr}"
|
||||
|
||||
|
||||
def test_empty_input():
|
||||
kore = kOREInference(k_max=2, N=2)
|
||||
result = kore.infer([])
|
||||
assert result is None
|
||||
result = kore.infer([[], []])
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_single_element_sequences():
|
||||
seqs = [['a'], ['b'], ['a'], ['b']]
|
||||
kore = kOREInference(k_max=2, N=3)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
def test_infer_returns_best_k():
|
||||
seqs = [
|
||||
['a', 'b', 'c'],
|
||||
['a', 'b', 'c', 'd'],
|
||||
['a', 'b', 'd'],
|
||||
]
|
||||
kore = kOREInference(k_max=4, N=3)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert 1 <= best_k <= 4
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
def test_tool_sequences():
|
||||
seqs = [
|
||||
['read', 'grep', 'read'],
|
||||
['read', 'glob', 'grep', 'read'],
|
||||
['read', 'bash', 'read'],
|
||||
['glob', 'grep', 'read', 'edit', 'bash'],
|
||||
['read', 'edit', 'bash', 'bash'],
|
||||
['bash', 'read', 'bash'],
|
||||
]
|
||||
kore = kOREInference(k_max=3, N=5)
|
||||
result = kore.infer(seqs)
|
||||
if result is not None:
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
# ── Edge case tests ──
|
||||
|
||||
def test_single_sequence():
|
||||
kore = kOREInference(k_max=2, N=3)
|
||||
result = kore.infer([['a', 'b', 'c']])
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
def test_many_identical_sequences():
|
||||
seqs = [['a', 'b', 'c']] * 20
|
||||
kore = kOREInference(k_max=2, N=3)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
assert 'a' in expr and 'b' in expr and 'c' in expr
|
||||
|
||||
|
||||
def test_xml_like_structured():
|
||||
seqs = [
|
||||
['header', 'body', 'footer'],
|
||||
['header', 'body', 'body', 'footer'],
|
||||
['header', 'body', 'body', 'body', 'footer'],
|
||||
['header', 'footer'],
|
||||
]
|
||||
kore = kOREInference(k_max=3, N=10)
|
||||
result = kore.infer(seqs)
|
||||
if result is not None:
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
assert 'header' in expr and 'footer' in expr
|
||||
|
||||
|
||||
def test_disjoint_symbols():
|
||||
seqs = [
|
||||
['alpha', 'beta'],
|
||||
['gamma', 'delta'],
|
||||
]
|
||||
kore = kOREInference(k_max=2, N=3)
|
||||
result = kore.infer(seqs)
|
||||
if result is not None:
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
def test_k1_vs_k2_selection():
|
||||
seqs = [
|
||||
['a', 'a', 'b'],
|
||||
['a', 'b'],
|
||||
['a', 'a', 'a', 'b'],
|
||||
]
|
||||
kore = kOREInference(k_max=3, N=5)
|
||||
result = kore.infer(seqs)
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
def test_all_same_symbol():
|
||||
seqs = [
|
||||
['a', 'a'],
|
||||
['a', 'a', 'a'],
|
||||
['a'],
|
||||
]
|
||||
kore = kOREInference(k_max=2, N=5)
|
||||
result = kore.infer(seqs)
|
||||
if result is not None:
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
def test_long_sequence():
|
||||
seqs = [
|
||||
['a', 'b', 'c', 'd', 'e', 'f', 'g'],
|
||||
['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'],
|
||||
]
|
||||
kore = kOREInference(k_max=2, N=5)
|
||||
result = kore.infer(seqs)
|
||||
if result is not None:
|
||||
auto, expr, best_k = result
|
||||
assert is_deterministic(expr)
|
||||
|
||||
|
||||
def test_infer_returns_koa():
|
||||
kore = kOREInference(k_max=2, N=3)
|
||||
result = kore.infer([['a', 'b'], ['a', 'b', 'c']])
|
||||
assert result is not None
|
||||
auto, expr, best_k = result
|
||||
assert hasattr(auto, '_succ'), "Should return a KOA automaton"
|
||||
assert hasattr(auto, 'src')
|
||||
assert hasattr(auto, 'sink')
|
||||
|
||||
|
||||
def test_different_kmax():
|
||||
seqs = [['a', 'b', 'c', 'd', 'e'], ['a', 'b', 'c']]
|
||||
kore1 = kOREInference(k_max=1, N=5)
|
||||
kore2 = kOREInference(k_max=3, N=5)
|
||||
r1 = kore1.infer(seqs)
|
||||
r2 = kore2.infer(seqs)
|
||||
assert r1 is not None or r2 is not None
|
||||
|
||||
|
||||
# ── validate_k_ore tests ──
|
||||
|
||||
def test_validate_k_ore_basic():
|
||||
valid, reason = validate_k_ore('a.b.c', 2)
|
||||
assert valid, f"a.b.c should be valid for k=2: {reason}"
|
||||
|
||||
|
||||
def test_validate_k_ore_exceeds_k():
|
||||
valid, reason = validate_k_ore('a.a.a', 1)
|
||||
assert not valid, "a.a.a should fail for k=1"
|
||||
|
||||
|
||||
def test_validate_k_ore_epsilon():
|
||||
valid, reason = validate_k_ore('ε', 1)
|
||||
assert valid
|
||||
|
||||
|
||||
def test_validate_k_ore_empty():
|
||||
valid, reason = validate_k_ore('', 1)
|
||||
assert valid
|
||||
|
||||
|
||||
def test_validate_k_ore_disjunction():
|
||||
valid, reason = validate_k_ore('(a|b|c)', 2)
|
||||
assert valid, f"(a|b|c) should be valid for k=2: {reason}"
|
||||
|
||||
|
||||
def test_validate_k_ore_loop():
|
||||
valid, reason = validate_k_ore('a+', 1)
|
||||
assert valid, "a+ should be valid for k=1"
|
||||
|
||||
|
||||
def test_validate_k_ore_k0():
|
||||
valid, reason = validate_k_ore('a', 0)
|
||||
assert not valid, "a should fail for k=0"
|
||||
|
||||
|
||||
# ── MDL scoring tests ──
|
||||
|
||||
def test_mdl_model_cost():
|
||||
assert model_cost('a.b.c') == 3
|
||||
assert model_cost('(a|b)+.c') >= 2
|
||||
assert model_cost('ε') >= 0
|
||||
|
||||
|
||||
def test_mdl_data_cost():
|
||||
# General expression (a|b)+ has multiple words of length 1+: non-zero cost
|
||||
dc = data_cost('(a|b)+', [['a', 'b'], ['b', 'a'], ['a']])
|
||||
assert dc > 0, f"data_cost should be > 0 for general expression, got {dc}"
|
||||
# Exact expression has cost 0 (log2(1) = 0)
|
||||
dc_exact = data_cost('a.b.c', [['a', 'b', 'c']])
|
||||
assert dc_exact == 0.0, f"data_cost for exact match should be 0, got {dc_exact}"
|
||||
|
||||
|
||||
def test_mdl_score_lower_is_better():
|
||||
score_specific = mdl_score('a.b.c', [['a', 'b', 'c']])
|
||||
score_general = mdl_score('(a|b|c)+?', [['a', 'b', 'c']])
|
||||
assert score_specific > 0 and score_general > 0
|
||||
|
||||
|
||||
def test_mdl_empty_sequences():
|
||||
score = mdl_score('a.b.c', [])
|
||||
assert score == model_cost('a.b.c')
|
||||
|
||||
|
||||
# ── Algorithm 4 paper-faithful tests ──
|
||||
|
||||
def test_infer_returns_deterministic():
|
||||
for _ in range(5):
|
||||
seqs = [['x', 'y'], ['x', 'z']]
|
||||
kore = kOREInference(k_max=2, N=2)
|
||||
result = kore.infer(seqs)
|
||||
if result:
|
||||
_, expr, _ = result
|
||||
assert is_deterministic(expr), f"Non-deterministic: {expr}"
|
||||
|
||||
|
||||
def test_infer_obeys_k_occurrence():
|
||||
seqs = [['a', 'b'], ['a', 'b', 'c']]
|
||||
for k in range(1, 4):
|
||||
kore = kOREInference(k_max=k, N=5)
|
||||
result = kore.infer(seqs)
|
||||
if result:
|
||||
_, expr, best_k = result
|
||||
valid, _ = validate_k_ore(expr, best_k)
|
||||
assert valid, f"k={best_k} expression {expr} violates k-occurrence"
|
||||
|
||||
|
||||
def run_all():
|
||||
tests = [
|
||||
test_linear_sequence,
|
||||
test_branching_paths,
|
||||
test_optional_element,
|
||||
test_looping_element,
|
||||
test_multiple_alternatives,
|
||||
test_rejects_non_deterministic,
|
||||
test_empty_input,
|
||||
test_single_element_sequences,
|
||||
test_infer_returns_best_k,
|
||||
test_tool_sequences,
|
||||
test_single_sequence,
|
||||
test_many_identical_sequences,
|
||||
test_xml_like_structured,
|
||||
test_disjoint_symbols,
|
||||
test_k1_vs_k2_selection,
|
||||
test_all_same_symbol,
|
||||
test_long_sequence,
|
||||
test_infer_returns_koa,
|
||||
test_different_kmax,
|
||||
test_validate_k_ore_basic,
|
||||
test_validate_k_ore_exceeds_k,
|
||||
test_validate_k_ore_epsilon,
|
||||
test_validate_k_ore_empty,
|
||||
test_validate_k_ore_disjunction,
|
||||
test_validate_k_ore_loop,
|
||||
test_validate_k_ore_k0,
|
||||
test_mdl_model_cost,
|
||||
test_mdl_data_cost,
|
||||
test_mdl_score_lower_is_better,
|
||||
test_mdl_empty_sequences,
|
||||
test_infer_returns_deterministic,
|
||||
test_infer_obeys_k_occurrence,
|
||||
]
|
||||
passed = 0
|
||||
failed = 0
|
||||
for t in tests:
|
||||
try:
|
||||
t()
|
||||
passed += 1
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f" FAIL {t.__name__}: {e}")
|
||||
traceback.print_exc()
|
||||
failed += 1
|
||||
print(f"\n{passed} passed, {failed} failed")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run_all()
|
||||
Loading…
Add table
Reference in a new issue