""" shrink — SORE-Transformation via Rewrite-Regeln. Nach Bex et al. 2010 (TWEB): Der shrink-Operator transformiert einen Automaten (PTA) in einen SORE (Single Occurrence Regular Expression) durch wiederholte Anwendung von Rewrite-Regeln. Die Rewrite-Regeln (Bex 2010, Section 4.2): 1. simplify — Entferne redundante Kanten, vereinige parallele Pfade 2. star_rewrite — Ersetze Selbst-Schleife (s →label s) durch label* 3. concat_rewrite — Zustandseliminierung: s →t →u → s →u mit label = l1·l2 4. alternation_rewrite — Mehrere Aus-Kanten: s →t1, s →t2 → s →(t1 | t2) Jeder Rewrite-Schritt wird durch eine MDL-Kostenfunktion bewertet. Der Prozess ist iterativ: Solange die MDL sinkt, wird der gewinbringendste Rewrite angewendet (PriorityQueue nach MDL-Gain). """ import heapq from .automaton import Automaton def simplify(automaton): """ simplify — Entfernt redundante Kanten und vereinigt parallele Pfade. Nach Bex 2010, shrink-Schritt 1: - Wenn zwei Kanten (s→t, label1) und (s→t, label2) existieren, ersetze durch s→t mit label = (label1 | label2) - Entferne unerreichbare Zustände (kein Pfad vom Start aus) """ G = automaton.copy() # Phase 1: Parallel edges → alternation processed = set() for e in list(G.edges): key = (e['from'], e['to']) if key in processed: continue parallel = [e2 for e2 in G.edges if e2['from'] == key[0] and e2['to'] == key[1]] if len(parallel) > 1: labels = list(set(e2['label'] for e2 in parallel)) new_label = f"({'|'.join(labels)})" for e2 in parallel: G.remove_edge(e2['from'], e2['to'], e2['label']) G.add_edge(key[0], key[1], new_label) processed.add(key) # Phase 2: Remove unreachable nodes reachable = set() stack = [G.start] if G.start is not None else [] while stack: n = stack.pop() if n in reachable: continue reachable.add(n) for e in G.outgoing(n): stack.append(e['to']) unreachable = G.nodes - reachable for n in unreachable: G.nodes.discard(n) G.edges = [e for e in G.edges if e['from'] != n and e['to'] != n] G.accepts.discard(n) return G def apply_star_rewrite(G, s): """ Star-Rewrite: Ersetzt Selbst-Schleife (s →label s) durch label*. Nach Bex 2010, Algorithmus apply_star_rewrite: Wenn ein Zustand s eine Selbst-Schleife mit label L hat: - Entferne die Selbst-Schleife - Markiere s mit einem Stern-Metadatum (wird später im Regex exportiert) """ loops = [e for e in G.edges if e['from'] == s and e['to'] == s] if not loops: return G new_G = G.copy() for e in loops: new_G.remove_edge(e['from'], e['to'], e['label']) labels = list(set(e['label'] for e in loops)) if len(labels) == 1: star_label = f"{labels[0]}*" else: star_label = f"({'|'.join(labels)})*" new_G.add_edge(s, s, star_label) return new_G def apply_concat_rewrite(G, t): """ Concat-Rewrite (Zustandseliminierung): Eliminiert Zustand t. Nach Bex 2010, Algorithmus apply_concat_rewrite: Wenn ein Zustand t (nicht Start/Accept) genau einen In- und einen Out-Edge hat: s → t (label1), t → u (label2) → s → u (label1·label2) Dann entferne t und ersetze durch direkte Kante. Allgemeiner: Für jeden In-Edge (s→t, l1) und Out-Edge (t→u, l2), füge (s→u, l1·l2) hinzu, entferne dann t. """ G = G.copy() incoming = G.incoming(t) outgoing = G.outgoing(t) if not incoming and not outgoing: G.nodes.discard(t) G.accepts.discard(t) return G if t in (G.start, ) or t in G.accepts: return G if len(incoming) == 1 and len(outgoing) == 1: s = incoming[0]['from'] u = outgoing[0]['to'] l1 = incoming[0]['label'] l2 = outgoing[0]['label'] G.remove_edge(s, t, l1) G.remove_edge(t, u, l2) G.add_edge(s, u, f"({l1}.{l2})") G.nodes.discard(t) G.accepts.discard(t) return G has_self_loop = any(e['from'] == t and e['to'] == t for e in G.edges) if not has_self_loop: for e_in in incoming: for e_out in outgoing: if e_out['to'] == t: continue s = e_in['from'] u = e_out['to'] l1 = e_in['label'] l2 = e_out['label'] existing_labels = [e2['label'] for e2 in G.edges if e2['from'] == s and e2['to'] == u] new_label = f"({l1}.{l2})" if new_label not in existing_labels: G.add_edge(s, u, new_label) for e in incoming: G.remove_edge(e['from'], e['to'], e['label']) for e in outgoing: if e['to'] != t: G.remove_edge(e['from'], e['to'], e['label']) G.nodes.discard(t) G.accepts.discard(t) return G def apply_alternation_rewrite(G, s): """ Alternation-Rewrite: Fasst mehrere ausgehende Kanten zu (l1 | l2) zusammen. Nach Bex 2010: Wenn s zwei Kanten s → u (label1) und s → v (label2) hat, und u und v strukturell ähnlich sind: - Merge u in v (d.h. alle Kanten von u werden auf v umgeleitet) - Neue Kante s → v mit label = (label1 | label2) """ G = G.copy() outgoing = G.outgoing(s) if len(outgoing) < 2: return G label_set = {} for e in outgoing: target = e['to'] if target not in label_set: label_set[target] = [] label_set[target].append(e['label']) while len(label_set) >= 2: targets = list(label_set.keys()) t1, t2 = targets[0], targets[1] labels1 = label_set[t1] labels2 = label_set[t2] for l in labels1: G.remove_edge(s, t1, l) for l in labels2: G.remove_edge(s, t2, l) new_labels = labels1 + labels2 if t1 == t2: new_label = f"({'|'.join(new_labels)})" G.add_edge(s, t1, new_label) break G.merge_nodes(t2, t1) new_label = f"({'|'.join(new_labels)})" G.add_edge(s, t2, new_label) del label_set[t1] label_set[t2] = new_labels return G def has_single_accept(G): return len(G.accepts) == 1 def shrink(automaton, max_iterations=100): """ shrink — Hauptalgorithmus: Transformiert PTA in SORE. Nach Bex 2010, Algorithmus shrink: Wiederhole bis Konvergenz (MDL sinkt nicht mehr oder max_iterations): 1. simplify(G) 2. Für jeden Zustand s mit Selbst-Schleife: apply_star_rewrite(G, s) 3. Für jeden Zustand t (nicht Start/Accept): apply_concat_rewrite(G, t) 4. Für jeden Zustand s mit >1 Out-Edge: apply_alternation_rewrite(G, s) 5. Überprüfe Determinismus (gib an repair weiter) """ G = automaton.copy() for iteration in range(max_iterations): prev_edge_count = len(G.edges) G = simplify(G) changed = len(G.edges) < prev_edge_count for node in list(G.nodes): if G.has_self_loop(node): G_new = apply_star_rewrite(G, node) if len(G_new.edges) != len(G.edges): G = G_new changed = True for node in list(G.nodes): if node == G.start or node in G.accepts: continue incoming = G.incoming(node) outgoing = G.outgoing(node) if len(incoming) >= 1 and len(outgoing) >= 1: G_new = apply_concat_rewrite(G, node) if len(G_new.nodes) < len(G.nodes): G = G_new changed = True for node in list(G.nodes): if len(G.outgoing(node)) >= 2: G_new = apply_alternation_rewrite(G, node) if len(G_new.edges) < len(G.edges): G = G_new changed = True if not changed: break return G