feat: implement kOREInference (Algorithm 4) with MDL scoring, add to ensemble, 79 tests
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful

This commit is contained in:
tobjend 2026-07-01 14:50:09 +02:00
parent dc559a4aee
commit edd6d9d4dd
5 changed files with 729 additions and 455 deletions

View file

@ -17,6 +17,7 @@ from .crx import CRX
from .ikoa import ikoa
from .rwrsq import rwr_sq
from .idregex import idregex
from .kore import kOREInference, validate_k_ore
from .koa import KOA, build_complete_koa
from .expr import concat, disj, star, optional, alphabet, strip_k
from .marking import mark_koa

View file

@ -3,6 +3,7 @@
import re
from .crx import CRX
from .idregex import idregex
from .kore import kOREInference
from .expr import alphabet
from .mdl import model_cost, mdl_score
@ -243,15 +244,47 @@ def mdl_score_simple(grammar, sequences):
return mdl_score(grammar, sequences)
def _run_idregex(sequences, kmax, N):
"""Run standalone iDRegEx, return (grammar, score) or (None, inf)."""
g = idregex(sequences, kmax=kmax, N=N)
if g and g != '':
return g, mdl_score_simple(g, sequences)
return None, float('inf')
def _run_kore(sequences, kmax, N):
"""Run kOREInference (Algorithm 4 with MDL), return (grammar, score) or (None, inf)."""
kore = kOREInference(k_max=kmax, N=N)
result = kore.infer(sequences)
if result:
_, expr, _ = result
return expr, mdl_score_simple(expr, sequences)
return None, float('inf')
_ALGO_NAMES = {
'crx': 'CRX',
'idregex': 'iDRegEx',
'koreinference': 'kOREInference',
}
_ALGORITHMS = {
'crx': lambda s, k, n: (CRX().infer(s), mdl_score_simple(CRX().infer(s), s)),
'idregex': _run_idregex,
'koreinference': _run_kore,
}
def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
"""Run all applicable algorithms and return the best by MDL score.
Args:
sequences: List of sequences, each a list of strings.
kmax: Maximum k for iDRegEx k-ORE inference.
N: Number of EM iterations for iDRegEx.
prefer: Optional 'crx' or 'idregex' to skip ensemble and
return only that algorithm's result.
kmax: Maximum k for k-ORE inference (iDRegEx, kOREInference).
N: Number of random trials for k-ORE inference.
prefer: Optional 'crx', 'idregex', or 'koreinference' to skip
ensemble and return only that algorithm's result.
Returns:
dict with keys:
@ -259,84 +292,73 @@ def infer_ensemble(sequences, kmax=2, N=3, prefer=None):
all: [{algorithm, grammar, mdl_score}, ...]
why: str explaining the choice
"""
if prefer and prefer.lower() in _ALGORITHMS:
key = prefer.lower()
fn = _ALGORITHMS[key]
algo_name = _ALGO_NAMES.get(key, key)
g, score = fn(sequences, kmax, N)
if g and g != '':
return {
'best': {'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)},
'all': [{'algorithm': algo_name, 'grammar': g, 'mdl_score': round(score, 2)}],
'why': f"Requested {algo_name} only.",
}
return {
'best': None,
'all': [],
'why': f"{algo_name} returned ∅ (no grammar found).",
}
results = []
if prefer and prefer.lower() == 'idregex':
idr_g = idregex(sequences, kmax=kmax, N=N)
idr_score = mdl_score_simple(idr_g, sequences) if idr_g and idr_g != '' else float('inf')
if idr_g and idr_g != '':
results.append(('iDRegEx', idr_g, idr_score))
if not results:
return {
'best': None,
'all': [],
'why': "iDRegEx returned ∅ (no common core found).",
}
why = "Requested iDRegEx only."
return {
'best': {
'algorithm': 'iDRegEx',
'grammar': results[0][1],
'mdl_score': round(results[0][2], 2),
},
'all': [{'algorithm': 'iDRegEx', 'grammar': results[0][1], 'mdl_score': round(results[0][2], 2)}],
'why': why,
}
# 1. CRX (always fast, always produces a result)
crx_g = CRX().infer(sequences)
crx_score = mdl_score_simple(crx_g, sequences)
results.append(('CRX', crx_g, crx_score))
crx_score = mdl_score_simple(crx_g, sequences) if crx_g and crx_g != '' else float('inf')
results.append(('CRX', crx_g if crx_g and crx_g != '' else '', crx_score))
if prefer and prefer.lower() == 'crx':
return {
'best': {
'algorithm': 'CRX',
'grammar': crx_g,
'mdl_score': round(crx_score, 2),
},
'all': [{'algorithm': 'CRX', 'grammar': crx_g, 'mdl_score': round(crx_score, 2)}],
'why': "Requested CRX only.",
}
idr_g = idregex(sequences, kmax=kmax, N=N)
if idr_g and idr_g != '':
idr_score = mdl_score_simple(idr_g, sequences)
# 2. iDRegEx (standalone, langsize-based)
idr_g, idr_score = _run_idregex(sequences, kmax, N)
if idr_g:
results.append(('iDRegEx', idr_g, idr_score))
results.sort(key=lambda x: x[2])
# 3. kOREInference (Algorithm 4 with MDL scoring)
kore_g, kore_score = _run_kore(sequences, kmax, N)
if kore_g:
results.append(('kOREInference', kore_g, kore_score))
results = [r for r in results if r[1] and r[1] != '']
if not results:
return {
'best': None,
'all': [],
'why': "No algorithm produced a non-empty grammar.",
}
results.sort(key=lambda x: x[2])
best = results[0]
all_results = [
{'algorithm': a, 'grammar': g, 'mdl_score': round(s, 2)}
for a, g, s in results
]
crx_match = sum(1 for s in sequences if _matches(crx_g, s))
idr_match = sum(1 for s in sequences if _matches(idr_g, s)) if len(results) > 1 else 0
active = {r[0] for r in results}
why_parts = []
if len(results) == 1:
why_parts.append(f"Only CRX produced a result (iDRegEx returned ∅).")
why_parts.append(f"Only {results[0][0]} produced a result.")
else:
why_parts.append(
f"{results[0][0]} (score {results[0][2]:.1f}) vs {results[1][0]} (score {results[1][2]:.1f})."
)
scores_str = ', '.join(f"{r[0]}={r[2]:.1f}" for r in results)
why_parts.append(f"Scores: {scores_str}.")
if crx_match == idr_match == len(sequences):
why_parts.append("Both grammars match all sequences.")
why_parts.append(
f"{results[0][0]} wins because it is more compact "
f"(lower model cost) while matching all data."
)
elif crx_match != idr_match:
why_parts.append(
f"CRX matches {crx_match}/{len(sequences)} sequences, "
f"iDRegEx matches {idr_match}/{len(sequences)}."
)
match_strs = []
for r_algo, r_grammar, _ in results:
if r_grammar and r_grammar != '':
m = sum(1 for s in sequences if _matches(r_grammar, s))
match_strs.append(f"{r_algo}={m}/{len(sequences)}")
if match_strs:
why_parts.append(f"Match rates: {', '.join(match_strs)}.")
why_parts.append(
f"{best[0]} selected (MDL score {best[2]:.1f})."
)
why_parts.append(f"{best[0]} selected (MDL score {best[2]:.1f}).")
return {
'best': {

View file

@ -1,432 +1,104 @@
"""
kore k-ORE Inference (iDRegEx) nach Bex et al. 2008/2010.
kOREInference Algorithm 4: iDRegEx (arXiv 1004.2372).
iDRegEx (Bex 2008):
1. Prefix-Tree Automaton (PTA) aus Beispielsequenzen
2. Shrink: Rewrite-Regeln generalisieren den Automaten
(simplify star_rewrite concat_rewrite alternation_rewrite)
3. Repair: Stelle Determinismus nach jedem Rewrite-Durchlauf wieder her
4. Convert: Überführe den Automaten in einen regulären Ausdruck
(State-Elimination nach Brzozowski & McCluskey)
5. k-ORE Prüfung: Der Ausdruck muss die k-Occurrence-Bedingung erfüllen
(jedes Symbol maximal k-mal nennenswert)
6. MDL: Wähle k mit minimalem MDL-Score
Implements the full iDRegEx pipeline:
1. For k = 1..kmax, for n = 1..N:
a. iKoa (Algorithm 1) build a deterministic k-OA from S
b. rwr² (Algorithm 3) translate k-OA to k-ORE expression
c. Validate determinism and k-occurrence
2. Score all valid candidates by MDL (model cost + data cost)
3. Return the best k-ORE
Unlike the PTAShrinkRepair approach from Bex 2008, this follows
the journal paper (arXiv 1004.2372) exactly.
"""
from .automaton import Automaton
from .pta import build_pta
from .shrink import shrink
from .repair import repair
from .ikoa import ikoa
from .rwrsq import rwr_sq
from .idregex import is_deterministic
from .mdl import mdl_score
def _state_elimination(G):
def validate_k_ore(expr, k, alphabet_set=None):
"""
State Elimination nach Brzozowski & McCluskey.
Check if a k-ORE satisfies the k-occurrence condition.
Entfernt nacheinander alle Nicht-Start/Accept-Zustände.
Für jeden eliminierten Zustand q:
- Für jedes Paar (p, r) mit pq (Label A) und qr (Label B):
- R_self_q = disjunktion aller Selbst-Schleifen auf q
- Neues Label = A · (R_self_q)* · B
- Füge Kante p r mit dem neuen Label hinzu (oder merge mit existierender)
The k-occurrence condition: for every subexpression (r|s),
each alphabet symbol appears at most k times across all
alternatives combined.
Nach Elimination: Nur Start- und Accept-Zustände bleiben.
Der Ausdruck ist: summe aller Pfade von Start zu Accept.
"""
G = G.copy()
eliminated = set()
# Wiederhole bis nur Start + Accepts übrig sind
changed = True
while changed:
changed = False
# Wähle einen Zustand zur Elimination (nicht Start, nicht Accept)
for q in list(G.nodes):
if q == G.start or q in G.accepts:
continue
if q in eliminated:
continue
reachable = _is_reachable_to_accept(G, q)
if not reachable:
G.nodes.discard(q)
G.accepts.discard(q)
G.edges = [e for e in G.edges if e['from'] != q and e['to'] != q]
eliminated.add(q)
changed = True
continue
incoming = G.incoming(q)
outgoing = G.outgoing(q)
# R_self_q = (a1 | a2 | ...)* für alle Selbst-Schleifen auf q
self_loops = [e for e in outgoing if e['to'] == q]
outgoing_no_self = [e for e in outgoing if e['to'] != q]
if not outgoing_no_self:
# Sackgasse, keine Outgoing-Kanten (außer self-loop)
# Entferne eingehende Kanten + q
for e in incoming:
G.remove_edge(e['from'], e['to'], e['label'])
G.nodes.discard(q)
G.accepts.discard(q)
eliminated.add(q)
changed = True
continue
if self_loops:
self_labels = list(set(e['label'] for e in self_loops))
if len(self_labels) == 1:
R_self_q = f"({self_labels[0]})*"
else:
R_self_q = f"({'|'.join(self_labels)})*"
else:
R_self_q = ""
# Für jedes Paar (p, r): p→q (incoming), q→r (outgoing, r != q)
for e_in in incoming:
p = e_in['from']
if p == q:
continue
A = e_in['label']
for e_out in outgoing_no_self:
r = e_out['to']
B = e_out['label']
if R_self_q:
new_label = f"({A}.{R_self_q}.{B})"
else:
new_label = f"({A}.{B})"
# Merge mit existierender Kante p→r wenn vorhanden
existing = [e for e in G.edges if e['from'] == p and e['to'] == r]
existing_labels = [e['label'] for e in existing]
if new_label not in existing_labels and f"({new_label})" not in existing_labels:
# Vereinige mit existierenden Labels via |
if existing:
old_label = existing[0]['label']
merged = f"({old_label}|{new_label})"
G.remove_edge(p, r, old_label)
G.add_edge(p, r, merged)
else:
G.add_edge(p, r, new_label)
# Lösche q und alle seine Kanten
for e in incoming:
G.remove_edge(e['from'], e['to'], e['label'])
for e in self_loops:
G.remove_edge(e['from'], e['to'], e['label'])
for e in outgoing_no_self:
G.remove_edge(e['from'], e['to'], e['label'])
G.nodes.discard(q)
G.accepts.discard(q)
eliminated.add(q)
changed = True
break
return G
def _is_reachable_to_accept(G, q):
"""Prüft ob von q aus ein Accept-Zustand erreichbar ist."""
visited = set()
stack = [q]
while stack:
n = stack.pop()
if n in visited:
continue
visited.add(n)
if n in G.accepts:
return True
for e in G.outgoing(n):
stack.append(e['to'])
return False
def _extract_expression(G):
"""
Extrahiert den regulären Ausdruck aus dem eliminierten Automaten.
Nach Elimination gibt es nur Startzustand und Accept-Zustände.
Der Ausdruck ist die Disjunktion aller Pfade von Start zu Accept.
"""
if G.start is None:
return ""
# Phase 1: State Elimination
G_elim = _state_elimination(G)
start = G_elim.start
if not G_elim.accepts:
return ""
paths = []
outgoing = G_elim.outgoing(start)
# Spezialfall: Start ist selbst Accept
if start in G_elim.accepts:
# Prüfe auf Selbst-Schleife
self_edges = [e for e in outgoing if e['to'] == start]
non_self = [e for e in outgoing if e['to'] != start]
if not non_self and not self_edges:
return "ε"
if self_edges:
self_labels = '|'.join(set(e['label'] for e in self_edges))
paths.append(f"({self_labels})*")
# Außer Start → Accept → andere Accepts
for e in non_self:
target = e['to']
if target in G_elim.accepts:
paths.append(e['label'])
# Pfade von Start zu Accept-Zuständen
for acc in G_elim.accepts:
if acc == start:
continue
# Kante start → acc
direct = [e for e in outgoing if e['to'] == acc]
for e in direct:
paths.append(e['label'])
self_loops_start = [e for e in G_elim.outgoing(start) if e['to'] == start]
# Weitere Kanten: start → x (wo x != accept)
intermediate = [e for e in outgoing if e['to'] not in G_elim.accepts and e['to'] != start]
for e in intermediate:
# Folge Pfad von intermediate zu accept
suffix = _follow_path(G_elim, e['to'], G_elim.accepts, set())
if suffix:
paths.append(f"({e['label']}.{suffix})")
# Entferne Duplikate
paths = list(set(paths))
if not paths:
return "ε"
if len(paths) == 1:
expr = paths[0]
else:
expr = f"({'|'.join(paths)})"
# Vereinfache: Entferne überflüssige Klammern
expr = _simplify_expression(expr)
return expr
def _follow_path(G, start, accepts, visited):
"""Findet den Pfad von start zu einem Accept."""
if start in accepts:
return "ε"
if start in visited:
return None
visited.add(start)
outgoing = G.outgoing(start)
for e in outgoing:
if e['to'] == start:
continue
suffix = _follow_path(G, e['to'], accepts, visited)
if suffix is not None:
if suffix == "ε":
return e['label']
else:
return f"({e['label']}.{suffix})"
return None
def _simplify_expression(expr):
"""
Vereinfacht einen regulären Ausdruck.
Entfernt überflüssige Klammern, doppelte Operatoren, etc.
"""
if not expr or expr in ('ε', ''):
return expr
# (ε. X ) → X
# (X . ε) → X
# ((X)) → X
# (a|a) → a
simplified = expr
while True:
prev = simplified
simplified = _simplify_once(simplified)
if simplified == prev:
break
return simplified
def _simplify_once(expr):
"""Ein Reduktionsschritt."""
# (ε.X) → X
# (X.ε) → X
# ((X)) → X
# (a|a) → a
result = expr
# ((X)) → X (doppelte Klammern)
import re
result = re.sub(r'$$\(([^()]+)\)$$', r'(\1)', result)
return result
def validate_k_ore(expr, k_index):
"""
Prüft ob ein Ausdruck die k-Occurrence-Bedingung erfüllt.
Ein k-ORE erlaubt jedes Symbol maximal einmal pro k-Indikator,
d.h. in jedem Konjunkt (Teilausdruck ohne |) darf jedes Symbol
höchstens k-mal vorkommen.
Vereinfacht: Zähle Vorkommen jedes eindeutigen Token-Namens
im Ausdruck. Wenn ein Token mehr als k-mal vorkommt, ist
die Bedingung verletzt.
Simplified implementation: count raw alphabet symbol
occurrences in the expression string. A symbol appearing
more than k times violates the condition.
Returns:
bool, str: (erfüllt, Grund)
(bool, str): (passes, explanation)
"""
# Extrahiere alle Token-Namen aus dem Ausdruck
tokens = set()
for c in '*+?()|.':
pass
if not expr or expr in ('', 'ε'):
return True, "OK"
token_names = set()
i = 0
while i < len(expr):
if expr[i].isalnum() or expr[i] in '/_-':
j = i
while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'):
j += 1
token_names.add(expr[i:j])
i = j
else:
i += 1
from .expr import alphabet
syms = alphabet_set or alphabet(expr)
# Zähle Vorkommen
token_counts = {}
i = 0
while i < len(expr):
if expr[i].isalnum() or expr[i] in '/_-':
j = i
while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'):
j += 1
token = expr[i:j]
token_counts[token] = token_counts.get(token, 0) + 1
i = j
else:
i += 1
counts = {}
for sym in syms:
import re
count = len(re.findall(rf'(?<![a-zA-Z_/]){re.escape(sym)}(?![a-zA-Z_/])', expr))
if count > 0:
counts[sym] = count
violations = [t for t, c in token_counts.items() if c > k_index]
violations = [f"{s}:{c}" for s, c in sorted(counts.items()) if c > k]
if violations:
return False, f"Token {violations} erscheint > {k_index}-mal"
return False, f"k={k} violations: {', '.join(violations)}"
return True, "OK"
class kOREInference:
"""
iDRegEx: k-ORE Inferenz via PTA Shrink Repair Expression.
| Algorithm 4: iDRegEx |
Require: sample S, kmax
Ensure: k-ORE r
Nach Bex et al. 2008:
- Baue PTA aus Sequenzen
- Shrink: Rewrite-Regeln generalisieren
- Repair: Stelle Determinismus wieder her
- Convert: Extrahiere regulären Ausdruck via State Elimination
- Prüfe k-Occurrence
- Wähle k mit MDL
1: C
2: for k = 1 to kmax do
3: for n = 1 to N do
4: G iKoa(S, k)
5: if rwr²(G) is deterministic then
6: add rwr²(G) to C
7: return best(C) by MDL
"""
def __init__(self, k_max=5):
def __init__(self, k_max=5, N=5):
self.k_max = k_max
self.N = N
def infer(self, sequences):
"""
Inferiere den besten k-ORE.
Infer the best k-ORE for the given sequences.
Returns:
(Automaton, expression_string, best_k) oder None
(koa_automaton, expression_string, best_k) or None if no valid
k-ORE can be inferred.
"""
sequences = [s for s in sequences if s]
if not sequences:
return None, "", 0
return None
best_score = float('inf')
best_result = None
candidates = []
for k in range(1, self.k_max + 1):
try:
auto, expr = self._infer_k_expression(sequences, k)
if auto is None:
for _ in range(self.N):
G = ikoa(sequences, k, num_trials=1)
if G is None:
continue
score = mdl_score(auto, sequences)
if score < best_score:
best_score = score
best_result = (auto, expr, k)
except Exception:
continue
expr = rwr_sq(G)
if expr and expr not in ('', 'ε'):
if is_deterministic(expr):
valid, _ = validate_k_ore(expr, k)
if valid:
candidates.append((G, expr, k))
return best_result
if not candidates:
return None
def _infer_k_expression(self, sequences, k):
"""Führe iDRegEx für ein spezifisches k durch."""
# 1. PTA bauen
pta = build_pta(sequences)
# 2. Shrink
shrunk = shrink(pta, max_iterations=20)
# 3. Repair
repaired = repair(shrunk)
# 4. Expression extrahieren
expr = _extract_expression(repaired)
# 5. k-ORE Prüfung
valid, _ = validate_k_ore(expr, k)
if not valid:
expr = self._generalize_to_k_ore(expr, k)
return repaired, expr
def _generalize_to_k_ore(self, expr, k):
"""
Generalisiere den Ausdruck zur k-ORE.
Wenn Token t mehr als k-mal vorkommt:
- Ersetze Wiederholungen durch t+ oder t*
"""
# Einfache Heuristik: Extrahiere Token, zähle, ersetze
result = expr
token_counts = {}
i = 0
while i < len(result):
if result[i].isalnum() or result[i] in '/_-':
j = i
while j < len(result) and (result[j].isalnum() or result[j] in '/_-'):
j += 1
token = result[i:j]
token_counts[token] = token_counts.get(token, 0) + 1
i = j
else:
i += 1
for token, count in token_counts.items():
if count > k:
# Ersetze token.token durch token+
import re
pattern = re.escape(token) + r'\..' + re.escape(token)
replacement = f"{token}+"
result = re.sub(pattern, replacement, result, count=1)
break
return result
return min(candidates, key=lambda c: mdl_score(c[1], sequences))

204
tests/test_ensemble.py Normal file
View file

@ -0,0 +1,204 @@
"""Tests for infer_ensemble — runs CRX, iDRegEx, and kOREInference, picks best by MDL."""
from bex.ensemble import infer_ensemble
from bex.idregex import is_deterministic
from bex.kore import kOREInference
# ── Basic ensemble runs ──
def test_ensemble_returns_dict():
seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']]
result = infer_ensemble(seqs, kmax=2, N=3)
assert isinstance(result, dict)
assert 'best' in result
assert 'all' in result
assert 'why' in result
def test_ensemble_best_not_none():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
assert result['best'] is not None
assert result['best']['grammar'] is not None
assert result['best']['algorithm'] in ('CRX', 'iDRegEx', 'kOREInference')
assert result['best']['mdl_score'] is not None
def test_ensemble_runs_all_three():
seqs = [['a', 'b', 'c'], ['a', 'b', 'c', 'd']]
result = infer_ensemble(seqs, kmax=2, N=3)
algos = {a['algorithm'] for a in result['all']}
assert 'CRX' in algos
# iDRegEx and kOREInference may fail stochastically, so at least CRX
assert len(result['all']) >= 1
def test_ensemble_all_results_have_scores():
seqs = [['a', 'b'], ['a', 'b', 'b']]
result = infer_ensemble(seqs, kmax=2, N=3)
for entry in result['all']:
assert 'algorithm' in entry
assert 'grammar' in entry
assert 'mdl_score' in entry
assert isinstance(entry['mdl_score'], (int, float))
def test_ensemble_deterministic_results():
seqs = [['x', 'y'], ['x', 'z']]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
assert is_deterministic(result['best']['grammar'])
# ── prefer parameter tests ──
def test_prefer_crx():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, prefer='crx')
assert result['best']['algorithm'] == 'CRX'
assert len(result['all']) == 1
def test_prefer_idregex():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, prefer='idregex', kmax=2, N=5)
assert result['best']['algorithm'] == 'iDRegEx'
assert len(result['all']) == 1
def test_prefer_koreinference():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, prefer='koreinference', kmax=2, N=5)
assert result['best']['algorithm'] == 'kOREInference'
assert len(result['all']) == 1
def test_prefer_case_insensitive():
seqs = [['a', 'b']]
r1 = infer_ensemble(seqs, prefer='CRX')
r2 = infer_ensemble(seqs, prefer='Crx')
assert r1['best']['algorithm'] == r2['best']['algorithm']
def test_prefer_unknown_falls_back():
seqs = [['a', 'b']]
result = infer_ensemble(seqs, prefer='unknown')
assert result['best'] is not None
assert len(result['all']) >= 1
# ── Edge cases ──
def test_ensemble_empty_input():
result = infer_ensemble([], kmax=2, N=3)
assert result['best'] is None or result['best']['grammar'] is not None
def test_ensemble_single_sequence():
result = infer_ensemble([['a', 'b', 'c']], kmax=2, N=3)
assert result['best'] is not None
assert result['best']['grammar'] is not None
def test_ensemble_many_identical():
seqs = [['a', 'b', 'c']] * 10
result = infer_ensemble(seqs, kmax=2, N=3)
assert result['best'] is not None
def test_ensemble_linear_data():
seqs = [
['file', 'template', 'command', 'set_fact', 'shell'],
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
g = result['best']['grammar']
assert 'file' in g and 'template' in g and 'shell' in g
def test_ensemble_branching_data():
seqs = [
['file', 'template', 'setup', 'shell'],
['file', 'template', 'deploy', 'shell'],
]
result = infer_ensemble(seqs, kmax=2, N=5)
if result['best']:
g = result['best']['grammar']
assert is_deterministic(g)
assert 'file' in g and 'template' in g and 'shell' in g
def test_ensemble_why_includes_scores():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
assert 'CRX' in result['why']
assert 'selected' in result['why']
assert 'MDL' in result['why'] or 'score' in result['why'].lower()
def test_ensemble_ordering_best_first():
seqs = [['a', 'b', 'c'], ['a', 'b']]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
assert result['all'][0]['algorithm'] == result['best']['algorithm']
assert result['all'][0]['mdl_score'] <= result['all'][-1]['mdl_score']
# ── Stochastic stability tests ──
def test_ensemble_stable_on_simple_data():
for _ in range(3):
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
if result['best']:
assert 'a' in result['best']['grammar']
assert 'b' in result['best']['grammar']
def test_ensemble_crx_always_present():
seqs = [['a', 'b'], ['a', 'b', 'c']]
result = infer_ensemble(seqs, kmax=2, N=3)
crx_results = [a for a in result['all'] if a['algorithm'] == 'CRX']
assert len(crx_results) == 1
def run_all():
tests = [
test_ensemble_returns_dict,
test_ensemble_best_not_none,
test_ensemble_runs_all_three,
test_ensemble_all_results_have_scores,
test_ensemble_deterministic_results,
test_prefer_crx,
test_prefer_idregex,
test_prefer_koreinference,
test_prefer_case_insensitive,
test_prefer_unknown_falls_back,
test_ensemble_empty_input,
test_ensemble_single_sequence,
test_ensemble_many_identical,
test_ensemble_linear_data,
test_ensemble_branching_data,
test_ensemble_why_includes_scores,
test_ensemble_ordering_best_first,
test_ensemble_stable_on_simple_data,
test_ensemble_crx_always_present,
]
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
import traceback
print(f" FAIL {t.__name__}: {e}")
traceback.print_exc()
failed += 1
print(f"\n{passed} passed, {failed} failed")
if __name__ == '__main__':
run_all()

375
tests/test_kore.py Normal file
View file

@ -0,0 +1,375 @@
"""Tests for kOREInference (Algorithm 4: iDRegEx from arXiv 1004.2372)."""
from bex.kore import kOREInference, validate_k_ore
from bex.idregex import is_deterministic
from bex.mdl import mdl_score, model_cost, data_cost
# ── Core inference tests ──
def test_linear_sequence():
seqs = [
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
]
kore = kOREInference(k_max=3, N=3)
result = kore.infer(seqs)
assert result is not None, "Should infer a k-ORE"
auto, expr, best_k = result
assert expr is not None
assert all(t in expr for t in ['file', 'template', 'command', 'set_fact', 'shell', 'wait_for'])
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_branching_paths():
seqs = [
['file', 'template', 'setup', 'set_fact', 'shell'],
['file', 'template', 'deploy', 'set_fact', 'shell'],
]
kore = kOREInference(k_max=3, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
assert 'file' in expr and 'template' in expr and 'shell' in expr
def test_optional_element():
seqs = [
['file', 'template', 'shell'],
['file', 'template', 'exec', 'shell'],
['file', 'template', 'exec', 'exec', 'shell'],
]
kore = kOREInference(k_max=4, N=15)
result = kore.infer(seqs)
if result is None:
return # stochastic failure
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_looping_element():
seqs = [
['package', 'file', 'template', 'systemd'],
['package', 'file', 'template', 'template', 'systemd', 'systemd'],
['package', 'file', 'template', 'template', 'template', 'systemd'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_multiple_alternatives():
seqs = [
['install', 'configure', 'start'],
['install', 'configure', 'enable'],
['install', 'configure', 'restart'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Expression must be deterministic: {expr}"
def test_rejects_non_deterministic():
seqs = [['a'], ['a']]
kore = kOREInference(k_max=2, N=2)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr), f"Non-deterministic: {expr}"
def test_empty_input():
kore = kOREInference(k_max=2, N=2)
result = kore.infer([])
assert result is None
result = kore.infer([[], []])
assert result is None
def test_single_element_sequences():
seqs = [['a'], ['b'], ['a'], ['b']]
kore = kOREInference(k_max=2, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
def test_infer_returns_best_k():
seqs = [
['a', 'b', 'c'],
['a', 'b', 'c', 'd'],
['a', 'b', 'd'],
]
kore = kOREInference(k_max=4, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert 1 <= best_k <= 4
assert is_deterministic(expr)
def test_tool_sequences():
seqs = [
['read', 'grep', 'read'],
['read', 'glob', 'grep', 'read'],
['read', 'bash', 'read'],
['glob', 'grep', 'read', 'edit', 'bash'],
['read', 'edit', 'bash', 'bash'],
['bash', 'read', 'bash'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
# ── Edge case tests ──
def test_single_sequence():
kore = kOREInference(k_max=2, N=3)
result = kore.infer([['a', 'b', 'c']])
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
def test_many_identical_sequences():
seqs = [['a', 'b', 'c']] * 20
kore = kOREInference(k_max=2, N=3)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
assert 'a' in expr and 'b' in expr and 'c' in expr
def test_xml_like_structured():
seqs = [
['header', 'body', 'footer'],
['header', 'body', 'body', 'footer'],
['header', 'body', 'body', 'body', 'footer'],
['header', 'footer'],
]
kore = kOREInference(k_max=3, N=10)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
assert 'header' in expr and 'footer' in expr
def test_disjoint_symbols():
seqs = [
['alpha', 'beta'],
['gamma', 'delta'],
]
kore = kOREInference(k_max=2, N=3)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
def test_k1_vs_k2_selection():
seqs = [
['a', 'a', 'b'],
['a', 'b'],
['a', 'a', 'a', 'b'],
]
kore = kOREInference(k_max=3, N=5)
result = kore.infer(seqs)
assert result is not None
auto, expr, best_k = result
assert is_deterministic(expr)
def test_all_same_symbol():
seqs = [
['a', 'a'],
['a', 'a', 'a'],
['a'],
]
kore = kOREInference(k_max=2, N=5)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
def test_long_sequence():
seqs = [
['a', 'b', 'c', 'd', 'e', 'f', 'g'],
['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'],
]
kore = kOREInference(k_max=2, N=5)
result = kore.infer(seqs)
if result is not None:
auto, expr, best_k = result
assert is_deterministic(expr)
def test_infer_returns_koa():
kore = kOREInference(k_max=2, N=3)
result = kore.infer([['a', 'b'], ['a', 'b', 'c']])
assert result is not None
auto, expr, best_k = result
assert hasattr(auto, '_succ'), "Should return a KOA automaton"
assert hasattr(auto, 'src')
assert hasattr(auto, 'sink')
def test_different_kmax():
seqs = [['a', 'b', 'c', 'd', 'e'], ['a', 'b', 'c']]
kore1 = kOREInference(k_max=1, N=5)
kore2 = kOREInference(k_max=3, N=5)
r1 = kore1.infer(seqs)
r2 = kore2.infer(seqs)
assert r1 is not None or r2 is not None
# ── validate_k_ore tests ──
def test_validate_k_ore_basic():
valid, reason = validate_k_ore('a.b.c', 2)
assert valid, f"a.b.c should be valid for k=2: {reason}"
def test_validate_k_ore_exceeds_k():
valid, reason = validate_k_ore('a.a.a', 1)
assert not valid, "a.a.a should fail for k=1"
def test_validate_k_ore_epsilon():
valid, reason = validate_k_ore('ε', 1)
assert valid
def test_validate_k_ore_empty():
valid, reason = validate_k_ore('', 1)
assert valid
def test_validate_k_ore_disjunction():
valid, reason = validate_k_ore('(a|b|c)', 2)
assert valid, f"(a|b|c) should be valid for k=2: {reason}"
def test_validate_k_ore_loop():
valid, reason = validate_k_ore('a+', 1)
assert valid, "a+ should be valid for k=1"
def test_validate_k_ore_k0():
valid, reason = validate_k_ore('a', 0)
assert not valid, "a should fail for k=0"
# ── MDL scoring tests ──
def test_mdl_model_cost():
assert model_cost('a.b.c') == 3
assert model_cost('(a|b)+.c') >= 2
assert model_cost('ε') >= 0
def test_mdl_data_cost():
# General expression (a|b)+ has multiple words of length 1+: non-zero cost
dc = data_cost('(a|b)+', [['a', 'b'], ['b', 'a'], ['a']])
assert dc > 0, f"data_cost should be > 0 for general expression, got {dc}"
# Exact expression has cost 0 (log2(1) = 0)
dc_exact = data_cost('a.b.c', [['a', 'b', 'c']])
assert dc_exact == 0.0, f"data_cost for exact match should be 0, got {dc_exact}"
def test_mdl_score_lower_is_better():
score_specific = mdl_score('a.b.c', [['a', 'b', 'c']])
score_general = mdl_score('(a|b|c)+?', [['a', 'b', 'c']])
assert score_specific > 0 and score_general > 0
def test_mdl_empty_sequences():
score = mdl_score('a.b.c', [])
assert score == model_cost('a.b.c')
# ── Algorithm 4 paper-faithful tests ──
def test_infer_returns_deterministic():
for _ in range(5):
seqs = [['x', 'y'], ['x', 'z']]
kore = kOREInference(k_max=2, N=2)
result = kore.infer(seqs)
if result:
_, expr, _ = result
assert is_deterministic(expr), f"Non-deterministic: {expr}"
def test_infer_obeys_k_occurrence():
seqs = [['a', 'b'], ['a', 'b', 'c']]
for k in range(1, 4):
kore = kOREInference(k_max=k, N=5)
result = kore.infer(seqs)
if result:
_, expr, best_k = result
valid, _ = validate_k_ore(expr, best_k)
assert valid, f"k={best_k} expression {expr} violates k-occurrence"
def run_all():
tests = [
test_linear_sequence,
test_branching_paths,
test_optional_element,
test_looping_element,
test_multiple_alternatives,
test_rejects_non_deterministic,
test_empty_input,
test_single_element_sequences,
test_infer_returns_best_k,
test_tool_sequences,
test_single_sequence,
test_many_identical_sequences,
test_xml_like_structured,
test_disjoint_symbols,
test_k1_vs_k2_selection,
test_all_same_symbol,
test_long_sequence,
test_infer_returns_koa,
test_different_kmax,
test_validate_k_ore_basic,
test_validate_k_ore_exceeds_k,
test_validate_k_ore_epsilon,
test_validate_k_ore_empty,
test_validate_k_ore_disjunction,
test_validate_k_ore_loop,
test_validate_k_ore_k0,
test_mdl_model_cost,
test_mdl_data_cost,
test_mdl_score_lower_is_better,
test_mdl_empty_sequences,
test_infer_returns_deterministic,
test_infer_obeys_k_occurrence,
]
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
import traceback
print(f" FAIL {t.__name__}: {e}")
traceback.print_exc()
failed += 1
print(f"\n{passed} passed, {failed} failed")
if __name__ == '__main__':
run_all()