Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/index.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# Coming soon
# What is streams-bootstrap?
16 changes: 16 additions & 0 deletions docs/docs/user/concepts/common.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Common concepts

## Application types

- App
- ConfiguredApp
- ExecutableApp

## Application lifecycle

- Running an application
- Cleaning an application

## Configuration

## Command line interface
15 changes: 15 additions & 0 deletions docs/docs/user/concepts/producer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Producer apps

Producer apps are applications that generate data and send it to a Kafka topic.
They can be used to produce messages from various sources, such as databases, files, or real-time events.

## Application lifecycle

- Running an application
- Cleaning an application

## Configuration

## Command line interface

## Deployment
36 changes: 36 additions & 0 deletions docs/docs/user/concepts/streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Streams apps

Streams apps are applications that process data in real-time as it flows through Kafka topics.
They can be used to filter, transform, aggregate, or enrich data streams.
Streams apps can also produce new messages to other topics based on the processed data.

## Application lifecycle

- Running an application
- Resetting an application
- Cleaning an application

## Configuration

- Topics
- Application id
- Kafka properties
- Lifecycle hooks
- Setup
- Clean up
- Execution options
- On start
- Application server
- State listener
- Uncaught exception handler
- Closing options

## Command line interface

## Deployment

## Kafka streams extensions

- Simple topic access
- Error handling
- Serde auto configuration
5 changes: 5 additions & 0 deletions docs/docs/user/deployment/kubernetes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Deployment to Kubernetes

- Autoscaling
- Monitoring
- Persistence
1 change: 1 addition & 0 deletions docs/docs/user/deployment/local.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Local deployment
1 change: 1 addition & 0 deletions docs/docs/user/examples/interactive-queries.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Interactive queries
1 change: 1 addition & 0 deletions docs/docs/user/examples/word-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Word count
1 change: 1 addition & 0 deletions docs/docs/user/extensions/large-messages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Large messages
1 change: 1 addition & 0 deletions docs/docs/user/getting-started/quick-start.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Quick start
1 change: 1 addition & 0 deletions docs/docs/user/getting-started/setup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Setup
1 change: 1 addition & 0 deletions docs/docs/user/monitoring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Monitoring
1 change: 1 addition & 0 deletions docs/docs/user/testing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Testing
21 changes: 20 additions & 1 deletion docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,24 @@ plugins:
markdown_extensions:
- attr_list
nav:
- Index: index.md
- User guide:
- What is streams-bootstrap: index.md
- Changelog: user/changelog.md
- Getting Started:
- Setup: user/getting-started/setup.md
- Quick start: user/getting-started/quick-start.md
- Concepts:
- Common concepts: user/concepts/common.md
- Streams concepts: user/concepts/streams.md
- Producer concepts: user/concepts/producer.md
- Testing: user/testing.md
- Monitoring: user/monitoring.md
- Extensions:
- Large messages: user/extensions/large-messages.md
- Deployment:
- Local deployment: user/deployment/local.md
- Kubernetes: user/deployment/kubernetes.md
- Examples:
- Word count: user/examples/word-count.md
- Interactive queries: user/examples/interactive-queries.md
- Javadoc: javadoc/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
* This class provides the following configuration options:
* <ul>
* <li>{@link #bootstrapServers}</li>
* <li>{@link #outputTopic}</li>
* <li>{@link #labeledOutputTopics}</li>
* <li>{@link #schemaRegistryUrl}</li>
* <li>{@link #kafkaConfig}</li>
* </ul>
Expand Down Expand Up @@ -82,11 +80,6 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunne
// ConcurrentLinkedDeque required because calling #stop() causes asynchronous #run() calls to finish and thus
// concurrently iterating and removing from #runners
private final ConcurrentLinkedDeque<Stoppable> activeApps = new ConcurrentLinkedDeque<>();
@CommandLine.Option(names = "--output-topic", description = "Output topic")
private String outputTopic;
@CommandLine.Option(names = "--labeled-output-topics", split = ",",
description = "Additional labeled output topics")
private Map<String, String> labeledOutputTopics = emptyMap();
@CommandLine.Option(names = {"--bootstrap-servers", "--bootstrap-server"}, required = true,
description = "Kafka bootstrap servers to connect to")
private String bootstrapServers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,27 @@

package com.bakdata.kafka.producer;

import static java.util.Collections.emptyMap;

import com.bakdata.kafka.KafkaApplication;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Command;


/**
* <p>The base class for creating Kafka Producer applications.</p>
* This class provides all configuration options provided by {@link KafkaApplication}.
* This class provides the following configuration options in addition to those provided by {@link KafkaApplication}:
* <ul>
* <li>{@link #outputTopic}</li>
* <li>{@link #labeledOutputTopics}</li>
* </ul>
* To implement your Kafka Producer application inherit from this class and add your custom options. Run it by
* creating an instance of your class and calling {@link #startApplication(String[])} from your main.
*
Expand All @@ -51,6 +59,11 @@
public abstract class KafkaProducerApplication<T extends ProducerApp> extends
KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions, ExecutableProducerApp<T>,
ConfiguredProducerApp<T>, ProducerTopicConfig, T, ProducerAppConfiguration> {
@CommandLine.Option(names = "--output-topic", description = "Output topic")
private String outputTopic;
@CommandLine.Option(names = "--labeled-output-topics", split = ",",
description = "Additional labeled output topics")
private Map<String, String> labeledOutputTopics = emptyMap();

/**
* Delete all output topics associated with the Kafka Producer application.
Expand All @@ -69,8 +82,8 @@ public final Optional<ProducerExecutionOptions> createExecutionOptions() {
@Override
public final ProducerTopicConfig createTopicConfig() {
return ProducerTopicConfig.builder()
.outputTopic(this.getOutputTopic())
.labeledOutputTopics(this.getLabeledOutputTopics())
.outputTopic(this.outputTopic)
.labeledOutputTopics(this.labeledOutputTopics)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
* <p>The base class for creating Kafka Streams applications.</p>
* This class provides the following configuration options in addition to those provided by {@link KafkaApplication}:
* <ul>
* <li>{@link #outputTopic}</li>
* <li>{@link #labeledOutputTopics}</li>
* <li>{@link #inputTopics}</li>
* <li>{@link #inputPattern}</li>
* <li>{@link #errorTopic}</li>
Expand Down Expand Up @@ -84,6 +86,11 @@ public abstract class KafkaStreamsApplication<T extends StreamsApp> extends
@CommandLine.Option(names = "--labeled-input-patterns", split = ",",
description = "Additional labeled input patterns")
private Map<String, Pattern> labeledInputPatterns = emptyMap();
@CommandLine.Option(names = "--output-topic", description = "Output topic")
private String outputTopic;
@CommandLine.Option(names = "--labeled-output-topics", split = ",",
description = "Additional labeled output topics")
private Map<String, String> labeledOutputTopics = emptyMap();
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
Expand Down Expand Up @@ -135,8 +142,8 @@ public final StreamsTopicConfig createTopicConfig() {
.labeledInputTopics(this.labeledInputTopics)
.inputPattern(this.inputPattern)
.labeledInputPatterns(this.labeledInputPatterns)
.outputTopic(this.getOutputTopic())
.labeledOutputTopics(this.getLabeledOutputTopics())
.outputTopic(this.outputTopic)
.labeledOutputTopics(this.labeledOutputTopics)
.errorTopic(this.errorTopic)
.build();
}
Expand Down
Loading