Extracting pg_stat_activity for cost tracking

Accurate cloud database billing requires moving beyond static instance sizing toward granular workload attribution. For PostgreSQL environments, pg_stat_activity is the most direct source for what each backend is doing at a given moment: who is connected, what state the session is in, and what it is waiting on. It is a point-in-time view rather than a cumulative meter, so compute consumption has to be estimated by sampling it repeatedly. Extracting this system view at scale supports Database Cost Attribution & Resource Quota Automation, allowing FinOps teams to map active backend processes to business units, tenants, or microservices. When integrated into broader Metric Extraction & Aggregation Pipelines, this data stream turns raw connection telemetry into auditable cost allocation records.

The diagram below traces the resilient extraction flow from the async query through retry and fallback to final cost attribution.

flowchart TD
    A["Async query pg_stat_activity via asyncpg"] --> B{"Query succeeds"}
    B -->|"yes"| E["Validate required fields"]
    B -->|"no"| C{"Retries remaining"}
    C -->|"yes"| D["Wait exponential backoff"]
    D --> A
    C -->|"final attempt"| F["Fallback to sync psycopg2 query"]
    F --> E
    E --> G{"Required fields present"}
    G -->|"no"| H["Drop invalid record"]
    G -->|"yes"| I["Attribute active_seconds to cost center"]

Query Design and Extraction Strategy

Querying pg_stat_activity in production calls for non-blocking, low-overhead patterns. The view is read from shared memory and a single call is cheap, but frequent polling with an unbounded SELECT * still adds avoidable work: every row carries the query text (up to track_activity_query_size bytes), and each poll ships columns the pipeline never uses. Production implementations should filter by backend_type, exclude idle connections, and calculate duration server-side to minimize network payload and downstream parsing overhead. Following established System View Querying Patterns, the extraction query isolates active client backends, computes elapsed time, and captures wait events for resource contention analysis.

The query below is optimized for high-frequency polling and cost-weighted aggregation:

SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    backend_type,
    wait_event_type,
    wait_event,
    EXTRACT(EPOCH FROM (now() - query_start)) AS active_seconds,
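    -- Note: current_setting() is evaluated in the polling session, so this value is identical on every row
    current_setting('app.cost_center', true) AS cost_center_tag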
FROM pg_stat_activity
WHERE backend_type = 'client backend'
  AND state != 'idle'
  AND pid != pg_backend_pid()
  AND query_start IS NOT NULL;

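Key optimizations include server-side epoch extraction, which avoids client/server clock skew and timezone handling in transit, and explicit exclusion of the polling connection itself via pg_backend_pid(). Two caveats apply. First, current_setting('app.cost_center', true) is evaluated in the polling session, so it returns that session's value (or NULL when unset) on every row; it cannot read a setting from another backend. Per-tenant tagging therefore has to come from columns that pg_stat_activity exposes per backend, such as application_name or usename. Second, a role that is neither a superuser nor a member of pg_read_all_stats sees most columns as NULL for other users' sessions, which makes the state filter silently drop those rows, so the polling role needs that privilege.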
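Because current_setting() reads the polling session rather than the observed backend, one workable alternative is to carry the tag in application_name, which pg_stat_activity does report per backend. The sketch below assumes a hypothetical naming convention of "<service>@<cost_center>"; neither the convention nor the function name comes from PostgreSQL or from this pipeline, so adapt it to whatever your clients actually send.

```python
from typing import Optional


def cost_center_from_application_name(
    application_name: Optional[str], default: str = "unallocated"
) -> str:
    """Derive a cost-center tag from application_name.

    Assumes the hypothetical convention "<service>@<cost_center>".
    Anything that does not match falls back to `default`.
    """
    if not application_name or "@" not in application_name:
        return default
    # Split on the last '@' so service names containing '@' still parse.
    tag = application_name.rsplit("@", 1)[1].strip()
    return tag or default


print(cost_center_from_application_name("checkout-api@retail"))
print(cost_center_from_application_name("psql"))
```

Untagged sessions land in a single "unallocated" bucket rather than being dropped, which keeps the total of attributed seconds reconcilable against the raw samples.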

Production-Grade Async Extraction with Fallbacks

Python automation for this workload must handle transient network failures, connection pool exhaustion, and database failovers gracefully. The implementation below uses asyncpg for high-throughput async I/O, implements exponential backoff, validates payloads before downstream routing, and includes a synchronous psycopg2 fallback for environments where async drivers are blocked or degraded. This architecture directly supports Async Usage Parsing Workflows while maintaining strict Error Handling in Cost Pipelines.

import asyncio
import logging
import os
from typing import Any, Dict, List

import asyncpg
import psycopg2
import psycopg2.extras

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("pg_cost_extractor")

class ActivityRecordValidator:
    """Lightweight schema validation for billing pipeline ingestion."""
    REQUIRED_FIELDS = {"pid", "usename", "application_name", "state", "active_seconds"}
    
    @classmethod
    def validate(cls, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        valid_records = []
        for record in records:
            if cls.REQUIRED_FIELDS.issubset(record.keys()):
                # Enforce numeric type for active_seconds
                try:
                    record["active_seconds"] = float(record["active_seconds"])
                    valid_records.append(record)
                except (ValueError, TypeError):
                    logger.warning("Dropping record with invalid active_seconds: pid=%s", record.get("pid"))
            else:
                logger.warning("Schema validation failed for record: %s", record)
        return valid_records

async def fetch_activity_async(pool: asyncpg.Pool) -> List[Dict[str, Any]]:
    """Execute optimized pg_stat_activity query with asyncpg."""
    query = """
    SELECT 
        pid, usename, application_name, client_addr, state, backend_type,
        wait_event_type, wait_event,
        EXTRACT(EPOCH FROM (now() - query_start)) AS active_seconds,
        current_setting('app.cost_center', true) AS cost_center_tag
    FROM pg_stat_activity
    WHERE backend_type = 'client backend'
      AND state != 'idle'
      AND pid != pg_backend_pid()
      AND query_start IS NOT NULL;
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch(query)
        return [dict(row) for row in rows]

def fetch_activity_sync(dsn: str) -> List[Dict[str, Any]]:
    """Synchronous psycopg2 fallback for degraded async environments."""
    query = """
    SELECT 
        pid, usename, application_name, client_addr, state, backend_type,
        wait_event_type, wait_event,
        EXTRACT(EPOCH FROM (now() - query_start)) AS active_seconds,
        current_setting('app.cost_center', true) AS cost_center_tag
    FROM pg_stat_activity
    WHERE backend_type = 'client backend'
      AND state != 'idle'
      AND pid != pg_backend_pid()
      AND query_start IS NOT NULL;
    """
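    # psycopg2's connection context manager only ends the transaction;
    # close the connection explicitly so the fallback does not leak sessions.
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query)
            # Convert RealDictRow objects to plain dicts, matching the async path.
            return [dict(row) for row in cur.fetchall()]
    finally:
        conn.close()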

async def extract_with_retry(dsn: str, max_retries: int = 3, base_delay: float = 1.0) -> List[Dict[str, Any]]:
    """Production extractor with exponential backoff and driver fallback."""
    pool = None
    # Failures treated as transient: server-side connection errors, client-side
    # interface errors, socket-level errors, and timeouts.
    retryable = (asyncpg.PostgresConnectionError, asyncpg.InterfaceError, OSError, asyncio.TimeoutError)
    try:
        for attempt in range(max_retries):
            try:
                if pool is None:
                    pool = await asyncpg.create_pool(dsn, min_size=2, max_size=5, timeout=10)
                records = await fetch_activity_async(pool)
                logger.info("Async extraction successful: %d records", len(records))
                return ActivityRecordValidator.validate(records)
            except retryable as e:
                logger.warning("Async attempt %d failed: %s", attempt + 1, e)
                if attempt == max_retries - 1:
                    logger.info("Falling back to synchronous psycopg2 extraction")
                    # Run the blocking driver in a worker thread (Python 3.9+)
                    # so the event loop stays responsive.
                    sync_records = await asyncio.to_thread(fetch_activity_sync, dsn)
                    return ActivityRecordValidator.validate(sync_records)
                delay = base_delay * (2 ** attempt)
                await asyncio.sleep(delay)
        return []
    finally:
        # Release pooled connections on every exit path.
        if pool is not None:
            await pool.close()

async def main():
    dsn = os.getenv("PG_DSN", "postgresql://user:pass@localhost:5432/db")
    records = await extract_with_retry(dsn)
    logger.info("Validated payload ready for billing routing: %d records", len(records))
    # Downstream routing to Kafka, S3, or FinOps aggregator would occur here

if __name__ == "__main__":
    asyncio.run(main())
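To make the retry timing in extract_with_retry easier to reason about, the sketch below computes the sleep schedule on its own. The cap and jitter parameters are illustrative additions that the extractor above does not have; jitter is a common way to keep many pollers from retrying in lockstep after a shared outage, but whether it is needed here is an assumption.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    base_delay: float = 1.0,
    cap: float = 30.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the sleeps taken between attempts.

    Mirrors the extractor's base_delay * 2**attempt rule. There is no sleep
    after the final attempt, because that attempt falls back to psycopg2.
    `cap` and `jitter` are hypothetical extras, not part of the extractor.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries - 1):
        delay = min(cap, base_delay * (2 ** attempt))
        if jitter:
            # Add up to `jitter` (as a fraction of the delay) of random slack.
            delay += rng.uniform(0, jitter * delay)
        delays.append(delay)
    return delays


print(backoff_delays())            # defaults used by extract_with_retry
print(backoff_delays(5, cap=5.0))  # longer schedule with a ceiling
```

With the defaults the worst case before the fallback fires is the sum of the schedule plus the connection timeout on each attempt, which is worth checking against the polling interval.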

Pipeline Integration and Orchestration

Once extracted and validated, records need deterministic routing to downstream aggregation layers. Real-Time Metric Streaming Setup targets sub-second latency for chargeback dashboards, while Batch Processing for Historical Metrics handles end-of-day reconciliation windows. Python Orchestration Patterns coordinate these parallel streams, ensuring idempotent writes, deduplication by pid and query_start, and schema validation before records enter the billing ledger. Note that the extraction query above returns only the derived active_seconds, so query_start must be added to its select list before it can serve as a deduplication key.
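A minimal sketch of deduplication by pid and query_start follows. It assumes query_start has been added to the extraction query's select list, and it keeps the longest observation per key, on the reasoning that repeated samples of the same statement differ only in how far it has progressed. The function name and the keep-the-longest rule are illustrative choices, not something the pipeline prescribes.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Tuple


def dedupe_samples(records: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse repeated samples of one statement into a single record.

    Key: (pid, query_start). For each key the record with the largest
    active_seconds wins, so re-running on its own output changes nothing.
    """
    latest: Dict[Tuple[int, datetime], Dict[str, Any]] = {}
    for record in records:
        key = (record["pid"], record["query_start"])
        current = latest.get(key)
        if current is None or record["active_seconds"] > current["active_seconds"]:
            latest[key] = record
    return list(latest.values())


t0 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
t1 = datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
samples = [
    {"pid": 10, "query_start": t0, "active_seconds": 1.0},
    {"pid": 10, "query_start": t0, "active_seconds": 3.5},  # same statement, later poll
    {"pid": 10, "query_start": t1, "active_seconds": 0.5},  # new statement, same pid
    {"pid": 11, "query_start": t0, "active_seconds": 2.0},
]
deduped = dedupe_samples(samples)
print(len(deduped))
```

Keying on pid alone would be unsafe, since the operating system reuses process IDs and a single backend runs many statements over its lifetime.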

For teams scaling this extraction across hundreds of clusters, connection pooling and query throttling matter more than raw polling speed. The official PostgreSQL Monitoring Statistics documentation distinguishes the cumulative statistics views, which are updated with a delay, from the per-backend activity information in pg_stat_activity, which reflects current state when it is read. Polling faster therefore yields more samples rather than fresher rows, and aggressive intervals below 500ms tend to give diminishing returns for cost attribution relative to the connections and CPU they consume on each cluster. Similarly, asyncpg’s prepared statement cache avoids re-preparing the same query on a connection during repeated execution cycles, as documented in the asyncpg official API reference.
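One simple form of throttling across many clusters is to stagger poll start times so that they do not all fire at the same instant. The sketch below derives a stable per-cluster offset from a hash of the cluster identifier; the function name, the identifiers, and the choice of SHA-256 are assumptions made for illustration only.

```python
import hashlib


def poll_offset_seconds(cluster_id: str, interval_seconds: float) -> float:
    """Return a deterministic start offset in [0, interval_seconds).

    The same cluster_id always maps to the same offset, so schedules stay
    stable across restarts without any shared coordination state.
    """
    digest = hashlib.sha256(cluster_id.encode("utf-8")).digest()
    # 48 bits divide exactly in floating point, so the fraction stays below 1.
    fraction = int.from_bytes(digest[:6], "big") / 2 ** 48
    return fraction * interval_seconds


for name in ("cluster-a", "cluster-b", "cluster-c"):
    print(name, round(poll_offset_seconds(name, 5.0), 3))
```

A scheduler would sleep for the offset once at startup and then poll every interval_seconds, spreading load on shared collectors and network paths.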

By treating pg_stat_activity extraction as a first-class FinOps data source, platform teams can transition from reactive cost reporting to proactive quota enforcement. Automated tagging, wait-event correlation, and active-second normalization provide the telemetry foundation required for showback, chargeback, and resource right-sizing at enterprise scale.
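As an illustration of active-second normalization, the sketch below splits a known instance cost across cost centers in proportion to their observed active seconds. It assumes each record already carries a per-backend tag in cost_center_tag (for example, derived from application_name) and that total_cost comes from the billing export; both the function and the proportional rule are illustrative, not a prescribed allocation model.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Mapping


def allocate_cost(
    records: Iterable[Mapping[str, Any]],
    total_cost: float,
    tag_field: str = "cost_center_tag",
    default_tag: str = "unallocated",
) -> Dict[str, float]:
    """Split total_cost across tags in proportion to summed active_seconds."""
    seconds: Dict[str, float] = defaultdict(float)
    for record in records:
        # Missing or NULL tags are pooled so the shares still sum to the total.
        tag = record.get(tag_field) or default_tag
        seconds[tag] += float(record["active_seconds"])
    total_seconds = sum(seconds.values())
    if total_seconds == 0:
        return {}
    return {tag: total_cost * s / total_seconds for tag, s in seconds.items()}


sample = [
    {"cost_center_tag": "retail", "active_seconds": 30.0},
    {"cost_center_tag": "retail", "active_seconds": 10.0},
    {"cost_center_tag": None, "active_seconds": 60.0},
]
print(allocate_cost(sample, total_cost=200.0))
```

Proportional allocation ignores idle capacity, so the shares describe who used the instance while it was busy, not how much of the instance was needed.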