Skip to content

Commit 36fc6c4

Browse files
committed
rebase and add tests and small fixes
1 parent e30575f commit 36fc6c4

File tree

4 files changed

+348
-2
lines changed

4 files changed

+348
-2
lines changed
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
1-
# TODO: finalize version
2-
quixstreams[aws]==3.22.0
1+
quixstreams[s3]==3.23.1
32
python-dotenv
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# timeout: 60
2+
services:
3+
minio:
4+
image: minio/minio:latest
5+
command: server /data --console-address ":9001"
6+
environment:
7+
- MINIO_ROOT_USER=minioadmin
8+
- MINIO_ROOT_PASSWORD=minioadmin
9+
networks:
10+
- test-network
11+
healthcheck:
12+
test: ["CMD", "mc", "ready", "local"]
13+
interval: 3s
14+
timeout: 5s
15+
retries: 10
16+
stop_grace_period: 3s
17+
18+
minio-init:
19+
image: minio/mc:latest
20+
depends_on:
21+
minio:
22+
condition: service_healthy
23+
entrypoint: >
24+
/bin/sh -c "
25+
mc alias set myminio http://minio:9000 minioadmin minioadmin;
26+
mc mb myminio/test-bucket --ignore-existing;
27+
echo 'MinIO bucket created';
28+
echo 'Keeping minio-init alive...';
29+
tail -f /dev/null
30+
"
31+
networks:
32+
- test-network
33+
34+
kafka:
35+
image: docker.redpanda.com/redpandadata/redpanda:v24.2.4
36+
command:
37+
- redpanda
38+
- start
39+
- --kafka-addr internal://0.0.0.0:9092
40+
- --advertise-kafka-addr internal://kafka:9092
41+
- --mode dev-container
42+
- --smp 1
43+
healthcheck:
44+
test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"]
45+
interval: 5s
46+
timeout: 10s
47+
retries: 10
48+
networks:
49+
- test-network
50+
stop_grace_period: 3s
51+
52+
s3-file-dest:
53+
build:
54+
context: ../../../python/destinations/s3-file
55+
dockerfile: Dockerfile
56+
environment:
57+
- Quix__Broker__Address=kafka:9092
58+
- Quix__Consumer__Group=s3-file-dest-test
59+
- Quix__Deployment__Id=test-s3-file-dest
60+
- input=test-s3-input
61+
- S3_BUCKET=test-bucket
62+
- S3_BUCKET_DIRECTORY=test_data
63+
- AWS_ACCESS_KEY_ID=minioadmin
64+
- AWS_SECRET_ACCESS_KEY=minioadmin
65+
- AWS_REGION_NAME=us-east-1
66+
- AWS_ENDPOINT_URL_S3=http://minio:9000
67+
- FILE_FORMAT=json
68+
networks:
69+
- test-network
70+
depends_on:
71+
minio:
72+
condition: service_healthy
73+
kafka:
74+
condition: service_healthy
75+
minio-init:
76+
condition: service_started
77+
stop_grace_period: 3s
78+
79+
test-runner:
80+
build:
81+
context: ../../framework
82+
dockerfile: Dockerfile
83+
environment:
84+
- Quix__Broker__Address=kafka:9092
85+
- TEST_INPUT_TOPIC=test-s3-input
86+
- TEST_MESSAGE_COUNT=3
87+
- MINIO_ENDPOINT=minio:9000
88+
- MINIO_ACCESS_KEY=minioadmin
89+
- MINIO_SECRET_KEY=minioadmin
90+
- S3_BUCKET=test-bucket
91+
- S3_PREFIX=test_data
92+
command: >
93+
sh -c "
94+
echo 'Installing boto3 for S3 access...' &&
95+
pip install boto3 > /dev/null 2>&1 &&
96+
echo 'Producing test messages to Kafka...' &&
97+
python /tests/produce_test_data.py &&
98+
echo 'Waiting for s3-file-dest to process messages...' &&
99+
sleep 15 &&
100+
echo 'Verifying data in S3...' &&
101+
python /tests/verify_output.py
102+
"
103+
volumes:
104+
- ./produce_test_data.py:/tests/produce_test_data.py:ro
105+
- ./verify_output.py:/tests/verify_output.py:ro
106+
working_dir: /
107+
networks:
108+
- test-network
109+
depends_on:
110+
minio:
111+
condition: service_healthy
112+
kafka:
113+
condition: service_healthy
114+
s3-file-dest:
115+
condition: service_started
116+
stop_grace_period: 3s
117+
118+
networks:
119+
test-network:
120+
driver: bridge
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import os
2+
import time
3+
import json
4+
from quixstreams import Application
5+
6+
def main():
7+
broker_address = os.getenv("Quix__Broker__Address", "kafka:9092")
8+
topic_name = os.getenv("TEST_INPUT_TOPIC", "test-s3-input")
9+
message_count = int(os.getenv("TEST_MESSAGE_COUNT", "3"))
10+
11+
print(f"Producing {message_count} test messages to topic: {topic_name}")
12+
13+
app = Application(
14+
broker_address=broker_address,
15+
producer_extra_config={
16+
"allow.auto.create.topics": "true"
17+
}
18+
)
19+
20+
topic = app.topic(topic_name)
21+
22+
with app.get_producer() as producer:
23+
for i in range(message_count):
24+
message = {
25+
"id": i,
26+
"name": f"test_item_{i}",
27+
"value": f"test_value_{i}",
28+
"timestamp": int(time.time() * 1000)
29+
}
30+
print(f"Producing message {i}: {message}")
31+
32+
serialized = json.dumps(message).encode('utf-8')
33+
34+
producer.produce(
35+
topic=topic.name,
36+
key=f"key_{i}",
37+
value=serialized
38+
)
39+
40+
producer.flush()
41+
42+
print(f"Successfully produced {message_count} messages")
43+
44+
if __name__ == "__main__":
45+
try:
46+
main()
47+
except Exception as e:
48+
print(f"Error: {e}")
49+
import traceback
50+
traceback.print_exc()
51+
exit(1)
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
import boto3
2+
import os
3+
import sys
4+
import time
5+
import json
6+
7+
def main():
8+
minio_endpoint = os.getenv("MINIO_ENDPOINT", "minio:9000")
9+
access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
10+
secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
11+
bucket_name = os.getenv("S3_BUCKET", "test-bucket")
12+
prefix = os.getenv("S3_PREFIX", "test_data")
13+
14+
print(f"Connecting to MinIO at {minio_endpoint}")
15+
16+
# Create S3 client for MinIO
17+
s3_client = boto3.client(
18+
's3',
19+
endpoint_url=f'http://{minio_endpoint}',
20+
aws_access_key_id=access_key,
21+
aws_secret_access_key=secret_key,
22+
region_name='us-east-1'
23+
)
24+
25+
expected_count = 1 # Expecting at least 1 file
26+
max_attempts = 20
27+
found_files = []
28+
29+
print(f"Checking S3 bucket '{bucket_name}' with prefix '{prefix}' for files...")
30+
31+
# Retry logic with polling
32+
for attempt in range(max_attempts):
33+
found_files = []
34+
35+
try:
36+
# List objects in the bucket with the prefix
37+
response = s3_client.list_objects_v2(
38+
Bucket=bucket_name,
39+
Prefix=prefix
40+
)
41+
42+
if 'Contents' in response:
43+
for obj in response['Contents']:
44+
key = obj['Key']
45+
# Skip directory markers
46+
if not key.endswith('/'):
47+
found_files.append(key)
48+
print(f"Found file: {key} (size: {obj['Size']} bytes)")
49+
50+
except Exception as e:
51+
print(f"Error listing objects: {e}")
52+
53+
if len(found_files) >= expected_count:
54+
print(f"\nSuccess: Found {len(found_files)} file(s) in S3")
55+
56+
# Verify the file(s) contain valid data
57+
try:
58+
all_records = []
59+
60+
# Read all files and collect records
61+
for file_key in found_files:
62+
print(f"\nReading file: {file_key}")
63+
64+
obj_response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
65+
content = obj_response['Body'].read().decode('utf-8')
66+
67+
print(f"File size: {len(content)} bytes")
68+
print(f"File content (first 1000 chars):\n{content[:1000]}")
69+
print(f"---")
70+
71+
if file_key.endswith('.json') or file_key.endswith('.jsonl'):
72+
lines = content.strip().split('\n')
73+
print(f"File contains {len(lines)} line(s)")
74+
75+
for idx, line in enumerate(lines):
76+
if line.strip():
77+
print(f"Parsing line {idx}: {line[:100]}...")
78+
try:
79+
data = json.loads(line)
80+
all_records.append(data)
81+
print(f"✓ Successfully parsed record: {data}")
82+
except json.JSONDecodeError as je:
83+
print(f"ERROR: Invalid JSON on line {idx}: {je}")
84+
print(f"Line content: {line}")
85+
sys.exit(1)
86+
else:
87+
print(f"WARNING: Skipping non-JSON file: {file_key}")
88+
89+
print(f"\nTotal records found: {len(all_records)}")
90+
91+
# Verify we have the expected number of records
92+
expected_message_count = 3
93+
if len(all_records) < expected_message_count:
94+
print(f"ERROR: Expected {expected_message_count} records, found {len(all_records)}")
95+
sys.exit(1)
96+
97+
# Verify each record has the expected structure and values
98+
expected_metadata_fields = {'_key', '_timestamp', '_value'}
99+
expected_value_fields = {'id', 'name', 'value', 'timestamp'}
100+
found_ids = set()
101+
102+
for i, record in enumerate(all_records):
103+
print(f"\nValidating record {i}: {record}")
104+
105+
# Check for Kafka metadata fields
106+
actual_fields = set(record.keys())
107+
if not expected_metadata_fields.issubset(actual_fields):
108+
missing = expected_metadata_fields - actual_fields
109+
print(f"ERROR: Record {i} missing metadata fields: {missing}")
110+
sys.exit(1)
111+
112+
# Extract the actual message value
113+
message_value = record['_value']
114+
if not isinstance(message_value, dict):
115+
print(f"ERROR: Record {i} _value is not a dict: {type(message_value)}")
116+
sys.exit(1)
117+
118+
# Check for required fields in _value
119+
actual_value_fields = set(message_value.keys())
120+
if not expected_value_fields.issubset(actual_value_fields):
121+
missing = expected_value_fields - actual_value_fields
122+
print(f"ERROR: Record {i} _value missing fields: {missing}")
123+
sys.exit(1)
124+
125+
# Verify id is an integer
126+
if not isinstance(message_value['id'], int):
127+
print(f"ERROR: Record {i} has invalid id type: {type(message_value['id'])}")
128+
sys.exit(1)
129+
130+
found_ids.add(message_value['id'])
131+
132+
# Verify _key matches expected pattern
133+
expected_key = f"key_{message_value['id']}"
134+
if record['_key'] != expected_key:
135+
print(f"ERROR: Record {i} has incorrect _key. Expected '{expected_key}', got '{record['_key']}'")
136+
sys.exit(1)
137+
138+
# Verify name matches expected pattern
139+
expected_name = f"test_item_{message_value['id']}"
140+
if message_value['name'] != expected_name:
141+
print(f"ERROR: Record {i} has incorrect name. Expected '{expected_name}', got '{message_value['name']}'")
142+
sys.exit(1)
143+
144+
# Verify value matches expected pattern
145+
expected_value = f"test_value_{message_value['id']}"
146+
if message_value['value'] != expected_value:
147+
print(f"ERROR: Record {i} has incorrect value. Expected '{expected_value}', got '{message_value['value']}'")
148+
sys.exit(1)
149+
150+
print(f"✓ Record {i} validated: _key={record['_key']}, id={message_value['id']}, name={message_value['name']}, value={message_value['value']}")
151+
152+
# Verify we got all expected IDs (0, 1, 2)
153+
expected_ids = set(range(expected_message_count))
154+
if found_ids != expected_ids:
155+
print(f"ERROR: Missing IDs. Expected {expected_ids}, found {found_ids}")
156+
sys.exit(1)
157+
158+
print(f"\n✓ All {len(all_records)} records validated successfully")
159+
print(f"✓ All expected IDs present: {sorted(found_ids)}")
160+
161+
except Exception as e:
162+
print(f"ERROR: Failed to verify file content: {e}")
163+
import traceback
164+
traceback.print_exc()
165+
sys.exit(1)
166+
167+
sys.exit(0)
168+
169+
print(f"Attempt {attempt + 1}/{max_attempts}: Found {len(found_files)} file(s), waiting...")
170+
time.sleep(2)
171+
172+
print(f"\nFAILED: Only found {len(found_files)} file(s) after {max_attempts} attempts")
173+
sys.exit(1)
174+
175+
if __name__ == "__main__":
176+
main()

0 commit comments

Comments
 (0)