#!/usr/bin/env python3
################################################################################
#
# Copyright (c) 2026, Perforce Software, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# DATE
#
# $Change: 32743 $
# $Date: 2026/06/16 $
#
# SYNOPSIS
#
# p4diag [-h] [-q] LOG [--start TIME --end TIME]
# p4diag trim LOG --start TIME --end TIME
# p4diag stats LOG
# p4diag log2sql LOG
# p4diag summary LOG
# p4diag FILE.sql [LOG]
# p4diag list
# p4diag schema
# p4diag plots LOG
#
# DESCRIPTION
#
# This script aggregates common Perforce P4 server log analysis workflows.
# It uses log2sql to build a SQLite database from trace data, runs canned and
# summary SQL queries, writes grep-based log statistics, victim/culprit
# write-wait analysis, and command-activity plots into text and HTML under
# .p4diagnostics/ for viewing in a browser. Interactive mode provides a TTY
# menu and starts a small local web server for output files.
#
# REQUIREMENT
#
# Python 3 (built-in sqlite3 module — test: python3 -c "import sqlite3")
# pip install tabulate
# log2sql on PATH (or set LOG2SQL_BIN)
# sqlite3 CLI on PATH — canned queries on the log2sql DB, ad hoc SELECT, pid probe (3.31+ needed for @parameters)
# gnuplot — optional, for plots and summary PNGs
# grep, sed — log stats and trim
#
################################################################################
# $Date: 2026/06/16 $
import os
import pickle
import re
import sys
import shlex
import io
import csv
import json
import gzip
import signal
import threading
import argparse
import hashlib
try:
import readline
except ImportError:
try:
import gnureadline as readline # type: ignore[no-redef]
except ImportError:
readline = None # type: ignore[misc, assignment]
# Explanatory text for Pythons built without the _sqlite3 extension; the
# fallback stub's connect() raises it as an ImportError.
_PYTHON_SQLITE3_MISSING_MSG = """\
p4diag: this Python was built without sqlite3 support (No module named '_sqlite3').
p4diag needs Python's sqlite3 module for lock shortcuts, summary SQL, victim/culprit
analysis, and other core features (in addition to the sqlite3 CLI on PATH).
Fix options:
1. Use a Python that includes sqlite3:
python3 -c "import sqlite3"
On RHEL/CentOS 7, try: yum install python3
Then run: /usr/bin/python3 /path/to/p4diag ...
2. Rebuild /usr/local Python with sqlite headers installed:
yum install sqlite-devel
then re-run Python's configure/make install.
3. Change the shebang at the top of p4diag to a working python3.
"""
# Prefer the real sqlite3 module. When it cannot be imported, bind the name
# ``sqlite3`` to a stand-in object so lookups of sqlite3.Error, sqlite3.Cursor,
# sqlite3.Row and sqlite3.Connection still succeed. Any sqlite3.connect() call
# then raises ImportError carrying the message above.
try:
    import sqlite3
except ImportError:
    class _Sqlite3Unavailable:
        """Placeholder for the sqlite3 module when Python lacks _sqlite3."""
        # Broad alias: ``except sqlite3.Error`` will catch any Exception,
        # including the ImportError raised by connect() below.
        Error = Exception
        # Empty placeholder classes so attribute references to these names resolve.
        class Cursor:
            pass
        class Row:
            pass
        class Connection:
            pass
        @staticmethod
        def connect(*_args, **_kwargs):
            raise ImportError(_PYTHON_SQLITE3_MISSING_MSG)
        # NOTE(review): no other sqlite3 attributes (e.g. OperationalError,
        # sqlite_version) are provided. Code that touches them would raise
        # AttributeError instead of the message above; confirm against callers.
    sqlite3 = _Sqlite3Unavailable() # type: ignore[misc, assignment]
import subprocess as s
import tempfile
import shutil
import glob
import itertools
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from functools import partial
from tabulate import tabulate
# Run-time flags assigned in main(); the log2sql summary / stats paths read
# them when p4diag runs non-interactively.
QUIET: bool = False
LOG_FILE: str = ""
# Absolute directory containing this script. It is the default parent of the
# sql_queries/ and summary_queries/ directories.
_P4DIAG_INSTALL_DIR = os.path.split(os.path.abspath(__file__))[0]
# Canned reports whose output is always long: page them even when the .sql
# text has no ``-- p4diag: pager`` header line.
_SQL_FILES_AUTO_PAGER = frozenset([
    "locks_held_total.sql",
    "locks_all_duration.sql",
    "locks_table_summary.sql",
    "vicitm_culprit.sql",
    "bigLocksPerTable.sql",
    "table_BigLocksPerTable.sql",
])
# Short purpose line per shipped SQL library report, keyed by .sql basename;
# shown in the menu and in help. Insertion order is preserved.
_SQL_LIBRARY_SUMMARIES: Dict[str, str] = {
    # Lock-contention reports.
    "locks_table_summary.sql": "Which tables are hot across the whole log",
    "locks_table_by_cmd.sql": "One hot table → which command types drove the pain",
    "locks_held_total.sql": "Timeline of big holds (snowball hunting)",
    "locks_all_duration.sql": "All lock wait/hold over a threshold you choose (ms)",
    # Workload summaries.
    "command_summary.sql": "Command volume and top users",
    "cpu_summary.sql": "CPU totals by command and user",
    "memory_summary.sql": "Peak memory by command and user",
    # Victim/culprit and big-lock reports.
    "vicitm_culprit.sql": "Pairing victims with specific culprits",
    "bigLocksPerTable.sql":
        "Big lock holds (>5s) on revdx, resolve, integed, and have — newest first",
    "table_BigLocksPerTable.sql":
        "Big lock holds (>5s) grouped by table — alternate layout for hot tables",
    "vc.sql":
        "Write-wait victims paired with likely lock-holding culprits (short name for vc report)",
}
# Order in which the lock-oriented SQL reports appear in the ``h`` / help box
# table.
_LOCK_QUERY_HELP_ORDER: Tuple[str, ...] = tuple([
    "locks_table_summary.sql",
    "locks_held_total.sql",
    "locks_all_duration.sql",
    "vicitm_culprit.sql",
    "locks_table_by_cmd.sql",
])
# (command, what it shows) rows for the lock-shortcut examples in ``h`` / help.
_LOCK_SHORTCUT_EXAMPLES: Tuple[Tuple[str, str], ...] = tuple([
    ("ww", "Top write waiters, any table"),
    ("ww db.rev 100", "Write waiters on db.rev, limit 100"),
    ("wh db.storage", "Write holders on db.storage"),
    ("rw db.revdx", "Read waiters on db.revdx"),
    ("rh db.revhx START END", "Read holders in time range"),
    ("rh db.revdx START END app,args", "Read holders + extra columns"),
    ("pid", "Probe one command (PID + startTime)"),
])
# Titles for the interactive ww / rw / rh / wh lock shortcuts; kept in step
# with the Lock Contention section of the summary HTML report.
_LOCK_CONTENTION_SHORTCUT_LABELS: Dict[str, str] = dict(
    ww="Top 25 write waiters over 10 seconds - blocked by other read or write locks (victims)",
    rw="Top 25 read waiters over 10 seconds - blocked by other write locks (victims)",
    rh="Top 25 read holders over 10 seconds - blocking writers (culprits)",
    wh="Top 25 write holders over 10 seconds - blocking readers and writers (culprits)",
)
# Built-in SQL library (shipped in the script; disk ``sql_queries/`` overrides by basename).
# Keys are .sql basenames: the same names used in _SQL_FILES_AUTO_PAGER,
# _SQL_LIBRARY_SUMMARIES and _LOCK_QUERY_HELP_ORDER.
# Values are sqlite3 CLI scripts. They mix SQL with dot-commands (.print, .width,
# .mode, .headers), so they presumably must run through the sqlite3 CLI rather
# than Python's sqlite3 module; confirm against the runner.
# Scripts that use @table / @duration need sqlite3 ``.parameter`` support
# (3.31+, per the REQUIREMENT header).
_BUILTIN_SQL_LIBRARY: Dict[str, str] = {
    'locks_table_summary.sql': """\
.print
.print 'DB CONTENTION - Average Locks Summary (total wait and held each > 10 seconds)'
.print 'Does one table have high average or total wait (victims) or held (culprits)?'
.print ''
.print ' table table name'
.print ' number lock events (tableUse rows for this table)'
.print ' avgRL avg read-lock acquisitions (ms)'
.print ' avgWL avg write-lock acquisitions (ms)'
.print ' avgRW avg read wait (ms); victims blocked on read locks'
.print ' avgRH avg read held (ms); culprits holding read locks'
.print ' avgWW avg write wait (ms); victims blocked on write locks'
.print ' avgWH avg write held (ms); culprits holding write locks'
.print ' totalWait total read+write wait (ms)'
.print ' totalHeld total read+write held (ms)'
.print ' hot worst victim (highest totalWait) and culprit (highest totalHeld)'
.print ''
WITH stats AS (
SELECT
tableName AS "table",
COUNT(readLocks) AS number,
CAST(ROUND(AVG(readLocks)) AS INTEGER) AS avgRL,
CAST(ROUND(AVG(writeLocks)) AS INTEGER) AS avgWL,
CAST(ROUND(AVG(totalReadWait)) AS INTEGER) AS avgRW,
CAST(ROUND(AVG(totalReadHeld)) AS INTEGER) AS avgRH,
CAST(ROUND(AVG(totalWriteWait)) AS INTEGER) AS avgWW,
CAST(ROUND(AVG(totalWriteHeld)) AS INTEGER) AS avgWH,
CAST(ROUND(SUM(totalReadWait) + SUM(totalWriteWait)) AS INTEGER) AS "totalWait",
CAST(ROUND(SUM(totalReadHeld) + SUM(totalWriteHeld)) AS INTEGER) AS "totalHeld"
FROM tableUse
GROUP BY tableUse.tableName
),
filtered AS (
SELECT *
FROM stats
WHERE "totalWait" > 10000
AND "totalHeld" > 10000
),
peaks AS (
SELECT
MAX("totalWait") AS maxWait,
MAX("totalHeld") AS maxHeld
FROM filtered
)
SELECT
f."table",
f.number,
f.avgRL,
f.avgWL,
f.avgRW,
f.avgRH,
f.avgWW,
f.avgWH,
f."totalWait",
f."totalHeld",
TRIM(
CASE WHEN f."totalWait" = p.maxWait THEN 'top wait' ELSE '' END ||
CASE
WHEN f."totalWait" = p.maxWait AND f."totalHeld" = p.maxHeld THEN ', top held'
WHEN f."totalHeld" = p.maxHeld THEN 'top held'
ELSE ''
END
) AS hot
FROM filtered f
CROSS JOIN peaks p
ORDER BY f."totalWait" DESC;
""",
    # Filters with ``tu.tableName LIKE @table``. An unbound SQLite parameter is
    # NULL and matches no rows, so the caller presumably binds '%' for "all
    # tables", as the script's own header comment describes.
    'locks_held_total.sql': """\
-- p4diag: pager
-- Lock holds (readHeld+writeHeld) > 5000ms, sorted by startTime.
-- Optional table filter via @table (default % = all tables).
--
-- Example:
-- p4diag locks_held_total.sql LOG
-- p4diag locks_held_total.sql LOG revhx
.print "Find commands/tables where total lock hold time (totalReadHeld + totalWriteHeld) exceeded 5000ms."
.print "Sorted by startTime (then held_ms desc) to help spot the first significant lock that could trigger a snowball."
.print "Optional table filter: pass a table name on the command line, or omit for all tables."
.print ""
SELECT
p.startTime,
p.pid,
p.user,
p.cmd,
tu.tableName,
(tu.totalReadHeld + tu.totalWriteHeld) AS held_ms,
tu.totalReadHeld AS readHeld_ms,
tu.totalWriteHeld AS writeHeld_ms
FROM tableUse tu
JOIN process p
ON p.processkey = tu.processkey
AND p.lineNumber = tu.lineNumber
WHERE (tu.totalReadHeld + tu.totalWriteHeld) > 5000
AND tu.tableName NOT IN ('storagemasterup_R', 'storageup_R')
AND tu.tableName LIKE @table
ORDER BY p.startTime ASC, held_ms DESC;
""",
    # Threshold comes from @duration (ms), which must be bound before .read.
    'locks_all_duration.sql': """\
-- p4diag: pager
-- Lock wait or hold exceeding @duration ms, sorted by startTime.
--
-- Requires sqlite3 parameter support (.parameter init). Set before .read:
-- .parameter set @duration 5000
--
-- Example:
-- p4diag locks_all_duration.sql LOG 5000
.print 'Lock events where readHeld, writeHeld, readWait, or writeWait exceeds the threshold (ms)'
.print 'Sorted by startTime ASC.'
.print ''
SELECT
p.startTime,
p.pid,
p.user,
p.cmd,
tu.tableName,
tu.totalReadWait AS readWait_ms,
tu.totalReadHeld AS readHeld_ms,
tu.totalWriteWait AS writeWait_ms,
tu.totalWriteHeld AS writeHeld_ms
FROM tableUse tu
JOIN process p
ON p.processkey = tu.processkey
AND p.lineNumber = tu.lineNumber
WHERE (
tu.totalReadHeld > @duration
OR tu.totalWriteHeld > @duration
OR tu.totalReadWait > @duration
OR tu.totalWriteWait > @duration
)
AND tu.tableName NOT IN ('storagemasterup_R', 'storageup_R')
ORDER BY p.startTime ASC;
""",
    # Header line ``-- p4diag: require-table``: presumably tells the runner a
    # table argument (bound as @table) is mandatory; confirm in the runner.
    'locks_table_by_cmd.sql': """\
-- p4diag: require-table
-- Lock time on one table: rollup totals and breakdown by P4 command (cmd).
--
-- Use when a table (e.g. revdx) is clearly the hotspot but victim/culprit
-- pairing does not point to a single offender — see which command types
-- contributed the most wait and hold time on that table.
--
-- Requires sqlite3 parameter support (.parameter init). Set before .read:
-- .parameter set @table revdx
--
-- Example:
-- p4diag locks_table_by_cmd.sql LOG revdx
.print ''
.print 'LOCK TIME ON ONE TABLE — totals and by command'
.print ''
.print ' totalLock_ms = readWait + writeWait + readHeld + writeHeld (all ms on this table)'
.print ''
WITH filtered AS (
SELECT
tu.totalReadWait,
tu.totalWriteWait,
tu.totalReadHeld,
tu.totalWriteHeld,
p.cmd
FROM tableUse tu
JOIN process p
ON p.processkey = tu.processkey
AND p.lineNumber = tu.lineNumber
WHERE tu.tableName LIKE @table
)
SELECT
@table AS "table",
COUNT(*) AS events,
CAST(SUM(totalReadWait) AS INTEGER) AS readWait_ms,
CAST(SUM(totalWriteWait) AS INTEGER) AS writeWait_ms,
CAST(SUM(totalReadHeld) AS INTEGER) AS readHeld_ms,
CAST(SUM(totalWriteHeld) AS INTEGER) AS writeHeld_ms,
CAST(SUM(totalReadWait + totalWriteWait) AS INTEGER) AS totalWait_ms,
CAST(SUM(totalReadHeld + totalWriteHeld) AS INTEGER) AS totalHeld_ms,
CAST(
SUM(totalReadWait + totalWriteWait + totalReadHeld + totalWriteHeld)
AS INTEGER
) AS totalLock_ms
FROM filtered;
.print ''
.print 'By command (sorted by totalLock_ms desc)'
.print ''
WITH filtered AS (
SELECT
tu.totalReadWait,
tu.totalWriteWait,
tu.totalReadHeld,
tu.totalWriteHeld,
p.cmd
FROM tableUse tu
JOIN process p
ON p.processkey = tu.processkey
AND p.lineNumber = tu.lineNumber
WHERE tu.tableName LIKE @table
),
totals AS (
SELECT
SUM(totalReadWait + totalWriteWait + totalReadHeld + totalWriteHeld) AS grandLock_ms
FROM filtered
)
SELECT
f.cmd AS command,
COUNT(*) AS events,
CAST(SUM(f.totalReadWait) AS INTEGER) AS readWait_ms,
CAST(SUM(f.totalWriteWait) AS INTEGER) AS writeWait_ms,
CAST(SUM(f.totalReadHeld) AS INTEGER) AS readHeld_ms,
CAST(SUM(f.totalWriteHeld) AS INTEGER) AS writeHeld_ms,
CAST(SUM(f.totalReadWait + f.totalWriteWait) AS INTEGER) AS totalWait_ms,
CAST(SUM(f.totalReadHeld + f.totalWriteHeld) AS INTEGER) AS totalHeld_ms,
CAST(
SUM(f.totalReadWait + f.totalWriteWait + f.totalReadHeld + f.totalWriteHeld)
AS INTEGER
) AS totalLock_ms,
CAST(
ROUND(
100.0
* SUM(f.totalReadWait + f.totalWriteWait + f.totalReadHeld + f.totalWriteHeld)
/ NULLIF(t.grandLock_ms, 0),
1
)
AS TEXT
) || '%' AS pct_of_table
FROM filtered f
CROSS JOIN totals t
GROUP BY f.cmd, t.grandLock_ms
ORDER BY totalLock_ms DESC;
""",
    'command_summary.sql': """\
.print ''
.print 'COMMAND SUMMARY - command usage from the trace log'
.print ''
SELECT
COUNT(*) AS total,
COUNT(DISTINCT cmd) AS commands,
COUNT(DISTINCT user) AS users,
MIN(startTime) AS firstStart,
MAX(endTime) AS lastEnd
FROM process;
.print ''
.print 'Top 20 commands by count'
.print ''
.print ' command P4 command name'
.print ' count number of invocations'
.print ' pct share of all commands'
.print ' users distinct users running this command'
.print ' avgLapse avg completed lapse (s)'
.print ' cpuMin total user+system CPU (minutes)'
.print ''
WITH totals AS (
SELECT COUNT(*) AS n FROM process
),
by_cmd AS (
SELECT
cmd AS command,
COUNT(*) AS count,
COUNT(DISTINCT user) AS users,
CAST(ROUND(AVG(completedLapse)) AS INTEGER) AS avgLapse,
CAST(ROUND(SUM(uCpu + sCpu) / 60000.0) AS INTEGER) AS cpuMin
FROM process
GROUP BY cmd
)
SELECT
command,
count,
CAST(ROUND(100.0 * count / totals.n, 1) AS TEXT) || '%' AS pct,
users,
avgLapse,
cpuMin
FROM by_cmd
CROSS JOIN totals
ORDER BY count DESC
LIMIT 20;
.print ''
.print 'Top 20 command + user pairs by count'
.print ''
.print ' user P4 user'
.print ' command P4 command name'
.print ' count invocations for this user and command'
.print ''
SELECT
user,
cmd AS command,
COUNT(*) AS count
FROM process
GROUP BY user, cmd
ORDER BY count DESC
LIMIT 20;
""",
    # Treats uCpu / sCpu as milliseconds: uCpu > 10000 is labelled "10 seconds"
    # and / 60000.0 yields minutes.
    'cpu_summary.sql': """\
.print ''
.print 'CPU SUMMARY - CPU usage from the trace log'
.print ''
.print 'Note: Windows P4 Server logs do not include "usage" lines, so uCpu/sCpu counters'
.print 'are not populated. On Windows, use the substitute section at the end of this report'
.print '(computed lapse and RPC times) instead of the CPU sections above.'
.print ''
.width 0 0 0 0 0 0
SELECT
COUNT(*) AS total,
SUM(CASE WHEN uCpu > 10000 THEN 1 ELSE 0 END) AS over10sUser,
SUM(CASE WHEN sCpu > 10000 THEN 1 ELSE 0 END) AS over10sSys,
CAST(SUM(uCpu) AS INTEGER) AS totalUserMs,
CAST(SUM(sCpu) AS INTEGER) AS totalSysMs,
CAST(ROUND(SUM(uCpu + sCpu) / 60000.0) AS INTEGER) AS totalCpuMin
FROM process;
.print ''
.print 'Top 20 by total CPU (excluding sync and transmit)'
.print ''
.print ' user P4 user'
.print ' command P4 command name'
.print ' lapse completed lapse (s)'
.print ' uCpuMs user CPU (ms)'
.print ' sCpuMs system CPU (ms)'
.print ' totalMs user + system CPU (ms)'
.print ' startTime command start'
.print ' endTime command end'
.print ''
.width 40 16 8 10 10 10 20 20
SELECT
user,
cmd AS command,
CAST(completedLapse AS INTEGER) AS lapse,
CAST(uCpu AS INTEGER) AS uCpuMs,
CAST(sCpu AS INTEGER) AS sCpuMs,
CAST(uCpu + sCpu AS INTEGER) AS totalMs,
startTime,
endTime
FROM process
WHERE cmd NOT LIKE '%sync%'
AND cmd NOT LIKE '%transmit%'
ORDER BY totalMs DESC
LIMIT 20;
.print ''
.print 'Top 10 users by user CPU'
.print ''
.print ' user P4 user'
.print ' uCpuMs total user CPU (ms)'
.print ' uCpuMin total user CPU (minutes)'
.print ''
.width 40 12 10
SELECT
user,
CAST(SUM(uCpu) AS INTEGER) AS uCpuMs,
CAST(ROUND(SUM(uCpu) / 60000.0) AS INTEGER) AS uCpuMin
FROM process
GROUP BY user
ORDER BY uCpuMs DESC
LIMIT 10;
.print ''
.print 'Top 10 users by system CPU'
.print ''
.print ' user P4 user'
.print ' sCpuMs total system CPU (ms)'
.print ' sCpuMin total system CPU (minutes)'
.print ''
.width 40 12 10
SELECT
user,
CAST(SUM(sCpu) AS INTEGER) AS sCpuMs,
CAST(ROUND(SUM(sCpu) / 60000.0) AS INTEGER) AS sCpuMin
FROM process
GROUP BY user
ORDER BY sCpuMs DESC
LIMIT 10;
.print ''
.print 'Commands using more than 10 seconds of user CPU (by invocation count)'
.print ''
.print ' command P4 command name'
.print ' count invocations over 10s user CPU'
.print ' uCpuMin total user CPU (minutes)'
.print ''
.width 16 8 10
SELECT
cmd AS command,
COUNT(*) AS count,
CAST(ROUND(SUM(uCpu) / 60000.0) AS INTEGER) AS uCpuMin
FROM process
WHERE uCpu > 10000
GROUP BY cmd
ORDER BY count DESC;
.print ''
.print 'Commands using more than 10 seconds of system CPU (by invocation count)'
.print ''
.print ' command P4 command name'
.print ' count invocations over 10s system CPU'
.print ' sCpuMin total system CPU (minutes)'
.print ''
.width 16 8 10
SELECT
cmd AS command,
COUNT(*) AS count,
CAST(ROUND(SUM(sCpu) / 60000.0) AS INTEGER) AS sCpuMin
FROM process
WHERE sCpu > 10000
GROUP BY cmd
ORDER BY count DESC;
.print ''
.print 'Top 15 commands by total user CPU'
.print ''
.print ' command P4 command name'
.print ' uCpuMin total user CPU (minutes)'
.print ''
.width 16 10
SELECT
cmd AS command,
CAST(ROUND(SUM(uCpu) / 60000.0) AS INTEGER) AS uCpuMin
FROM process
GROUP BY cmd
ORDER BY uCpuMin DESC
LIMIT 15;
.print ''
.print 'Windows substitute for CPU'
.print ''
.print ' pid process id'
.print ' user P4 user'
.print ' command P4 command name'
.print ' lapse completed lapse (s)'
.print ' rpcRcv RPC receive time'
.print ' rpcSnd RPC send time'
.print ' computed computed lapse'
.print ' startTime command start'
.print ' endTime command end'
.print ''
.width 8 40 16 8 10 10 10 20 20
SELECT
pid,
user,
cmd AS command,
CAST(ROUND(completedLapse) AS INTEGER) AS lapse,
CAST(ROUND(rpcRcv) AS INTEGER) AS rpcRcv,
CAST(ROUND(rpcSnd) AS INTEGER) AS rpcSnd,
CAST(ROUND(computedLapse) AS INTEGER) AS computed,
startTime,
endTime
FROM process
ORDER BY computedLapse DESC
LIMIT 50;
""",
    'memory_summary.sql': """\
.print ''
.print 'MEMORY SUMMARY - memory usage from the trace log (excluding pull)'
.print ''
.print 'Overview (one row for the whole log):'
.print ' total traced commands'
.print ' withMem commands that reported memory data'
.print ' peakMemMB highest cmd memory (MB) on any single command'
.print ' peakMemPeakMB highest peak memory (MB) on any single command'
.print ' avgMemMB average cmd memory (MB) across all commands'
.print ''
.width 0 0 0 0 0
SELECT
COUNT(*) AS total,
SUM(CASE WHEN memMB > 0 THEN 1 ELSE 0 END) AS withMem,
CAST(MAX(memMB) AS INTEGER) AS peakMemMB,
CAST(MAX(memPeakMB) AS INTEGER) AS peakMemPeakMB,
CAST(ROUND(AVG(memMB)) AS INTEGER) AS avgMemMB
FROM process
WHERE cmd != 'pull';
.print ''
.print 'Top 25 memory consumers (excluding pull)'
.print ''
.print ' pid process id'
.print ' user P4 user'
.print ' command P4 command name (cmd only, not args)'
.print ' lapse completed lapse (s)'
.print ' uCpuMs user CPU (ms)'
.print ' sCpuMs system CPU (ms)'
.print ' memMB cmd memory when command finished (MB)'
.print ' memPeakMB peak memory during command (MB)'
.print ' startTime command start'
.print ' endTime command end'
.print ''
.width 8 40 16 8 10 10 8 10 20 20
SELECT
pid,
user,
cmd AS command,
CAST(ROUND(completedLapse) AS INTEGER) AS lapse,
CAST(uCpu AS INTEGER) AS uCpuMs,
CAST(sCpu AS INTEGER) AS sCpuMs,
CAST(memMB AS INTEGER) AS memMB,
CAST(memPeakMB AS INTEGER) AS memPeakMB,
startTime,
endTime
FROM process
WHERE cmd != 'pull'
ORDER BY memMB DESC
LIMIT 25;
.print ''
.print 'Top 15 commands by peak memory (memPeakMB)'
.print ''
.print ' command P4 command name'
.print ' count invocations'
.print ' peakMemMB highest memMB seen'
.print ' peakMaxMB highest memPeakMB seen'
.print ' avgMemMB average memMB'
.print ''
.width 16 10 10 10 10
SELECT
cmd AS command,
COUNT(*) AS count,
CAST(MAX(memMB) AS INTEGER) AS peakMemMB,
CAST(MAX(memPeakMB) AS INTEGER) AS peakMaxMB,
CAST(ROUND(AVG(memMB)) AS INTEGER) AS avgMemMB
FROM process
WHERE cmd != 'pull'
GROUP BY cmd
ORDER BY peakMaxMB DESC
LIMIT 15;
.print ''
.print 'Top 10 users by peak memory (memPeakMB)'
.print ''
.print ' user P4 user'
.print ' count invocations'
.print ' peakMemMB highest memMB seen'
.print ' peakMaxMB highest memPeakMB seen'
.print ''
.width 40 10 10 10
SELECT
user,
COUNT(*) AS count,
CAST(MAX(memMB) AS INTEGER) AS peakMemMB,
CAST(MAX(memPeakMB) AS INTEGER) AS peakMaxMB
FROM process
WHERE cmd != 'pull'
GROUP BY user
ORDER BY peakMaxMB DESC
LIMIT 10;
""",
    # This key keeps the historical "vicitm" spelling. The exact name is also
    # used in _SQL_FILES_AUTO_PAGER, _SQL_LIBRARY_SUMMARIES and
    # _LOCK_QUERY_HELP_ORDER, so a rename must touch all of them.
    # NOTE(review): the 'cpu' hint compares uCpu + sCpu with
    # completedLapse * 1e6 / 2 (microseconds), whereas cpu_summary.sql treats
    # uCpu as milliseconds. One of the two unit assumptions is likely wrong;
    # verify against log2sql's process table.
    # NOTE(review): in culprit_blocks, the ORDER BY inside the GROUP_CONCAT
    # scalar subquery orders that subquery's single result row, not the
    # concatenated lines. SQLite leaves GROUP_CONCAT element order arbitrary,
    # so culprit lines are not guaranteed to be sorted by start time.
    # NOTE(review): this script joins process USING (processKey) only, while
    # the locks_* scripts also match lineNumber; confirm which is intended.
    'vicitm_culprit.sql': """\
-- Victim / culprit write-wait drill-down (matches p4diag victim-culprit analysis).
--
-- Finds commands with high write-wait on a table (victims), then lock-holders on the
-- same table in a time window (culprits). Direct culprits (read/write held > 95% of
-- the victim's write wait) are preferred; when none match, overlap culprits (held > 2s
-- before the victim start) are shown instead.
--
-- Tunables (edit literals below):
-- global pool 50, per-table 5, write-wait floor 5000 ms, overlap floor 2000 ms,
-- hold lookback 4 hours, end margin 4 minutes.
--
-- Each victim/culprit PID includes a resource hint (lock-wait, lock-hold, cpu, rpc, io, mixed).
--
-- Usage:
-- sqlite3 -header .p4diagnostics/mylog.db ".read /warp/sql_queries/vc.sql"
-- p4diagnostics query vc.sql mylog
.headers off
.mode list
WITH
base AS (
SELECT
tu.tableName,
p.user,
p.startTime,
p.endTime,
p.pid,
p.cmd,
p.args,
CAST(p.completedLapse AS INTEGER) AS lapse,
tu.totalWriteWait
FROM tableUse tu
JOIN process p USING (processKey)
WHERE tu.tableName NOT IN (
'clients', 'clientEntity', 'change', 'storageup_R', 'storagemasterup_R', 'pull'
)
AND tu.totalWriteWait > 5000
AND p.completedLapse > 0
),
global_pool AS (
SELECT
*,
ROW_NUMBER() OVER (ORDER BY totalWriteWait DESC) AS rn_global
FROM base
),
top_global AS (
SELECT *
FROM global_pool
WHERE rn_global <= 50
),
per_table AS (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY tableName
ORDER BY totalWriteWait DESC
) AS rn_table
FROM top_global
),
victims AS (
SELECT
ROW_NUMBER() OVER (ORDER BY totalWriteWait DESC) - 1 AS victim_idx,
tableName,
user,
startTime,
endTime,
pid,
cmd,
args,
lapse,
totalWriteWait
FROM per_table
WHERE rn_table <= 5
),
victim_windows AS (
SELECT
v.*,
ROUND(totalWriteWait * 0.95) AS wait95,
strftime(
'%Y/%m/%d %H:%M:%S',
datetime(replace(v.startTime, '/', '-'), '-4 hours')
) AS hold_search_start,
strftime(
'%Y/%m/%d %H:%M:%S',
datetime(replace(v.endTime, '/', '-'), '+4 minutes')
) AS adj_end
FROM victims v
),
direct_culprits AS (
SELECT
vw.victim_idx,
'direct' AS match_type,
p.pid AS culprit_pid,
p.user AS culprit_user,
p.cmd AS culprit_cmd,
tu.totalReadHeld AS culprit_read_held_ms,
tu.totalWriteHeld AS culprit_write_held_ms,
tu.tableName AS culprit_table,
p.startTime AS culprit_start
FROM victim_windows vw
JOIN tableUse tu
ON tu.tableName = vw.tableName
JOIN process p
ON p.processKey = tu.processKey
WHERE p.startTime >= vw.hold_search_start
AND p.startTime <= vw.adj_end
AND (
tu.totalReadHeld > vw.wait95
OR tu.totalWriteHeld > vw.wait95
)
AND CAST(CAST(p.pid AS REAL) AS INTEGER) != CAST(CAST(vw.pid AS REAL) AS INTEGER)
),
overlap_culprits AS (
SELECT
vw.victim_idx,
'overlap' AS match_type,
p.pid AS culprit_pid,
p.user AS culprit_user,
p.cmd AS culprit_cmd,
tu.totalReadHeld AS culprit_read_held_ms,
tu.totalWriteHeld AS culprit_write_held_ms,
tu.tableName AS culprit_table,
p.startTime AS culprit_start
FROM victim_windows vw
JOIN tableUse tu
ON tu.tableName = vw.tableName
JOIN process p
ON p.processKey = tu.processKey
WHERE NOT EXISTS (
SELECT 1
FROM direct_culprits dc
WHERE dc.victim_idx = vw.victim_idx
)
AND p.startTime >= vw.hold_search_start
AND p.startTime <= vw.startTime
AND (
tu.totalReadHeld > 2000
OR tu.totalWriteHeld > 2000
)
AND CAST(CAST(p.pid AS REAL) AS INTEGER) != CAST(CAST(vw.pid AS REAL) AS INTEGER)
),
all_culprits AS (
SELECT * FROM direct_culprits
UNION ALL
SELECT * FROM overlap_culprits
),
ranked_culprit_probes AS (
SELECT
ac.culprit_pid AS pid,
ac.culprit_start AS startTime,
ROW_NUMBER() OVER (
PARTITION BY ac.victim_idx
ORDER BY ac.culprit_read_held_ms + ac.culprit_write_held_ms DESC
) AS rn
FROM all_culprits ac
),
probe_targets AS (
SELECT pid, startTime FROM victims
UNION
SELECT pid, startTime FROM ranked_culprit_probes WHERE rn <= 5
),
table_agg AS (
SELECT
tu.processKey,
SUM(tu.pagesIn + tu.pagesOut) AS db_io,
SUM(tu.scanRows) AS scans,
SUM(tu.totalReadWait + tu.totalWriteWait) AS lock_wait_ms,
SUM(tu.totalReadHeld + tu.totalWriteHeld) AS lock_held_ms
FROM tableUse tu
JOIN process p ON p.processKey = tu.processKey
JOIN probe_targets pt ON pt.pid = p.pid AND pt.startTime = p.startTime
GROUP BY tu.processKey
),
probe_data AS (
SELECT
p.pid,
p.startTime,
CAST(p.completedLapse AS INTEGER) AS lapse_s,
p.uCpu + COALESCE(p.sCpu, 0) AS cpu_us,
COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) AS disk_io,
COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) AS rpc_s,
t.db_io,
t.scans,
t.lock_wait_ms,
t.lock_held_ms,
CASE
WHEN t.lock_wait_ms > t.lock_held_ms AND t.lock_wait_ms > 5000 THEN 'lock-wait'
WHEN t.lock_held_ms > 5000 THEN 'lock-hold'
WHEN p.uCpu + COALESCE(p.sCpu, 0) > CAST(p.completedLapse * 1e6 AS INTEGER) / 2 THEN 'cpu'
WHEN COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) > p.completedLapse / 4 THEN 'rpc'
WHEN t.db_io > 5000
OR COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) > 100000 THEN 'io'
ELSE 'mixed'
END AS hint
FROM probe_targets pt
JOIN process p ON p.pid = pt.pid AND p.startTime = pt.startTime
LEFT JOIN table_agg t ON t.processKey = p.processKey
),
culprit_lines AS (
SELECT
victim_idx,
match_type,
culprit_start,
printf(
' %-8s %-22s %-15s %12s %12s %-8s %s',
CAST(ac.culprit_pid AS TEXT),
substr(COALESCE(ac.culprit_user, ''), 1, 22),
substr(COALESCE(ac.culprit_cmd, ''), 1, 15),
printf('%,.0f', ac.culprit_read_held_ms),
printf('%,.0f', ac.culprit_write_held_ms),
COALESCE(pd.hint, '?'),
ac.culprit_start
) AS line
FROM all_culprits ac
LEFT JOIN probe_data pd
ON pd.pid = ac.culprit_pid AND pd.startTime = ac.culprit_start
),
culprit_blocks AS (
SELECT
cl.victim_idx,
MAX(cl.match_type) AS match_type,
' Culprits (' || MAX(cl.match_type) || ') — lock holders on this table (victim PID excluded):'
|| char(10)
|| ' pid user cmd read-held write-held hint start'
|| char(10)
|| (
SELECT GROUP_CONCAT(cl2.line, char(10))
FROM culprit_lines cl2
WHERE cl2.victim_idx = cl.victim_idx
ORDER BY cl2.culprit_start
) AS block
FROM culprit_lines cl
GROUP BY cl.victim_idx
),
victim_reports AS (
SELECT
vw.victim_idx,
vw.totalWriteWait,
'------------------------------------------------------------------------'
|| char(10)
|| 'Victim: '
|| COALESCE(vw.cmd, '(unknown)')
|| ' ('
|| printf('%,.0f', vw.totalWriteWait)
|| 'ms write wait on '
|| vw.tableName
|| ')'
|| char(10)
|| char(10)
|| ' Victim process (blocked — waiting to write):'
|| char(10)
|| ' PID: ' || COALESCE(CAST(vw.pid AS TEXT), '')
|| char(10)
|| ' Start: ' || COALESCE(vw.startTime, '')
|| char(10)
|| ' End: ' || COALESCE(vw.endTime, '')
|| char(10)
|| ' Cmd: ' || COALESCE(vw.cmd, '(unknown)')
|| char(10)
|| ' Args: ' || COALESCE(NULLIF(trim(vw.args), ''), '(none)')
|| char(10)
|| char(10)
|| ' Victim profile: '
|| COALESCE(vp.hint, '(no process row)')
|| ' lapse=' || COALESCE(CAST(vp.lapse_s AS TEXT), '?') || 's'
|| ' lock_wait=' || printf('%,.0f', COALESCE(vp.lock_wait_ms, 0)) || 'ms'
|| ' lock_held=' || printf('%,.0f', COALESCE(vp.lock_held_ms, 0)) || 'ms'
|| ' db_io=' || printf('%,.0f', COALESCE(vp.db_io, 0))
|| ' scans=' || printf('%,.0f', COALESCE(vp.scans, 0))
|| char(10)
|| char(10)
|| COALESCE(
cb.block,
' Culprits: (none) — no other lock-holder rows matched the time window and thresholds.'
)
|| char(10) AS report
FROM victim_windows vw
LEFT JOIN culprit_blocks cb
ON cb.victim_idx = vw.victim_idx
LEFT JOIN probe_data vp
ON vp.pid = vw.pid AND vp.startTime = vw.startTime
),
report_parts AS (
SELECT
0 AS sort_key,
-1 AS victim_idx,
0.0 AS totalWriteWait,
'VICTIM / CULPRIT REPORT (write-wait drill-down)'
|| char(10)
|| 'Victims: up to 5 rows per table among the top 50 global write-wait rows (>5s).'
|| char(10)
|| 'Each victim is a command with high write-wait; culprits hold read/write locks on that table.'
|| char(10)
|| 'Search window: 4h before victim start through shortly after victim end (direct),'
|| char(10)
|| 'or through victim start only (overlap fallback).'
|| char(10) AS report
UNION ALL
SELECT
1 AS sort_key,
-1 AS victim_idx,
0.0 AS totalWriteWait,
'No significant write waits (> 5s) found.'
WHERE (SELECT COUNT(*) FROM victims) = 0
UNION ALL
SELECT
1 AS sort_key,
victim_idx,
totalWriteWait,
report
FROM victim_reports
WHERE (SELECT COUNT(*) FROM victims) > 0
)
SELECT report
FROM report_parts
ORDER BY sort_key, totalWriteWait DESC, victim_idx;
""",
}
# Built-in PID probe SQL (``pid`` command only; not listed in canned-query menu).
# Each script selects one process row by @pid and @start (startTime text in
# 'YYYY/MM/DD HH:MM:SS' form), bound with sqlite3 ``.parameter set`` before
# ``.read``.
# NOTE(review): these scripts name uCpu + sCpu ``cpu_us`` (microseconds), but
# cpu_summary.sql in _BUILTIN_SQL_LIBRARY treats uCpu / sCpu as milliseconds.
# Confirm the log2sql unit before trusting pct_wall_cpu or the CPU-BOUND /
# 'cpu' hints.
_BUILTIN_PID_PROBE_SQL: Dict[str, str] = {
    # The trailing backslashes in the "Example:" comment lines are Python line
    # continuations inside this non-raw string. Those example lines therefore
    # reach sqlite3 joined into a single ``--`` comment line.
    'probe_pid.sql': """\
-- Probe one command instance by PID + startTime (log2sql process/tableUse).
--
-- Requires sqlite3 parameter support (.parameter init). Set before .read:
-- .parameter set @pid <pid>
-- .parameter set @start 'YYYY/MM/DD HH:MM:SS'
--
-- Example:
-- sqlite3 -header -column DB.db \
-- -cmd ".parameter init" \
-- -cmd ".parameter set @pid 97736" \
-- -cmd ".parameter set @start '2025/07/10 12:00:01'" \
-- ".read /warp/sql_queries/probe_pid.sql"
--
-- Or: probe_pid DB.db 97736 '2025/07/10 12:00:01' (see probe_pid.sh)
.mode line
.headers on
.print ''
.print '=== PID probe (summary) ==='
.print ''
WITH target AS (
SELECT * FROM process
WHERE pid = @pid AND startTime = @start
),
lock_agg AS (
SELECT
COUNT(*) AS tables_touched,
SUM(pagesIn + pagesOut) AS db_pages_io,
SUM(scanRows) AS scan_rows,
SUM(putRows + delRows) AS write_rows,
SUM(totalReadWait + totalWriteWait) AS lock_wait_ms,
SUM(totalReadHeld + totalWriteHeld) AS lock_held_ms,
SUM(totalPeekWait) AS peek_wait_ms,
SUM(readLocks + writeLocks) AS lock_ops_ms,
MAX(totalWriteWait) AS max_write_wait_ms,
MAX(totalReadWait) AS max_read_wait_ms,
MAX(totalWriteHeld) AS max_write_held_ms,
MAX(totalReadHeld) AS max_read_held_ms
FROM tableUse JOIN target USING (processKey)
),
m AS (
SELECT
t.pid, t.startTime, t.endTime, t.user, t.workspace, t.ip, t.cmd, t.args,
CAST(t.completedLapse AS INTEGER) AS lapse_s,
CAST(t.computedLapse AS INTEGER) AS computed_s,
COALESCE(t.paused, 0) AS paused_s,
t.uCpu, t.sCpu,
(COALESCE(t.uCpu, 0) + COALESCE(t.sCpu, 0)) AS cpu_us,
CAST(t.completedLapse * 1000 AS INTEGER) AS lapse_ms,
COALESCE(t.diskIn, 0) + COALESCE(t.diskOut, 0) AS proc_disk_io,
COALESCE(t.ipcIn, 0) + COALESCE(t.ipcOut, 0) AS proc_net_io,
COALESCE(t.rpcSnd, 0) + COALESCE(t.rpcRcv, 0) AS rpc_wait_s,
t.rpcMsgsIn, t.rpcMsgsOut,
ROUND(COALESCE(t.rpcSizeIn, 0) / 1048576.0, 1) AS rpc_in_mb,
ROUND(COALESCE(t.rpcSizeOut, 0) / 1048576.0, 1) AS rpc_out_mb,
t.fileTotalsSnd, t.fileTotalsRcv,
t.fileTotalsSndMB, t.fileTotalsRcvMB,
t.netSyncFilesAdded + t.netSyncFilesUpdated + t.netSyncFilesDeleted AS net_sync_files,
t.memMB, t.memPeakMB, t.running, t.error,
l.*
FROM target t, lock_agg l
)
SELECT
pid, startTime, endTime, user, ip, cmd,
substr(args, 1, 80) AS args_preview,
lapse_s, computed_s, paused_s,
ROUND(cpu_us / 1000.0, 0) AS cpu_ms,
ROUND(100.0 * cpu_us / NULLIF(lapse_ms * 1000, 0), 1) AS pct_wall_cpu,
proc_disk_io, proc_net_io,
rpc_wait_s,
ROUND(100.0 * rpc_wait_s / NULLIF(lapse_s, 0), 1) AS pct_wall_rpc,
rpc_in_mb, rpc_out_mb,
tables_touched, db_pages_io, scan_rows, write_rows,
lock_wait_ms, lock_held_ms, peek_wait_ms, lock_ops_ms,
max_write_wait_ms, max_read_wait_ms,
max_write_held_ms, max_read_held_ms,
CASE
WHEN error IS NOT NULL AND error != '' AND error != '0' THEN 'ERROR: ' || error
WHEN lock_wait_ms >= 5000
AND lock_wait_ms >= COALESCE(lock_held_ms, 0)
AND lock_wait_ms > cpu_us / 1000
THEN 'LOCK-BOUND (victim — waited on DB locks)'
WHEN lock_held_ms >= 5000
AND lock_held_ms > lock_wait_ms
AND lock_held_ms > cpu_us / 1000
THEN 'LOCK-BOUND (culprit — held DB locks)'
WHEN peek_wait_ms >= 1000
THEN 'LOCK-BOUND (peek contention)'
WHEN rpc_wait_s >= 1.0 AND rpc_wait_s >= lapse_s * 0.25
THEN 'CLIENT/NETWORK (RPC snd/rcv dominates wall time)'
WHEN cpu_us >= lapse_ms * 500 OR computed_s >= lapse_s * 0.5
THEN 'CPU-BOUND'
WHEN db_pages_io >= 5000 OR proc_disk_io >= 100000
THEN 'IO-BOUND (DB pages or process disk I/O)'
WHEN cmd LIKE '%sync%' OR cmd LIKE '%transmit%' OR net_sync_files > 0
THEN 'NETWORK/SYNC workload'
ELSE 'MIXED / see per-table breakdown below'
END AS bottleneck_hint
FROM m;
""",
    # One row per tableUse entry of the probed command, ordered by total lock
    # time and then page I/O.
    'probe_pid_tables.sql': """\
-- Per-table breakdown for one PID + startTime. Same @pid / @start as probe_pid.sql.
.mode column
.headers on
.print ''
.print '=== PID probe (per-table) ==='
.print ''
SELECT
tu.tableName,
tu.pagesIn + tu.pagesOut AS pages_io,
tu.scanRows,
tu.getRows + tu.posRows AS keyed_rows,
tu.putRows + tu.delRows AS write_rows,
tu.readLocks + tu.writeLocks AS lock_ops_ms,
tu.totalReadWait AS read_wait_ms,
tu.totalWriteWait AS write_wait_ms,
tu.totalReadHeld AS read_held_ms,
tu.totalWriteHeld AS write_held_ms,
tu.peekCount,
tu.totalPeekWait AS peek_wait_ms,
tu.triggerLapse
FROM tableUse tu
JOIN process p USING (processKey)
WHERE p.pid = @pid AND p.startTime = @start
ORDER BY
(tu.totalReadWait + tu.totalWriteWait + tu.totalReadHeld + tu.totalWriteHeld) DESC,
(tu.pagesIn + tu.pagesOut) DESC;
""",
    # Compact single-row variant; uses the same hint rules as the probe_data
    # CTE in vicitm_culprit.sql.
    'probe_pid_min.sql': """\
-- log2sql: probe one command by PID + startTime
-- Set parameters, then .read this file (sqlite3 3.31+):
-- .parameter init
-- .parameter set @pid 97736
-- .parameter set @start '2025/07/10 12:00:01'
WITH p AS (
SELECT * FROM process WHERE pid = @pid AND startTime = @start
),
t AS (
SELECT
SUM(pagesIn + pagesOut) AS db_io,
SUM(scanRows) AS scans,
SUM(totalReadWait + totalWriteWait) AS lock_wait_ms,
SUM(totalReadHeld + totalWriteHeld) AS lock_held_ms
FROM tableUse JOIN p USING (processKey)
)
SELECT
p.pid, p.startTime, p.endTime, p.user, p.cmd,
CAST(p.completedLapse AS INT) AS lapse_s,
p.uCpu + COALESCE(p.sCpu, 0) AS cpu_us,
COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) AS disk_io,
COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) AS rpc_s,
t.db_io, t.scans, t.lock_wait_ms, t.lock_held_ms,
CASE
WHEN t.lock_wait_ms > t.lock_held_ms AND t.lock_wait_ms > 5000 THEN 'lock-wait'
WHEN t.lock_held_ms > 5000 THEN 'lock-hold'
WHEN p.uCpu + COALESCE(p.sCpu, 0) > CAST(p.completedLapse * 1e6 AS INT) / 2 THEN 'cpu'
WHEN COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) > p.completedLapse / 4 THEN 'rpc'
WHEN t.db_io > 5000 OR COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) > 100000 THEN 'io'
ELSE 'mixed'
END AS hint
FROM p, t;
""",
}
def p4diag_sql_queries_dir() -> str:
    """Return the directory holding ``.sql`` query files for CLI ``query`` and SQL Lab.

    Environment overrides are consulted in priority order: ``P4DIAG_SQL_QUERIES``
    first, then the legacy ``P4SLA_SQL_DIR``. The first non-blank value wins and
    is returned as an absolute path. Without an override the default is
    ``<p4diag>/sql_queries``.
    """
    candidates = (
        (os.environ.get(name) or "").strip()
        for name in ("P4DIAG_SQL_QUERIES", "P4SLA_SQL_DIR")
    )
    chosen = next((value for value in candidates if value), None)
    if chosen is not None:
        return os.path.abspath(chosen)
    return os.path.join(_P4DIAG_INSTALL_DIR, "sql_queries")
