grammar-inference-engine/bex/kore.py
tobjend 7c00c6713d Initial commit: BEX-based grammar inference engine
- CRX: direct CHARE inference (Algorithm 7, TODS 2010)
- iDRegEx: k-ORE inference (Algorithm 4, arXiv 2010)
- RWR₀: SORE repair (Algorithm 6, TODS 2010)
- rwr²: k-ORE extraction (Algorithm 3, arXiv 2010)
- SOA, k-OA, iKoa, 2T-INF, Baum-Welch
- Ansible role grammar adapter
- Generic YAML key-path converter
- 28 tests, all passing
2026-07-01 08:01:16 +02:00

432 lines
13 KiB
Python

"""
kore — k-ORE inference (iDRegEx) after Bex et al. 2008/2010.

iDRegEx (Bex 2008):
1. Build a prefix-tree automaton (PTA) from the example sequences.
2. Shrink: rewrite rules generalize the automaton
   (simplify → star_rewrite → concat_rewrite → alternation_rewrite).
3. Repair: restore determinism after each rewrite pass.
4. Convert: turn the automaton into a regular expression
   (state elimination after Brzozowski & McCluskey).
5. k-ORE check: the expression must satisfy the k-occurrence condition
   (each symbol occurs at most k times).
6. MDL: choose the k with the minimal MDL score.
"""
from .automaton import Automaton
from .pta import build_pta
from .shrink import shrink
from .repair import repair
from .mdl import mdl_score
def _state_elimination(G):
    """
    Eliminate every state that is neither the start state nor accepting.

    State elimination in the style of Brzozowski & McCluskey. The function
    works on the object returned by ``G.copy()`` and returns that object;
    edge labels are regular-expression strings.

    For each eliminated state ``q`` and each pair of edges ``p -A-> q``
    (``p != q``) and ``q -B-> r`` (``r != q``):

    * ``R`` is the starred alternation of all self-loop labels on ``q``
      (empty when ``q`` has no self-loop),
    * the bypass label is ``(A.R.B)``, or ``(A.B)`` without self-loops,
    * the bypass is added as an edge ``p -> r``; when an edge ``p -> r``
      already exists, its label is merged with the new one via ``|``.

    States from which no accepting state can be reached are dropped together
    with their edges instead of being bypassed.

    Args:
        G: automaton exposing ``copy()``, ``start``, set-like ``accepts`` and
            ``nodes``, ``edges`` (dicts with ``'from'``, ``'to'``,
            ``'label'``), ``incoming(q)``, ``outgoing(q)``, ``add_edge`` and
            ``remove_edge``.

    Returns:
        The reduced copy; only the start state and accepting states remain.
    """
    G = G.copy()
    eliminated = set()

    # Repeat until a complete pass over the nodes eliminates nothing.
    changed = True
    while changed:
        changed = False
        # Iterate over a snapshot because nodes are removed inside the loop.
        for q in list(G.nodes):
            if q == G.start or q in G.accepts:
                continue
            if q in eliminated:
                # Stale entry of the snapshot taken at the top of this pass.
                continue

            if not _is_reachable_to_accept(G, q):
                # q can never contribute to an accepted word: drop it and
                # every edge touching it.
                G.nodes.discard(q)
                G.accepts.discard(q)
                G.edges = [
                    e for e in G.edges if e['from'] != q and e['to'] != q
                ]
                eliminated.add(q)
                changed = True
                continue

            incoming = G.incoming(q)
            outgoing = G.outgoing(q)
            self_loops = [e for e in outgoing if e['to'] == q]
            outgoing_no_self = [e for e in outgoing if e['to'] != q]

            if not outgoing_no_self:
                # Dead end (at most self-loops leave q): remove the incoming
                # edges and q itself. Defensive branch; a non-accepting state
                # like this should already fail the reachability check above.
                for e in incoming:
                    G.remove_edge(e['from'], e['to'], e['label'])
                G.nodes.discard(q)
                G.accepts.discard(q)
                eliminated.add(q)
                changed = True
                continue

            # R_self_q = (a1|a2|...)* over all self-loop labels on q.
            if self_loops:
                # Sorted so the alternation order does not depend on string
                # hash randomization (a plain set has arbitrary order).
                self_labels = sorted({e['label'] for e in self_loops})
                if len(self_labels) == 1:
                    R_self_q = f"({self_labels[0]})*"
                else:
                    R_self_q = f"({'|'.join(self_labels)})*"
            else:
                R_self_q = ""

            # Bypass q for every pair p -> q (incoming) and q -> r (outgoing).
            for e_in in incoming:
                p = e_in['from']
                if p == q:
                    # Self-loops are already folded into R_self_q.
                    continue
                A = e_in['label']
                for e_out in outgoing_no_self:
                    r = e_out['to']
                    B = e_out['label']
                    if R_self_q:
                        new_label = f"({A}.{R_self_q}.{B})"
                    else:
                        new_label = f"({A}.{B})"

                    # Merge with an already existing edge p -> r, if any.
                    existing = [
                        e for e in G.edges
                        if e['from'] == p and e['to'] == r
                    ]
                    existing_labels = [e['label'] for e in existing]
                    if (new_label not in existing_labels
                            and f"({new_label})" not in existing_labels):
                        if existing:
                            # Only the first parallel edge is merged; any
                            # further parallel edges stay untouched.
                            old_label = existing[0]['label']
                            merged = f"({old_label}|{new_label})"
                            G.remove_edge(p, r, old_label)
                            G.add_edge(p, r, merged)
                        else:
                            G.add_edge(p, r, new_label)

            # Remove q together with all of its edges.
            for e in incoming:
                G.remove_edge(e['from'], e['to'], e['label'])
            for e in self_loops:
                G.remove_edge(e['from'], e['to'], e['label'])
            for e in outgoing_no_self:
                G.remove_edge(e['from'], e['to'], e['label'])
            G.nodes.discard(q)
            G.accepts.discard(q)
            eliminated.add(q)
            changed = True
            # The graph changed structurally; restart with a fresh snapshot.
            break
    return G
