grammar-inference-engine/bex/rwr0.py
tobjend 7c00c6713d Initial commit: BEX-based grammar inference engine
- CRX: direct CHARE inference (Algorithm 7, TODS 2010)
- iDRegEx: k-ORE inference (Algorithm 4, arXiv 2010)
- RWR₀: SORE repair (Algorithm 6, TODS 2010)
- rwr²: k-ORE extraction (Algorithm 3, arXiv 2010)
- SOA, k-OA, iKoa, 2T-INF, Baum-Welch
- Ansible role grammar adapter
- Generic YAML key-path converter
- 28 tests, all passing
2026-07-01 08:01:16 +02:00

224 lines
7.1 KiB
Python

"""RWR₀ — Algorithm 6 (TODS 2010), conference version rules (Figure 10 + Figure 13).
Precedence: CONCATENATION > DISJUNCTION > SELF-LOOP > OPTIONAL
Repair precedence: ENABLE-DISJUNCTION > ENABLE-OPTIONAL-1 > ENABLE-OPTIONAL-2
Conditions checked on ε-closure G* (Definition 25).
Used as rwr²₁ in arXiv 1004.2372 for k>1.
"""
from .soa import SOA
from .expr import concat, disj, star, optional
def _find_concat(G, Gs):
    """Locate a pair of states that the CONCATENATION rule (Figure 10) can merge.

    Every ordered pair (r, s) of distinct states from ``G.states()`` is
    examined. Successor sets are read from ``Gs`` and predecessor sets from
    ``G``. Variants are grouped in three priority tiers; a lower tier is only
    consulted when no pair at all satisfies a higher one:

        tier 1:  Succ(r)={s}      ∧ Pred(s)={r}        →  r·s
        tier 2:  Succ(r)={s,sink} ∧ Pred(s)={r}        →  r·s?
                 Succ(r)={s}      ∧ Pred(s)={r,sink}   →  r?·s
        tier 3:  Succ(r)={s,sink} ∧ Pred(s)={r,sink}   →  r?·s?

    Args:
        G: automaton being rewritten (labels and predecessors come from it).
        Gs: graph whose ``succ`` is used for the successor test (the caller
            passes the ε-closure of ``G``).

    Returns:
        ``(r, s, expr)`` for the first matching pair of the best tier, or
        ``(None, None, None)`` when no variant applies.
    """
    # NOTE(review): successors are taken from Gs but predecessors from G,
    # although the rule is described as checked on G* — confirm the mix is
    # intentional.
    # NOTE(review): the Pred(s) tests compare against G.sink. If the sink never
    # has outgoing edges those variants cannot fire; G.src may have been
    # intended — confirm before changing, since it would alter results.
    sink = G.sink
    nodes = G.states()
    candidates = [(r, s) for r in nodes for s in nodes if r != s]

    # Each variant is (sink allowed in Succ(r), sink allowed in Pred(s)).
    # A sink in Succ(r) makes s optional; a sink in Pred(s) makes r optional.
    tiers = (
        ((False, False),),
        ((True, False), (False, True)),
        ((True, True),),
    )

    for tier in tiers:
        for r, s in candidates:
            after_r = Gs.succ(r)
            before_s = G.pred(s)
            for succ_has_sink, pred_has_sink in tier:
                wanted_succ = {s, sink} if succ_has_sink else {s}
                wanted_pred = {r, sink} if pred_has_sink else {r}
                if after_r != wanted_succ or before_s != wanted_pred:
                    continue
                left = G.label(r)
                if pred_has_sink:
                    left = optional(left)
                right = G.label(s)
                if succ_has_sink:
                    right = optional(right)
                return r, s, concat(left, right)

    return None, None, None
