grammar-inference-engine/tests/test_bex.py

417 lines
12 KiB
Python

"""Tests for BEX paper algorithm implementations."""
from bex.soa import SOA
from bex.twotinf import build_soa
from bex.rwr0 import rwr0
from bex.crx import CRX
from bex.idregex import is_deterministic, idregex
from bex.expr import concat, disj, star, optional, alphabet, strip_k
from bex.koa import KOA, build_complete_koa
from bex.marking import mark_koa
from bex.rwrsq import rwr_sq, strip
from bex.ikoa import ikoa
def test_soa_basics():
    """Check acceptance on an SOA with the single path src -> a -> b -> sink.

    The word ['a', 'b'] must be accepted; ['a'], ['b'] and ['a', 'b', 'c']
    must be rejected.
    """
    automaton = SOA()
    first = automaton.add_state('a')
    second = automaton.add_state('b')

    # Wire the states into one chain from source to sink.
    automaton.add_edge(automaton.src, first)
    automaton.add_edge(first, second)
    automaton.add_edge(second, automaton.sink)

    assert automaton.accept(['a', 'b'])
    for rejected in (['a'], ['b'], ['a', 'b', 'c']):
        assert not automaton.accept(rejected)
    print(" PASS test_soa_basics")
def test_soa_contract():
    """Contract states a and b of a src -> a -> b -> sink SOA into 'a.b'.

    After the contraction the automaton must report ``is_final()`` and its
    expression must be the string 'a.b'.
    """
    automaton = SOA()
    left = automaton.add_state('a')
    right = automaton.add_state('b')
    automaton.add_edge(automaton.src, left)
    automaton.add_edge(left, right)
    automaton.add_edge(right, automaton.sink)

    merged_label = concat('a', 'b')
    automaton.contract(left, right, merged_label)

    assert automaton.is_final()
    assert automaton.expression() == 'a.b'
    print(" PASS test_soa_contract")
def test_soa_epsilon_closure():
    """The result of epsilon_closure() must still contain the self-loop.

    Builds src -> 'a' -> 'a+' -> sink with an extra edge from the 'a+' state
    to itself, then checks that edge on the returned automaton.
    """
    automaton = SOA()
    plain = automaton.add_state('a')
    looping = automaton.add_state('a+')
    automaton.add_edge(automaton.src, plain)
    automaton.add_edge(plain, looping)
    automaton.add_edge(looping, automaton.sink)
    automaton.add_edge(looping, looping)  # self-loop under test

    closed = automaton.epsilon_closure()
    assert closed.has_edge(looping, looping)
    print(" PASS test_soa_epsilon_closure")
def test_twotinf():
    """build_soa on the samples abc and ac accepts both, but not ['b', 'c']."""
    samples = [['a', 'b', 'c'], ['a', 'c']]
    automaton = build_soa(samples)

    # Fresh lists are passed to accept(), as separate literals would be.
    assert automaton.accept(['a', 'b', 'c'])
    assert automaton.accept(['a', 'c'])
    rejected = ['b', 'c']
    assert not automaton.accept(rejected)
    print(" PASS test_twotinf")
def test_rwr0_concat():
    """rwr0 on the chain src -> a -> b -> sink must return 'a.b'."""
    automaton = SOA()
    head = automaton.add_state('a')
    tail = automaton.add_state('b')
    automaton.add_edge(automaton.src, head)
    automaton.add_edge(head, tail)
    automaton.add_edge(tail, automaton.sink)

    result = rwr0(automaton)
    assert result == 'a.b', f"Expected 'a.b', got {result}"
    print(" PASS test_rwr0_concat")
def test_rwr0_disj():
    """rwr0 on two parallel branches src -> {a, b} -> sink must return '(a|b)'."""
    automaton = SOA()
    upper = automaton.add_state('a')
    lower = automaton.add_state('b')

    # Both states hang directly off the source and lead directly to the sink.
    automaton.add_edge(automaton.src, upper)
    automaton.add_edge(automaton.src, lower)
    automaton.add_edge(upper, automaton.sink)
    automaton.add_edge(lower, automaton.sink)

    result = rwr0(automaton)
    assert result == '(a|b)', f"Expected '(a|b)', got {result}"
    print(" PASS test_rwr0_disj")
def test_rwr0_iteration():
    """rwr0 on src -> a -> sink with a self-loop on a must return 'a+'."""
    automaton = SOA()
    state = automaton.add_state('a')
    automaton.add_edge(automaton.src, state)
    automaton.add_edge(state, automaton.sink)
    automaton.add_edge(state, state)  # the repetition edge

    result = rwr0(automaton)
    assert result == 'a+', f"Expected 'a+', got {result}"
    print(" PASS test_rwr0_iteration")
