"""iKoa — Algorithm 1 (arXiv 1004.2372) with Disambiguate (Algorithm 2).""" from collections import deque, defaultdict import random from .koa import KOA, build_complete_koa from .baum_welch import init_probabilities, baum_welch, baum_welch_fixed def disambiguate(G, prob, sequences): """ |---- Algorithm 2: Disambiguate ----| Require: POMM P=(G,alpha) and sample S Ensure: deterministic k-OA """ sigma = set() for seq in sequences: for sym in seq: sigma.add(sym) bw_iter = 2 if len(sigma) <= 7 else 3 Q = deque([G.src]) for s in G._succ.get(G.src, set()): if prob.get(G.src, {}).get(s, 0) > 0: Q.append(s) D = set() from .expr import strip_k while Q: s = Q.popleft() while True: lab_groups = defaultdict(list) for t in list(G._succ.get(s, set())): l = G.label(t) if l: lab_groups[strip_k(l)].append(t) multi = [(lab, ts) for lab, ts in lab_groups.items() if len(ts) > 1] if not multi: break for lab, targets in multi: t_max = max(targets, key=lambda t: prob.get(s, {}).get(t, 0)) total_p = sum(prob.get(s, {}).get(t, 0) for t in targets) if total_p > 0 and t_max in prob.get(s, {}): prob[s][t_max] = total_p for t in targets: if t != t_max: G.rm_edge(s, t) if t in prob.get(s, {}): prob[s][t] = 0.0 prob = baum_welch_fixed(G, prob, sequences, bw_iter) for seq in sequences: if not G.accept(seq): return None D.add(s) for t in list(G._succ.get(s, set())): if t not in D and t != G.sink: Q.append(t) return G def prune(G, sequences): """Prune (iKoa line 4). Remove edges without witnesses in S. Also removes states s ∈ Succ(src) without a witness. """ from .expr import strip_k as _sk witnessed = set() for seq in sequences: if not seq: witnessed.add((G.src, G.sink)) continue cur = {G.src} for sym in seq: nxt = set() for s in cur: for t in G._succ.get(s, set()): lab = G.label(t) if lab and _sk(lab) == sym: nxt.add(t) witnessed.add((s, t)) cur = nxt for s in cur: if G.has_edge(s, G.sink): witnessed.add((s, G.sink)) for s in list(G._succ.keys()): for t in list(G._succ.get(s, set())): if (s, t) not in witnessed: G.rm_edge(s, t) r_from_src = set() q = [G.src] while q: s = q.pop() if s in r_from_src: continue r_from_src.add(s) q.extend(G._succ.get(s, set())) r_to_sink = set() q = [G.sink] while q: s = q.pop() if s in r_to_sink: continue r_to_sink.add(s) q.extend(G._pred.get(s, set())) for n in list(G._succ.keys()): if n in (G.src, G.sink): continue if n not in r_from_src or n not in r_to_sink: G.rm_state(n) return G def ikoa(sequences, k, num_trials=1): """ |———— Algorithm 1: iKoa ————| Require: sample S, value k Ensure: deterministic k-OA G with S ⊆ L(G) 1: P ← init(k, S) 2: P ← BaumWelsh(P, S) 3: G ← Disambiguate(P, S) 4: G ← Prune(G, S) 5: return G """ for _ in range(num_trials): G, _ = build_complete_koa(sequences, k) prob = init_probabilities(G, sequences) prob = baum_welch(G, prob, sequences, iterations=10) G2 = G.copy() prob2 = {s: dict(d) for s, d in prob.items()} result = disambiguate(G2, prob2, sequences) if result is not None: result = prune(result, sequences) if result.sink_reachable(): return result return None