grammar-inference-engine/bex/rwr0.py

225 lines
7.1 KiB
Python
Raw Normal View History

"""RWR₀ — Algorithm 6 (TODS 2010), conference version rules (Figure 10 + Figure 13).
Precedence: CONCATENATION > DISJUNCTION > SELF-LOOP > OPTIONAL
Repair precedence: ENABLE-DISJUNCTION > ENABLE-OPTIONAL-1 > ENABLE-OPTIONAL-2
Conditions checked on ε-closure G* (Definition 25).
Used as rwr²₁ in arXiv 1004.2372 for k>1.
"""
from .soa import SOA
from .expr import concat, disj, star, optional
def _find_concat(G, Gs):
    """Locate a pair of states that the CONCATENATION rule (Figure 10) can merge.

    Successor sets are read from ``Gs`` (the ε-closure passed by the caller)
    while predecessor sets and labels are read from ``G``.  All ordered pairs
    of distinct states are scanned in up to three passes; a later pass runs
    only if every earlier pass found nothing:

    1. ``Gs.succ(r) == {s}``       and ``G.pred(s) == {r}``        ->  r · s
    2. ``Gs.succ(r) == {s, sink}`` and ``G.pred(s) == {r}``        ->  r · s?
       ``Gs.succ(r) == {s}``       and ``G.pred(s) == {r, sink}``  ->  r? · s
    3. ``Gs.succ(r) == {s, sink}`` and ``G.pred(s) == {r, sink}``  ->  r? · s?

    Returns ``(r, s, expr)`` for the first matching pair, otherwise
    ``(None, None, None)``.

    NOTE(review): the predecessor tests compare against ``G.sink``; whether a
    sink can ever appear in ``G.pred(...)`` depends on the SOA class, which is
    not visible here — confirm that ``G.src`` was not intended.
    """
    states = G.states()
    sink = G.sink
    label = G.label

    def ordered_pairs():
        # Every (left, right) combination of two different states, in the
        # same order a plain nested loop over ``states`` would visit them.
        for left in states:
            for right in states:
                if left == right:
                    continue
                yield left, right

    # Pass 1 — plain r·s; exhausted over all pairs before weaker variants.
    for left, right in ordered_pairs():
        if Gs.succ(left) == {right} and G.pred(right) == {left}:
            return left, right, concat(label(left), label(right))

    # Pass 2 — exactly one side optional.
    for left, right in ordered_pairs():
        after_left = Gs.succ(left)
        before_right = G.pred(right)
        if after_left == {right, sink} and before_right == {left}:
            return left, right, concat(label(left), optional(label(right)))
        if after_left == {right} and before_right == {left, sink}:
            return left, right, concat(optional(label(left)), label(right))

    # Pass 3 — both sides optional.
    for left, right in ordered_pairs():
        if Gs.succ(left) == {right, sink} and G.pred(right) == {left, sink}:
            return left, right, concat(optional(label(left)),
                                       optional(label(right)))

    return None, None, None
def _find_disj(G, Gs):
    """Locate a pair of states that the DISJUNCTION rule (Figure 10) can merge.

    Two states qualify when ``G._pred_plus`` agrees for both and
    ``G._succ_plus`` agrees for both.  Each unordered pair is examined once,
    in the order given by ``G.states()``.

    ``Gs`` is accepted for signature parity with the other finders but is not
    read; the comparison relies entirely on ``G._pred_plus`` / ``G._succ_plus``
    (their exact semantics live in the SOA class).

    Returns ``(r, s, disj(label(r), label(s)))`` for the first match,
    otherwise ``(None, None, None)``.
    """
    nodes = G.states()
    for following, first in enumerate(nodes, start=1):
        for second in nodes[following:]:
            if not (G._pred_plus(first) == G._pred_plus(second)):
                continue
            if G._succ_plus(first) == G._succ_plus(second):
                alternative = disj(G.label(first), G.label(second))
                return first, second, alternative
    return None, None, None
def _find_selfloop(G, Gs):
    """Locate a state the SELF-LOOP rule (Figure 10) applies to.

    A state qualifies when ``G`` itself (not the closure) has an edge from
    the state to itself.  ``Gs`` is accepted but not read.

    Returns ``(r, star(label(r)))`` for the first such state in
    ``G.states()`` order, otherwise ``(None, None)``.
    """
    looping_states = (q for q in G.states() if G.has_edge(q, q))
    for q in looping_states:
        # Only the first hit matters; the generator is abandoned afterwards.
        return q, star(G.label(q))
    return None, None
def _find_optional(G):
    """Locate the state the OPTIONAL rule (Figure 10) applies to.

    The rule fires only when ``G.is_final()`` is false and
    ``G.num_non_special()`` reports exactly one state; refusing a final graph
    keeps the main loop from re-applying the rule forever.

    Returns ``(r, optional(label(r)))`` where ``r`` is the first entry of
    ``G.states()``, otherwise ``(None, None)``.
    """
    if G.is_final() or G.num_non_special() != 1:
        return None, None
    only = G.states()[0]
    return only, optional(G.label(only))
