192 lines
6.9 KiB
Python
192 lines
6.9 KiB
Python
"""
|
|
YAMLTokenizer — Extracts token sequences from Ansible YAML files.
|
|
|
|
Per Bex 2007/2010, each YAML document is translated into a sequence of
|
|
symbols (tokens). For Ansible:
|
|
- A playbook → one sequence of module names (apt, service, template, ...)
|
|
- include_tasks is treated as a terminal token (not resolved recursively)
|
|
- block/rescue/always: the block container itself is a token,
|
|
its content is NOT tokenized (too variable)
|
|
"""
|
|
|
|
import os
|
|
import yaml
|
|
|
|
|
|
# Module names that are recorded as structural tokens
# (based on an analysis of 56+ roles in the project).
# Each name is listed exactly once; the original literal repeated
# 'npm', 'composer' and 'pip', which a set silently collapses anyway.
MODULE_TOKENS = {
    'apt', 'service', 'template', 'copy', 'file', 'command', 'shell',
    'get_url', 'uri', 'debug', 'set_fact', 'assert', 'wait_for',
    'include_tasks', 'import_tasks', 'import_playbook',
    'systemd', 'cron', 'user', 'authorized_key', 'group',
    'docker_container', 'docker_volume', 'docker_network', 'docker_image',
    'pip', 'npm', 'package',
    'lineinfile', 'replace', 'blockinfile',
    'stat', 'fetch', 'slurp',
    'meta', 'fail', 'pause',
    'unarchive', 'archive',
    'git', 'hg',
    'mysql_db', 'mysql_user',
    'postgresql_db', 'postgresql_user',
    'certificate', 'openssl',
    'known_hosts',
    'iptables', 'ufw',
    'mount', 'filesystem',
    'sysctl',
    'ini_file',
    'composer', 'make', 'configure',
    'pear', 'gem', 'cargo',
}
|
|
|
|
# Task-level keywords that Ansible reserves; a key with one of these names
# configures the task and never names the module being run.
# 'action' and 'local_action' are left out on purpose: they stand in for the
# module call itself, so a task using them should still produce a token.
_TASK_KEYWORDS = frozenset({
    'any_errors_fatal', 'args', 'async', 'become', 'become_exe',
    'become_flags', 'become_method', 'become_user', 'changed_when',
    'check_mode', 'collections', 'connection', 'debugger', 'delay',
    'delegate_facts', 'delegate_to', 'diff', 'environment', 'failed_when',
    'ignore_errors', 'ignore_unreachable', 'listen', 'loop', 'loop_control',
    'module_defaults', 'name', 'no_log', 'notify', 'poll', 'port',
    'register', 'remote_user', 'retries', 'run_once', 'tags', 'throttle',
    'timeout', 'until', 'vars', 'when',
})


def is_module_name(key):
    """Return True if a task key plausibly names an Ansible module.

    Accepted are strings that are neither a reserved task keyword (``when``,
    ``become``, ``register``, ``with_*`` loops, ...) nor private
    (leading underscore). Underscore-prefixed names are still accepted when
    they are listed in ``MODULE_TOKENS``. Unknown and fully qualified module
    names (``community.general.ufw``) are accepted as well, because the
    known-module list cannot be exhaustive.

    Args:
        key: A key of a task mapping; any type is tolerated.

    Returns:
        bool: False for non-strings, reserved keywords and private names,
        True otherwise.
    """
    # Non-string keys (ints, None, unhashable values) can never be modules;
    # bailing out first also avoids a TypeError from the set lookup below.
    if not isinstance(key, str):
        return False
    # Without this check the first keyword preceding the module in a task
    # (e.g. ``become: true``) would be reported as the task's module.
    if key in _TASK_KEYWORDS or key.startswith('with_'):
        return False
    return not key.startswith('_') or key in MODULE_TOKENS
