Skip to content

Commit dac2173

Browse files
author
Maxwell Dylla
committed
utility application for seeding telemetry
1 parent 92b9faa commit dac2173

File tree

5 files changed

+141
-9
lines changed

5 files changed

+141
-9
lines changed

examples/table-maintenance.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ metadata:
44
name: table-maintenance
55
spec:
66
sparkImage:
7-
custom: "pulse-telemetry:latest"
7+
custom: "ghcr.io/battery-pulse/pulse-telemetry:latest"
88
productVersion: "3.5.1"
99
pullPolicy: IfNotPresent
1010
mainApplicationFile: local:////pulse-telemetry/src/pulse_telemetry/apps/table_maintenance.py

examples/telemetry-statistics.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ metadata:
44
name: telemetry-statistics
55
spec:
66
sparkImage:
7-
custom: "pulse-telemetry:latest"
7+
custom: "ghcr.io/battery-pulse/pulse-telemetry:latest"
88
productVersion: "3.5.1"
99
pullPolicy: IfNotPresent
1010
mainApplicationFile: local:////pulse-telemetry/src/pulse_telemetry/apps/telemetry_statistics.py

src/pulse_telemetry/utils/telemetry_generator.py

+53
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,56 @@ async def telemetry_generator(
187187
state["step_energy_discharged__Wh"] = 0
188188
else:
189189
continue
190+
191+
192+
if __name__ == "__main__":
    import os

    from pyspark.sql import SparkSession

    from pulse_telemetry.sparklib import iceberg, telemetry
    from pulse_telemetry.utils import channel

    # All runtime configuration is injected through environment variables
    # (see the telemetry-generator manifest / ConfigMap). A KeyError or
    # ValueError here means the deployment is missing or mis-typed a
    # setting, which should fail fast rather than run with defaults.
    environment = os.environ
    catalog = environment["PULSE_TELEMETRY_CATALOG"]
    database = environment["PULSE_TELEMETRY_DATABASE"]
    num_channels = int(environment["PULSE_TELEMETRY_NUM_CHANNELS"])
    timeout_seconds = float(environment["PULSE_TELEMETRY_TIMEOUT_SECONDS"])
    acquisition_frequency = int(environment["PULSE_TELEMETRY_ACQUISITION_FREQUENCY"])
    points_per_step = int(environment["PULSE_TELEMETRY_POINTS_PER_STEP"])

    spark = SparkSession.builder.appName("TelemetryGenerator").getOrCreate()

    # Ensure the target Iceberg table exists before any data is written.
    iceberg.create_table_if_not_exists(
        spark=spark,
        catalog_name=catalog,
        database_name=database,
        table_name="telemetry",
        table_comment=telemetry.telemetry_comment,
        table_schema=telemetry.telemetry_schema,
        partition_columns=telemetry.telemetry_partitions,
        write_order_columns=telemetry.telemetry_write_order,
    )

    # Drive the generator into an in-memory buffer, then merge the buffered
    # records into the Iceberg table as a single upsert keyed on the
    # telemetry composite key.
    buffer = channel.LocalBuffer()
    generator_kwargs = dict(
        source=telemetry_generator,
        sink=buffer,
        topic="telemetry",
        num_channels=num_channels,
        timeout_seconds=timeout_seconds,
        acquisition_frequency=acquisition_frequency,
        points_per_step=points_per_step,
        lower_voltage_limit=3,  # V
        upper_voltage_limit=4,  # V
        current=1.0,  # A
    )
    channel.run_with_timeout(**generator_kwargs)
    iceberg.merge_into_table(
        spark=spark,
        source_df=buffer.dataframe(spark, telemetry.telemetry_schema),
        catalog_name=catalog,
        database_name=database,
        table_name="telemetry",
        match_columns=telemetry.telemetry_composite_key,
    )
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# SparkApplication that runs the telemetry-generator utility to seed the
# lakehouse with synthetic telemetry data (referenced by the e2e tests via
# manifest_file_name="telemetry-generator.yaml").
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: telemetry-generator
spec:
  sparkImage:
    # NOTE(review): this is a bare local tag, unlike the examples/ manifests
    # which reference ghcr.io/battery-pulse/pulse-telemetry:latest —
    # presumably intentional for locally built test images; confirm.
    custom: "pulse-telemetry:latest"
    productVersion: "3.5.1"
    pullPolicy: IfNotPresent
  mainApplicationFile: local:////pulse-telemetry/src/pulse_telemetry/utils/telemetry_generator.py
  # Generator configuration — every variable is read by the __main__ block of
  # telemetry_generator.py; values come from the ConfigMap defined below.
  env:
    - name: PULSE_TELEMETRY_CATALOG
      valueFrom:
        configMapKeyRef:
          name: telemetry-generator-config
          key: PULSE_TELEMETRY_CATALOG
    - name: PULSE_TELEMETRY_DATABASE
      valueFrom:
        configMapKeyRef:
          name: telemetry-generator-config
          key: PULSE_TELEMETRY_DATABASE
    - name: PULSE_TELEMETRY_NUM_CHANNELS
      valueFrom:
        configMapKeyRef:
          name: telemetry-generator-config
          key: PULSE_TELEMETRY_NUM_CHANNELS
    - name: PULSE_TELEMETRY_TIMEOUT_SECONDS
      valueFrom:
        configMapKeyRef:
          name: telemetry-generator-config
          key: PULSE_TELEMETRY_TIMEOUT_SECONDS
    - name: PULSE_TELEMETRY_ACQUISITION_FREQUENCY
      valueFrom:
        configMapKeyRef:
          name: telemetry-generator-config
          key: PULSE_TELEMETRY_ACQUISITION_FREQUENCY
    - name: PULSE_TELEMETRY_POINTS_PER_STEP
      valueFrom:
        configMapKeyRef:
          name: telemetry-generator-config
          key: PULSE_TELEMETRY_POINTS_PER_STEP
  # Iceberg catalog ("lakehouse") backed by a Hive metastore, with data files
  # in the MinIO/S3 warehouse bucket.
  sparkConf:
    spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.lakehouse: org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.lakehouse.type: hive
    spark.sql.catalog.lakehouse.uri: thrift://hive-metastore.default.svc:9083
    spark.sql.catalog.lakehouse.warehouse: s3a://lakehouse/
    spark.sql.session.timeZone: UTC
  deps:
    packages:
      - org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
      - org.apache.hadoop:hadoop-aws:3.3.4
  mode: cluster
  s3connection:
    reference: minio
  driver:
    config:
      resources:
        cpu:
          min: "500m"
          max: "2"
        memory:
          limit: "1Gi"
  executor:
    replicas: 1
    config:
      resources:
        cpu:
          min: "500m"
          max: "2"
        memory:
          limit: "1Gi"
---
# Values consumed by the SparkApplication env section above; keep key names in
# sync with the os.environ lookups in telemetry_generator.py.
apiVersion: v1
kind: ConfigMap
metadata:
  name: telemetry-generator-config
data:
  PULSE_TELEMETRY_CATALOG: "lakehouse"
  PULSE_TELEMETRY_DATABASE: "dev"
  PULSE_TELEMETRY_NUM_CHANNELS: "5"
  PULSE_TELEMETRY_TIMEOUT_SECONDS: "3"
  PULSE_TELEMETRY_ACQUISITION_FREQUENCY: "10"
  PULSE_TELEMETRY_POINTS_PER_STEP: "5"

tests/e2e/test_applications.py

+2-7
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,8 @@ def test_applications(kubernetes_services, spark_session, telemetry_df, statisti
2828
assert records.count() == 0, "Running the app should create tables"
2929

3030
# Running the app with data should populate tables
31-
iceberg.merge_into_table(
32-
spark=spark_session,
33-
source_df=telemetry_df,
34-
catalog_name="lakehouse",
35-
database_name="dev",
36-
table_name="telemetry",
37-
match_columns=telemetry.telemetry_composite_key,
31+
launch_spark_application(
32+
application_name="telemetry-generator", manifest_file_name="telemetry-generator.yaml", timeout_seconds="240"
3833
)
3934
launch_spark_application(
4035
application_name="telemetry-statistics", manifest_file_name="telemetry-statistics.yaml", timeout_seconds="240"

0 commit comments

Comments
 (0)