Enable reading and writing to dataset with NodeJS streams #180

Open
mtrunkat opened this issue Jul 15, 2021 · 2 comments

mtrunkat commented Jul 15, 2021

We have two main endpoints for the dataset item collection that enable manipulation of the items stored in a dataset.

To make transformations more efficient, we could use Node.js streams in the following manner:

const { ApifyClient } = require('apify-client');
const { Transform } = require('stream');

const client = new ApifyClient({
    token: 'MY-APIFY-TOKEN',
});

const sourceDatasetClient = client.dataset(SOME_DATASET_ID_1);
const targetDatasetClient = client.dataset(SOME_DATASET_ID_2);

const myTransform = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
        delete obj.someField1;
        delete obj.someField2;

        callback(null, obj);
    }
});

const itemsReadStream = await sourceDatasetClient.getListItemsStream();
const itemsWriteStream = await targetDatasetClient.getPushItemsStream();

itemsReadStream
    .pipe(myTransform)
    .pipe(itemsWriteStream);
  • getListItemsStream should support the same parameters as the non-stream listItems method but return a stream instead of an object. As the response may contain GBs of data, the parsing must also happen in a streaming manner. To make this easier, you can use format=jsonl, where each line is one item serialized as JSON (see the parsing sketch after this list).
  • getPushItemsStream will have no parameters and will simply push data to the dataset, also in a streaming manner. The only tricky part is our maximal payload size of 9 MB, so before reaching this threshold you need to close the request and initiate a new one.
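
A minimal sketch of how the streaming read could work, assuming format=jsonl and a raw HTTPS request. The createJsonlParser and getListItemsStream names and the URL/token handling below are illustrative only, not an existing client API:

const { Transform } = require('stream');
const https = require('https');

// Parse a `format=jsonl` response line by line, emitting one parsed object
// per dataset item without buffering the whole (potentially multi-GB)
// response in memory.
const createJsonlParser = () => {
    let buffered = '';
    return new Transform({
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            buffered += chunk.toString('utf8');
            const lines = buffered.split('\n');
            buffered = lines.pop(); // keep the last, possibly incomplete line
            for (const line of lines) {
                if (line.trim()) this.push(JSON.parse(line));
            }
            callback();
        },
        flush(callback) {
            if (buffered.trim()) this.push(JSON.parse(buffered));
            callback();
        },
    });
};

// Rough idea of getListItemsStream(): request the items endpoint with
// format=jsonl and pipe the raw response through the JSONL parser.
const getListItemsStream = (datasetId, token) => new Promise((resolve, reject) => {
    const url = `https://api.apify.com/v2/datasets/${datasetId}/items?format=jsonl&token=${token}`;
    https.get(url, (res) => resolve(res.pipe(createJsonlParser())))
        .on('error', reject);
});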

Info on Apify Dataset - https://docs.apify.com/storage/dataset
Dataset API - https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items
API client dataset docs - https://docs.apify.com/apify-client-js#datasetclient

mnmkng commented Jul 16, 2021

Wouldn't an iterator that works in batches be preferable instead of streams?

Two reasons: First, processing each item as an event is not efficient and there is a lot of overhead. It might be much faster to download e.g. 1,000 items and then process them in a synchronous loop (see the sketch at the end of this comment).

Second is that the push stream would not really be a stream, but a batch push anyway.

What do you think?
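
A minimal sketch of such a batched iterator, built on top of the existing listItems() method; the iterateItemsInBatches name is made up for illustration:

// Fetch the dataset in pages via listItems() and yield each page as a plain
// array, so the consumer can process every batch in a synchronous loop.
async function* iterateItemsInBatches(datasetClient, batchSize = 1000) {
    let offset = 0;
    while (true) {
        const { items } = await datasetClient.listItems({ offset, limit: batchSize });
        if (items.length === 0) break;
        yield items;
        offset += items.length;
    }
}

// Usage sketch:
// for await (const batch of iterateItemsInBatches(client.dataset(SOME_DATASET_ID_1))) {
//     for (const item of batch) { /* transform the item synchronously */ }
// }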

mtrunkat commented Jul 16, 2021

  • Yes, the processing might be CPU intensive due to the large number of callbacks.
  • But the push might actually be done the streaming way: you open a POST request and start pushing data into it as the items are being read and transformed. The only thing is that after every 9 MB of data you need to close and reinitiate the request, so some data may get buffered (a rough sketch follows below this list).
  • There is one advantage to this: if the transformation or upload is slow, backpressure (if streaming gets implemented correctly) will slow down or pause the request read stream.
  • So with backpressure, it may effectively use 100% of the available reading and writing speed, as long as there is enough CPU to cover it.
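
A rough sketch of that push stream. It simplifies the idea above by flushing via the existing pushItems() call per chunk instead of keeping a single long-lived POST request open; createPushItemsStream and MAX_PAYLOAD_BYTES are illustrative names:

const { Writable } = require('stream');

// Stay safely below the ~9 MB API payload limit per upload.
const MAX_PAYLOAD_BYTES = 9 * 1024 * 1024;

const createPushItemsStream = (datasetClient) => {
    let buffer = [];
    let bufferedBytes = 0;

    // Upload the buffered items and reset the buffer.
    const flush = async () => {
        if (buffer.length === 0) return;
        await datasetClient.pushItems(buffer);
        buffer = [];
        bufferedBytes = 0;
    };

    return new Writable({
        objectMode: true,
        async write(obj, encoding, callback) {
            try {
                const size = Buffer.byteLength(JSON.stringify(obj));
                // Flush before this item would push the payload over the limit.
                if (bufferedBytes + size > MAX_PAYLOAD_BYTES) await flush();
                buffer.push(obj);
                bufferedBytes += size;
                callback();
            } catch (err) {
                callback(err);
            }
        },
        async final(callback) {
            try {
                await flush(); // upload whatever is left when the pipeline ends
                callback();
            } catch (err) {
                callback(err);
            }
        },
    });
};

Because write() does not call back until a pending flush finishes, backpressure propagates through the pipeline and the read stream is paused while an upload is in flight.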

I am not sure how much more efficient this might be, but it's worth an experiment.
