grammar-inference-engine/bex/tokenizer.py
tobjend dc559a4aee
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
fix badge position; purge remaining German user-reference comments
2026-07-01 13:28:55 +02:00

192 lines
6.9 KiB
Python

"""
YAMLTokenizer — Extracts token sequences from Ansible YAML files.
Per Bex 2007/2010, each YAML document is translated into a sequence of
symbols (tokens). For Ansible:
- A playbook → one sequence of module names (apt, service, template, ...)
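- include_tasks / import_tasks are emitted as terminal tokens; the referenced
file is only expanded when the tokenizer is created with resolve_includes=True
- block/rescue/always: each container is marked by *_start / *_end tokens
that enclose the tokens of the tasks it contains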
"""
import os
import yaml
# Module names that are captured as structural tokens
# (based on an analysis of 56+ roles in the project)
MODULE_TOKENS = {
    # Package managers and build tooling
    'apt', 'cargo', 'composer', 'configure', 'gem', 'make', 'npm',
    'package', 'pear', 'pip',
    # Files, templates and in-place text edits
    'archive', 'blockinfile', 'copy', 'fetch', 'file', 'ini_file',
    'lineinfile', 'replace', 'slurp', 'stat', 'template', 'unarchive',
    # Command execution and remote retrieval
    'command', 'get_url', 'shell', 'uri',
    # Control flow, includes and diagnostics
    'assert', 'debug', 'fail', 'import_playbook', 'import_tasks',
    'include_tasks', 'meta', 'pause', 'set_fact', 'wait_for',
    # Services, scheduling and accounts
    'authorized_key', 'cron', 'group', 'known_hosts', 'service',
    'systemd', 'user',
    # Containers
    'docker_container', 'docker_image', 'docker_network', 'docker_volume',
    # Version control
    'git', 'hg',
    # Databases
    'mysql_db', 'mysql_user', 'postgresql_db', 'postgresql_user',
    # Certificates
    'certificate', 'openssl',
    # Firewall, storage and kernel settings
    'filesystem', 'iptables', 'mount', 'sysctl', 'ufw',
}
# Keys that Ansible interprets as task-level keywords rather than as the
# module to run.  `action` / `local_action` are intentionally absent: they
# carry the module invocation in their value, so a task using them should
# still produce a token.
_TASK_KEYWORDS = frozenset({
    'name', 'any_errors_fatal', 'args', 'async', 'become', 'become_exe',
    'become_flags', 'become_method', 'become_user', 'changed_when',
    'check_mode', 'collections', 'connection', 'debugger', 'delay',
    'delegate_facts', 'delegate_to', 'diff', 'environment', 'failed_when',
    'ignore_errors', 'ignore_unreachable', 'listen', 'loop', 'loop_control',
    'module_defaults', 'no_log', 'notify', 'poll', 'port', 'register',
    'remote_user', 'retries', 'run_once', 'tags', 'throttle', 'timeout',
    'until', 'vars', 'when',
})


def is_module_name(key):
    """Return True when a task key can denote the module the task runs.

    Args:
        key: A key taken from a task mapping (normally a string).

    Returns:
        False for non-string keys, for task keywords such as ``when`` or
        ``become``, and for ``with_*`` loop keywords.  Otherwise True for
        members of MODULE_TOKENS and for any other string that does not
        start with an underscore.
    """
    # Non-strings can never name a module; checking this first also keeps
    # unhashable keys away from the set lookup below.
    if not isinstance(key, str):
        return False
    # Keywords modify how a task runs; treating them as modules would make a
    # task that lists e.g. `become:` before its module tokenize as 'become'.
    if key in _TASK_KEYWORDS or key.startswith('with_'):
        return False
    return key in MODULE_TOKENS or not key.startswith('_')
class YAMLTokenizer:
    """Translate Ansible YAML into flat sequences of structural tokens.

    Plays are wrapped in ``play_start`` / ``play_end``; blocks, rescue and
    always sections and handler lists get matching ``*_start`` / ``*_end``
    markers; roles become ``role:<name>``; ordinary tasks contribute the name
    of the module they call.

    Apart from include/import tasks and block containers, a task only yields
    a token when it has a ``name`` key.

    Args:
        resolve_includes: When True, the file referenced by ``include_tasks``
            / ``import_tasks`` is tokenized in place, right after the
            include token itself.
    """

    # Task-list sections of a play; roles run between pre_tasks and tasks.
    _TASK_SECTIONS = ('pre_tasks', 'tasks', 'post_tasks')
    # Value types accepted as module arguments when searching the module key.
    _MODULE_ARG_TYPES = (str, dict, list, bool, int)

    def __init__(self, resolve_includes=False):
        self.resolve_includes = resolve_includes
        # module name -> number of tasks seen that use it
        self._token_counts = {}
        # Real paths of files currently being tokenized (include-cycle guard).
        self._active_files = set()

    def tokenize_file(self, filepath):
        """Read *filepath* and return its token list.

        Returns an empty list when the file is already being tokenized
        further up the call stack, so cyclic includes terminate.  Errors
        raised by ``open`` propagate to the caller.
        """
        real_path = os.path.realpath(filepath)
        if real_path in self._active_files:
            return []
        self._active_files.add(real_path)
        try:
            with open(filepath, encoding='utf-8') as f:
                content = f.read()
            return self.tokenize_string(content, source=filepath)
        finally:
            self._active_files.discard(real_path)

    def tokenize_string(self, content, source='<string>'):
        """Parse YAML text and return its token list.

        Args:
            content: YAML document text.
            source: Path the text came from; used as the base directory for
                relative includes.  ``'<string>'`` means "no file".

        Returns:
            The token list, or ``[]`` for empty or unparseable YAML.
        """
        try:
            data = yaml.safe_load(content)
        except yaml.YAMLError:
            # Best effort by design: an unparseable file contributes nothing.
            return []
        if data is None:
            return []
        return self._tokenize(data, source=source)

    def _tokenize(self, data, source='<string>', depth=0):
        """Dispatch on the top-level YAML type; scalars yield no tokens."""
        if isinstance(data, list):
            return self._tokenize_list(data, source, depth)
        if isinstance(data, dict):
            return self._tokenize_dict(data, source, depth)
        return []

    def _tokenize_list(self, lst, source, depth):
        """Tokenize each mapping in *lst*; string items are kept verbatim."""
        tokens = []
        for item in lst:
            if isinstance(item, dict):
                tokens.extend(self._tokenize_dict(item, source, depth))
            elif isinstance(item, str):
                tokens.append(item)
        return tokens

    def _tokenize_dict(self, d, source, depth):
        """Tokenize one mapping: a play, a block, a section holder or a task.

        ``depth`` is carried through the recursion but not otherwise used.
        """
        # A play is recognised first so that its markers are emitted even
        # when it also carries tasks, roles or handlers.
        if 'hosts' in d:
            return self._tokenize_play(d, source, depth)
        sections = [key for key in self._TASK_SECTIONS if key in d]
        if sections:
            tokens = []
            for key in sections:
                tokens.extend(self._tokenize_task_list(d[key], source, depth + 1))
            return tokens
        if 'block' in d:
            return self._tokenize_block(d, source, depth + 1)
        if 'roles' in d:
            return self._tokenize_roles(d['roles'])
        if 'handlers' in d:
            return self._tokenize_handlers(d['handlers'], source, depth + 1)
        if 'name' in d:
            return self._tokenize_task(d, source, depth)
        return []

    def _tokenize_play(self, play, source, depth):
        """Tokenize a play: all of its sections between play_start/play_end."""
        tokens = ['play_start']
        tokens.extend(self._tokenize_task_list(play.get('pre_tasks'), source, depth + 1))
        tokens.extend(self._tokenize_roles(play.get('roles')))
        tokens.extend(self._tokenize_task_list(play.get('tasks'), source, depth + 1))
        tokens.extend(self._tokenize_task_list(play.get('post_tasks'), source, depth + 1))
        if 'handlers' in play:
            tokens.extend(self._tokenize_handlers(play['handlers'], source, depth + 1))
        tokens.append('play_end')
        return tokens

    def _tokenize_task_list(self, items, source, depth):
        """Tokenize a list of tasks; a missing or null section yields []."""
        if not isinstance(items, list):
            return []
        tokens = []
        for item in items:
            tokens.extend(self._tokenize_task(item, source, depth))
        return tokens

    def _tokenize_roles(self, roles):
        """Return one ``role:<name>`` token per entry of a roles list."""
        if not isinstance(roles, list):
            return []
        tokens = []
        for role in roles:
            if isinstance(role, dict):
                # Mapping form: the role name is the value of `role`/`name`.
                role = role.get('role', role.get('name'))
            if role is not None:
                tokens.append(f"role:{role}")
        return tokens

    def _tokenize_handlers(self, handlers, source, depth):
        """Tokenize a handlers list between handlers_start/handlers_end."""
        tokens = ['handlers_start']
        tokens.extend(self._tokenize_task_list(handlers, source, depth))
        tokens.append('handlers_end')
        return tokens

    def _tokenize_block(self, container, source, depth):
        """Tokenize a block with its optional rescue and always sections."""
        tokens = ['block_start']
        tokens.extend(self._tokenize_task_list(container.get('block'), source, depth))
        for section in ('rescue', 'always'):
            if section in container:
                tokens.append(f'{section}_start')
                tokens.extend(self._tokenize_task_list(container[section], source, depth))
                tokens.append(f'{section}_end')
        tokens.append('block_end')
        return tokens

    def _tokenize_task(self, task, source, depth):
        """Return the tokens for a single task mapping.

        Non-mappings and tasks for which no module can be determined yield
        an empty list.
        """
        if not isinstance(task, dict):
            return []
        for key in ('include_tasks', 'import_tasks'):
            if key in task:
                tokens = [key]
                if self.resolve_includes:
                    tokens.extend(self._resolve_include(task[key], source))
                return tokens
        if 'import_playbook' in task:
            return ['import_playbook']
        if 'block' in task:
            return self._tokenize_block(task, source, depth)
        if 'name' not in task:
            return []
        module_name = self._find_module(task)
        if not module_name:
            return []
        self._token_counts[module_name] = self._token_counts.get(module_name, 0) + 1
        return [module_name]

    def _resolve_include(self, target, source):
        """Tokenize the file an include points at, if it can be located."""
        if isinstance(target, dict):
            # Mapping form: `include_tasks: {file: other.yml}`.
            target = target.get('file')
        if not isinstance(target, str):
            return []
        if not os.path.isabs(target):
            base = os.path.dirname(source) if source != '<string>' else '.'
            target = os.path.join(base, target)
        # isfile (not exists) so a directory is never passed to open().
        if os.path.isfile(target):
            return self.tokenize_file(target)
        return []

    def _find_module(self, task):
        """Return the short module name used by *task*, or None."""
        for key, value in task.items():
            if key == 'name':
                continue
            if is_module_name(key) and isinstance(value, self._MODULE_ARG_TYPES):
                return self._short_module_name(key)
        # Fallback for fully-qualified builtin modules whose value was not
        # accepted above (e.g. a module called without arguments).
        if 'ansible.builtin' in str(task):
            for key in task:
                if '.' in str(key):
                    return self._short_module_name(key)
        return None

    @staticmethod
    def _short_module_name(key):
        """Reduce ``namespace.collection.module`` to ``module``."""
        return str(key).rsplit('.', 1)[-1]

    def get_statistics(self):
        """Return module usage counts, most frequent first."""
        return dict(sorted(self._token_counts.items(), key=lambda x: -x[1]))