
onResponse Stream Transform #60

@enricodeleo

Description


Feature proposal

onResponse could support Node.js stream Transforms applied to the response pipeline, allowing reasonably performant, on-the-fly transformations.

A developer can already achieve this result in a reusable fashion with the following implementation:

index.js

import { parse, stringify } from 'JSONStream';

import onResponse from './lib/on-response';
import { API_HOST } from './config/env';
 
// [...]
routes: [
  {
    prefix: '/endpoint',
    target: API_HOST,
    hooks: {
      rewriteRequestHeaders(req, headers) {
        headers['accept-encoding'] = 'identity'; // request uncompressed (not gzipped) resources from the upstream
        return headers;
      },
      onResponse: (req, res, stream) => onResponse(req, res, stream, [
        // remap the upstream `_id` field to `id` on the fly
        parse('products', (product) => {
          product.id = product._id;
          return product;
        }),
        stringify(),
      ]),
    },
  },
],
// [...]
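
Since the last argument is just an array of Transform streams, the same hook can host any Node.js Transform, not only JSONStream. As a minimal illustration (the `upperCase` transform below is hypothetical, not part of the proposal):

import { Transform } from 'stream';

// hypothetical transform: upper-case every chunk of the response body
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// plugged into the same hook:
// onResponse: (req, res, stream) => onResponse(req, res, stream, [upperCase]),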

lib/on-response.js

import { pipeline } from 'stream';
import streamToArray from 'stream-to-array';

const TRANSFER_ENCODING_HEADER_NAME = 'transfer-encoding';
const CONTENT_LENGTH_HEADER_NAME = 'content-length';

/**
 * onResponse
 * Hook executed when the response is received from the target (remote HTTP service)
 * @param {*} req http request object
 * @param {*} res http response object
 * @param {*} stream upstream response stream (exposes headers and statusCode)
 * @param {array} transforms array of Transform streams to apply to the response pipeline
 */
export default async function onResponse(req, res, stream, transforms = []) {
  const chunked = stream.headers[TRANSFER_ENCODING_HEADER_NAME]
    ? stream.headers[TRANSFER_ENCODING_HEADER_NAME].endsWith('chunked')
    : false;

  if (req.headers.connection === 'close' && chunked) {
    try {
      // strip the chunked token from the transfer-encoding header
      // (the header may list several encodings, e.g. "gzip, chunked")
      const transferEncoding = stream.headers[TRANSFER_ENCODING_HEADER_NAME].replace(/(,( )?)?chunked/, '');
      if (transferEncoding) {
        res.setHeader(TRANSFER_ENCODING_HEADER_NAME, transferEncoding);
      } else {
        res.removeHeader(TRANSFER_ENCODING_HEADER_NAME);
      }

      if (!stream.headers[CONTENT_LENGTH_HEADER_NAME]) {
        // pack all pieces into one buffer to calculate the content length
        const resBuffer = Buffer.concat(await streamToArray(stream));

        // add the content-length header and send the merged response buffer
        res.setHeader(CONTENT_LENGTH_HEADER_NAME, '' + Buffer.byteLength(resBuffer));
        res.end(resBuffer);
      } else {
        // content-length is already known: stream the body straight through
        // (note: transforms are bypassed on this connection-close path)
        pipeline(stream, res, (err) => err && req.log.error(err));
      }
    } catch (err) {
      res.statusCode = 500;
      res.end(err.message);
    }
  } else {
    res.statusCode = stream.statusCode;
    res.removeHeader(CONTENT_LENGTH_HEADER_NAME);

    // pipe the upstream response through each transform and into the client response
    pipeline(
      stream,
      ...transforms,
      res,
      (err) => err && req.log.error(err),
    );
  }
}

Why not generalise this approach and make it a feature of fast-gateway?
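
As a sketch of what that could look like (the `transforms` route option below is hypothetical and does not exist in fast-gateway today), the gateway could accept the Transform array directly and wire up the pipeline itself:

// hypothetical built-in option, shown only to illustrate the proposal
routes: [
  {
    prefix: '/endpoint',
    target: API_HOST,
    transforms: [
      parse('products', (product) => {
        product.id = product._id;
        return product;
      }),
      stringify(),
    ],
  },
],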

P.S. I'm using the built-in stream.pipeline here, which can supersede the third-party pump.
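
For reference, stream.pipeline (available since Node.js 10) takes the same arguments as pump, so the swap is mechanical; a standalone toy example with placeholder file streams:

import { pipeline, Transform } from 'stream';
import { createReadStream, createWriteStream } from 'fs';

pipeline(
  createReadStream('input.json'),   // source
  new Transform({                   // any number of transforms
    transform(chunk, encoding, callback) {
      callback(null, chunk);        // pass-through, for illustration
    },
  }),
  createWriteStream('output.json'), // destination
  (err) => err && console.error('pipeline failed', err), // error callback, as with pump
);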