def _try_ed(G):
    """Apply the ENABLE-DISJUNCTION repair (Figure 13) to at most one pair.

    For each unordered pair of states (in ``G.states()`` order):

    * if ``_pred_plus`` agrees but ``_succ_plus`` differs, every member of the
      union of both ``_succ_plus`` sets that one state lacks is added as an
      outgoing edge of that state;
    * otherwise, if ``_succ_plus`` agrees but ``_pred_plus`` differs, the
      symmetric completion is done on incoming edges.

    ``G`` is modified in place.  Returns ``True`` as soon as one pair caused
    at least one edge to be added, ``False`` if no pair did.
    """

    def add_missing(edges):
        # Insert each (tail, head) edge that is not present yet and report
        # whether anything was actually inserted.
        inserted = False
        for tail, head in edges:
            if not G.has_edge(tail, head):
                G.add_edge(tail, head)
                inserted = True
        return inserted

    nodes = G.states()
    for following, r in enumerate(nodes, start=1):
        for s in nodes[following:]:
            if (G._pred_plus(r) == G._pred_plus(s)
                    and G._succ_plus(r) != G._succ_plus(s)):
                targets = G._succ_plus(r) | G._succ_plus(s)
                grew = add_missing([(r, t) for t in targets - G._succ_plus(r)])
                # s's set is re-read only after r's edges have been added.
                grew = add_missing(
                    [(s, t) for t in targets - G._succ_plus(s)]) or grew
                if grew:
                    return True

            if (G._succ_plus(r) == G._succ_plus(s)
                    and G._pred_plus(r) != G._pred_plus(s)):
                sources = G._pred_plus(r) | G._pred_plus(s)
                grew = add_missing([(p, r) for p in sources - G._pred_plus(r)])
                # s's set is re-read only after r's edges have been added.
                grew = add_missing(
                    [(p, s) for p in sources - G._pred_plus(s)]) or grew
                if grew:
                    return True
    return False
def _try_eo1(G):
    """Apply the ENABLE-OPTIONAL-1 repair (Figure 13) to at most one state.

    A state ``r`` is a candidate when ``G.succ(r)`` has exactly two members,
    one of them ``G.sink``; call the other one ``s``.  If ``s`` has more than
    one predecessor, every predecessor of ``s`` other than ``r`` gets an edge
    into ``r`` (unless that edge already exists).

    ``G`` is modified in place.  Returns ``True`` after the first candidate
    that gained at least one edge, ``False`` if none did.
    """
    sink = G.sink
    for r in G.states():
        outgoing = G.succ(r)
        if not (sink in outgoing and len(outgoing) == 2):
            continue
        s = next(node for node in outgoing if node != sink)
        feeders = G.pred(s)
        if len(feeders) <= 1:
            continue
        absent = [p for p in feeders - {r} if not G.has_edge(p, r)]
        for p in absent:
            G.add_edge(p, r)
        if absent:
            return True
    return False
def _try_eo2(G):
    """Apply the ENABLE-OPTIONAL-2 repair (Figure 13) to at most one state.

    A state ``s`` is a candidate when ``G.pred(s)`` has exactly two members,
    one of them ``G.sink``; call the other one ``r``.  If ``r`` has more than
    one successor, ``s`` gets an edge to every successor of ``r`` other than
    ``s`` itself (unless that edge already exists).

    ``G`` is modified in place.  Returns ``True`` after the first candidate
    that gained at least one edge, ``False`` if none did.

    NOTE(review): the trigger requires ``G.sink`` to be a *predecessor* of
    ``s``; whether that can happen depends on the SOA class, which is not
    visible here — confirm that ``G.src`` was not intended.
    """
    sink = G.sink
    for s in G.states():
        incoming = G.pred(s)
        if not (sink in incoming and len(incoming) == 2):
            continue
        r = next(node for node in incoming if node != sink)
        followers = G.succ(r)
        if len(followers) <= 1:
            continue
        absent = [t for t in followers - {s} if not G.has_edge(s, t)]
        for t in absent:
            G.add_edge(s, t)
        if absent:
            return True
    return False
def rwr0(G):
    """Run Algorithm 6 (RWR₀) on a copy of the SOA ``G``.

    Input:  SOA ``G`` (never modified; all work happens on ``G.copy()``).
    Output: the expression of the reduced graph, ``'ε'`` for the trivial
            graph, or ``''`` (empty string) on failure.

    1: if the sink is not reachable: return ''
    2: if there are no non-special states and (src, sink) is an edge:
       return 'ε'
    3: repeat
    4:     apply the first applicable rewrite rule (Figure 10), precedence
           CONCATENATION > DISJUNCTION > SELF-LOOP > OPTIONAL
    5:     otherwise apply the first applicable repair rule (Figure 13),
           precedence ED > EO1 > EO2
    6: until no rule applies
    7: if the graph is final: return its expression, else return ''
    """
    graph = G.copy()

    if not graph.sink_reachable():
        return ''
    if graph.num_non_special() == 0 and graph.has_edge(graph.src, graph.sink):
        return 'ε'

    while True:
        # The graph does not change between the finder calls of a single
        # iteration (every successful rule restarts the loop), so a single
        # closure per iteration is enough.
        closure = graph.epsilon_closure()

        # --- rewrite rules (Figure 10) -----------------------------------
        r, s, lab = _find_concat(graph, closure)
        if r is not None:
            graph.contract(r, s, lab)
            continue

        r, s, lab = _find_disj(graph, closure)
        if r is not None:
            graph.contract(r, s, lab)
            continue

        r, lab = _find_selfloop(graph, closure)
        if r is not None:
            # The value returned by contract_single is used as the state
            # whose self-edge is dropped, now that the label carries the
            # repetition.
            merged = graph.contract_single(r, lab)
            graph.rm_edge(merged, merged)
            continue

        r, lab = _find_optional(graph)
        if r is not None:
            graph.contract_single(r, lab)
            # Make sure a direct src -> sink edge exists after the state
            # has been relabelled as optional.
            if not graph.has_edge(graph.src, graph.sink):
                graph.add_edge(graph.src, graph.sink)
            continue

        # --- repair rules (Figure 13) ------------------------------------
        # Short-circuit evaluation gives the ED > EO1 > EO2 precedence.
        if _try_ed(graph) or _try_eo1(graph) or _try_eo2(graph):
            continue

        break  # nothing applicable any more

    return graph.expression() if graph.is_final() else ''