def p4diag_summary_queries_dir() -> str:
    """Return the directory of summary-report ``.sql`` files (one subdirectory per section).

    A non-blank ``P4DIAG_SUMMARY_QUERIES`` environment variable overrides the
    location (made absolute); otherwise ``<p4diag>/summary_queries`` is used.
    """
    env_value = os.environ.get("P4DIAG_SUMMARY_QUERIES")
    override = env_value.strip() if env_value else ""
    return (
        os.path.abspath(override)
        if override
        else os.path.join(_P4DIAG_INSTALL_DIR, "summary_queries")
    )
def _summary_section_display_name(dirname: str) -> str:
    """Return ``dirname`` without an optional leading ``NN-`` ordering prefix.

    ``"01-CPU Usage"`` becomes ``"CPU Usage"``. Names without the prefix, or
    consisting of the prefix alone (e.g. ``"01-"``), are returned unchanged.
    """
    match = re.match(r"^\d+-(.+)$", dirname)
    if match is None:
        return dirname
    return match.group(1)


def _summary_query_display_name(filename_stem: str) -> str:
    """Return the report label for a summary ``.sql`` basename (``NN-`` prefix removed)."""
    return _summary_section_display_name(filename_stem)


# One summary section: ordered ``(label, sql)`` pairs.
SummarySectionQueries = List[Tuple[str, str]]
def strip_sql_documentation_header(raw: str) -> str:
    """Remove a leading documentation header from the text of a ``.sql`` file.

    Two header shapes are recognized:

    * a ``/* ... */`` block comment at the start (after any UTF-8 BOM and
      whitespace). Only that one comment is removed, together with the
      whitespace that follows it;
    * otherwise, every leading line that is blank or starts with ``--``.

    The remaining lines are re-joined with newlines (no trailing newline).
    Falsy input is returned unchanged.
    """
    if not raw:
        return raw
    text = raw.lstrip("\ufeff")
    block = re.match(r"^\s*/\*([\s\S]*?)\*/", text)
    if block is not None:
        return text[block.end():].lstrip("\n\r\t ")
    lines = text.splitlines()

    def _is_header_line(line: str) -> bool:
        stripped = line.strip()
        return stripped == "" or stripped.startswith("--")

    # Index of the first line that is neither blank nor a ``--`` comment.
    first_body = next(
        (idx for idx, line in enumerate(lines) if not _is_header_line(line)),
        len(lines),
    )
    return "\n".join(lines[first_body:])
def sql_file_pager_from_header(raw: str) -> Optional[List[str]]:
    """Return the pager argv requested by a ``-- p4diag: pager`` header directive.

    Only the leading header is scanned: blank lines, ``--`` comment lines and
    ``/* ... */`` block comments. Scanning stops at the first line of SQL.

    * ``-- p4diag: pager`` selects the default pager ``less -S``. ``-S`` chops
      long lines instead of wrapping them, which keeps column-mode tables
      readable.
    * ``-- p4diag: pager = CMD ARGS`` selects a custom command, split with
      shell-like quoting. If the quoting is malformed (e.g. an unclosed quote),
      the command falls back to a plain whitespace split instead of raising.

    The directive keyword is matched case-insensitively. Returns ``None`` when
    no directive is present.
    """
    if not raw:
        return None
    in_block_comment = False
    for line in raw.lstrip("\ufeff").splitlines():
        sline = line.strip()
        if in_block_comment:
            if "*/" in sline:
                in_block_comment = False
            continue
        if sline.startswith("/*"):
            # A block comment that opens and closes on one line is skipped whole.
            in_block_comment = "*/" not in sline
            continue
        if not sline:
            continue
        if not sline.startswith("--"):
            break  # first SQL line: the header is over
        m = re.match(r"--\s*p4diag:\s*pager(?:\s*=\s*(.+))?\s*$", sline, re.IGNORECASE)
        if not m:
            continue
        extra = (m.group(1) or "").strip()
        if not extra:
            return ["less", "-S"]
        try:
            return shlex.split(extra)
        except ValueError:
            return extra.split()
    return None
def normalize_sql_basename(sql_token: str) -> str:
    """Reduce a library token or file path to its ``NAME.sql`` basename.

    Directory components are dropped. ``.sql`` is appended unless the name
    already ends with it (compared case-insensitively).
    """
    name = os.path.basename(sql_token.strip())
    return name if name.lower().endswith(".sql") else f"{name}.sql"
def resolve_sql_query_file(sql_token: str) -> Optional[str]:
    """Resolve a query token to an existing ``.sql`` file path, or ``None``.

    Lookup order:

    1. An absolute path is accepted only if it names an existing file.
    2. ``<library dir>/NAME.sql``, where ``NAME.sql`` comes from
       ``normalize_sql_basename``.
    3. The token itself as a path relative to the current directory.
    4. The token with ``.sql`` appended, relative to the current directory.
       The same rule as step 2 is used here, so ``my.query`` resolves to
       ``my.query.sql``, never to an unrelated ``my.sql``.

    A blank token never resolves.
    """
    raw = sql_token.strip()
    if not raw:
        return None
    if os.path.isabs(raw):
        path = os.path.abspath(raw)
        return path if os.path.isfile(path) else None
    lib = os.path.join(p4diag_sql_queries_dir(), normalize_sql_basename(raw))
    if os.path.isfile(lib):
        return lib
    if os.path.isfile(raw):
        return os.path.abspath(raw)
    if not raw.lower().endswith(".sql"):
        with_ext = raw + ".sql"
        if os.path.isfile(with_ext):
            return os.path.abspath(with_ext)
    return None
def resolve_sql_query(sql_token: str) -> Tuple[Optional[str], Optional[str]]:
    """Locate a library query as ``(disk_path, builtin_body)``.

    A file on disk (via ``resolve_sql_query_file``) takes precedence. Otherwise
    the normalized basename is looked up in the built-in library, then among
    the built-in PID-probe scripts. At most one element of the result is set;
    ``(None, None)`` means the query was not found.
    """
    disk_path = resolve_sql_query_file(sql_token)
    if disk_path:
        return disk_path, None
    key = normalize_sql_basename(sql_token)
    for table in (_BUILTIN_SQL_LIBRARY, _BUILTIN_PID_PROBE_SQL):
        builtin = table.get(key)
        if builtin is not None:
            return None, builtin
    return None, None
def list_sql_library_names() -> List[str]:
    """Return sorted ``*.sql`` basenames to offer in the query menu.

    Built-in library names are merged with the files in the on-disk library
    directory. Names of the built-in PID-probe scripts are then removed, even
    if a file with that name exists on disk.
    """
    visible = set(_BUILTIN_SQL_LIBRARY) | set(
        list_sql_library_files(p4diag_sql_queries_dir())
    )
    hidden = set(_BUILTIN_PID_PROBE_SQL)
    return sorted(visible - hidden)
def sql_library_summary(basename: str) -> str:
    """Return a one-line description of what a library ``.sql`` report is for.

    Reports without a registered summary get a generic hint to read the file.
    """
    fallback = "custom report — open the .sql file for details"
    return _SQL_LIBRARY_SUMMARIES.get(normalize_sql_basename(basename), fallback)
def _effective_sql_pager(pager: Optional[List[str]]) -> Optional[List[str]]:
    """Return ``pager`` if it can actually be used, else ``None``.

    A pager is used only when one was requested, stdout is an interactive
    terminal, and the pager executable is found on ``PATH``.
    """
    usable = (
        bool(pager)
        and sys.stdout.isatty()
        and shutil.which(pager[0]) is not None
    )
    return pager if usable else None
def _sqlite3_rc_ok_after_pager(pager_rc: int, sql_rc: Optional[int]) -> int:
    """Combine the pager and sqlite3 exit codes into one status.

    A non-zero pager status wins. Otherwise a sqlite3 status that only reports
    SIGPIPE counts as success. Quitting the pager (e.g. with ``q``) closes the
    pipe, so sqlite3 dies of SIGPIPE even though the query worked. Both
    encodings are accepted: ``-SIGPIPE`` (how ``subprocess`` reports a signal
    death) and ``128 + SIGPIPE`` (shell convention).
    """
    if pager_rc != 0:
        return pager_rc
    sigpipe = getattr(signal, "SIGPIPE", 13)
    benign = {0, None, -sigpipe, 128 + sigpipe}
    return 0 if sql_rc in benign else sql_rc
def run_sqlite3_with_optional_pager(argv: List[str], pager: Optional[List[str]]) -> int:
    """Run sqlite3 with ``argv`` and return a normalized exit status.

    When ``_effective_sql_pager`` accepts ``pager``, sqlite3's stdout is piped
    into the pager. Otherwise sqlite3 writes directly to the inherited stdout.
    """
    pager_argv = _effective_sql_pager(pager)
    if not pager_argv:
        return s.call(argv)
    sqlite_proc = s.Popen(argv, stdout=s.PIPE)
    pager_proc = s.Popen(pager_argv, stdin=sqlite_proc.stdout)
    # Drop our copy of the pipe's read end so sqlite3 gets SIGPIPE once the pager exits.
    sqlite_proc.stdout.close()
    pager_status = pager_proc.wait()
    return _sqlite3_rc_ok_after_pager(pager_status, sqlite_proc.wait())
def sql_file_requires_pid_start(file_body: str) -> bool:
    """Report whether a script references both the ``@pid`` and ``@start`` bind parameters."""
    return all(token in file_body for token in ("@pid", "@start"))
def sql_file_uses_table_parameter(file_body: str) -> bool:
    """Report whether a script references the ``@table`` bind parameter.

    Binding it is optional unless the header also carries ``-- p4diag: require-table``.
    """
    return file_body.find("@table") != -1
def sql_file_requires_table(file_body: str) -> bool:
    """Report whether ``@table`` is mandatory for a script.

    True when some line consists solely of the ``-- p4diag: require-table``
    directive (case-insensitive, surrounding whitespace allowed).
    """
    directive = re.compile(
        r"^\s*--\s*p4diag:\s*require-table\s*$", re.MULTILINE | re.IGNORECASE
    )
    return directive.search(file_body) is not None
def sql_file_requires_duration(file_body: str) -> bool:
    """Report whether a script references the ``@duration`` bind parameter."""
    return file_body.count("@duration") > 0
def table_parameter_bind_value(table: Optional[str]) -> str:
    """Return the value to bind to ``@table``.

    A missing or empty ``table`` yields ``%`` so that all tables match.
    Otherwise the name is cleaned by ``normalize_table_parameter``.
    """
    return normalize_table_parameter(table) if table else "%"
def normalize_table_parameter(table: str) -> str:
    """Normalize a user-supplied table name for ``@table``.

    Surrounding whitespace and a leading ``db.`` are removed. If dots remain,
    only the part after the last dot is kept. ``None`` is treated as ``""``.
    """
    name = (table or "").strip()
    if name.startswith("db."):
        name = name[3:]
    return name.rsplit(".", 1)[-1]
def validate_table_parameter(table: str) -> Optional[str]:
    """Validate a value for the ``@table`` binding.

    Returns ``None`` when ``table`` is non-empty and contains only ASCII
    letters, digits and underscores. Otherwise returns a human-readable error.
    """
    if not table:
        return "table name is required"
    is_safe = re.fullmatch(r"[A-Za-z0-9_]+", table) is not None
    if is_safe:
        return None
    return f"invalid table name {table!r} (use letters, digits, underscore only)"
def validate_duration_parameter(duration: str) -> Optional[str]:
    """Validate a value for the ``@duration`` binding (milliseconds).

    Returns ``None`` for a string of ASCII digits whose value is above zero.
    Otherwise returns a human-readable error.
    """
    if not duration:
        return "duration (milliseconds) is required"
    if re.fullmatch(r"[0-9]+", duration) is None:
        return f"invalid duration {duration!r} (use a positive integer, milliseconds)"
    return "duration must be greater than 0" if int(duration) <= 0 else None
def sqlite3_pid_start_parameter_argv(pid: str, start: str) -> List[str]:
    """Build sqlite3 ``-cmd`` arguments that bind ``@pid`` and ``@start`` (sqlite 3.31+).

    Each dot-command gets its own ``-cmd`` option. ``start`` is wrapped in
    single quotes because start times contain a space.
    NOTE(review): ``start`` is not escaped, so a value containing ``'``
    would break the command.
    """
    commands = (
        ".parameter init",
        f".parameter set @pid {pid}",
        f".parameter set @start '{start}'",
    )
    argv: List[str] = []
    for command in commands:
        argv.extend(("-cmd", command))
    return argv
def sqlite3_table_parameter_argv(table: str) -> List[str]:
    """Build sqlite3 ``-cmd`` arguments that bind ``@table`` (sqlite 3.31+)."""
    commands = (".parameter init", f".parameter set @table {table}")
    argv: List[str] = []
    for command in commands:
        argv.extend(("-cmd", command))
    return argv
def sqlite3_duration_parameter_argv(duration: str) -> List[str]:
    """Build sqlite3 ``-cmd`` arguments that bind ``@duration`` (sqlite 3.31+)."""
    commands = (".parameter init", f".parameter set @duration {duration}")
    argv: List[str] = []
    for command in commands:
        argv.extend(("-cmd", command))
    return argv
def sqlite3_cli_argv_for_read_script(file_body: str) -> List[str]:
    """Return the base sqlite3 argv for running a ``.read`` script.

    Scripts that choose their own output format with a ``.mode`` line get a
    bare ``sqlite3``. All others get ``-header -column`` table output.
    """
    sets_own_mode = re.search(
        r"^\s*\.mode\b", file_body, re.MULTILINE | re.IGNORECASE
    ) is not None
    argv = ["sqlite3"]
    if not sets_own_mode:
        argv += ["-header", "-column"]
    return argv
def prepare_sql_read_script(raw: str) -> str:
    """Insert a ``.width`` reset before each top-level query of a ``.read`` script.

    In ``-column`` mode, sqlite3 reuses column widths from the previous result
    set. A short overview row (e.g. ``total``, ``withMem``) would otherwise
    truncate ``user``/``command`` in later SELECTs. Resetting 32 widths to 0
    restores auto-sizing for the next query.

    Only ``WITH``/``SELECT`` lines starting at column 0 are considered.
    ``WITH`` always gets a reset. ``SELECT`` gets one only at a statement
    boundary: no earlier code, or the previous code line ends with ``;`` or is
    a dot-command. This avoids splitting a ``WITH ... SELECT`` statement, which
    would cause a syntax error. A trailing newline in ``raw`` is preserved.
    """
    width_reset = ".width" + " 0" * 32
    result: List[str] = []
    last_code = ""  # last non-blank, non-comment line seen, stripped
    for line in raw.splitlines():
        if re.match(r"^WITH\b", line, re.IGNORECASE):
            needs_reset = True
        elif re.match(r"^SELECT\b", line, re.IGNORECASE):
            needs_reset = (
                last_code == ""
                or last_code.endswith(";")
                or last_code.startswith(".")
            )
        else:
            needs_reset = False
        if needs_reset:
            result.append(width_reset)
        result.append(line)
        candidate = line.strip()
        if candidate and not candidate.startswith("--"):
            last_code = candidate
    joined = "\n".join(result)
    return joined + "\n" if raw.endswith("\n") else joined
def _parse_dot_print_line(line: str) -> str:
    """Extract the text printed by a sqlite ``.print`` dot-command line.

    A matching pair of single or double quotes around the argument is removed.
    An unquoted argument is returned trimmed. Lines that are not ``.print``
    commands, or that have no argument, yield ``""``.
    """
    m = re.match(r"\.print\s*(.*)$", line.strip(), re.IGNORECASE)
    argument = m.group(1).strip() if m else ""
    is_quoted = (
        len(argument) >= 2
        and argument[0] in ("'", '"')
        and argument[-1] == argument[0]
    )
    return argument[1:-1] if is_quoted else argument
def iter_sql_read_segments(raw: str) -> Iterator[Tuple[str, str]]:
    """Split a sqlite ``.read`` script into print and query segments, in order.

    Yields ``("print", text)`` for each ``.print`` dot-command and
    ``("query", sql)`` for each statement that begins with ``WITH`` or
    ``SELECT``. A statement continues until a line ending in ``;`` or the end
    of input. A ``.print`` line first emits any buffered statement.

    Outside a statement, these lines are skipped: blank lines, ``--`` and
    ``#`` comment lines, other dot-commands, and other statement kinds.
    """
    pending: List[str] = []

    def _drain() -> Iterator[Tuple[str, str]]:
        # Emit the buffered statement (if it has content) and empty the buffer.
        if pending:
            statement = "\n".join(pending).strip()
            pending.clear()
            if statement:
                yield ("query", statement)

    for line in raw.splitlines():
        stripped = line.strip()
        if not pending and (not stripped or stripped.startswith(("--", "#"))):
            continue
        if re.match(r"\.print\b", stripped, re.IGNORECASE):
            yield from _drain()
            yield ("print", _parse_dot_print_line(line))
            continue
        if pending or re.match(r"^\s*(WITH|SELECT)\b", line, re.IGNORECASE):
            pending.append(line)
            if stripped.endswith(";"):
                yield from _drain()
    yield from _drain()
def load_summary_sections_from_disk(root: str) -> Optional[Dict[str, SummarySectionQueries]]:
    """Load ``section -> [(label, sql), ...]`` from ``<root>/<section>/*.sql``.

    Section directories are visited in name order and hidden (dot) directories
    are ignored. Files within a section are read in sorted path order.
    Directory names and file stems lose their optional ``NN-`` prefix for
    display.

    A file is skipped if it cannot be read, or if it is empty once its
    documentation header is stripped. Every kept query ends with ``;``.
    Sections without queries are dropped.

    Returns ``None`` when ``root`` is not a directory or no usable query is found.
    """
    if not os.path.isdir(root):
        return None
    with os.scandir(root) as scan:
        section_dirs = sorted(
            (e for e in scan if e.is_dir() and not e.name.startswith(".")),
            key=lambda e: e.name,
        )
    sections: Dict[str, SummarySectionQueries] = {}
    for entry in section_dirs:
        queries: SummarySectionQueries = []
        for sql_path in sorted(glob.glob(os.path.join(entry.path, "*.sql"))):
            try:
                with open(sql_path, encoding="utf-8") as fh:
                    text = fh.read()
            except OSError:
                continue
            body = strip_sql_documentation_header(text).strip()
            if not body:
                continue
            if not body.endswith(";"):
                body += ";"
            stem = os.path.splitext(os.path.basename(sql_path))[0]
            queries.append((_summary_query_display_name(stem), body))
        if queries:
            sections[_summary_section_display_name(entry.name)] = queries
    return sections or None
# Built-in summary report: section title -> ordered ``(label, sql)`` pairs.
# ``get_summary_sections`` falls back to this mapping when no search root
# holds usable summary ``.sql`` files.
# The lock-contention queries compare lock times against 10000 and their labels
# say "10 seconds"; together with the "(ms)" aliases this indicates the
# tableUse lock columns are milliseconds.
# NOTE(review): the sections with empty lists appear to be filled from the
# canned scripts in ``_SUMMARY_CANNED_QUERY_SECTIONS`` -- confirm in the renderer.
_BUILTIN_SUMMARY_SECTIONS: Dict[str, SummarySectionQueries] = {
    'Time Range': [
        ('Log Range',
         "SELECT MIN(starttime) AS Start, MAX(starttime) AS End FROM process WHERE cmd NOT LIKE '%pull%' AND args LIKE '-i%';"),
        ('Peek Running',
         'SELECT startTime, running FROM process ORDER BY running DESC LIMIT 1;'),
    ],
    'Lock Contention': [
        ('Top 25 write waiters over 10 seconds - blocked by other read or write locks (victims)',
         'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalWriteWait AS "wait(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalWriteWait > 10000 AND completedLapse > 0 AND tableName NOT IN (\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY "wait(ms)" DESC LIMIT 25;'),
        ('Top 25 read waiters over 10 seconds - blocked by other write locks (victims)',
         'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalReadWait AS "wait(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalReadWait > 10000 AND tableName NOT IN (\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY totalReadWait DESC LIMIT 25;'),
        ('Top 25 read holders over 10 seconds - blocking writers (culprits)',
         'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalReadHeld AS "held(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalReadHeld > 10000 AND tableName NOT IN (\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY totalReadHeld DESC LIMIT 25;'),
        ('Top 25 write holders over 10 seconds - blocking readers and writers (culprits)',
         'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalWriteHeld AS "held(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalWriteHeld > 10000 AND completedLapse > 0 AND tableName NOT IN (\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY totalWriteHeld DESC LIMIT 25;'),
        # NOTE(review): the label says "total locks > 10 seconds", but the filter
        # requires BOTH total wait and total held to exceed 10000.
        ('Average Locks Summary with total locks > 10 seconds. Does one table have high average or total wait (victims) or held (culprits)',
         'SELECT * FROM ( SELECT tableName AS "table", COUNT(readLocks) AS Number, CAST(AVG(readLocks) AS INTEGER) AS "avg-read(ms)", CAST(AVG(writeLocks) AS INTEGER) AS "avg-write(ms)", CAST(AVG(totalReadWait) AS INTEGER) AS "avg-total-rw(ms)", CAST(AVG(totalReadHeld) AS INTEGER) AS "avg-total-rh(ms)", CAST(AVG(totalWriteWait) AS INTEGER) AS "avg-total-ww(ms)", CAST(AVG(totalWriteHeld) AS INTEGER) AS "avg-total-wh(ms)", CAST(SUM(totalReadWait)+SUM(totalWriteWait) AS INTEGER) AS "total-wait(ms)", CAST(SUM(totalReadHeld)+SUM(totalWriteHeld) AS INTEGER) AS "total-held(ms)" FROM tableUse GROUP BY tableUse.tableName ) WHERE "total-wait(ms)" > 10000 AND "total-held(ms)" > 10000 ORDER BY "total-wait(ms)" DESC;'),
        ('Worst Lock Offenders - (maxreadHeld + maxwriteHeld)',
         'SELECT user, SUM(maxreadHeld + maxwriteHeld) AS "held(ms)" FROM tableUse JOIN process USING (processKey) GROUP BY user ORDER BY "held(ms)" DESC LIMIT 10;'),
    ],
    'CPU Usage': [],
    'Memory Usage': [],
    'Command Activity': [],
}
# Summary sections rendered from the same multi-part scripts as the LOG2SQL library.
# Maps section title -> library script basename.
_SUMMARY_CANNED_QUERY_SECTIONS: Dict[str, str] = {
    "Command Activity": "command_summary.sql",
    "CPU Usage": "cpu_summary.sql",
    "Memory Usage": "memory_summary.sql",
}
# Omitted from built-in and disk-backed summary reports (filtered out by
# ``get_summary_sections``).
_SUMMARY_SECTIONS_OMITTED = frozenset({"Other Resource Metrics"})
def _summary_queries_search_roots() -> List[str]:
    """Return candidate summary-query directories in priority order (first usable wins).

    Order: ``P4DIAG_SUMMARY_QUERIES`` (when set and non-blank), the install
    directory's ``summary_queries``, then ``summary_queries`` under the current
    working directory. Duplicate paths keep only their first position.
    """
    override = (os.environ.get("P4DIAG_SUMMARY_QUERIES") or "").strip()
    candidates = [os.path.abspath(override)] if override else []
    candidates.append(os.path.join(_P4DIAG_INSTALL_DIR, "summary_queries"))
    candidates.append(os.path.join(os.getcwd(), "summary_queries"))
    return list(dict.fromkeys(candidates))
def get_summary_sections() -> Dict[str, SummarySectionQueries]:
    """Return the summary report sections, minus those in ``_SUMMARY_SECTIONS_OMITTED``.

    The first search root that yields usable ``.sql`` files wins. When none
    does, the built-in sections are used.
    """
    source = _BUILTIN_SUMMARY_SECTIONS
    for root in _summary_queries_search_roots():
        loaded = load_summary_sections_from_disk(root)
        if loaded is not None:
            source = loaded
            break
    return {
        title: queries
        for title, queries in source.items()
        if title not in _SUMMARY_SECTIONS_OMITTED
    }
def resolve_sql_query_text(sql_token: str) -> Optional[str]:
    """Return the full ``.sql`` script text for a library query.

    A built-in body is returned directly. A disk match is read as UTF-8.
    ``None`` is returned when nothing matches, or when the file is missing or
    unreadable.
    """
    path, builtin_body = resolve_sql_query(sql_token)
    if builtin_body is not None:
        return builtin_body
    if not path or not os.path.isfile(path):
        return None
    try:
        with open(path, encoding="utf-8") as fh:
            return fh.read()
    except OSError:
        return None
# A normalized recommendation rule dict (built by ``_normalize_recommendation_rule``).
RecommendationRule = Dict[str, Any]
# A matched rule: ``{"rule": RecommendationRule, "evidence": [(label, count), ...]}``.
RecommendationMatch = Dict[str, Any]


def p4diag_recommendations_dir() -> str:
    """Return the directory of ``*.json`` recommendation rule files.

    A non-blank ``P4DIAG_RECOMMENDATIONS`` environment variable overrides the
    location (made absolute). The default is ``<p4diag>/recommendations``.
    """
    configured = (os.environ.get("P4DIAG_RECOMMENDATIONS") or "").strip()
    if not configured:
        return os.path.join(_P4DIAG_INSTALL_DIR, "recommendations")
    return os.path.abspath(configured)
def _recommendations_search_roots() -> List[str]:
    """Return candidate recommendation-rule directories in priority order (first usable wins).

    Order: ``P4DIAG_RECOMMENDATIONS`` (when set and non-blank), the install
    directory's ``recommendations``, then ``recommendations`` under the current
    working directory. Duplicate paths keep only their first position.
    """
    override = (os.environ.get("P4DIAG_RECOMMENDATIONS") or "").strip()
    candidates = [os.path.abspath(override)] if override else []
    candidates.append(os.path.join(_P4DIAG_INSTALL_DIR, "recommendations"))
    candidates.append(os.path.join(os.getcwd(), "recommendations"))
    return list(dict.fromkeys(candidates))
def _normalize_mitigation_items(raw: Dict[str, Any]) -> List[str]:
    """Return the mitigation bullet strings of a raw recommendation rule.

    ``mitigation_items`` (a list) is preferred. Each element is converted with
    ``str`` and stripped, and blank elements are dropped. If that yields
    nothing, a string ``mitigation`` value is split into one bullet per
    non-blank line. Any other ``mitigation`` type is ignored rather than
    raising, because rules can come from user-edited JSON files.

    Returns ``[]`` when neither field provides text.
    """
    raw_items = raw.get("mitigation_items")
    if isinstance(raw_items, list):
        items = [text for text in (str(item).strip() for item in raw_items) if text]
        if items:
            return items
    mitigation = raw.get("mitigation")
    if not isinstance(mitigation, str):
        return []
    return [line.strip() for line in mitigation.splitlines() if line.strip()]
def _rec_rule_text(value: Any) -> str:
    """Return ``value`` stripped if it is a string, else ``""`` (non-strings count as missing)."""
    return value.strip() if isinstance(value, str) else ""


def _rec_rule_min_count(value: Any) -> int:
    """Coerce a ``min_count`` setting to an int >= 1 (unparseable values become 1)."""
    try:
        return max(1, int(value))
    except (TypeError, ValueError, OverflowError):
        return 1


def _normalize_recommendation_rule(raw: Dict[str, Any], source: str) -> Optional[RecommendationRule]:
    """Validate one raw rule dict; return its normalized form, or ``None`` if invalid.

    A rule is rejected when any of these hold:

    * ``id`` or ``title`` is missing or blank;
    * ``checks`` is not a non-empty list;
    * the rule has no mitigation text;
    * ``stats_checks`` or ``stats_evidence`` is present but not a list;
    * any entry of those collections is malformed.

    Wrongly typed JSON values (e.g. a numeric ``title``, a non-string ``sql``)
    are treated as malformed instead of raising, because rules can come from
    user-edited files.

    Normalization:

    * SQL checks: documentation header stripped, trailing ``;`` ensured,
      ``label`` defaults to ``"check"``, ``min_count`` coerced to an int >= 1.
    * ``stats_checks``: ``key`` required; ``label`` defaults to the key;
      ``min_count`` as above.
    * ``stats_evidence``: ``key`` required; ``label`` defaults to the key.
    * ``confidence`` is ``"likely"`` or ``"possible"``; anything else becomes
      ``"likely"``.
    * ``priority`` is an int (unparseable -> 0).

    ``source`` is stored under ``_source``.
    """
    rule_id = _rec_rule_text(raw.get("id"))
    title = _rec_rule_text(raw.get("title"))
    checks = raw.get("checks")
    mitigation_items = _normalize_mitigation_items(raw)
    if not rule_id or not title or not isinstance(checks, list) or not checks or not mitigation_items:
        return None
    stats_checks_raw = raw.get("stats_checks") or []
    stats_evidence_raw = raw.get("stats_evidence") or []
    if not isinstance(stats_checks_raw, list) or not isinstance(stats_evidence_raw, list):
        return None

    normalized_checks: List[Dict[str, Any]] = []
    for check in checks:
        if not isinstance(check, dict):
            return None
        sql_text = check.get("sql")
        sql = (
            strip_sql_documentation_header(sql_text.strip()).strip()
            if isinstance(sql_text, str)
            else ""
        )
        if not sql:
            return None
        if not sql.endswith(";"):
            sql += ";"
        normalized_checks.append(
            {
                "label": _rec_rule_text(check.get("label")) or "check",
                "sql": sql,
                "min_count": _rec_rule_min_count(check.get("min_count", 1)),
            }
        )

    normalized_stats_checks: List[Dict[str, Any]] = []
    for check in stats_checks_raw:
        if not isinstance(check, dict):
            return None
        key = _rec_rule_text(check.get("key"))
        if not key:
            return None
        normalized_stats_checks.append(
            {
                "key": key,
                "label": _rec_rule_text(check.get("label")) or key,
                "min_count": _rec_rule_min_count(check.get("min_count", 1)),
            }
        )

    normalized_stats_evidence: List[Dict[str, str]] = []
    for item in stats_evidence_raw:
        if not isinstance(item, dict):
            return None
        key = _rec_rule_text(item.get("key"))
        if not key:
            return None
        normalized_stats_evidence.append(
            {"key": key, "label": _rec_rule_text(item.get("label")) or key}
        )

    confidence = _rec_rule_text(raw.get("confidence")).lower()
    if confidence not in ("likely", "possible"):
        confidence = "likely"
    try:
        priority = int(raw.get("priority", 0))
    except (TypeError, ValueError, OverflowError):
        priority = 0
    return {
        "id": rule_id,
        "title": title,
        "priority": priority,
        "confidence": confidence,
        "checks": normalized_checks,
        "stats_checks": normalized_stats_checks,
        "stats_evidence": normalized_stats_evidence,
        "mitigation_items": mitigation_items,
        "_source": source,
    }
