Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ export interface AdminPushConfigCommand extends AdminCommand {
config: Record<string, any>
}

export interface AdminGetLogsCommand extends AdminCommand {
startTime?: string
endTime?: string
maxLogs?: number
moduleName?: string
level?: string
page?: number
}

export interface ICommandHandler {
handle(command: Command): Promise<P2PCommandResponse>
verifyParamsAndRateLimits(task: Command): Promise<P2PCommandResponse>
Expand Down
59 changes: 59 additions & 0 deletions src/components/core/admin/getLogsHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { AdminCommandHandler } from './adminHandler.js'
import { AdminGetLogsCommand } from '../../../@types/commands.js'
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import {
ValidateParams,
buildInvalidParametersResponse
} from '../../httpRoutes/validateCommands.js'
import { ReadableString } from '../../P2P/handleProtocolCommands.js'
import { readExceptionLogFiles } from '../../../utils/logging/logFiles.js'

export class GetLogsHandler extends AdminCommandHandler {
async validate(command: AdminGetLogsCommand): Promise<ValidateParams> {
return await super.validate(command)
}

async handle(task: AdminGetLogsCommand): Promise<P2PCommandResponse> {
const validation = await this.validate(task)
if (!validation.valid) {
return buildInvalidParametersResponse(validation)
}

try {
const startTime = task.startTime
? new Date(task.startTime)
: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // Default to 7 days ago
const endTime = task.endTime ? new Date(task.endTime) : new Date() // Default to now
const maxLogs = Math.min(task.maxLogs ?? 100, 1000)
const { moduleName, level, page } = task

const logs = await this.getOceanNode()
.getDatabase()
.logs.retrieveMultipleLogs(startTime, endTime, maxLogs, moduleName, level, page)

if (!logs || logs.length === 0) {
const fileLogs = await readExceptionLogFiles(
startTime,
endTime,
maxLogs,
moduleName,
level
)
return {
status: { httpStatus: 200 },
stream: new ReadableString(JSON.stringify(fileLogs))
}
}

return {
status: { httpStatus: 200 },
stream: new ReadableString(JSON.stringify(logs))
}
} catch (error) {
return {
status: { httpStatus: 500, error: `Error retrieving logs: ${error.message}` },
stream: null
}
}
}
}
2 changes: 2 additions & 0 deletions src/components/core/handler/coreHandlersRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import { IndexingThreadHandler } from '../admin/IndexingThreadHandler.js'
import { CollectFeesHandler } from '../admin/collectFeesHandler.js'
import { FetchConfigHandler } from '../admin/fetchConfigHandler.js'
import { PushConfigHandler } from '../admin/pushConfigHandler.js'
import { GetLogsHandler } from '../admin/getLogsHandler.js'
import { AdminCommandHandler } from '../admin/adminHandler.js'
import {
GetP2PPeerHandler,
Expand Down Expand Up @@ -164,6 +165,7 @@ export class CoreHandlersRegistry {
)
this.registerCoreHandler(PROTOCOL_COMMANDS.FETCH_CONFIG, new FetchConfigHandler(node))
this.registerCoreHandler(PROTOCOL_COMMANDS.PUSH_CONFIG, new PushConfigHandler(node))
this.registerCoreHandler(PROTOCOL_COMMANDS.GET_LOGS, new GetLogsHandler(node))
this.registerCoreHandler(PROTOCOL_COMMANDS.JOBS, new GetJobsHandler(node))
}

Expand Down
91 changes: 67 additions & 24 deletions src/components/database/ElasticSearchDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -864,33 +864,49 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase {
filterConditions.bool.must.push({ match: { level } })
}

const numLogs = await this.getLogsCount()
const from = (page || 0) * Math.min(maxLogs, 250)
const size = Math.min(maxLogs, 250)
// not checking this limits will throw:
// illegal_argument_exception: Result window is too large, from + size must be less than or equal to: [10000] but was [XYZ]
if (from > 10000 || size > 10000 || size > numLogs) {
DATABASE_LOGGER.logMessageWithEmoji(
`Result window is too large, from + size must be less than or equal to: [10000]. "from": ${from}", "size": ${size}, "num": ${numLogs}`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
if (page !== undefined && page !== null) {
return await this.fetchLogPage(filterConditions, page, Math.min(maxLogs, 250))
}

const allLogs: Record<string, any>[] = []
let currentPage = 0
const pageSize = 250

while (allLogs.length < maxLogs) {
const from = currentPage * pageSize
const size = Math.min(pageSize, maxLogs - allLogs.length)

// Elasticsearch result window limit
if (from + size > 10000) {
DATABASE_LOGGER.logMessageWithEmoji(
`Reached Elasticsearch result window limit (10000). Returning ${allLogs.length} logs.`,
true,
GENERIC_EMOJIS.EMOJI_OCEAN_WAVE,
LOG_LEVELS_STR.LEVEL_INFO
)
break
}

const result = await this.client.search({
index: this.index,
body: {
query: filterConditions,
sort: [{ timestamp: { order: 'desc' } }]
},
size,
from
})

const hits = result.hits.hits.map((hit: any) =>
normalizeDocumentId(hit._source, hit._id)
)
return []
allLogs.push(...hits)

if (hits.length < size) break
currentPage++
}
const result = await this.client.search({
index: this.index,
body: {
query: filterConditions,
sort: [{ timestamp: { order: 'desc' } }]
},
size,
from
})

return result.hits.hits.map((hit: any) => {
return normalizeDocumentId(hit._source, hit._id)
})
return allLogs.slice(0, maxLogs)
} catch (error) {
const errorMsg = `Error when retrieving multiple log entries: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
Expand All @@ -903,6 +919,33 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase {
}
}

private async fetchLogPage(
filterConditions: any,
page: number,
size: number
): Promise<Record<string, any>[]> {
const from = page * size
if (from + size > 10000) {
DATABASE_LOGGER.logMessageWithEmoji(
`Result window is too large, from + size must be less than or equal to: [10000]. "from": ${from}, "size": ${size}`,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return []
}
const result = await this.client.search({
index: this.index,
body: {
query: filterConditions,
sort: [{ timestamp: { order: 'desc' } }]
},
size,
from
})
return result.hits.hits.map((hit: any) => normalizeDocumentId(hit._source, hit._id))
}

async delete(logId: string): Promise<void> {
if (!logId) {
throw new Error('Log ID is required for deletion.')
Expand Down
74 changes: 51 additions & 23 deletions src/components/database/TypenseDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -797,34 +797,42 @@ export class TypesenseLogDatabase extends AbstractLogDatabase {
filterConditions += ` && level:${level}`
}

const logsLimit = Math.min(maxLogs, TYPESENSE_HITS_CAP)
if (maxLogs > TYPESENSE_HITS_CAP) {
DATABASE_LOGGER.logMessageWithEmoji(
`Max logs is capped at 250 as Typesense is unable to return more results per page.`,
true,
GENERIC_EMOJIS.EMOJI_OCEAN_WAVE,
LOG_LEVELS_STR.LEVEL_INFO
if (page !== undefined && page !== null) {
return await this.fetchLogPage(
filterConditions,
page,
Math.min(maxLogs, TYPESENSE_HITS_CAP)
)
}

// Define search parameters
const searchParameters = {
q: '*',
query_by: 'message,level,meta',
filter_by: filterConditions,
sort_by: 'timestamp:desc',
per_page: logsLimit,
page: page || 1 // Default to the first page if page number is not provided
}
const allLogs: Record<string, any>[] = []
let currentPage = 1 // Typesense pages are 1-based

// Execute search query
const result = await this.provider
.collections(this.schema.name)
.documents()
.search(searchParameters)
while (allLogs.length < maxLogs) {
const perPage = Math.min(TYPESENSE_HITS_CAP, maxLogs - allLogs.length)

// Map and return the search hits as log entries
return result.hits.map((hit) => hit.document)
const searchParameters = {
q: '*',
query_by: 'message,level,meta',
filter_by: filterConditions,
sort_by: 'timestamp:desc',
per_page: perPage,
page: currentPage
}

const result = await this.provider
.collections(this.schema.name)
.documents()
.search(searchParameters)

const hits = result.hits.map((hit) => hit.document)
allLogs.push(...hits)

if (hits.length < perPage) break
currentPage++
}

return allLogs.slice(0, maxLogs)
} catch (error) {
const errorMsg = `Error when retrieving multiple log entries: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
Expand All @@ -837,6 +845,26 @@ export class TypesenseLogDatabase extends AbstractLogDatabase {
}
}

private async fetchLogPage(
filterConditions: string,
page: number,
perPage: number
): Promise<Record<string, any>[]> {
const searchParameters = {
q: '*',
query_by: 'message,level,meta',
filter_by: filterConditions,
sort_by: 'timestamp:desc',
per_page: perPage,
page
}
const result = await this.provider
.collections(this.schema.name)
.documents()
.search(searchParameters)
return result.hits.map((hit) => hit.document)
}

async delete(logId: string): Promise<void> {
if (!logId) {
throw new Error('Log ID is required for deletion.')
Expand Down
49 changes: 31 additions & 18 deletions src/components/httpRoutes/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import express from 'express'
import { validateAdminSignature } from '../../utils/auth.js'
import { HTTP_LOGGER } from '../../utils/logging/common.js'
import { CommonValidation } from '../../utils/validators.js'
import { GetLogsHandler } from '../core/admin/getLogsHandler.js'
import { PROTOCOL_COMMANDS } from '../../utils/constants.js'
import { streamToObject } from '../../utils/util.js'
import { Readable } from 'node:stream'

export const logRoutes = express.Router()

Expand All @@ -11,7 +15,7 @@ const validateRequest = async (
res: express.Response,
next: express.NextFunction
) => {
const { signature } = req.body
const { signature, address } = req.body
let { expiryTimestamp } = req.body

if (!signature) {
Expand All @@ -29,7 +33,8 @@ const validateRequest = async (

const isValid: CommonValidation = await validateAdminSignature(
expiryTimestamp,
signature
signature,
address
)
if (!isValid.valid) {
return res.status(403).send(`Invalid signature: ${isValid.error}`)
Expand All @@ -38,32 +43,40 @@ const validateRequest = async (
next() // Proceed to the next middleware/function if validation is successful
}

logRoutes.post('/logs', express.json(), validateRequest, async (req, res) => {
logRoutes.post('/logs', express.json(), async (req, res) => {
try {
const startTime =
typeof req.query.startTime === 'string'
? new Date(req.query.startTime)
: new Date(Date.now() - 90 * 24 * 60 * 60 * 1000) // Default to 90 days ago
const endTime =
typeof req.query.endTime === 'string' ? new Date(req.query.endTime) : new Date() // Default to now
const maxLogs =
typeof req.query.maxLogs === 'string' ? parseInt(req.query.maxLogs, 10) : 100 // default to 100 logs
const { signature, expiryTimestamp, address } = req.body

const maxLogs = Math.min(
typeof req.query.maxLogs === 'string' ? parseInt(req.query.maxLogs, 10) : 100,
1000
) // default to 100 logs, max 1000
const moduleName =
typeof req.query.moduleName === 'string' ? req.query.moduleName : undefined
const level = typeof req.query.level === 'string' ? req.query.level : undefined

const page =
typeof req.query.page === 'string' ? parseInt(req.query.page, 10) : undefined // Default to undefined, which will fetch all logs

// Retrieve logs from the database with pagination
const logs = await req.oceanNode
.getDatabase()
.logs.retrieveMultipleLogs(startTime, endTime, maxLogs, moduleName, level, page)
const response = await new GetLogsHandler(req.oceanNode).handle({
command: PROTOCOL_COMMANDS.GET_LOGS,
signature,
expiryTimestamp,
address,
startTime: req.query.startTime as string,
endTime: req.query.endTime as string,
maxLogs,
moduleName,
level,
page
})

if (logs.length > 0) {
res.json(logs)
if (response.status.httpStatus === 200) {
const result = await streamToObject(response.stream as Readable)
res.status(200).json(result)
} else {
res.status(404).send('No logs found')
HTTP_LOGGER.log('LEVEL_ERROR', `Error fetching logs: ${response.status.error}`)
res.status(response.status.httpStatus).json({ error: response.status.error })
}
} catch (error) {
HTTP_LOGGER.error(`Error retrieving logs: ${error.message}`)
Expand Down
2 changes: 2 additions & 0 deletions src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const PROTOCOL_COMMANDS = {
INVALIDATE_AUTH_TOKEN: 'invalidateAuthToken',
FETCH_CONFIG: 'fetchConfig',
PUSH_CONFIG: 'pushConfig',
GET_LOGS: 'getLogs',
JOBS: 'jobs'
}
// more visible, keep then close to make sure we always update both
Expand Down Expand Up @@ -77,6 +78,7 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [
PROTOCOL_COMMANDS.INVALIDATE_AUTH_TOKEN,
PROTOCOL_COMMANDS.FETCH_CONFIG,
PROTOCOL_COMMANDS.PUSH_CONFIG,
PROTOCOL_COMMANDS.GET_LOGS,
PROTOCOL_COMMANDS.JOBS
]

Expand Down
Loading
Loading