- CRX: direct CHARE inference (Algorithm 7, TODS 2010) - iDRegEx: k-ORE inference (Algorithm 4, arXiv 2010) - RWR₀: SORE repair (Algorithm 6, TODS 2010) - rwr²: k-ORE extraction (Algorithm 3, arXiv 2010) - SOA, k-OA, iKoa, 2T-INF, Baum-Welch - Ansible role grammar adapter - Generic YAML key-path converter - 28 tests, all passing
224 lines
7.1 KiB
Python
224 lines
7.1 KiB
Python
"""RWR₀ — Algorithm 6 (TODS 2010), conference version rules (Figure 10 + Figure 13).
|
|
|
|
Precedence: CONCATENATION > DISJUNCTION > SELF-LOOP > OPTIONAL
|
|
Repair precedence: ENABLE-DISJUNCTION > ENABLE-OPTIONAL-1 > ENABLE-OPTIONAL-2
|
|
|
|
Conditions checked on ε-closure G* (Definition 25).
|
|
Used as rwr²₁ in arXiv 1004.2372 for k>1.
|
|
"""
|
|
|
|
from .soa import SOA
|
|
from .expr import concat, disj, star, optional
|
|
|
|
|
|
def _find_concat(G, Gs):
|
|
"""Figure 10 CONCATENATION rule, checked on G*.
|
|
|
|
Check four variants with priority: r·s > r?·s|r·s? > r?·s?
|
|
r·s: Succ(r)={s} ∧ Pred(s)={r}
|
|
r?·s: Succ(r)={s,sink} ∧ Pred(s)={r}
|
|
r·s?: Succ(r)={s} ∧ Pred(s)={r,sink}
|
|
r?·s?: Succ(r)={s,sink} ∧ Pred(s)={r,sink}
|
|
"""
|
|
st = G.states()
|
|
# Variant 1: r·s (highest priority — check all pairs first)
|
|
for r in st:
|
|
for s in st:
|
|
if r == s:
|
|
continue
|
|
if Gs.succ(r) == {s} and G.pred(s) == {r}:
|
|
return r, s, concat(G.label(r), G.label(s))
|
|
# Variants 2-3: r?·s and r·s?
|
|
for r in st:
|
|
for s in st:
|
|
if r == s:
|
|
continue
|
|
Sr = Gs.succ(r)
|
|
Ps = G.pred(s)
|
|
if Sr == {s, G.sink} and Ps == {r}:
|
|
return r, s, concat(G.label(r), optional(G.label(s)))
|
|
if Sr == {s} and Ps == {r, G.sink}:
|
|
return r, s, concat(optional(G.label(r)), G.label(s))
|
|
# Variant 4: r?·s?
|
|
for r in st:
|
|
for s in st:
|
|
if r == s:
|
|
continue
|
|
if Gs.succ(r) == {s, G.sink} and G.pred(s) == {r, G.sink}:
|
|
return r, s, concat(optional(G.label(r)), optional(G.label(s)))
|
|
return None, None, None
|
|
|
|
|
|
def _find_disj(G, Gs):
|
|
"""Figure 10 DISJUNCTION rule, checked on G*.
|
|
|
|
Pred⁺(r)=Pred⁺(s) ∧ Succ⁺(r)=Succ⁺(s)
|
|
"""
|
|
st = G.states()
|
|
for i, r in enumerate(st):
|
|
for s in st[i + 1:]:
|
|
if G._pred_plus(r) == G._pred_plus(s) and G._succ_plus(r) == G._succ_plus(s):
|
|
return r, s, disj(G.label(r), G.label(s))
|
|
return None, None, None
|
|
|
|
|
|
def _find_selfloop(G, Gs):
|
|
"""Figure 10 SELF-LOOP rule. r ∈ Succ(r) in G (not G*)."""
|
|
for r in G.states():
|
|
if G.has_edge(r, r):
|
|
return r, star(G.label(r))
|
|
return None, None
|
|
|
|
|
|
def _find_optional(G):
|
|
"""Figure 10 OPTIONAL rule. G contains exactly one non-special node besides src, sink.
|
|
Only applies when G is not already final (avoids infinite loop)."""
|
|
if G.is_final():
|
|
return None, None
|
|
if G.num_non_special() == 1:
|
|
r = G.states()[0]
|
|
return r, optional(G.label(r))
|
|
return None, None
|
|
|
|
|
|
def _try_ed(G):
|
|
"""ENABLE-DISJUNCTION (Figure 13). When Pred(r)=Pred(s) but Succ(r)≠Succ(s):
|
|
add edges to make Succ(r)=Succ(s). Or symmetric for Pred.
|
|
"""
|
|
st = G.states()
|
|
for i, r in enumerate(st):
|
|
for s in st[i + 1:]:
|
|
if G._pred_plus(r) == G._pred_plus(s) and G._succ_plus(r) != G._succ_plus(s):
|
|
merged = G._succ_plus(r) | G._succ_plus(s)
|
|
changed = False
|
|
for t in merged - G._succ_plus(r):
|
|
if not G.has_edge(r, t):
|
|
G.add_edge(r, t)
|
|
changed = True
|
|
for t in merged - G._succ_plus(s):
|
|
if not G.has_edge(s, t):
|
|
G.add_edge(s, t)
|
|
changed = True
|
|
if changed:
|
|
return True
|
|
if G._succ_plus(r) == G._succ_plus(s) and G._pred_plus(r) != G._pred_plus(s):
|
|
merged = G._pred_plus(r) | G._pred_plus(s)
|
|
changed = False
|
|
for p in merged - G._pred_plus(r):
|
|
if not G.has_edge(p, r):
|
|
G.add_edge(p, r)
|
|
changed = True
|
|
for p in merged - G._pred_plus(s):
|
|
if not G.has_edge(p, s):
|
|
G.add_edge(p, s)
|
|
changed = True
|
|
if changed:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _try_eo1(G):
|
|
"""ENABLE-OPTIONAL-1 (Figure 13). If Succ(r)={s,sink} but Pred(s) has other
|
|
predecessors besides r, add Pred(s) to r's predecessors.
|
|
"""
|
|
for r in G.states():
|
|
Sr = G.succ(r)
|
|
if G.sink in Sr and len(Sr) == 2:
|
|
s = next(x for x in Sr if x != G.sink)
|
|
if len(G.pred(s)) > 1:
|
|
changed = False
|
|
for p in G.pred(s) - {r}:
|
|
if not G.has_edge(p, r):
|
|
G.add_edge(p, r)
|
|
changed = True
|
|
if changed:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _try_eo2(G):
|
|
"""ENABLE-OPTIONAL-2 (Figure 13). If Pred(s)={r,sink} but Succ(r) has other
|
|
successors besides s, add Succ(r) to s's successors.
|
|
"""
|
|
for s in G.states():
|
|
Ps = G.pred(s)
|
|
if G.sink in Ps and len(Ps) == 2:
|
|
r = next(x for x in Ps if x != G.sink)
|
|
if len(G.succ(r)) > 1:
|
|
changed = False
|
|
for t in G.succ(r) - {s}:
|
|
if not G.has_edge(s, t):
|
|
G.add_edge(s, t)
|
|
changed = True
|
|
if changed:
|
|
return True
|
|
return False
|
|
|
|
|
|
def rwr0(G):
|
|
"""
|
|
|———— Algorithm 6: RWR₀ ————|
|
|
Input: SOA G
|
|
Output: SORE r (or ∅ on failure)
|
|
|
|
1: if sink not reachable: return ∅
|
|
2: if E(G)={(src,sink)}: return ε
|
|
3: while not done:
|
|
4: if rewrite (Figure 10) applicable:
|
|
5: apply with precedence: CONCAT > DISJ > SELF-LOOP > OPTIONAL
|
|
6: elif repair (Figure 13) applicable:
|
|
7: apply with precedence: ED > EO1 > EO2
|
|
8: else: done
|
|
9: if final: return r else return ∅
|
|
"""
|
|
G = G.copy()
|
|
if not G.sink_reachable():
|
|
return '∅'
|
|
if G.num_non_special() == 0 and G.has_edge(G.src, G.sink):
|
|
return 'ε'
|
|
|
|
done = False
|
|
while not done:
|
|
applied = False
|
|
Gs = G.epsilon_closure()
|
|
|
|
r, s, lab = _find_concat(G, Gs)
|
|
if r is not None:
|
|
G.contract(r, s, lab)
|
|
applied = True
|
|
|
|
if not applied:
|
|
Gs = G.epsilon_closure()
|
|
r, s, lab = _find_disj(G, Gs)
|
|
if r is not None:
|
|
G.contract(r, s, lab)
|
|
applied = True
|
|
|
|
if not applied:
|
|
Gs = G.epsilon_closure()
|
|
r, lab = _find_selfloop(G, Gs)
|
|
if r is not None:
|
|
t = G.contract_single(r, lab)
|
|
G.rm_edge(t, t)
|
|
applied = True
|
|
|
|
if not applied:
|
|
r, lab = _find_optional(G)
|
|
if r is not None:
|
|
G.contract_single(r, lab)
|
|
if not G.has_edge(G.src, G.sink):
|
|
G.add_edge(G.src, G.sink)
|
|
applied = True
|
|
|
|
if not applied:
|
|
applied = _try_ed(G)
|
|
if not applied:
|
|
applied = _try_eo1(G)
|
|
if not applied:
|
|
applied = _try_eo2(G)
|
|
if not applied:
|
|
done = True
|
|
|
|
if G.is_final():
|
|
return G.expression()
|
|
return '∅'
|