"""Tests for BEX paper algorithm implementations.""" import sys sys.path.insert(0, '/home/tobi/Desktop/kesai/ProjectManagement/companyweb') from bex.soa import SOA from bex.twotinf import build_soa from bex.rwr0 import rwr0 from bex.crx import CRX from bex.idregex import is_deterministic, idregex from bex.expr import concat, disj, star, optional, alphabet, strip_k from bex.koa import KOA, build_complete_koa from bex.marking import mark_koa from bex.rwrsq import rwr_sq, strip from bex.ikoa import ikoa def test_soa_basics(): G = SOA() a = G.add_state('a') b = G.add_state('b') G.add_edge(G.src, a) G.add_edge(a, b) G.add_edge(b, G.sink) assert G.accept(['a', 'b']) assert not G.accept(['a']) assert not G.accept(['b']) assert not G.accept(['a', 'b', 'c']) print(" PASS test_soa_basics") def test_soa_contract(): G = SOA() a = G.add_state('a') b = G.add_state('b') G.add_edge(G.src, a) G.add_edge(a, b) G.add_edge(b, G.sink) G.contract(a, b, concat('a', 'b')) assert G.is_final() assert G.expression() == 'a.b' print(" PASS test_soa_contract") def test_soa_epsilon_closure(): G = SOA() a = G.add_state('a') b = G.add_state('a+') G.add_edge(G.src, a) G.add_edge(a, b) G.add_edge(b, G.sink) G.add_edge(b, b) Gs = G.epsilon_closure() assert Gs.has_edge(b, b) print(" PASS test_soa_epsilon_closure") def test_twotinf(): seqs = [['a', 'b', 'c'], ['a', 'c']] G = build_soa(seqs) assert G.accept(['a', 'b', 'c']) assert G.accept(['a', 'c']) assert not G.accept(['b', 'c']) print(" PASS test_twotinf") def test_rwr0_concat(): G = SOA() a = G.add_state('a') b = G.add_state('b') G.add_edge(G.src, a) G.add_edge(a, b) G.add_edge(b, G.sink) result = rwr0(G) assert result == 'a.b', f"Expected 'a.b', got {result}" print(" PASS test_rwr0_concat") def test_rwr0_disj(): G = SOA() a = G.add_state('a') b = G.add_state('b') G.add_edge(G.src, a) G.add_edge(G.src, b) G.add_edge(a, G.sink) G.add_edge(b, G.sink) result = rwr0(G) assert result == '(a|b)', f"Expected '(a|b)', got {result}" print(" PASS test_rwr0_disj") def test_rwr0_iteration(): G = SOA() a = G.add_state('a') G.add_edge(G.src, a) G.add_edge(a, G.sink) G.add_edge(a, a) result = rwr0(G) assert result == 'a+', f"Expected 'a+', got {result}" print(" PASS test_rwr0_iteration") def test_rwr0_optional(): G = SOA() a = G.add_state('a') G.add_edge(G.src, a) G.add_edge(a, G.sink) result = rwr0(G) # Single state src→a→sink: language is {a}, not {a,ε} assert result == 'a', f"Expected 'a', got {result}" print(" PASS test_rwr0_optional") def test_rwr0_empty(): G = SOA() result = rwr0(G) assert result == '∅', f"Expected '∅', got {result}" print(" PASS test_rwr0_empty") def test_rwr0_epsilon(): G = SOA() G.add_edge(G.src, G.sink) result = rwr0(G) assert result == 'ε', f"Expected 'ε', got {result}" print(" PASS test_rwr0_epsilon") def test_rwr0_complex_a(): # {abc, ab, ac} is NOT a SORE language (c appears in two roles) G = build_soa([['a', 'b', 'c'], ['a', 'b'], ['a', 'c']]) result = rwr0(G) assert result == '∅', f"Expected ∅ for non-SORE, got {result}" print(" PASS test_rwr0_complex_a: ∅ (non-SORE)") def test_rwr0_disj_concat(): """a·b and a·c share Pred/Succ for b,c after processing.""" G = build_soa([['a', 'b'], ['a', 'c']]) result = rwr0(G) assert result is not None print(f" PASS test_rwr0_disj_concat: {result}") def test_crx_simple(): crx = CRX() result = crx.infer([['a', 'b'], ['a', 'b', 'c']]) assert result is not None and result != '∅' assert 'a' in result assert 'b' in result print(f" PASS test_crx_simple: {result}") def test_crx_example(): """Example from TODS paper: S = {abccde, cccad, bfegg, bfehi}""" crx = CRX() S = [ ['a', 'b', 'c', 'c', 'd', 'e'], ['c', 'c', 'c', 'a', 'd'], ['b', 'f', 'e', 'g', 'g'], ['b', 'f', 'e', 'h', 'i'], ] result = crx.infer(S) assert result is not None assert '(' in result # should have disjunction factors print(f" PASS test_crx_example: {result}") def test_crx_cycle_class(): """Symbols a,b,c form a cycle in S = {abc, bca, cab}.""" crx = CRX() S = [['a', 'b', 'c'], ['b', 'c', 'a'], ['c', 'a', 'b']] result = crx.infer(S) assert result is not None assert 'a' in result and 'b' in result and 'c' in result print(f" PASS test_crx_cycle_class: {result}") def test_determinism_check(): assert is_deterministic('a.b') assert is_deterministic('a+') assert is_deterministic('(a|b)') assert not is_deterministic('(a|a)') print(" PASS test_determinism_check") def test_marking(): G = KOA(k=2) a1 = G.add_state('a_1') a2 = G.add_state('a_2') G.add_edge(G.src, a1) G.add_edge(a1, a2) G.add_edge(a2, G.sink) H = mark_koa(G) assert H.label(a1) == 'a_1' assert H.label(a2) == 'a_2' assert H.accept(['a_1', 'a_2']) print(" PASS test_marking") def test_strip(): assert strip('a_1.b_1') == 'a.b' assert strip('(a_1|b_1)+') == '(a|b)+' print(" PASS test_strip") def test_expr_utils(): assert concat('a', 'b') == 'a.b' assert disj('a', 'b') == '(a|b)' assert star('a') == 'a+' assert optional('a') == 'a?' assert optional('a.b') == '(a.b)?' assert alphabet('a.b') == {'a', 'b'} assert alphabet('(a|b)+') == {'a', 'b'} assert strip_k('a_1') == 'a' print(" PASS test_expr_utils") def test_idregex_deterministic(): """iDRegEx should produce a deterministic expression for simple data.""" seqs = [['a', 'b'], ['a'], ['a', 'b', 'c']] result = idregex(seqs, kmax=2, N=2) if result is None: print(" SKIP test_idregex_deterministic (returned None)") return assert is_deterministic(result), f"Non-deterministic: {result}" print(f" PASS test_idregex_deterministic: {result}") def test_complete_koa(): G, states = build_complete_koa([['a', 'b'], ['a']], k=2) assert G.count_symbol('a') == 2 assert G.count_symbol('b') == 2 assert G.has_edge(G.src, G.sink) print(" PASS test_complete_koa") def run_all(): tests = [ test_soa_basics, test_soa_contract, test_soa_epsilon_closure, test_twotinf, test_rwr0_concat, test_rwr0_disj, test_rwr0_iteration, test_rwr0_optional, test_rwr0_empty, test_rwr0_epsilon, test_rwr0_complex_a, test_rwr0_disj_concat, test_crx_simple, test_crx_example, test_crx_cycle_class, test_determinism_check, test_marking, test_strip, test_expr_utils, test_idregex_deterministic, test_complete_koa, ] passed = 0 failed = 0 for t in tests: try: t() passed += 1 except Exception as e: print(f" FAIL {t.__name__}: {e}") failed += 1 print(f"\n{passed} passed, {failed} failed") # ── Integration tests with real Ansible task data ── def test_integration_quartz_deploy(): """Simple linear sequence — all tasks always in same order.""" seqs = [ ['file', 'template', 'docker_image', 'command', 'set_fact', 'shell', 'wait_for'], ['file', 'template', 'docker_image', 'command', 'set_fact', 'shell', 'wait_for'], ] crx = CRX() result = crx.infer(seqs) assert result is not None assert all(t in result for t in ['file', 'template', 'docker_image', 'command', 'set_fact', 'shell', 'wait_for']) print(f" PASS quartz_deploy: {result}") def test_integration_validate_system(): """Optional shell tasks.""" seqs = [ ['shell', 'debug', 'shell', 'debug'], ['shell', 'debug', 'shell', 'debug', 'shell', 'debug'], ['shell', 'debug'], ] crx = CRX() result = crx.infer(seqs) assert result is not None assert 'shell' in result and 'debug' in result print(f" PASS validate_system: {result}") def test_integration_docker_detect_branch(): """Branching: docker compose v2 check or v1 fallback.""" seqs = [ ['file', 'template', 'command_v2', 'set_fact', 'shell', 'wait_for'], ['file', 'template', 'command_v1', 'set_fact', 'shell', 'wait_for'], ] crx = CRX() result = crx.infer(seqs) assert result is not None assert 'file' in result and 'template' in result and 'shell' in result print(f" PASS docker_detect: {result}") def test_integration_firewall_gating(): """Conditional firewall rule sequence (gated).""" seqs = [ ['assert', 'file', 'template', 'shell', 'wait_for'], ['assert', 'file', 'template', 'command_fw', 'command_fw', 'shell', 'wait_for'], ['assert', 'file', 'template', 'command_fw', 'shell', 'wait_for'], ] crx = CRX() result = crx.infer(seqs) assert result is not None assert 'assert' in result and 'file' in result print(f" PASS firewall_gating: {result}") def test_integration_idregex_linear(): """iDRegEx on simple linear sequences.""" seqs = [ ['assert', 'file', 'template', 'command', 'set_fact', 'shell', 'wait_for'], ['assert', 'file', 'template', 'command', 'set_fact', 'shell'], ] try: result = idregex(seqs, kmax=2, N=3) if result: assert is_deterministic(result) print(f" PASS idregex_linear: {result}") else: print(" SKIP idregex_linear (returned None)") except Exception as e: print(f" FAIL idregex_linear: {e}") def test_integration_ikoa_linear(): """iKoa + rwr² on simple linear sequences.""" from bex.ikoa import ikoa from bex.rwrsq import rwr_sq seqs = [ ['assert', 'file', 'template', 'command', 'set_fact', 'shell', 'wait_for'], ['assert', 'file', 'template', 'command', 'set_fact', 'shell'], ] G = ikoa(seqs, k=3) if G is None: print(" SKIP ikoa_linear (returned None)") return expr = rwr_sq(G) assert expr is not None print(f" PASS ikoa_linear: {expr}") def test_integration_backup_restic(): """Sequence with loop (systemd enable).""" seqs = [ ['package', 'assert', 'file', 'template', 'template', 'template', 'template', 'template', 'template', 'systemd', 'systemd', 'systemd'], ['package', 'assert', 'file', 'template', 'template', 'template', 'template', 'template', 'template', 'systemd'], ] crx = CRX() result = crx.infer(seqs) assert result is not None print(f" PASS backup_restic: {result}") def run_all(): tests = [ test_soa_basics, test_soa_contract, test_soa_epsilon_closure, test_twotinf, test_rwr0_concat, test_rwr0_disj, test_rwr0_iteration, test_rwr0_optional, test_rwr0_empty, test_rwr0_epsilon, test_rwr0_complex_a, test_rwr0_disj_concat, test_crx_simple, test_crx_example, test_crx_cycle_class, test_determinism_check, test_marking, test_strip, test_expr_utils, test_idregex_deterministic, test_complete_koa, test_integration_quartz_deploy, test_integration_validate_system, test_integration_docker_detect_branch, test_integration_firewall_gating, test_integration_idregex_linear, test_integration_ikoa_linear, test_integration_backup_restic, ] passed = 0 failed = 0 for t in tests: try: t() passed += 1 except Exception as e: print(f" FAIL {t.__name__}: {e}") failed += 1 print(f"\n{passed} passed, {failed} failed") if __name__ == '__main__': run_all()