def _find_disj(G, Gs):
    """Locate a pair of states that the DISJUNCTION rule (Figure 10) can merge.

    Two distinct states r, s qualify when
    ``G._pred_plus(r) == G._pred_plus(s)`` and
    ``G._succ_plus(r) == G._succ_plus(s)``. Pairs are tried in the order of
    ``G.states()`` (each unordered pair once, earlier state first).

    Args:
        G: automaton being rewritten.
        Gs: accepted for signature parity with the other finders; this
            function does not read it.
            NOTE(review): the rule is described as checked on G*, yet only G
            is consulted here — confirm that is intended.

    Returns:
        ``(r, s, expr)`` with ``expr = disj(label(r), label(s))`` for the
        first qualifying pair, or ``(None, None, None)`` if there is none.
    """
    st = G.states()

    # The graph is not modified during the scan, so Pred⁺/Succ⁺ are computed
    # once per state instead of once per examined pair.
    pred_plus = [G._pred_plus(node) for node in st]
    succ_plus = [G._succ_plus(node) for node in st]

    for i, r in enumerate(st):
        for j in range(i + 1, len(st)):
            if pred_plus[i] == pred_plus[j] and succ_plus[i] == succ_plus[j]:
                s = st[j]
                return r, s, disj(G.label(r), G.label(s))
    return None, None, None
def _find_selfloop(G, Gs):
    """Locate a state the SELF-LOOP rule (Figure 10) applies to.

    A state qualifies when ``G`` itself (not ``Gs``) has an edge from the
    state to itself. ``Gs`` is accepted but not read.

    Returns:
        ``(r, star(label(r)))`` for the first such state in ``G.states()``
        order, or ``(None, None)`` when no state carries a self-loop.
    """
    not_found = object()  # private sentinel, distinct from any real state
    looped = next(
        (node for node in G.states() if G.has_edge(node, node)),
        not_found,
    )
    if looped is not_found:
        return None, None
    return looped, star(G.label(looped))
def _find_optional(G):
    """Locate the state the OPTIONAL rule (Figure 10) applies to.

    The rule is offered only when ``G`` is not yet final and
    ``G.num_non_special()`` is exactly 1. Refusing a final graph keeps the
    caller's rewrite loop from applying the rule over and over.

    Returns:
        ``(r, optional(label(r)))`` where ``r`` is ``G.states()[0]``, or
        ``(None, None)`` when the rule is not applicable.
    """
    if G.is_final() or G.num_non_special() != 1:
        return None, None
    only_state = G.states()[0]
    return only_state, optional(G.label(only_state))
def _try_ed(G):
    """Apply the ENABLE-DISJUNCTION repair (Figure 13) to at most one pair.

    For each unordered pair (r, s) of states, using ``G._pred_plus`` and
    ``G._succ_plus``:

    * Pred⁺ equal, Succ⁺ different: add outgoing edges so that both states
      reach the union of the two Succ⁺ sets.
    * Succ⁺ equal, Pred⁺ different: symmetrically, add incoming edges from the
      union of the two Pred⁺ sets.

    ``G`` is modified in place.

    Returns:
        True as soon as one pair received at least one new edge, False if no
        pair could be repaired.
    """

    def _insert_missing(edges):
        """Add every (u, v) not yet in G; report whether anything was added."""
        grew = False
        for u, v in edges:
            if not G.has_edge(u, v):
                G.add_edge(u, v)
                grew = True
        return grew

    nodes = G.states()
    for idx, r in enumerate(nodes):
        for s in nodes[idx + 1:]:
            same_pred = G._pred_plus(r) == G._pred_plus(s)
            same_succ = G._succ_plus(r) == G._succ_plus(s)

            if same_pred and not same_succ:
                targets = G._succ_plus(r) | G._succ_plus(s)
                grew = _insert_missing(
                    [(r, t) for t in targets - G._succ_plus(r)]
                )
                # Succ⁺(s) is deliberately re-read after r's edges went in.
                grew = _insert_missing(
                    [(s, t) for t in targets - G._succ_plus(s)]
                ) or grew
                if grew:
                    return True
            elif same_succ and not same_pred:
                sources = G._pred_plus(r) | G._pred_plus(s)
                grew = _insert_missing(
                    [(p, r) for p in sources - G._pred_plus(r)]
                )
                # Pred⁺(s) is deliberately re-read after r's edges went in.
                grew = _insert_missing(
                    [(p, s) for p in sources - G._pred_plus(s)]
                ) or grew
                if grew:
                    return True
    return False