def test_rwr0_optional():
    """rwr0 on the single-state chain src -> a -> sink must return plain 'a'."""
    automaton = SOA()
    only = automaton.add_state('a')
    automaton.add_edge(automaton.src, only)
    automaton.add_edge(only, automaton.sink)

    result = rwr0(automaton)
    # There is no src -> sink edge, so the language is {a} rather than {a, ε}.
    assert result == 'a', f"Expected 'a', got {result}"
    print(" PASS test_rwr0_optional")
def test_rwr0_empty():
    """rwr0 on a freshly constructed SOA (no states or edges added) returns ''."""
    untouched = SOA()
    result = rwr0(untouched)
    assert result == '', f"Expected '', got {result}"
    print(" PASS test_rwr0_empty")
def test_rwr0_epsilon():
    """rwr0 on an SOA whose only edge is src -> sink must return 'ε'."""
    automaton = SOA()
    automaton.add_edge(automaton.src, automaton.sink)

    result = rwr0(automaton)
    assert result == 'ε', f"Expected 'ε', got {result}"
    print(" PASS test_rwr0_epsilon")
def test_rwr0_complex_a():
    """rwr0 must return '' for the SOA built from the samples abc, ab and ac."""
    # {abc, ab, ac} is NOT a SORE language (c appears in two roles)
    samples = [['a', 'b', 'c'], ['a', 'b'], ['a', 'c']]
    automaton = build_soa(samples)

    result = rwr0(automaton)
    assert result == '', f"Expected ∅ for non-SORE, got {result}"
    print(" PASS test_rwr0_complex_a: ∅ (non-SORE)")
def test_rwr0_disj_concat():
    """a·b and a·c share Pred/Succ for b,c after processing.

    Only checks that rwr0 returns something other than None; the resulting
    expression is printed, not pinned.
    """
    samples = [['a', 'b'], ['a', 'c']]
    automaton = build_soa(samples)

    result = rwr0(automaton)
    assert result is not None
    print(f" PASS test_rwr0_disj_concat: {result}")
def test_crx_simple():
    """CRX.infer on {ab, abc} returns a non-empty expression mentioning a and b."""
    samples = [['a', 'b'], ['a', 'b', 'c']]
    learner = CRX()
    result = learner.infer(samples)

    # Split form of "is not None and != ''": either failure is an AssertionError.
    assert result is not None
    assert result != ''
    for symbol in ('a', 'b'):
        assert symbol in result
    print(f" PASS test_crx_simple: {result}")
def test_crx_example():
    """Example from TODS paper: S = {abccde, cccad, bfegg, bfehi}"""
    # Each word is spelled as a string and exploded into one symbol per character.
    words = ('abccde', 'cccad', 'bfegg', 'bfehi')
    S = [list(word) for word in words]

    learner = CRX()
    result = learner.infer(S)
    assert result is not None
    assert '(' in result  # should have disjunction factors
    print(f" PASS test_crx_example: {result}")
def test_crx_cycle_class():
    """Symbols a,b,c form a cycle in S = {abc, bca, cab}."""
    S = [list(word) for word in ('abc', 'bca', 'cab')]

    learner = CRX()
    result = learner.infer(S)
    assert result is not None
    # All three symbols must appear somewhere in the inferred expression.
    assert all(symbol in result for symbol in ('a', 'b', 'c'))
    print(f" PASS test_crx_cycle_class: {result}")
def test_determinism_check():
    """is_deterministic accepts 'a.b', 'a+' and '(a|b)' and rejects '(a|a)'."""
    accepted = ('a.b', 'a+', '(a|b)')
    for expression in accepted:
        assert is_deterministic(expression)

    assert not is_deterministic('(a|a)')
    print(" PASS test_determinism_check")
def test_marking():
    """mark_koa on a 2-OA chain src -> a_1 -> a_2 -> sink.

    The marked automaton must report labels 'a_1' and 'a_2' for the two
    states and must accept the word ['a_1', 'a_2'].
    """
    koa = KOA(k=2)
    first = koa.add_state('a_1')
    second = koa.add_state('a_2')
    koa.add_edge(koa.src, first)
    koa.add_edge(first, second)
    koa.add_edge(second, koa.sink)

    marked = mark_koa(koa)
    assert marked.label(first) == 'a_1'
    assert marked.label(second) == 'a_2'
    assert marked.accept(['a_1', 'a_2'])
    print(" PASS test_marking")
