Skip to content

Tools

System Tools

pcp_mcp.tools.system

System health tools that bundle related metrics into a single query.

register_system_tools

register_system_tools(mcp: FastMCP) -> None

Register system health tools with the MCP server.

Source code in src/pcp_mcp/tools/system.py
def register_system_tools(mcp: "FastMCP") -> None:
    """Register system health tools with the MCP server.

    Four tools are defined as closures and attached to ``mcp`` through the
    ``@mcp.tool`` decorator:

    * ``get_system_snapshot`` - snapshot of the requested metric categories.
    * ``quick_health`` - CPU and memory only, with a 0.5s sample interval.
    * ``get_process_top`` - top processes ranked by CPU, memory or I/O.
    * ``smart_diagnose`` - LLM-assisted diagnosis with a fallback when
      sampling fails.

    NOTE: the docstrings and ``Field`` descriptions of the nested tools are
    kept verbatim because they are part of what the tools expose to clients.

    Args:
        mcp: The FastMCP server instance the tools are registered on.
    """

    @mcp.tool(
        annotations=TOOL_ANNOTATIONS,
        output_schema=SystemSnapshot.model_json_schema(),
        icons=[ICON_SYSTEM],
        tags=TAGS_SYSTEM,
    )
    async def get_system_snapshot(
        ctx: Context,
        categories: Annotated[
            Optional[list[str]],
            Field(
                default=None,
                description=(
                    "Categories to include: cpu, memory, disk, network, load. "
                    "Defaults to all five if not specified."
                ),
            ),
        ] = None,
        sample_interval: Annotated[
            float,
            Field(
                default=1.0,
                ge=0.1,
                le=10.0,
                description="Seconds between samples for rate calculation",
            ),
        ] = 1.0,
        host: Annotated[
            Optional[str],
            Field(description="Target pmcd host to query (default: server's configured target)"),
        ] = None,
    ) -> SystemSnapshot:
        """Get a point-in-time system health overview.

        Returns CPU, memory, disk I/O, network I/O, and load metrics in a single
        call. For rate metrics (CPU %, disk I/O, network throughput), takes two
        samples to calculate per-second rates.

        Use this tool FIRST for system troubleshooting. It automatically handles
        counter-to-rate conversion. Do NOT use query_metrics() for CPU, disk, or
        network counters - those return raw cumulative values since boot.

        Examples:
            get_system_snapshot() - Quick health check (all categories)
            get_system_snapshot(categories=["cpu", "memory"]) - CPU and memory only
            get_system_snapshot(categories=["cpu", "load"]) - CPU and load averages
            get_system_snapshot(categories=["disk", "network"]) - I/O analysis
            get_system_snapshot(host="web1.example.com") - Query remote host
        """
        # None means "everything"; a fresh list is built on every call so no
        # mutable default is shared between invocations.
        if categories is None:
            categories = ["cpu", "memory", "disk", "network", "load"]
        return await _fetch_system_snapshot(ctx, categories, sample_interval, host)

    @mcp.tool(
        annotations=TOOL_ANNOTATIONS,
        output_schema=SystemSnapshot.model_json_schema(),
        icons=[ICON_HEALTH],
        tags=TAGS_HEALTH,
    )
    async def quick_health(
        ctx: Context,
        host: Annotated[
            Optional[str],
            Field(description="Target pmcd host to query (default: server's configured target)"),
        ] = None,
    ) -> SystemSnapshot:
        """Fast system health check returning only CPU and memory metrics.

        Use this for rapid status checks when you don't need disk/network/load
        details. Uses a shorter sample interval (0.5s) for faster results.

        Examples:
            quick_health() - Fast health check on default host
            quick_health(host="web1.example.com") - Fast check on remote host
        """
        # Fixed category set and 0.5s interval, as advertised in the docstring.
        return await _fetch_system_snapshot(ctx, ["cpu", "memory"], 0.5, host)

    @mcp.tool(
        annotations=TOOL_ANNOTATIONS,
        output_schema=ProcessTopResult.model_json_schema(),
        icons=[ICON_PROCESS],
        tags=TAGS_PROCESS,
    )
    async def get_process_top(
        ctx: Context,
        sort_by: Annotated[
            Literal["cpu", "memory", "io"],
            Field(description="Resource to sort by"),
        ] = "cpu",
        limit: Annotated[
            int,
            Field(default=10, ge=1, le=50, description="Number of processes to return"),
        ] = 10,
        sample_interval: Annotated[
            float,
            Field(
                default=1.0,
                ge=0.5,
                le=5.0,
                description="Seconds to sample for CPU/IO rates",
            ),
        ] = 1.0,
        host: Annotated[
            Optional[str],
            Field(description="Target pmcd host to query (default: server's configured target)"),
        ] = None,
    ) -> ProcessTopResult:
        """Get top processes by resource consumption.

        For CPU and I/O, takes two samples to calculate rates. Memory is instantaneous.
        Returns the top N processes sorted by the requested resource.

        Examples:
            get_process_top() - Top 10 by CPU (default)
            get_process_top(sort_by="memory", limit=20) - Top 20 memory consumers
            get_process_top(sort_by="io", sample_interval=2.0) - Top I/O with longer sample
            get_process_top(host="db1.example.com") - Query remote host
        """
        # The "info" and "memory" groups are always fetched; the group for the
        # requested sort key is added on top. dict.fromkeys() drops duplicates
        # (e.g. sort_by="memory" repeats the memory group) while keeping a
        # stable, insertion-ordered list, so the request is identical from run
        # to run (list(set(...)) ordering depends on string hash randomization).
        all_metrics = list(
            dict.fromkeys(
                PROCESS_METRICS["info"]
                + PROCESS_METRICS["memory"]
                + PROCESS_METRICS.get(sort_by, [])
            )
        )
        system_metrics = ["hinv.ncpu", "mem.physmem"]

        # Metrics passed to fetch_with_rates() as counters, i.e. the ones that
        # need two samples to be turned into rates.
        counter_metrics = {
            "proc.psinfo.utime",
            "proc.psinfo.stime",
            "proc.io.read_bytes",
            "proc.io.write_bytes",
        }

        # Imported inside the tool rather than at module level, as in the
        # other tools of this package.
        from pcp_mcp.errors import handle_pcp_error

        async def report_progress(current: float, total: float, message: str) -> None:
            """Forward progress updates from the client to the MCP context."""
            await ctx.report_progress(current, total, message)

        async with get_client_for_host(ctx, host) as client:
            try:
                proc_data = await client.fetch_with_rates(
                    all_metrics, counter_metrics, sample_interval, progress_callback=report_progress
                )
                sys_data = await client.fetch(system_metrics)
            except Exception as e:
                # Translate any client failure into the project's tool error.
                raise handle_pcp_error(e, "fetching process data") from e

            await ctx.report_progress(92, 100, "Processing results...")

            # Both lookups default to 1 when the metric is missing.
            ncpu = get_scalar_value(sys_data, "hinv.ncpu", 1)
            # Scaled by 1024 to obtain the value stored in total_memory_bytes
            # (presumably mem.physmem is reported in KiB - confirm).
            total_mem = get_scalar_value(sys_data, "mem.physmem", 1) * 1024

            processes = build_process_list(proc_data, sort_by, total_mem, ncpu)
            # Highest consumer first, then keep only the requested number.
            processes.sort(key=lambda p: get_sort_key(p, sort_by), reverse=True)
            processes = processes[:limit]

            assessment = assess_processes(processes, sort_by, ncpu)

            await ctx.report_progress(100, 100, "Complete")
            return ProcessTopResult(
                timestamp=datetime.now(timezone.utc).isoformat(),
                hostname=client.target_host,
                sort_by=sort_by,
                sample_interval=sample_interval,
                processes=processes,
                total_memory_bytes=int(total_mem),
                ncpu=ncpu,
                assessment=assessment,
            )

    @mcp.tool(
        annotations=TOOL_ANNOTATIONS,
        output_schema=DiagnosisResult.model_json_schema(),
        icons=[ICON_DIAGNOSE],
        tags=TAGS_DIAGNOSE,
    )
    async def smart_diagnose(
        ctx: Context,
        host: Annotated[
            Optional[str],
            Field(description="Target pmcd host to query (default: server's configured target)"),
        ] = None,
    ) -> DiagnosisResult:
        """Use LLM to analyze system metrics and provide diagnosis.

        Collects a quick system snapshot (CPU, memory, load) and asks the
        connected LLM to analyze the metrics and provide actionable insights.

        This tool demonstrates FastMCP's LLM sampling capability, where the
        MCP server can request LLM assistance for complex analysis tasks.

        Examples:
            smart_diagnose() - Analyze default host
            smart_diagnose(host="db1.example.com") - Analyze remote host
        """
        import logging

        from pcp_mcp.errors import handle_pcp_error

        try:
            snapshot = await _fetch_system_snapshot(ctx, ["cpu", "memory", "load"], 0.5, host)
        except Exception as e:
            raise handle_pcp_error(e, "fetching metrics for diagnosis") from e

        metrics_summary = _format_snapshot_for_llm(snapshot)

        system_prompt = (
            "You are a system performance analyst. Analyze the metrics and provide:\n"
            "1. A brief diagnosis (2-3 sentences)\n"
            "2. A severity level: 'healthy', 'warning', or 'critical'\n"
            "3. Up to 3 actionable recommendations\n\n"
            "Be concise and focus on actionable insights."
        )

        try:
            sampling_result = await ctx.sample(
                messages=f"Analyze these system metrics:\n\n{metrics_summary}",
                system_prompt=system_prompt,
                max_tokens=500,
                result_type=DiagnosisResult,
            )
            result = sampling_result.result
            # Timestamp and hostname always come from the snapshot, not the LLM.
            result.timestamp = snapshot.timestamp
            result.hostname = snapshot.hostname
            return result
        except Exception:
            # Sampling is best-effort: any failure falls back to the locally
            # built diagnosis. The reason is logged at debug level so it is
            # not lost entirely.
            logging.getLogger(__name__).debug(
                "LLM sampling failed; using fallback diagnosis", exc_info=True
            )
            return _build_fallback_diagnosis(snapshot)

