Skip to content

Commit

Permalink
Fix/paginate priority (#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
rickimoore authored Feb 11, 2025
1 parent 68d74e1 commit c15a8b7
Show file tree
Hide file tree
Showing 25 changed files with 417 additions and 131 deletions.
13 changes: 13 additions & 0 deletions app/api/log-metrics/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { NextResponse } from 'next/server'
import getReqAuthToken from '../../../utilities/getReqAuthToken'
import {fetchMetrics} from '../logs'

export async function GET(req: Request) {
try {
const token = getReqAuthToken(req)
const data = await fetchMetrics(token)
return NextResponse.json(data)
} catch (error) {
return NextResponse.json({ error: 'Failed to fetch logs metrics' }, { status: 500 })
}
}
22 changes: 22 additions & 0 deletions app/api/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,25 @@ export const fetchLogMetrics = async (token: string) =>
fetchFromApi(`${backendUrl}/logs/metrics`, token)
export const dismissLogAlert = async (token: string, index: string) =>
fetchFromApi(`${backendUrl}/logs/dismiss/${index}`, token)
export const fetchMetrics = async (token: string) =>
fetchFromApi(`${backendUrl}/logs/log-metrics`, token)

export interface fetchPriorityProps {
token: string
type?: string
limit?: string
order?: string
since?: string
}

export const fetchPriorityLogs = async (props: fetchPriorityProps) => {
const { token, type, limit, order, since } = props
const params = new URLSearchParams()

if (type) params.append('type', type)
if (limit) params.append('limit', limit)
if (order) params.append('order', order)
if (since) params.append('since', since)

return await fetchFromApi(`${backendUrl}/logs/priority-logs?${params.toString()}`, token)
}
10 changes: 8 additions & 2 deletions app/api/priority-logs/route.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { NextResponse } from 'next/server'
import getReqAuthToken from '../../../utilities/getReqAuthToken'
import { fetchLogMetrics } from '../logs'
import {fetchPriorityLogs} from '../logs'

export async function GET(req: Request) {
try {
const { searchParams } = new URL(req.url)
const type = searchParams.get('type') || undefined
const limit = searchParams.get('limit') || undefined
const order = searchParams.get('order') || undefined
const since = searchParams.get('since') || undefined

const token = getReqAuthToken(req)
const data = await fetchLogMetrics(token)
const data = await fetchPriorityLogs({token, type, limit, order, since})
return NextResponse.json(data)
} catch (error) {
return NextResponse.json({ error: 'Failed to fetch priority logs' }, { status: 500 })
Expand Down
23 changes: 16 additions & 7 deletions app/dashboard/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ import useLocalStorage from '../../src/hooks/useLocalStorage'
import useNetworkMonitor from '../../src/hooks/useNetworkMonitor'
import useSWRPolling from '../../src/hooks/useSWRPolling'
import { exchangeRates, proposerDuties } from '../../src/recoil/atoms'
import { ActivityResponse, LogMetric, ProposerDuty, StatusColor } from '../../src/types'
import {
ActivityResponse,
LogData,
Metric,
ProposerDuty,
StatusColor
} from '../../src/types'
import { BeaconNodeSpecResults, SyncData } from '../../src/types/beacon'
import { Diagnostics, PeerDataResults } from '../../src/types/diagnostic'
import { ValidatorCache, ValidatorInclusionData, ValidatorInfo } from '../../src/types/validator'
Expand All @@ -35,8 +41,9 @@ export interface MainProps {
initValCaches: ValidatorCache
initInclusionRate: ValidatorInclusionData
initProposerDuties: ProposerDuty[]
initLogMetrics: LogMetric
initActivityData: ActivityResponse
initMetrics: Metric
initPriorityLogs: LogData[]
}

const Main: FC<MainProps> = (props) => {
Expand All @@ -52,8 +59,9 @@ const Main: FC<MainProps> = (props) => {
lighthouseVersion,
genesisTime,
initProposerDuties,
initLogMetrics,
initActivityData,
initMetrics,
initPriorityLogs
} = props

const { t } = useTranslation()
Expand Down Expand Up @@ -112,9 +120,9 @@ const Main: FC<MainProps> = (props) => {
networkError,
})

const { data: logMetrics } = useSWRPolling<LogMetric>('/api/priority-logs', {
const { data: metrics } = useSWRPolling<Metric>('/api/log-metrics', {
refreshInterval: slotInterval / 2,
fallbackData: initLogMetrics,
fallbackData: initMetrics,
networkError,
})

Expand All @@ -123,7 +131,7 @@ const Main: FC<MainProps> = (props) => {
const { isReady } = executionSync
const { connected } = peerData
const { natOpen } = nodeHealth
const warningCount = logMetrics.warningLogs?.length || 0
const warningCount = metrics.warningCount || 0

useEffect(() => {
setDuties((prev) => formatUniqueObjectArray([...prev, ...valDuties]))
Expand Down Expand Up @@ -252,7 +260,8 @@ const Main: FC<MainProps> = (props) => {
/>
<ValidatorTable validators={validatorStates} className='mt-8 lg:mt-2' />
<DiagnosticTable
metrics={logMetrics}
priorityLogs={initPriorityLogs}
logMetrics={metrics}
bnSpec={beaconSpec}
syncData={syncData}
beanHealth={nodeHealth}
Expand Down
8 changes: 5 additions & 3 deletions app/dashboard/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
fetchSyncData,
} from '../api/beacon'
import { fetchBeaconNodeVersion, fetchGenesisData, fetchValidatorVersion } from '../api/config'
import { fetchLogMetrics } from '../api/logs'
import {fetchMetrics, fetchPriorityLogs} from '../api/logs'
import { fetchValCaches, fetchValStates } from '../api/validator'
import Wrapper from './Wrapper'

Expand All @@ -30,8 +30,9 @@ export default async function Page() {
const bnVersion = await fetchBeaconNodeVersion(token)
const lighthouseVersion = await fetchValidatorVersion(token)
const proposerDuties = await fetchProposerDuties(token)
const logMetrics = await fetchLogMetrics(token)
const activities = await fetchActivities({ token })
const metrics = await fetchMetrics(token)
const priorityLogs = await fetchPriorityLogs({token})

return (
<Wrapper
Expand All @@ -43,11 +44,12 @@ export default async function Page() {
initSyncData={syncData}
initInclusionRate={inclusion}
initPeerData={peerData}
initLogMetrics={logMetrics}
genesisTime={genesisBlock}
lighthouseVersion={lighthouseVersion.version}
bnVersion={bnVersion.version}
beaconSpec={beaconSpec}
initMetrics={metrics}
initPriorityLogs={priorityLogs}
/>
)
} catch (e) {
Expand Down
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"@nestjs/schematics": "^10.0.0",
"@nestjs/testing": "^10.0.0",
"@types/cookie-parser": "^1.4.7",
"@types/eventsource": "^1.1.15",
"@types/express": "^4.17.17",
"@types/jest": "^29.5.2",
"@types/node": "^20.3.1",
Expand Down
10 changes: 3 additions & 7 deletions backend/src/activity/activity.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { ActivityService } from './activity.service';
import { SessionGuard } from '../session.guard';
import { Request, Response } from 'express';
import { KEEP_ALIVE_MESSAGE, SSE_HEADER } from '../../../src/constants/sse';

@Controller('activity')
@UseGuards(SessionGuard)
Expand Down Expand Up @@ -42,19 +43,14 @@ export class ActivityController {

@Get('stream')
sse(@Req() req: Request, @Res() res: Response) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
res.writeHead(200, SSE_HEADER);

res.flushHeaders();

this.activityService.addClient(res);

const heartbeatInterval = setInterval(() => {
res.write(': keep-alive\n\n');
res.write(KEEP_ALIVE_MESSAGE);
}, 10000);

req.on('close', () => {
Expand Down
20 changes: 5 additions & 15 deletions backend/src/activity/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Activity } from './entities/activity.entity';
import { ActivityType } from '../../../src/types';
import { UpdateOptions, Op } from 'sequelize';
import { Response } from 'express';
import { ClientManager } from '../utils/client-manager';

@Injectable()
export class ActivityService {
Expand All @@ -12,19 +13,18 @@ export class ActivityService {
private activityRepository: typeof Activity,
) {}

private clients: Response[] = [];
private clientManager = new ClientManager();

public addClient(client: Response) {
this.clients.push(client);
this.clientManager.addClient(client);
}

public removeClient(client: Response) {
this.clients = this.clients.filter((c) => c !== client);
this.clientManager.removeClient(client);
}

public sendMessageToClients(data: any) {
const message = `data: ${JSON.stringify(data)}\n\n`;
this.clients.forEach((client) => client.write(message));
this.clientManager.sendMessageToClients(data);
}

public async storeActivity(data: string, pubKey: string, type: ActivityType) {
Expand Down Expand Up @@ -65,16 +65,6 @@ export class ActivityService {
}
: undefined;

if (whereClause) {
return {
count: await this.activityRepository.count(),
rows: await this.activityRepository.findAll({
where: whereClause,
order: [['createdAt', orderQuery]],
}),
};
}

return this.activityRepository.findAndCountAll({
limit: queryLimit === 0 ? undefined : queryLimit,
offset: Number(offset) || 0,
Expand Down
39 changes: 37 additions & 2 deletions backend/src/logs/logs.controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// src/logs/logs.controller.ts
import { Controller, Get, Res, Req, Param, UseGuards } from '@nestjs/common';
import {Controller, Get, Res, Req, Param, UseGuards, Query} from '@nestjs/common';
import { Request, Response } from 'express';
import { LogsService } from './logs.service';
import { SessionGuard } from '../session.guard';
import { KEEP_ALIVE_MESSAGE, SSE_HEADER } from '../../../src/constants/sse';
import {LogType} from "../../../src/types";

@Controller('logs')
@UseGuards(SessionGuard)
Expand All @@ -26,6 +27,40 @@ export class LogsController {
return this.logsService.readLogMetrics();
}

@Get('priority-logs')
getPriorityLogs(
@Query('type') type?: LogType,
@Query('limit') limit?: string,
@Query('order') order?: string,
@Query('since') since?: string,
) {
return this.logsService.paginatedPriorityLogs(type, order, since, limit)
}

@Get('log-metrics')
getMetrics() {
return this.logsService.readMetrics();
}

@Get('priority-log-stream')
sse(@Req() req: Request, @Res() res: Response) {
res.writeHead(200, SSE_HEADER);

res.flushHeaders();

this.logsService.addClient(res);

const heartbeatInterval = setInterval(() => {
res.write(KEEP_ALIVE_MESSAGE);
}, 10000);

req.on('close', () => {
clearInterval(heartbeatInterval);
this.logsService.removeClient(res);
res.end();
});
}

@Get('dismiss/:index')
dismissLogAlert(@Param('index') index: string) {
return this.logsService.dismissLog(index);
Expand Down
Loading

0 comments on commit c15a8b7

Please sign in to comment.