All files / src/app/api/admin/monitoring/stream route.ts

0% Statements 0/226
100% Branches 0/0
0% Functions 0/1
0% Lines 0/226

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
/**
 * Real-time Metrics Stream API
 *
 * Server-Sent Events endpoint for streaming dashboard metrics.
 * Provides real-time updates for SLOs, system metrics, and errors.
 */

import { NextRequest } from 'next/server';
import { calculateAllSLOs, type SLOResult } from '@/lib/observability';
import { prisma } from '@/lib/prisma';
import { logger } from '@/lib/logging';

/**
 * SSE Event types
 */
export type SSEEventType =
  | 'slo_update'
  | 'metrics_update'
  | 'error_update'
  | 'alert'
  | 'heartbeat'
  | 'connected';

export interface MetricsSummary {
  totalRequests: number;
  successRate: number;
  avgDuration: number;
  errorCount: number;
}

export interface SSEMessage {
  type: SSEEventType;
  data: unknown;
  timestamp: number;
}

/**
 * Format SSE message
 */
function formatSSE(event: SSEEventType, data: unknown): string {
  const message: SSEMessage = {
    type: event,
    data,
    timestamp: Date.now(),
  };
  return `event: ${event}\ndata: ${JSON.stringify(message)}\n\n`;
}

/**
 * Get metrics summary
 */
async function getMetricsSummary(): Promise<MetricsSummary> {
  const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);

  try {
    const httpMetrics = await prisma.httpMetric.groupBy({
      by: ['statusCode'],
      where: { timestamp: { gte: oneHourAgo } },
      _count: true,
      _avg: { duration: true },
    });

    const totalRequests = httpMetrics.reduce((sum, m) => sum + m._count, 0);
    const successRequests = httpMetrics
      .filter((m) => m.statusCode < 400)
      .reduce((sum, m) => sum + m._count, 0);
    const avgDuration =
      httpMetrics.length > 0
        ? httpMetrics.reduce((sum, m) => sum + (m._avg.duration || 0) * m._count, 0) / totalRequests
        : 0;

    const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
    const errorCount = await prisma.errorLog.count({
      where: { createdAt: { gte: oneDayAgo } },
    });

    return {
      totalRequests,
      successRate: totalRequests > 0 ? (successRequests / totalRequests) * 100 : 100,
      avgDuration,
      errorCount,
    };
  } catch {
    return {
      totalRequests: 0,
      successRate: 100,
      avgDuration: 0,
      errorCount: 0,
    };
  }
}

/**
 * Get recent errors
 */
async function getRecentErrors() {
  const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);

  try {
    return await prisma.errorLog.findMany({
      where: { createdAt: { gte: oneDayAgo } },
      orderBy: { createdAt: 'desc' },
      take: 10,
    });
  } catch {
    return [];
  }
}

/**
 * Check for SLO alerts
 */
function checkForAlerts(slos: SLOResult[]): Array<{ sloName: string; status: string }> {
  return slos
    .filter((slo) => slo.status === 'critical' || slo.status === 'warning')
    .map((slo) => ({ sloName: slo.name, status: slo.status }));
}

/**
 * SSE Stream endpoint
 */
export async function GET(request: NextRequest) {
  const encoder = new TextEncoder();

  // Check for incident mode (more frequent updates)
  const incidentMode = request.nextUrl.searchParams.get('incident') === 'true';
  const updateInterval = incidentMode ? 2000 : 5000; // 2s for incident, 5s normal

  // Track previous state for change detection
  let previousSLOs: SLOResult[] = [];
  let previousErrorCount = 0;

  const stream = new ReadableStream({
    async start(controller) {
      // Send initial connected event
      controller.enqueue(encoder.encode(formatSSE('connected', { interval: updateInterval })));

      // Send initial data
      try {
        const [slos, metrics, errors] = await Promise.all([
          calculateAllSLOs(),
          getMetricsSummary(),
          getRecentErrors(),
        ]);

        previousSLOs = slos;
        previousErrorCount = errors.length;

        controller.enqueue(encoder.encode(formatSSE('slo_update', slos)));
        controller.enqueue(encoder.encode(formatSSE('metrics_update', metrics)));
        controller.enqueue(encoder.encode(formatSSE('error_update', errors)));

        // Check for initial alerts
        const alerts = checkForAlerts(slos);
        if (alerts.length > 0) {
          controller.enqueue(encoder.encode(formatSSE('alert', alerts)));
        }
      } catch (error) {
        logger.error('SSE initial data fetch failed', error instanceof Error ? error : new Error(String(error)), { category: 'API' });
      }

      // Set up polling interval
      const intervalId = setInterval(async () => {
        try {
          const [slos, metrics, errors] = await Promise.all([
            calculateAllSLOs(),
            getMetricsSummary(),
            getRecentErrors(),
          ]);

          // Always send SLO updates
          controller.enqueue(encoder.encode(formatSSE('slo_update', slos)));

          // Send metrics update
          controller.enqueue(encoder.encode(formatSSE('metrics_update', metrics)));

          // Check for new errors
          if (errors.length !== previousErrorCount) {
            controller.enqueue(encoder.encode(formatSSE('error_update', errors)));
            previousErrorCount = errors.length;
          }

          // Check for alerts on status changes
          const newAlerts = checkForAlerts(slos);
          const statusChanged = slos.some((slo) => {
            const prev = previousSLOs.find((p) => p.name === slo.name);
            return prev && prev.status !== slo.status;
          });

          if (statusChanged && newAlerts.length > 0) {
            controller.enqueue(encoder.encode(formatSSE('alert', newAlerts)));
          }

          previousSLOs = slos;
        } catch (error) {
          logger.error('SSE update failed', error instanceof Error ? error : new Error(String(error)), { category: 'API' });
        }
      }, updateInterval);

      // Heartbeat every 30 seconds to keep connection alive
      const heartbeatId = setInterval(() => {
        try {
          controller.enqueue(encoder.encode(formatSSE('heartbeat', { timestamp: Date.now() })));
        } catch {
          // Connection closed, cleanup will happen
        }
      }, 30000);

      // Handle client disconnect
      request.signal.addEventListener('abort', () => {
        clearInterval(intervalId);
        clearInterval(heartbeatId);
        controller.close();
      });
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no', // Disable nginx buffering
    },
  });
}