From adc52c99ecfd9c9643dcf5f61d2a7f622213a937 Mon Sep 17 00:00:00 2001
From: tobjend
Date: Wed, 1 Jul 2026 08:03:10 +0200
Subject: [PATCH] Add MCP server: grammar inference via FastMCP

- bex/mcp_server.py: FastMCP server with 3 tools:
  * infer_grammar(sequences, method='crx'|'idregex')
  * infer_yaml_grammar(yaml_dir, pattern, method)
  * infer_ansible_role_grammar(roles_dir)
- pyproject.toml: add bex-mcp console_scripts entry point
---
 bex/mcp_server.py | 158 ++++++++++++++++++++++++++++++++++++++++++++++
 pyproject.toml    |   3 ++
 2 files changed, 161 insertions(+)
 create mode 100644 bex/mcp_server.py

diff --git a/bex/mcp_server.py b/bex/mcp_server.py
new file mode 100644
index 0000000..a652518
--- /dev/null
+++ b/bex/mcp_server.py
@@ -0,0 +1,158 @@
+"""Grammar Inference Engine — MCP server.
+
+Provides tools to infer regular expression grammars from example sequences.
+Run as: python -m bex.mcp_server
+"""
+
+import json
+import logging
+import sys
+from pathlib import Path
+from typing import Any
+
+from mcp.server.fastmcp import FastMCP
+
+from .crx import CRX
+from .idregex import idregex
+from .yaml_to_seq import yaml_file_to_sequence, sequences_to_crx
+
+logger = logging.getLogger(__name__)
+
+mcp = FastMCP("grammar-inference", log_level="ERROR")
+
+# Inference algorithms accepted by the ``method`` argument of every tool.
+_METHODS = ("crx", "idregex")
+
+
+def _check_method(method: str) -> None:
+    """Raise ValueError unless *method* names a supported algorithm."""
+    if method not in _METHODS:
+        raise ValueError(f"Unknown method: {method}. Use 'crx' or 'idregex'.")
+
+
+def _run_inference(
+    sequences: list[list[str]],
+    method: str,
+    kmax: int = 2,
+    N: int = 3,
+) -> str:
+    """Run the algorithm selected by *method* and return its regex string."""
+    _check_method(method)
+    if method == "crx":
+        return CRX().infer(sequences)
+    # A falsy idregex result is reported as the empty-set symbol.
+    return idregex(sequences, kmax=kmax, N=N) or "∅"
+
+
+@mcp.tool()
+def infer_grammar(
+    sequences: list[list[str]],
+    method: str = "crx",
+    kmax: int = 2,
+    N: int = 3,
+) -> str:
+    """Infer a grammar (regular expression) from example sequences.
+
+    Args:
+        sequences: List of sequences, each a list of symbols (strings).
+        method: Algorithm to use — 'crx' (fast, deterministic) or 'idregex' (probabilistic, handles noise better).
+        kmax: Maximum k for k-ORE inference (iDRegEx only).
+        N: Number of EM iterations (iDRegEx only).
+
+    Returns:
+        A regular expression string describing the inferred grammar.
+
+    Raises:
+        ValueError: If method is neither 'crx' nor 'idregex'.
+    """
+    return _run_inference(sequences, method, kmax=kmax, N=N)
+
+
+@mcp.tool()
+def infer_yaml_grammar(
+    yaml_dir: str,
+    pattern: str = "**/*.yml",
+    method: str = "crx",
+) -> str:
+    """Infer a grammar from YAML files by converting them to key-path sequences.
+
+    Each YAML file is converted to a sequence of key paths (DFS traversal).
+    The selected algorithm then learns the common pattern across all files.
+
+    Args:
+        yaml_dir: Root directory to search for YAML files.
+        pattern: Glob pattern for YAML files (default: **/*.yml).
+        method: Algorithm to use ('crx' or 'idregex').
+
+    Returns:
+        A regular expression grammar describing the YAML structure, or a
+        placeholder message when no file yields a sequence.
+
+    Raises:
+        ValueError: If method is neither 'crx' nor 'idregex'.
+    """
+    # Reject a bad method before touching the filesystem.
+    _check_method(method)
+
+    sequences = []
+    for path in sorted(Path(yaml_dir).rglob(pattern)):
+        if not path.is_file():
+            continue
+        try:
+            seq = yaml_file_to_sequence(path)
+        except Exception as exc:
+            # Best effort: one unreadable file must not abort the run, but
+            # log it (when the logging level allows) instead of dropping it.
+            logger.warning("Skipping %s: %s", path, exc)
+            continue
+        if seq:
+            sequences.append(seq)
+
+    if not sequences:
+        return "ε (no sequences found)"
+    return _run_inference(sequences, method)
+
+
+@mcp.tool()
+def infer_ansible_role_grammar(roles_dir: str = ".") -> str:
+    """Infer grammars from Ansible role task module sequences.
+
+    Reads tasks/main.yml from each role, extracts the sequence of
+    Ansible module names, groups roles by category prefix, and learns
+    a per-category grammar.
+
+    Args:
+        roles_dir: Path to the Ansible roles directory.
+
+    Returns:
+        A formatted report with per-category grammars and role listings.
+    """
+    try:
+        from .role_grammar import collect_all_role_sequences, learn_grammar
+    except ImportError:
+        return "role_grammar module not available"
+
+    all_roles, by_category = collect_all_role_sequences(roles_dir)
+    if not all_roles:
+        return "No roles found."
+
+    lines = [f"Found {len(all_roles)} roles in {len(by_category)} categories\n"]
+    for cat in sorted(by_category):
+        items = by_category[cat]
+        lines.append(f"── {cat} ({len(items)} roles) ──")
+        # A grammar is only learned for categories with more than one role.
+        if len(items) > 1:
+            grammar = learn_grammar([seq for _, seq in items])
+            lines.append(f" Grammar: {grammar}")
+        lines.append(f" Roles: {', '.join(name for name, _ in items)}")
+        lines.append("")
+    return "\n".join(lines)
+
+
+def main() -> None:
+    """Console-script entry point: start the MCP server."""
+    mcp.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pyproject.toml b/pyproject.toml
index dddbe5a..8bc7bc5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,3 +11,6 @@ requires-python = ">=3.11"
 dependencies = [
     "PyYAML>=6.0",
 ]
+
+[project.scripts]
+bex-mcp = "bex.mcp_server:main"