|
|
|
|
class YAMLTokenizer:
    """Translate Ansible YAML documents into flat sequences of tokens.

    A token is either a module name (``apt``, ``template``, ...), a
    ``role:<name>`` reference, an include/import marker, or a structural
    marker such as ``play_start`` / ``block_end``.

    Only modules found on *named* tasks are counted for
    :meth:`get_statistics`; structural markers and include/import markers
    are not counted.
    """

    # Keys whose presence marks a mapping as play-like (as opposed to a task).
    _PLAY_SECTIONS = ('pre_tasks', 'roles', 'tasks', 'post_tasks', 'handlers')

    def __init__(self, resolve_includes=False):
        """Create a tokenizer.

        Args:
            resolve_includes: When True, the file referenced by an
                ``include_tasks`` / ``import_tasks`` entry is tokenized and
                its tokens are inserted right after the include token.
        """
        self.resolve_includes = resolve_includes
        self._token_counts = {}
        # Real paths of the files currently being tokenized; used to stop
        # include cycles (a.yml -> b.yml -> a.yml) from recursing forever.
        self._active_files = set()

    def tokenize_file(self, filepath):
        """Read ``filepath`` and return its token sequence.

        Returns an empty list if the file is already being tokenized further
        up the include chain. Errors from opening or reading the file
        propagate to the caller.
        """
        real_path = os.path.realpath(filepath)
        if real_path in self._active_files:
            return []
        self._active_files.add(real_path)
        try:
            # Explicit encoding: the locale default is not UTF-8 everywhere.
            with open(filepath, encoding='utf-8') as handle:
                content = handle.read()
            return self.tokenize_string(content, source=filepath)
        finally:
            self._active_files.discard(real_path)

    def tokenize_string(self, content, source='<string>'):
        """Parse YAML text with ``yaml.safe_load`` and tokenize the result.

        Args:
            content: YAML text.
            source: Path the text came from; used as the base directory for
                relative includes. ``'<string>'`` means "no file".

        Returns:
            list: The tokens, or ``[]`` for empty or malformed YAML
            (parse errors are deliberately swallowed: best effort).
        """
        try:
            data = yaml.safe_load(content)
        except yaml.YAMLError:
            return []
        if data is None:
            return []
        return self._tokenize(data, source=source)

    def _tokenize(self, data, source='<string>', depth=0):
        """Dispatch on the top-level YAML type; scalars yield no tokens."""
        if isinstance(data, list):
            return self._tokenize_list(data, source, depth)
        if isinstance(data, dict):
            return self._tokenize_dict(data, source, depth)
        return []

    def _tokenize_list(self, lst, source, depth):
        """Tokenize each mapping in ``lst``; plain strings become tokens as-is."""
        tokens = []
        for item in lst:
            if isinstance(item, dict):
                tokens.extend(self._tokenize_dict(item, source, depth))
            elif isinstance(item, str):
                tokens.append(item)
        return tokens

    def _tokenize_dict(self, d, source, depth):
        """Tokenize one mapping: a play, a bare section container, or a task.

        Play-like mappings emit their sections in a fixed order
        (pre_tasks, roles, tasks, post_tasks, handlers). The sequence is
        wrapped in ``play_start`` / ``play_end`` when ``hosts`` is present.
        Every present section is processed, not only the first one found.
        Anything else is handed to :meth:`_tokenize_task`.
        """
        is_play = 'hosts' in d
        if not is_play and not any(key in d for key in self._PLAY_SECTIONS):
            # Blocks, includes and ordinary tasks share one code path so that
            # a task file and a play's task list tokenize identically.
            return self._tokenize_task(d, source, depth)

        tokens = []
        if is_play:
            tokens.append('play_start')

        tokens.extend(self._tokenize_tasks(d.get('pre_tasks'), source, depth + 1))
        tokens.extend(self._tokenize_roles(d.get('roles')))
        tokens.extend(self._tokenize_tasks(d.get('tasks'), source, depth + 1))
        tokens.extend(self._tokenize_tasks(d.get('post_tasks'), source, depth + 1))

        if 'handlers' in d:
            tokens.append('handlers_start')
            tokens.extend(self._tokenize_tasks(d['handlers'], source, depth + 1))
            tokens.append('handlers_end')

        if is_play:
            tokens.append('play_end')
        return tokens

    def _tokenize_tasks(self, items, source, depth):
        """Tokenize a task list; ``None`` or a non-list section yields ``[]``."""
        if not isinstance(items, (list, tuple)):
            # An empty YAML key (``tasks:``) loads as None.
            return []
        tokens = []
        for item in items:
            tokens.extend(self._tokenize_task(item, source, depth))
        return tokens

    def _tokenize_roles(self, roles):
        """Return one ``role:<name>`` token per resolvable role entry."""
        if not isinstance(roles, (list, tuple)):
            return []
        tokens = []
        for role in roles:
            role_name = self._role_name(role)
            if role_name is not None:
                tokens.append(f"role:{role_name}")
        return tokens

    @staticmethod
    def _role_name(role):
        """Extract the role name from a string or mapping role entry.

        Mapping entries are looked up under ``role`` and then ``name``; if
        neither holds a string the first key is used as a last resort.
        Returns None when no name can be determined.
        """
        if isinstance(role, str):
            return role
        if isinstance(role, dict):
            for key in ('role', 'name'):
                value = role.get(key)
                if isinstance(value, str):
                    return value
            return next(iter(role), None)
        return None

    def _tokenize_block(self, container, source, depth):
        """Tokenize a ``block`` with its optional ``rescue`` / ``always``.

        ``block_start`` / ``block_end`` are always emitted. The rescue and
        always markers are emitted only when the respective key is present,
        even if its task list is empty.
        """
        tokens = ['block_start']
        tokens.extend(self._tokenize_tasks(container.get('block'), source, depth))
        for section in ('rescue', 'always'):
            if section in container:
                tokens.append(f'{section}_start')
                tokens.extend(
                    self._tokenize_tasks(container[section], source, depth))
                tokens.append(f'{section}_end')
        tokens.append('block_end')
        return tokens

    def _resolve_include(self, target, source):
        """Tokenize the file an include/import points at, if it can be found.

        Args:
            target: Value of the include key: a path string, or a mapping
                with a ``file`` entry. Anything else is ignored.
            source: Path of the including file (``'<string>'`` if none);
                relative targets are resolved against its directory.

        Returns:
            list: Tokens of the included file, or ``[]`` if the target is
            not a string, is not an existing regular file (templated paths
            normally are not), or is already being tokenized.
        """
        if isinstance(target, dict):
            target = target.get('file')
        if not isinstance(target, str):
            return []
        if not os.path.isabs(target):
            base = os.path.dirname(source) if source != '<string>' else '.'
            target = os.path.join(base, target)
        if not os.path.isfile(target):
            return []
        return self.tokenize_file(target)

    def _tokenize_task(self, task, source, depth):
        """Tokenize a single task-level mapping.

        Checked in order:

        1. ``include_tasks`` / ``import_tasks``: the key itself is the
           token, followed by the included file's tokens when
           ``resolve_includes`` is set.
        2. ``import_playbook``: a single terminal token.
        3. ``block``: see :meth:`_tokenize_block`.
        4. Named tasks: the first key (other than ``name``) that
           ``is_module_name`` accepts and whose value is a str, dict, list,
           bool or int is the module. Fully qualified names are shortened
           to their last dotted segment, and the module is counted for the
           statistics.

        Tasks without a ``name`` that match none of 1-3, and non-mapping
        entries, yield no tokens.
        """
        if not isinstance(task, dict):
            return []

        for include_key in ('include_tasks', 'import_tasks'):
            if include_key in task:
                tokens = [include_key]
                if self.resolve_includes:
                    tokens.extend(
                        self._resolve_include(task[include_key], source))
                return tokens

        if 'import_playbook' in task:
            return ['import_playbook']

        if 'block' in task:
            return self._tokenize_block(task, source, depth)

        if 'name' not in task:
            return []

        module_key = None
        for key, value in task.items():
            if key == 'name':
                continue
            if is_module_name(key) and isinstance(
                    value, (str, dict, list, bool, int)):
                module_key = key
                break

        if not module_key and 'ansible.builtin' in str(task):
            # Fallback for builtin modules whose value did not pass the type
            # filter above (e.g. a module given without arguments).
            module_key = next(
                (str(key) for key in task if '.' in str(key)), None)

        if not module_key:
            return []

        # 'ansible.builtin.apt' and 'apt' must map to the same token.
        module_name = module_key.rpartition('.')[2] or module_key
        self._token_counts[module_name] = (
            self._token_counts.get(module_name, 0) + 1)
        return [module_name]

    def get_statistics(self):
        """Return ``{module: count}`` ordered by descending count."""
        return dict(sorted(self._token_counts.items(), key=lambda x: -x[1]))
|