""" YAMLTokenizer — Extracts token sequences from Ansible YAML files. Per Bex 2007/2010, each YAML document is translated into a sequence of symbols (tokens). For Ansible: - A playbook → one sequence of module names (apt, service, template, ...) - include_tasks is treated as a terminal token (not resolved recursively) - block/rescue/always: the block container itself is a token, its content is NOT tokenized (too variable) """ import os import yaml # Module-Namen, die als strukturelle Token erfasst werden # (basierend auf Analyse von 56+ Rollen im Projekt) MODULE_TOKENS = { 'apt', 'service', 'template', 'copy', 'file', 'command', 'shell', 'get_url', 'uri', 'debug', 'set_fact', 'assert', 'wait_for', 'include_tasks', 'import_tasks', 'import_playbook', 'systemd', 'cron', 'user', 'authorized_key', 'group', 'docker_container', 'docker_volume', 'docker_network', 'docker_image', 'pip', 'npm', 'package', 'lineinfile', 'replace', 'blockinfile', 'stat', 'fetch', 'slurp', 'meta', 'fail', 'pause', 'unarchive', 'archive', 'git', 'hg', 'mysql_db', 'mysql_user', 'postgresql_db', 'postgresql_user', 'certificate', 'openssl', 'known_hosts', 'iptables', 'ufw', 'mount', 'filesystem', 'sysctl', 'ini_file', 'composer', 'make', 'configure', 'npm', 'composer', 'pear', 'pip', 'gem', 'cargo', } def is_module_name(key): return key in MODULE_TOKENS or (isinstance(key, str) and not key.startswith('_')) class YAMLTokenizer: def __init__(self, resolve_includes=False): self.resolve_includes = resolve_includes self._token_counts = {} def tokenize_file(self, filepath): with open(filepath) as f: content = f.read() return self.tokenize_string(content, source=filepath) def tokenize_string(self, content, source=''): try: data = yaml.safe_load(content) except yaml.YAMLError as e: return [] if data is None: return [] return self._tokenize(data, source=source) def _tokenize(self, data, source='', depth=0): if isinstance(data, list): return self._tokenize_list(data, source, depth) elif isinstance(data, dict): return self._tokenize_dict(data, source, depth) return [] def _tokenize_list(self, lst, source, depth): tokens = [] for item in lst: if isinstance(item, dict): tokens.extend(self._tokenize_dict(item, source, depth)) elif isinstance(item, str): tokens.append(item) return tokens def _tokenize_dict(self, d, source, depth): tokens = [] if 'tasks' in d or 'block' in d or 'pre_tasks' in d or 'post_tasks' in d: task_key = next(k for k in ['pre_tasks', 'tasks', 'post_tasks', 'block'] if k in d) if task_key == 'block': tokens.append('block_start') for item in d.get('block', []): tokens.extend(self._tokenize_task(item, source, depth + 1)) if 'rescue' in d: tokens.append('rescue_start') for item in d['rescue']: tokens.extend(self._tokenize_task(item, source, depth + 1)) tokens.append('rescue_end') if 'always' in d: tokens.append('always_start') for item in d['always']: tokens.extend(self._tokenize_task(item, source, depth + 1)) tokens.append('always_end') tokens.append('block_end') else: for item in d.get(task_key, []): tokens.extend(self._tokenize_task(item, source, depth + 1)) elif 'hosts' in d: tokens.append('play_start') for item in d.get('tasks', []): tokens.extend(self._tokenize_task(item, source, depth + 1)) tokens.append('play_end') elif 'roles' in d: for role in d.get('roles', []): tokens.append(f"role:{role if isinstance(role, str) else list(role.keys())[0]}") elif 'handlers' in d: tokens.append('handlers_start') for item in d.get('handlers', []): tokens.extend(self._tokenize_task(item, source, depth + 1)) tokens.append('handlers_end') elif 'name' in d and not any(k in d for k in ['tasks', 'block', 'hosts']): tokens.extend(self._tokenize_task(d, source, depth)) return tokens def _tokenize_task(self, task, source, depth): if not isinstance(task, dict): return [] tokens = [] if 'include_tasks' in task or 'import_tasks' in task: key = 'include_tasks' if 'include_tasks' in task else 'import_tasks' tokens.append(key) if self.resolve_includes: inc_path = task[key] if not os.path.isabs(inc_path): base = os.path.dirname(source) if source != '' else '.' inc_path = os.path.join(base, inc_path) if os.path.exists(inc_path): tokens.extend(self.tokenize_file(inc_path)) return tokens if 'import_playbook' in task: tokens.append('import_playbook') return tokens if 'block' in task: tokens.append('block_start') for item in task.get('block', []): tokens.extend(self._tokenize_task(item, source, depth)) if 'rescue' in task: tokens.append('rescue_start') for item in task['rescue']: tokens.extend(self._tokenize_task(item, source, depth)) tokens.append('rescue_end') if 'always' in task: tokens.append('always_start') for item in task['always']: tokens.extend(self._tokenize_task(item, source, depth)) tokens.append('always_end') tokens.append('block_end') return tokens if 'name' in task: module_name = None for key in task: if key == 'name': continue if is_module_name(key) and isinstance(task[key], (str, dict, list, bool, int)): module_name = key break if module_name: tokens.append(module_name) self._token_counts[module_name] = self._token_counts.get(module_name, 0) + 1 elif 'ansible.builtin' in str(task): for key in task: if '.' in str(key): module_name = str(key).split('.')[-1] tokens.append(module_name) break return tokens def get_statistics(self): return dict(sorted(self._token_counts.items(), key=lambda x: -x[1]))