""" kore — k-ORE Inference (iDRegEx) nach Bex et al. 2008/2010. iDRegEx (Bex 2008): 1. Prefix-Tree Automaton (PTA) aus Beispielsequenzen 2. Shrink: Rewrite-Regeln generalisieren den Automaten (simplify → star_rewrite → concat_rewrite → alternation_rewrite) 3. Repair: Stelle Determinismus nach jedem Rewrite-Durchlauf wieder her 4. Convert: Überführe den Automaten in einen regulären Ausdruck (State-Elimination nach Brzozowski & McCluskey) 5. k-ORE Prüfung: Der Ausdruck muss die k-Occurrence-Bedingung erfüllen (jedes Symbol maximal k-mal nennenswert) 6. MDL: Wähle k mit minimalem MDL-Score """ from .automaton import Automaton from .pta import build_pta from .shrink import shrink from .repair import repair from .mdl import mdl_score def _state_elimination(G): """ State Elimination nach Brzozowski & McCluskey. Entfernt nacheinander alle Nicht-Start/Accept-Zustände. Für jeden eliminierten Zustand q: - Für jedes Paar (p, r) mit p→q (Label A) und q→r (Label B): - R_self_q = disjunktion aller Selbst-Schleifen auf q - Neues Label = A · (R_self_q)* · B - Füge Kante p → r mit dem neuen Label hinzu (oder merge mit existierender) Nach Elimination: Nur Start- und Accept-Zustände bleiben. Der Ausdruck ist: summe aller Pfade von Start zu Accept. """ G = G.copy() eliminated = set() # Wiederhole bis nur Start + Accepts übrig sind changed = True while changed: changed = False # Wähle einen Zustand zur Elimination (nicht Start, nicht Accept) for q in list(G.nodes): if q == G.start or q in G.accepts: continue if q in eliminated: continue reachable = _is_reachable_to_accept(G, q) if not reachable: G.nodes.discard(q) G.accepts.discard(q) G.edges = [e for e in G.edges if e['from'] != q and e['to'] != q] eliminated.add(q) changed = True continue incoming = G.incoming(q) outgoing = G.outgoing(q) # R_self_q = (a1 | a2 | ...)* für alle Selbst-Schleifen auf q self_loops = [e for e in outgoing if e['to'] == q] outgoing_no_self = [e for e in outgoing if e['to'] != q] if not outgoing_no_self: # Sackgasse, keine Outgoing-Kanten (außer self-loop) # Entferne eingehende Kanten + q for e in incoming: G.remove_edge(e['from'], e['to'], e['label']) G.nodes.discard(q) G.accepts.discard(q) eliminated.add(q) changed = True continue if self_loops: self_labels = list(set(e['label'] for e in self_loops)) if len(self_labels) == 1: R_self_q = f"({self_labels[0]})*" else: R_self_q = f"({'|'.join(self_labels)})*" else: R_self_q = "" # Für jedes Paar (p, r): p→q (incoming), q→r (outgoing, r != q) for e_in in incoming: p = e_in['from'] if p == q: continue A = e_in['label'] for e_out in outgoing_no_self: r = e_out['to'] B = e_out['label'] if R_self_q: new_label = f"({A}.{R_self_q}.{B})" else: new_label = f"({A}.{B})" # Merge mit existierender Kante p→r wenn vorhanden existing = [e for e in G.edges if e['from'] == p and e['to'] == r] existing_labels = [e['label'] for e in existing] if new_label not in existing_labels and f"({new_label})" not in existing_labels: # Vereinige mit existierenden Labels via | if existing: old_label = existing[0]['label'] merged = f"({old_label}|{new_label})" G.remove_edge(p, r, old_label) G.add_edge(p, r, merged) else: G.add_edge(p, r, new_label) # Lösche q und alle seine Kanten for e in incoming: G.remove_edge(e['from'], e['to'], e['label']) for e in self_loops: G.remove_edge(e['from'], e['to'], e['label']) for e in outgoing_no_self: G.remove_edge(e['from'], e['to'], e['label']) G.nodes.discard(q) G.accepts.discard(q) eliminated.add(q) changed = True break return G def _is_reachable_to_accept(G, q): """Prüft ob von q aus ein Accept-Zustand erreichbar ist.""" visited = set() stack = [q] while stack: n = stack.pop() if n in visited: continue visited.add(n) if n in G.accepts: return True for e in G.outgoing(n): stack.append(e['to']) return False def _extract_expression(G): """ Extrahiert den regulären Ausdruck aus dem eliminierten Automaten. Nach Elimination gibt es nur Startzustand und Accept-Zustände. Der Ausdruck ist die Disjunktion aller Pfade von Start zu Accept. """ if G.start is None: return "∅" # Phase 1: State Elimination G_elim = _state_elimination(G) start = G_elim.start if not G_elim.accepts: return "∅" paths = [] outgoing = G_elim.outgoing(start) # Spezialfall: Start ist selbst Accept if start in G_elim.accepts: # Prüfe auf Selbst-Schleife self_edges = [e for e in outgoing if e['to'] == start] non_self = [e for e in outgoing if e['to'] != start] if not non_self and not self_edges: return "ε" if self_edges: self_labels = '|'.join(set(e['label'] for e in self_edges)) paths.append(f"({self_labels})*") # Außer Start → Accept → andere Accepts for e in non_self: target = e['to'] if target in G_elim.accepts: paths.append(e['label']) # Pfade von Start zu Accept-Zuständen for acc in G_elim.accepts: if acc == start: continue # Kante start → acc direct = [e for e in outgoing if e['to'] == acc] for e in direct: paths.append(e['label']) self_loops_start = [e for e in G_elim.outgoing(start) if e['to'] == start] # Weitere Kanten: start → x (wo x != accept) intermediate = [e for e in outgoing if e['to'] not in G_elim.accepts and e['to'] != start] for e in intermediate: # Folge Pfad von intermediate zu accept suffix = _follow_path(G_elim, e['to'], G_elim.accepts, set()) if suffix: paths.append(f"({e['label']}.{suffix})") # Entferne Duplikate paths = list(set(paths)) if not paths: return "ε" if len(paths) == 1: expr = paths[0] else: expr = f"({'|'.join(paths)})" # Vereinfache: Entferne überflüssige Klammern expr = _simplify_expression(expr) return expr def _follow_path(G, start, accepts, visited): """Findet den Pfad von start zu einem Accept.""" if start in accepts: return "ε" if start in visited: return None visited.add(start) outgoing = G.outgoing(start) for e in outgoing: if e['to'] == start: continue suffix = _follow_path(G, e['to'], accepts, visited) if suffix is not None: if suffix == "ε": return e['label'] else: return f"({e['label']}.{suffix})" return None def _simplify_expression(expr): """ Vereinfacht einen regulären Ausdruck. Entfernt überflüssige Klammern, doppelte Operatoren, etc. """ if not expr or expr in ('ε', '∅'): return expr # (ε. X ) → X # (X . ε) → X # ((X)) → X # (a|a) → a simplified = expr while True: prev = simplified simplified = _simplify_once(simplified) if simplified == prev: break return simplified def _simplify_once(expr): """Ein Reduktionsschritt.""" # (ε.X) → X # (X.ε) → X # ((X)) → X # (a|a) → a result = expr # ((X)) → X (doppelte Klammern) import re result = re.sub(r'$$\(([^()]+)\)$$', r'(\1)', result) return result def validate_k_ore(expr, k_index): """ Prüft ob ein Ausdruck die k-Occurrence-Bedingung erfüllt. Ein k-ORE erlaubt jedes Symbol maximal einmal pro k-Indikator, d.h. in jedem Konjunkt (Teilausdruck ohne |) darf jedes Symbol höchstens k-mal vorkommen. Vereinfacht: Zähle Vorkommen jedes eindeutigen Token-Namens im Ausdruck. Wenn ein Token mehr als k-mal vorkommt, ist die Bedingung verletzt. Returns: bool, str: (erfüllt, Grund) """ # Extrahiere alle Token-Namen aus dem Ausdruck tokens = set() for c in '*+?()|.': pass token_names = set() i = 0 while i < len(expr): if expr[i].isalnum() or expr[i] in '/_-': j = i while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'): j += 1 token_names.add(expr[i:j]) i = j else: i += 1 # Zähle Vorkommen token_counts = {} i = 0 while i < len(expr): if expr[i].isalnum() or expr[i] in '/_-': j = i while j < len(expr) and (expr[j].isalnum() or expr[j] in '/_-'): j += 1 token = expr[i:j] token_counts[token] = token_counts.get(token, 0) + 1 i = j else: i += 1 violations = [t for t, c in token_counts.items() if c > k_index] if violations: return False, f"Token {violations} erscheint > {k_index}-mal" return True, "OK" class kOREInference: """ iDRegEx: k-ORE Inferenz via PTA → Shrink → Repair → Expression. Nach Bex et al. 2008: - Baue PTA aus Sequenzen - Shrink: Rewrite-Regeln generalisieren - Repair: Stelle Determinismus wieder her - Convert: Extrahiere regulären Ausdruck via State Elimination - Prüfe k-Occurrence - Wähle k mit MDL """ def __init__(self, k_max=5): self.k_max = k_max def infer(self, sequences): """ Inferiere den besten k-ORE. Returns: (Automaton, expression_string, best_k) oder None """ sequences = [s for s in sequences if s] if not sequences: return None, "∅", 0 best_score = float('inf') best_result = None for k in range(1, self.k_max + 1): try: auto, expr = self._infer_k_expression(sequences, k) if auto is None: continue score = mdl_score(auto, sequences) if score < best_score: best_score = score best_result = (auto, expr, k) except Exception: continue return best_result def _infer_k_expression(self, sequences, k): """Führe iDRegEx für ein spezifisches k durch.""" # 1. PTA bauen pta = build_pta(sequences) # 2. Shrink shrunk = shrink(pta, max_iterations=20) # 3. Repair repaired = repair(shrunk) # 4. Expression extrahieren expr = _extract_expression(repaired) # 5. k-ORE Prüfung valid, _ = validate_k_ore(expr, k) if not valid: expr = self._generalize_to_k_ore(expr, k) return repaired, expr def _generalize_to_k_ore(self, expr, k): """ Generalisiere den Ausdruck zur k-ORE. Wenn Token t mehr als k-mal vorkommt: - Ersetze Wiederholungen durch t+ oder t* """ # Einfache Heuristik: Extrahiere Token, zähle, ersetze result = expr token_counts = {} i = 0 while i < len(result): if result[i].isalnum() or result[i] in '/_-': j = i while j < len(result) and (result[j].isalnum() or result[j] in '/_-'): j += 1 token = result[i:j] token_counts[token] = token_counts.get(token, 0) + 1 i = j else: i += 1 for token, count in token_counts.items(): if count > k: # Ersetze token.token durch token+ import re pattern = re.escape(token) + r'\..' + re.escape(token) replacement = f"{token}+" result = re.sub(pattern, replacement, result, count=1) break return result