grammar-inference-engine/examples/role_grammar.py

111 lines
3.6 KiB
Python

"""Extract Ansible role task module sequences and learn per-group grammars."""
from pathlib import Path
import yaml
from collections import defaultdict
from .crx import CRX
from .expr import strip_k
# Task keys that extract_module_name() passes over when it searches a task
# dict for the key naming the module.  Kept in alphabetical order.
IGNORE_MODULES = frozenset((
    'always', 'args', 'async',
    'become', 'become_flags', 'become_user', 'block',
    'changed_when', 'check_mode',
    'delay', 'delegate_to', 'diff',
    'environment',
    'failed_when',
    'ignore_errors', 'include_tasks',
    'loop', 'loop_control',
    'name', 'no_log', 'notify',
    'poll',
    'register', 'rescue', 'retries', 'run_once',
    'tags',
    'until',
    'vars',
    'when',
))
def extract_module_name(task):
    """Extract the Ansible module name(s) from one task entry.

    A task that contains any of ``block``/``rescue``/``always`` is treated
    as a container: every entry of each list-valued section is resolved
    recursively, in the order block, rescue, always.  Otherwise the module
    is the first key that is not in ``IGNORE_MODULES`` and whose value is a
    dict, list, str, bool, int or float.

    Args:
        task: One entry of a parsed task list (normally a dict).

    Returns:
        ``None`` when ``task`` is not a dict, when no module key is found,
        or when a container task has no list-valued section; a list with
        one result per nested entry (itself possibly containing lists) for
        a container task; otherwise ``strip_k(key)`` for the module key.
    """
    if not isinstance(task, dict):
        return None

    sections = [task[key] for key in ('block', 'rescue', 'always')
                if key in task]
    if sections:
        nested_lists = [section for section in sections
                        if isinstance(section, list)]
        if not nested_lists:
            return None
        # Visit every section instead of returning after the first one
        # found: rescue/always live in the same task dict as block, so an
        # early return would drop their tasks from the sequence.
        names = []
        for entries in nested_lists:
            names.extend(extract_module_name(entry) for entry in entries)
        return names

    # 'name' is a member of IGNORE_MODULES, so it needs no separate check.
    for key, value in task.items():
        if key in IGNORE_MODULES:
            continue
        # A key whose value is None does not match this type check and is
        # passed over; the search continues with the next key.
        if isinstance(value, (dict, list, str, bool, int, float)):
            return strip_k(key)
    return None
def flatten_nested(seq):
    """Return the leaves of arbitrarily nested lists as one flat list.

    Only ``list`` instances are descended into.  Leaves that are ``None``
    or equal to ``'skip'`` are dropped; all other leaves keep their
    original left-to-right order.
    """
    def _leaves(items):
        for entry in items:
            if isinstance(entry, list):
                yield from _leaves(entry)
                continue
            if entry is None:
                continue
            if entry != 'skip':
                yield entry

    return list(_leaves(seq))
def get_role_category(role_name):
    """Return the text before the first underscore of ``role_name``.

    For example ``deploy_foo`` gives ``deploy``.  A name that contains no
    underscore gives ``'other'``.
    """
    prefix, underscore, _rest = role_name.partition('_')
    if not underscore:
        return 'other'
    return prefix
def load_role_module_sequence(role_dir):
    """Load a role's ``tasks/main.yml`` and extract its module sequence.

    Args:
        role_dir: The role directory, as a ``Path`` or a plain string path.

    Returns:
        ``(role_name, modules)`` where ``role_name`` is the last component
        of ``role_dir`` and ``modules`` is the flat list of values obtained
        from ``extract_module_name`` for each top-level task, or
        ``(None, None)`` when the task file does not exist or its
        top-level YAML value is not a list.
    """
    # Accept string paths as well, like collect_all_role_sequences() does.
    role_dir = Path(role_dir)
    task_file = role_dir / 'tasks' / 'main.yml'
    if not task_file.exists():
        return None, None
    # Decode explicitly as UTF-8 so parsing does not depend on the
    # platform's locale default encoding.
    with open(task_file, encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not isinstance(data, list):
        return None, None
    modules = []
    for task in data:
        result = extract_module_name(task)
        if isinstance(result, list):
            # Container tasks yield nested lists; flatten them in place.
            modules.extend(flatten_nested(result))
        elif result is not None:
            modules.append(result)
    return role_dir.name, modules
def collect_all_role_sequences(roles_dir='roles'):
    """Gather module sequences for every role found under ``roles_dir``.

    Roles are located by globbing ``*/tasks/main.yml`` and visited in
    sorted path order.  Roles whose sequence is empty or ``None`` are left
    out.

    Returns:
        ``(all_roles, by_category)``: a list of ``(role_name, sequence)``
        tuples, and a ``defaultdict(list)`` holding the same tuples keyed
        by ``get_role_category(role_name)``.
    """
    grouped = defaultdict(list)
    collected = []
    task_files = sorted(Path(roles_dir).glob('*/tasks/main.yml'))
    for main_yml in task_files:
        # main.yml sits at <role>/tasks/main.yml, so go two levels up.
        role_root = main_yml.parent.parent
        label = role_root.name
        _, modules = load_role_module_sequence(role_root)
        if not modules:
            continue
        grouped[get_role_category(label)].append((label, modules))
        collected.append((label, modules))
    return collected, grouped
def learn_grammar(sequences):
    """Infer a grammar from ``sequences`` with CRX.

    Returns the string ``'ε'`` when ``sequences`` is empty; otherwise the
    result of ``CRX().infer`` on the sequences.
    """
    count = len(sequences)
    if count == 0:
        return 'ε'
    # A single sequence is passed on wrapped in a fresh one-element list;
    # two or more are passed through as given.
    training = [sequences[0]] if count == 1 else sequences
    learner = CRX()
    return learner.infer(training)