grammar-inference-engine/bex/ikoa.py
tobjend 7c00c6713d Initial commit: BEX-based grammar inference engine
- CRX: direct CHARE inference (Algorithm 7, TODS 2010)
- iDRegEx: k-ORE inference (Algorithm 4, arXiv 2010)
- RWR₀: SORE repair (Algorithm 6, TODS 2010)
- rwr²: k-ORE extraction (Algorithm 3, arXiv 2010)
- SOA, k-OA, iKoa, 2T-INF, Baum-Welch
- Ansible role grammar adapter
- Generic YAML key-path converter
- 28 tests, all passing
2026-07-01 08:01:16 +02:00

139 lines
4.1 KiB
Python

"""iKoa — Algorithm 1 (arXiv 1004.2372) with Disambiguate (Algorithm 2)."""
from collections import deque, defaultdict
import random
from .koa import KOA, build_complete_koa
from .baum_welch import init_probabilities, baum_welch, baum_welch_fixed
def disambiguate(G, prob, sequences):
"""
|---- Algorithm 2: Disambiguate ----|
Require: POMM P=(G,alpha) and sample S
Ensure: deterministic k-OA
"""
sigma = set()
for seq in sequences:
for sym in seq:
sigma.add(sym)
bw_iter = 2 if len(sigma) <= 7 else 3
Q = deque([G.src])
for s in G._succ.get(G.src, set()):
if prob.get(G.src, {}).get(s, 0) > 0:
Q.append(s)
D = set()
from .expr import strip_k
while Q:
s = Q.popleft()
while True:
lab_groups = defaultdict(list)
for t in list(G._succ.get(s, set())):
l = G.label(t)
if l:
lab_groups[strip_k(l)].append(t)
multi = [(lab, ts) for lab, ts in lab_groups.items() if len(ts) > 1]
if not multi:
break
for lab, targets in multi:
t_max = max(targets, key=lambda t: prob.get(s, {}).get(t, 0))
total_p = sum(prob.get(s, {}).get(t, 0) for t in targets)
if total_p > 0 and t_max in prob.get(s, {}):
prob[s][t_max] = total_p
for t in targets:
if t != t_max:
G.rm_edge(s, t)
if t in prob.get(s, {}):
prob[s][t] = 0.0
prob = baum_welch_fixed(G, prob, sequences, bw_iter)
for seq in sequences:
if not G.accept(seq):
return None
D.add(s)
for t in list(G._succ.get(s, set())):
if t not in D and t != G.sink:
Q.append(t)
return G
def prune(G, sequences):
"""Prune (iKoa line 4). Remove edges without witnesses in S.
Also removes states s ∈ Succ(src) without a witness.
"""
from .expr import strip_k as _sk
witnessed = set()
for seq in sequences:
if not seq:
witnessed.add((G.src, G.sink))
continue
cur = {G.src}
for sym in seq:
nxt = set()
for s in cur:
for t in G._succ.get(s, set()):
lab = G.label(t)
if lab and _sk(lab) == sym:
nxt.add(t)
witnessed.add((s, t))
cur = nxt
for s in cur:
if G.has_edge(s, G.sink):
witnessed.add((s, G.sink))
for s in list(G._succ.keys()):
for t in list(G._succ.get(s, set())):
if (s, t) not in witnessed:
G.rm_edge(s, t)
r_from_src = set()
q = [G.src]
while q:
s = q.pop()
if s in r_from_src:
continue
r_from_src.add(s)
q.extend(G._succ.get(s, set()))
r_to_sink = set()
q = [G.sink]
while q:
s = q.pop()
if s in r_to_sink:
continue
r_to_sink.add(s)
q.extend(G._pred.get(s, set()))
for n in list(G._succ.keys()):
if n in (G.src, G.sink):
continue
if n not in r_from_src or n not in r_to_sink:
G.rm_state(n)
return G
def ikoa(sequences, k, num_trials=1):
"""
|———— Algorithm 1: iKoa ————|
Require: sample S, value k
Ensure: deterministic k-OA G with S ⊆ L(G)
1: P ← init(k, S)
2: P ← BaumWelsh(P, S)
3: G ← Disambiguate(P, S)
4: G ← Prune(G, S)
5: return G
"""
for _ in range(num_trials):
G, _ = build_complete_koa(sequences, k)
prob = init_probabilities(G, sequences)
prob = baum_welch(G, prob, sequences, iterations=10)
G2 = G.copy()
prob2 = {s: dict(d) for s, d in prob.items()}
result = disambiguate(G2, prob2, sequences)
if result is not None:
result = prune(result, sequences)
if result.sink_reachable():
return result
return None