Commit f7fd93e: S3 loader 3.0.0

Parent: 619ce07
6 files changed (+103, -72 lines)

docs/api-reference/loaders-storage-targets/s3-loader/configuration-reference/index.md

Lines changed: 35 additions & 25 deletions
@@ -21,28 +21,38 @@ This is a complete list of the options that can be configured in the S3 loader H

 | parameter | description |
 |-----------|-------------|
-| `purpose` | Required. Use RAW to sink data exactly as-is. Use `ENRICHED_EVENTS` to also enable event latency metrics. Use `SELF_DESCRIBING` to enable partitioning self-describing data by its schema |
-| `input.appName` | Required. Kinesis Client Lib app name (corresponds to DynamoDB table name) |
-| `input.streamName` | Required. Name of the kinesis stream from which to read |
-| `input.position` | Required. Use `TRIM_HORIZON` to start streaming at the last untrimmed record in the shard, which is the oldest data record in the shard. Or use `LATEST` to start streaming just after the most recent record in the shard |
-| `input.customEndpoint` | Optional. Override the default endpoint for kinesis client api calls |
-| `input.maxRecords` | Required. How many records the client should pull from kinesis each time |
-| `output.s3.path` | Required. Full path to output data, e.g. s3://acme-snowplow-output/raw/ |
-| `output.s3.partitionFormat` | Optional. Added in version 2.1.0. Configures how files are partitioned into S3 directories.When loading raw files, you might choose to partition by `date={yy}-{mm}-{dd}`. When loading self describing jsons, you might choose to partition by `{vendor}.{name}/model={model}/date={yy}-{mm}-{dd}`. Valid substitutions are `{vendor}`, `{name}`, `{format}`, `{model}` for self-describing jsons; and `{yy}`, `{mm}`, `{dd}`, `{hh}` for year, month, day and hour. Defaults to `{vendor}.{schema}` when loading self-describing JSONs, or blank (no partitioning) when loading raw or enriched events |
-| `output.s3.filenamePrefix` | Optional. Adds a prefix to output |
-| `output.s3.compression` | Required. Either LZO or GZIP |
-| `output.s3.maxTimeout` | Required. Maximum Timeout that the application is allowed to fail for, e.g. in case of S3 outage |
-| `output.s3.customEndpoint` | Optional. Override the default endpoint for s3 client api calls |
-| `region` | Optional. When used with the `output.s3.customEndpoint` option, this sets the region of the bucket. Also sets the region of the dynamoDB table. Defaults to the current region |
-| `output.bad.streamName` | Required. Name of a kinesis stream to output failures |
-| `buffer.byteLimit` | Required. Maximum bytes to read from kinesis before flushing a file to S3 |
-| `buffer.recordLimit` | Required. Maximum records to read from kinesis before flushing a file to S3 |
-| `buffer.timeLimit` | Required. Maximum time to wait in milliseconds between writing files to S3 |
-| `monitoring.snowplow.collector` | Optional. E.g. `https://snplow.acme.com`. URI of a snowplow collector. Used for monitoring application lifecycle and failure events |
-| `monitoring.snowplow.appId` | Required only if the collector uri is also configured. Sets the appId field of the snowplow events |
-| `monitoring.sentry.dsn` | Optional, for tracking uncaught run time exceptions |
-| `monitoring.metrics.cloudwatch` | Optional boolean, with default true. This is used to disable sending metrics to cloudwatch |
-| `monitoring.metrics.hostname` | Optional, for sending loading metrics (latency and event counts) to a `statsd` server |
-| `monitoring.metrics.port` | Optional, port of the statsd server |
-| `monitoring.metrics.tags` | E.g.`{ "key1": "value1", "key2": "value2" }`. Tags are used to annotate the statsd metric with any contextual information |
-| `monitoring.metrics.prefix` | Optional, default `snoplow.s3loader`. Configures the prefix of statsd metric names |
+| `input.streamName` | Required. Name of the kinesis stream from which to read |
+| `input.appName` | Optional. Default: `snowplow-blob-loader-aws`. Kinesis Client Lib app name (corresponds to DynamoDB table name) |
+| `input.initialPosition.type` (since 3.0.0) | Optional. Default: `TRIM_HORIZON`. Set the initial position to consume the Kinesis stream. Possible values: `LATEST` (most recent data), `TRIM_HORIZON` (oldest available data), `AT_TIMESTAMP` (start from the record at or after the specified timestamp) |
+| `input.initialPosition.timestamp` (since 3.0.0) | Required for `AT_TIMESTAMP`. E.g. `2020-07-17T10:00:00Z` |
+| `input.retrievalMode.type` (since 3.0.0) | Optional. Default: `Polling`. Set the mode for retrieving records. Possible values: `Polling` or `FanOut` |
+| `input.retrievalMode.maxRecords` (since 3.0.0) | Required for `Polling`. Default: `1000`. Maximum size of a batch returned by a call to `getRecords`. Records are checkpointed after a batch has been fully processed, so the smaller `maxRecords`, the more often records can be checkpointed into DynamoDB, at the possible cost of reduced throughput |
+| `input.workerIdentifier` (since 3.0.0) | Optional. Default: host name. Name of this KCL worker used in the DynamoDB lease table |
+| `input.leaseDuration` (since 3.0.0) | Optional. Default: `10 seconds`. Duration of shard leases. KCL workers must periodically refresh leases in the DynamoDB table before this duration expires |
+| `input.maxLeasesToStealAtOneTimeFactor` (since 3.0.0) | Optional. Default: `2.0`. Controls how to pick the max number of leases to steal at one time. E.g. if there are 4 available processors and `maxLeasesToStealAtOneTimeFactor = 2.0`, then the KCL may steal up to 8 leases. Allows bigger instances to more quickly acquire the shard-leases they need to combat latency |
+| `input.checkpointThrottledBackoffPolicy.minBackoff` (since 3.0.0) | Optional. Default: `100 millis`. Minimum backoff before retrying when DynamoDB provisioned throughput is exceeded |
+| `input.checkpointThrottledBackoffPolicy.maxBackoff` (since 3.0.0) | Optional. Default: `1 second`. Maximum backoff before retrying when DynamoDB provisioned throughput is exceeded |
+| `input.debounceCheckpoints` (since 3.0.0) | Optional. Default: `10 seconds`. How frequently to checkpoint our progress to the DynamoDB table. By increasing this value, we can decrease the write-throughput requirements of the DynamoDB table |
+| `input.customEndpoint` | Optional. Override the default endpoint for kinesis client api calls |
+| `output.good.path` | Required. Full path to output data, e.g. `s3://acme-snowplow-output/` |
+| `output.good.partitionFormat` (since 2.1.0) | Optional. Configures how files are partitioned into S3 directories. When loading self-describing JSONs, you might choose to partition by `{vendor}.{name}/model={model}/date={yy}-{mm}-{dd}`. Valid substitutions are `{vendor}`, `{name}`, `{format}`, `{model}` for self-describing JSONs; and `{yy}`, `{mm}`, `{dd}`, `{hh}` for year, month, day and hour. Defaults to `{vendor}.{schema}` when loading self-describing JSONs, or blank when loading enriched events |
+| `output.good.filenamePrefix` | Optional. Adds a prefix to files |
+| `output.good.compression` | Optional. Has to be `GZIP` (the default) |
+| `output.bad.streamName` | Required. Name of a kinesis stream to output failures |
+| `output.bad.throttledBackoffPolicy.minBackoff` (since 3.0.0) | Optional. Default: `100 milliseconds`. Minimum backoff before retrying when writing fails with exceeded kinesis write throughput |
+| `output.bad.throttledBackoffPolicy.maxBackoff` (since 3.0.0) | Optional. Default: `1 second`. Maximum backoff before retrying when writing fails with exceeded kinesis write throughput |
+| `output.bad.recordLimit` (since 3.0.0) | Optional. Default: `500`. Maximum number of records we are allowed to send to Kinesis in one PutRecords request |
+| `output.bad.byteLimit` (since 3.0.0) | Optional. Default: `5242880`. Maximum number of bytes we are allowed to send to Kinesis in one PutRecords request |
+| `purpose` | Required. `ENRICHED_EVENTS` for enriched events or `SELF_DESCRIBING` for self-describing data |
+| `batching.maxBytes` (since 3.0.0) | Optional. Default: `67108864`. After this amount of compressed bytes has been added to the buffer, it gets written to a file (unless `maxDelay` is reached before) |
+| `batching.maxDelay` (since 3.0.0) | Optional. Default: `2 minutes`. After this delay has elapsed, the buffer gets written to a file (unless `maxBytes` is reached before) |
+| `cpuParallelismFactor` (since 3.0.0) | Optional. Default: `1`. Controls how the app splits the workload into concurrent batches which can be run in parallel, e.g. if there are 4 available processors and `cpuParallelismFactor = 0.75` then we process 3 batches concurrently. Adjusting this value can cause the app to use more or less of the available CPU |
+| `uploadParallelismFactor` (since 3.0.0) | Optional. Default: `2`. Controls the number of upload jobs that can be run in parallel, e.g. if there are 4 available processors and `uploadParallelismFactor = 2` then we run 8 upload jobs concurrently. Adjusting this value can cause the app to use more or less of the available CPU |
+| `initialBufferSize` (since 3.0.0) | Optional. Default: none. Overrides the initial size of the byte buffer that holds the compressed events in-memory before they get written to a file. If not set, the initial size is picked dynamically based on other configuration options. The default is known to work well. Increasing this value is a way to reduce in-memory copying, but comes at the cost of increased memory usage |
+| `monitoring.sentry.dsn` | Optional. For tracking uncaught runtime exceptions |
+| `monitoring.metrics.statsd.hostname` | Optional. For sending loading metrics (latency and event counts) to a `statsd` server |
+| `monitoring.metrics.statsd.port` | Optional. Port of the statsd server |
+| `monitoring.metrics.statsd.tags` | E.g. `{ "key1": "value1", "key2": "value2" }`. Tags are used to annotate the statsd metric with any contextual information |
+| `monitoring.metrics.statsd.prefix` | Optional. Default: `snowplow.s3loader`. Configures the prefix of statsd metric names |
+| `monitoring.healthProbe.port` (since 3.0.0) | Optional. Default: `8080`. Port of the HTTP server that returns OK only if the app is healthy |
+| `monitoring.healthProbe.unhealthyLatency` (since 3.0.0) | Optional. Default: `2 minutes`. Health probe becomes unhealthy if any received event has still not been fully processed within this cutoff time |
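
To see how the 3.0.0 options fit together, here is a minimal, illustrative HOCON sketch of a configuration using a handful of the keys from the table above. It is not taken from the release: the stream names, app name, bucket and tuning values are placeholders, and only a subset of options is shown.

```hocon
{
  "input": {
    # Placeholder stream and KCL application names
    "streamName": "enriched-good"
    "appName": "acme-s3-loader"

    # New in 3.0.0: nested initial position and retrieval mode
    "initialPosition": {
      "type": "TRIM_HORIZON"
    }
    "retrievalMode": {
      "type": "Polling"
      "maxRecords": 1000
    }
  }

  "output": {
    "good": {
      # Placeholder bucket; partition output files by date
      "path": "s3://acme-snowplow-output/enriched/"
      "partitionFormat": "date={yy}-{mm}-{dd}"
      "compression": "GZIP"
    }
    "bad": {
      "streamName": "s3-loader-bad"
    }
  }

  "purpose": "ENRICHED_EVENTS"

  # Replaces the old buffer.* settings: a file is flushed when either limit is hit
  "batching": {
    "maxBytes": 67108864
    "maxDelay": "2 minutes"
  }
}
```

The sample `config.hocon` shipped in the loader's GitHub repository remains the authoritative reference for the full key layout and defaults.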

docs/api-reference/loaders-storage-targets/s3-loader/index.md

Lines changed: 6 additions & 31 deletions
@@ -10,23 +10,12 @@ import CodeBlock from '@theme/CodeBlock';

Snowplow S3 Loader consumes records from an [Amazon Kinesis](http://aws.amazon.com/kinesis/) stream and writes them to [S3](http://aws.amazon.com/s3/). A typical Snowplow pipeline would use the S3 loader in several places:

-- Load collector payloads from the "raw" stream, to maintain an archive of the original data, before enrichment.
- Load enriched events from the "enriched" stream. These serve as input for [the RDB loader](/docs/api-reference/loaders-storage-targets/snowplow-rdb-loader/index.md) when loading to a warehouse.
- Load failed events from the "bad" stream.

Records that can't be successfully written to S3 are written to a [second Kinesis stream](https://github.com/snowplow/snowplow-s3-loader/blob/master/examples/config.hocon.sample#L75) with the error message.

-## Output Formats
-
-### LZO
-
-Records are treated as raw byte arrays. [Elephant Bird's](https://github.com/twitter/elephant-bird/) `BinaryBlockWriter` class is used to serialize them as a [Protocol Buffers](https://github.com/google/protobuf/) array (so it is clear where one record ends and the next begins) before compressing them.
-
-The compression process generates both compressed .lzo files and small .lzo.index files ([splittable LZO](https://github.com/twitter/hadoop-lzo)). Each index file contain the byte offsets of the LZO blocks in the corresponding compressed file, meaning that the blocks can be processed in parallel.
-
-LZO encoding is generally used for raw data produced by Snowplow Collector.
-
-### Gzip
+## Output format: GZIP

The records are treated as byte arrays containing UTF-8 encoded strings (whether CSV, JSON or TSV). New lines are used to separate records written to a file. This format can be used with the Snowplow Kinesis Enriched stream, among other streams.

@@ -42,17 +31,10 @@ A Terraform module which deploys the Snowplow S3 Loader on AWS EC2 for use with

### Docker image

-We publish three different flavours of the docker image.
-
-- <p> Pull the <code>{`:${versions.s3Loader}`}</code> tag if you only need GZip output format </p>
-- <p> Pull the <code>{`:${versions.s3Loader}-lzo`}</code> tag if you also need LZO output format </p>
-- <p> Pull the <code>{`:${versions.s3Loader}-distroless`}</code> tag for an lightweight alternative to <code>{`:${versions.s3Loader}`}</code> </p>
+We publish two different flavours of the docker image:

-<CodeBlock language="bash">{
-`docker pull snowplow/snowplow-s3-loader:${versions.s3Loader}
-docker pull snowplow/snowplow-s3-loader:${versions.s3Loader}-lzo
-docker pull snowplow/snowplow-s3-loader:${versions.s3Loader}-distroless
-`}</CodeBlock>
+- <p><code>{`snowplow/snowplow-s3-loader:${versions.s3Loader}`}</code></p>
+- <p><code>{`snowplow/snowplow-s3-loader:${versions.s3Loader}-distroless`}</code> (lightweight alternative)</p>

Here is a standard command to run the loader on a EC2 instance in AWS:

@@ -73,15 +55,8 @@ Here is a standard command to run the loader on a EC2 instance in AWS:

### Jar

-JARs can be found attached to the [Github release](https://github.com/snowplow/snowplow-s3-loader/releases). Only pick the `-lzo` version of the JAR file if you need to output in LZO format
+JARs can be found attached to the [Github release](https://github.com/snowplow/snowplow-s3-loader/releases).

<CodeBlock language="bash">{
`java -jar snowplow-s3-loader-${versions.s3Loader}.jar --config config.hocon
-java -jar snowplow-s3-loader-lzo-${versions.s3Loader}.jar --config config.hocon
-`}</CodeBlock>
-
-Running the jar requires to have the native LZO binaries installed. For example for Debian this can be done with:
-
-```bash
-sudo apt-get install lzop liblzo2-dev
-```
+`}</CodeBlock>

docs/api-reference/loaders-storage-targets/s3-loader/monitoring/index.md

Lines changed: 9 additions & 15 deletions
@@ -14,11 +14,15 @@ When processing enriched events, the S3 loader can emit metrics to a statsd daem

```text
snowplow.s3loader.count:42|c|#tag1:value1
-snowplow.s3loader.latency_collector_to_load:123.4|g|#tag1:value1
+snowplow.s3loader.latency_collector_to_load:123|g|#tag1:value1
+snowplow.s3loader.latency_millis:56|g|#tag1:value1
+snowplow.s3loader.e2e_latency_millis:123|g|#tag1:value1
```

- `count_good`: the total number of events in the batch that was loaded.
-- `latency_collector_to_load`: this is the time difference between reaching the collector and getting loaded to S3.
+- `latency_collector_to_load`: the time difference between reaching the collector and getting loaded to S3 (only for enriched events).
+- `latency_millis`: the delay between the input record getting written to the stream and the S3 loader starting to process it.
+- `e2e_latency_millis`: the same as `latency_collector_to_load` (which will eventually be deprecated).

Statsd monitoring is configured by setting the `monitoring.metrics.statsd` section in [the hocon file](/docs/api-reference/loaders-storage-targets/s3-loader/configuration-reference/index.md):

@@ -35,6 +39,9 @@ Statsd monitoring is configured by setting the `monitoring.metrics.statsd` sec
}
}
```
+## Health probe
+
+Starting with version `3.0.0`, the S3 loader exposes a health probe, configured via the `monitoring.healthProbe` section (see the configuration reference).
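
As a rough illustration only (the exact key layout should be checked against the sample config for the release), the two health probe settings listed in the configuration reference could look like this in the hocon file:

```hocon
"monitoring": {
  "healthProbe": {
    # Port of the HTTP server that returns OK only while the app is healthy
    "port": 8080
    # Report unhealthy if any received event is still not fully processed after this long
    "unhealthyLatency": "2 minutes"
  }
}
```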

## Sentry

@@ -49,16 +56,3 @@ Sentry monitoring is configured by setting the `monitoring.sentry.dsn` key in
"dsn": "http://sentry.acme.com"
}
```
-
-## Snowplow Tracking
-
-The loader can emit a Snowplow event to a collector when the application experiences runtime problems. It sends [app_initialized](https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.monitoring.kinesis/app_initialized/jsonschema/1-0-0) and [app_heartbeat](https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.monitoring.kinesis/app_heartbeat/jsonschema/1-0-0) events to show the application is alive. A [storage_write_failed event](https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.monitoring.kinesis/storage_write_failed/jsonschema/1-0-0) is sent when a file cannot be written to S3, and a [app_shutdown event](https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.monitoring.kinesis/app_shutdown/jsonschema/1-0-0) is sent when the application exits due to too many S3 errors.
-
-Snowplow monitoring is configured by setting the `monitoring.snowplow` section in [the hocon file](/docs/api-reference/loaders-storage-targets/s3-loader/configuration-reference/index.md):
-
-```json
-"monitoring": {
-"appId": "redshift-loader"
-"collector": "collector.acme.com"
-}
-```