def load_recommendations_from_disk(root: str) -> Optional[List[RecommendationRule]]:
    """Load and normalize recommendation rules from ``<root>/*.json``.

    Files are processed in sorted path order. A file is skipped when it cannot
    be read, is not valid UTF-8 JSON, or has a non-object top level. A rule
    rejected by ``_normalize_recommendation_rule`` is also skipped.

    Returns ``None`` when ``root`` is not a directory or yields no valid rule.
    """
    if not os.path.isdir(root):
        return None
    rules: List[RecommendationRule] = []
    for json_path in sorted(glob.glob(os.path.join(root, "*.json"))):
        try:
            with open(json_path, encoding="utf-8") as fh:
                raw = json.load(fh)
        except (OSError, ValueError, TypeError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError
            # (a file that is not UTF-8); one bad file must not abort the run.
            continue
        if not isinstance(raw, dict):
            continue
        rule = _normalize_recommendation_rule(raw, source=json_path)
        if rule is not None:
            rules.append(rule)
    return rules or None
# Built-in recommendation rules. ``get_recommendation_rules`` uses them when no
# search root yields a valid JSON rule; each passes through
# ``_normalize_recommendation_rule`` first.
# A rule matches only when every SQL check's COUNT reaches its ``min_count``
# (see ``evaluate_recommendations``). ``stats_evidence`` counters never gate a
# match; they are listed as evidence only when non-zero.
# Matches are ordered by descending ``priority``, then by title.
_BUILTIN_RECOMMENDATIONS: List[RecommendationRule] = [
    {
        "id": "revhx-revdx-read-starves-writers",
        "title": "Read locks on db.revhx/db.revdx may starve writers",
        "priority": 10,
        "confidence": "likely",
        "checks": [
            {
                # 10000 matches the ">10s" label, i.e. lock times presumably in ms.
                "label": "read holders on revhx/revdx from fstat/sync/files (>10s)",
                "sql": (
                    "SELECT COUNT(*) FROM tableUse tu "
                    "JOIN process p ON tu.processKey = p.processKey "
                    "WHERE tu.tableName IN ('revhx', 'revdx') "
                    "AND p.cmd IN ('user-fstat', 'user-sync', 'user-files') "
                    "AND tu.totalReadHeld > 10000"
                ),
                "min_count": 3,
            },
            {
                "label": "write waiters on revhx/revdx (>10s)",
                "sql": (
                    "SELECT COUNT(*) FROM tableUse tu "
                    "JOIN process p ON tu.processKey = p.processKey "
                    "WHERE tu.tableName IN ('revhx', 'revdx') "
                    "AND tu.totalWriteWait > 10000"
                ),
                "min_count": 1,
            },
        ],
        "stats_evidence": [
            {
                # Parsed from the LOG STATISTICS text by ``parse_log_stats_metrics``.
                "key": "blocking_mode_commands",
                "label": (
                    "blocking-mode lock acquisitions in log "
                    "(locks acquired after non-blocking attempts)"
                ),
            },
        ],
        "mitigation_items": [
            (
                "At db.peeking=2, p4 fstat, p4 sync, and p4 files can hold read locks "
                "on db.revhx and db.revdx, blocking writers (p4 submit, p4 populate, "
                "and commit/edge paths such as dm-CommitSubmit and rmt-SubmitShelf)."
            ),
            (
                "Starved writers often retry and may switch to blocking-mode locking "
                "(log: \"locks acquired by blocking after N non-blocking attempts\"), "
                "holding other table locks while waiting and amplifying contention."
            ),
            (
                # "Lock Contention" / "Victim Culprit Report" become report links in HTML.
                "Use the summary Lock Contention and Victim Culprit Report to "
                "identify which commands and users are read-locking db.revhx/db.revdx; "
                "reduce frequency if they are spamming the server (automation, IDE "
                "refresh, repeated fstat/files/sync, and similar)."
            ),
            (
                "Consider db.peeking=3 (no server restart; new connections pick up "
                "the setting). fstat/sync/files then use peek locks on db.rev instead."
            ),
            "Review P4V.Performance.ServerRefresh to reduce P4V-driven fstat volume.",
            "See KB: Lockless Reads.",
        ],
    },
    {
        "id": "istat-rev-starves-writers",
        "title": "p4 istat read locks on db.rev* may starve writers",
        "priority": 20,
        "confidence": "likely",
        "checks": [
            {
                "label": "istat commands holding read locks on rev* tables (>10s)",
                "sql": (
                    "SELECT COUNT(*) FROM tableUse tu "
                    "JOIN process p ON tu.processKey = p.processKey "
                    "WHERE p.cmd = 'user-istat' "
                    "AND tu.tableName LIKE 'rev%' "
                    "AND tu.totalReadHeld > 10000"
                ),
                "min_count": 3,
            },
            {
                # NOTE(review): treats uCpu as milliseconds (10000 = 10s), but the
                # PID probe script aliases uCpu + sCpu as ``cpu_us`` -- confirm the
                # unit log2sql stores.
                "label": "istat commands using >10s user CPU",
                "sql": (
                    "SELECT COUNT(*) FROM process "
                    "WHERE cmd = 'user-istat' AND uCpu > 10000"
                ),
                "min_count": 1,
            },
        ],
        "mitigation_items": [
            (
                "After large stream updates, p4 istat can hold extended read locks on "
                "db.rev* tables and use high CPU while the stream cache refreshes. P4V "
                "stream graph views drive istat traffic."
            ),
            (
                "Upgrade to a P4 Server build with lockless istat reads when possible "
                "(2025.2+, or supported patches on 2025.1/2024.2/2024.1)."
            ),
            (
                "Update P4V to the latest release; consider "
                "P4V.Performance.AllowFullIstats to limit full-stream istat in the "
                "stream graph."
            ),
        ],
    },
]
def get_recommendation_rules() -> List[RecommendationRule]:
    """Return the active recommendation rules.

    The first search root that yields at least one valid JSON rule wins. When
    none does, the built-in rules are normalized and the invalid ones dropped.
    """
    for root in _recommendations_search_roots():
        loaded = load_recommendations_from_disk(root)
        if loaded is not None:
            return loaded
    candidates = (
        _normalize_recommendation_rule(raw, source="builtin")
        for raw in _BUILTIN_RECOMMENDATIONS
    )
    return [rule for rule in candidates if rule is not None]
def _recommendations_rules_signature() -> str:
    """Return a 16-hex-digit fingerprint of the active recommendation rules.

    Used for summary cache invalidation. The fingerprint hashes three parts
    with SHA-256:

    * the absolute path of the first search root containing ``*.json`` files
      (``"builtin"`` if there is none);
    * that root's newest file mtime (empty if there is none);
    * a key-sorted JSON dump of every active rule's content.
    """
    source_tag, mtime_part = "builtin", ""
    for root in _recommendations_search_roots():
        json_files = (
            sorted(glob.glob(os.path.join(root, "*.json")))
            if os.path.isdir(root)
            else []
        )
        if not json_files:
            continue
        source_tag = os.path.abspath(root)
        mtime_part = str(max(os.path.getmtime(path) for path in json_files))
        break
    snapshot = []
    for rule in get_recommendation_rules():
        entry = {name: rule[name] for name in ("id", "priority", "confidence", "checks", "title")}
        for optional in ("stats_checks", "stats_evidence", "mitigation_items"):
            entry[optional] = rule.get(optional, [])
        snapshot.append(entry)
    payload = json.dumps(snapshot, sort_keys=True)
    fingerprint = hashlib.sha256(f"{source_tag}|{mtime_part}|{payload}".encode()).hexdigest()
    return fingerprint[:16]
def _recommendation_check_count(cursor: sqlite3.Cursor, sql: str) -> Optional[int]:
    """Execute a single-value ``COUNT`` query and return the value as an int.

    No row, or a NULL value, counts as ``0``. Returns ``None`` when SQLite
    reports an error or the value cannot be converted with ``int``.
    """
    try:
        cursor.execute(sql)
        first_row = cursor.fetchone()
    except sqlite3.Error:
        return None
    value = None if first_row is None else first_row[0]
    if value is None:
        return 0
    try:
        return int(value)
    except (TypeError, ValueError):
        return None
# Counters scraped from LOG STATISTICS text. The keys are the names that
# recommendation rules reference in ``stats_checks`` / ``stats_evidence``.
_LOG_STATS_METRIC_PATTERNS: Dict[str, re.Pattern] = {
    "blocking_mode_commands": re.compile(
        r"^\s*blocking mode commands:\s*(\d+)\s*$", re.MULTILINE
    ),
    "write_waiters_over_10s": re.compile(
        r"^\s*write waiters over 10 seconds:\s*(\d+)\s*$", re.MULTILINE
    ),
}


def parse_log_stats_metrics(stats_text: str) -> Dict[str, int]:
    """Extract known integer counters from ``p4diag stats`` / summary LOG STATISTICS text.

    Only the first matching line per counter is used. Counters that do not
    appear in the text are absent from the result, and empty text gives ``{}``.
    """
    if not stats_text:
        return {}
    metrics: Dict[str, int] = {}
    for name, regex in _LOG_STATS_METRIC_PATTERNS.items():
        hit = regex.search(stats_text)
        if hit is None:
            continue
        try:
            metrics[name] = int(hit.group(1))
        except ValueError:
            continue
    return metrics
def _collect_rule_evidence(
    cursor: sqlite3.Cursor,
    rule: RecommendationRule,
    stats_metrics: Dict[str, int],
) -> Optional[List[Tuple[str, int]]]:
    """Return ``(label, count)`` evidence when every check of ``rule`` passes, else ``None``."""
    evidence: List[Tuple[str, int]] = []
    for check in rule["checks"]:
        count = _recommendation_check_count(cursor, check["sql"])
        if count is None or count < check["min_count"]:
            return None
        evidence.append((check["label"], count))
    for check in rule.get("stats_checks") or []:
        count = stats_metrics.get(check["key"], 0)
        if count < check["min_count"]:
            return None
        evidence.append((check["label"], count))
    # Informational counters: reported only when non-zero, never required.
    for item in rule.get("stats_evidence") or []:
        count = stats_metrics.get(item["key"], 0)
        if count > 0:
            evidence.append((item["label"], count))
    return evidence


def evaluate_recommendations(
    db_path: str,
    rules: Optional[List[RecommendationRule]] = None,
    stats_text: str = "",
) -> List[RecommendationMatch]:
    """Return the recommendation rules whose checks all pass against ``db_path``.

    ``rules`` defaults to ``get_recommendation_rules()``. ``stats_text`` is
    parsed for the counters used by ``stats_checks`` / ``stats_evidence``.

    The SQLite database is opened for the evaluation and always closed
    afterwards. Returns ``[]`` when there are no rules or ``db_path`` is not a
    file. Each match is ``{"rule": rule, "evidence": [(label, count), ...]}``.
    Matches are sorted by descending priority, then by title.
    """
    if rules is None:
        rules = get_recommendation_rules()
    if not rules or not os.path.isfile(db_path):
        return []
    stats_metrics = parse_log_stats_metrics(stats_text)
    matches: List[RecommendationMatch] = []
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        for rule in rules:
            evidence = _collect_rule_evidence(cursor, rule, stats_metrics)
            if evidence is not None:
                matches.append({"rule": rule, "evidence": evidence})
    finally:
        conn.close()
    matches.sort(
        key=lambda match: (-int(match["rule"].get("priority", 0)), match["rule"]["title"])
    )
    return matches
def format_recommendations_plain_text(matches: List[RecommendationMatch]) -> str:
    """Render matched rules as the plain-text RECOMMENDATIONS section body.

    Each match produces:

    * a ``[CONFIDENCE] title`` line;
    * an ``Evidence:`` line that joins ``"<count> <label>"`` pairs with
      ``"; "``;
    * a ``Mitigation:`` bullet list.

    Matches are separated by a blank line, and the text ends with exactly one
    newline. With no matches, a "no patterns detected" note is returned.
    """
    if not matches:
        return "(no known performance patterns detected in this trace)\n"
    chunks: List[str] = []
    for match in matches:
        rule = match["rule"]
        evidence = "; ".join(f"{count} {label}" for label, count in match["evidence"])
        chunks.append(f"[{rule.get('confidence', 'likely').upper()}] {rule['title']}")
        chunks.append(f" Evidence: {evidence}.")
        chunks.append(" Mitigation:")
        chunks.extend(f" - {item}" for item in rule.get("mitigation_items") or [])
        chunks.append("")
    return "\n".join(chunks).rstrip() + "\n"
# Phrases in mitigation text that are turned into links to summary report anchors.
_MITIGATION_HTML_LINKS: Tuple[Tuple[str, str], ...] = (
    ("Lock Contention", "summary-lock-contention"),
    ("Victim Culprit Report", "victim-culprit-report"),
)


def _mitigation_item_to_html(text: str) -> str:
    """HTML-escape one mitigation bullet and link known phrases to report anchors.

    All phrases from ``_MITIGATION_HTML_LINKS`` are matched in one pass over
    the escaped text, longest phrase first. Text already inside an inserted
    ``<a>`` element is therefore never rewritten by another phrase. A separate
    ``str.replace`` per phrase would nest links whenever one phrase occurs
    inside another phrase or inside an anchor id.
    """
    escaped = html.escape(text)
    anchors = {html.escape(phrase): anchor for phrase, anchor in _MITIGATION_HTML_LINKS}
    if not anchors:
        return escaped
    pattern = re.compile(
        "|".join(re.escape(p) for p in sorted(anchors, key=len, reverse=True))
    )
    return pattern.sub(
        lambda m: f'<a href="#{anchors[m.group(0)]}">{m.group(0)}</a>', escaped
    )
def format_recommendations_html(matches: List[RecommendationMatch]) -> str:
    """Render matched rules as the HTML fragment for the recommendations section.

    Each match becomes an ``<article>`` containing:

    * a confidence badge and the title;
    * an evidence list;
    * a mitigation list, whose known phrases are linked to report anchors.

    All dynamic text is HTML-escaped. With no matches, a short
    ``empty-note`` paragraph is returned.
    """
    if not matches:
        return '<p class="empty-note">No known performance patterns detected in this trace.</p>\n'
    out = ['<div class="recommendations">\n']
    for match in matches:
        rule = match["rule"]
        badge = html.escape(str(rule.get("confidence", "likely")).upper())
        heading = html.escape(rule["title"])
        out.append(f'<article class="rec-item rec-{badge.lower()}">\n')
        out.append(f'<h3><span class="rec-badge">{badge}</span> {heading}</h3>\n')
        out.append('<ul class="rec-evidence">\n')
        out.extend(
            f"<li>{html.escape(str(count))} {html.escape(label)}</li>\n"
            for label, count in match["evidence"]
        )
        out.append('</ul>\n<p class="rec-mitigation-label"><strong>Mitigation:</strong></p>\n')
        out.append('<ul class="rec-mitigation">\n')
        out.extend(
            f"<li>{_mitigation_item_to_html(item)}</li>\n"
            for item in rule.get("mitigation_items") or []
        )
        out.append("</ul>\n</article>\n")
    out.append("</div>\n")
    return "".join(out)
def log2sql_executable() -> str:
    """Return the ``log2sql`` command to run.

    The first non-empty value of ``LOG2SQL_BIN`` or ``LOG2SQL`` wins.
    Otherwise the bare name ``log2sql`` is returned, to be resolved on ``PATH``.
    """
    for variable in ("LOG2SQL_BIN", "LOG2SQL"):
        value = os.environ.get(variable)
        if value:
            return value
    return "log2sql"
def _python_sqlite3_available() -> bool:
    """Report whether this interpreter provides the ``_sqlite3`` C extension.

    Checked by attempting to import ``_sqlite3``.
    """
    import importlib

    try:
        importlib.import_module("_sqlite3")
    except ImportError:
        return False
    return True
def _require_python_sqlite3() -> None:
    """Print ``_PYTHON_SQLITE3_MISSING_MSG`` to stderr and exit with status 2 if ``_sqlite3`` is missing."""
    if _python_sqlite3_available():
        return
    print(_PYTHON_SQLITE3_MISSING_MSG, file=sys.stderr)
    sys.exit(2)
def _p4diag_interactive_setup() -> None:
    """Configure readline tab completion and the SIGINT handler, once per process.

    Meant to be called from the CLI/menu only, so a plain ``import p4diag``
    has no such side effects. Repeat calls do nothing; completion is tracked
    with a ``_done`` attribute on this function.
    """
    already_done = getattr(_p4diag_interactive_setup, "_done", False)
    if already_done:
        return
    if readline is not None:
        # Only whitespace and "=" split completion words, so paths keep "/" and "-".
        readline.set_completer_delims(" \t\n=")
        readline.parse_and_bind("tab: complete")
    signal.signal(signal.SIGINT, handler)
    _p4diag_interactive_setup._done = True  # type: ignore[attr-defined]
def trim_p4_server_log_file(log_path: str, start_time: str, end_time: str) -> str:
    """Return the path to a trimmed segment of a P4 server log (created with ``sed`` if missing).

    The segment is written next to *log_path* as ``<log>.<start>-<end>``, matching
    ``log.db`` / p4sla naming: each timestamp has ``/`` and ``:`` removed and its
    space replaced by ``_`` (``2026/05/08 09:07:49`` -> ``20260508_090749``).
    An existing segment of that name is returned without being rebuilt.

    A boundary timestamp that does not occur verbatim in the log is replaced by
    the nearest logged one, probing whole minutes for up to two hours *after*
    *start_time* / *before* *end_time*; the segment name then uses the
    adjusted timestamps.

    Args:
        log_path: P4 server log to trim.
        start_time: first timestamp to keep, ``YYYY/MM/DD HH:MM:SS``.
        end_time: last timestamp to keep, ``YYYY/MM/DD HH:MM:SS``.

    Returns:
        Absolute path of the trimmed segment.

    Raises:
        FileNotFoundError: *log_path* does not exist.
        ValueError: a boundary cannot be located, a timestamp that has to be
            probed is malformed, or the end boundary lies before the start.
        RuntimeError: ``sed`` exited with an error.
    """
    log_path = os.path.abspath(log_path)
    if not os.path.isfile(log_path):
        raise FileNotFoundError(f"Log file not found: {log_path}")

    def _segment_path(st: str, et: str) -> str:
        """Segment file name for the (possibly adjusted) boundary timestamps."""
        def _slug(ts: str) -> str:
            return ts.replace("/", "").replace(":", "").replace(" ", "_")
        return f"{log_path}.{_slug(st)}-{_slug(et)}"

    def _grep_line_number(timestamp: str, *, last: bool) -> Optional[str]:
        """Line number (text) of the first/last line containing *timestamp*, or None."""
        # -F: match the timestamp literally; -e: never parse it as an option.
        proc = s.run(
            ["grep", "-a", "-n", "-F", "-e", timestamp, log_path],
            capture_output=True,
            text=True,
            errors="replace",
        )
        hits = [ln for ln in proc.stdout.splitlines() if ln.strip()]
        if proc.returncode != 0 or not hits:
            return None
        return hits[-1 if last else 0].split(":", 1)[0]

    def _find_nearest_line_by_probe(
        timestamp_str: str, direction: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """Find the closest logged minute after (``"forward"``) or before *timestamp_str*.

        Returns ``(line_number, timestamp)``. The line is the first one of that
        minute when probing forward and the last one when probing backward, so
        the whole boundary minute stays inside the segment. ``(None, None)``
        when nothing is found within 120 minutes.
        """
        backward = direction != "forward"
        ts = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M:%S")
        for i in range(1, 121):
            step = timedelta(minutes=i)
            minute = (ts - step if backward else ts + step).strftime("%Y/%m/%d %H:%M")
            proc = s.run(
                ["grep", "-a", "-n", "-E", "-e", f"{minute}:[0-9][0-9]", log_path],
                capture_output=True,
                text=True,
                errors="replace",
            )
            hits = [ln for ln in proc.stdout.splitlines() if ln.strip()]
            if proc.returncode != 0 or not hits:
                continue
            num_s, sep, text = hits[-1 if backward else 0].partition(":")
            # Take the timestamp the grep actually matched rather than the first
            # two whitespace-separated tokens of the line.
            stamp = re.search(rf"{re.escape(minute)}:\d\d", text)
            if not sep or not num_s.isdigit() or stamp is None:
                continue
            return num_s, stamp.group(0)
        return None, None

    log_trim = _segment_path(start_time, end_time)
    if os.path.exists(log_trim):
        return log_trim

    head = _grep_line_number(start_time, last=False)
    tail = _grep_line_number(end_time, last=True)
    adj_start, adj_end = start_time, end_time
    if head is None:
        head, probed = _find_nearest_line_by_probe(start_time, "forward")
        if head is None or probed is None:
            raise ValueError(
                f"Start time {start_time!r} not found in log and no later timestamp within 2 hours."
            )
        adj_start = probed
    if tail is None:
        tail, probed = _find_nearest_line_by_probe(end_time, "backward")
        if tail is None or probed is None:
            raise ValueError(
                f"End time {end_time!r} not found in log and no earlier timestamp within 2 hours."
            )
        adj_end = probed
    if int(tail) < int(head):
        raise ValueError(
            f"End time {adj_end!r} (line {tail}) comes before start time "
            f"{adj_start!r} (line {head}) in the log."
        )

    log_trim = _segment_path(adj_start, adj_end)
    if os.path.exists(log_trim):
        return log_trim

    # Print head..tail, then quit on the next line instead of scanning the
    # remainder of a possibly multi-GB log.
    sed_script = f"{head},{tail}p; {int(tail) + 1}q"
    # Write to a scratch file and rename on success: the existence checks
    # above treat any file at ``log_trim`` as a finished segment, so a failed
    # or interrupted run must never leave a partial one there.
    partial = log_trim + ".partial"
    try:
        with open(partial, "wb") as out_f:
            s.run(["sed", "-n", sed_script, log_path], stdout=out_f, check=True)
        os.replace(partial, log_trim)
    except s.CalledProcessError as e:
        raise RuntimeError(f"sed failed while trimming log: {e}") from e
    finally:
        try:
            os.remove(partial)
        except FileNotFoundError:
            pass
    return log_trim
def _collect_pid_tracking_lines(lines: List[str], pid: str) -> List[str]:
    """Collect the ``---`` tracking block that follows a command-start record.

    *lines* is a window of log text whose first element is the command record
    for *pid*; that first element is skipped. Once collection has started,
    scanning stops at a ``pid N completed`` line or at a
    ``Perforce server info:`` / ``Perforce server error:`` header.

    Args:
        lines: raw log lines; ``lines[0]`` is the command record.
        pid: process id whose tracking is wanted.

    Returns:
        Stripped lines: every ``---`` line, plus (after collection has started)
        other lines not beginning with ``Perforce server``. A completion line
        seen before any tracking becomes the first entry. Empty if nothing found.
    """
    tracking: List[str] = []
    # NOTE(review): P4 log records put a space, not a tab, before "pid"
    # (e.g. "... 09:07:49 pid 1070252 ..."), so these ``\tpid`` patterns may
    # never match real logs -- confirm before relying on them.
    completed_re = re.compile(rf"\tpid {re.escape(str(pid))} completed\b")
    cmd_re = re.compile(rf"\tpid {re.escape(str(pid))}\b.*'")
    for ln in lines[1:]:
        stripped = ln.strip()
        if not stripped:
            continue
        if stripped.startswith("---"):
            tracking.append(stripped)
            continue
        if completed_re.search(ln):
            # A completion after tracking began ends this command's block;
            # one seen before any tracking is kept as the first entry.
            if tracking:
                break
            tracking.append(stripped)
            continue
        # A repeat of this pid's command record before tracking starts: skip it.
        if cmd_re.search(ln) and not tracking:
            continue
        # The next server record header ends the block.
        if (
            "Perforce server info:" in ln or "Perforce server error:" in ln
        ) and tracking:
            break
        if tracking and not stripped.startswith("Perforce server"):
            tracking.append(stripped)
    return tracking
def _collect_pid_tracking_after_completed(
lines: List[str], pid: str, start_time: str
) -> Tuple[Optional[str], List[str]]:
"""Collect tracking after a ``pid N completed`` line (*lines*[0])."""
if not lines:
return None, []
header = lines[0].strip()
cmd_line: Optional[str] = None
tracking: List[str] = [header] if header else []
cmd_re = re.compile(rf"\tpid {re.escape(str(pid))}\b.*'")
for ln in lines[1:]:
stripped = ln.strip()
if not stripped or "Perforce server info:" in ln:
if tracking and len(tracking) > 1:
break
continue
if cmd_re.search(ln) and start_time in ln:
cmd_line = stripped
continue
if stripped.startswith("---"):
tracking.append(stripped)
continue
if tracking:
tracking.append(stripped)
return cmd_line, tracking
def _pid_tracking_block_score(block: List[str]) -> int:
score = len(block)
for ln in block:
if ln.startswith("--- lapse"):
score += 10000
break
return score
def _grep_log_line_numbers(pattern: str, log_path: str) -> List[Tuple[int, str]]:
proc = s.run(
["grep", "-a", "-n", "-E", pattern, log_path],
capture_output=True,
text=True,
errors="replace",
)
hits: List[Tuple[int, str]] = []
if proc.returncode != 0:
return hits
for ln in proc.stdout.splitlines():
if not ln.strip():
continue
num_s, _, text = ln.partition(":")
try:
hits.append((int(num_s), text))
except ValueError:
continue
return hits
def _read_log_window(log_path: str, line_num: int, count: int = 400) -> List[str]:
proc = s.run(
["sed", "-n", f"{line_num},{line_num + count}p", log_path],
capture_output=True,
text=True,
errors="replace",
)
return proc.stdout.splitlines()
def extract_pid_tracking_from_log(
    log_path: str,
    pid: str,
    start_time: str,
    *,
    end_time: Optional[str] = None,
) -> Tuple[Optional[str], List[str]]:
    """Return ``(command_line, tracking_lines)`` for one command in a P4 server log.

    First scores the tracking windows after ``pid N completed`` records (the
    *end_time*-anchored pattern first when given). If the best of those
    contains a ``--- lapse`` line it wins. Otherwise, windows after command
    start records at *start_time* also compete. Blocks are ranked by
    ``_pid_tracking_block_score``. A missing log gives ``(None, [])``.
    """
    log_path = os.path.abspath(log_path)
    if not os.path.isfile(log_path):
        return None, []
    pid_s = str(pid)
    pid_re = re.escape(pid_s)
    patterns: List[str] = (
        [rf"{re.escape(end_time)}.*pid {pid_re} completed\b"] if end_time else []
    )
    # NOTE(review): "\tpid" is a literal "t" under GNU grep -E and a tab in
    # Python re; P4 records have a space before "pid", so this fallback
    # probably never matches -- confirm before relying on it.
    patterns.append(rf"\tpid {pid_re} completed\b")

    best_score, best_cmd, best_block = -1, None, []
    for pattern in patterns:
        for line_num, hit_text in _grep_log_line_numbers(pattern, log_path):
            cmd_line, block = _collect_pid_tracking_after_completed(
                _read_log_window(log_path, line_num), pid_s, start_time
            )
            score = _pid_tracking_block_score(block)
            if score > best_score:
                best_score, best_cmd, best_block = (
                    score,
                    cmd_line or hit_text.strip(),
                    block,
                )
    if any(entry.startswith("--- lapse") for entry in best_block):
        return best_cmd, best_block

    start_pattern = rf"{re.escape(start_time)}.*pid {pid_re} .+'"
    for line_num, hit_text in _grep_log_line_numbers(start_pattern, log_path):
        block = _collect_pid_tracking_lines(_read_log_window(log_path, line_num), pid_s)
        score = _pid_tracking_block_score(block)
        if score > best_score:
            best_score, best_cmd, best_block = score, hit_text.strip(), block
    return best_cmd, best_block
def _server_error_blocks_in_lines(lines: List[str], pid: str) -> List[List[str]]:
"""Return ``Perforce server error`` bodies that name *pid*."""
pid_re = re.compile(rf"^Pid\s+{re.escape(str(pid))}\b", re.IGNORECASE)
blocks: List[List[str]] = []
i = 0
while i < len(lines):
if "Perforce server error:" in lines[i]:
body: List[str] = []
i += 1
while i < len(lines):
stripped = lines[i].strip()
if stripped.startswith("Perforce server"):
break
if stripped:
body.append(stripped)
i += 1
if body and any(pid_re.match(ln) for ln in body):
blocks.append(body)
continue
i += 1
return blocks
def extract_pid_server_error_from_log(
    log_path: str,
    pid: str,
    start_time: str,
    *,
    end_time: Optional[str] = None,
) -> List[str]:
    """Return the body of the last ``Perforce server error`` block for one command.

    The search window is 800 lines from the command's completion record when
    *end_time* is given and such a record is found. Otherwise it starts at the
    last start record for *pid* at *start_time*. Returns ``[]`` if the log is
    missing or nothing matches.
    """
    log_path = os.path.abspath(log_path)
    if not os.path.isfile(log_path):
        return []
    pid_s = str(pid)
    pid_re = re.escape(pid_s)
    start_line: Optional[int] = None
    if end_time:
        # NOTE(review): the second, "\tpid"-based pattern likely never matches
        # (P4 records have a space before "pid") -- confirm.
        for pattern in (
            rf"{re.escape(end_time)}.*pid {pid_re} completed\b",
            rf"\tpid {pid_re} completed\b",
        ):
            hits = _grep_log_line_numbers(pattern, log_path)
            if hits:
                start_line = hits[0][0]
                break
    if start_line is None:
        cmd_hits = _grep_log_line_numbers(
            rf"{re.escape(start_time)}.*pid {pid_re} .+'",
            log_path,
        )
        if cmd_hits:
            start_line = cmd_hits[-1][0]
    if start_line is None:
        return []
    blocks = _server_error_blocks_in_lines(
        _read_log_window(log_path, start_line, 800), pid_s
    )
    return blocks[-1] if blocks else []
def generate_log_summary_text(log_path: str) -> str:
    """Return grep-based statistics for a P4 server log as plain text.

    Used by p4diag stats and p4sla sidecars. Each statistic is a shell
    pipeline (``grep``/``zgrep``, ``awk``, ``sort``, ``wc``) run concurrently on
    a thread pool; ``.gz`` logs are read through ``zcat``/``zgrep``. For the
    shutdown / starting / signal-exit / journal-repaired counters that are
    non-zero, the matching log lines are appended after the summary.

    Args:
        log_path: path to the P4 server log (plain or ``.gz``).

    Returns:
        The report text, or ``"(Log file not found: <path>)"`` if it is missing.
    """
    log = os.path.abspath(log_path)
    if not os.path.isfile(log):
        return f"(Log file not found: {log})"
    is_gz = log.endswith(".gz")
    cat_cmd = f"zcat {shlex.quote(log)}" if is_gz else f"cat {shlex.quote(log)}"
    grep_cmd = "zgrep -a" if is_gz else "grep -a"
    lq = shlex.quote(log)

    def run(cmd: str) -> str:
        """Run *cmd* through the shell; return its output minus the trailing newline."""
        return s.getoutput(cmd)

    commands = {
        "size": f"ls -lh {lq} | awk '{{print $5}}'",
        "begin_range": f"{cat_cmd} | head -n200 | {grep_cmd} '2.*pid.*user-*' | head -n1 | awk -v OFS=' ' '{{print $1, $2}}'",
        "end_range": f"{cat_cmd} | tail -n5000 | {grep_cmd} '2.*pid.*user-*' | tail -n1 | awk -v OFS=' ' '{{print $1, $2}}'",
        "completed_cmds": f"{grep_cmd} completed {lq} | wc -l",
        "high_active_threads": f"{grep_cmd} 'active threads' {lq} | sort -n -r -k10 | head -n1",
        "num_write_waiters": (
            f"{grep_cmd} -n -B1 -E 'total.*/+[0-9]{{5,}}ms+' {lq} | "
            f"{grep_cmd} -A1 'locks read' | {grep_cmd} total | wc -l"
        ),
        "num_blocking_mode": f"{grep_cmd} 'after.*non-blocking.*attempts' {lq} | wc -l",
        "num_killed_ml": f"{grep_cmd} 'killed by MaxLockTime' {lq} | wc -l",
        "num_killed_msr": f"{grep_cmd} 'killed by MaxScanRows' {lq} | wc -l",
        "num_killed_mres": f"{grep_cmd} 'killed by MaxResults' {lq} | wc -l",
        "num_shutdown": f"{grep_cmd} -F {shlex.quote('Perforce Server shutdown')} {lq} | wc -l",
        "num_starting": f"{grep_cmd} -F {shlex.quote('Perforce Server starting')} {lq} | wc -l",
        "num_signal_exit": f"{grep_cmd} -F {shlex.quote('exited on a signal')} {lq} | wc -l",
        "num_journal_repaired": (
            f"{grep_cmd} -F {shlex.quote('Journal repaired for child process')} {lq} | wc -l"
        ),
    }
    results: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {k: executor.submit(run, cmd) for k, cmd in commands.items()}
        for k, future in futures.items():
            results[k] = future.result()
    # P4 log records may be tab-indented, so allow leading whitespace before
    # the timestamp instead of requiring it at column 0 (which reported N/A).
    match = re.match(
        r"^\s*(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}).*?using (\d+) active threads",
        results["high_active_threads"],
    )
    if match:
        results["high_active_threads"] = (
            f"{match.group(2)} active threads at {match.group(1)}"
        )
    else:
        results["high_active_threads"] = "N/A"

    def _int_count(key: str) -> int:
        """Parse a ``wc -l`` result; 0 when missing or not an integer."""
        try:
            return int(results[key].strip())
        except (ValueError, KeyError):
            return 0

    buf = io.StringIO()

    def _append_match_block(title: str, key: str, pattern: str) -> None:
        """When counter *key* is positive, append a titled list of lines containing *pattern*."""
        n = _int_count(key)
        if n <= 0:
            return
        print(f"\n {title} ({n}):", file=buf)
        raw = run(f"{grep_cmd} -F {shlex.quote(pattern)} {lq}").strip()
        for log_line in raw.splitlines():
            print(f" {log_line.lstrip()}", file=buf)

    lines = [
        f"\n log name: {log}",
        f" log size: {results['size']}",
        f" log range: {results['begin_range']} - {results['end_range']}",
        f" completed commands: {results['completed_cmds'].strip()}",
        f" peak threads: {results['high_active_threads']}",
        f" write waiters over 10 seconds: {results['num_write_waiters'].strip()}",
        f" blocking mode commands: {results['num_blocking_mode'].strip()}",
        f" killed by MaxLockTime: {results['num_killed_ml'].strip()}",
        f" killed by MaxScanRows: {results['num_killed_msr'].strip()}",
        f" killed by MaxResults: {results['num_killed_mres'].strip()}",
        f" Perforce Server shutdown: {results['num_shutdown'].strip()}",
        f" Perforce Server starting: {results['num_starting'].strip()}",
        f" exited on a signal: {results['num_signal_exit'].strip()}",
        f" journal repaired (child process): {results['num_journal_repaired'].strip()}",
    ]
    for line in lines:
        print(line, file=buf)
    _append_match_block(
        "Perforce Server shutdown lines", "num_shutdown", "Perforce Server shutdown"
    )
    _append_match_block(
        "Perforce Server starting lines", "num_starting", "Perforce Server starting"
    )
    _append_match_block(
        "exited on a signal (e.g. under Perforce server error)",
        "num_signal_exit",
        "exited on a signal",
    )
    _append_match_block(
        "Journal repaired for child process",
        "num_journal_repaired",
        "Journal repaired for child process",
    )
    return buf.getvalue().rstrip()
def _resolve_log2sql_executable() -> str:
    """Resolve ``log2sql_executable()`` to something runnable, or exit with status 127.

    An executable file at that exact path is used as-is; otherwise the name is
    looked up with ``shutil.which``.
    """
    exe = log2sql_executable()
    if os.path.isfile(exe) and os.access(exe, os.X_OK):
        return exe
    on_path = shutil.which(exe)
    if not on_path:
        print(
            f"p4diag: log2sql not found ({exe!r}). "
            "Install log2sql and set LOG2SQL_BIN to its path, or add it to PATH.",
            file=sys.stderr,
        )
        sys.exit(127)
    return on_path
# Embedded sample trace used only by the ``schema`` command (built once under .p4diagnostics/ with log2sql).
_HELP_SCHEMA_SAMPLE_LOG_TEXT = """Perforce server info:
2026/05/08 09:07:49 pid 1070252 bruno@bruno_ws 127.0.0.1 [p4/2025.2/LINUX26X86_64/2907753] 'user-fstat //depot/...'
--- ident cmd/group 2D1CDF574ABC5DD327116F55B2CAA810/none
--- lapse 3.92s
--- usage 394+290us 816+0io 0+0net 143748k 0pf
--- memory cmd/proc 132mb/132mb
--- rpc msgs/size in+out 0+158354/0mb+66mb himarks 97604/97604 snd/rcv 3.21s/.000s
--- filetotals (svr) send/recv files+bytes 0+0mb/0+0mb
--- db.user
--- pages in+out+cached 3+0+2
--- locks read/write 1/0 rows get+pos+scan put+del 1+0+0 0+0
--- db.ticket
--- pages in+out+cached 3+0+2
--- locks read/write 1/0 rows get+pos+scan put+del 1+0+0 0+0
--- db.group
--- pages in+out+cached 4+0+2
--- locks read/write 1/0 rows get+pos+scan put+del 0+2+3 0+0
--- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms
--- db.domain
--- pages in+out+cached 4+0+3
--- locks read/write 1/0 rows get+pos+scan put+del 1+0+0 0+0
--- db.view
--- pages in+out+cached 1+0+4
--- locks read/write 1/0 rows get+pos+scan put+del 0+1+2 0+0
--- db.have
--- pages in+out+cached 6+0+5
--- locks read/write 0/0 rows get+pos+scan put+del 0+1+1 0+0
--- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms
--- db.revdx
--- pages in+out+cached 4+0+3
--- locks read/write 1/0 rows get+pos+scan put+del 0+1+19 0+0
--- total lock wait+held read/write 0ms+279ms/0ms+0ms
--- db.revhx
--- pages in+out+cached 6462+0+96
--- locks read/write 1/0 rows get+pos+scan put+del 0+1+158337 0+0
--- total lock wait+held read/write 0ms+279ms/0ms+0ms
--- db.locks
--- pages in+out+cached 3+0+2
--- locks read/write 0/0 rows get+pos+scan put+del 0+158354+158354 0+0
--- peek count 1 wait+held total/max 0ms+44ms/0ms+44ms
--- db.working
--- pages in+out+cached 3+0+2
--- locks read/write 0/0 rows get+pos+scan put+del 0+1+1 0+0
--- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms
--- db.excl
--- pages in+out+cached 3+0+2
--- locks read/write 0/0 rows get+pos+scan put+del 0+256+256 0+0
--- peek count 1 wait+held total/max 0ms+43ms/0ms+43ms
--- db.trigger
--- pages in+out+cached 3+0+2
--- locks read/write 1/0 rows get+pos+scan put+del 0+1+1 0+0
--- db.repo
--- pages in+out+cached 3+0+2
--- locks read/write 1/0 rows get+pos+scan put+del 0+1+1 0+0
--- db.graphperm
--- pages in+out+cached 4+0+2
--- locks read/write 1/0 rows get+pos+scan put+del 0+1+1 0+0
--- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms
--- db.protect
--- pages in+out+cached 5+0+2
--- locks read/write 0/0 rows get+pos+scan put+del 0+1+20 0+0
--- peek count 3 wait+held total/max 0ms+0ms/0ms+0ms
--- db.monitor
--- pages in+out+cached 2+4+1024
--- locks read/write 6/2 rows get+pos+scan put+del 6+0+0 2+0
"""
# Short quiet-CLI names mapped to their long forms; argv is normalised with
# this table before ``argparse`` runs (see ``_normalize_quiet_cli_argv``).
QUIET_CLI_SHORT_ALIASES = dict(
    log2sql="log2sql",
    plots="log2sql-plots",
    schema="log2sql-schema",
)
P4DIAG_DESCRIPTION = """\
p4diag aggregates common P4 Server log analysis workflows in one script:
• Uses log2sql to build a SQLite database from P4 Server log trace data.
• Runs canned and summary SQL queries to surface patterns and commands that
often point to lock contention, CPU use, and other performance issues.
• Writes grep-based log statistics, summary SQL results, victim/culprit
write-wait analysis, and command-activity plots into text and HTML under
.p4diagnostics/ for viewing in a browser (interactive mode starts a small
local web server for those files).
Run from the directory containing your P4 Server log. Use the commands below,
or p4diag LOG for an interactive menu."""
P4DIAG_EPILOG = """
Requirements:
Python 3 (built-in sqlite3 module required — test: python3 -c "import sqlite3")
pip install tabulate
log2sql on PATH (or set LOG2SQL_BIN)
sqlite3 on PATH — LOG2SQL canned .sql reports, ad hoc SELECT, pid probe
(sqlite 3.31+ required for pid / @pid @start @table @duration bind parameters)
gnuplot — optional, for plots and summary PNGs
grep, sed — log stats and trim
Lock shortcuts (ww/wh/rw/rh) use Python's built-in sqlite3 module only; the
sqlite3 CLI is still required for the numbered SQL library and pid.
Install / distribution:
Download: https://swarm.workshop.perforce.com/projects/perforce_software-admin-toolkit/download/main/guest/perforce_software/admin_toolkit/p4diag
Browse: https://swarm.workshop.perforce.com/projects/perforce_software-admin-toolkit/files/main/guest/perforce_software/admin_toolkit/p4diag
Copy the p4diag script anywhere on PATH (e.g. ~/bin/p4diag) and chmod +x p4diag.
Run from the directory that contains your P4 server log; output goes in
./.p4diagnostics/ beside the log.
The standard canned-query library (lock, CPU, memory, command, victim/culprit
reports) is built into the script — no sql_queries/ directory required.
Optional: place sql_queries/ next to p4diag to override or add custom .sql reports
(or set P4DIAG_SQL_QUERIES). Optional summary_queries/ and recommendations/
directories override built-in summary SQL and performance-pattern rules.
Override paths with P4DIAG_SQL_QUERIES, P4DIAG_SUMMARY_QUERIES, and
P4DIAG_RECOMMENDATIONS.
Environment (optional):
LOG2SQL_BIN path to log2sql binary (default: log2sql on PATH)
P4DIAG_SQL_QUERIES directory of library .sql files (default: <p4diag>/sql_queries)
P4DIAG_SUMMARY_QUERIES summary report sections (default: <p4diag>/summary_queries)
P4DIAG_RECOMMENDATIONS pattern rules for summary RECOMMENDATIONS (default: <p4diag>/recommendations)
For --start and --end TIME, TIME="YYYY/MM/DD HH:MM:SS"
Large logs — trim before log2sql / summary:
Building a log2sql database and generating summary reports can take a long
time on large P4 Server logs (multi-GB traces are common). Trim the log to
the time window where the problem occurred before running log2sql or summary.
One useful heuristic: find the peak "active threads" line in the log (see
"peak threads" in p4diag stats output, or interactive menu command active),
note its timestamp, then trim roughly one hour before and one hour after that
peak. That window usually captures the surrounding contention without
processing the full log.
p4diag trim LOG --start TIME --end TIME
p4diag LOG --start TIME --end TIME # trim, then interactive menu
Interactive (TTY menu):
p4diag LOG # interactive TTY menu + HTTP server for .p4diagnostics/
p4diag LOG --start TIME --end TIME # trim log, then interactive menu + HTTP server
Interactive web server (HTTP):
Serves ./.p4diagnostics/ from the directory where you launched p4diag. The
printed URL uses this host's work-network IPv4 (default-route interface), not
the short hostname — open that URL from your laptop when VPN'd in. That address
is not 127.0.0.1: loopback only works if the browser runs on the same machine
as p4diag. Hostname links often fail when the name is missing from DNS or
/etc/hosts even though the IP works. SSH without VPN: ssh -L PORT:127.0.0.1:PORT.
Port is 8000 + (your Unix uid % 1000) — fixed per user, not per log or case
directory, and not overridable.
Only one listener per user: if that port is already in use, a second
interactive p4diag prints "Web server already running" and does not start
another server. The process already bound to the port keeps serving the first
session's .p4diagnostics/ and uses the first session's LOG for /. For two
cases at once, use quiet subcommands (no web server), open .p4diagnostics/
HTML directly, or run interactive sessions as different Unix users.
CLI (no TTY menu):
p4diag trim LOG --start TIME --end TIME # trim LOG by start/end time
p4diag stats LOG # log statistics to .p4diagnostics/LOG.stats.txt
p4diag log2sql LOG # create log2sql SQLite DB to .p4diagnostics/LOG.db
p4diag summary LOG # LOG.summary.txt/LOG.summary.HTML to .p4diagnostics
p4diag FILE.sql LOG # run NAME.sql (basename → P4DIAG_SQL_QUERIES)
p4diag list # list library queries from P4DIAG_SQL_QUERIES
p4diag schema # list schema for log2sql database
p4diag plots LOG # gnuplot ASCII + PNG under .p4diagnostics/
"""
# Subcommands dispatched to ``run_quiet_subcommand``; they are also recognised
# without ``-q`` (see ``_argv_invokes_quiet_cli``). Every key of
# ``QUIET_CLI_SHORT_ALIASES`` is accepted too.
QUIET_CLI_SUBCOMMANDS = frozenset(
    {
        "log2sql",
        "stats",
        "log2sql-schema",
        "log2sql-query-sql",
        "log2sql-query-sql-list",
        "log2sql-plots",
        "list",
        "summary",
        "trim",
    }
).union(QUIET_CLI_SHORT_ALIASES)
def _argv_is_sql_library_query(argv: list) -> bool:
"""True for ``p4diag NAME.sql [LOG]`` (library or absolute ``.sql`` path)."""
return bool(argv) and argv[0].lower().endswith(".sql")
def _argv_is_sql_query_list(argv: list) -> bool:
"""True for ``p4diag list``."""
return len(argv) == 1 and argv[0] == "list"
def _argv_invokes_quiet_cli(argv: list) -> bool:
    """Decide whether *argv* (arguments after the script name) selects the quiet CLI.

    True for a ``.sql`` query, ``list``, or any name in ``QUIET_CLI_SUBCOMMANDS``;
    the quiet CLI skips the interactive menu and web server.
    """
    if not argv:
        return False
    return (
        _argv_is_sql_library_query(argv)
        or _argv_is_sql_query_list(argv)
        or argv[0] in QUIET_CLI_SUBCOMMANDS
    )
@contextmanager
def cli_spinner(message: str) -> Iterator[None]:
    """Show a ``| / - \\`` spinner after *message* on stderr while the block runs.

    Does nothing when stderr is not a TTY. The spinner runs on a daemon thread,
    redrawing every 0.1 s, and ends its line with a newline once stopped.
    """
    def _stderr_is_tty() -> bool:
        return getattr(sys.stderr, "isatty", lambda: False)()

    if not _stderr_is_tty():
        yield
        return

    label = message.rstrip()
    done = threading.Event()

    def _animate() -> None:
        drew = False
        frames = itertools.cycle("|/-\\")
        while not done.wait(0.1):
            sys.stderr.write(f"\r{label} {next(frames)} ")
            sys.stderr.flush()
            drew = True
        if drew:
            sys.stderr.write("\n")
            sys.stderr.flush()

    worker = threading.Thread(target=_animate, daemon=True)
    worker.start()
    try:
        yield
    finally:
        done.set()
        worker.join(timeout=60)
        # If the spinner thread is somehow still alive, still end the line.
        if worker.is_alive() and _stderr_is_tty():
            sys.stderr.write("\n")
            sys.stderr.flush()
@contextmanager
def quiet_stderr_activity(message: str):
    """Wrap the block in ``cli_spinner(message)`` when ``QUIET`` is set; otherwise add nothing."""
    if QUIET:
        with cli_spinner(message):
            yield
    else:
        yield
import html
import http.server
import socketserver
import socket
from urllib.parse import unquote, urlparse
# ANSI escape sequences for terminal output.
# Text attributes
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
# Colours
CYAN = "\033[96m"
WHITE_BOLD = "\033[97;1m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
def handler(signum, frame):
    """Signal handler that ignores the signal; ``_p4diag_interactive_setup`` installs it for SIGINT."""
    del signum, frame  # required by the signal.signal callback signature; unused
    return None
class log2sql():
def __init__(self, theFile):
self.logFile = "unset"
self.logFileDetails = "unset"
self.databaseFile = "unset"
self.summaryFile = ""
self.maxActiveThreadsFile = ""
self.activeThreadsSummaryFile = ""
self.failuresFile = ""
self.summaryQueriesFile = ""
self.begin_range=0
self.end_range=0
self.min_hold_time = 2000
self.VALID_COLUMNS = {
# process table columns
"processkey", "lineNumber", "pid", "startTime", "endTime", "computedLapse", "completedLapse",
"paused", "user", "workspace", "ip", "app", "cmd", "args", "uCpu", "sCpu",
"diskIn", "diskOut", "ipcIn", "ipcOut", "maxRss", "pageFaults", "memMB", "memPeakMB",
"rpcMsgsIn", "rpcMsgsOut", "rpcSizeIn", "rpcSizeOut", "rpcHimarkFwd", "rpcHimarkRev",
"rpcSnd", "rpcRcv", "fileTotalsSnd", "fileTotalsRcv", "fileTotalsSndMB", "fileTotalsRcvMB",
"running", "netSyncFilesAdded", "netSyncFilesUpdated", "netSyncFilesDeleted",
"netSyncBytesAdded", "netSyncBytesUpdated", "lbrRcsOpens", "lbrRcsCloses", "lbrRcsCheckins",
"lbrRcsExists", "lbrRcsReads", "lbrRcsReadBytes", "lbrRcsWrites", "lbrRcsWriteBytes",
"lbrRcsDigests", "lbrRcsFileSizes", "lbrRcsModtimes", "lbrRcsCopies", "lbrBinaryOpens",
"lbrBinaryCloses", "lbrBinaryCheckins", "lbrBinaryExists", "lbrBinaryReads", "lbrBinaryReadBytes",
"lbrBinaryWrites", "lbrBinaryWriteBytes", "lbrBinaryDigests", "lbrBinaryFileSizes",
"lbrBinaryModtimes", "lbrBinaryCopies", "lbrCompressOpens", "lbrCompressCloses",
"lbrCompressCheckins", "lbrCompressExists", "lbrCompressReads", "lbrCompressReadBytes",
"lbrCompressWrites", "lbrCompressWriteBytes", "lbrCompressDigests", "lbrCompressFileSizes",
"lbrCompressModtimes", "lbrCompressCopies", "lbrUncompressOpens", "lbrUncompressCloses",
"lbrUncompressCheckins", "lbrUncompressExists", "lbrUncompressReads", "lbrUncompressReadBytes",
"lbrUncompressWrites", "lbrUncompressWriteBytes", "lbrUncompressDigests", "lbrUncompressFileSizes",
"lbrUncompressModtimes", "lbrUncompressCopies", "error",
# tableUse table columns
"tableName", "pagesIn", "pagesOut", "pagesCached", "pagesSplitInternal", "pagesSplitLeaf",
"readLocks", "writeLocks", "getRows", "posRows", "scanRows", "putRows", "delRows",
"totalReadWait", "totalReadHeld", "totalWriteWait", "totalWriteHeld", "maxReadWait",
"maxReadHeld", "maxWriteWait", "maxWriteHeld", "peekCount", "totalPeekWait", "totalPeekHeld",
"maxPeekWait", "maxPeekHeld", "triggerLapse"
}
if theFile:
self.logFile = theFile
self.databaseFile = theFile
def setLogFile(self,show_menu=True):
currentDatabaseFile = self.databaseFile
if self.databaseFile == "unset" or show_menu:
while True:
filelist=self.listFiles(".")
database_file=input('\n\033[1mSelect log file name or number (b = back, x = exit) > \033[0m')
if database_file.isnumeric():
index=int(database_file) - 1
if validateInput(index, len(filelist)+1):
database_file=filelist[index]
break
else:
break
self.databaseFile=database_file.strip()
if self.databaseFile == 'x' or self.databaseFile == 'exit':
exitP4()
if self.databaseFile == 'b' or self.databaseFile == 'back':
exitP4()
if self.databaseFile.endswith('.db'):
self.logFile = self.databaseFile[:-3]
db_name = os.path.basename(self.databaseFile)
else:
self.logFile = self.databaseFile
db_name = os.path.basename(self.logFile) + '.db'
log_abs = os.path.abspath(self.logFile)
diag_dir = os.path.join(os.path.dirname(log_abs), '.p4diagnostics')
os.makedirs(diag_dir, exist_ok=True)
self.databaseFile = os.path.join(diag_dir, db_name)
log_base = os.path.basename(self.logFile)
self.summaryFile = os.path.join(diag_dir, log_base + ".summary" + ".txt")
self.summaryFileHTML = os.path.join(diag_dir, log_base + ".summary" + ".html")
self.summaryQueriesFile = os.path.join(diag_dir, "queries.sql")
self.logFileDetails = os.path.join(diag_dir, log_base + ".stats.txt")
self.errorsSummaryFile = os.path.join(diag_dir, log_base + ".errors_summary")
self.maxActiveThreadsFile = os.path.join(diag_dir, log_base + ".active_threads")
self.activeThreadsSummaryFile = os.path.join(diag_dir, log_base + ".active_threads_summary")
self.failuresFile = os.path.join(diag_dir, log_base + ".fails")
if show_menu:
self.menu()
def createDatabase(self):
if not os.path.exists(self.databaseFile):
cmd = [_resolve_log2sql_executable(), '-d', self.databaseFile, '--no.metrics', self.logFile]
if QUIET:
with open(LOG_FILE, "a") as f:
f.write("[INFO] Generating log2sql database ...\n")
proc = s.Popen(
cmd,
stdout=f,
stderr=f,
text=True,
bufsize=1,
universal_newlines=True,
)
with quiet_stderr_activity("Generating log2sql database..."):
proc.wait()
else:
with cli_spinner("Generating log2sql database..."):
proc = s.Popen(
cmd,
stdout=s.PIPE,
stderr=s.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)
if proc.stdout is not None:
for line in proc.stdout:
print(line.rstrip())
proc.wait()
if proc.returncode == 0:
if QUIET:
with open(LOG_FILE, "a") as f:
f.write(f"[INFO] Generated log2sql database {self.databaseFile}\n")
else:
print(
f"{GREEN}✓ Generated log2sql database{RESET} "
f"{BOLD}{GREEN}{self.databaseFile}{RESET}"
)
else:
if QUIET:
with open(LOG_FILE, "a") as f:
f.write(f"[FAIL] log2sql failed with exit code {proc.returncode}\n")
else:
print(f"{RED}✗ log2sql failed with exit code {proc.returncode}{RESET}")
def query_menu(self):
if not os.path.exists(self.databaseFile):
print("You need to generate a database first...")
print("Run createDatabase from the menu.")
self.menu()
return
commands = [
(".schema", "show schema for trace DB tables"),
("m or menu", "query menu"),
("b or back", "back to main menu"),
("x or exit", "exit p4diag"),
]
print(f"\n {BOLD}command [table] [startDate endDate] [limit] [column1,columnN]{RESET}\n")
print(" command = wh | ww | rh | rw | pid")
print(" pid = probe one command (prompts for PID + startTime)")
print(" table = table name")
print(" startDate = begin range of date/time (format: YYYY/MM/DD HH:MM:SS)")
print(" endDate = end range of date/time (format: YYYY/MM/DD HH:MM:SS)")
print(" limit = LIMIT number for results (default 25)")
print(" column1,columnN = additional columns to include in output\n")
queries = list_sql_library_names()
if queries:
print_numbered_list(queries)
print()
else:
print(f" {YELLOW}No SQL library queries available.{RESET}")
print(
" Optional: place sql_queries/ next to p4diag, "
"or set P4DIAG_SQL_QUERIES.\n"
)
print(
f" {BOLD}h or help{RESET} describes what each numbered query is for "
"and shortcut usage examples\n"
)
for cmd, desc in commands:
print(f" {BOLD}{cmd:<15}{RESET} {desc}")
print(
f"\nEnter {BOLD}command{RESET}, path to a .sql file, "
f"or a {BOLD}numbered SQL query{RESET}.\n"
)
self.queryPrompt()
def queryPrompt(self):
query = input(
"\033[1mLOG2SQL ("
+ os.path.basename(self.databaseFile)
+ ") (m = menu): > \033[0m"
)
if (query == 'b' or query == 'back'):
self.menu()
elif (query == 'q' or query == 'quit'):
self.menu()
elif (query == 'm' or query == 'menu'):
self.query_menu()
elif (query == 'x' or query == 'exit'):
exitP4();
elif query.strip().lower() in ('h', 'help', '?'):
self.print_query_shortcut_help()
self.queryPrompt()
elif query.strip().isnumeric():
queries = list_sql_library_names()
if not queries:
print(f"\n {YELLOW}No SQL library queries available.{RESET}")
self.queryPrompt()
return
index = int(query.strip()) - 1
if validateInput(index, len(queries)):
self.query(queries[index])
else:
print(f"\n Enter a number between 1 and {len(queries)}.")
self.queryPrompt()
else:
self.query(query)
def print_query_shortcut_help(self) -> None:
"""Help for interactive LOG2SQL commands, lock shortcuts, and SQL library."""
sql_dir = p4diag_sql_queries_dir()
print(f"\n {BOLD}LOG2SQL command help{RESET}\n")
print(f" {BOLD}Lock shortcuts{RESET}\n")
print(f" {BOLD}ww{RESET} write waiters (victims — blocked on write locks)")
print(f" {BOLD}wh{RESET} write holders (culprits — holding write locks)")
print(f" {BOLD}rw{RESET} read waiters (victims — blocked on read locks)")
print(f" {BOLD}rh{RESET} read holders (culprits — holding read locks)")
print(f" {BOLD}pid{RESET} probe one command (prompts for PID + startTime)\n")
print(
f" Syntax: {BOLD}command [table] [startDate startTime endDate endTime]"
f" [limit] [col1,col2,...]{RESET}\n"
)
print(" table db table name (optional; db. prefix is stripped)")
print(" start/end both required if either is used (YYYY/MM/DD HH:MM:SS)")
print(" limit max rows (default 25; ignored when a date range is set)")
print(" col1,col2 extra columns from the trace DB (comma-separated)\n")
print(f" {BOLD}Examples:{RESET}\n")
print_help_query_table(
list(_LOCK_SHORTCUT_EXAMPLES),
col1="Command",
col2="Shows",
)
print(
"\n START END = YYYY/MM/DD HH:MM:SS (both required if either is used)\n"
)
print(f" {BOLD}SQL library (numbered list in menu){RESET}\n")
print(
" The numbered .sql files in the menu are canned reports shipped with "
"p4diag or added locally."
)
print(" Run one by entering its menu number, or type the basename")
print(" (e.g. cpu_summary.sql) or a full path to any .sql file.\n")
print(f" Library directory: {BOLD}{sql_dir}{RESET}\n")
queries = list_sql_library_names()
if queries:
print_sql_library_help(queries)
print(" Add your own queries:")
print(" • copy a .sql file into that directory (any basename ending in .sql)")
print(" • type m at the prompt to refresh the numbered list")
print(" • override the directory with P4DIAG_SQL_QUERIES if needed")
print(
" Scripts may use sqlite dot-commands (.print, .width) or plain SQL."
)
print(
" pid prompts for PID + startTime (built into p4diag, not a numbered query).\n"
" locks_all_duration.sql prompts for a duration in ms (or: locks_all_duration.sql 5000).\n"
" locks_held_total.sql prompts for an optional table (or: locks_held_total.sql revhx).\n"
" locks_table_by_cmd.sql prompts for a table name (or: locks_table_by_cmd.sql revdx).\n"
)
print(f" {BOLD}Other inputs{RESET}\n")
print(" .schema show trace DB table columns")
print(" SELECT ... run ad hoc SQL against the trace database\n")
def createLogSummary(
    self,
    filter_section=None,
    force_regenerate: bool = False,
) -> bool:
    """Write ``.summary.txt`` / ``.summary.html`` unless a valid cache exists.

    The report combines the log statistics file, recommendations, plot PNGs,
    the summary SQL sections (canned ``.sql`` scripts plus inline queries) and
    the victim/culprit panels. The SQL that was run is also written as a
    sqlite3 script to ``self.summaryQueriesFile``. Generation runs on a worker
    thread inside ``cli_spinner`` (or ``quiet_stderr_activity`` in QUIET mode).

    Args:
        filter_section: Optional summary section name; when set, only that
            section's SQL is run.
        force_regenerate: Delete the cached text/HTML/meta files first so the
            report is always rebuilt.

    Returns:
        True if the report was rebuilt. False if the cached report was reused,
        or if the worker raised (its traceback is reported through
        ``threading.excepthook``); no success message is printed in that case.
    """
    MAX_COL_WIDTH = 50  # Max width before truncation
    summary_sections = get_summary_sections()
    meta_path = summary_meta_path(self.summaryFile)

    def _append_log(line: str) -> None:
        # Best-effort QUIET-mode logging: silently skipped when LOG_FILE is
        # unset or cannot be opened (same policy as the cache-hit message).
        if not LOG_FILE:
            return
        try:
            with open(LOG_FILE, "a") as f:
                f.write(line + "\n")
        except OSError:
            pass

    if force_regenerate:
        for path in (self.summaryFile, self.summaryFileHTML, meta_path):
            if os.path.isfile(path):
                try:
                    os.remove(path)
                except OSError:
                    pass
    if summary_cache_is_current(
        self.logFile,
        self.databaseFile,
        self.logFileDetails,
        self.summaryFile,
        self.summaryFileHTML,
        meta_path,
    ):
        if QUIET:
            _append_log(f"[INFO] Summary report unchanged, using {self.summaryFile}")
        else:
            print(
                f"{DIM}Summary report up to date{RESET} "
                f"{BOLD}{self.summaryFile}{RESET}"
            )
        return False

    # The worker appends to this as its very last step, so it stays empty
    # whenever gen_summary_report raised.
    completed: List[bool] = []

    def gen_summary_report():
        sections_to_run = (
            summary_sections.items()
            if not filter_section
            else [(filter_section, summary_sections.get(filter_section, []))]
        )
        vc_panels = get_victim_culprit_panels_cached(self.databaseFile)
        vc_html = format_victim_culprit_html(vc_panels)
        vc_text_embed = format_victim_culprit_ascii(vc_panels, include_banner=False)
        base_dir = os.getcwd()
        # Plots are written next to the trace database.
        diag_dir = os.path.dirname(self.databaseFile)
        log_path = os.path.join(base_dir, self.logFile)
        db_path = os.path.join(base_dir, self.databaseFile)
        write_p4_plot_pngs(
            log_path,
            db_path,
            diag_dir,
            skip_existing=True,
        )
        log_basename = os.path.basename(self.logFile)
        db_basename = os.path.basename(self.databaseFile)
        png_files = [
            f"Active.{log_basename}.png",
            f"dbWaitTime.{db_basename}.png",
            f"Incoming.{db_basename}.png",
            f"Running.{db_basename}.png",
        ]
        stats_lines = []
        if self.logFileDetails and os.path.isfile(self.logFileDetails):
            with open(self.logFileDetails, "r") as detailsFile:
                stats_lines = detailsFile.readlines()
        stats_text = "".join(stats_lines)
        plots_section_body = _build_summary_plots_section(diag_dir, png_files)
        recommendation_matches = evaluate_recommendations(
            self.databaseFile,
            stats_text=stats_text,
        )
        recommendations_text = format_recommendations_plain_text(recommendation_matches)
        recommendations_html = format_recommendations_html(recommendation_matches)

        # Run every summary section into sql_buf and mirror the SQL into
        # summaryQueriesFile. The connection and script file are closed even
        # when something other than sqlite3.Error escapes.
        sql_buf = io.StringIO()
        conn = sqlite3.connect(self.databaseFile)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            with open(self.summaryQueriesFile, "w") as queryFile:
                queryFile.write(".mode column\n")
                queryFile.write(".headers on\n")
                for section_name, queries in sections_to_run:
                    sql_buf.write(f"\n--- {section_name.upper()} ---\n")
                    canned_basename = _SUMMARY_CANNED_QUERY_SECTIONS.get(section_name)
                    if canned_basename:
                        # Section backed by a library .sql script: render it with
                        # the same engine the interactive CLI uses.
                        script_body = resolve_sql_query_text(canned_basename)
                        if not script_body:
                            sql_buf.write(
                                f"(Canned query not found: {canned_basename})\n"
                            )
                            continue
                        queryFile.write(f"\n-- {section_name} ({canned_basename})\n")
                        queryFile.write(script_body)
                        if not script_body.endswith("\n"):
                            queryFile.write("\n")
                        try:
                            section_text = self._run_sql_read_script_to_string(script_body)
                            sql_buf.write(section_text)
                            if section_text and not section_text.endswith("\n"):
                                sql_buf.write("\n")
                        except RuntimeError as e:
                            sql_buf.write(f"Error: {e}\n")
                        continue
                    for label, query in queries:
                        queryFile.write(".print ''\n")
                        queryFile.write(f"\n.print '{label}'\n")
                        # Per-query marker; presumably split out again by the
                        # _summary_sql_sections_* renderers below.
                        sql_buf.write(f"\n@@QUERY@@{label}@@\n")
                        try:
                            queryFile.write(f"{query}\n")
                            cursor.execute(query)
                            rows = cursor.fetchall()
                            if rows:
                                columns = list(rows[0].keys())
                                data = [
                                    [str(row[col]) if row[col] is not None else "" for col in columns]
                                    for row in rows
                                ]
                                # Cap each cell at MAX_COL_WIDTH characters; "..." marks a cut.
                                truncated_data = [
                                    [val if len(val) <= MAX_COL_WIDTH else val[:MAX_COL_WIDTH - 3] + "..." for val in row]
                                    for row in data
                                ]
                                widths = [
                                    max(len(col), max(len(row[i]) for row in truncated_data))
                                    for i, col in enumerate(columns)
                                ]
                                header_line = " ".join(col.ljust(widths[i]) for i, col in enumerate(columns))
                                sql_buf.write(header_line + "\n")
                                underline = " ".join("-" * widths[i] for i in range(len(columns)))
                                sql_buf.write(underline + "\n")
                                for row in truncated_data:
                                    sql_buf.write(" ".join(row[i].ljust(widths[i]) for i in range(len(columns))) + "\n")
                            else:
                                sql_buf.write("(No results)\n")
                        except sqlite3.Error as e:
                            sql_buf.write(f"Error: {e}\n")
        finally:
            conn.close()
        sql_sections_text = sql_buf.getvalue()

        # Plain-text report.
        sep = "=" * 72
        title_line = f"=== Summary Report for {self.logFile} ===\n"
        with open(self.summaryFile, "w") as summaryFile:
            summaryFile.write(title_line)
            summaryFile.write("\nLOG STATISTICS\n")
            summaryFile.write(sep + "\n")
            if stats_text.strip():
                summaryFile.write(stats_text)
                if not stats_text.endswith("\n"):
                    summaryFile.write("\n")
            else:
                summaryFile.write("(no log statistics file; run: p4diag stats <log>)\n")
            summaryFile.write("\nRECOMMENDATIONS\n")
            summaryFile.write(sep + "\n")
            summaryFile.write(recommendations_text)
            summaryFile.write("\nPLOTS\n")
            summaryFile.write(sep + "\n")
            summaryFile.write(plots_section_body)
            summaryFile.write("\nDETAILED SQL REPORTS\n")
            summaryFile.write(sep + "\n")
            summaryFile.write(_summary_sql_sections_plain_text(sql_sections_text))
            if not sql_sections_text.endswith("\n"):
                summaryFile.write("\n")
            summaryFile.write("\nVICTIM CULPRIT REPORT\n")
            summaryFile.write(sep + "\n")
            summaryFile.write(vc_text_embed)
            if not vc_text_embed.endswith("\n"):
                summaryFile.write("\n")

        # HTML report: only non-empty PNGs are embedded.
        html_file = self.summaryFileHTML
        stats_html_inner = (
            stats_text
            if stats_text.strip()
            else "(no log statistics file; run: p4diag stats <log>)"
        )
        png_entries: List[Tuple[str, str]] = []
        for fname in png_files:
            full_path = os.path.join(diag_dir, fname)
            if os.path.exists(full_path) and os.path.getsize(full_path) > 0:
                png_entries.append((fname, fname))
        plots_empty = (
            plots_section_body.strip()
            or "No PNG files yet — install gnuplot and run log2sql or plots."
        )
        html_content = render_summary_html_report(
            log_file=self.logFile,
            db_file=self.databaseFile,
            stats_html_inner=stats_html_inner,
            recommendations_html=recommendations_html,
            png_entries=png_entries,
            plots_empty_note=plots_empty,
            sql_sections_html=_summary_sql_sections_to_html(sql_sections_text),
            vc_html=vc_html,
        )
        with open(html_file, "w", encoding="utf-8", errors="replace") as f:
            f.write(html_content)

        # Cache metadata, consulted by summary_cache_is_current on later runs.
        sig = _summary_inputs_signature(
            self.logFile, self.databaseFile, self.logFileDetails
        )
        if sig:
            payload = {
                "version": SUMMARY_CACHE_VERSION,
                "layout_version": SUMMARY_HTML_LAYOUT_VERSION,
                "recommendations_sig": _recommendations_rules_signature(),
                **sig,
            }
            _summary_meta_write(meta_path, payload)
        completed.append(True)

    thread = threading.Thread(target=gen_summary_report)
    if QUIET:
        _append_log("[INFO] Generating summary report ...")
        thread.start()
        with quiet_stderr_activity("Generating summary report..."):
            thread.join()
    else:
        with cli_spinner("Generating summary report..."):
            thread.start()
            thread.join()

    if not completed:
        # The worker raised; do not report success.
        if QUIET:
            _append_log("[ERROR] Summary report generation failed")
            print("Summary report generation failed", file=sys.stderr, flush=True)
        else:
            print(f"{RED}✗ Summary report generation failed{RESET}")
        return False

    if QUIET:
        _append_log(f"[INFO] Generated summary report {self.summaryFileHTML}")
        print(f"Wrote summary report: {self.summaryFile}", flush=True)
        print(f"Wrote summary HTML: {self.summaryFileHTML}", flush=True)
    else:
        print(
            f"{GREEN}✓ Generated summary report{RESET} "
            f"{BOLD}{self.summaryFileHTML}{RESET}"
        )
    return True
def printLogSummary(self):
    """Ensure the summary report exists (reusing a valid cache), then page the text file in ``less``."""
    self.createLogSummary(force_regenerate=False)
    report_path = self.summaryFile
    if not os.path.isfile(report_path):
        return
    s.run(["less", report_path])
def createPlots(self):
    """Render the command-activity plot PNGs into the trace database's directory.

    Log and database paths are resolved against the current working
    directory; ``skip_existing=True`` is passed through to
    ``write_p4_plot_pngs``.
    """
    cwd = os.getcwd()
    write_p4_plot_pngs(
        os.path.join(cwd, self.logFile),
        os.path.join(cwd, self.databaseFile),
        os.path.dirname(self.databaseFile),
        skip_existing=True,
    )
def menu(self):
    """Print the server-log analysis menu, then wait for a command."""
    label_width = 12
    numbered_entries = (
        ("1. query", "Query trace database"),
        ("2. summary", "Summary report for trace database"),
        ("3. stats", "Show log file statistics"),
        ("4. search", "Search log file for string"),
        ("5. pid", "Probe one command by PID + startTime"),
        ("6. errors", "Show log file errors messages"),
    )
    print()
    for label, description in numbered_entries:
        print(f" {BOLD}{label:<{label_width}}{RESET}{description}")
    footer_lines = (
        f"\n {BOLD}h or help{RESET}\tHelp for server log analysis",
        f" {BOLD}m or menu{RESET} - server log menu",
        f" {BOLD}b or back{RESET}- exit p4diag",
        f" {BOLD}x or exit{RESET} - exit p4diag",
    )
    for footer in footer_lines:
        print(footer)
    self.commandPrompt()
def commandPrompt(self):
    """Read one command at the top-level prompt and act on it.

    ``b``/``back`` and ``x``/``exit`` call ``exitP4``; ``m``/``menu`` redraws
    the menu; anything else is passed to ``runCommand``.
    """
    choice = input("\n\033[1mcommand (m = menu) > \033[0m")
    if choice in ('b', 'back', 'x', 'exit'):
        exitP4()
    elif choice in ('m', 'menu'):
        self.menu()
    else:
        self.runCommand(choice)
def runCommand(self,command):
    """Dispatch one top-level command, then return to the command prompt.

    Accepted commands (number or name): ``1``/``query``, ``2``/``summary``,
    ``3``/``stats``, ``4``/``search``, ``5``/``pid``, ``6``/``errors``,
    ``h``/``help``, and ``b``/``back`` or ``x``/``exit`` (both call
    ``exitP4``). Any other input simply re-prompts.

    All branches form a single if/elif chain, so each command prompts again
    exactly once. ``pid`` does not re-prompt here because ``show_pid`` runs
    the probe with ``return_to="command"``, which re-prompts itself.
    """
    if command in ('1', 'query'):
        self.query_menu()
        self.commandPrompt()
    elif command in ('2', 'summary'):
        self.printLogSummary()
        self.commandPrompt()
    elif command in ('3', 'stats'):
        self.printLogStats()
        self.commandPrompt()
    elif command in ('4', 'search'):
        self.search_log()
        self.commandPrompt()
    elif command in ('5', 'pid'):
        self.show_pid()
    elif command in ('6', 'errors'):
        self.show_errors()
        self.commandPrompt()
    elif command in ('h', 'help'):
        usage_log2sql()
        self.commandPrompt()
    elif command in ('b', 'back', 'x', 'exit'):
        exitP4()
    else:
        self.commandPrompt()
def trim_log(self, start=None, end=None):
    """Cut a time window out of a P4 server log.

    Two modes:

    * Interactive (``self.databaseFile == "unset"``): repeatedly lists logs,
      asks for one (by list number or file name) and a start/end time, and
      calls ``handle_trim`` for each request. The loop has no ``break``; it
      ends only through ``exitP4`` (``b`` or ``x``).
    * Non-interactive: trims ``self.logFile`` (or ``self.databaseFile`` when
      no log file is set) between ``start`` and ``end`` via
      ``trim_p4_server_log_file`` and returns that call's result.

    Args:
        start: Window start, ``YYYY/MM/DD HH:MM:SS`` (non-interactive mode).
        end: Window end, same format.

    Returns:
        The result of ``trim_p4_server_log_file`` in non-interactive mode.
        The older grep/sed path below returns the trimmed file name, or None
        when neither the requested nor a nearby timestamp is found.
    """
    def get_valid_time(prompt):
        # Re-prompt until the answer matches YYYY/MM/DD HH:MM:SS with a 20xx
        # year; 'b' or 'x' calls exitP4().
        while True:
            value = input(prompt)
            if value == 'b':
                exitP4()
            if value == 'x':
                exitP4()
            if re.match(r'^20\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}$', value.strip()):
                return value.strip()
            print("\tPlease enter a valid value for date/time")
    if self.databaseFile == "unset":
        while True:
            # NOTE(review): bare `listFiles`, not `self.listFiles` — confirm a
            # module-level listFiles() exists.
            filelist = listFiles(".")
            log_to_trim = input('\n\033[1mLog to trim (b = back, x = exit) > \033[0m')
            if log_to_trim == 'b':
                exitP4()
            if log_to_trim == 'x':
                exitP4()
            # A valid list number selects that log; otherwise the raw answer
            # is used as the file name.
            if log_to_trim.isnumeric():
                index = int(log_to_trim) - 1
                if validateInput(index, len(filelist)):
                    log_to_trim = filelist[index]
            startTime = get_valid_time("\n\033[1mEnter start date/time (yyyy/mm/dd hh:mm:ss) > \033[0m")
            endTime = get_valid_time("\033[1mEnter end date/time (yyyy/mm/dd hh:mm:ss) > \033[0m")
            handle_trim(log_to_trim, startTime, endTime)
    else:
        log_to_trim = self.logFile if self.logFile not in ("unset", "") else self.databaseFile
        startTime = start
        endTime = end
        if startTime and endTime and log_to_trim not in ("unset", ""):
            return trim_p4_server_log_file(log_to_trim, startTime, endTime)
    # NOTE(review): everything below is reached only in non-interactive mode
    # when start/end or the log name is missing. If start or end is None,
    # `st.replace(...)` below raises AttributeError — confirm whether this
    # grep/sed path is still needed. The shell commands also interpolate the
    # file name unquoted.
    def find_nearest_line_by_probe(log_file, timestamp_str, direction='forward'):
        # Step 1..120 minutes away from timestamp_str (forward or backward)
        # and return (line_number, line_text) for the first log line that
        # contains a timestamp within the probed minute, else (None, None).
        # The exact starting minute itself is not probed (range starts at 1).
        def grep_line(ts_minute):
            # Match any timestamp within the same minute, e.g., 2025/10/22 08:56:XX
            pattern = f"{ts_minute}:[0-9][0-9]"
            cmd = f"grep -a -n -E '{pattern}' {log_file} | head -1"
            result = s.getoutput(cmd).strip()
            if result:
                # grep -n output is "<line_number>:<line text>".
                parts = result.split(":", 1)
                if len(parts) == 2:
                    line_num = int(parts[0])
                    line_text = parts[1]
                    return line_num, line_text.strip()
            return None, None
        timestamp = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M:%S")
        for i in range(1, 121): # up to 2 hours
            probe_time = timestamp + timedelta(minutes=i) if direction == 'forward' else timestamp - timedelta(minutes=i)
            probe_time_str = probe_time.strftime("%Y/%m/%d %H:%M")
            line_num, line_text = grep_line(probe_time_str)
            if line_num:
                return line_num, line_text.strip()
        return None, None
    # Create filename: "<log>.<YYYYMMDD_HHMMSS>-<YYYYMMDD_HHMMSS>"
    # ('/' and ':' removed, the space becomes '_').
    rep = {"/": "", ":": "", " ": "_"}
    st = startTime
    et = endTime
    for i, j in rep.items():
        st = st.replace(i, j)
        et = et.replace(i, j)
    logTrim = log_to_trim + "." + st + "-" + et
    if not os.path.exists(logTrim):
        # First/last line numbers whose text contains the exact timestamps.
        head = s.getoutput(f"grep -a -n '{startTime}' {log_to_trim} | head -1 | cut -f 1 -d :")
        tail = s.getoutput(f"grep -a -n '{endTime}' {log_to_trim} | tail -1 | cut -f 1 -d :")
        if not head:
            print(f"Start time not found. Searching for nearest available time after: {startTime}")
            head_line, head_text = find_nearest_line_by_probe(log_to_trim, startTime, direction='forward')
            if head_line:
                head = head_line
                # First two whitespace-separated fields of the found line are
                # used as the new start date and time.
                st = head_text.split()[0] + " " + head_text.split()[1]
                print(f"Found nearest time after: {st}")
            else:
                print(f"Start time {startTime} not found and no nearby timestamp found.")
                return None
        if not tail:
            print(f"End time not found. Searching for nearest available time before: {endTime}")
            tail_line, tail_text = find_nearest_line_by_probe(log_to_trim, endTime, direction='backward')
            if tail_line:
                tail = tail_line
                et = tail_text.split()[0] + " " + tail_text.split()[1]
                print(f"Found nearest time before: {et}")
            else:
                print(f"End time {endTime} not found and no nearby timestamp found.")
                return None
        # Create filename (again, in case st/et were replaced by nearby times)
        rep = {"/": "", ":": "", " ": "_"}
        for i, j in rep.items():
            st = st.replace(i, j)
            et = et.replace(i, j)
        logTrim = log_to_trim + "." + st + "-" + et
        def run_trim():
            # Print lines head..tail and quit sed on the following line, so the
            # rest of the log is not read.
            quit_line = str(int(tail) + 1)
            s.getoutput(f"sed -n '{head},{tail}p; {quit_line}q' {log_to_trim} > {logTrim}")
        thread = threading.Thread(target=run_trim)
        if QUIET:
            thread.start()
            with quiet_stderr_activity("Trimming log..."):
                thread.join()
            print(f"Trimmed log: {logTrim}", flush=True)
        else:
            with cli_spinner("Trimming log file..."):
                thread.start()
                thread.join()
            print(f"{GREEN}✓ Trimmed log{RESET} {BOLD}{GREEN}{logTrim}{RESET}")
    return logTrim
def createLogStats(self, force_regenerate: bool = False) -> bool:
    """Write ``.stats.txt`` from the P4 log unless a valid cache exists.

    The statistics text comes from ``generate_log_summary_text`` and is
    written to ``self.logFileDetails``. A source signature of the log is
    stored in the companion meta file, which ``log_stats_cache_is_current``
    checks on later calls. Generation runs on a worker thread inside
    ``cli_spinner`` (or ``quiet_stderr_activity`` in QUIET mode).

    Args:
        force_regenerate: Delete the cached stats and meta files first so the
            statistics are always rebuilt.

    Returns:
        True if the statistics were regenerated. False if the cache was
        current, or if the worker raised (its traceback is reported through
        ``threading.excepthook``); no success message is printed in that case.
    """
    meta_path = log_stats_meta_path(self.logFileDetails)
    log_path = self.logFile

    def _append_log(line: str) -> None:
        # Best-effort QUIET-mode logging: silently skipped when LOG_FILE is
        # unset or cannot be opened (same policy as the cache-hit message).
        if not LOG_FILE:
            return
        try:
            with open(LOG_FILE, "a") as f:
                f.write(line + "\n")
        except OSError:
            pass

    def _remove_outputs() -> None:
        # Best effort: a file that cannot be removed is left in place.
        for p in (self.logFileDetails, meta_path):
            if os.path.isfile(p):
                try:
                    os.remove(p)
                except OSError:
                    pass

    if force_regenerate:
        _remove_outputs()
    if log_stats_cache_is_current(log_path, self.logFileDetails, meta_path):
        if QUIET:
            _append_log(
                f"[INFO] Log statistics unchanged (log mtime/size), using {self.logFileDetails}"
            )
        else:
            print(f"{DIM}Log statistics up to date{RESET} {BOLD}{self.logFileDetails}{RESET}")
        return False
    # Cache missing or stale: clear leftovers before regenerating.
    _remove_outputs()

    # The worker appends to this as its very last step, so it stays empty
    # whenever gen_log_stats raised.
    completed: List[bool] = []

    def gen_log_stats():
        text = generate_log_summary_text(self.logFile)
        with open(self.logFileDetails, "w", encoding="utf-8", errors="replace") as outfile:
            outfile.write(text)
            if text and not text.endswith("\n"):
                outfile.write("\n")
        sig = _log_source_signature(self.logFile)
        if sig:
            _log_stats_meta_write(meta_path, sig)
        completed.append(True)

    thread = threading.Thread(target=gen_log_stats)
    if QUIET:
        _append_log("[INFO] Generating log statistics ...")
        thread.start()
        with quiet_stderr_activity("Generating log statistics..."):
            thread.join()
    else:
        with cli_spinner("Generating log statistics..."):
            thread.start()
            thread.join()

    if not completed:
        # The worker raised; do not report success.
        if QUIET:
            _append_log("[ERROR] Log statistics generation failed")
            print("Log statistics generation failed", file=sys.stderr, flush=True)
        else:
            print(f"{RED}✗ Log statistics generation failed{RESET}")
        return False

    if QUIET:
        _append_log(f"[INFO] Generated log statistics {self.logFileDetails}")
    else:
        print(
            f"{GREEN}✓ Generated log statistics{RESET} "
            f"{BOLD}{GREEN}{self.logFileDetails}{RESET}"
        )
    return True
def printLogStats(self):
    """Refresh the log statistics if needed, then print them with ``cat``.

    ``check=True`` makes a failing ``cat`` raise ``CalledProcessError``.
    """
    self.createLogStats()
    stats_path = self.logFileDetails
    cat_command = ["cat", stats_path]
    s.run(cat_command, check=True)
def search_log(self):
    """Interactively grep the P4 server log and print the matching lines.

    Asks whether to show ±75 lines of context around each match, then asks
    for the search pattern, and runs ``grep -a`` on ``self.logFile``
    (``--color=always -C75`` in context mode). The pattern is passed as a
    single argv entry, with no shell involved, after ``--`` so that patterns
    starting with ``-`` are not parsed as grep options. Non-UTF-8 bytes in
    the output are replaced rather than raising a decode error.
    """
    print(
        f"\n Search {BOLD}{os.path.basename(self.logFile)}{RESET} with grep "
        "(case-sensitive substring match)."
    )
    print(" Enter the pattern as-is; quotes are matched literally, not stripped.")
    context = input(
        "\n Show each match with surrounding context lines (±75)? (y/n) [n]: "
    ).strip().lower()
    search_string = input(" Enter search string: ").strip()
    if not search_string:
        print("\n No search string entered.")
        return
    cmd = ["grep", "-a"]
    if context in ("y", "yes"):
        cmd += ["--color=always", "-C75"]
    cmd += ["--", search_string, self.logFile]
    result = s.run(cmd, capture_output=True, text=True, errors="replace")
    # grep exits 0 on a match, 1 on no match, and >1 on an error (e.g. an
    # unreadable log); errors are reported instead of shown as "matches".
    if result.returncode > 1:
        print(result.stderr or f"grep failed (exit {result.returncode})")
        return
    lines = result.stdout
    if lines:
        print(lines, end="" if lines.endswith("\n") else "\n")
    else:
        print(" (no matches)")
def show_active_threads(self):
    """Summarize the peak "active threads" log message and optionally page all of them.

    On first use (no ``self.activeThreadsSummaryFile`` yet), the
    ``active threads`` line with the largest sort field 10 is selected. A
    value is cut from it (field 5, text before ':'; presumably the server
    PID — confirm against the log format). Then:

    * every ``active threads`` line containing that value goes to
      ``self.maxActiveThreadsFile``;
    * the peak line goes to ``self.activeThreadsSummaryFile``.

    Later calls reuse the summary file. The summary is printed, the user may
    page the full list with ``less``, and control returns to the command
    prompt. File names are shell-quoted before going into the grep
    pipelines.
    """
    if not os.path.isfile(self.activeThreadsSummaryFile):
        log_q = shlex.quote(self.logFile)
        grep_active = f"grep -a -n 'active threads' {log_q}"
        pid_found = s.getoutput(
            f"{grep_active} | sort -nr -k10 | head -1 | cut -f 5 -d ' ' | cut -f 1 -d ':'"
        )
        if not pid_found:
            print("\nNo active threads messages found in\033[1m " + self.logFile + "\033[0m")
            return
        pid_q = shlex.quote(pid_found)
        s.getoutput(
            f"{grep_active} | grep -a {pid_q} > {shlex.quote(self.maxActiveThreadsFile)}"
        )
        peak_line = s.getoutput(f"{grep_active} | grep -a {pid_q} | sort -nr -k10 | head -1")
        with open(self.activeThreadsSummaryFile, "a") as threadsFile:
            threadsFile.write("\nMax active threads: \n\t")
            threadsFile.write(peak_line)
        print("\nSummary of active threads in log file " + self.logFile + "\n")
    s.run(["cat", self.activeThreadsSummaryFile])
    show_all = input("\n\nShow all active threads from log (y/n)? ")
    if show_all in ("yes", "y"):
        s.run(["less", self.maxActiveThreadsFile])
    self.commandPrompt()
def show_errors(self):
    """Summarize server errors in the log and optionally page the full list.

    On first use (no ``self.errorsSummaryFile`` yet), two things go into the
    summary file (opened in append mode):

    * the count of ``Partner exited unexpectedly`` lines;
    * a frequency count of ``failed`` lines found within five lines after
      each ``Perforce server error``.

    All ``Perforce server error`` blocks (with line numbers) are also saved
    to ``self.failuresFile``. The summary is then printed, the user may page
    the full list with ``less``, and control returns to the command prompt.
    File names are shell-quoted before going into the grep pipelines.
    """
    if not os.path.isfile(self.errorsSummaryFile):
        log_q = shlex.quote(self.logFile)
        print("\nSummary of errors in log file " + self.logFile + "\n")
        partner_count = s.getoutput(
            f"grep -a 'Partner exited unexpectedly' {log_q} | wc -l"
        )
        # Computed once and reused for both the summary file and the screen.
        failure = s.getoutput(
            f"grep -a -A5 'Perforce server error' {log_q} | grep -a failed | sort | uniq -c | sort -r"
        )
        s.getoutput(
            f"grep -a -n -A5 'Perforce server error' {log_q} > {shlex.quote(self.failuresFile)}"
        )
        with open(self.errorsSummaryFile, "a") as errorsFile:
            errorsFile.write("\nPartner exited unexpectedly errors: ")
            errorsFile.write(partner_count)
            errorsFile.write("\nFail errors\n")
            errorsFile.write(failure)
        print("\nFail errors\n")
        print(failure)
    s.run(["cat", self.errorsSummaryFile])
    show_all = input("\n\nShow all errors from log (y/n)? ")
    if show_all in ("yes", "y"):
        s.run(["less", self.failuresFile])
    self.commandPrompt()
def show_pid(self):
    """Start the interactive PID probe from the top-level command prompt.

    The probe itself returns to the command prompt when done.
    """
    resume_at = "command"
    self.probe_pid_interactive(return_to=resume_at)
def _prompt_probe_start_time(self) -> Optional[str]:
    """Ask for a probe startTime until it looks like ``YYYY/MM/DD HH:MM:SS``.

    Returns the stripped answer, or None when the user types ``b``/``back``.
    ``x``/``exit`` calls ``exitP4``.
    """
    stamp_pattern = re.compile(r"^20\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}$")
    question = "\033[1mEnter startTime (YYYY/MM/DD HH:MM:SS, b = back): \033[0m"
    while True:
        answer = input(question).strip()
        if answer in ("b", "back"):
            return None
        if answer in ("x", "exit"):
            exitP4()
        if stamp_pattern.match(answer):
            return answer
        print("\tPlease enter startTime as YYYY/MM/DD HH:MM:SS")
def probe_pid_interactive(self, *, return_to: str = "query") -> None:
    """Prompt for PID + startTime; probe trace DB and log tracking lines.

    The pair must match a row of the ``process`` table in
    ``self.databaseFile`` before ``run_probe_pid`` is called.

    * ``b``/``back`` at either prompt re-opens the query menu
      (``return_to == "query"``) or the command prompt.
    * ``x``/``exit`` calls ``exitP4``.
    * After an error or a finished probe, control resumes at the query
      prompt or the command prompt according to ``return_to``.
    """
    query_mode = return_to == "query"

    def back_out() -> None:
        # 'b' answer: re-open the query menu, or go back to the command prompt.
        if query_mode:
            self.query_menu()
        else:
            self.commandPrompt()

    def resume() -> None:
        # After an error or a completed probe: re-prompt where we came from.
        if query_mode:
            self.queryPrompt()
        else:
            self.commandPrompt()

    pid = input("\033[1mEnter process id (b = back): \033[0m").strip()
    if pid in ("b", "back"):
        back_out()
        return
    if pid in ("x", "exit"):
        exitP4()
    if not pid:
        print(f"\n {RED}PID is required.{RESET}")
        resume()
        return
    start = self._prompt_probe_start_time()
    if start is None:
        back_out()
        return
    try:
        conn = sqlite3.connect(self.databaseFile)
        try:
            match = conn.execute(
                "SELECT 1 FROM process WHERE pid = ? AND startTime = ?",
                (pid, start),
            ).fetchone()
        finally:
            conn.close()
    except sqlite3.Error as e:
        print(f"\n {RED}Database error: {e}{RESET}")
        resume()
        return
    if match:
        rc = self.run_probe_pid(pid, start)
        if rc != 0:
            print(f"\n {RED}PID probe failed (exit {rc}).{RESET}")
    else:
        print(
            f"\n {RED}No command found for pid {pid} at startTime {start!r} "
            f"in {os.path.basename(self.databaseFile)}{RESET}"
        )
    resume()
def locks_held_total_interactive(self, *, return_to: str = "query") -> None:
    """Prompt for optional table name; run ``locks_held_total.sql``.

    * An empty answer runs the script without a table filter.
    * ``b``/``back`` goes to the query menu (or the command prompt).
    * ``x``/``exit`` calls ``exitP4``.
    * After the run or a validation error, control resumes at the query
      prompt if ``return_to == "query"``, else at the command prompt.
    """
    in_query_mode = return_to == "query"
    reply = input(
        "\033[1mEnter table name (optional, b = back) [all]: \033[0m"
    ).strip()
    if reply in ("b", "back"):
        if in_query_mode:
            self.query_menu()
        else:
            self.commandPrompt()
        return
    if reply in ("x", "exit"):
        exitP4()
    table = None
    problem = None
    if reply:
        table = normalize_table_parameter(reply)
        problem = validate_table_parameter(table)
    if problem:
        print(f"\n {RED}{problem}.{RESET}")
    else:
        rc = self._run_sql_query_token("locks_held_total.sql", table=table)
        if rc != 0:
            print(f"\n {RED}locks_held_total failed (exit {rc}).{RESET}")
    if in_query_mode:
        self.queryPrompt()
    else:
        self.commandPrompt()
def sql_required_table_interactive(
    self, sql_token: str, *, return_to: str = "query"
) -> None:
    """Prompt for table name; run a ``-- p4diag: require-table`` script.

    * ``b``/``back`` goes to the query menu (or the command prompt).
    * ``x``/``exit`` calls ``exitP4``.
    * An invalid table or a failed run prints an error.
    * Control then resumes at the query prompt if ``return_to == "query"``,
      else at the command prompt.
    """
    in_query_mode = return_to == "query"
    reply = input("\033[1mEnter table name (b = back): \033[0m").strip()
    if reply in ("b", "back"):
        if in_query_mode:
            self.query_menu()
        else:
            self.commandPrompt()
        return
    if reply in ("x", "exit"):
        exitP4()
    table = normalize_table_parameter(reply)
    problem = validate_table_parameter(table)
    if problem:
        print(f"\n {RED}{problem}.{RESET}")
    else:
        rc = self._run_sql_query_token(sql_token, table=table)
        if rc != 0:
            print(
                f"\n {RED}{normalize_sql_basename(sql_token)} failed (exit {rc}).{RESET}"
            )
    if in_query_mode:
        self.queryPrompt()
    else:
        self.commandPrompt()
def locks_all_duration_interactive(self, *, return_to: str = "query") -> None:
    """Prompt for duration threshold (ms); run ``locks_all_duration.sql``.

    * An empty answer uses ``"5000"``.
    * ``b``/``back`` goes to the query menu (or the command prompt).
    * ``x``/``exit`` calls ``exitP4``.
    * After the run or a validation error, control resumes at the query
      prompt if ``return_to == "query"``, else at the command prompt.
    """
    in_query_mode = return_to == "query"
    reply = input(
        "\033[1mEnter duration threshold in ms (b = back) [5000]: \033[0m"
    ).strip()
    if reply in ("b", "back"):
        if in_query_mode:
            self.query_menu()
        else:
            self.commandPrompt()
        return
    if reply in ("x", "exit"):
        exitP4()
    threshold_ms = reply if reply else "5000"
    problem = validate_duration_parameter(threshold_ms)
    if problem:
        print(f"\n {RED}{problem}.{RESET}")
    else:
        rc = self._run_sql_query_token("locks_all_duration.sql", duration=threshold_ms)
        if rc != 0:
            print(f"\n {RED}locks_all_duration failed (exit {rc}).{RESET}")
    if in_query_mode:
        self.queryPrompt()
    else:
        self.commandPrompt()
def run_probe_pid(self, pid: str, start: str) -> int:
    """Run ``probe_pid.sql``, per-table SQL, and log tracking excerpt.

    Order:

    1. the ``probe_pid.sql`` report;
    2. any server-error lines from the log;
    3. ``probe_pid_tables.sql``;
    4. the log tracking excerpt.

    The command's endTime is looked up once and shared by both log steps.

    Returns:
        0 when both SQL scripts succeed, otherwise the first non-zero exit
        code (later steps are then skipped).
    """
    query_args = {"pid": pid, "start": start}
    rc = self._run_sql_query_token("probe_pid.sql", **query_args)
    if rc != 0:
        return rc
    finished_at = self._probe_end_time(pid, start)
    self.print_probe_pid_server_error(pid, start, end_time=finished_at)
    rc = self._run_sql_query_token("probe_pid_tables.sql", **query_args)
    if rc != 0:
        return rc
    self.print_probe_pid_log_tracking(pid, start, end_time=finished_at)
    return 0
def _probe_end_time(self, pid: str, start: str) -> Optional[str]:
try:
conn = sqlite3.connect(self.databaseFile)
try:
row = conn.execute(
"SELECT endTime FROM process WHERE pid = ? AND startTime = ?",
(str(pid), start),
).fetchone()
return row[0] if row else None
finally:
conn.close()
except sqlite3.Error:
return None
def print_probe_pid_server_error(
    self,
    pid: str,
    start: str,
    *,
    end_time: Optional[str] = None,
) -> None:
    """Print ``Perforce server error`` log lines for a probed command, if any.

    Does nothing when the log file is missing or when
    ``extract_pid_server_error_from_log`` finds no lines. When ``end_time``
    is not supplied, it is looked up with ``_probe_end_time``.
    """
    log_path = os.path.abspath(self.logFile)
    if not os.path.isfile(log_path):
        return
    resolved_end = self._probe_end_time(pid, start) if end_time is None else end_time
    server_errors = extract_pid_server_error_from_log(
        log_path, pid, start, end_time=resolved_end
    )
    if not server_errors:
        return
    for banner in ("", "=== Server error (from log) ===", ""):
        print(banner, flush=True)
    for entry in server_errors:
        print(entry, flush=True)
def listFiles(self, path):
    """List and print the P4 server logs in the current working directory.

    Candidates are the regular (non-symlink) files directly inside the
    current directory, sorted by name. Each is kept only when
    ``is_p4d_log`` accepts it. Matches are printed as a 1-based numbered
    list, the first preceded by a blank line, and returned in that order.

    The listing uses ``os.scandir`` rather than
    ``find . -maxdepth 1 -type f -printf``. ``-printf`` is GNU-only (BSD and
    macOS ``find`` lack it), and decoding ``find``'s raw bytes as UTF-8 could
    raise on non-UTF-8 file names.

    NOTE(review): ``path`` is accepted for compatibility but ignored; the
    scan always covers ``.``, exactly as before.

    Returns:
        List of matching file names, relative to the current directory.
    """
    with os.scandir(".") as entries:
        # follow_symlinks=False mirrors `find -type f` (symlinks excluded).
        names = sorted(
            entry.name for entry in entries if entry.is_file(follow_symlinks=False)
        )
    logs = [name for name in names if os.path.isfile(name) and is_p4d_log(name)]
    for number, log in enumerate(logs, start=1):
        prefix = "\n" if number == 1 else ""
        print(f"{prefix} ({number}) {log}")
    return logs
def get_column_widths(self, db_file, sql_query):
    """Return auto-fit column widths for ``sql_query``'s CSV result ([] when it has no rows)."""
    csv_rows = self._fetch_sqlite_csv_rows(db_file, sql_query)
    return self._column_widths_from_csv_rows(csv_rows) if csv_rows else []
def _fetch_sqlite_csv_rows(self, db_file: str, sql_query: str) -> List[List[str]]:
if not sql_query.strip().endswith(";"):
sql_query += ";"
proc = s.run(
["sqlite3", db_file, "-header", "-csv", sql_query],
stdout=s.PIPE,
stderr=s.PIPE,
text=True,
)
if proc.returncode != 0:
raise RuntimeError(f"SQLite error: {proc.stderr}")
return list(csv.reader(io.StringIO(proc.stdout)))
def _column_widths_from_csv_rows(self, rows: List[List[str]]) -> List[int]:
if not rows:
return []
ncols = max(len(row) for row in rows)
widths = [0] * ncols
for row in rows:
for i, cell in enumerate(row):
widths[i] = max(widths[i], len(str(cell)))
mins = {"user": 16, "command": 48}
for i, name in enumerate(rows[0]):
if i < len(widths):
widths[i] = max(widths[i], mins.get(str(name).lower(), 0))
return widths
def run_sqlite_query_auto_width(self, db_file, sql_query, indent=False):
    """Run one query and print it to stdout as an aligned table.

    A SQLite error message is printed instead of a table. Nothing is printed
    for an empty result. ``indent`` prefixes each line with a tab.
    """
    statement = sql_query if sql_query.strip().endswith(";") else sql_query + ";"
    try:
        rows = self._fetch_sqlite_csv_rows(db_file, statement)
    except RuntimeError as err:
        print(str(err))
        return
    if rows:
        self._print_sqlite_csv_rows(
            rows, self._column_widths_from_csv_rows(rows), sys.stdout, indent=indent
        )
def _print_sqlite_csv_rows(
self,
rows: List[List[str]],
widths: List[int],
out,
*,
indent: bool = False,
) -> None:
header_row = rows[0]
padded_header = [
str(cell).ljust(widths[i]) for i, cell in enumerate(header_row)
]
header_line = " ".join(padded_header)
if indent:
header_line = f"\t{header_line}"
print(header_line, file=out)
underline_line = " ".join("-" * widths[i] for i in range(len(widths)))
if indent:
underline_line = f"\t{underline_line}"
print(underline_line, file=out)
for row in rows[1:]:
padded_row = [str(cell).ljust(widths[i]) for i, cell in enumerate(row)]
line = " ".join(padded_row)
if indent:
line = f"\t{line}"
print(line, file=out)
def _run_sqlite_query_to_stream(
self,
db_file: str,
sql_query: str,
out,
*,
indent: bool = False,
) -> int:
"""Run one SQL query and write an auto-width table to ``out``. Returns exit code."""
if not sql_query.strip().endswith(";"):
sql_query += ";"
try:
rows = self._fetch_sqlite_csv_rows(db_file, sql_query)
except RuntimeError as e:
print(str(e), file=sys.stderr)
return 1
if rows:
widths = self._column_widths_from_csv_rows(rows)
self._print_sqlite_csv_rows(rows, widths, out, indent=indent)
return 0
def _run_sql_read_script(self, file_body: str, pager: Optional[List[str]] = None) -> int:
    """Run a ``.print`` / SELECT script with auto-width columns (no sqlite3 truncation).

    ``iter_sql_read_segments`` splits ``file_body`` into segments:

    * ``"print"`` segments are echoed as-is;
    * every other segment is run as a query against ``self.databaseFile``
      and rendered as an aligned table followed by a blank line.

    When ``_effective_sql_pager(pager)`` yields a pager command, output is
    buffered and piped to that pager at the end; otherwise it goes straight
    to stdout.

    Returns:
        The first non-zero query exit code (any output buffered for the pager
        is then not shown); otherwise the pager's exit code, or 0.
    """
    use_pager = _effective_sql_pager(pager)
    buf = io.StringIO() if use_pager else None
    out = buf if buf is not None else sys.stdout
    for kind, payload in iter_sql_read_segments(file_body):
        if kind == "print":
            print(payload, file=out)
        else:
            rc = self._run_sqlite_query_to_stream(
                self.databaseFile, payload, out
            )
            if rc != 0:
                return rc
            print(file=out)
    if use_pager and buf is not None:
        proc = s.Popen(use_pager, stdin=s.PIPE)
        proc.communicate(buf.getvalue().encode())
        return proc.returncode or 0
    return 0
def _run_sql_read_script_to_string(self, file_body: str) -> str:
    """Run a ``.print`` / SELECT script; return the same text the CLI would print.

    Same segment handling as ``_run_sql_read_script``, but everything is
    collected into a string and no pager is used.

    Raises:
        RuntimeError: a query segment failed. The SQLite error text itself is
            written to stderr by ``_run_sqlite_query_to_stream``, and partial
            output is discarded.
    """
    buf = io.StringIO()
    for kind, payload in iter_sql_read_segments(file_body):
        if kind == "print":
            print(payload, file=buf)
        else:
            rc = self._run_sqlite_query_to_stream(
                self.databaseFile, payload, buf
            )
            if rc != 0:
                raise RuntimeError(f"Query failed in summary SQL script")
            print(file=buf)
    return buf.getvalue()
def parse_query_parts(self, query: str, valid_columns: set) -> Dict:
    """Parse a lock shortcut (``ww``/``wh``/``rw``/``rh``) into SQL fragments.

    Syntax: ``cmd [table] [startDate startTime endDate endTime] [limit] [col1,col2,...]``

    Args:
        query: Raw text typed at the query prompt.
        valid_columns: Column names allowed in the extra-columns list.

    Returns:
        A dict with these keys:

        * ``cmd``, ``order_by_field``
        * ``table_filter``, ``date_filter``, ``extra_columns``
        * ``limit_value`` — 25 by default; None when a date range is given
        * ``error_code`` — "" when the input is valid, otherwise the first
          problem found

        Returns None (after printing a message) for empty input or an unknown
        shortcut.
    """
    date_re = re.compile(r'^\d{4}/\d{2}/\d{2}')
    time_re = re.compile(r'^\d{2}:\d{2}:\d{2}')
    parts = query.strip().split()
    if not parts:
        print("Query is empty.")
        return
    query_type = parts[0].lower()
    if query_type not in {"ww", "wh", "rw", "rh"}:
        print(f"Unknown query type: {query_type}")
        return
    order_by_field_map = {
        "ww": "totalWriteWait AS 'wait(ms)'",
        "wh": "totalWriteHeld AS 'held(ms)'",
        "rw": "totalReadWait AS 'wait(ms)'",
        "rh": "totalReadHeld AS 'held(ms)'",
    }
    order_by_field = order_by_field_map[query_type]
    table_filter = ""
    date_filter = ""
    extra_columns = ""
    limit_value = 25
    error_code = ""
    idx = 1
    # 1. Optional table name: a token that is not a date, a number or a column list.
    if (
        idx < len(parts)
        and not date_re.match(parts[idx])
        and not parts[idx].isdigit()
        and "," not in parts[idx]
    ):
        table_name = parts[idx]
        # Strip a 'db.'-style prefix, except for the dotted name 'rdb.lbr'.
        if table_name != 'rdb.lbr' and '.' in table_name:
            table_name = table_name.split('.')[-1]
        # Double single quotes so the name cannot break out of the SQL literal.
        safe_table = table_name.replace("'", "''")
        table_filter = f"AND tablename = '{safe_table}'"
        idx += 1
    # 2. Optional date range: exactly "date time date time".
    if idx < len(parts) and date_re.match(parts[idx]):
        if (
            idx + 3 < len(parts)
            and time_re.match(parts[idx + 1])
            and date_re.match(parts[idx + 2])
            and time_re.match(parts[idx + 3])
        ):
            start_datetime = f"{parts[idx]} {parts[idx + 1]}"
            end_datetime = f"{parts[idx + 2]} {parts[idx + 3]}"
            date_filter = f"AND startTime >= '{start_datetime}' AND startTime <= '{end_datetime}'"
            idx += 4
        else:
            error_code = "Incomplete datetime provided. Both start and end datetime must be specified."
    # 3. Optional row limit.
    if idx < len(parts) and parts[idx].isdigit():
        limit_value = int(parts[idx])
        idx += 1
    # 4. Remaining tokens are extra columns (comma-separated; spaces ignored).
    if idx < len(parts):
        columns_str = "".join(parts[idx:])
        if columns_str:
            extra_columns_list = columns_str.split(',')
            invalid_columns = [col for col in extra_columns_list if col not in valid_columns]
            # Keep an earlier, more specific error: the leftover tokens of an
            # incomplete date range would otherwise be reported as bad columns.
            if invalid_columns and not error_code:
                error_code = f"Invalid column(s): {', '.join(invalid_columns)}"
            extra_columns = ", " + ", ".join(extra_columns_list)
    # A date range already bounds the result, so the row limit is dropped.
    if date_filter:
        limit_value = None
    return {
        "cmd": query_type,
        "order_by_field": order_by_field,
        "table_filter": table_filter,
        "date_filter": date_filter,
        "extra_columns": extra_columns,
        "limit_value": limit_value,
        "error_code": error_code
    }
def build_sql_query(self, parsed: dict) -> str:
    """Build the SELECT for a parsed lock shortcut (see ``parse_query_parts``).

    Joins ``tableUse`` with ``process`` and applies ``parsed``'s table and
    date filters. Without a table filter, a fixed set of tables is
    excluded. Ordering depends on whether a date range is given:

    * With a date range: keep only rows whose metric exceeds
      ``self.min_hold_time``, ordered by ``startTime``.
    * Without one: order by the shortcut's metric, descending.

    ``LIMIT parsed['limit_value']`` is appended when it is not None.
    """
    excluded_tables = [
        'clients', 'clientEntity', 'change',
        'storageup_R', 'storagemasterup_R', 'pull'
    ]
    # Threshold column per shortcut. It matches the metric each shortcut
    # reports in order_by_field (ww is write *wait*, not write held).
    min_time_fields = {
        'wh': 'totalWriteHeld',
        'ww': 'totalWriteWait',
        'rh': 'totalReadHeld',
        'rw': 'totalReadWait',
    }
    sql_query = (
        f"SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS 'lapse(s)'"
        f"{parsed['extra_columns']}, "
        f"{parsed['order_by_field']} "
        f", tableName AS 'table', startTime, endTime "
        f"FROM tableUse JOIN process USING (processKey) "
        f"WHERE 1=1 "
        f"{parsed['table_filter']} "
        f"{parsed['date_filter']} "
    )
    # Exclude system tables unless user specifies a table
    if not parsed['table_filter']:
        excluded_list = ", ".join(repr(t) for t in excluded_tables)
        sql_query += f"AND tablename NOT IN ({excluded_list}) "
    min_field = min_time_fields.get(parsed['cmd'])
    if parsed['date_filter']:
        # One threshold clause, with or without a table filter.
        if min_field:
            sql_query += f"AND {min_field} > {self.min_hold_time} "
        sql_query += "ORDER BY startTime"
    else:
        order_field = parsed['order_by_field'].split(' AS ')[0].strip()
        sql_query += f" ORDER BY {order_field} DESC"
    if parsed['limit_value'] is not None:
        sql_query += f" LIMIT {parsed['limit_value']}"
    return sql_query
def query(self, query: str):
    """Route one line typed at the query prompt.

    Handled in order:

    1. Empty input.
    2. ``pid`` (interactive PID probe).
    3. ``.schema``.
    4. Unknown shortcuts (rejected with a message and the query menu).
    5. Everything else goes to the SELECT / ``.sql`` file / shortcut handler
       chosen by ``detect_query_type``.
    """
    kind = self.detect_query_type(query)
    dispatch = {
        'select': self.handle_select_query,
        'sql_file': self.handle_sql_file_query,
        'shortcut': self.handle_shortcut_query,
    }
    handler = dispatch.get(kind)
    stripped = query.strip()
    if not stripped:
        print("No query provided. Please enter a valid query.")
        self.queryPrompt()
        return
    leading_word = query.split()[0].lower()
    if leading_word == 'pid':
        self.probe_pid_interactive(return_to='query')
        return
    if stripped == '.schema':
        self.schema_tables_pretty()
        self.queryPrompt()
        return
    if kind == 'shortcut' and leading_word not in ('wh', 'ww', 'rh', 'rw', '.schema'):
        print(f"\n {RED}{query} not a valid command{RESET}")
        self.query_menu()
        self.queryPrompt()
        return
    if handler:
        handler(query)
    else:
        print("Unsupported query type.")
        self.queryPrompt()
def detect_query_type(self, query: str) -> str:
    """Classify an interactive query string.

    Returns:
        ``'sql_file'`` when the first whitespace-separated token names a
        ``.sql`` script. This keeps ``NAME.sql ARG`` forms such as
        ``locks_all_duration.sql 5000`` or ``locks_held_total.sql db.have``
        routed to the SQL-file handler, which parses the trailing argument.
        Otherwise ``'select'`` when the text contains the word SELECT, then
        ``'sql_file'`` when the whole string ends in ``.sql`` (covers paths
        containing spaces), and ``'shortcut'`` for everything else.
    """
    tokens = query.split()
    # The first token decides first: checking only the whole string's extension
    # would see ".sql 5000" / ".have" and misroute parameterised script calls.
    if tokens and os.path.splitext(tokens[0])[1].lower() == '.sql':
        return 'sql_file'
    if re.search(r'\bselect\b', query, re.IGNORECASE):
        return 'select'
    if os.path.splitext(query)[1].lower() == '.sql':
        return 'sql_file'
    return 'shortcut'
def handle_select_query(self, query: str):
    """Run a raw SELECT typed at the prompt through the ``sqlite3`` CLI, then re-prompt.

    Trailing semicolons are removed. The statement runs against
    ``self.databaseFile`` with ``-header -column`` output.
    """
    statement = query.rstrip(';')
    sqlite_cmd = ["sqlite3", self.databaseFile, "-header", "-column", statement]
    s.run(sqlite_cmd)
    self.queryPrompt()
def handle_sql_file_query(self, query: str):
    """Run a ``NAME.sql`` library query typed at the interactive prompt.

    The first whitespace-separated token names the script. An optional second
    token supplies ``@table`` (for ``locks_held_total.sql`` and scripts that
    require a table) or ``@duration`` (for ``locks_all_duration.sql``).
    When that value is missing, the matching interactive picker takes over.
    ``probe_pid*.sql`` always goes to the interactive PID probe. Every path
    ends at the query prompt, either here or via ``return_to="query"``.
    """
    tokens = query.split(None, 1)
    sql_token = tokens[0]
    extra = tokens[1].strip() if len(tokens) > 1 else None
    base = normalize_sql_basename(sql_token)

    def report_and_prompt(rc: int) -> None:
        # Exit code 2 from the runner is reported as a missing script.
        if rc == 2:
            print(f"\n {RED}SQL file not found: {query}{RESET}")
        self.queryPrompt()

    def reject(message: str) -> None:
        print(f"\n {RED}{message}.{RESET}")
        self.queryPrompt()

    def run_with_table() -> None:
        table = normalize_table_parameter(extra.split()[0])
        problem = validate_table_parameter(table)
        if problem:
            reject(problem)
            return
        report_and_prompt(self._run_sql_query_token(sql_token, table=table))

    if base in {"probe_pid.sql", "probe_pid_min.sql", "probe_pid_tables.sql"}:
        self.probe_pid_interactive(return_to="query")
        return

    if base == "locks_held_total.sql":
        if not extra:
            self.locks_held_total_interactive(return_to="query")
            return
        run_with_table()
        return

    if base == "locks_all_duration.sql":
        if not extra:
            self.locks_all_duration_interactive(return_to="query")
            return
        duration = extra.split()[0]
        problem = validate_duration_parameter(duration)
        if problem:
            reject(problem)
            return
        report_and_prompt(self._run_sql_query_token(sql_token, duration=duration))
        return

    # Peek at the script text (disk copy or built-in) to see whether it needs @table.
    file_body = ""
    path, body = resolve_sql_query(sql_token)
    if path is not None:
        try:
            with open(path, encoding="utf-8") as fh:
                file_body = fh.read()
        except OSError:
            pass
    elif body is not None:
        file_body = body

    if file_body and sql_file_requires_table(file_body):
        if not extra:
            self.sql_required_table_interactive(sql_token, return_to="query")
            return
        run_with_table()
        return

    report_and_prompt(self._run_sql_query_token(query))
def _run_sql_query_token(
    self,
    query: str,
    *,
    pid: Optional[str] = None,
    start: Optional[str] = None,
    table: Optional[str] = None,
    duration: Optional[str] = None,
) -> int:
    """Run a library ``NAME.sql`` (disk or built-in). Returns exit code (0 = ok).

    A disk file found by ``resolve_sql_query`` takes precedence over a built-in
    body. An unreadable disk file prints a message to stderr and returns 2.
    A name that resolves to neither returns 2 silently. The parameter values
    are forwarded unchanged to ``_run_sql_body``.
    """
    bindings = {"pid": pid, "start": start, "table": table, "duration": duration}
    path, body = resolve_sql_query(query)
    if path is None:
        if body is None:
            return 2
        return self._run_sql_body(
            body,
            display_name=normalize_sql_basename(query),
            **bindings,
        )
    try:
        with open(path, encoding="utf-8") as fh:
            script_text = fh.read()
    except OSError as e:
        print(f"Could not read SQL file {path}: {e}", file=sys.stderr)
        return 2
    return self._run_sql_body(
        script_text,
        display_name=os.path.basename(path),
        read_path=os.path.abspath(path),
        **bindings,
    )
def _run_sql_body(
    self,
    file_body: str,
    *,
    display_name: str,
    read_path: Optional[str] = None,
    pid: Optional[str] = None,
    start: Optional[str] = None,
    table: Optional[str] = None,
    duration: Optional[str] = None,
) -> int:
    """Run SQL script text against ``self.databaseFile``. Returns exit code (0 = ok).

    Parameters are checked first. A script that needs ``@pid``/``@start``,
    ``@table`` or ``@duration`` gets a usage line (or the validator's message)
    on stderr and returns 2 when the value is missing or invalid. A script
    that is empty after ``strip_sql_documentation_header`` also returns 2.

    Execution then takes one of three routes:

    * The SQL has a line starting with ``.`` (a sqlite3 CLI dot-command).
      The script is written to a temporary ``.sql`` file and run with
      ``.read`` via ``run_sqlite3_with_optional_pager``. The temp file is
      removed afterwards.
    * A pager is configured, or parameters are bound. The SQL is passed as a
      command-line argument to ``sqlite3 -header -column``.
    * Otherwise ``self.run_sqlite_query_auto_width`` renders it and 0 is
      returned.

    At most one parameter kind is bound per run, in priority order: table,
    then duration, then pid/start.

    Args:
        file_body: Full script text, including any documentation header.
        display_name: Script name used in usage/error messages and looked up
            in ``_SQL_FILES_AUTO_PAGER``.
        read_path: Accepted but not referenced in this body
            (NOTE(review): confirm whether callers still need it).
        pid, start: Values for ``@pid`` / ``@start``.
        table: Value for ``@table``.
        duration: ``@duration`` threshold in milliseconds.

    Returns:
        0 from the auto-width route, 2 on a parameter or empty-script error,
        otherwise the status returned by ``run_sqlite3_with_optional_pager``
        or ``s.call``.
    """
    if sql_file_requires_pid_start(file_body):
        if pid is None or start is None:
            print(
                "p4diag: "
                f"{display_name} requires PID and startTime:\n"
                f" p4diag {display_name} LOG PID 'YYYY/MM/DD HH:MM:SS'",
                file=sys.stderr,
            )
            return 2
    if sql_file_requires_table(file_body):
        if table is None:
            print(
                "p4diag: "
                f"{display_name} requires a table name:\n"
                f" p4diag {display_name} LOG TABLE",
                file=sys.stderr,
            )
            return 2
        err = validate_table_parameter(table)
        if err:
            print(f"p4diag: {err}", file=sys.stderr)
            return 2
    elif sql_file_uses_table_parameter(file_body) and table is not None:
        # Optional @table: only validated when the caller supplied one.
        err = validate_table_parameter(table)
        if err:
            print(f"p4diag: {err}", file=sys.stderr)
            return 2
    if sql_file_requires_duration(file_body):
        if duration is None:
            print(
                "p4diag: "
                f"{display_name} requires a duration threshold in milliseconds:\n"
                f" p4diag {display_name} LOG MILLISECONDS",
                file=sys.stderr,
            )
            return 2
        err = validate_duration_parameter(duration)
        if err:
            print(f"p4diag: {err}", file=sys.stderr)
            return 2
    # Pager: the script header's setting wins. Otherwise, scripts listed in
    # _SQL_FILES_AUTO_PAGER default to ``less``.
    pager = sql_file_pager_from_header(file_body)
    if pager is None and display_name in _SQL_FILES_AUTO_PAGER:
        pager = ["less"]
    sql = strip_sql_documentation_header(file_body).strip()
    if not sql:
        print(f"SQL file is empty: {display_name}", file=sys.stderr)
        return 2
    # A line starting with "." is a sqlite3 CLI dot-command, so the whole
    # script is run through ``.read`` from a temporary file instead.
    if re.search(r"^\s*\.", sql, re.MULTILINE):
        # Scripts with .width settings or parameter placeholders are used verbatim.
        # Everything else goes through prepare_sql_read_script first.
        if (
            re.search(r"^\s*\.width\b", file_body, re.MULTILINE | re.IGNORECASE)
            or sql_file_requires_pid_start(file_body)
            or sql_file_uses_table_parameter(file_body)
            or sql_file_requires_duration(file_body)
        ):
            prepared = file_body
        else:
            prepared = prepare_sql_read_script(file_body)
        tmp_path = None
        try:
            # delete=False: the file must outlive this handle so the command
            # launched below can read it. It is unlinked in the finally block.
            with tempfile.NamedTemporaryFile(
                mode="w",
                suffix=".sql",
                delete=False,
                encoding="utf-8",
            ) as tf:
                tf.write(prepared)
                tmp_path = tf.name
            argv = sqlite3_cli_argv_for_read_script(file_body)
            # Bind at most one parameter kind: table, else duration, else pid/start.
            if sql_file_uses_table_parameter(file_body):
                argv.extend(
                    sqlite3_table_parameter_argv(
                        table_parameter_bind_value(table)
                    )
                )
            elif duration is not None:
                argv.extend(sqlite3_duration_parameter_argv(duration))
            elif pid is not None and start is not None:
                argv.extend(sqlite3_pid_start_parameter_argv(pid, start))
            argv.extend([self.databaseFile, f".read {tmp_path}"])
            return run_sqlite3_with_optional_pager(argv, pager)
        finally:
            if tmp_path:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Best-effort cleanup of the temporary script.
                    pass
    # Ensure the statement text ends with ';'.
    if not sql.rstrip().endswith(";"):
        sql += ";"
    argv = ["sqlite3", "-header", "-column"]
    if sql_file_uses_table_parameter(file_body):
        argv.extend(
            sqlite3_table_parameter_argv(table_parameter_bind_value(table))
        )
    elif duration is not None:
        argv.extend(sqlite3_duration_parameter_argv(duration))
    elif pid is not None and start is not None:
        argv.extend(sqlite3_pid_start_parameter_argv(pid, start))
    argv.extend([self.databaseFile, sql])
    if pager:
        return run_sqlite3_with_optional_pager(argv, pager)
    # With bound parameters, run the sqlite3 CLI argv directly. Presumably this
    # is because run_sqlite_query_auto_width takes no parameters (TODO confirm).
    if (
        (pid is not None and start is not None)
        or sql_file_uses_table_parameter(file_body)
        or duration is not None
    ):
        return s.call(argv)
    self.run_sqlite_query_auto_width(self.databaseFile, sql)
    return 0
def handle_shortcut_query(self, query: str):
    """Run a built-in shortcut query, then show the prompt again.

    The text is parsed against ``self.VALID_COLUMNS``. A parse error prints its
    message and the query menu. Otherwise the generated SQL is executed.
    Lock-contention shortcuts (``ww``, ``wh``, ``rw``, ``rh``) print a heading
    via ``print_shortcut_query_info``; every other shortcut echoes its SQL.
    """
    parsed = self.parse_query_parts(query, self.VALID_COLUMNS)
    problem = parsed['error_code']
    if problem:
        print(f"\n {RED}{problem}{RESET}")
        self.query_menu()
        self.queryPrompt()
        return
    lock_shortcut = parsed['cmd'] in {'ww', 'wh', 'rw', 'rh'}
    if lock_shortcut:
        self.print_shortcut_query_info(parsed)
    sql_query = self.build_sql_query(parsed)
    if not lock_shortcut:
        print(f"\n{sql_query}\n")
    self.run_sqlite_query_auto_width(self.databaseFile, sql_query)
    self.queryPrompt()
def execute_sql_file_noninteractive(
    self,
    sql_path: str,
    *,
    pid: Optional[str] = None,
    start: Optional[str] = None,
    table: Optional[str] = None,
) -> None:
    """Run a ``.sql`` file against ``self.databaseFile``; results go to stdout.

    Absolute ``sql_path`` selects that file. A non-absolute name uses
    ``P4DIAG_SQL_QUERIES/<basename>`` (adds ``.sql`` when missing).
    ``probe_pid*.sql`` scripts require ``pid`` and ``start`` (``@pid`` / ``@start``).
    ``probe_pid.sql`` always includes the per-table breakdown and log tracking.
    ``locks_held_total.sql`` accepts an optional ``table`` (``@table``) as the third
    positional argument; omit it for all tables.
    ``locks_table_by_cmd.sql`` requires ``table`` (``@table``); pass it as the third
    positional argument.
    ``locks_all_duration.sql`` requires ``duration`` (``@duration`` ms); pass it as
    the third positional argument.

    Exits the process with the script's status when it is non-zero.
    "SQL file not found" is printed only when ``sql_path`` resolves to neither
    a disk file nor a built-in query. Every other failure has already printed
    its own diagnostic to stderr.
    """

    def read_sql_source(token: str) -> Optional[str]:
        # Script text from disk or the built-in library. Returns None when the
        # name does not resolve at all, and "" when the disk file is unreadable.
        path, body = resolve_sql_query(token)
        if path is not None:
            try:
                with open(path, encoding="utf-8") as fh:
                    return fh.read()
            except OSError:
                return ""
        return body

    base = normalize_sql_basename(sql_path)
    if base == "probe_pid.sql" and pid and start:
        rc = self.run_probe_pid(pid, start)
        if rc != 0:
            sys.exit(rc)
        return

    bind_table = table
    bind_duration = None
    # With only two positionals (no START), scripts that do not take @pid/@start
    # receive their @duration or @table value in the PID slot.
    if bind_table is None and pid and start is None:
        file_body = read_sql_source(sql_path) or ""
        if not sql_file_requires_pid_start(file_body):
            if sql_file_requires_duration(file_body):
                bind_duration, pid = pid, None
            elif sql_file_uses_table_parameter(file_body):
                bind_table, pid = normalize_table_parameter(pid), None

    rc = self._run_sql_query_token(
        sql_path,
        pid=pid,
        start=start,
        table=bind_table,
        duration=bind_duration,
    )
    # _run_sql_query_token returns 2 silently only when nothing resolves. Any
    # other rc == 2 (missing/invalid parameter, empty script, unreadable file)
    # has already been explained on stderr, including for built-in queries.
    if rc == 2 and read_sql_source(sql_path) is None:
        raw = sql_path.strip()
        tried = (
            os.path.abspath(raw)
            if os.path.isabs(raw)
            else os.path.join(p4diag_sql_queries_dir(), normalize_sql_basename(raw))
        )
        print(f"SQL file not found: {tried}", file=sys.stderr)
    if rc != 0:
        sys.exit(rc)
    if base == "probe_pid_min.sql" and pid and start:
        self.print_probe_pid_log_tracking(pid, start)
def print_probe_pid_log_tracking(
    self,
    pid: str,
    start: str,
    *,
    end_time: Optional[str] = None,
) -> None:
    """Print raw ``---`` tracking lines from the server log for a probed command.

    Finds the command identified by ``pid`` and ``start`` in ``self.logFile``
    via ``extract_pid_tracking_from_log``. Prints its command line (unless it
    already appears among the first three tracking lines) and the tracking
    lines under a heading. ``end_time`` defaults to
    ``self._probe_end_time(pid, start)``. A missing log file only produces a
    note on stderr. A progress hint goes to stderr when it is a terminal and
    ``QUIET`` is off.
    """
    log_path = os.path.abspath(self.logFile)
    if not os.path.isfile(log_path):
        print(
            f"\n(Log file not found for tracking excerpt: {log_path})",
            file=sys.stderr,
        )
        return
    window_end = end_time if end_time is not None else self._probe_end_time(pid, start)
    if not QUIET and getattr(sys.stderr, "isatty", lambda: False)():
        print(
            "Scanning server log for tracking data (large logs may take a moment)...",
            file=sys.stderr,
            flush=True,
        )
    cmd_line, tracking = extract_pid_tracking_from_log(
        log_path, pid, start, end_time=window_end
    )
    print("\n=== Log tracking (from server log) ===\n")
    if cmd_line:
        already_listed = any(
            entry == cmd_line or entry.endswith(cmd_line) for entry in tracking[:3]
        )
        if not already_listed:
            print(cmd_line)
    if not tracking:
        print(
            f"(no tracking lines found for pid {pid} at {start} in {os.path.basename(log_path)})"
        )
        return
    for entry in tracking:
        print(entry)
def schema_tables_pretty(self) -> None:
    """Print readable column lists (with log2sql DDL comments) for trace tables.

    For each non-internal table in ``self.databaseFile``, prints the column
    name, type and PK / NOT NULL flags. It also prints the trailing ``--``
    comment from the table's CREATE statement, truncated to 60 characters.
    """
    conn = sqlite3.connect(self.databaseFile)
    try:
        tables = conn.execute(
            "SELECT name, sql FROM sqlite_master "
            "WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
        ).fetchall()
        if not tables:
            print("(no tables in database)")
            return
        print(
            f"\n{BOLD}log2sql trace database:{RESET} "
            f"{os.path.basename(self.databaseFile)}\n"
        )
        for table_name, ddl in tables:
            notes = _column_comments_from_create_sql(ddl)
            columns = conn.execute(f'PRAGMA table_info("{table_name}")').fetchall()
            if not columns:
                print(f"-- {table_name}\n (table missing or empty)\n")
                continue
            rows = []
            for _cid, col, col_type, notnull, _default, is_pk in columns:
                flag_text = ", ".join(
                    flag for flag, on in (("PK", is_pk), ("NOT NULL", notnull)) if on
                ) or "-"
                note = notes.get(col, "")
                if len(note) > 60:
                    note = note[:57] + "..."
                rows.append((col, col_type, flag_text, note))
            print(f"-- {table_name}")
            print(
                tabulate(
                    rows,
                    headers=["column", "type", "flags", "description"],
                    tablefmt="simple",
                )
            )
            print()
    finally:
        conn.close()
def print_shortcut_query_info(self, parsed: dict):
    """Print the heading shown before a lock-contention shortcut's results.

    The heading is the shortcut's label from ``_LOCK_CONTENTION_SHORTCUT_LABELS``,
    or ``"Lock contention"`` when the command is not listed. When the shortcut
    is filtered, the table name and/or the date range is appended in
    parentheses. The lock shortcuts do not echo their SQL, so a heading is
    printed for every filter combination, including table-only and date-only.
    """
    label = _LOCK_CONTENTION_SHORTCUT_LABELS.get(parsed["cmd"], "Lock contention")
    details = []
    if parsed['table_filter']:
        table_match = re.search(r"tablename = '([^']+)'", parsed['table_filter'])
        details.append(table_match.group(1) if table_match else "Unknown")
    if parsed['date_filter']:
        stamps = re.findall(r"\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}", parsed['date_filter'])
        if len(stamps) == 2:
            details.append(f"between {stamps[0]} and {stamps[1]}")
        else:
            details.append("Unknown date range")
    if details:
        print(f"\n{label} ({', '.join(details)})")
    else:
        print(f"\n{label}")
def usage_log2sql():
    """Print help text for the interactive ``query`` and ``summary`` options."""
    help_lines = (
        "",
        "The \033[1mquery\033[0m option provides a list of canned queries to run. You can also enter custom",
        "SQL queries, providing SQL directly at the query prompt or by passing a *.sql filename",
        "containing SQL queries.",
        "",
        "The \033[1msummary\033[0m option runs a series of SQL queries against the current trace database.",
    )
    for text in help_lines:
        print(text)
def exitP4():
    """Terminate the program with exit status 0 (raises ``SystemExit(0)``)."""
    raise SystemExit(0)
def validateInput(inputNum, listLen):
    """Return True if zero-based ``inputNum`` indexes a list of ``listLen`` items.

    When out of range, prints a hint phrased in one-based menu numbers and
    returns False.
    """
    out_of_range = inputNum < 0 or inputNum >= listLen
    if not out_of_range:
        return True
    print(f"\nPlease enter a number between 1 and {listLen}")
    return False
def _column_comments_from_create_sql(create_sql: Optional[str]) -> Dict[str, str]:
"""Map column name → trailing ``--`` comment from log2sql ``CREATE TABLE`` DDL."""
if not create_sql:
return {}
comments: Dict[str, str] = {}
for line in create_sql.splitlines():
line = line.strip().rstrip(",")
if not line or line.upper().startswith("CREATE") or line.upper().startswith("PRIMARY"):
continue
m = re.match(r"(\w+)\s+.+?--\s*(.+)$", line)
if m:
comments[m.group(1)] = m.group(2).strip()
return comments
def list_sql_library_files(directory: str) -> List[str]:
    """Sorted ``*.sql`` basenames under ``directory`` (empty if missing or none).

    Only regular files count; a directory named ``*.sql`` is ignored.
    """
    if not os.path.isdir(directory):
        return []
    found = [
        entry
        for entry in os.listdir(directory)
        if entry.endswith(".sql") and os.path.isfile(os.path.join(directory, entry))
    ]
    found.sort()
    return found
def print_help_query_table(
    rows: List[Tuple[str, str]],
    *,
    indent: str = " ",
    col1: str = "Query",
    col2: str = "Best for",
) -> None:
    """Print a two-column help table with box-drawing borders.

    Columns are as wide as their longest entry (header included). Nothing is
    printed when ``rows`` is empty.
    """
    if not rows:
        return
    left_w = max([len(col1)] + [len(name) for name, _ in rows])
    right_w = max([len(col2)] + [len(text) for _, text in rows])

    def rule(start: str, sep: str, end: str) -> str:
        return f"{indent}{start}{'─' * (left_w + 2)}{sep}{'─' * (right_w + 2)}{end}"

    def line(left: str, right: str) -> str:
        return f"{indent}│ {left.ljust(left_w)} │ {right.ljust(right_w)} │"

    output = [rule("┌", "┬", "┐"), line(col1, col2), rule("├", "┼", "┤")]
    output.extend(line(name, text) for name, text in rows)
    output.append(rule("└", "┴", "┘"))
    for text_line in output:
        print(text_line)
def print_sql_library_help(queries: List[str]) -> None:
    """Help tables for shipped SQL library reports (lock section + other reports).

    Lock queries appear in ``_LOCK_QUERY_HELP_ORDER`` order, limited to those
    present in ``queries``. The remaining names follow alphabetically. An
    empty section is omitted.
    """

    def emit_section(title: str, rows: List[Tuple[str, str]]) -> None:
        print(f" {BOLD}{title}{RESET}\n")
        print_help_query_table(rows)
        print()

    available = set(queries)
    lock_rows = [
        (name, sql_library_summary(name))
        for name in _LOCK_QUERY_HELP_ORDER
        if name in available
    ]
    if lock_rows:
        emit_section("Lock contention queries:", lock_rows)
    remaining = sorted(name for name in queries if name not in _LOCK_QUERY_HELP_ORDER)
    if remaining:
        emit_section(
            "Other reports:",
            [(name, sql_library_summary(name)) for name in remaining],
        )
def print_numbered_list(items: List[str]) -> None:
    """Print ``items`` one per line as `` (N) item`` with 1-based numbering."""
    for position, entry in enumerate(items, start=1):
        print(f" ({position}) {entry}")
def print_numbered_sql_library(queries: List[str]) -> None:
    """Numbered SQL library list with a one-line purpose per report.

    Each entry prints as `` (N) name`` followed by a line holding
    ``sql_library_summary(name)``.
    """
    for position, query_name in enumerate(queries, start=1):
        print(f" ({position}) {query_name}")
        summary = sql_library_summary(query_name)
        print(f" {summary}")
def listFiles(path):
    """List regular files at ``path`` as a numbered menu and return their sorted names.

    Used by the trim-log picker (``ls``) and legacy callers. A missing
    directory prints a warning and returns an empty list.
    """
    if os.path.isdir(path):
        names = sorted(
            entry for entry in os.listdir(path) if os.path.isfile(os.path.join(path, entry))
        )
        print_numbered_list(names)
        return names
    print(f" {YELLOW}Directory not found:{RESET} {path}")
    return []
def is_p4d_log(logfile, max_lines=2000):
    """Heuristically decide whether ``logfile`` looks like a P4 server log.

    Scans the first ``max_lines`` lines for the pattern ``20.* pid ``. Files
    ending in ``.gz`` are read through gzip, and undecodable bytes are
    ignored. Any error while opening or reading the file yields False
    (best-effort probe).
    """
    marker = re.compile(r"20.* pid ")
    compressed = logfile.endswith('.gz')
    opener = gzip.open if compressed else open
    read_mode = 'rt' if compressed else 'r'
    try:
        with opener(logfile, read_mode, errors='ignore') as handle:
            for lineno, text in enumerate(handle):
                if lineno >= max_lines:
                    break
                if marker.search(text):
                    return True
            return False
    except Exception:
        return False
def p4diag_summary_html_basename(log_token: str) -> str:
    """Basename of ``LOG.summary.html`` under ``.p4diagnostics`` for a log path or basename.

    Trailing slashes are ignored, and a ``.db`` suffix (any case) is dropped
    first, so a log2sql database name maps to the same report as its log.
    """
    name = os.path.basename((log_token or "").rstrip("/"))
    stem = name[:-3] if name.lower().endswith(".db") else name
    return f"{stem}.summary.html"
def _resolve_summary_html_index(diag_dir: str, log_token: Optional[str] = None) -> Optional[str]:
"""Basename of the summary HTML to serve at ``/`` (existing file under ``diag_dir``)."""
if log_token:
candidate = p4diag_summary_html_basename(log_token)
if os.path.isfile(os.path.join(diag_dir, candidate)):
return candidate
newest_path: Optional[str] = None
newest_mtime = -1.0
try:
names = os.listdir(diag_dir)
except OSError:
return None
for name in names:
if not name.endswith(".summary.html"):
continue
path = os.path.join(diag_dir, name)
if not os.path.isfile(path):
continue
mtime = os.path.getmtime(path)
if mtime > newest_mtime:
newest_mtime = mtime
newest_path = name
return newest_path
class P4diagHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Serve ``.p4diagnostics``; ``/`` shows ``LOG.summary.html`` when available.

    Directory listings are disabled (404 with a hint) and per-request logging
    is silenced. Both GET and HEAD map ``/`` to the report chosen by
    ``_resolve_summary_html_index``, so a HEAD probe of the root agrees with
    what GET serves.
    """

    log_token: Optional[str] = None

    def __init__(self, *args, directory=None, log_token=None, **kwargs):
        # Set before super().__init__(): the socketserver base constructor
        # handles the whole request, so attributes must already be in place.
        self.log_token = log_token
        super().__init__(*args, directory=directory, **kwargs)

    def log_message(self, format, *args):
        # Keep the interactive terminal free of per-request access log lines.
        pass

    def _index_document(self) -> Optional[str]:
        return _resolve_summary_html_index(self.directory, self.log_token)

    def list_directory(self, path):
        self.send_error(
            404,
            "No summary report yet. Run: p4diag summary LOG (or use the interactive log menu).",
        )
        return None

    def _rewrite_root_to_index(self) -> None:
        """Point ``self.path`` at the summary report when the root URL is requested."""
        req_path = unquote(urlparse(self.path).path)
        if req_path in ("", "/"):
            index_doc = self._index_document()
            if index_doc:
                self.path = "/" + index_doc

    def do_GET(self):
        self._rewrite_root_to_index()
        return super().do_GET()

    def do_HEAD(self):
        self._rewrite_root_to_index()
        return super().do_HEAD()
class ReusableTCPServer(socketserver.TCPServer):
    """``TCPServer`` with ``allow_reuse_address`` enabled (``SO_REUSEADDR``).

    This lets the web server rebind its port right after a previous run exits,
    instead of failing while old connections linger in TIME_WAIT.
    """

    allow_reuse_address = True
def is_port_in_use(port, host="127.0.0.1"):
    """Return True if something accepts TCP connections on ``host:port``.

    The default probe targets loopback. The web server binds all interfaces
    (``""``), so a running instance is reachable there. Connecting to the
    wildcard address ``0.0.0.0`` is not portable: Windows rejects it with
    WSAEADDRNOTAVAIL, which made the old probe always report the port as free.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex((host, port)) == 0
def _web_server_reachable_ipv4() -> Optional[str]:
"""IPv4 on the default-route interface (VPN/LAN), for browser on another host."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.connect(("8.8.8.8", 80))
return sock.getsockname()[0]
except OSError:
return None
def _web_server_display_url(port: int) -> str:
    """URL shown at startup; bind is still on all interfaces (``""``).

    Uses the default-route IPv4 when one is found, otherwise ``127.0.0.1``.
    """
    reachable = _web_server_reachable_ipv4()
    host = reachable if reachable else "127.0.0.1"
    return f"http://{host}:{port}/"
def _print_web_server_urls(port: int, *, already_running: bool, detail: str = "") -> None:
    """Announce the web server URL, optionally followed by ``detail``.

    When a default-route address exists, a second line gives the loopback URL
    for a browser on this same host.
    """
    url = _web_server_display_url(port)
    status = "Web server already running" if already_running else "Starting web server on"
    message = f"{status} {url}"
    if detail:
        message = f"{message} {detail}"
    print(message)
    if _web_server_reachable_ipv4():
        print(f" Browser on this host only: http://127.0.0.1:{port}/")
def start_web_server(path, port, log_token: Optional[str] = None):
    """Serve ``path`` over HTTP on ``port`` from a background daemon thread.

    If something already answers on the port, only the "already running" URLs
    are printed. Otherwise the server binds all interfaces (``""``), and ``/``
    maps to the summary HTML chosen by ``_resolve_summary_html_index``.

    The startup URLs are printed only after the bind succeeds. A bind failure
    (port taken between the probe and the bind, permission denied, ...) is
    reported on stderr. Previously it escaped as an unhandled thread traceback
    after "Starting web server" had already been printed.
    """
    if is_port_in_use(port):
        _print_web_server_urls(port, already_running=True)
        return

    def run_server():
        handler = partial(
            P4diagHTTPRequestHandler,
            directory=path,
            log_token=log_token,
        )
        try:
            httpd = ReusableTCPServer(("", port), handler)
        except OSError as e:
            print(f"p4diag: could not start web server on port {port}: {e}", file=sys.stderr)
            return
        index_document = _resolve_summary_html_index(path, log_token)
        if index_document:
            detail = f"({index_document})"
        else:
            detail = "(summary HTML appears here after log2sql summary)"
        _print_web_server_urls(port, already_running=False, detail=detail)
        with httpd:
            httpd.serve_forever()

    thread = threading.Thread(target=run_server, daemon=True)
    thread.start()
def _preflight_log2sql_log_exists(file, start, end) -> None:
"""Exit with code 2 if the log path for ``log2sql`` is not usable. Does not run ``trim_log`` (avoids
duplicate work). Call from ``main`` before starting the web UI so a missing log is reported without
printing \"Web server already running\".
"""
if start is not None and end is not None:
_p = os.path.abspath(file)
if not os.path.isfile(_p):
print(
f"p4diag: log file not found: {file!r} (resolved: {_p!r})",
file=sys.stderr,
)
sys.exit(2)
return
l = log2sql(file)
l.setLogFile(show_menu=False)
_log_in = os.path.abspath(l.logFile)
if not os.path.isfile(_log_in):
print(
f"p4diag: log file not found: {l.logFile!r} (resolved: {_log_in!r})",
file=sys.stderr,
)
sys.exit(2)
def handle_log2sql(file, start=None, end=None):
    """Run the full log2sql pipeline for ``file`` and, unless QUIET, open the menu.

    The optional ``start``/``end`` window (both or neither) first trims the
    log, and the trimmed file becomes both the log and the database basis.
    The pipeline is stats, database, summary, then plots. Exits with status 2
    on a half-specified window or a missing log. The instance is kept in the
    module-level ``l``.
    """
    global l
    if (start is None) != (end is None):
        print(
            "p4diag: --start and --end must be provided together",
            file=sys.stderr,
        )
        sys.exit(2)
    l = log2sql(file)
    if start is not None and end is not None:
        trimmed_log = l.trim_log(start, end)
        l.logFile = l.databaseFile = trimmed_log
    l.setLogFile(show_menu=False)
    resolved = os.path.abspath(l.logFile)
    if not os.path.isfile(resolved):
        print(
            f"p4diag: log file not found: {l.logFile!r} (resolved: {resolved!r})",
            file=sys.stderr,
        )
        sys.exit(2)
    l.createLogStats()
    l.createDatabase()
    l.createLogSummary()
    l.createPlots()
    if not QUIET:
        l.menu()
def parse_datetime(dt_str):
    """``argparse`` type callback: validate a ``YYYY/MM/DD HH:MM:SS`` timestamp.

    Returns the string unchanged when it parses. Raises
    ``argparse.ArgumentTypeError`` otherwise.
    """
    fmt = "%Y/%m/%d %H:%M:%S"
    try:
        datetime.strptime(dt_str, fmt)
    except ValueError:
        raise argparse.ArgumentTypeError(f"Invalid date format: '{dt_str}'. Use YYYY/MM/DD HH:MM:SS")
    return dt_str
# --- Log statistics cache (keyed on source P4 log path + mtime + size) ---
# Version stamp stored in ``.stats.meta.json`` sidecars. Bump it whenever the
# grep-based log-stats logic changes, so previously cached stats are regenerated.
LOG_STATS_CACHE_VERSION = 2
# Version stamp stored in ``.summary.meta.json`` sidecars. A mismatch makes
# summary_cache_is_current() report the cached text/HTML summary as stale.
SUMMARY_CACHE_VERSION = 3
def log_stats_meta_path(stats_txt_path: str) -> str:
    """Sidecar JSON next to ``*.stats.txt`` (e.g. ``log.stats.meta.json``).

    A path without the ``.stats.txt`` suffix simply gets ``.stats.meta.json``
    appended.
    """
    suffix = ".stats.txt"
    stem = stats_txt_path[: -len(suffix)] if stats_txt_path.endswith(suffix) else stats_txt_path
    return stem + ".stats.meta.json"
def _log_source_signature(log_path: str) -> Optional[Tuple[str, float, int]]:
if not log_path or not os.path.isfile(log_path):
return None
ab = os.path.abspath(log_path)
st = os.stat(ab)
return (ab, st.st_mtime, st.st_size)
def _log_stats_meta_read(meta_path: str) -> Optional[Dict[str, Any]]:
try:
with open(meta_path, "r", encoding="utf-8") as fh:
return json.load(fh)
except (OSError, json.JSONDecodeError, TypeError):
return None
def _log_stats_meta_write(meta_path: str, sig: Tuple[str, float, int]) -> None:
    """Record the log signature that the stats file was built from.

    Writes ``version`` plus ``sig``'s path/mtime/size as indented JSON. Write
    failures are ignored. Without a sidecar, log_stats_cache_is_current()
    reports the stats as stale, and they are rebuilt.
    """
    payload: Dict[str, Any] = {"version": LOG_STATS_CACHE_VERSION}
    payload.update(log_path=sig[0], log_mtime=sig[1], log_size=sig[2])
    try:
        with open(meta_path, "w", encoding="utf-8") as out:
            json.dump(payload, out, indent=2)
    except OSError:
        pass
def log_stats_cache_is_current(
    log_path: str, stats_txt_path: str, meta_path: str
) -> bool:
    """True if ``stats_txt_path`` matches the current signature of ``log_path``.

    The stats file and its sidecar must both exist. The sidecar's version must
    equal ``LOG_STATS_CACHE_VERSION``, and its recorded path, mtime and size
    must equal the log's current values.
    """
    if not (os.path.isfile(stats_txt_path) and os.path.isfile(meta_path)):
        return False
    current = _log_source_signature(log_path)
    if current is None:
        return False
    meta = _log_stats_meta_read(meta_path)
    if not meta or meta.get("version") != LOG_STATS_CACHE_VERSION:
        return False
    expected = (
        ("log_path", current[0]),
        ("log_mtime", current[1]),
        ("log_size", current[2]),
    )
    return all(meta.get(key) == value for key, value in expected)
def summary_meta_path(summary_txt_path: str) -> str:
    """Sidecar JSON next to ``*.summary.txt`` (e.g. ``log.summary.meta.json``).

    A path without the ``.summary.txt`` suffix simply gets
    ``.summary.meta.json`` appended.
    """
    suffix = ".summary.txt"
    stem = summary_txt_path[: -len(suffix)] if summary_txt_path.endswith(suffix) else summary_txt_path
    return stem + ".summary.meta.json"
def _summary_meta_read(meta_path: str) -> Optional[Dict[str, Any]]:
try:
with open(meta_path, "r", encoding="utf-8") as fh:
return json.load(fh)
except (OSError, json.JSONDecodeError, TypeError):
return None
def _summary_meta_write(meta_path: str, payload: Dict[str, Any]) -> None:
try:
with open(meta_path, "w", encoding="utf-8") as fh:
json.dump(payload, fh, indent=2)
except OSError:
pass
def _summary_inputs_signature(
    log_path: str,
    db_path: str,
    stats_path: str,
) -> Optional[Dict[str, Any]]:
    """JSON-serializable signatures for summary cache invalidation.

    Returns None unless both the log and the database exist. The stats file is
    optional: when it is missing, only ``stats_path: None`` is recorded.
    """
    log_sig = _log_source_signature(log_path)
    db_sig = _log_source_signature(db_path)
    stats_sig = _log_source_signature(stats_path)
    if log_sig is None or db_sig is None:
        return None
    signature: Dict[str, Any] = {}
    signature.update(zip(("log_path", "log_mtime", "log_size"), log_sig))
    signature.update(zip(("db_path", "db_mtime", "db_size"), db_sig))
    if stats_sig is None:
        signature["stats_path"] = None
    else:
        signature.update(zip(("stats_path", "stats_mtime", "stats_size"), stats_sig))
    return signature
def summary_cache_is_current(
    log_path: str,
    db_path: str,
    stats_path: str,
    summary_txt_path: str,
    summary_html_path: str,
    meta_path: str,
) -> bool:
    """True if text/HTML summary reports match current log, DB, and stats inputs.

    Requires all of the following:

    * both reports and the sidecar exist;
    * the HTML does not need regeneration;
    * the sidecar's cache version, HTML layout version and recommendations
      signature are current;
    * every recorded input path/mtime/size equals today's value.
    """
    required_files = (summary_txt_path, summary_html_path, meta_path)
    if not all(os.path.isfile(p) for p in required_files):
        return False
    if _summary_html_needs_regen(summary_html_path):
        return False
    current = _summary_inputs_signature(log_path, db_path, stats_path)
    if current is None:
        return False
    meta = _summary_meta_read(meta_path)
    if not meta or meta.get("version") != SUMMARY_CACHE_VERSION:
        return False
    if meta.get("layout_version") != SUMMARY_HTML_LAYOUT_VERSION:
        return False
    if meta.get("recommendations_sig") != _recommendations_rules_signature():
        return False
    tracked_keys = (
        "log_path",
        "log_mtime",
        "log_size",
        "db_path",
        "db_mtime",
        "db_size",
        "stats_path",
        "stats_mtime",
        "stats_size",
    )
    return all(meta.get(key) == current.get(key) for key in tracked_keys)
def quiet_list_sql_queries() -> None:
    """Print sorted built-in + disk ``*.sql`` basenames, one per line.

    Exits with status 2, after a note on stderr, when the library is empty.
    """
    names = list_sql_library_names()
    if not names:
        queries_dir = p4diag_sql_queries_dir()
        print(f"No library queries (checked built-in and {queries_dir!r})", file=sys.stderr)
        sys.exit(2)
    for name in names:
        print(name)
# --- Victim/culprit (aligned with p4sla.py write-wait analysis; sqlite3 only, no pandas) ---
# How far before a victim's startTime the culprit search window opens (used by _vc_one_panel).
VC_HOLD_LOOKBACK = timedelta(hours=4)
# Slack added after a victim's endTime for the primary culprit search window.
VC_TIME_MARGIN = timedelta(minutes=4)
# Pool: top N global rows by totalWriteWait; then up to M rows per tableName within that pool.
VC_VICTIM_GLOBAL_POOL = 50
VC_VICTIM_PER_TABLE = 5
# Victim selection. Candidates are tableUse rows with totalWriteWait > 5000 (ms)
# and a positive completedLapse, excluding the hard-coded table list below.
# global_pool/top_global keep the VC_VICTIM_GLOBAL_POOL largest waits. per_table
# then keeps at most VC_VICTIM_PER_TABLE of those per table, largest wait first.
VC_SQL = f"""
WITH base AS (
SELECT tableName, user, startTime, endTime, pid, cmd, args,
CAST(completedLapse AS INTEGER) AS lapse, totalWriteWait
FROM tableUse JOIN process USING (processKey)
WHERE tableName NOT IN ('clients', 'clientEntity', 'change', 'storageup_R', 'storagemasterup_R', 'pull')
AND totalWriteWait > 5000 AND completedLapse > 0
),
global_pool AS (
SELECT *, ROW_NUMBER() OVER (ORDER BY totalWriteWait DESC) AS rn_global
FROM base
),
top_global AS (
SELECT * FROM global_pool WHERE rn_global <= {VC_VICTIM_GLOBAL_POOL}
),
per_table AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY tableName ORDER BY totalWriteWait DESC) AS rn_table
FROM top_global
)
SELECT tableName, user, startTime, endTime, pid, cmd, args, lapse, totalWriteWait
FROM per_table
WHERE rn_table <= {VC_VICTIM_PER_TABLE}
ORDER BY totalWriteWait DESC
"""
# Bump when VC_SQL / panel shape changes so old .victim_culprit.pkl files are ignored.
VC_PANELS_CACHE_VERSION = 2
# File-name suffix of the pickled panel cache, written next to the log2sql DB.
VC_PANELS_CACHE_SUFFIX = ".victim_culprit.pkl"
def _victim_culprit_cache_path(database_file: str) -> str:
    """Path of the panel cache beside ``database_file``.

    A ``.db`` suffix (any case) is replaced by ``VC_PANELS_CACHE_SUFFIX``.
    Any other name has the suffix appended.
    """
    absolute = os.path.abspath(database_file)
    folder = os.path.dirname(absolute)
    name = os.path.basename(absolute)
    if name.lower().endswith(".db"):
        name = name[:-3]
    return os.path.join(folder, name + VC_PANELS_CACHE_SUFFIX)
def _victim_culprit_db_stat(db_path: str) -> Optional[Tuple[float, int]]:
try:
st = os.stat(db_path)
return (st.st_mtime, st.st_size)
except OSError:
return None
def _load_victim_culprit_panels_cache(database_file: str) -> Optional[list]:
    """Return cached victim/culprit panels for ``database_file``, or None.

    The cache is accepted only when its version, absolute DB path, and DB
    mtime/size all match the current database and it holds a list of panels.

    Security note: ``pickle.load`` can execute code embedded in a crafted file.
    This trusts that the directory holding the DB is not writable by other
    users. NOTE(review): consider JSON if the panels stay plain data.
    """
    ab = os.path.abspath(database_file)
    stat = _victim_culprit_db_stat(ab)
    if stat is None:
        return None
    cache_path = _victim_culprit_cache_path(ab)
    if not os.path.isfile(cache_path):
        return None
    try:
        with open(cache_path, "rb") as fh:
            payload = pickle.load(fh)
    except Exception:
        # Unpickling corrupt, truncated or foreign data can raise many exception
        # types: UnpicklingError, EOFError, ValueError (e.g. unsupported
        # protocol), ImportError, IndexError, ... Any of them only means the
        # cache is unusable, so the caller recomputes instead of crashing.
        return None
    if not isinstance(payload, dict):
        return None
    if payload.get("version") != VC_PANELS_CACHE_VERSION:
        return None
    if payload.get("db_path") != ab:
        return None
    if payload.get("db_mtime") != stat[0] or payload.get("db_size") != stat[1]:
        return None
    panels = payload.get("panels")
    return panels if isinstance(panels, list) else None
def _save_victim_culprit_panels_cache(database_file: str, panels: list) -> None:
    """Pickle ``panels`` beside ``database_file``, keyed by the DB's path/mtime/size.

    Nothing is written when the DB cannot be stat'ed. Write errors are ignored,
    so the cache is best-effort.
    """
    db_abs = os.path.abspath(database_file)
    db_stat = _victim_culprit_db_stat(db_abs)
    if db_stat is None:
        return
    db_mtime, db_size = db_stat
    target = _victim_culprit_cache_path(db_abs)
    record = {
        "version": VC_PANELS_CACHE_VERSION,
        "db_path": db_abs,
        "db_mtime": db_mtime,
        "db_size": db_size,
        "panels": panels,
    }
    try:
        with open(target, "wb") as out:
            pickle.dump(record, out, protocol=pickle.HIGHEST_PROTOCOL)
    except OSError:
        pass
def get_victim_culprit_panels_cached(database_file: str) -> list:
    """Victim/culprit panels; recomputes only when the log2sql DB mtime/size changes.

    A fresh computation is stored back into the cache before it is returned.
    """
    panels = _load_victim_culprit_panels_cache(database_file)
    if panels is None:
        panels = compute_victim_culprit_panels(database_file)
        _save_victim_culprit_panels_cache(database_file, panels)
    return panels
def _vc_sql_quote_ident(s: str) -> str:
return str(s).replace("'", "''")
def _vc_format_ms(val: Any) -> str:
try:
return f"{int(round(float(val))):,}"
except (TypeError, ValueError):
return str(val)
def _vc_row_to_dict(row: sqlite3.Row) -> Dict[str, Any]:
return {k: row[k] for k in row.keys()}
def _vc_fetchall_dicts(conn: sqlite3.Connection, sql: str) -> list:
    """Execute ``sql`` on ``conn`` and return every result row as a dict.

    The rows must support ``.keys()``, as ``sqlite3.Row`` does; the caller
    sets that ``row_factory``.
    """
    return [_vc_row_to_dict(record) for record in conn.execute(sql).fetchall()]
def _vc_exclude_victim_pid(culprits: list, victim: Dict[str, Any]) -> list:
v_pid = victim.get("pid")
if v_pid is None:
return culprits
try:
v_n = int(float(v_pid))
except (TypeError, ValueError):
return culprits
out = []
for r in culprits:
try:
if int(float(r.get("pid"))) == v_n:
continue
except (TypeError, ValueError):
pass
out.append(r)
return out
def _vc_one_panel(conn: sqlite3.Connection, idx: int, row: Dict[str, Any]) -> Dict[str, Any]:
    """Build the victim/culprit panel dict for one ``VC_SQL`` victim row.

    The panel holds:

    * ``label``: a one-line summary;
    * ``code``: the victim's PID, start/end, command and args;
    * the table name;
    * ``culprits``: rows (pid, user, cmd, read/write held ms, tableName,
      startTime) for other processes holding locks on the same table.

    Culprit search, always excluding the victim's own PID:

    1. Holders whose ``totalReadHeld`` or ``totalWriteHeld`` exceeds 95% of the
       victim's write wait, started between ``VC_HOLD_LOOKBACK`` before the
       victim's start and ``VC_TIME_MARGIN`` after its end.
    2. If none, holders above 2000 ms that started between that same lookback
       point and the victim's start.

    ``culprits`` is None when neither search matches, or when the victim's
    timestamps or wait value cannot be parsed. Filter values are bound as
    SQLite parameters rather than hand-escaped into the SQL text.

    Args:
        conn: Open connection whose ``row_factory`` is ``sqlite3.Row``
            (``_vc_row_to_dict`` calls ``.keys()`` on each row).
        idx: Victim index, stored as the string ``key_suffix``.
        row: One victim row from ``VC_SQL`` as a dict.
    """
    t_name = row.get("tableName") or row.get("tablename") or "UnknownTable"
    v_wait = row.get("totalWriteWait", 0)
    cmd = str(row.get("cmd") or "").strip() or "(unknown)"
    args_s = str(row.get("args") or "").strip() or "(none)"
    label = f"Victim: {cmd} ({_vc_format_ms(v_wait)}ms write wait on {t_name})"
    code = (
        f"PID: {row.get('pid', '')}\n"
        f"Start: {row.get('startTime', '')}\n"
        f"End: {row.get('endTime', '')}\n"
        f"Cmd: {cmd}\n"
        f"Args: {args_s}"
    )
    ks = str(idx)

    def make_panel(culprits: Optional[list]) -> Dict[str, Any]:
        # Single constructor so every return path yields the same panel shape.
        return {
            "type": "panel",
            "label": label,
            "code": code,
            "tableName": t_name,
            "victim_cmd": cmd,
            "culprits": culprits,
            "key_suffix": ks,
        }

    try:
        start_dt = datetime.strptime(str(row["startTime"]).strip(), "%Y/%m/%d %H:%M:%S")
        end_dt = datetime.strptime(str(row["endTime"]).strip(), "%Y/%m/%d %H:%M:%S")
        hold_search_start = (start_dt - VC_HOLD_LOOKBACK).strftime("%Y/%m/%d %H:%M:%S")
        adj_end = (end_dt + VC_TIME_MARGIN).strftime("%Y/%m/%d %H:%M:%S")
        wait95 = round(float(v_wait) * 0.95)
    except (ValueError, KeyError, TypeError, OverflowError):
        return make_panel(None)

    holders_sql = (
        "SELECT pid, user, cmd, "
        'totalReadHeld AS "totalReadHeld(ms)", totalWriteHeld AS "totalWriteHeld(ms)", '
        "tableName, startTime "
        "FROM tableUse JOIN process USING (processKey) WHERE tableName = ? "
        "AND startTime >= ? AND startTime <= ? "
        "AND (totalReadHeld > ? OR totalWriteHeld > ?) ORDER BY startTime"
    )

    def find_holders(window_end: str, threshold_ms: int) -> list:
        # str(t_name) matches the text literal the query previously embedded.
        params = (str(t_name), hold_search_start, window_end, threshold_ms, threshold_ms)
        found = [_vc_row_to_dict(r) for r in conn.execute(holders_sql, params).fetchall()]
        return _vc_exclude_victim_pid(found, row)

    culprits = find_holders(adj_end, wait95)
    if not culprits:
        # Fallback: smaller holds that began before the victim started.
        culprits = find_holders(str(row["startTime"]).strip(), 2000)
    return make_panel(culprits or None)
def compute_victim_culprit_panels(database_file: str) -> list:
    """Build the victim/culprit panel list for a log2sql SQLite database.

    Victim rows come from ``VC_SQL`` (at most ``VC_VICTIM_PER_TABLE`` per
    ``tableName`` taken from the ``VC_VICTIM_GLOBAL_POOL`` largest global
    write-wait rows); each victim is turned into one panel by ``_vc_one_panel``.
    The panel roles mirror p4sla's ``compute_write_waiter_panels``.

    Returns:
        A list of panel dicts, or a single-element list holding an ``error`` dict
        (database missing or a ``sqlite3.Error``) or a ``warning`` dict (no victims).
    """
    db_path = os.path.abspath(database_file)
    if not os.path.isfile(db_path):
        return [{"type": "error", "message": f"Database not found: {db_path}"}]
    connection = None
    try:
        connection = sqlite3.connect(db_path)
        connection.row_factory = sqlite3.Row
        victim_rows = _vc_fetchall_dicts(connection, VC_SQL)
        if not victim_rows:
            return [{"type": "warning", "message": "No significant write waits (> 5s) found."}]
        return [
            _vc_one_panel(connection, position, victim)
            for position, victim in enumerate(victim_rows)
        ]
    except sqlite3.Error as exc:
        return [{"type": "error", "message": str(exc)}]
    finally:
        # Always release the connection, including on the early returns above.
        if connection is not None:
            connection.close()
def format_victim_culprit_ascii(panels: list, include_banner: bool = True) -> str:
    """Render victim/culprit panels as terminal-friendly fixed-width text.

    Args:
        panels: output of ``compute_victim_culprit_panels``. A leading ``warning``
            or ``error`` dict short-circuits into a one-line message; otherwise
            every ``panel`` dict is rendered and any other entry is ignored.
        include_banner: when False the title/explanation lines are left out, so the
            block can sit under a section header in the combined summary file.

    Returns:
        The report text, always ending in exactly one newline.
    """
    out: List[str] = []
    if include_banner:
        out.extend(
            [
                "VICTIM / CULPRIT REPORT (write-wait drill-down)",
                f"Victims: up to {VC_VICTIM_PER_TABLE} rows per table among the top "
                f"{VC_VICTIM_GLOBAL_POOL} global write-wait rows (>5s, excluded tables filtered).",
                "Each victim is a command with high write-wait on a table; culprits hold read/write",
                f"locks on that table in a window from {int(VC_HOLD_LOOKBACK.total_seconds() // 3600)}h "
                "before the victim start through shortly after the victim end.",
                "",
            ]
        )
    if not panels:
        out.append("(no panels)")
        return "\n".join(out) + "\n"
    lead_kind = panels[0].get("type")
    if lead_kind in ("warning", "error"):
        fallback = "" if lead_kind == "warning" else "Error"
        out.append(panels[0].get("message", fallback))
        return "\n".join(out) + "\n"
    for panel in panels:
        if panel.get("type") != "panel":
            continue
        out += [
            "-" * 72,
            panel.get("label", ""),
            "",
            " Victim process (blocked — waiting to write; PID/start/end/cmd/args below identify the victim):",
        ]
        out += [" " + code_line for code_line in str(panel.get("code", "")).splitlines()]
        out.append("")
        culprit_rows = panel.get("culprits")
        if culprit_rows:
            out.append(
                " Culprits (other processes holding read/write locks on this table in the "
                "search window; victim PID excluded):"
            )
            out.append("")
            columns = list(culprit_rows[0].keys())
            table = [[str(entry.get(col, "")) for col in columns] for entry in culprit_rows]
            out.append(tabulate(table, headers=columns, tablefmt="simple"))
        else:
            out.append(
                " Culprits: (none) — no other lock-holder rows matched the time window "
                "and thresholds."
            )
        out.append("")
    return "\n".join(out).rstrip() + "\n"
def format_victim_culprit_html(panels: list) -> str:
    """Render victim/culprit panels as an HTML fragment for the summary page.

    The fragment is a single ``<div class="vc-report">`` (no ``<html>`` wrapper).
    A leading ``warning``/``error`` dict becomes one paragraph; otherwise each
    ``panel`` dict gets a heading, the victim's text in ``<pre>``, and either a
    culprit table or a "none" note. All dynamic text is HTML-escaped.
    """
    intro = (
        "<p class=\"section-lead\">Up to "
        f"{VC_VICTIM_PER_TABLE} rows per table among the top {VC_VICTIM_GLOBAL_POOL} global write-wait rows; "
        "culprits are processes holding read/write locks on the same table.</p>"
    )
    pieces = ['<div class="vc-report">', intro]

    def _close_with(paragraph: str) -> str:
        # Early exits put the closing </div> on the same line as their message.
        pieces.append(paragraph + "</div>")
        return "\n".join(pieces)

    if not panels:
        return _close_with("<p><i>(no data)</i></p>")
    lead = panels[0]
    lead_kind = lead.get("type")
    if lead_kind == "warning":
        return _close_with(f"<p>{html.escape(lead.get('message', ''))}</p>")
    if lead_kind == "error":
        return _close_with(f"<p><b>Error:</b> {html.escape(lead.get('message', ''))}</p>")
    for panel in panels:
        if panel.get("type") != "panel":
            continue
        pieces.append("<h3>" + html.escape(panel.get("label", "")) + "</h3>")
        pieces.append(
            "<p><strong>Victim process</strong> — blocked, waiting to write; "
            "PID/start/end/cmd/args below identify the victim.</p>"
        )
        pieces.append("<pre>" + html.escape(str(panel.get("code", ""))) + "</pre>")
        culprit_rows = panel.get("culprits")
        if not culprit_rows:
            pieces.append(
                "<p><i>Culprits: none — no other lock-holder rows matched the time window "
                "and thresholds.</i></p>"
            )
            continue
        pieces.append(
            "<p><strong>Culprits</strong> — other processes holding read/write locks on "
            "this table in the search window (victim PID excluded).</p>"
        )
        columns = list(culprit_rows[0].keys())
        table = [[str(entry.get(col, "")) for col in columns] for entry in culprit_rows]
        pieces.append("<pre>" + html.escape(tabulate(table, headers=columns, tablefmt="simple")) + "</pre>")
    pieces.append("</div>")
    return "\n".join(pieces)
_SUMMARY_QUERY_MARKER_RE = re.compile(r"^@@QUERY@@(.+)@@\s*$")
def _summary_sql_sections_plain_text(section_text: str) -> str:
    """Replace each ``@@QUERY@@title@@`` marker line with its bare title.

    Used for the plain-text summary file. Non-marker lines pass through untouched;
    a marker line keeps a trailing newline only if it had one.
    """
    def _unmark(line: str) -> str:
        marker = _SUMMARY_QUERY_MARKER_RE.match(line.strip())
        if marker is None:
            return line
        ending = "\n" if line.endswith("\n") else ""
        return marker.group(1).strip() + ending

    return "".join(_unmark(line) for line in section_text.splitlines(keepends=True))
def _summary_html_section_id(section_title: str) -> str:
    """Derive an HTML fragment id (``summary-<slug>``) from a section title.

    The slug is the trimmed, lower-cased title with spaces turned into dashes and
    every character other than ``a-z``, ``0-9`` and ``-`` dropped; an empty slug
    falls back to ``summary-section``.
    """
    allowed = frozenset("abcdefghijklmnopqrstuvwxyz0123456789-")
    dashed = section_title.strip().lower().replace(" ", "-")
    slug = "".join(ch for ch in dashed if ch in allowed)
    if not slug:
        return "summary-section"
    return "summary-" + slug
def _summary_sql_sections_to_html(section_text: str) -> str:
    """Convert the detailed SQL report text into an HTML fragment.

    ``@@QUERY@@title@@`` lines become ``<h4 class="query-title">`` headings,
    ``--- Title ---`` lines become ``<h3>`` headings with a stable ``id`` from
    ``_summary_html_section_id``, and the text in between is collected into
    ``<pre>`` blocks (whitespace-only runs are dropped). Everything is wrapped in
    ``<div class="sql-sections">``; blank input yields an "empty" note instead.
    """
    if not section_text.strip():
        return "<p class=\"empty-note\"><i>(no detailed SQL sections)</i></p>"
    html_out: List[str] = ['<div class="sql-sections">']
    pending: List[str] = []

    def _emit_pending() -> None:
        body = "".join(pending)
        pending.clear()
        if body.strip():
            html_out.append("<pre>" + html.escape(body) + "</pre>")

    for raw_line in section_text.splitlines(keepends=True):
        stripped = raw_line.strip()
        query_match = _SUMMARY_QUERY_MARKER_RE.match(stripped)
        if query_match:
            _emit_pending()
            html_out.append(
                '<h4 class="query-title">' + html.escape(query_match.group(1).strip()) + "</h4>"
            )
            continue
        section_match = re.match(r"^--- (.+) ---\s*$", stripped)
        if section_match is None:
            pending.append(raw_line)
            continue
        _emit_pending()
        heading = section_match.group(1).strip()
        anchor = _summary_html_section_id(heading)
        html_out.append(f'<h3 id="{anchor}">' + html.escape(heading) + "</h3>")
    _emit_pending()
    html_out.append("</div>")
    return "\n".join(html_out)
# Version stamp written into every generated .summary.html as the comment
# "<!-- p4diag-summary-layout:N -->" (see render_summary_html_report);
# _summary_html_needs_regen() looks for it in the first 512 characters of an
# existing file and regenerates the page when the current N is absent.
# Bump when summary HTML section order or styling changes (triggers HTML regen).
SUMMARY_HTML_LAYOUT_VERSION = 9
# Stylesheet inlined into the <style> element of the summary HTML report.
# report-wrap/report-header/report-meta, section-card, stats-block, gallery/plot/
# filename and plots-empty are emitted by render_summary_html_report; vc-report by
# format_victim_culprit_html; sql-sections/query-title/empty-note by
# _summary_sql_sections_to_html. The recommendations/rec-* rules presumably style the
# recommendations HTML built elsewhere in this file (not visible here) -- confirm.
_SUMMARY_HTML_STYLES = """
:root {
color-scheme: light;
--bg: #eef1f5;
--card: #ffffff;
--ink: #1e293b;
--muted: #64748b;
--accent: #1e4d8c;
--accent-soft: #e8f0fa;
--border: #d8dee9;
--pre-bg: #f8fafc;
--shadow: 0 1px 3px rgba(15, 23, 42, 0.08), 0 4px 14px rgba(15, 23, 42, 0.06);
}
* { box-sizing: border-box; }
body {
font-family: "Segoe UI", system-ui, -apple-system, sans-serif;
background: var(--bg);
color: var(--ink);
margin: 0;
padding: 16px 12px 40px;
line-height: 1.45;
}
.report-wrap {
max-width: min(96vw, 1680px);
width: 100%;
margin: 0 auto;
}
.report-header {
background: linear-gradient(135deg, #1e4d8c 0%, #2d6bb3 100%);
color: #fff;
border-radius: 10px;
padding: 22px 26px;
margin-bottom: 24px;
box-shadow: var(--shadow);
}
.report-header h1 {
margin: 0 0 8px;
font-size: 1.65rem;
font-weight: 600;
letter-spacing: -0.02em;
}
.report-meta {
margin: 0;
font-size: 0.9rem;
opacity: 0.92;
font-family: ui-monospace, "Cascadia Code", "Consolas", monospace;
}
.report-meta span { display: block; margin-top: 4px; }
section.section-card {
background: var(--card);
border: 1px solid var(--border);
border-radius: 10px;
padding: 20px 22px 22px;
margin-bottom: 22px;
box-shadow: var(--shadow);
}
section.section-card > h2 {
margin: 0 0 14px;
font-size: 1.2rem;
font-weight: 600;
color: var(--accent);
border-bottom: 2px solid var(--accent-soft);
padding-bottom: 8px;
}
.section-lead { margin: 0 0 14px; color: var(--muted); font-size: 0.95rem; }
.stats-block pre, .vc-report pre, .sql-sections pre {
white-space: pre;
overflow-x: auto;
overflow-y: visible;
max-width: 100%;
font-family: ui-monospace, "Cascadia Code", "Consolas", monospace;
font-size: 0.84rem;
line-height: 1.4;
background: var(--pre-bg);
border: 1px solid var(--border);
border-radius: 6px;
padding: 14px 16px;
margin: 0 0 12px;
-webkit-overflow-scrolling: touch;
}
.gallery {
display: flex;
flex-direction: column;
gap: 22px;
}
.plot {
background: var(--pre-bg);
border: 1px solid var(--border);
border-radius: 8px;
padding: 14px;
text-align: center;
}
.plot a {
display: block;
line-height: 0;
cursor: zoom-in;
}
.plot a:hover img {
box-shadow: 0 4px 16px rgba(15, 23, 42, 0.12);
}
.plot a:focus-visible {
outline: 2px solid var(--accent);
outline-offset: 3px;
border-radius: 4px;
}
.plot img {
max-width: 100%;
width: auto;
height: auto;
border-radius: 4px;
vertical-align: middle;
}
.filename {
margin-top: 10px;
font-size: 0.8rem;
color: var(--muted);
word-break: break-all;
}
.filename a {
color: var(--accent);
text-decoration: none;
cursor: pointer;
}
.filename a:hover { text-decoration: underline; }
.sql-sections h3 {
margin: 22px 0 10px;
font-size: 1.05rem;
font-weight: 700;
color: var(--accent);
}
.sql-sections h3:first-child { margin-top: 0; }
.sql-sections h4.query-title {
margin: 16px 0 8px;
font-size: 0.98rem;
font-weight: 700;
color: #1e293b;
line-height: 1.35;
}
.vc-report h3 {
margin: 20px 0 8px;
font-size: 1rem;
font-weight: 600;
color: #334155;
}
.vc-report h3:first-of-type { margin-top: 0; }
.vc-report p { margin: 0 0 10px; }
.empty-note { color: var(--muted); margin: 0; }
.plots-empty { color: var(--muted); font-size: 0.95rem; margin: 0; }
.recommendations { display: flex; flex-direction: column; gap: 18px; }
.rec-item {
border: 1px solid var(--border);
border-radius: 8px;
padding: 14px 16px;
background: #fafbfc;
}
.rec-item h3 {
margin: 0 0 10px;
font-size: 1rem;
font-weight: 700;
color: #1e293b;
line-height: 1.35;
}
.rec-badge {
display: inline-block;
font-size: 0.72rem;
font-weight: 700;
letter-spacing: 0.04em;
padding: 2px 8px;
border-radius: 999px;
margin-right: 8px;
vertical-align: middle;
}
.rec-likely .rec-badge { background: #fef3c7; color: #92400e; }
.rec-possible .rec-badge { background: #e0e7ff; color: #3730a3; }
.rec-evidence {
margin: 0 0 10px 1.1rem;
padding: 0;
color: #334155;
font-size: 0.92rem;
}
.rec-mitigation-label {
margin: 0 0 6px;
font-size: 0.92rem;
color: #334155;
}
.rec-mitigation {
margin: 0 0 0 1.1rem;
padding: 0;
color: #334155;
font-size: 0.92rem;
line-height: 1.45;
}
.rec-mitigation li { margin-bottom: 6px; }
.rec-mitigation li:last-child { margin-bottom: 0; }
.rec-mitigation a {
color: var(--accent);
text-decoration: none;
}
.rec-mitigation a:hover { text-decoration: underline; }
"""
def _summary_html_needs_regen(html_path: str) -> bool:
    """Decide whether the summary HTML at ``html_path`` must be (re)generated.

    Returns True when the file does not exist, cannot be read, or its first 512
    characters do not contain the ``p4diag-summary-layout:<version>`` marker for the
    current ``SUMMARY_HTML_LAYOUT_VERSION``.
    """
    if os.path.isfile(html_path):
        wanted = f"p4diag-summary-layout:{SUMMARY_HTML_LAYOUT_VERSION}"
        try:
            with open(html_path, encoding="utf-8", errors="replace") as fh:
                head = fh.read(512)
        except OSError:
            return True
        return wanted not in head
    return True
def render_summary_html_report(
    *,
    log_file: str,
    db_file: str,
    stats_html_inner: str,
    recommendations_html: str,
    png_entries: List[Tuple[str, str]],
    plots_empty_note: str,
    sql_sections_html: str,
    vc_html: str,
) -> str:
    """Return the complete HTML document written as ``.summary.html``.

    Sections appear in this order: log statistics, recommendations, plots,
    detailed SQL reports, and the victim/culprit report. The very first line is the
    ``p4diag-summary-layout`` marker comment checked by ``_summary_html_needs_regen``.

    Args:
        log_file: log path shown (escaped) in the header; its basename is used in
            the ``<title>``.
        db_file: database path shown (escaped) in the header.
        stats_html_inner: plain statistics text; HTML-escaped inside ``<pre>``.
        recommendations_html: ready-made HTML fragment, inserted verbatim.
        png_entries: ``(file name, alt text)`` pairs for plot images referenced
            relative to the HTML file.
        plots_empty_note: text shown when ``png_entries`` is empty.
        sql_sections_html: ready-made HTML fragment, inserted verbatim.
        vc_html: ready-made victim/culprit HTML fragment, inserted verbatim.
    """
    # Local import: only this renderer needs URL quoting.
    from urllib.parse import quote

    log_esc = html.escape(os.path.basename(log_file))
    parts = [
        f"<!-- p4diag-summary-layout:{SUMMARY_HTML_LAYOUT_VERSION} -->\n",
        "<!DOCTYPE html>\n<html lang=\"en\">\n<head>\n",
        "<meta charset=\"UTF-8\" />\n",
        "<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />\n",
        "<title>P4 Diagnostics Summary — ",
        log_esc,
        "</title>\n<style>",
        _SUMMARY_HTML_STYLES,
        "</style>\n</head>\n<body>\n<div class=\"report-wrap\">\n",
        "<header class=\"report-header\">\n",
        "<h1>P4 Diagnostics Summary</h1>\n",
        "<p class=\"report-meta\">",
        f"<span>Log: {html.escape(log_file)}</span>",
        f"<span>Database: {html.escape(db_file)}</span>",
        "</p>\n</header>\n",
        "<section class=\"section-card\">\n<h2>Log statistics</h2>\n",
        "<div class=\"stats-block\"><pre>",
        html.escape(stats_html_inner),
        "</pre></div>\n</section>\n",
        "<section class=\"section-card\">\n<h2>Recommendations</h2>\n",
        recommendations_html,
        "</section>\n",
        "<section class=\"section-card\">\n<h2>Plots</h2>\n",
    ]
    if png_entries:
        parts.append("<div class=\"gallery\">\n")
        for fname, alt in png_entries:
            # href/src need a URL: percent-encode the file name so characters
            # such as '#', '?', '%' or spaces are not read as URL syntax. The
            # visible label is the plain name, HTML-escaped.
            url = html.escape(quote(fname))
            label = html.escape(fname)
            parts.append(
                f"<div class=\"plot\">"
                f"<a href=\"{url}\" target=\"_blank\" rel=\"noopener\" "
                f"title=\"Open {label} full size\">"
                f"<img src=\"{url}\" alt=\"{html.escape(alt)}\" /></a>"
                f"<div class=\"filename\"><a href=\"{url}\" target=\"_blank\" "
                f"rel=\"noopener\">{label}</a></div></div>\n"
            )
        parts.append("</div>\n")
    else:
        parts.append(f"<p class=\"plots-empty\">{html.escape(plots_empty_note)}</p>\n")
    parts.extend(
        [
            "</section>\n",
            "<section class=\"section-card\">\n<h2>Detailed SQL reports</h2>\n",
            sql_sections_html,
            "</section>\n",
            "<section class=\"section-card\" id=\"victim-culprit-report\">\n"
            "<h2>Victim Culprit Report</h2>\n",
            vc_html,
            "</section>\n",
            "</div>\n</body>\n</html>\n",
        ]
    )
    return "".join(parts)
def _normalize_quiet_cli_argv(av: list) -> None:
    """Rewrite ``av`` in place into the internal quiet-CLI subcommand form.

    * an argv recognised as the SQL "list" form  -> ``["log2sql-query-sql-list"]``
    * an argv recognised as a ``FILE.sql`` query  -> ``["log2sql-query-sql", *av]``
    * legacy ``query list`` / ``query ARGS...``   -> the same two internal names
    * otherwise, a first word found in ``QUIET_CLI_SHORT_ALIASES`` is replaced by
      its mapped name. An empty ``av`` is left alone.
    """
    if not av:
        return
    if _argv_is_sql_query_list(av):
        av[:] = ["log2sql-query-sql-list"]
    elif _argv_is_sql_library_query(av):
        av[:] = ["log2sql-query-sql", *av]
    elif av[0] == "query":
        wants_list = len(av) >= 2 and av[1] == "list"
        av[:] = ["log2sql-query-sql-list"] if wants_list else ["log2sql-query-sql", *av[1:]]
    else:
        internal_name = QUIET_CLI_SHORT_ALIASES.get(av[0])
        if internal_name:
            av[0] = internal_name
def _resolve_quiet_log_source_path(lt: str, log_base: str) -> Optional[str]:
    """Find an existing P4 server log on disk to use as ``log2sql`` input.

    Candidates are tried in this order (duplicates and empty strings skipped):
    ``lt`` as given and ``~``-expanded; ``log_base`` joined to the current
    directory and as given; and, when ``lt`` ends in ``.db`` (any case), ``lt``
    without that suffix as given, ``~``-expanded, and its basename in the current
    directory.

    A candidate is accepted only if it is a regular file that is NOT a SQLite
    database. A token like ``.p4diagnostics/log.db`` names the database itself,
    which exists on disk but is never a server log, so it must not be returned.

    Returns:
        The absolute path of the first accepted candidate, or ``None``.
    """

    def _is_sqlite_file(path: str) -> bool:
        # Every SQLite 3 database file starts with this 16-byte header string.
        try:
            with open(path, "rb") as fh:
                return fh.read(16) == b"SQLite format 3\x00"
        except OSError:
            return False

    candidates: List[str] = []
    if lt:
        candidates += [lt, os.path.expanduser(lt)]
    if log_base:
        candidates += [os.path.join(os.getcwd(), log_base), log_base]
    if lt.lower().endswith(".db"):
        stem = lt[:-3]
        candidates += [
            stem,
            os.path.expanduser(stem),
            os.path.join(os.getcwd(), os.path.basename(stem)),
        ]
    seen = set()
    for c in candidates:
        if not c or c in seen:
            continue
        seen.add(c)
        if os.path.isfile(c) and not _is_sqlite_file(c):
            return os.path.abspath(c)
    return None
def _configure_log2sql_quiet_paths(
    log_token: str,
) -> Tuple[str, str, str, str, str]:
    """Derive the quiet-mode log2sql locations for a log (or ``.db``) token.

    Returns ``(lt, db_path, diag_dir, log_base, db_name)``:

    * ``lt`` - the token with surrounding whitespace removed;
    * ``log_base`` - basename of ``lt`` (trailing ``/`` ignored), minus a ``.db``
      suffix if present (case-insensitive);
    * ``db_name`` - that basename if it already ends in ``.db``, else ``log_base + ".db"``;
    * ``db_path`` - absolute path of ``.p4diagnostics/<db_name>`` under the cwd;
    * ``diag_dir`` - the directory containing ``db_path``.
    """
    token = log_token.strip()
    leaf = os.path.basename(token.rstrip("/"))
    if leaf.lower().endswith(".db"):
        log_base, db_name = leaf[:-3], leaf
    else:
        log_base, db_name = leaf, leaf + ".db"
    db_path = os.path.abspath(os.path.join(".p4diagnostics", db_name))
    return token, db_path, os.path.dirname(db_path), log_base, db_name
def _populate_log2sql_quiet_sidecars(
    l: "log2sql",
    *,
    log_for_obj: str,
    db_path: str,
    diag_dir: str,
    sidecar_base: str,
) -> None:
    """Point a ``log2sql`` object at its log, database and sidecar output files.

    Sets ``logFile`` and ``databaseFile`` directly, then every sidecar path under
    ``diag_dir``: ``<sidecar_base>.summary.txt`` / ``.summary.html`` /
    ``.stats.txt`` / ``.errors_summary`` / ``.active_threads`` /
    ``.active_threads_summary`` / ``.fails``, plus the shared ``queries.sql``.
    Attributes are assigned in a fixed order.
    """
    l.logFile = log_for_obj
    l.databaseFile = db_path
    sidecars = (
        ("summaryFile", sidecar_base + ".summary.txt"),
        ("summaryFileHTML", sidecar_base + ".summary.html"),
        ("summaryQueriesFile", "queries.sql"),
        ("logFileDetails", sidecar_base + ".stats.txt"),
        ("errorsSummaryFile", sidecar_base + ".errors_summary"),
        ("maxActiveThreadsFile", sidecar_base + ".active_threads"),
        ("activeThreadsSummaryFile", sidecar_base + ".active_threads_summary"),
        ("failuresFile", sidecar_base + ".fails"),
    )
    for attribute, file_name in sidecars:
        setattr(l, attribute, os.path.join(diag_dir, file_name))
def configure_log2sql_for_quiet_db(log_token: str) -> "log2sql":
    """Return a ``log2sql`` object bound to ``.p4diagnostics/<basename(log)>.db``.

    The database name follows ``setLogFile``: ``basename(logFile) + '.db'``. The
    basename is deliberately not passed through ``os.path.splitext``; log names
    such as ``log.0710.20250710_...`` contain several dots, and splitext would
    mistake part of the name for an extension.

    When the database does not exist yet, a notice is printed on stderr and the
    work is delegated to ``quiet_log2sql_create_database`` (same as ``create``).
    """
    _lt, db_path, diag_dir, log_base, _db_name = _configure_log2sql_quiet_paths(log_token)
    if os.path.isfile(db_path):
        bound = log2sql(log_base)
        _populate_log2sql_quiet_sidecars(
            bound,
            log_for_obj=log_base,
            db_path=db_path,
            diag_dir=diag_dir,
            sidecar_base=log_base,
        )
        return bound
    print(
        "p4diag: log2sql database not found; creating it ... ",
        file=sys.stderr,
        flush=True,
    )
    return quiet_log2sql_create_database(log_token)
def configure_log2sql_for_quiet_log_stats_only(log_token: str) -> "log2sql":
    """Return a ``log2sql`` object set up for grep-based log statistics only.

    Creates ``.p4diagnostics`` if needed and locates the server log on disk
    (``_resolve_quiet_log_source_path``); sidecar outputs such as
    ``.p4diagnostics/<log>.stats.txt`` are named after the log basename. No
    SQLite database is opened or created. If no log file can be found, an error is
    printed on stderr and the process exits with status 2.
    """
    lt, db_path, diag_dir, log_base, _ = _configure_log2sql_quiet_paths(log_token)
    os.makedirs(diag_dir, exist_ok=True)
    source_log = _resolve_quiet_log_source_path(lt, log_base)
    if not source_log:
        print(
            f"Could not find log file for {log_token!r}.\n"
            "Expected a readable P4 server log (basename or path).",
            file=sys.stderr,
        )
        sys.exit(2)
    stats_obj = log2sql(source_log)
    _populate_log2sql_quiet_sidecars(
        stats_obj,
        log_for_obj=source_log,
        db_path=db_path,
        diag_dir=diag_dir,
        sidecar_base=log_base,
    )
    return stats_obj
def quiet_log2sql_create_database(log_token: str) -> "log2sql":
    """Make sure ``.p4diagnostics/<basename(log)>.db`` exists, building it if needed.

    An existing database is reused as-is (a note is printed on stdout). Otherwise
    the server log is located with ``_resolve_quiet_log_source_path`` and
    ``createDatabase`` is run; sidecar names then follow the resolved log's
    basename.

    Exits with status 2 when no log can be found and with status 1 when log2sql
    finishes without producing the database file.
    """
    lt, db_path, diag_dir, log_base, _ = _configure_log2sql_quiet_paths(log_token)
    os.makedirs(diag_dir, exist_ok=True)
    if os.path.isfile(db_path):
        existing = log2sql(log_base)
        _populate_log2sql_quiet_sidecars(
            existing,
            log_for_obj=log_base,
            db_path=db_path,
            diag_dir=diag_dir,
            sidecar_base=log_base,
        )
        print(f"log2sql database (already present): {db_path}", flush=True)
        return existing
    source_log = _resolve_quiet_log_source_path(lt, log_base)
    if not source_log:
        print(
            f"log2sql database not found: {db_path}\n"
            f"Could not find log file {log_base!r} (or path {lt!r}) to build it.",
            file=sys.stderr,
        )
        sys.exit(2)
    builder = log2sql(source_log)
    _populate_log2sql_quiet_sidecars(
        builder,
        log_for_obj=source_log,
        db_path=db_path,
        diag_dir=diag_dir,
        sidecar_base=os.path.basename(source_log),
    )
    builder.createDatabase()
    if os.path.isfile(db_path):
        print(f"log2sql database written: {db_path}", flush=True)
        return builder
    print(
        f"log2sql did not create database at {db_path} (see {LOG_FILE or 'log2sql output'}).",
        file=sys.stderr,
    )
    sys.exit(1)
def ensure_help_schema_sample_database() -> "log2sql":
    """Build (or reuse) the small sample database used by the ``schema`` command.

    ``_HELP_SCHEMA_SAMPLE_LOG_TEXT`` is written to
    ``.p4diagnostics/_schema_help_sample.log`` on every call. The matching
    ``_schema_help_sample.db`` is rebuilt with ``log2sql`` unless both it and
    ``_schema_help_sample.sha256`` exist and the stamp holds the SHA-256 of the
    current sample text; after a successful build the digest is stored (best
    effort: a failure to write the stamp is ignored).

    Exits with status 1 if the sample log cannot be written, a stale database
    cannot be removed, or log2sql does not produce the database.

    Returns:
        A fresh ``log2sql`` object pointed at the sample log and database.
    """
    sample_digest = hashlib.sha256(_HELP_SCHEMA_SAMPLE_LOG_TEXT.encode("utf-8")).hexdigest()
    diag_dir = os.path.abspath(".p4diagnostics")
    os.makedirs(diag_dir, exist_ok=True)
    log_path = os.path.join(diag_dir, "_schema_help_sample.log")
    db_path = os.path.join(diag_dir, "_schema_help_sample.db")
    stamp_path = os.path.join(diag_dir, "_schema_help_sample.sha256")

    def _bind_sample() -> "log2sql":
        sample = log2sql(log_path)
        sample.logFile = os.path.abspath(log_path)
        sample.databaseFile = db_path
        return sample

    def _database_is_current() -> bool:
        if not (os.path.isfile(stamp_path) and os.path.isfile(db_path)):
            return False
        try:
            with open(stamp_path, encoding="utf-8") as sf:
                return sf.read().strip() == sample_digest
        except OSError:
            return False

    try:
        with open(log_path, "w", encoding="utf-8", newline="\n") as fh:
            fh.write(_HELP_SCHEMA_SAMPLE_LOG_TEXT)
    except OSError as e:
        print(f"p4diag: schema: cannot write sample log {log_path}: {e}", file=sys.stderr)
        sys.exit(1)

    if not _database_is_current():
        try:
            if os.path.isfile(db_path):
                os.remove(db_path)
        except OSError as e:
            print(f"p4diag: schema: cannot refresh sample database: {e}", file=sys.stderr)
            sys.exit(1)
        _bind_sample().createDatabase()
        if not os.path.isfile(db_path):
            print(
                "p4diag: schema: log2sql failed to build sample database "
                f"(see {LOG_FILE or 'log2sql output'}): {db_path}",
                file=sys.stderr,
            )
            sys.exit(1)
        try:
            with open(stamp_path, "w", encoding="utf-8") as sf:
                sf.write(sample_digest + "\n")
        except OSError:
            pass
    return _bind_sample()
# SQL for DB/incoming/running plots (minute-rounded times via substr(..., 1, 16) where applicable).
# _SQL_PLOT_DBWAIT: per-minute read+write wait, converted to seconds (the plot label is
# "DB Wait Time in seconds"). Dividing by 1000.0 makes SQLite do real rather than
# integer division, so round() actually rounds instead of acting on a truncated value.
# COALESCE stops a minute whose read (or write) waits are all NULL from turning the
# whole total into NULL.
_SQL_PLOT_DBWAIT = (
    "SELECT substr(startTime, 1, 16) AS t, "
    "round((COALESCE(SUM(totalReadWait), 0) + COALESCE(SUM(totalWriteWait), 0)) / 1000.0) AS wait_s "
    "FROM tableUse JOIN process USING (processKey) "
    "WHERE cmd NOT IN ('user-sync', 'user-transmit') "
    "GROUP BY substr(startTime, 1, 16) ORDER BY t"
)
# _SQL_PLOT_INCOMING: number of processes started in each minute.
_SQL_PLOT_INCOMING = (
    "SELECT substr(startTime, 1, 16) AS minute, COUNT(*) AS n "
    "FROM process GROUP BY minute ORDER BY minute"
)
# _SQL_PLOT_RUNNING: smallest ``running`` value recorded for each distinct endTime.
_SQL_PLOT_RUNNING = "SELECT endTime, MIN(running) AS r FROM process GROUP BY endTime ORDER BY endTime"
def _terminal_width_for_plots() -> int:
    """Column count for ASCII gnuplot output, never less than 40.

    Uses the attached terminal's width; without a terminal it falls back to the
    ``COLUMNS`` environment variable (default ``"80"``), and to 80 when that
    value is not an integer.
    """
    try:
        columns = os.get_terminal_size().columns
    except OSError:
        try:
            columns = int(os.environ.get("COLUMNS", "80"))
        except ValueError:
            return 80
    return max(40, columns)
def _parse_plot_time(s: str) -> Optional[datetime]:
    """Parse a plot timestamp such as ``2025/07/10 10:00:05`` or ``2025/07/10 10:00``.

    Surrounding whitespace and double quotes are ignored; ``None`` or unparseable
    input yields ``None``.
    """
    cleaned = (s or "").strip().strip('"')
    parsed: Optional[datetime] = None
    for pattern in ("%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M"):
        try:
            parsed = datetime.strptime(cleaned, pattern)
        except ValueError:
            continue
        break
    return parsed
def _xtics_step_seconds(start: datetime, end: datetime, target_ticks: int = 10) -> int:
    """Pick a readable spacing (seconds) between major x tics for ``start``..``end``.

    Aims for about ``target_ticks`` tics (at least one minute apart). The raw
    spacing is rounded up to the next "nice" interval (1 min ... 30 days) so date
    labels do not pile up (e.g. on the dbWaitTime plot); beyond 30 days it is
    rounded up to whole days.
    """
    span = max(1, int((end - start).total_seconds()))
    raw = max(60, span // max(1, target_ticks))
    friendly_steps = (
        60, 120, 180, 300, 600, 900, 1800,            # minutes
        3600, 7200, 10800, 14400, 21600, 43200,       # hours
        86400, 172800, 604800, 2592000,               # days, week, 30 days
    )
    chosen = next((step for step in friendly_steps if step >= raw), None)
    if chosen is not None:
        return chosen
    whole_days = -(-raw // 86400)  # ceiling division
    return max(86400, whole_days * 86400)
def _collect_active_threads_csv_rows(log_path: str, pid_token: str) -> List[Tuple[str, int]]:
    """Extract ``(date time, active-thread count)`` samples from a server log.

    Mirrors ``plot_p4_active_commands.sh`` (``grep 'active threads' | grep PID``
    then ``awk '{print $1,$2,$10}'``): a line qualifies when it contains both
    "active threads" and ``pid_token`` (``"active"`` when empty), has at least ten
    whitespace-separated fields, and its tenth field is an integer.
    """
    needle = (pid_token or "active").strip()
    samples: List[Tuple[str, int]] = []
    with open(log_path, "r", encoding="utf-8", errors="replace") as handle:
        for text in handle:
            if "active threads" not in text or needle not in text:
                continue
            fields = text.split()
            if len(fields) < 10:
                continue
            try:
                count = int(fields[9])
            except ValueError:
                continue
            samples.append((fields[0] + " " + fields[1], count))
    return samples
def _sqlite_plot_rows(db_path: str, sql: str) -> List[Tuple]:
    """Run ``sql`` against the SQLite file ``db_path`` and return every row as a tuple.

    The connection is always closed; ``sqlite3.Error`` propagates to the caller.
    """
    connection = sqlite3.connect(db_path)
    try:
        return list(map(tuple, connection.execute(sql).fetchall()))
    finally:
        connection.close()
def _rows_with_leading_date(rows: List[Tuple]) -> List[Tuple]:
    """Keep only rows whose first column (trimmed) starts with ``YYYY/``.

    Empty rows and rows whose first value is ``None`` are dropped.
    """
    year_prefix = re.compile(r"^\d{4}/")
    return [
        row
        for row in rows
        if row and row[0] is not None and year_prefix.match(str(row[0]).strip())
    ]
def _write_csv_two_cols(path: str, rows: List[Tuple]) -> None:
    """Write ``first,second`` lines (one per row) to ``path``, overwriting it.

    Rows with fewer than two values are skipped; extra columns are ignored. Values
    are written with ``str()`` formatting and no CSV quoting.
    """
    with open(path, "w", encoding="utf-8") as fh:
        fh.writelines(f"{row[0]},{row[1]}\n" for row in rows if len(row) >= 2)
def _run_gnuplot_script(script: str) -> Tuple[int, str, str]:
    """Run ``script`` with the ``gnuplot`` binary found on ``PATH``.

    The script is written to a temporary ``.gp`` file, which is always removed
    afterwards. The run is limited to 120 seconds.

    Returns:
        ``(returncode, stdout, stderr)``. Failures are reported through this tuple
        rather than by raising, so one bad plot cannot abort the whole report:
        ``127`` when gnuplot is not installed, ``124`` when the run times out, and
        ``126`` when the process cannot be started.
    """
    import subprocess  # for TimeoutExpired; the run itself goes through ``s``

    gnuplot = shutil.which("gnuplot")
    if not gnuplot:
        return 127, "", "gnuplot not found in PATH"
    with tempfile.NamedTemporaryFile(mode="w", suffix=".gp", delete=False, encoding="utf-8") as tf:
        tf.write(script)
        gp_path = tf.name
    try:
        proc = s.run([gnuplot, gp_path], capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired:
        return 124, "", "gnuplot timed out after 120 seconds"
    except OSError as exc:
        return 126, "", f"could not run gnuplot: {exc}"
    finally:
        try:
            os.unlink(gp_path)
        except OSError:
            pass
    return proc.returncode, proc.stdout, proc.stderr or ""
def _gnuplot_active_threads_ascii(
    csv_path: str,
    term_width: int,
    warn: int,
    crit: int,
    start_t: str,
    end_t: str,
    xtics_step: int,
) -> str:
    """Draw the active-threads CSV as an ANSI ``dumb``-terminal gnuplot chart.

    Uses the same data layout as ``plot_p4_active_commands.sh``. Samples are split
    into three impulse series: below ``warn`` ("Normal", color 2), from ``warn``
    up to ``crit`` ("Warning", color 3) and ``crit`` or more ("Critical", color 1).
    The chart is ``term_width`` x 30 characters, spans ``start_t``..``end_t`` and
    has a tic every ``xtics_step`` seconds.

    Returns gnuplot's output, or ``(gnuplot failed: ...)`` on a non-zero exit.
    """
    data = csv_path.replace("\\", "/").replace("'", "''")
    lo = start_t.replace("'", "''")
    hi = end_t.replace("'", "''")
    commands = [
        'set title "P4 Commands Active Threads"',
        "set xdata time",
        'set timefmt "%Y/%m/%d %H:%M:%S"',
        f'set xrange ["{lo}":"{hi}"]',
        'set format x "%m/%d %H:%M"',
        f"set xtics {xtics_step}",
        f"set term dumb ansi size {term_width}, 30",
        "set grid xtics ytics",
        "set datafile separator ','",
        f"plot '{data}' using 1:($2 < {warn} ? $2 : 1/0) title 'Normal' with impulses lc 2, \\",
        f" '{data}' using 1:($2 >= {warn} && $2 < {crit} ? $2 : 1/0) title 'Warning' with impulses lc 3, \\",
        f" '{data}' using 1:($2 >= {crit} ? $2 : 1/0) title 'Critical' with impulses lc 1",
    ]
    status, stdout, stderr = _run_gnuplot_script("\n".join(commands))
    if status == 0:
        return stdout
    return "(gnuplot failed: {})\n".format((stderr or "").strip() or status)
def _gnuplot_simple_impulse_ascii(
    title: str,
    csv_path: str,
    term_width: int,
    timefmt: str,
    y_label: str,
    xtics_step: int,
    start_t: str,
    end_t: str,
) -> str:
    """Draw a two-column time-series CSV as blue impulses on an ANSI ``dumb`` terminal.

    Column 1 is parsed with ``timefmt``; the x range is ``start_t``..``end_t`` with
    a tic every ``xtics_step`` seconds, and labels drop the clock time once tics
    are at least a day apart. The chart is ``term_width`` x 30 characters.

    Returns gnuplot's output, or ``(gnuplot failed: ...)`` on a non-zero exit.
    """
    data = csv_path.replace("\\", "/").replace("'", "''")
    lo = start_t.replace("'", "''")
    hi = end_t.replace("'", "''")
    label_format = "%m/%d %H:%M" if xtics_step < 86400 else "%m/%d"
    quoted_title = title.replace('"', '\\"')
    quoted_label = y_label.replace("'", "''")
    commands = [
        f'set title "{quoted_title}"',
        "set xdata time",
        f'set timefmt "{timefmt}"',
        f'set xrange ["{lo}":"{hi}"]',
        f'set format x "{label_format}"',
        f"set xtics {xtics_step}",
        f"set term dumb ansi size {term_width}, 30",
        "set grid xtics ytics",
        "set datafile separator ','",
        f"plot '{data}' using 1:2 title '{quoted_label}' with impulses linewidth 1 lc rgb 'blue'",
    ]
    status, stdout, stderr = _run_gnuplot_script("\n".join(commands))
    if status == 0:
        return stdout
    return "(gnuplot failed: {})\n".format((stderr or "").strip() or status)
def _gnuplot_active_threads_png(
    csv_path: str,
    out_png: str,
    start_t: str,
    end_t: str,
    xtics_step: int,
) -> Optional[str]:
    """Render the active-threads CSV to a 2048x420 PNG as a single blue impulse series.

    Same style as ``plot_p4_active_commands.sh``. The x range is
    ``start_t``..``end_t`` (``%Y/%m/%d %H:%M:%S``) with a tic every
    ``xtics_step`` seconds.

    Returns:
        ``None`` on success, otherwise gnuplot's stderr (or its exit status as text).
    """
    esc = csv_path.replace("\\", "/").replace("'", "''")
    st = start_t.replace("'", "''")
    en = end_t.replace("'", "''")
    # Escaped for a single-quoted gnuplot string, where '' stands for one
    # apostrophe; the path is therefore emitted in single quotes below (inside
    # double quotes '' would stay two characters and name a different file).
    outp = out_png.replace("\\", "/").replace("'", "''")
    rc, _out, err = _run_gnuplot_script(
        "\n".join(
            [
                'set title "Commands Active"',
                "set xdata time",
                'set timefmt "%Y/%m/%d %H:%M:%S"',
                'set xrange ["{}":"{}"]'.format(st, en),
                'set format x "%m/%d %H:%M"',
                "set xtics {}".format(xtics_step),
                "set grid xtics ytics",
                "set term png size 2048,420",
                "set output '{}'".format(outp),
                "set datafile separator ','",
                "plot '{}' using 1:2 title 'P4 Commands Active Threads' with impulses linewidth 1 lc rgb 'blue'".format(
                    esc
                ),
                "unset output",
            ]
        )
    )
    if rc != 0:
        return (err or "").strip() or str(rc)
    return None
def _gnuplot_simple_impulse_png(
    title: str,
    csv_path: str,
    out_png: str,
    timefmt: str,
    y_label: str,
    xtics_step: int,
    start_t: str,
    end_t: str,
) -> Optional[str]:
    """Render a two-column time-series CSV to a 2048x480 PNG of blue impulses.

    Styled like the legacy ``plot_p4_dbwaittime`` / ``incoming`` / ``running``
    ``.sh`` scripts. Column 1 is parsed with ``timefmt``; the x range is
    ``start_t``..``end_t`` with tics ``xtics_step`` seconds apart, rotated by -45
    degrees, and labelled ``%m/%d`` once tics are at least a day apart.

    Returns:
        ``None`` on success, otherwise gnuplot's stderr (or its exit status as text).
    """
    cpath = csv_path.replace("\\", "/").replace("'", "''")
    st = start_t.replace("'", "''")
    en = end_t.replace("'", "''")
    # Escaped for a single-quoted gnuplot string, where '' stands for one
    # apostrophe; the path is therefore emitted in single quotes below (inside
    # double quotes '' would stay two characters and name a different file).
    outp = out_png.replace("\\", "/").replace("'", "''")
    # Shorter labels when tics are sparse (days); rotate so long labels do not overlap.
    xfmt = "%m/%d" if xtics_step >= 86400 else "%m/%d %H:%M"
    rc, _out, err = _run_gnuplot_script(
        "\n".join(
            [
                'set title "{}"'.format(title.replace('"', '\\"')),
                "set xdata time",
                'set timefmt "{}"'.format(timefmt),
                'set xrange ["{}":"{}"]'.format(st, en),
                'set format x "{}"'.format(xfmt),
                "set xtics {} rotate by -45".format(xtics_step),
                "set bmargin 6",
                "set grid xtics ytics",
                "set term png size 2048,480",
                "set output '{}'".format(outp),
                "set datafile separator ','",
                "plot '{}' using 1:2 title '{}' with impulses linewidth 1 lc rgb 'blue'".format(
                    cpath, y_label.replace("'", "''")
                ),
                "unset output",
            ]
        )
    )
    if rc != 0:
        return (err or "").strip() or str(rc)
    return None
def write_p4_plot_pngs(
    log_path: str,
    db_path: str,
    diag_dir: str,
    *,
    pid_filter: str = "active",
    warn: int = 20,
    crit: int = 50,
    skip_existing: bool = True,
) -> List[str]:
    """Render the summary PNG plots into ``diag_dir`` with an embedded gnuplot script.

    File names match the legacy ``plot_p4_*.sh`` scripts:

    * ``Active.<log basename>.png`` - "active threads" samples grepped from the log;
    * ``dbWaitTime.<db>.png``, ``Incoming.<db>.png``, ``Running.<db>.png`` - series
      queried from the log2sql database.

    Returns ``[]`` straight away when gnuplot is not on ``PATH``. With
    ``skip_existing`` a plot whose PNG already exists and is non-empty is not
    redrawn. gnuplot and SQL problems are reported on stderr and the remaining plots
    are still attempted. ``warn``/``crit`` are accepted for parity with the ASCII
    renderer and are not used here.

    Returns:
        Absolute paths of the non-empty PNGs written by this call.
    """
    if not shutil.which("gnuplot"):
        return []
    log_path = os.path.abspath(log_path)
    db_path = os.path.abspath(db_path)
    diag_dir = os.path.abspath(diag_dir)
    os.makedirs(diag_dir, exist_ok=True)
    db_bn = os.path.basename(db_path)
    log_bn = os.path.basename(log_path)
    written: List[str] = []

    def _keep_existing(png_name: str) -> bool:
        # An empty leftover file does not count as existing output.
        if skip_existing:
            target = os.path.join(diag_dir, png_name)
            return os.path.isfile(target) and os.path.getsize(target) > 0
        return False

    def _record(png_path: str, problem: Optional[str]) -> None:
        if problem:
            print(
                f"p4diag: could not write {png_path}: {problem}",
                file=sys.stderr,
                flush=True,
            )
        elif os.path.isfile(png_path) and os.path.getsize(png_path) > 0:
            written.append(os.path.abspath(png_path))

    with tempfile.TemporaryDirectory(prefix="p4diag_plots_") as scratch:
        active_name = f"Active.{log_bn}.png"
        if os.path.isfile(log_path) and not _keep_existing(active_name):
            samples = _collect_active_threads_csv_rows(log_path, pid_filter)
            if samples:
                csv_file = os.path.join(scratch, "Active.csv")
                _write_csv_two_cols(csv_file, samples)
                first_ts, last_ts = samples[0][0], samples[-1][0]
                t_begin = _parse_plot_time(first_ts)
                t_end = _parse_plot_time(last_ts)
                # Roughly ten tics over the sampled span; 60 s if a timestamp does not parse.
                if t_begin and t_end:
                    step = max(1, int((t_end - t_begin).total_seconds() // 10))
                else:
                    step = 60
                target = os.path.join(diag_dir, active_name)
                _record(
                    target,
                    _gnuplot_active_threads_png(csv_file, target, first_ts, last_ts, step),
                )

        def _plot_query(
            csv_name: str,
            sql: str,
            g_title: str,
            y_lab: str,
            minute_fmt: bool,
            png_name: str,
        ) -> None:
            if _keep_existing(png_name):
                return
            try:
                raw = _sqlite_plot_rows(db_path, sql)
            except sqlite3.Error as e:
                print(
                    f"p4diag: plot {png_name}: SQL error: {e}",
                    file=sys.stderr,
                    flush=True,
                )
                return
            rows = _rows_with_leading_date(raw)
            if not rows:
                return
            csv_file = os.path.join(scratch, csv_name)
            _write_csv_two_cols(csv_file, rows)
            t_begin = _parse_plot_time(str(rows[0][0]))
            t_end = _parse_plot_time(str(rows[-1][0]))
            if not t_begin or not t_end:
                return
            time_fmt = "%Y/%m/%d %H:%M" if minute_fmt else "%Y/%m/%d %H:%M:%S"
            step = _xtics_step_seconds(t_begin, t_end)
            first_ts = str(rows[0][0]).strip().strip('"')
            last_ts = str(rows[-1][0]).strip().strip('"')
            target = os.path.join(diag_dir, png_name)
            _record(
                target,
                _gnuplot_simple_impulse_png(
                    g_title, csv_file, target, time_fmt, y_lab, step, first_ts, last_ts
                ),
            )

        if os.path.isfile(db_path):
            query_plots = (
                (
                    "dbWaitTime.csv",
                    _SQL_PLOT_DBWAIT,
                    "Commands dbWaitTime",
                    "P4 Commands DB Wait Time in seconds",
                    True,
                    f"dbWaitTime.{db_bn}.png",
                ),
                (
                    "Incoming.csv",
                    _SQL_PLOT_INCOMING,
                    "Commands Incoming per minute",
                    "P4 Commands Incoming",
                    True,
                    f"Incoming.{db_bn}.png",
                ),
                (
                    "Running.csv",
                    _SQL_PLOT_RUNNING,
                    "Commands Running",
                    "P4 Commands Running",
                    False,
                    f"Running.{db_bn}.png",
                ),
            )
            for spec in query_plots:
                _plot_query(*spec)
    return written
def format_log2sql_ascii_plots(
    log_path: str,
    db_path: str,
    *,
    pid_filter: str = "active",
    warn: int = 20,
    crit: int = 50,
    term_width: Optional[int] = None,
) -> str:
    """Build gnuplot ``dumb``-terminal plot sections for embedding in text reports.

    Four ``=== title ===`` sections are produced in order. The first is commands
    active, taken from the log's "active threads" lines and colored by
    ``warn``/``crit``. The other three come from the SQLite database: DB wait time,
    incoming commands and running commands. A section whose input is missing, empty
    or unparseable gets a one-line parenthesized note instead of a chart.

    Args:
        log_path: P4 server log scanned for "active threads" samples.
        db_path: log2sql SQLite database used by the three SQL charts.
        pid_filter: extra substring a sample line must contain (the collector
            falls back to ``"active"`` when this is empty).
        warn: threshold between the Normal and Warning series.
        crit: threshold between the Warning and Critical series.
        term_width: chart width in columns; defaults to the terminal width.

    Returns:
        The concatenated sections, or ``""`` when gnuplot is not on ``PATH``.
    """
    if not shutil.which("gnuplot"):
        return ""
    width = _terminal_width_for_plots() if term_width is None else term_width
    log_path = os.path.abspath(log_path)
    db_path = os.path.abspath(db_path)
    blocks: List[str] = []
    active_heading = "=== Commands Active (from log) ===\n"
    with tempfile.TemporaryDirectory(prefix="p4diag_plots_") as scratch:
        csv_file = os.path.join(scratch, "Active.csv")
        if not os.path.isfile(log_path):
            blocks.append(active_heading + f"(log file not readable: {log_path})\n")
        else:
            samples = _collect_active_threads_csv_rows(log_path, pid_filter)
            if not samples:
                filter_note = f" and {pid_filter!r}" if pid_filter else ""
                blocks.append(
                    active_heading + "(no lines matching 'active threads'" + filter_note + ")\n"
                )
            else:
                _write_csv_two_cols(csv_file, samples)
                first_ts, last_ts = samples[0][0], samples[-1][0]
                t_begin = _parse_plot_time(first_ts)
                t_end = _parse_plot_time(last_ts)
                # Roughly ten tics over the sampled span; 60 s if a timestamp does not parse.
                if t_begin and t_end:
                    step = max(1, int((t_end - t_begin).total_seconds() // 10))
                else:
                    step = 60
                blocks.append(active_heading)
                chart = _gnuplot_active_threads_ascii(
                    csv_file, width, warn, crit, first_ts, last_ts, step
                )
                blocks.append(chart.rstrip() + "\n")

        def _chart_from_sql(
            section_title: str,
            csv_name: str,
            sql: str,
            g_title: str,
            y_lab: str,
            minute_fmt: bool,
        ) -> None:
            heading = f"=== {section_title} ===\n"
            if not os.path.isfile(db_path):
                blocks.append(heading + f"(database not found: {db_path})\n")
                return
            try:
                raw = _sqlite_plot_rows(db_path, sql)
            except sqlite3.Error as e:
                blocks.append(heading + f"(SQL error: {e})\n")
                return
            rows = _rows_with_leading_date(raw)
            if not rows:
                blocks.append(heading + "(no data)\n")
                return
            series_csv = os.path.join(scratch, csv_name)
            _write_csv_two_cols(series_csv, rows)
            t_begin = _parse_plot_time(str(rows[0][0]))
            t_end = _parse_plot_time(str(rows[-1][0]))
            if not t_begin or not t_end:
                blocks.append(heading + "(could not parse time column)\n")
                return
            time_fmt = "%Y/%m/%d %H:%M" if minute_fmt else "%Y/%m/%d %H:%M:%S"
            step = _xtics_step_seconds(t_begin, t_end)
            first_ts = str(rows[0][0]).strip().strip('"')
            last_ts = str(rows[-1][0]).strip().strip('"')
            blocks.append(heading)
            chart = _gnuplot_simple_impulse_ascii(
                g_title, series_csv, width, time_fmt, y_lab, step, first_ts, last_ts
            )
            blocks.append(chart.rstrip() + "\n")

        sql_sections = (
            (
                "DB wait time",
                "dbWaitTime.csv",
                _SQL_PLOT_DBWAIT,
                "Commands dbWaitTime",
                "P4 Commands DB Wait Time in seconds",
                True,
            ),
            (
                "Incoming commands",
                "Incoming.csv",
                _SQL_PLOT_INCOMING,
                "Commands Incoming per minute",
                "P4 Commands Incoming",
                True,
            ),
            (
                "Running commands",
                "Running.csv",
                _SQL_PLOT_RUNNING,
                "Commands Running",
                "P4 Commands Running",
                False,
            ),
        )
        for spec in sql_sections:
            _chart_from_sql(*spec)
    return "".join(blocks)
def _build_summary_plots_section(diag_dir: str, png_files: List[str]) -> str:
"""PNG file list for the plain-text summary (ASCII plots: ``p4diag plots``)."""
plot_lines = []
for fname in png_files:
full_path = os.path.join(diag_dir, fname)
if os.path.isfile(full_path) and os.path.getsize(full_path) > 0:
plot_lines.append(f" {fname}\n")
if plot_lines:
return f"Directory: {diag_dir}\n" + "".join(plot_lines)
return " (no PNG files yet — install gnuplot and run log2sql or plots)\n"
def run_log2sql_plots(
    *,
    log_token: str,
    lq: Any,
    pid_filter: str,
    warn: int,
    crit: int,
    term_width: Optional[int],
) -> Tuple[str, List[str]]:
    """Produce ASCII plots and PNG files for one log / database pair.

    The log path is resolved from *log_token*.  If that does not name an
    existing file, ``lq.logFile`` is used when it exists.  The database is
    ``lq.databaseFile``, and PNGs are written next to it (the directory is
    created if needed) with ``skip_existing=False``.  File names match
    createPlots.

    Returns:
        ``(ascii_text, png_paths)``: the gnuplot ``dumb`` text and the PNG
        paths reported as written.
    """
    resolved_token, _, _, log_base, _ = _configure_log2sql_quiet_paths(log_token)
    log_src = _resolve_quiet_log_source_path(resolved_token, log_base)
    if not (log_src and os.path.isfile(log_src)):
        fallback = os.path.abspath(lq.logFile)
        if os.path.isfile(fallback):
            log_src = fallback

    db_path = os.path.abspath(lq.databaseFile)
    diag_dir = os.path.dirname(db_path)
    os.makedirs(diag_dir, exist_ok=True)
    log_file = log_src or os.path.abspath(lq.logFile)

    # Both renderers share the active-threads filter and thresholds.
    shared = {"pid_filter": pid_filter, "warn": warn, "crit": crit}
    ascii_text = format_log2sql_ascii_plots(
        log_file, db_path, term_width=term_width, **shared
    )
    png_written = write_p4_plot_pngs(
        log_file, db_path, diag_dir, skip_existing=False, **shared
    )
    return ascii_text, png_written
def _build_quiet_subcommand_parser() -> argparse.ArgumentParser:
    """Build the argparse parser for the non-interactive (quiet) subcommands.

    Registered subcommands, in help order: trim, log2sql-query-sql,
    log2sql-query-sql-list, log2sql-schema, stats, log2sql-plots, log2sql and
    summary.  The chosen subcommand is stored in ``args.qcmd``.
    """
    parser = argparse.ArgumentParser(
        prog="p4diag",
        description=(
            "Non-interactive mode (no menu, prompts, or web UI). "
            "Invoke as p4diag SUBCOMMAND ... (with or without -q). "
            "Subcommands: trim, stats, log2sql, summary, schema, plots; "
            "or p4diag FILE.sql [LOG], p4diag list. See: p4diag -h"
        ),
    )
    sub = parser.add_subparsers(dest="qcmd", required=True, metavar="SUBCOMMAND")

    def _add_log2sql_db_arg(sp: argparse.ArgumentParser) -> None:
        # Optional positional LOG naming the .p4diagnostics/<log>.db database.
        sp.add_argument(
            "log",
            nargs="?",
            default="log",
            help="Basename for .p4diagnostics/<log>.db (default: log)",
        )

    p_trim = sub.add_parser(
        "trim",
        help="Trim a P4 server log to --start / --end (writes segment file).",
    )
    p_trim.add_argument("logfile", help="Path to source log file")
    p_trim.add_argument("--start", required=True, type=parse_datetime, metavar="TIME")
    p_trim.add_argument("--end", required=True, type=parse_datetime, metavar="TIME")

    # FILE.sql [LOG] [PID] [START]: the optional trailing positionals feed the
    # script's @pid / @table / @duration / @start parameters.
    p_query = sub.add_parser(
        "log2sql-query-sql",
        help="Run NAME.sql with sqlite3 .read on .p4diagnostics/<log>.db.",
    )
    p_query.add_argument(
        "sqlfile",
        help="SQL script: basename uses P4DIAG_SQL_QUERIES/NAME.sql; absolute path runs that file",
    )
    _add_log2sql_db_arg(p_query)
    p_query.add_argument(
        "pid",
        nargs="?",
        help=(
            "For probe_pid*.sql: process PID (@pid). "
            "For locks_held_total.sql: optional table name (@table). "
            "For locks_table_by_cmd.sql: table name (@table). "
            "For locks_all_duration.sql: duration in ms (@duration)."
        ),
    )
    p_query.add_argument(
        "start",
        nargs="?",
        help="For probe_pid*.sql: startTime (@start), e.g. '2026/06/02 12:53:21'",
    )

    sub.add_parser(
        "log2sql-query-sql-list",
        help=f"List .sql query files in {p4diag_sql_queries_dir()} (basename per line).",
    )
    sub.add_parser(
        "log2sql-schema",
        help="Print process/tableUse columns using a built-in sample trace + SQLite DB.",
    )

    p_stats = sub.add_parser(
        "stats",
        help=(
            "Write log statistics only (.p4diagnostics/<log>.stats.txt) from the P4 server log; "
            "no SQLite DB required. Run this before summary when generating a log summary."
        ),
    )
    _add_log2sql_db_arg(p_stats)

    # Subcommand details: set ``formatter_class=argparse.RawDescriptionHelpFormatter`` and a
    # multi-line ``description=`` so ``p4diag <subcommand> -h`` preserves notes / blank lines.
    p_plots = sub.add_parser(
        "log2sql-plots",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        help=(
            "ASCII + PNG command-activity plots (gnuplot); use -h for what each graph shows."
        ),
        description=(
            "\tGraph active threads based on a log\n"
            "\tGraph db WaitTime based on log.db\n"
            "\tGraph incoming commands based on log.db\n"
            "\tGraph running commands based on log.db\n"
            "\n"
            "Also prints ASCII plots to stdout (gnuplot dumb) and writes PNGs under .p4diagnostics/\n"
            "(Active.<log>.png, dbWaitTime.<db>.png, Incoming.<db>.png, Running.<db>.png).\n"
            "Creates log.db if missing. Requires gnuplot in PATH."
        ),
    )
    _add_log2sql_db_arg(p_plots)
    p_plots.add_argument(
        "--pid",
        default="active",
        help=(
            "Active-threads line filter: substring (default: active). "
            "Pass a parent P4D PID to restrict to one server when logs are shared."
        ),
    )
    p_plots.add_argument(
        "--warn",
        type=int,
        default=20,
        metavar="N",
        help="Active plot: warn threshold in thread count (default: 20).",
    )
    p_plots.add_argument(
        "--crit",
        type=int,
        default=50,
        metavar="N",
        help="Active plot: critical threshold in thread count (default: 50).",
    )
    p_plots.add_argument(
        "--width",
        type=int,
        default=None,
        metavar="COLS",
        help="gnuplot dumb width in columns (default: terminal width or COLUMNS).",
    )

    p_l2create = sub.add_parser(
        "log2sql",
        help="Build .p4diagnostics/<log>.db from the source log if missing.",
    )
    _add_log2sql_db_arg(p_l2create)

    p_sum_auto = sub.add_parser(
        "summary",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        help="Write LOG.summary.txt and LOG.summary.html under .p4diagnostics/.",
        description=(
            "Write summary text/HTML under .p4diagnostics/. "
            "Creates the SQLite DB if missing (log2sql) and log statistics if missing (stats)."
        ),
    )
    p_sum_auto.add_argument(
        "log",
        help="P4 server log path or basename for .p4diagnostics/<log>.db",
    )
    return parser


def run_quiet_subcommand(argv: list) -> None:
    """Run one non-interactive (quiet) subcommand; never prompts.

    Used for ``p4diag -q ...`` and for bare subcommands.  *argv* is copied
    and the copy is passed to ``_normalize_quiet_cli_argv``.  Its return value
    is ignored, so it presumably rewrites short aliases such as ``plots``
    in place (confirm).  The copy is then parsed with
    :func:`_build_quiet_subcommand_parser`.

    Exits with status 2 when log statistics cannot be built (``summary``,
    ``stats``) or gnuplot is missing (``log2sql-plots``).  argparse exits on
    invalid arguments.
    """
    av = list(argv)
    _normalize_quiet_cli_argv(av)
    parser = _build_quiet_subcommand_parser()
    args = parser.parse_args(av)
    cmd = args.qcmd

    if cmd == "trim":
        lo = log2sql(args.logfile)
        lo.databaseFile = args.logfile
        lo.trim_log(args.start, args.end)
        return

    if cmd == "summary":
        path_tok = args.log
        lq = configure_log2sql_for_quiet_db(path_tok)
        if not os.path.isfile(lq.logFileDetails):
            print(
                "p4diag: log statistics not found; building them "
                "(same as stats) …",
                file=sys.stderr,
                flush=True,
            )
            lq.createLogStats(force_regenerate=False)
            if not os.path.isfile(lq.logFileDetails):
                print(
                    f"p4diag: log statistics not present: {lq.logFileDetails}\n"
                    "Could not build statistics — is the P4 server log readable?\n"
                    f" Log path in use: {lq.logFile!r}\n"
                    f"Try: p4diag stats {path_tok!r}",
                    file=sys.stderr,
                    flush=True,
                )
                sys.exit(2)
        lq.createLogSummary(force_regenerate=False)
        return

    if cmd == "log2sql":
        quiet_log2sql_create_database(args.log)
        return

    if cmd == "stats":
        lq = configure_log2sql_for_quiet_log_stats_only(args.log)
        rebuilt = lq.createLogStats(force_regenerate=False)
        # Without this check a failed build falls through to open() below
        # and ends in a FileNotFoundError traceback.
        if not os.path.isfile(lq.logFileDetails):
            print(
                f"p4diag: log statistics not present: {lq.logFileDetails}\n"
                "Could not build statistics — is the P4 server log readable?",
                file=sys.stderr,
                flush=True,
            )
            sys.exit(2)
        if rebuilt:
            msg = f"Wrote log statistics: {lq.logFileDetails}"
        else:
            msg = f"Log statistics up to date (log unchanged): {lq.logFileDetails}"
        print(msg, file=sys.stderr, flush=True)
        with open(lq.logFileDetails, "r", encoding="utf-8", errors="replace") as fh:
            shutil.copyfileobj(fh, sys.stdout)
        return

    if cmd == "log2sql-query-sql-list":
        quiet_list_sql_queries()
        return

    if cmd == "log2sql-schema":
        lq = ensure_help_schema_sample_database()
        with quiet_stderr_activity("Reading schema..."):
            lq.schema_tables_pretty()
        return

    if cmd == "log2sql-plots":
        if not shutil.which("gnuplot"):
            print(
                "p4diag: log2sql-plots requires gnuplot in PATH.",
                file=sys.stderr,
            )
            sys.exit(2)
        lq = configure_log2sql_for_quiet_db(args.log)
        with quiet_stderr_activity("ASCII + PNG plots (gnuplot)..."):
            text, png_written = run_log2sql_plots(
                log_token=args.log,
                lq=lq,
                pid_filter=args.pid,
                warn=args.warn,
                crit=args.crit,
                term_width=args.width,
            )
        sys.stdout.write(text)
        # Distinct loop name: the original reused ``p`` and shadowed the parser.
        for png_path in png_written:
            print(f"p4diag: wrote PNG: {png_path}", file=sys.stderr, flush=True)
        return

    if cmd == "log2sql-query-sql":
        lq = configure_log2sql_for_quiet_db(args.log)
        lq.execute_sql_file_noninteractive(
            args.sqlfile,
            pid=args.pid,
            start=args.start,
        )
        return

    parser.error(f"Unhandled quiet command: {cmd}")
def run_interactive_auto_detect(
    av: list,
    *,
    case_dir: str,
    export_dir: str,
    port: int,
) -> None:
    """Run the interactive TTY menu for one Perforce server log.

    Steps:

    1. Parse ``PATH [--start TIME --end TIME]`` from *av*.  Giving only one
       of the two bounds is an argparse error.
    2. Run ``_preflight_log2sql_log_exists``.
    3. Start the local web server serving *export_dir* on *port*.
    4. Change into *case_dir* and hand off to ``handle_log2sql``.

    Sets the module global ``QUIET`` to ``False``.
    """
    global QUIET
    QUIET = False

    cli = argparse.ArgumentParser(
        prog="p4diag",
        description="Interactive mode (TTY menu): PATH is a Perforce server log.",
    )
    cli.add_argument("path", help="Server log file (basename or path)")
    time_bounds = (
        ("--start", "Start time (YYYY/MM/DD HH:MM:SS); must be paired with --end"),
        ("--end", "End time (YYYY/MM/DD HH:MM:SS); must be paired with --start"),
    )
    for flag, flag_help in time_bounds:
        cli.add_argument(
            flag,
            type=parse_datetime,
            metavar="TIME",
            default=None,
            help=flag_help,
        )
    opts = cli.parse_args(av)

    only_one_bound = (opts.start is None) != (opts.end is None)
    if only_one_bound:
        cli.error("--start and --end must be provided together")

    _preflight_log2sql_log_exists(opts.path, opts.start, opts.end)
    # QUIET was forced to False above, so this is normally always true.
    # NOTE(review): kept in case the preflight step can toggle QUIET; confirm.
    if not QUIET:
        start_web_server(export_dir, port, log_token=opts.path)
    os.chdir(case_dir)
    handle_log2sql(opts.path, opts.start, opts.end)
def _root_help_parser() -> argparse.ArgumentParser:
    """Return the top-level ``p4diag`` parser.

    Within SOURCE it is only used to print the top-level help (``-h``, or
    when no LOG path is given).  Its description and epilog come from
    ``P4DIAG_DESCRIPTION`` and ``P4DIAG_EPILOG``.  RawTextHelpFormatter
    keeps their line breaks.
    """
    quiet_help = (
        "Use quiet CLI (optional if you invoke a quiet subcommand directly, e.g. log2sql)."
    )
    root = argparse.ArgumentParser(
        prog="p4diag",
        usage="p4diag [-h] [-q] PATH [--start TIME --end TIME] ...",
        description=P4DIAG_DESCRIPTION,
        epilog=P4DIAG_EPILOG,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    root.add_argument("-q", "--quiet", action="store_true", help=quiet_help)
    return root
def main():
    """Program entry point: print help, run a quiet subcommand, or open the TTY menu.

    Side effects:

    * calls ``_p4diag_interactive_setup()``;
    * ensures ``.p4diagnostics/`` exists under the current directory;
    * sets the module globals ``port``, ``QUIET`` and ``LOG_FILE``.

    Exits with status 2 when ``-q`` has no subcommand, or when neither a
    subcommand nor a LOG path is given.
    """
    global port, QUIET, LOG_FILE
    _p4diag_interactive_setup()
    case_dir = os.getcwd()
    export_dir = os.path.join(case_dir, ".p4diagnostics")
    # Create the output directory in-process instead of spawning ``mkdir -p``.
    # As with the old unchecked subprocess call, failure is reported but not
    # fatal (e.g. ``-h`` from a read-only directory still prints help).
    try:
        os.makedirs(export_dir, exist_ok=True)
    except OSError as exc:
        print(f"p4diag: could not create {export_dir}: {exc}", file=sys.stderr)
    # Port derived from the uid, presumably so concurrent users on one host differ.
    port = 8000 + os.getuid() % 1000
    LOG_FILE = os.path.join(".p4diagnostics", "p4diag.log")
    av = sys.argv[1:]

    wants_help = "-h" in av or "--help" in av
    # "-h" after a quiet subcommand (e.g. "p4diag stats -h") is left for that
    # subcommand's own parser; every other help request gets the root help.
    if wants_help and not (
        av and av[0] not in ("-h", "--help") and _argv_invokes_quiet_cli(av)
    ):
        _root_help_parser().print_help()
        return

    _require_python_sqlite3()

    if av and _argv_invokes_quiet_cli(av):
        # A bare "p4diag stats" (no LOG argument) is deliberately not quiet.
        QUIET = not (len(av) == 1 and av[0] == "stats")
        os.chdir(case_dir)
        run_quiet_subcommand(av)
        return

    if av and av[0] in ("-q", "--quiet"):
        QUIET = True
        rest = av[1:]
        if not rest:
            print(
                "p4diag -q: missing subcommand. Try: log2sql, stats, summary, "
                "plots, schema, trim, list, or FILE.sql [LOG].",
                file=sys.stderr,
            )
            sys.exit(2)
        os.chdir(case_dir)
        run_quiet_subcommand(rest)
        return

    if av and not av[0].startswith("-"):
        run_interactive_auto_detect(av, case_dir=case_dir, export_dir=export_dir, port=port)
        return

    print("p4diag: missing LOG path (Perforce server log).", file=sys.stderr)
    _root_help_parser().print_help()
    sys.exit(2)
# Script entry point: run main() only when executed directly, not when imported.
if __name__ == "__main__":
    main()
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #4 | 32743 | scommon | Update feedback/improvements reference on exit of p4diag. | ||
| #3 | 32742 | scommon | HTML Summary for CPU, Memory, and Commands now mimics the canned reports. | ||
| #2 | 32739 | scommon | • Self-contained install: All 8 standard canned SQL reports are built into the script; no sql_queries/ directory needed (disk files still override by name). • PID probe: probe_pid*.sql embedded; pid always runs summary, server-error excerpt, per-table breakdown, and log tracking. • Interactive UX: Full numbered query menu, h/help with query descriptions, prompts for @table / @duration / @pid+@start, log search with ±75 lines of context. • Fixes: Pager SIGPIPE on q, safer multi-query .width handling, respect .mode in sqlite3 scripts. • Portability: Optional readline/gnureadline; clear error if Python lacks sqlite3; download URL and requirements documented in -h. | ||
| #1 | 32737 | scommon | p4diag aggregates and simplifies common Perforce P4 server log analysis workflows. It uses log2sql to build a SQLite database from trace data, runs canned and summary SQL queries, writes grep-based log statistics, victim/culprit write-wait analysis, and command-activity plots into text and HTML under .p4diagnostics/ for viewing in a browser. Interactive mode provides a TTY menu and starts a small local web server for output files. | ||