Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions managed/dynamic-configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,121 @@ The blob storage configuration is automatically injected only when `contentStore
## Learn More

For complete details, check our [official documentation](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html).

## Using with Quix Streams join_lookup

The Dynamic Configuration Manager works seamlessly with Quix Streams' `join_lookup` feature to enrich streaming data with configuration data in real-time.

### Basic Example

Here's how to use Dynamic Configuration with `join_lookup` to enrich sensor data with device configurations:

```python
from quixstreams import Application
from quixstreams.dataframe.joins.lookups import QuixConfigurationService

# Initialize the application
app = Application()

# Create a lookup instance pointing to your configuration topic
lookup = QuixConfigurationService(
topic=app.topic("device-configurations"),
app_config=app.config
)

# Create your main data stream
sdf = app.dataframe(app.topic("sensor-data"))

# Enrich sensor data with device configuration
sdf = sdf.join_lookup(
lookup=lookup,
on="device_id", # The field to match on
fields={
"device_name": lookup.json_field(
jsonpath="$.device.name",
type="device-config"
),
"calibration_params": lookup.json_field(
jsonpath="$.calibration",
type="device-config"
),
"firmware_version": lookup.json_field(
jsonpath="$.firmware.version",
type="device-config"
)
}
)

# Process the enriched data
sdf = sdf.apply(lambda value: {
**value,
"device_info": f"{value['device_name']} (v{value['firmware_version']})"
})

# Output to destination topic
sdf.to_topic(app.topic("enriched-sensor-data"))

if __name__ == "__main__":
app.run()
```

### Advanced Configuration Matching

You can also use custom key matching logic for more complex scenarios:

```python
def custom_key_matcher(value, key):
"""Custom logic to determine configuration key"""
device_type = value.get("device_type", "unknown")
location = value.get("location", "default")
return f"{device_type}-{location}"

# Use custom key matching
sdf = sdf.join_lookup(
lookup=lookup,
on=custom_key_matcher,
fields={
"config": lookup.json_field(
jsonpath="$",
type="location-config"
)
}
)
```

### Binary Configuration Support

For non-JSON configurations (firmware files, calibration data, etc.):

```python
sdf = sdf.join_lookup(
lookup=lookup,
on="device_id",
fields={
"firmware_binary": lookup.bytes_field(
type="firmware"
),
"calibration_data": lookup.bytes_field(
type="calibration"
)
}
)
```

### How It Works

1. **Configuration Updates**: When configurations are updated via the Dynamic Configuration API, lightweight Kafka events are published to your configuration topic.

2. **Real-time Enrichment**: The `join_lookup` feature listens to these events, fetches the latest configuration content, and caches it locally.

3. **Stream Enrichment**: As your main data stream processes records, `join_lookup` automatically enriches each record with the appropriate configuration data based on the matching key and timestamp.

4. **Version Management**: The system automatically handles configuration versioning, ensuring that each record is enriched with the configuration version that was valid at the time the record was created.

### Benefits

- **Real-time Updates**: Configuration changes are immediately available to your streaming applications
- **Large File Support**: Handle configuration files too large for direct Kafka streaming
- **Version Control**: Automatic versioning ensures data consistency
- **Performance**: Local caching minimizes API calls and latency
- **Flexibility**: Support for both JSON and binary configuration content