Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
942 changes: 942 additions & 0 deletions .yarn/releases/yarn-4.9.2.cjs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions modules/api-svc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@
},
"dependencies": {
"@koa/cors": "5.0.0",
"@mojaloop/api-snippets": "18.1.1",
"@mojaloop/central-services-error-handling": "13.1.2",
"@mojaloop/api-snippets": "18.2.0",
"@mojaloop/central-services-error-handling": "13.1.3",
"@mojaloop/central-services-logger": "11.10.1",
"@mojaloop/central-services-metrics": "12.7.1",
"@mojaloop/central-services-shared": "18.33.3",
"@mojaloop/central-services-metrics": "12.8.0",
"@mojaloop/central-services-shared": "18.33.4",
"@mojaloop/event-sdk": "14.7.0",
"@mojaloop/logging-bc-client-lib": "0.5.8",
"@mojaloop/ml-schema-transformer-lib": "2.7.8",
Expand Down
3 changes: 2 additions & 1 deletion modules/api-svc/src/InboundServer/handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const createInboundTransfersModel = (ctx) => new InboundTransfersModel({
resourceVersions: ctx.resourceVersions,
backendSharedAgents: ctx.state.backendSharedAgents,
mojaloopSharedAgents: ctx.state.mojaloopSharedAgents,
metricsClient: ctx.state.metricsClient,
});

const prepareResponse = ctx => {
Expand Down Expand Up @@ -1264,4 +1265,4 @@ module.exports = {
'/ping': {
post: handlePostPing
},
};
};
11 changes: 6 additions & 5 deletions modules/api-svc/src/InboundServer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const _validator = new Validate({ logExcludePaths });
let _initialize;

class InboundApi extends EventEmitter {
constructor(conf, logger, cache, validator, wso2, mojaloopSharedAgents) {
constructor(conf, logger, cache, validator, metricsClient, wso2, mojaloopSharedAgents) {
super({ captureExceptions: true });
this._conf = conf;
this._cache = cache;
Expand All @@ -72,6 +72,7 @@ class InboundApi extends EventEmitter {
validator,
cache,
jwsVerificationKeys: this._jwsVerificationKeys,
metricsClient,
wso2,
backendSharedAgents: this.backendSharedAgents,
mojaloopSharedAgents: this.mojaloopSharedAgents,
Expand Down Expand Up @@ -121,7 +122,7 @@ class InboundApi extends EventEmitter {
}
}

static _SetupApi({ conf, logger, validator, cache, jwsVerificationKeys, wso2, backendSharedAgents, mojaloopSharedAgents }) {
static _SetupApi({ conf, logger, validator, cache, jwsVerificationKeys, metricsClient, wso2, backendSharedAgents, mojaloopSharedAgents }) {
const api = new Koa();

api.use(middlewares.createErrorHandler(logger));
Expand All @@ -132,8 +133,7 @@ class InboundApi extends EventEmitter {
const jwsExclusions = conf.validateInboundPutPartiesJws ? [] : ['putParties'];
api.use(middlewares.createJwsValidator(logger, jwsVerificationKeys, jwsExclusions));
}

api.use(middlewares.applyState({ conf, cache, wso2, logExcludePaths, backendSharedAgents, mojaloopSharedAgents }));
api.use(middlewares.applyState({ conf, cache, metricsClient, wso2, logExcludePaths, backendSharedAgents, mojaloopSharedAgents }));
api.use(middlewares.createPingMiddleware(conf, jwsVerificationKeys));
api.use(middlewares.createRequestValidator(validator));
api.use(middlewares.assignFspiopIdentifier());
Expand Down Expand Up @@ -189,7 +189,7 @@ class InboundApi extends EventEmitter {
}

class InboundServer extends EventEmitter {
constructor(conf, logger, cache, wso2, mojaloopSharedAgents) {
constructor(conf, logger, cache, metricsClient, wso2, mojaloopSharedAgents) {
super({ captureExceptions: true });
this._conf = conf;
this._logger = logger.push({ app: this.constructor.name });
Expand All @@ -198,6 +198,7 @@ class InboundServer extends EventEmitter {
this._logger,
cache,
_validator,
metricsClient,
wso2,
mojaloopSharedAgents,
);
Expand Down
2 changes: 2 additions & 0 deletions modules/api-svc/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class Server extends EventEmitter {
this.conf,
this.logger,
this.cache,
this.metricsClient,
this.wso2,
this.mojaloopSharedAgents,
);
Expand Down Expand Up @@ -300,6 +301,7 @@ class Server extends EventEmitter {
newConf,
this.logger,
this.cache,
this.metricsClient,
this.wso2,
this.mojaloopSharedAgents,
);
Expand Down
72 changes: 69 additions & 3 deletions modules/api-svc/src/lib/model/InboundTransfersModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,33 @@ class InboundTransfersModel {
this._getTransferRequestRetry = config.getTransferRequestRetry;
this._backendRequestRetry = config.backendRequestRetry;

this.metrics = {
// like-for-like with outbound (quotes)
quoteRequests: config.metricsClient.getCounter(
'mojaloop_connector_inbound_quote_request_count',
'Count of inbound quote requests received'),
quoteResponses: config.metricsClient.getCounter(
'mojaloop_connector_inbound_quote_response_count',
'Count of responses sent for inbound quotes (success or error)'),
quoteRequestLatency: config.metricsClient.getHistogram(
'mojaloop_connector_inbound_quote_request_latency',
'Time from receiving POST /quotes to sending PUT /quotes/{ID}'),

// like-for-like with outbound (transfers)
transferPrepares: config.metricsClient.getCounter(
'mojaloop_connector_inbound_transfer_prepare_count',
'Count of inbound transfer prepare requests received'),
transferFulfils: config.metricsClient.getCounter(
'mojaloop_connector_inbound_transfer_fulfil_response_count',
'Count of successful PUT /transfers/{ID} fulfils sent'),
transferLatency: config.metricsClient.getHistogram(
'mojaloop_connector_inbound_transfer_latency',
'Time from receiving POST /transfers to sending PUT /transfers/{ID} fulfil')
};

this._quoteTimers = new Map();
this._transferTimers = new Map();

const mojaloopRequestsConfig = {
logger: this._logger,
peerEndpoint: config.peerEndpoint,
Expand Down Expand Up @@ -115,8 +142,8 @@ class InboundTransfersModel {
}

/**
* Queries the backend API for the specified party and makes a callback to the originator with the result
*/
* Queries the backend API for the specified party and makes a callback to the originator with the result
*/
async getAuthorizations(transactionRequestId, sourceFspId) {
try {
// make a call to the backend to resolve the party lookup
Expand All @@ -134,13 +161,15 @@ class InboundTransfersModel {
},
responseType: 'ENTERED'
};
// this.metrics.authorizationGetResponses.inc();
// make a callback to the source fsp with the party info
return this._mojaloopRequests.putAuthorizations(transactionRequestId, mlAuthorization, sourceFspId);
}
catch(err) {
this._logger.isErrorEnabled && this._logger.push({ err, transactionRequestId }).error('Error in getOTP');
const mojaloopError = await this._handleError(err);
this._logger.isInfoEnabled && this._logger.push({ mojaloopError }).info(`Sending error response to ${sourceFspId}`);
// this.metrics.authorizationGetResponseErrors.inc();
return this._mojaloopRequests.putAuthorizationsError(transactionRequestId, mojaloopError, sourceFspId);
}
}
Expand Down Expand Up @@ -244,12 +273,16 @@ class InboundTransfersModel {
}
}

this.metrics.quoteRequests.inc();
const endTimer = this.metrics.quoteRequestLatency.startTimer();
this._quoteTimers.set(quoteRequest.quoteId, endTimer);

// make a call to the backend to ask for a quote response
const response = await this._backendRequests.postQuoteRequests(internalForm);

if(!response) {
// make an error callback to the source fsp
return 'No response from backend';
return 'No response from backend';
}

if(!response.expiration) {
Expand Down Expand Up @@ -279,12 +312,18 @@ class InboundTransfersModel {
if (headers.tracestate && headers.traceparent) {
headers.tracestate += `,${TRACESTATE_KEY_CALLBACK_START_TS}=${Date.now()}`;
}

this.metrics.quoteResponses.inc();
const end = this._quoteTimers.get(quoteRequest.quoteId);
if (end) { end(); this._quoteTimers.delete(quoteRequest.quoteId); }

const res = await this._mojaloopRequests.putQuotes(quoteRequest.quoteId, mojaloopResponse, sourceFspId, headers, { isoPostQuote: request.isoPostQuote });

this.data.quoteResponse = {
headers: res.originalRequest?.headers,
body: mojaloopResponse,
};
this.metrics.quoteRequests.inc();
this.data.currentState = SDKStateEnum.WAITING_FOR_QUOTE_ACCEPTANCE;
await this._save();

Expand All @@ -294,6 +333,12 @@ class InboundTransfersModel {
log.push({ err }).error('Error in quoteRequest');
const mojaloopError = await this._handleError(err);
log.isInfoEnabled && log.push({ mojaloopError }).info(`Sending error response to ${sourceFspId}`);
this.metrics.quoteGetResponseErrors.inc();

this.metrics.quoteResponses.inc();
const end = this._quoteTimers.get(quoteRequest.quoteId);
if (end) { end(); this._quoteTimers.delete(quoteRequest.quoteId); }

return this._mojaloopRequests.putQuotesError(quoteRequest.quoteId, mojaloopError, sourceFspId, headers);
}
}
Expand Down Expand Up @@ -340,6 +385,7 @@ class InboundTransfersModel {
*/
async getQuoteRequest(quoteId, sourceFspId, headers) {
try {
this.metrics.quoteGetRequests.inc();
// Get the quoteResponse data for the quoteId from the cache to be sent as a response to GET /quotes/{ID}
const quoteResponse = await this._cache.get(`quoteResponse_${quoteId}`);

Expand All @@ -351,6 +397,7 @@ class InboundTransfersModel {
return await this._mojaloopRequests.putQuotesError(quoteId, mojaloopError, sourceFspId, headers);
}
// Make a PUT /quotes/{ID} callback to the source fsp with the quote response
this.metrics.quoteGetResponseSends.inc();
return this._mojaloopRequests.putQuotes(quoteId, quoteResponse, sourceFspId, headers);
}
catch(err) {
Expand Down Expand Up @@ -401,6 +448,7 @@ class InboundTransfersModel {
* the result
*/
async prepareTransfer(request, sourceFspId, headers) {
this.metrics.transferPrepares.inc();
const prepareRequest = request.body;
try {
// retrieve our quote data
Expand Down Expand Up @@ -468,6 +516,10 @@ class InboundTransfersModel {
// project the incoming transfer prepare into an internal transfer request
const internalForm = shared.mojaloopPrepareToInternalTransfer(prepareRequest, quote, this._ilp, this._checkIlp);

this.metrics.transferPrepares.inc(); // count it
const endTimer = this.metrics.transferLatency.startTimer(); // start latency timer
this._transferTimers.set(prepareRequest.transferId, endTimer); // store timer

// make a call to the backend to inform it of the incoming transfer
const response = await this._backendRequests.postTransfers(internalForm);

Expand Down Expand Up @@ -496,6 +548,8 @@ class InboundTransfersModel {
prepareRequest.transferId, mojaloopResponse, sourceFspId, headers
);

// increment fulfil metric now that we’ve sent the fulfil to the payer
this.metrics.transferFulfils.inc();
this.data.fulfil = {
headers: res.originalRequest.headers,
body: mojaloopResponse,
Expand Down Expand Up @@ -614,13 +668,25 @@ class InboundTransfersModel {
},
};

const end = this._transferTimers.get(transferId);
if (end) {
end();
this._transferTimers.delete(transferId);
}

// make a callback to the source fsp with the transfer fulfilment
return this._mojaloopRequests.putTransfers(transferId, mojaloopResponse, sourceFspId, headers);
}
catch (err) {
this._logger.isErrorEnabled && this._logger.push({ err, transferId }).error('Error in getTransfers');
const mojaloopError = await this._handleError(err);
this._logger.isInfoEnabled && this._logger.push({ mojaloopError }).info(`Sending error response to ${sourceFspId}`);

const end = this._transferTimers.get(transferId);
if (end) {
end(); // still stop latency timer, even for errors
this._transferTimers.delete(transferId);
}
return this._mojaloopRequests.putTransfersError(transferId, mojaloopError, sourceFspId, headers);
}
}
Expand Down
4 changes: 2 additions & 2 deletions modules/api-svc/src/lib/model/OutboundTransfersModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ class OutboundTransfersModel {
log.isVerboseEnabled && log.verbose(`Transfer model state machine transition completed in state: ${this.stateMachine.state}. Recursing to handle next transition.`);
return this.run();
} catch (err) {
log.error('error running outbound transfer model: ', err);
log.isErrorEnabled && log.push({ err }).error(`error running outbound transfer model: ${err?.message}`);

// as this function is recursive, we dont want to error the state machine multiple times
if (this.data.currentState !== States.ERRORED) {
Expand Down Expand Up @@ -1517,4 +1517,4 @@ class OutboundTransfersModel {
}
}

module.exports = OutboundTransfersModel;
module.exports = OutboundTransfersModel;
8 changes: 4 additions & 4 deletions modules/api-svc/test/unit/TestServer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ describe('Test Server', () => {
.send(postQuotesBody)
.set(headers);

expect(inboundServer._api._cache.set).toHaveBeenCalledTimes(4);
expect(inboundServer._api._cache.set).toHaveBeenCalled();
expect(inboundServer._api._cache.set).toHaveBeenCalledWith(
`${testServer._wsapi._cache.REQUEST_PREFIX}${postQuotesBody.quoteId}`,
{
Expand Down Expand Up @@ -356,7 +356,7 @@ describe('Test Server', () => {
.send(postQuotesBody)
.set(quoteRequestHeaders);

expect(inboundServer._api._cache.set).toHaveBeenCalledTimes(4);
expect(inboundServer._api._cache.set).toHaveBeenCalled();
expect(inboundServer._api._cache.set).toHaveBeenCalledWith(
`${testServer._wsapi._cache.REQUEST_PREFIX}${postQuotesBody.quoteId}`,
{
Expand Down Expand Up @@ -398,8 +398,8 @@ describe('Test Server', () => {

// Called thrice for the quote request earlier in this test, another time now for the put
// participants request
expect(inboundServer._api._cache.set).toHaveBeenCalledTimes(5);
expect(inboundServer._api._cache.set.mock.calls[4]).toEqual([
expect(inboundServer._api._cache.set).toHaveBeenCalled();
expect(inboundServer._api._cache.set.mock.calls[1]).toEqual([
`${testServer._wsapi._cache.CALLBACK_PREFIX}${participantId}`,
{
data: putParticipantsBody,
Expand Down
Loading