def test_strip():
    """strip removes the '_1' index suffixes from marked expressions."""
    expectations = (
        ('a_1.b_1', 'a.b'),
        ('(a_1|b_1)+', '(a|b)+'),
    )
    for marked, plain in expectations:
        assert strip(marked) == plain
    print(" PASS test_strip")
def test_expr_utils():
    """Spot-check the string helpers imported from bex.expr.

    Each row is (helper, positional arguments, expected result); the helper is
    only called when its row is reached, so the call order matches the table.
    """
    cases = (
        (concat, ('a', 'b'), 'a.b'),
        (disj, ('a', 'b'), '(a|b)'),
        (star, ('a',), 'a+'),
        (optional, ('a',), 'a?'),
        (optional, ('a.b',), '(a.b)?'),
        (alphabet, ('a.b',), {'a', 'b'}),
        (alphabet, ('(a|b)+',), {'a', 'b'}),
        (strip_k, ('a_1',), 'a'),
    )
    for helper, arguments, expected in cases:
        assert helper(*arguments) == expected
    print(" PASS test_expr_utils")
def test_idregex_deterministic():
    """iDRegEx should produce a deterministic expression for simple data.

    A ``None`` result is reported as SKIP rather than as a failure.
    """
    samples = [['a', 'b'], ['a'], ['a', 'b', 'c']]
    result = idregex(samples, kmax=2, N=2)

    if result is not None:
        assert is_deterministic(result), f"Non-deterministic: {result}"
        print(f" PASS test_idregex_deterministic: {result}")
    else:
        print(" SKIP test_idregex_deterministic (returned None)")
def test_complete_koa():
    """build_complete_koa with k=2 on {ab, a}.

    Checks that count_symbol reports 2 for both 'a' and 'b' and that the
    automaton has a direct src -> sink edge.
    """
    samples = [['a', 'b'], ['a']]
    koa, _states = build_complete_koa(samples, k=2)  # second element unused here

    for symbol in ('a', 'b'):
        assert koa.count_symbol(symbol) == 2
    assert koa.has_edge(koa.src, koa.sink)
    print(" PASS test_complete_koa")
def run_all():
    """Run the unit tests in order and print a pass/fail summary.

    Any exception raised by a test (including AssertionError) is caught and
    reported as a FAIL line; the remaining tests still run.

    NOTE: this module defines ``run_all`` a second time further down; that
    later definition replaces this one when the module is loaded.
    """
    tests = (
        test_soa_basics,
        test_soa_contract,
        test_soa_epsilon_closure,
        test_twotinf,
        test_rwr0_concat,
        test_rwr0_disj,
        test_rwr0_iteration,
        test_rwr0_optional,
        test_rwr0_empty,
        test_rwr0_epsilon,
        test_rwr0_complex_a,
        test_rwr0_disj_concat,
        test_crx_simple,
        test_crx_example,
        test_crx_cycle_class,
        test_determinism_check,
        test_marking,
        test_strip,
        test_expr_utils,
        test_idregex_deterministic,
        test_complete_koa,
    )
    passed = failed = 0
    for t in tests:
        try:
            t()
        except Exception as e:
            print(f" FAIL {t.__name__}: {e}")
            failed += 1
        else:
            passed += 1
    print(f"\n{passed} passed, {failed} failed")
# ── Integration tests with real Ansible task data ──
def test_integration_linear_sequence():
    """Simple linear sequence — all tasks always in same order."""
    pipeline = ['file', 'template', 'docker_image', 'command', 'set_fact', 'shell', 'wait_for']
    # Two independent copies of the same task sequence.
    seqs = [list(pipeline), list(pipeline)]

    learner = CRX()
    result = learner.infer(seqs)
    assert result is not None
    assert all(task in result for task in pipeline)
    print(f" PASS linear_sequence: {result}")
def test_integration_optional_tasks():
    """Optional tasks — some sequences have more of the same."""
    unit = ['shell', 'debug']
    # The shell/debug pair repeated two, three and one time(s).
    seqs = [unit * repeats for repeats in (2, 3, 1)]

    learner = CRX()
    result = learner.infer(seqs)
    assert result is not None
    assert all(task in result for task in ('shell', 'debug'))
    print(f" PASS optional_tasks: {result}")
