Skip to content

add KinesisClient documentation for k6-jslib-aws #1980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
---
title: 'KinesisClient'
description: 'KinesisClient allows interacting with AWS Kinesis streams'
weight: 00
---

# KinesisClient

{{< docs/shared source="k6" lookup="blocking-aws-blockquote.md" version="<K6_VERSION>" >}}

`KinesisClient` interacts with the AWS Kinesis service.

With it, you can perform operations such as creating streams, putting records, listing streams, and reading records from streams. For a full list of supported operations, see [Methods](#methods).

Both the dedicated `kinesis.js` jslib bundle and the all-encompassing `aws.js` bundle include the `KinesisClient`.

### Methods

| Function | Description |
| :---------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------------------------------------------------- |
| [createStream(streamName, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/createstream) | Create a new Kinesis stream |
| [deleteStream(streamName)](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/deletestream) | Delete a Kinesis stream |
| [listStreams([options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/liststreams) | List available Kinesis streams |
| [putRecords(streamName, records)](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/putrecords) | Put multiple records into a Kinesis stream |
| [getRecords(shardIterator, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/getrecords) | Get records from a Kinesis stream shard |
| [listShards(streamName, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/listshards) | List shards in a Kinesis stream |
| [getShardIterator(streamName, shardId, shardIteratorType, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/getsharditerator) | Get a shard iterator for reading records from a stream |

### Throws

KinesisClient methods will throw errors in case of failure.

| Error | Condition |
| :-------------------- | :--------------------------------------------------------- |
| InvalidSignatureError | When invalid credentials are provided. |
| KinesisServiceError | When AWS replies to the requested operation with an error. |

### Examples


<!-- md-k6:skip -->

```javascript
import { check } from 'k6';
import exec from 'k6/execution';

import {
AWSConfig,
KinesisClient,
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';

const awsConfig = new AWSConfig({
region: __ENV.AWS_REGION,
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
});

const kinesis = new KinesisClient(awsConfig);
const testStreamName = 'test-stream';

export default async function () {
// List available streams
const streams = await kinesis.listStreams();
console.log('Available streams:', streams.streamNames);

// Check if our test stream exists
if (!streams.streamNames.includes(testStreamName)) {
// Create the stream if it doesn't exist
await kinesis.createStream(testStreamName, { shardCount: 1 });
console.log(`Created stream: ${testStreamName}`);
}

// Put some records into the stream
const records = [
{
data: JSON.stringify({ message: 'Hello from k6!', timestamp: Date.now() }),
partitionKey: 'test-partition-1',
},
{
data: JSON.stringify({ message: 'Another message', timestamp: Date.now() }),
partitionKey: 'test-partition-2',
},
];

const putResult = await kinesis.putRecords(testStreamName, records);
console.log('Put records result:', putResult);

// List shards in the stream
const shards = await kinesis.listShards(testStreamName);
console.log('Stream shards:', shards.shards);

// Get a shard iterator for reading records
if (shards.shards.length > 0) {
const shardId = shards.shards[0].shardId;
const shardIterator = await kinesis.getShardIterator(testStreamName, shardId, 'TRIM_HORIZON');

// Get records from the shard
const getResult = await kinesis.getRecords(shardIterator.shardIterator);
console.log('Retrieved records:', getResult.records);
}
}
```


#### Stream management


<!-- md-k6:skip -->

```javascript
import {
AWSConfig,
KinesisClient,
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';

const awsConfig = new AWSConfig({
region: __ENV.AWS_REGION,
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
});

const kinesis = new KinesisClient(awsConfig);

export default async function () {
const streamName = 'my-test-stream';

// Create a stream with on-demand billing
await kinesis.createStream(streamName, {
streamModeDetails: {
streamMode: 'ON_DEMAND',
},
});

// List all streams
const streams = await kinesis.listStreams();
console.log('All streams:', streams.streamNames);

// Clean up - delete the stream
await kinesis.deleteStream(streamName);
console.log(`Deleted stream: ${streamName}`);
}
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
---
title: 'createStream'
description: 'KinesisClient.createStream creates a new Kinesis stream'
weight: 10
---

# createStream

`KinesisClient.createStream(streamName, [options])` creates a new Kinesis stream.

### Parameters

| Parameter | Type | Description |
| :--------- | :----- | :---------------------------------------------- |
| streamName | string | The name of the Kinesis stream to create. |
| options | object | Optional configuration for the stream creation. |

#### Options

| Parameter | Type | Description |
| :--------------------------- | :----- | :-------------------------------------------------------------------- |
| shardCount | number | The number of shards for the stream (for provisioned mode). |
| streamModeDetails | object | Configuration for the stream mode. |
| streamModeDetails.streamMode | string | The billing mode for the stream. Either `PROVISIONED` or `ON_DEMAND`. |

### Returns

| Type | Description |
| :-------------- | :-------------------------------------------------------------------- |
| `Promise<void>` | A Promise that fulfills when the stream creation request is complete. |

### Example


<!-- md-k6:skip -->

```javascript
import {
AWSConfig,
KinesisClient,
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';

const awsConfig = new AWSConfig({
region: __ENV.AWS_REGION,
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
});

const kinesis = new KinesisClient(awsConfig);

export default async function () {
// Create a stream with provisioned billing and 2 shards
await kinesis.createStream('my-provisioned-stream', {
shardCount: 2,
});

// Create a stream with on-demand billing
await kinesis.createStream('my-on-demand-stream', {
streamModeDetails: {
streamMode: 'ON_DEMAND',
},
});
}
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
title: 'deleteStream'
description: 'KinesisClient.deleteStream deletes a Kinesis stream'
weight: 10
---

# deleteStream

`KinesisClient.deleteStream(streamName)` deletes a Kinesis stream.

### Parameters

| Parameter | Type | Description |
| :--------- | :----- | :---------------------------------------- |
| streamName | string | The name of the Kinesis stream to delete. |

### Returns

| Type | Description |
| :-------------- | :-------------------------------------------------------------------- |
| `Promise<void>` | A Promise that fulfills when the stream deletion request is complete. |

### Example


<!-- md-k6:skip -->

```javascript
import {
AWSConfig,
KinesisClient,
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';

const awsConfig = new AWSConfig({
region: __ENV.AWS_REGION,
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
});

const kinesis = new KinesisClient(awsConfig);

export default async function () {
const streamName = 'my-test-stream';

// Delete the stream
await kinesis.deleteStream(streamName);
console.log(`Stream ${streamName} deleted`);
}
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
---
title: 'getRecords'
description: 'KinesisClient.getRecords gets records from a Kinesis stream shard'
weight: 10
---

# getRecords

`KinesisClient.getRecords(shardIterator, [options])` gets records from a Kinesis stream shard using a shard iterator.

### Parameters

| Parameter | Type | Description |
| :------------ | :----- | :---------------------------------------------------- |
| shardIterator | string | The shard iterator from which to get records. |
| options | object | Optional configuration for the get records operation. |

#### Options

| Parameter | Type | Description |
| :-------- | :----- | :--------------------------------------- |
| limit | number | The maximum number of records to return. |

### Returns

| Type | Description |
| :---------------- | :------------------------------------------------- |
| `Promise<Object>` | A Promise that fulfills with the records response. |

#### Returns object

| Property | Type | Description |
| :----------------- | :------------ | :--------------------------------------------------- |
| records | Array<Object> | An array of records retrieved from the stream. |
| nextShardIterator | string | The next shard iterator to use for subsequent calls. |
| millisBehindLatest | number | The number of milliseconds behind the latest record. |

#### Record object

| Property | Type | Description |
| :-------------------------- | :----- | :----------------------------------------------- |
| sequenceNumber | string | The sequence number of the record. |
| approximateArrivalTimestamp | Date | The approximate arrival timestamp of the record. |
| data | string | The data payload of the record. |
| partitionKey | string | The partition key of the record. |

### Example


<!-- md-k6:skip -->

```javascript
import {
AWSConfig,
KinesisClient,
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';

const awsConfig = new AWSConfig({
region: __ENV.AWS_REGION,
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
});

const kinesis = new KinesisClient(awsConfig);

export default async function () {
const streamName = 'my-test-stream';

// First, get the shards for the stream
const shards = await kinesis.listShards(streamName);

if (shards.shards.length > 0) {
const shardId = shards.shards[0].shardId;

// Get a shard iterator for the first shard
const shardIteratorResponse = await kinesis.getShardIterator(
streamName,
shardId,
'TRIM_HORIZON'
);

const shardIterator = shardIteratorResponse.shardIterator;

// Get records from the shard
const recordsResponse = await kinesis.getRecords(shardIterator, { limit: 10 });

console.log('Records retrieved:', recordsResponse.records.length);
console.log('Milliseconds behind latest:', recordsResponse.millisBehindLatest);

// Process the records
recordsResponse.records.forEach((record, index) => {
console.log(`Record ${index}:`);
console.log(' Sequence number:', record.sequenceNumber);
console.log(' Partition key:', record.partitionKey);
console.log(' Data:', record.data);
console.log(' Arrival timestamp:', record.approximateArrivalTimestamp);

// Parse JSON data if applicable
try {
const jsonData = JSON.parse(record.data);
console.log(' Parsed data:', jsonData);
} catch (e) {
console.log(' Data is not JSON');
}
});

// Continue reading with the next shard iterator
if (recordsResponse.nextShardIterator) {
const nextBatch = await kinesis.getRecords(recordsResponse.nextShardIterator, { limit: 5 });
console.log('Next batch size:', nextBatch.records.length);
}
}
}
```

Loading