Skip to content

Tools

System Tools

pcp_mcp.tools.system

System health tools for grouped metric queries.

get_filesystem_usage async

get_filesystem_usage(
    ctx: Context,
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Get mounted filesystem usage (similar to df command).

Returns capacity, used, available, and percent full for each mounted filesystem. Useful for monitoring disk space and identifying filesystems that may need attention.

Examples:

get_filesystem_usage() - Check all filesystems on default host
get_filesystem_usage(host="db1.example.com") - Check remote host

Source code in src/pcp_mcp/tools/system.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    icons=[ICON_FILESYSTEM],
    tags=TAGS_FILESYSTEM,
    timeout=30.0,
)
async def get_filesystem_usage(
    ctx: Context,
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Get mounted filesystem usage (similar to df command).

    Returns capacity, used, available, and percent full for each mounted
    filesystem. Useful for monitoring disk space and identifying filesystems
    that may need attention.

    Examples:
        get_filesystem_usage() - Check all filesystems on default host
        get_filesystem_usage(host="db1.example.com") - Check remote host
    """
    # Imported at call time, matching the other tools in this module.
    from pcp_mcp.errors import handle_pcp_error

    async with get_client_for_host(ctx, host) as client:
        # Only the fetch itself is guarded; any failure is converted by
        # handle_pcp_error() and chained to the original exception.
        try:
            raw_metrics = await client.fetch(FILESYSTEM_METRICS)
        except Exception as exc:
            raise handle_pcp_error(exc, "fetching filesystem metrics") from exc

        # Turn the raw response into per-filesystem entries, then assess them.
        mounts = _build_filesystem_list(raw_metrics)
        verdict = _assess_filesystems(mounts)

        snapshot = FilesystemSnapshot(
            timestamp=datetime.now(timezone.utc).isoformat(),
            hostname=client.target_host,
            filesystems=mounts,
            assessment=verdict,
        )

        # The same snapshot is returned both as JSON text and as a dict.
        as_json = snapshot.model_dump_json()
        as_dict = snapshot.model_dump()
        return ToolResult(content=as_json, structured_content=as_dict)

get_process_top async

get_process_top(
    ctx: Context,
    sort_by: Annotated[
        Literal["cpu", "memory", "io"],
        Field(description="Resource to sort by"),
    ] = "cpu",
    limit: Annotated[
        int,
        Field(
            default=10,
            ge=1,
            le=50,
            description="Number of processes to return",
        ),
    ] = 10,
    sample_interval: Annotated[
        float,
        Field(
            default=1.0,
            ge=0.5,
            le=5.0,
            description="Seconds to sample for CPU/IO rates",
        ),
    ] = 1.0,
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Get top processes by resource consumption.

For CPU and I/O, takes two samples to calculate rates. Memory is instantaneous. Returns the top N processes sorted by the requested resource.

Examples:

get_process_top() - Top 10 by CPU (default)
get_process_top(sort_by="memory", limit=20) - Top 20 memory consumers
get_process_top(sort_by="io", sample_interval=2.0) - Top I/O with longer sample
get_process_top(host="db1.example.com") - Query remote host

Source code in src/pcp_mcp/tools/system.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    icons=[ICON_PROCESS],
    tags=TAGS_PROCESS,
    timeout=30.0,
)
async def get_process_top(
    ctx: Context,
    sort_by: Annotated[
        Literal["cpu", "memory", "io"],
        Field(description="Resource to sort by"),
    ] = "cpu",
    limit: Annotated[
        int,
        Field(default=10, ge=1, le=50, description="Number of processes to return"),
    ] = 10,
    sample_interval: Annotated[
        float,
        Field(
            default=1.0,
            ge=0.5,
            le=5.0,
            description="Seconds to sample for CPU/IO rates",
        ),
    ] = 1.0,
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Get top processes by resource consumption.

    For CPU and I/O, takes two samples to calculate rates. Memory is instantaneous.
    Returns the top N processes sorted by the requested resource.

    Examples:
        get_process_top() - Top 10 by CPU (default)
        get_process_top(sort_by="memory", limit=20) - Top 20 memory consumers
        get_process_top(sort_by="io", sample_interval=2.0) - Top I/O with longer sample
        get_process_top(host="db1.example.com") - Query remote host
    """
    # Always request the "info" and "memory" groups, plus the group for the
    # requested sort key. dict.fromkeys() drops duplicates (e.g. when
    # sort_by == "memory" the memory group appears twice) while keeping a
    # stable first-seen order, so the request is identical from run to run.
    all_metrics = list(
        dict.fromkeys(
            PROCESS_METRICS["info"]
            + PROCESS_METRICS["memory"]
            + PROCESS_METRICS.get(sort_by, [])
        )
    )
    system_metrics = ["hinv.ncpu", "mem.physmem"]

    # Metrics handed to fetch_with_rates() as the ones to treat as counters.
    counter_metrics = {
        "proc.psinfo.utime",
        "proc.psinfo.stime",
        "proc.io.read_bytes",
        "proc.io.write_bytes",
    }

    from pcp_mcp.errors import handle_pcp_error

    async def report_progress(current: float, total: float, message: str) -> None:
        # Thin adapter so the client can report progress through the context.
        await ctx.report_progress(current, total, message)

    async with get_client_for_host(ctx, host) as client:
        # Both fetches share one guard; any failure is converted by
        # handle_pcp_error() and chained to the original exception.
        try:
            proc_data = await client.fetch_with_rates(
                all_metrics, counter_metrics, sample_interval, progress_callback=report_progress
            )
            sys_data = await client.fetch(system_metrics)
        except Exception as e:
            raise handle_pcp_error(e, "fetching process data") from e

        await ctx.report_progress(92, 100, "Processing results...")

        # Both lookups fall back to 1 when the value is unavailable.
        ncpu = get_scalar_value(sys_data, "hinv.ncpu", 1)
        # NOTE(review): the * 1024 presumably converts KiB to bytes (the result
        # is reported as total_memory_bytes) - confirm the unit of mem.physmem.
        total_mem = get_scalar_value(sys_data, "mem.physmem", 1) * 1024

        # Rank every process by the requested resource, highest first, then
        # keep only the first `limit` entries.
        processes = build_process_list(proc_data, sort_by, total_mem, ncpu)
        processes.sort(key=lambda p: get_sort_key(p, sort_by), reverse=True)
        processes = processes[:limit]

        assessment = assess_processes(processes, sort_by, ncpu)

        await ctx.report_progress(100, 100, "Complete")
        result = ProcessTopResult(
            timestamp=datetime.now(timezone.utc).isoformat(),
            hostname=client.target_host,
            sort_by=sort_by,
            sample_interval=sample_interval,
            processes=processes,
            total_memory_bytes=int(total_mem),
            ncpu=ncpu,
            assessment=assessment,
        )
        return ToolResult(
            content=result.model_dump_json(),
            structured_content=result.model_dump(),
        )

get_system_snapshot async

get_system_snapshot(
    ctx: Context,
    categories: Annotated[
        Optional[list[str]],
        Field(
            default=None,
            description="Categories to include: cpu, memory, disk, network, load. Defaults to all five if not specified.",
        ),
    ] = None,
    sample_interval: Annotated[
        float,
        Field(
            default=1.0,
            ge=0.1,
            le=10.0,
            description="Seconds between samples for rate calculation",
        ),
    ] = 1.0,
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Get a point-in-time system health overview.

Returns CPU, memory, disk I/O, network I/O, and load metrics in a single call. For rate metrics (CPU %, disk I/O, network throughput), takes two samples to calculate per-second rates.

Use this tool FIRST for system troubleshooting. It automatically handles counter-to-rate conversion. Do NOT use query_metrics() for CPU, disk, or network counters - those return raw cumulative values since boot.

Examples:

get_system_snapshot() - Quick health check (all categories)
get_system_snapshot(categories=["cpu", "memory"]) - CPU and memory only
get_system_snapshot(categories=["cpu", "load"]) - CPU and load averages
get_system_snapshot(categories=["disk", "network"]) - I/O analysis
get_system_snapshot(host="web1.example.com") - Query remote host

Source code in src/pcp_mcp/tools/system.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    icons=[ICON_SYSTEM],
    tags=TAGS_SYSTEM,
    timeout=30.0,
)
async def get_system_snapshot(
    ctx: Context,
    categories: Annotated[
        Optional[list[str]],
        Field(
            default=None,
            description=(
                "Categories to include: cpu, memory, disk, network, load. "
                "Defaults to all five if not specified."
            ),
        ),
    ] = None,
    sample_interval: Annotated[
        float,
        Field(
            default=1.0,
            ge=0.1,
            le=10.0,
            description="Seconds between samples for rate calculation",
        ),
    ] = 1.0,
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Get a point-in-time system health overview.

    Returns CPU, memory, disk I/O, network I/O, and load metrics in a single
    call. For rate metrics (CPU %, disk I/O, network throughput), takes two
    samples to calculate per-second rates.

    Use this tool FIRST for system troubleshooting. It automatically handles
    counter-to-rate conversion. Do NOT use query_metrics() for CPU, disk, or
    network counters - those return raw cumulative values since boot.

    Examples:
        get_system_snapshot() - Quick health check (all categories)
        get_system_snapshot(categories=["cpu", "memory"]) - CPU and memory only
        get_system_snapshot(categories=["cpu", "load"]) - CPU and load averages
        get_system_snapshot(categories=["disk", "network"]) - I/O analysis
        get_system_snapshot(host="web1.example.com") - Query remote host
    """
    # Only an omitted argument (None) expands to every category; an explicit
    # list - even an empty one - is passed through unchanged.
    selected = (
        ["cpu", "memory", "disk", "network", "load"] if categories is None else categories
    )
    snapshot = await _fetch_system_snapshot(ctx, selected, sample_interval, host)

    # The same snapshot is returned both as JSON text and as a dict.
    as_json = snapshot.model_dump_json()
    as_dict = snapshot.model_dump()
    return ToolResult(content=as_json, structured_content=as_dict)

quick_health async

quick_health(
    ctx: Context,
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Fast system health check returning only CPU and memory metrics.

Use this for rapid status checks when you don't need disk/network/load details. Uses a shorter sample interval (0.5s) for faster results.

Examples:

quick_health() - Fast health check on default host
quick_health(host="web1.example.com") - Fast check on remote host

Source code in src/pcp_mcp/tools/system.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    icons=[ICON_HEALTH],
    tags=TAGS_HEALTH,
    timeout=30.0,
)
async def quick_health(
    ctx: Context,
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Fast system health check returning only CPU and memory metrics.

    Use this for rapid status checks when you don't need disk/network/load
    details. Uses a shorter sample interval (0.5s) for faster results.

    Examples:
        quick_health() - Fast health check on default host
        quick_health(host="web1.example.com") - Fast check on remote host
    """
    # Fixed, reduced scope: two categories sampled over half a second.
    quick_categories = ["cpu", "memory"]
    quick_interval = 0.5

    snapshot = await _fetch_system_snapshot(ctx, quick_categories, quick_interval, host)

    # The same snapshot is returned both as JSON text and as a dict.
    as_json = snapshot.model_dump_json()
    as_dict = snapshot.model_dump()
    return ToolResult(content=as_json, structured_content=as_dict)

smart_diagnose async

smart_diagnose(
    ctx: Context,
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Use LLM to analyze system metrics and provide diagnosis.

Collects a quick system snapshot (CPU, memory, load) and asks the connected LLM to analyze the metrics and provide actionable insights.

This tool demonstrates FastMCP's LLM sampling capability, where the MCP server can request LLM assistance for complex analysis tasks.

Examples:

smart_diagnose() - Analyze default host
smart_diagnose(host="db1.example.com") - Analyze remote host

Source code in src/pcp_mcp/tools/system.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    icons=[ICON_DIAGNOSE],
    tags=TAGS_DIAGNOSE,
    timeout=30.0,
)
async def smart_diagnose(
    ctx: Context,
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Use LLM to analyze system metrics and provide diagnosis.

    Collects a quick system snapshot (CPU, memory, load) and asks the
    connected LLM to analyze the metrics and provide actionable insights.

    This tool demonstrates FastMCP's LLM sampling capability, where the
    MCP server can request LLM assistance for complex analysis tasks.

    Examples:
        smart_diagnose() - Analyze default host
        smart_diagnose(host="db1.example.com") - Analyze remote host
    """
    from pcp_mcp.errors import handle_pcp_error

    def _to_tool_result(diagnosis) -> ToolResult:
        # Shared by the sampled and fallback paths: JSON text plus dict form.
        return ToolResult(
            content=diagnosis.model_dump_json(),
            structured_content=diagnosis.model_dump(),
        )

    # Step 1: collect the metrics. A failure here is fatal for the tool and is
    # converted by handle_pcp_error(), chained to the original exception.
    try:
        snapshot = await _fetch_system_snapshot(ctx, ["cpu", "memory", "load"], 0.5, host)
    except Exception as exc:
        raise handle_pcp_error(exc, "fetching metrics for diagnosis") from exc

    metrics_summary = _format_snapshot_for_llm(snapshot)

    system_prompt = (
        "You are a system performance analyst. Analyze the metrics and provide:\n"
        "1. A brief diagnosis (2-3 sentences)\n"
        "2. A severity level: 'healthy', 'warning', or 'critical'\n"
        "3. Up to 3 actionable recommendations\n\n"
        "Be concise and focus on actionable insights."
    )

    # Step 2: ask the connected LLM for a structured diagnosis. This is
    # best-effort: any failure (including while serialising the sampled
    # result) falls back to the diagnosis built from the snapshot alone.
    try:
        sampled = await ctx.sample(
            messages=f"Analyze these system metrics:\n\n{metrics_summary}",
            system_prompt=system_prompt,
            max_tokens=500,
            result_type=DiagnosisResult,
        )
        diagnosis = sampled.result
        # Stamp the diagnosis with the snapshot's own time and host.
        diagnosis.timestamp = snapshot.timestamp
        diagnosis.hostname = snapshot.hostname
        return _to_tool_result(diagnosis)
    except Exception:
        return _to_tool_result(_build_fallback_diagnosis(snapshot))

Metrics Tools

pcp_mcp.tools.metrics

Core metric tools for querying PCP metrics.

describe_metric async

describe_metric(
    ctx: Context,
    name: Annotated[
        str,
        Field(
            description="Full PCP metric name (e.g., 'kernel.all.cpu.user')"
        ),
    ],
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Get detailed metadata about a PCP metric.

Returns type, semantics, units, and help text for the metric. Use this to understand what a metric measures and how to interpret it.

Examples:

describe_metric("kernel.all.load") - Learn about load average semantics
describe_metric("mem.util.available") - Understand available memory
describe_metric("disk.all.read_bytes") - Check if metric is counter vs instant
describe_metric("kernel.all.load", host="web1.example.com") - Describe on remote

Source code in src/pcp_mcp/tools/metrics.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    output_schema=MetricInfo.model_json_schema(),
    icons=[ICON_INFO],
    tags=TAGS_METRICS | TAGS_DISCOVERY,
    timeout=30.0,
)
async def describe_metric(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="Full PCP metric name (e.g., 'kernel.all.cpu.user')"),
    ],
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Get detailed metadata about a PCP metric.

    Returns type, semantics, units, and help text for the metric.
    Use this to understand what a metric measures and how to interpret it.

    Examples:
        describe_metric("kernel.all.load") - Learn about load average semantics
        describe_metric("mem.util.available") - Understand available memory
        describe_metric("disk.all.read_bytes") - Check if metric is counter vs instant
        describe_metric("kernel.all.load", host="web1.example.com") - Describe on remote
    """
    from fastmcp.exceptions import ToolError

    from pcp_mcp.errors import handle_pcp_error

    async with get_client_for_host(ctx, host) as client:
        # Only the lookup is guarded; any failure is converted by
        # handle_pcp_error() and chained to the original exception.
        try:
            metadata = await client.describe(name)
        except Exception as exc:
            raise handle_pcp_error(exc, "describing metric") from exc

        # An empty/falsy description is reported as an unknown metric.
        if not metadata:
            raise ToolError(f"Metric not found: {name}")

        # Missing keys fall back to the requested name or to "unknown".
        metric_name = metadata.get("name", name)
        metric_type = metadata.get("type", "unknown")
        metric_semantics = metadata.get("sem", "unknown")
        metric_units = format_units(metadata)
        metric_help = extract_help_text(metadata)
        metric_indom = metadata.get("indom")

        described = MetricInfo(
            name=metric_name,
            type=metric_type,
            semantics=metric_semantics,
            units=metric_units,
            help_text=metric_help,
            indom=metric_indom,
        )

        # The same model is returned both as JSON text and as a dict.
        as_json = described.model_dump_json()
        as_dict = described.model_dump()
        return ToolResult(content=as_json, structured_content=as_dict)

query_metrics async

query_metrics(
    ctx: Context,
    names: Annotated[
        list[str],
        Field(
            description="List of PCP metric names to fetch (e.g., ['kernel.all.load'])"
        ),
    ],
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Fetch current values for specific PCP metrics.

Returns the current value for each requested metric. For metrics with instances (e.g., per-CPU, per-disk), returns one MetricValue per instance.

Examples:

query_metrics(["kernel.all.load"]) - Get load averages
query_metrics(["mem.util.available", "mem.physmem"]) - Get memory stats
query_metrics(["hinv.ncpu"]) - Get CPU count
query_metrics(["kernel.all.load"], host="web1.example.com") - Query remote host

Warning: CPU, disk, and network metrics are counters (cumulative since boot). Use get_system_snapshot() instead for rates.

Source code in src/pcp_mcp/tools/metrics.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    icons=[ICON_METRICS],
    tags=TAGS_METRICS,
    timeout=30.0,
)
async def query_metrics(
    ctx: Context,
    names: Annotated[
        list[str],
        Field(description="List of PCP metric names to fetch (e.g., ['kernel.all.load'])"),
    ],
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Fetch current values for specific PCP metrics.

    Returns the current value for each requested metric. For metrics with
    instances (e.g., per-CPU, per-disk), returns one MetricValue per instance.

    Examples:
        query_metrics(["kernel.all.load"]) - Get load averages
        query_metrics(["mem.util.available", "mem.physmem"]) - Get memory stats
        query_metrics(["hinv.ncpu"]) - Get CPU count
        query_metrics(["kernel.all.load"], host="web1.example.com") - Query remote host

    Warning: CPU, disk, and network metrics are counters (cumulative since boot).
    Use get_system_snapshot() instead for rates.
    """
    from pcp_mcp.errors import handle_pcp_error

    async with get_client_for_host(ctx, host) as client:
        # Only the fetch is guarded; any failure is converted by
        # handle_pcp_error() and chained to the original exception.
        try:
            response = await client.fetch(names)
        except Exception as e:
            raise handle_pcp_error(e, "fetching metrics") from e

        # Flatten the response into one MetricValue per (metric, instance).
        results: list[MetricValue] = []
        for metric in response.get("values", []):
            metric_name = metric.get("name", "")

            for inst in metric.get("instances", []):
                instance_id = inst.get("instance")

                # A missing id or an id of -1 is reported as "no instance";
                # any other id is exposed in its string form.
                instance_name = None
                if instance_id is not None and instance_id != -1:
                    instance_name = str(instance_id)

                results.append(
                    MetricValue(
                        name=metric_name,
                        value=inst.get("value"),
                        instance=instance_name,
                    )
                )

        # Serialise each value once and reuse it for both the JSON text
        # (a bare list) and the structured form (wrapped under "metrics").
        serialized = [v.model_dump() for v in results]
        return ToolResult(
            content=json.dumps(serialized),
            structured_content={"metrics": serialized},
        )

search_metrics async

search_metrics(
    ctx: Context,
    pattern: Annotated[
        str,
        Field(
            description="Metric name prefix to search for (e.g., 'kernel.all', 'mem')"
        ),
    ],
    host: Annotated[
        Optional[str],
        Field(
            description="Target pmcd host to query (default: server's configured target)"
        ),
    ] = None,
) -> ToolResult

Find PCP metrics matching a name pattern.

Use this to discover available metrics before querying them. Returns metric names and brief descriptions.

Examples:

search_metrics("kernel.all") - Find kernel-wide metrics
search_metrics("mem.util") - Find memory utilization metrics
search_metrics("disk.dev") - Find per-disk metrics
search_metrics("network.interface") - Find per-interface metrics
search_metrics("kernel", host="db1.example.com") - Search on remote host

Source code in src/pcp_mcp/tools/metrics.py
@tool(
    annotations=TOOL_ANNOTATIONS,
    icons=[ICON_SEARCH],
    tags=TAGS_METRICS | TAGS_DISCOVERY,
    timeout=30.0,
)
async def search_metrics(
    ctx: Context,
    pattern: Annotated[
        str,
        Field(description="Metric name prefix to search for (e.g., 'kernel.all', 'mem')"),
    ],
    host: Annotated[
        Optional[str],
        Field(description="Target pmcd host to query (default: server's configured target)"),
    ] = None,
) -> ToolResult:
    """Find PCP metrics matching a name pattern.

    Use this to discover available metrics before querying them.
    Returns metric names and brief descriptions.

    Examples:
        search_metrics("kernel.all") - Find kernel-wide metrics
        search_metrics("mem.util") - Find memory utilization metrics
        search_metrics("disk.dev") - Find per-disk metrics
        search_metrics("network.interface") - Find per-interface metrics
        search_metrics("kernel", host="db1.example.com") - Search on remote host
    """
    from pcp_mcp.errors import handle_pcp_error

    async with get_client_for_host(ctx, host) as client:
        # Only the search is guarded; any failure is converted by
        # handle_pcp_error() and chained to the original exception.
        try:
            matches = await client.search(pattern)
        except Exception as exc:
            raise handle_pcp_error(exc, "searching metrics") from exc

        # One result per match; a missing name becomes an empty string.
        found = [
            MetricSearchResult(
                name=entry.get("name", ""),
                help_text=extract_help_text(entry),
            )
            for entry in matches
        ]
        listing = MetricSearchResultList(results=found)

        # The same listing is returned both as JSON text and as a dict.
        as_json = listing.model_dump_json()
        as_dict = listing.model_dump()
        return ToolResult(content=as_json, structured_content=as_dict)