def _is_reachable_to_accept(G, q):
"""Prüft ob von q aus ein Accept-Zustand erreichbar ist."""
visited = set()
stack = [q]
while stack:
n = stack.pop()
if n in visited:
continue
visited.add(n)
if n in G.accepts:
return True
for e in G.outgoing(n):
stack.append(e['to'])
return False
def _extract_expression(G):
"""
Extrahiert den regulären Ausdruck aus dem eliminierten Automaten.
Nach Elimination gibt es nur Startzustand und Accept-Zustände.
Der Ausdruck ist die Disjunktion aller Pfade von Start zu Accept.
"""
if G.start is None:
return ""
# Phase 1: State Elimination
G_elim = _state_elimination(G)
start = G_elim.start
if not G_elim.accepts:
return ""
paths = []
outgoing = G_elim.outgoing(start)
# Spezialfall: Start ist selbst Accept
if start in G_elim.accepts:
# Prüfe auf Selbst-Schleife
self_edges = [e for e in outgoing if e['to'] == start]
non_self = [e for e in outgoing if e['to'] != start]
if not non_self and not self_edges:
return "ε"
if self_edges:
self_labels = '|'.join(set(e['label'] for e in self_edges))
paths.append(f"({self_labels})*")
# Außer Start → Accept → andere Accepts
for e in non_self:
target = e['to']
if target in G_elim.accepts:
paths.append(e['label'])
# Pfade von Start zu Accept-Zuständen
for acc in G_elim.accepts:
if acc == start:
continue
# Kante start → acc
direct = [e for e in outgoing if e['to'] == acc]
for e in direct:
paths.append(e['label'])
self_loops_start = [e for e in G_elim.outgoing(start) if e['to'] == start]
# Weitere Kanten: start → x (wo x != accept)
intermediate = [e for e in outgoing if e['to'] not in G_elim.accepts and e['to'] != start]
for e in intermediate:
# Folge Pfad von intermediate zu accept
suffix = _follow_path(G_elim, e['to'], G_elim.accepts, set())
if suffix:
paths.append(f"({e['label']}.{suffix})")
# Entferne Duplikate
paths = list(set(paths))
if not paths:
return "ε"
if len(paths) == 1:
expr = paths[0]
else:
expr = f"({'|'.join(paths)})"
# Vereinfache: Entferne überflüssige Klammern
expr = _simplify_expression(expr)
return expr
def _follow_path(G, start, accepts, visited):
"""Findet den Pfad von start zu einem Accept."""
if start in accepts:
return "ε"
if start in visited:
return None
visited.add(start)
outgoing = G.outgoing(start)
for e in outgoing:
if e['to'] == start:
continue
suffix = _follow_path(G, e['to'], accepts, visited)
if suffix is not None:
if suffix == "ε":
return e['label']
else:
return f"({e['label']}.{suffix})"
return None
def _simplify_expression(expr):
"""
Vereinfacht einen regulären Ausdruck.
Entfernt überflüssige Klammern, doppelte Operatoren, etc.
"""
if not expr or expr in ('ε', ''):
return expr
# (ε. X ) → X
# (X . ε) → X
# ((X)) → X
# (a|a) → a
simplified = expr
while True:
prev = simplified
simplified = _simplify_once(simplified)
if simplified == prev:
break
return simplified
def _simplify_once(expr):
"""Ein Reduktionsschritt."""
# (ε.X) → X
# (X.ε) → X
# ((X)) → X
# (a|a) → a
result = expr
# ((X)) → X (doppelte Klammern)
import re
result = re.sub(r'$$\(([^()]+)\)$$', r'(\1)', result)
return result
def validate_k_ore(expr, k_index):
    """
    Check whether an expression satisfies the k-occurrence condition.

    Simplified check: a token name is a maximal run of alphanumeric
    characters and ``/``, ``_``, ``-``. The expression passes if no token
    name occurs more than ``k_index`` times in the whole string; the
    structure of the expression (alternation, grouping) is not considered.

    Args:
        expr: expression string.
        k_index: maximum number of occurrences allowed per token name.

    Returns:
        tuple[bool, str]: ``(True, "OK")`` on success, otherwise
        ``(False, reason)`` where ``reason`` lists the offending tokens in
        order of first occurrence.
    """
    from collections import Counter
    from itertools import groupby

    def is_token_char(ch):
        # NOTE: 'ε' is alphanumeric for str.isalnum and is counted as well.
        return ch.isalnum() or ch in '/_-'

    # One pass: split into runs of token / non-token characters and count
    # the token runs. Counter keeps first-seen order.
    token_counts = Counter(
        "".join(run)
        for is_token, run in groupby(expr, key=is_token_char)
        if is_token
    )

    violations = [t for t, c in token_counts.items() if c > k_index]
    if violations:
        return False, f"Token {violations} erscheint > {k_index}-mal"
    return True, "OK"
