Commit 0e7bb23

Add comparisons with influxdb and postgresql and some fixes.
lfse-slafleur committed Dec 19, 2024
1 parent 091a227 commit 0e7bb23
Showing 3 changed files with 40 additions and 6 deletions.
31 changes: 31 additions & 0 deletions README.md
@@ -0,0 +1,31 @@
# Comparison between InfluxDB v1, PostgreSQL & MinIO+Parquet
250 assets, 35136 timesteps (1 year, 15 min resolution), 1 carrier, 10 KPIs

100% CPU == 1 core

| Action | InfluxDB v1 | PostgreSQL | MinIO+Parquet |
|--------|-------------|------------|---------------|
| Query single KPI for single asset and single carrier | 0.30367 seconds | 0.3081 seconds | 0.0186 seconds (using prefix, pyarrow) |
| Query single KPI for all assets and single carrier | 63.23 seconds | 14.416 seconds (due to join on time columns) | 0.9208 seconds (using filters, pyarrow) |
| Storage usage | ~1.5 GB | 661 MB to ~1 GB | 667 MB (polars, partitioned by asset_id and carrier_id) |
| CPU usage | Uses >100% on 2nd query | Uses 100% on 2nd query | Uses ~60% during benchmark |
| Memory usage | Uses 3.2 GB on 2nd query | Uses 1.3 GB on 2nd query | Uses 370 MB during benchmark |
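
The MinIO+Parquet timings above come from two tricks: reading only the object-key prefix of a single partition, and pushing partition filters down into the read. A minimal sketch of both query shapes (the endpoint and the asset/carrier placeholders are illustrative assumptions, not part of this commit; the bucket path matches the query script below):

```python
# Sketch of the two benchmarked MinIO+Parquet query shapes.
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://localhost:9000"})  # assumed MinIO endpoint

# Hive-style partitioning on asset_id and carrier_id, matching the writer script.
partitioning = ds.partitioning(
    pa.schema([("asset_id", pa.string()), ("carrier_id", pa.string())]),
    flavor="hive",
)

# 1) Single KPI, single asset, single carrier: read only that partition's key prefix.
single = pq.read_table(
    "test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/asset_id=<asset>/carrier_id=<carrier>/",
    filesystem=s3,
    columns=["time", "HeatIn_Q1"],
).to_pandas()

# 2) Single KPI, all assets, one carrier: let pyarrow prune partitions via filters.
all_assets = pq.read_table(
    "test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/",
    filesystem=s3,
    partitioning=partitioning,
    filters=[("carrier_id", "=", "<carrier>")],
    columns=["time", "asset_id", "HeatIn_Q1"],
).to_pandas()
```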


# Latency
Test between 2 VMs
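
One possible probe (not part of this commit; endpoint and bucket path are assumptions based on the query script) is to time a cheap listing call from one VM against MinIO on the other:

```python
# Hypothetical latency probe from the client VM to the MinIO VM.
import time
import s3fs

s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://<minio-vm>:9000"})

samples = []
for _ in range(20):
    start = time.time()
    s3.ls("test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/", refresh=True)  # one small round trip
    samples.append(time.time() - start)

samples.sort()
print(f"median round trip: {samples[len(samples) // 2] * 1000:.1f} ms")
```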

# Difficulties when writing unpartitioned dataframes
- Check whether a dataframe can be partitioned when adding data.
- Check whether we can append to existing files while the simulator advances through new timesteps (e.g. a Python generator across timesteps); see the sketch below.
- That way the simulator does not need to keep all timesteps in memory throughout the computation.
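
One way to tackle the append question (a sketch, not part of this commit; the `<esdl_id>` path is a placeholder): write each simulated batch of timesteps as an extra file into the existing partition directories, so only one batch is held in memory at a time.

```python
# Sketch: append per-batch results to an existing partitioned Parquet dataset.
import pyarrow as pa
import pyarrow.parquet as pq

def write_batch(batch: dict, root: str = "single_file_per_asset_part/<esdl_id>") -> None:
    """batch maps column names (time, asset_id, carrier_id, KPIs) to lists for one batch of timesteps."""
    table = pa.Table.from_pydict(batch)
    # Each call adds new files under the matching asset_id=.../carrier_id=... directories,
    # so earlier timesteps do not have to stay in memory.
    pq.write_to_dataset(table, root_path=root, partition_cols=["asset_id", "carrier_id"])

# Usage: call write_batch(...) once per simulator step / generator item.
```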

# Can we perform all queries from Ewoud/MapEditor?


# Where are we going to keep the metadata?
E.g. asset_class, asset_name, etc. for an asset_id. We can always store a JSON or similar file,
preferably something that we can also retrieve with pyarrow (see the sketch below).
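
One option that keeps the metadata retrievable with pyarrow (a sketch; the file name and example values are assumptions): store it as a small Parquet file next to the KPI data and join it on asset_id when needed.

```python
# Sketch: per-asset metadata as a small Parquet file, readable with pyarrow.
import pyarrow as pa
import pyarrow.parquet as pq

metadata = pa.table({
    "asset_id": ["<asset-uuid-1>", "<asset-uuid-2>"],
    "asset_class": ["HeatProducer", "Pipe"],
    "asset_name": ["producer_1", "pipe_42"],
})
pq.write_table(metadata, "single_file_per_asset_part/<esdl_id>_metadata.parquet")

# Retrieving it later uses the same pyarrow/s3fs path as the KPI queries:
assets = pq.read_table("single_file_per_asset_part/<esdl_id>_metadata.parquet").to_pandas()
```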

# Hardware requirements
NVMe/SSD storage (otherwise very slow)
8 changes: 5 additions & 3 deletions generate_test_dataset_parquet_single_file_per_asset_part.py
@@ -2,7 +2,7 @@
 import random
 import uuid
 
-import pandas
+import polars
 
 NUM_OF_ASSETS = 250
 KPIs = ["HeatIn_Q1", "Heat_flow1", "PostProc_Velocity1", "HeatIn_Q2", "Heat_flow2", "PostProc_Velocity2", "HeatIn_Q3", "Heat_flow3", "PostProc_Velocity3", "HeatIn_Q4"]
@@ -20,6 +20,7 @@
 df_times = []
 df_kpis = {}
 df_asset_ids = []
+df_carrier_ids = []
 for asset_i, asset_id in enumerate(asset_ids):
     current_time = START_DATETIME
 
@@ -29,11 +30,12 @@
         for kpi in KPIs:
             df_kpis.setdefault(kpi, []).append(random.uniform(0, 10))
         df_asset_ids.append(asset_id)
+        df_carrier_ids.append(CARRIER_ID)
         current_time = current_time + RESOLUTION
 
     print(f'Done {asset_i}/{len(asset_ids)}')
 
 print('Writing out results')
-df = pandas.DataFrame({'time': df_times, 'asset_id': df_asset_ids, **df_kpis})
-df.to_parquet(f'single_file_per_asset_part/{esdl_id}', partition_cols=['asset_id'])
+df = polars.DataFrame({'time': df_times, 'asset_id': df_asset_ids, 'carrier_id': df_carrier_ids, **df_kpis})
+df.write_parquet(f'single_file_per_asset_part/{esdl_id}', partition_by=['asset_id', 'carrier_id'])
 print('Done!')
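
For reference (not part of the diff): with partition_by=['asset_id', 'carrier_id'], polars writes a hive-style directory tree per asset/carrier combination, roughly as sketched below (file names are illustrative). This layout is what the pyarrow filters in the query script prune on.

```
single_file_per_asset_part/<esdl_id>/
├── asset_id=<asset-uuid-1>/
│   └── carrier_id=<carrier-uuid>/
│       └── 00000000.parquet
└── asset_id=<asset-uuid-2>/
    └── carrier_id=<carrier-uuid>/
        └── 00000000.parquet
```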
7 changes: 4 additions & 3 deletions query_single_file_per_asset_part_with_carrier.py
@@ -365,14 +365,15 @@
 print(f'Pyarrow took {diff} seconds which means {per_q} second per q to retrieve one kpi for a single asset and all carriers')
 
 start_arrow = time.time()
-df = pa.parquet.read_table(f"test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/", filesystem=s3, partitioning=asset_carrier_partitioning, filters=[('asset_id', 'in', asset_ids), ('carrier_id', 'in', carrier_ids[:1])], columns=['HeatIn_Q1', 'time', 'asset_id', 'carrier_id']).to_pandas()
+for carrier_id in carrier_ids:
+    df = pa.parquet.read_table(f"test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/", filesystem=s3, partitioning=asset_carrier_partitioning, filters=[('asset_id', 'in', asset_ids), ('carrier_id', 'in', carrier_ids[:1])], columns=['HeatIn_Q1', 'time', 'asset_id', 'carrier_id']).to_pandas()
 end_arrow = time.time()
 
 
 diff = end_arrow - start_arrow
-per_q = diff / len(asset_ids)
+per_q = diff / len(carrier_ids)
 print(f'Pyarrow took {diff} seconds which means {per_q} second per asset to retrieve one kpi for all assets and one carrier')
 
 
 start_arrow = time.time()
 for asset_id in asset_ids:
     pa.parquet.read_table(f"test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/", filesystem=s3, partitioning=asset_carrier_partitioning, filters=[('asset_id', '=', asset_id), ('carrier_id', '=', carrier_ids[0])], columns=['HeatIn_Q1', 'time']).to_pandas()