Metrics Tools

pcp_mcp.tools.metrics

Core metric tools for querying PCP metrics.

register_metrics_tools

register_metrics_tools(mcp: FastMCP) -> None

Register core metric tools with the MCP server.

Source code in src/pcp_mcp/tools/metrics.py
def register_metrics_tools(mcp: "FastMCP") -> None:
    """Attach the core PCP metric tools to the given MCP server.

    Three tools are registered via ``@mcp.tool``: ``query_metrics`` (fetch
    current values), ``search_metrics`` (find metrics by name pattern) and
    ``describe_metric`` (metadata for a single metric).

    NOTE: the docstrings and ``Field`` descriptions of the nested tools are
    kept verbatim because they are part of what the tools expose to clients.

    Args:
        mcp: The FastMCP server instance the tools are registered on.
    """

    @mcp.tool(
        annotations=TOOL_ANNOTATIONS,
        icons=[ICON_METRICS],
        tags=TAGS_METRICS,
    )
    async def query_metrics(
        ctx: Context,
        names: Annotated[
            list[str],
            Field(description="List of PCP metric names to fetch (e.g., ['kernel.all.load'])"),
        ],
        host: Annotated[
            Optional[str],
            Field(description="Target pmcd host to query (default: server's configured target)"),
        ] = None,
    ) -> MetricValueList:
        """Fetch current values for specific PCP metrics.

        Returns the current value for each requested metric. For metrics with
        instances (e.g., per-CPU, per-disk), returns one MetricValue per instance.

        Examples:
            query_metrics(["kernel.all.load"]) - Get load averages
            query_metrics(["mem.util.available", "mem.physmem"]) - Get memory stats
            query_metrics(["hinv.ncpu"]) - Get CPU count
            query_metrics(["kernel.all.load"], host="web1.example.com") - Query remote host

        Warning: CPU, disk, and network metrics are counters (cumulative since boot).
        Use get_system_snapshot() instead for rates.
        """
        from pcp_mcp.errors import handle_pcp_error

        def instance_label(raw_id):
            """Return the instance id as a string, or None for a missing or -1 id."""
            if raw_id is None or raw_id == -1:
                return None
            return str(raw_id)

        async with get_client_for_host(ctx, host) as client:
            try:
                payload = await client.fetch(names)
            except Exception as exc:
                raise handle_pcp_error(exc, "fetching metrics") from exc

            # One MetricValue per instance of every metric in the response.
            collected: list[MetricValue] = []
            for entry in payload.get("values", []):
                entry_name = entry.get("name", "")
                collected.extend(
                    MetricValue(
                        name=entry_name,
                        value=sample.get("value"),
                        instance=instance_label(sample.get("instance")),
                    )
                    for sample in entry.get("instances", [])
                )

            return MetricValueList(metrics=collected)

    @mcp.tool(
        annotations=TOOL_ANNOTATIONS,
        icons=[ICON_SEARCH],
        tags=TAGS_METRICS | TAGS_DISCOVERY,
    )
    async def search_metrics(
        ctx: Context,
        pattern: Annotated[
            str,
            Field(description="Metric name prefix to search for (e.g., 'kernel.all', 'mem')"),
        ],
        host: Annotated[
            Optional[str],
            Field(description="Target pmcd host to query (default: server's configured target)"),
        ] = None,
    ) -> MetricSearchResultList:
        """Find PCP metrics matching a name pattern.

        Use this to discover available metrics before querying them.
        Returns metric names and brief descriptions.

        Examples:
            search_metrics("kernel.all") - Find kernel-wide metrics
            search_metrics("mem.util") - Find memory utilization metrics
            search_metrics("disk.dev") - Find per-disk metrics
            search_metrics("network.interface") - Find per-interface metrics
            search_metrics("kernel", host="db1.example.com") - Search on remote host
        """
        from pcp_mcp.errors import handle_pcp_error

        async with get_client_for_host(ctx, host) as client:
            try:
                hits = await client.search(pattern)
            except Exception as exc:
                raise handle_pcp_error(exc, "searching metrics") from exc

            # Each hit is reduced to its name plus extracted help text.
            return MetricSearchResultList(
                results=[
                    MetricSearchResult(
                        name=hit.get("name", ""),
                        help_text=extract_help_text(hit),
                    )
                    for hit in hits
                ]
            )

    @mcp.tool(
        annotations=TOOL_ANNOTATIONS,
        output_schema=MetricInfo.model_json_schema(),
        icons=[ICON_INFO],
        tags=TAGS_METRICS | TAGS_DISCOVERY,
    )
    async def describe_metric(
        ctx: Context,
        name: Annotated[
            str,
            Field(description="Full PCP metric name (e.g., 'kernel.all.cpu.user')"),
        ],
        host: Annotated[
            Optional[str],
            Field(description="Target pmcd host to query (default: server's configured target)"),
        ] = None,
    ) -> MetricInfo:
        """Get detailed metadata about a PCP metric.

        Returns type, semantics, units, and help text for the metric.
        Use this to understand what a metric measures and how to interpret it.

        Examples:
            describe_metric("kernel.all.load") - Learn about load average semantics
            describe_metric("mem.util.available") - Understand available memory
            describe_metric("disk.all.read_bytes") - Check if metric is counter vs instant
            describe_metric("kernel.all.load", host="web1.example.com") - Describe on remote
        """
        from fastmcp.exceptions import ToolError

        from pcp_mcp.errors import handle_pcp_error

        async with get_client_for_host(ctx, host) as client:
            try:
                descriptor = await client.describe(name)
            except Exception as exc:
                raise handle_pcp_error(exc, "describing metric") from exc

            if descriptor:
                # Missing fields fall back to the requested name / "unknown".
                return MetricInfo(
                    name=descriptor.get("name", name),
                    type=descriptor.get("type", "unknown"),
                    semantics=descriptor.get("sem", "unknown"),
                    units=format_units(descriptor),
                    help_text=extract_help_text(descriptor),
                    indom=descriptor.get("indom"),
                )

            # An empty/falsy description is reported as an unknown metric.
            raise ToolError(f"Metric not found: {name}")