detector.py #1

  • //
  • p4mona/
  • dev/
  • p4-rca-agent/
  • p4rca/
  • detector.py
  • View
  • Commits
  • Open Download .zip Download (2 KB)
"""Anomaly detection layer — pluggable detector classes.

Each detector consumes the rolling SQLite window and emits
CandidateIncident events when thresholds are exceeded.
"""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterator

from .models import CandidateIncident

logger = logging.getLogger(__name__)


class BaseDetector(ABC):
    """Abstract base for all detectors."""

    @abstractmethod
    def scan(self, db_path: Path) -> Iterator[CandidateIncident]:
        """Scan the rolling SQLite window and yield any triggered incidents."""
        ...


class WedgeDetector(BaseDetector):
    """Detects database wedge scenarios.

    Signature: write lock on a specific db.* table with lock wait time
    exceeding threshold, combined with a process pile-up count above
    min_waiting_processes.
    """

    def __init__(
        self,
        lock_wait_threshold_ms: float = 5000.0,
        min_waiting_processes: int = 3,
    ) -> None:
        """
        Args:
            lock_wait_threshold_ms: Write lock wait time that triggers detection.
            min_waiting_processes: Minimum number of waiting processes required.
        """
        self.lock_wait_threshold_ms = lock_wait_threshold_ms
        self.min_waiting_processes = min_waiting_processes

    def scan(self, db_path: Path) -> Iterator[CandidateIncident]:
        raise NotImplementedError


class SlowCommandDetector(BaseDetector):
    """Detects individual commands exceeding compute or lock time thresholds."""

    def __init__(self, compute_threshold_ms: float = 30000.0) -> None:
        self.compute_threshold_ms = compute_threshold_ms

    def scan(self, db_path: Path) -> Iterator[CandidateIncident]:
        raise NotImplementedError


class ConnectionSpikeDetector(BaseDetector):
    """Detects abnormal spikes in connection count (configurable σ threshold)."""

    def __init__(self, sigma_threshold: float = 3.0) -> None:
        self.sigma_threshold = sigma_threshold

    def scan(self, db_path: Path) -> Iterator[CandidateIncident]:
        raise NotImplementedError
# Change User Description Committed
#1 32636 bot_Claude_Anthropic Scaffold p4-rca-agent repo: directory structure, data models, layer stubs, test fixtures, config, docs.
Covers briefing tasks 2 and 3.
#review-32637 @robert_cowham @tom_tyler