Skip to content

Commit

Permalink
Finish various testing scenarios using minio base with carrier info for pyarrow and duckdb.
Browse files Browse the repository at this point in the history
  • Loading branch information
lfse-slafleur committed Dec 18, 2024
0 parents commit 091a227
Show file tree
Hide file tree
Showing 9 changed files with 1,175 additions and 0 deletions.
15 changes: 15 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
volumes:
minio-data:

services:
minio:
image: minio/minio
ports:
- 9000:9000
- 9001:9001
command: server /data --console-address ":9001"
volumes:
- "minio-data:/data"
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: 12345678
49 changes: 49 additions & 0 deletions generate_test_dataset_parquet_influx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import datetime
import random
import uuid

import pandas

NUM_OF_ASSETS = 250
KPIs = ["HeatIn_Q1", "Heat_flow1", "PostProc_Velocity1", "HeatIn_Q2", "Heat_flow2", "PostProc_Velocity2", "HeatIn_Q3", "Heat_flow3", "PostProc_Velocity3", "HeatIn_Q4"]
START_DATETIME = datetime.datetime.fromisoformat('2020-01-01T00:00:00+00:00')
END_DATETIME = datetime.datetime.fromisoformat('2021-01-01T00:00:00+00:00')
RESOLUTION = datetime.timedelta(minutes=15)

CARRIER_ID = str(uuid.uuid4())
SIMULATION_RUN_ID = str(uuid.uuid4())

esdl_id = str(uuid.uuid4())
asset_ids = [str(uuid.uuid4()) for _ in range(0, NUM_OF_ASSETS)]


times = []
carrier_ids = []
asset_ids_ = []
asset_classes = []
asset_names = []
capabilities = []
simulation_runs = []
simulation_types = []
kpis = {}

current_time = START_DATETIME
while current_time < END_DATETIME:
for asset_i, asset_id in enumerate(asset_ids):
times.append(current_time)
carrier_ids.append(CARRIER_ID)
asset_ids_.append(asset_id)
asset_names.append(asset_id)
asset_classes.append(random.choice(['HeatingDemand', 'Pipe', 'ResidualHeatSource']))
capabilities.append(random.choice(['Consumer', 'Transport', 'Producer']))
simulation_runs.append(SIMULATION_RUN_ID)
simulation_types.append("EndScenarioSizingDiscountedStagedHIGHS")

for kpi in KPIs:
kpis.setdefault(kpi, []).append(random.uniform(0, 10))

current_time = current_time + RESOLUTION

df = pandas.DataFrame({'time': times, 'carrier_id': carrier_ids, 'asset_id': asset_ids_, 'asset_class': asset_classes, 'asset_name': asset_names, 'capabilities': capabilities, 'simulation_run': simulation_runs, 'simulation_type': simulation_types, **kpis})
print(df)
df.to_parquet('test.parquet')
39 changes: 39 additions & 0 deletions generate_test_dataset_parquet_single_file_per_asset_part.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import datetime
import random
import uuid

import pandas

NUM_OF_ASSETS = 250
KPIs = ["HeatIn_Q1", "Heat_flow1", "PostProc_Velocity1", "HeatIn_Q2", "Heat_flow2", "PostProc_Velocity2", "HeatIn_Q3", "Heat_flow3", "PostProc_Velocity3", "HeatIn_Q4"]
START_DATETIME = datetime.datetime.fromisoformat('2020-01-01T00:00:00+00:00')
END_DATETIME = datetime.datetime.fromisoformat('2021-01-01T00:00:00+00:00')
RESOLUTION = datetime.timedelta(minutes=15)

CARRIER_ID = str(uuid.uuid4())
SIMULATION_RUN_ID = str(uuid.uuid4())

esdl_id = str(uuid.uuid4())
asset_ids = [str(uuid.uuid4()) for _ in range(0, NUM_OF_ASSETS)]


df_times = []
df_kpis = {}
df_asset_ids = []
for asset_i, asset_id in enumerate(asset_ids):
current_time = START_DATETIME

while current_time < END_DATETIME:
df_times.append(current_time)

for kpi in KPIs:
df_kpis.setdefault(kpi, []).append(random.uniform(0, 10))
df_asset_ids.append(asset_id)
current_time = current_time + RESOLUTION

print(f'Done {asset_i}/{len(asset_ids)}')

print('Writing out results')
df = pandas.DataFrame({'time': df_times, 'asset_id': df_asset_ids, **df_kpis})
df.to_parquet(f'single_file_per_asset_part/{esdl_id}', partition_cols=['asset_id'])
print('Done!')
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import datetime
import random
import uuid

import polars

NUM_OF_ASSETS = 250
KPIs = ["HeatIn_Q1", "Heat_flow1", "PostProc_Velocity1", "HeatIn_Q2", "Heat_flow2", "PostProc_Velocity2", "HeatIn_Q3", "Heat_flow3", "PostProc_Velocity3", "HeatIn_Q4"]
START_DATETIME = datetime.datetime.fromisoformat('2020-01-01T00:00:00+00:00')
END_DATETIME = datetime.datetime.fromisoformat('2021-01-01T00:00:00+00:00')
RESOLUTION = datetime.timedelta(minutes=15)

CARRIER_IDS = [str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4())]
SIMULATION_RUN_ID = str(uuid.uuid4())

esdl_id = str(uuid.uuid4())
asset_ids = [str(uuid.uuid4()) for _ in range(0, NUM_OF_ASSETS)]


df_times = []
df_kpis = {}
df_asset_ids = []
df_carrier_ids = []
for carrier_id in CARRIER_IDS:
for asset_i, asset_id in enumerate(asset_ids):
current_time = START_DATETIME

while current_time < END_DATETIME:
df_times.append(current_time)

for kpi in KPIs:
df_kpis.setdefault(kpi, []).append(random.uniform(0, 10))
df_asset_ids.append(asset_id)
df_carrier_ids.append(carrier_id)
current_time = current_time + RESOLUTION

print(f'Done {asset_i}/{len(asset_ids)}')

print('Writing out results')
df = polars.DataFrame({'time': df_times, 'asset_id': df_asset_ids, 'carrier_id': df_carrier_ids, **df_kpis})
df.write_parquet(f'single_file_per_asset_part_with_carrier/{esdl_id}', partition_by=['asset_id', 'carrier_id'])
print('Done!')
24 changes: 24 additions & 0 deletions query_influx_style.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import time

import duckdb

con = duckdb.connect(":memory:")
con.sql("""
CREATE SECRET secret1 (
TYPE S3,
PROVIDER CREDENTIAL_CHAIN,
CHAIN 'env;config',
REGION 'XX-XXXX-X ',
ENDPOINT 'localhost:9000',
URL_STYLE 'path',
USE_SSL false,
KEY_ID 'test',
SECRET '12345678'
);
""")

start = time.time()
print(len(con.sql('SELECT time, PostProc_Velocity1 FROM read_parquet([\'s3://test-parquet/test.parquet\']) WHERE asset_id = \'77563ead-7bba-4739-abb4-47b415655062\';').fetchall()))
end = time.time()

print(f'Took {end - start} seconds')
Loading

0 comments on commit 091a227

Please sign in to comment.