class kOREInference:
    """
    iDRegEx: k-ORE inference via PTA → shrink → repair → expression.

    After Bex et al. 2008:
    - build a PTA from the sequences
    - shrink: rewrite rules generalize the automaton
    - repair: restore determinism
    - convert: extract a regular expression via state elimination
    - check the k-occurrence condition
    - choose k by MDL score
    """

    def __init__(self, k_max=5):
        """Store the largest k that ``infer`` will try (k runs 1..k_max)."""
        self.k_max = k_max

    def infer(self, sequences):
        """
        Infer the best k-ORE for the given sequences.

        Empty sequences are ignored. For every k in ``1..k_max`` the
        pipeline is run and scored with ``mdl_score``; the first result with
        the strictly smallest score wins. A k whose pipeline raises is
        skipped (best effort) and the exception is logged at debug level.

        Returns:
            ``(automaton, expression_string, best_k)`` for the best k,
            ``(None, "", 0)`` if there is no non-empty sequence, or ``None``
            if no k produced a result.
        """
        import logging

        sequences = [s for s in sequences if s]
        if not sequences:
            return None, "", 0

        best_score = float('inf')
        best_result = None
        for k in range(1, self.k_max + 1):
            try:
                auto, expr = self._infer_k_expression(sequences, k)
                if auto is None:
                    continue
                score = mdl_score(auto, sequences)
                if score < best_score:
                    best_score = score
                    best_result = (auto, expr, k)
            except Exception:
                # Deliberately best effort: a failing k must not abort the
                # search, but the failure should be visible to debuggers.
                logging.getLogger(__name__).debug(
                    "k-ORE inference failed for k=%d", k, exc_info=True
                )
                continue
        return best_result

    def _infer_k_expression(self, sequences, k):
        """
        Run iDRegEx for one specific k.

        NOTE(review): steps 1-4 do not take ``k``; only the validation and
        the generalization step depend on it. The result of the
        generalization is not validated again.

        Returns:
            ``(repaired_automaton, expression_string)``
        """
        # 1. Build the PTA.
        pta = build_pta(sequences)
        # 2. Shrink.
        shrunk = shrink(pta, max_iterations=20)
        # 3. Repair.
        repaired = repair(shrunk)
        # 4. Extract the expression.
        expr = _extract_expression(repaired)
        # 5. k-ORE check; generalize when the condition is violated.
        valid, _ = validate_k_ore(expr, k)
        if not valid:
            expr = self._generalize_to_k_ore(expr, k)
        return repaired, expr

    @staticmethod
    def _count_tokens(expr):
        """Count token names (runs of alphanumerics and ``/_-``) in ``expr``."""
        from collections import Counter
        from itertools import groupby

        return Counter(
            "".join(run)
            for is_token, run in groupby(
                expr, key=lambda ch: ch.isalnum() or ch in '/_-'
            )
            if is_token
        )

    def _generalize_to_k_ore(self, expr, k):
        """
        Generalize the expression towards a k-ORE.

        Simple heuristic: for the first token (in order of first occurrence)
        that occurs more than ``k`` times, the first directly repeated pair
        ``token.token`` is replaced by ``token+``. Only whole tokens are
        matched; a token that is merely a prefix or suffix of a longer token
        name is left alone. Only that one token is handled per call.

        Returns:
            str: the possibly rewritten expression; unchanged when no token
            exceeds ``k`` or no adjacent repetition exists.
        """
        import re

        for token, count in self._count_tokens(expr).items():
            if count <= k:
                continue
            escaped = re.escape(token)
            # Lookarounds keep the match on token boundaries, using the same
            # token alphabet as the counter above (\w covers alphanumerics
            # and '_').
            pattern = rf"(?<![\w/-]){escaped}\.{escaped}(?![\w/-])"
            return re.sub(pattern, lambda _m: f"{token}+", expr, count=1)
        return expr