def _try_eo1(G):
    """Apply the ENABLE-OPTIONAL-1 repair (Figure 13) to at most one state.

    Trigger: a state r whose successor set has exactly two members, one of
    them the sink; call the other one s. If s has more than one predecessor,
    every predecessor of s other than r gets an edge to r (unless it already
    has one). ``G`` is modified in place.

    Returns:
        True as soon as at least one edge was added for some r, else False.
    """
    sink = G.sink
    for r in G.states():
        outgoing = G.succ(r)
        if sink not in outgoing or len(outgoing) != 2:
            continue
        s = next(node for node in outgoing if node != sink)

        preds_of_s = G.pred(s)
        if len(preds_of_s) <= 1:
            continue

        # Collect first, then insert: the new edges all end in r, so they do
        # not influence each other's has_edge test.
        lacking = [p for p in preds_of_s - {r} if not G.has_edge(p, r)]
        for p in lacking:
            G.add_edge(p, r)
        if lacking:
            return True
    return False
def _try_eo2(G):
    """Apply the ENABLE-OPTIONAL-2 repair (Figure 13) to at most one state.

    Trigger: a state s whose predecessor set has exactly two members, one of
    them ``G.sink``; call the other one r. If r has more than one successor,
    s gets an edge to every successor of r other than s itself (unless the
    edge already exists). ``G`` is modified in place.

    Returns:
        True as soon as at least one edge was added for some s, else False.
    """
    # NOTE(review): the trigger looks for G.sink among the predecessors of s.
    # If the sink never has outgoing edges this repair can never fire; by
    # symmetry with ENABLE-OPTIONAL-1, G.src may have been intended — confirm.
    sink = G.sink
    for s in G.states():
        incoming = G.pred(s)
        if sink not in incoming or len(incoming) != 2:
            continue
        r = next(node for node in incoming if node != sink)

        succs_of_r = G.succ(r)
        if len(succs_of_r) <= 1:
            continue

        # Collect first, then insert: the new edges all start at s, so they do
        # not influence each other's has_edge test.
        lacking = [t for t in succs_of_r - {s} if not G.has_edge(s, t)]
        for t in lacking:
            G.add_edge(s, t)
        if lacking:
            return True
    return False
def rwr0(G):
    """
    |———— Algorithm 6: RWR₀ ————|
    Input: SOA G
    Output: SORE r (or ∅ on failure)
    1: if sink not reachable: return ∅
    2: if G has no non-special state and the edge (src,sink): return ε
    3: while not done:
    4:   if rewrite (Figure 10) applicable:
    5:     apply with precedence: CONCAT > DISJ > SELF-LOOP > OPTIONAL
    6:   elif repair (Figure 13) applicable:
    7:     apply with precedence: ED > EO1 > EO2
    8:   else: done
    9: if final: return r else return ∅

    The work is done on ``G.copy()``; the name ``G`` is rebound to that copy.
    ∅ is returned as the empty string ``''`` and ε as the string ``'ε'``.
    Exactly one rule is applied per loop iteration, after which the search
    restarts from the highest-precedence rule.

    Returns:
        ``G.expression()`` once the graph is final, ``'ε'`` for the
        empty-word automaton, or ``''`` when no expression could be derived.
    """
    G = G.copy()
    if not G.sink_reachable():
        return ''
    if G.num_non_special() == 0 and G.has_edge(G.src, G.sink):
        return 'ε'

    while True:
        # G is not modified between the finder calls of one iteration, so the
        # ε-closure is computed once here instead of once per finder.
        Gs = G.epsilon_closure()

        # --- rewrite rules (Figure 10), highest precedence first ---
        r, s, lab = _find_concat(G, Gs)
        if r is not None:
            G.contract(r, s, lab)
            continue

        r, s, lab = _find_disj(G, Gs)
        if r is not None:
            G.contract(r, s, lab)
            continue

        r, lab = _find_selfloop(G, Gs)
        if r is not None:
            t = G.contract_single(r, lab)
            # The loop is now expressed by the star, so drop the edge itself.
            G.rm_edge(t, t)
            continue

        r, lab = _find_optional(G)
        if r is not None:
            G.contract_single(r, lab)
            # An optional expression must also admit skipping the state.
            if not G.has_edge(G.src, G.sink):
                G.add_edge(G.src, G.sink)
            continue

        # --- repair rules (Figure 13): ED > EO1 > EO2 ---
        if _try_ed(G) or _try_eo1(G) or _try_eo2(G):
            continue

        # Neither a rewrite nor a repair applied: nothing left to do.
        break

    if G.is_final():
        return G.expression()
    return ''