def test_integration_branching_paths():
    """Branching: one path or an alternative."""
    # The two sequences differ only in the third task.
    seqs = [
        ['file', 'template', variant, 'set_fact', 'shell', 'wait_for']
        for variant in ('command_v2', 'command_v1')
    ]

    learner = CRX()
    result = learner.infer(seqs)
    assert result is not None
    assert all(task in result for task in ('file', 'template', 'shell'))
    print(f" PASS branching_paths: {result}")
def test_integration_conditional_tasks():
    """Tasks that sometimes appear, sometimes not."""
    prefix = ['assert', 'file', 'template']
    suffix = ['shell', 'wait_for']
    # 'command_fw' occurs zero, two and one time(s) between prefix and suffix.
    seqs = [prefix + ['command_fw'] * count + suffix for count in (0, 2, 1)]

    learner = CRX()
    result = learner.infer(seqs)
    assert result is not None
    assert all(task in result for task in ('assert', 'file'))
    print(f" PASS conditional_tasks: {result}")
def test_integration_idregex_linear():
    """iDRegEx on simple linear sequences.

    A falsy result (``None`` or an empty expression) is reported as SKIP.
    Otherwise the inferred expression must be deterministic.

    Exceptions are deliberately not caught here: a failing assertion or an
    error inside ``idregex`` has to propagate so that pytest, or the
    try/except in ``run_all``, records the test as failed rather than passed.
    """
    seqs = [
        ['assert', 'file', 'template', 'command', 'set_fact', 'shell', 'wait_for'],
        ['assert', 'file', 'template', 'command', 'set_fact', 'shell'],
    ]
    result = idregex(seqs, kmax=2, N=3)
    if not result:
        print(" SKIP idregex_linear (returned None)")
        return
    assert is_deterministic(result), f"Non-deterministic: {result}"
    print(f" PASS idregex_linear: {result}")
def test_integration_ikoa_linear():
    """iKoa + rwr² on simple linear sequences.

    A ``None`` automaton from ``ikoa`` is reported as SKIP; otherwise the
    expression returned by ``rwr_sq`` must not be ``None``.
    """
    from bex.ikoa import ikoa
    from bex.rwrsq import rwr_sq

    full_run = ['assert', 'file', 'template', 'command', 'set_fact', 'shell', 'wait_for']
    # Second sample is the same run without its last task.
    seqs = [full_run, full_run[:-1]]

    automaton = ikoa(seqs, k=3)
    if automaton is not None:
        expr = rwr_sq(automaton)
        assert expr is not None
        print(f" PASS ikoa_linear: {expr}")
    else:
        print(" SKIP ikoa_linear (returned None)")
def test_integration_looping_tasks():
    """Sequence with loop (repeated tasks)."""
    setup = ['package', 'assert', 'file']
    templates = ['template'] * 6
    # Both samples share setup and templates; they end with 3 and 1 'systemd'.
    seqs = [setup + templates + ['systemd'] * tail for tail in (3, 1)]

    learner = CRX()
    result = learner.infer(seqs)
    assert result is not None
    print(f" PASS looping_tasks: {result}")
def run_all():
    """Run every unit and integration test in order and print a summary.

    Each test is called with no arguments. Any exception it raises
    (including AssertionError) is caught and printed as a FAIL line with the
    test's name, and the remaining tests still run. A final line reports the
    pass and fail counts. Returns None.
    """
    # Every entry must name a function defined in this module: the list is
    # built eagerly, so one stale name raises NameError before any test runs.
    tests = [
        test_soa_basics,
        test_soa_contract,
        test_soa_epsilon_closure,
        test_twotinf,
        test_rwr0_concat,
        test_rwr0_disj,
        test_rwr0_iteration,
        test_rwr0_optional,
        test_rwr0_empty,
        test_rwr0_epsilon,
        test_rwr0_complex_a,
        test_rwr0_disj_concat,
        test_crx_simple,
        test_crx_example,
        test_crx_cycle_class,
        test_determinism_check,
        test_marking,
        test_strip,
        test_expr_utils,
        test_idregex_deterministic,
        test_complete_koa,
        test_integration_linear_sequence,
        test_integration_optional_tasks,
        test_integration_branching_paths,
        test_integration_conditional_tasks,
        test_integration_idregex_linear,
        test_integration_ikoa_linear,
        test_integration_looping_tasks,
    ]
    passed = 0
    failed = 0
    for t in tests:
        try:
            t()
        except Exception as e:
            print(f" FAIL {t.__name__}: {e}")
            failed += 1
        else:
            passed += 1
    print(f"\n{passed} passed, {failed} failed")
# When executed as a script (not imported), run the whole suite.
if __name__ == "__main__":
    run_all()