From 0e7bb23ba39adfbfff041f206079a50a24da5aa3 Mon Sep 17 00:00:00 2001
From: Sebastiaan la Fleur
Date: Thu, 19 Dec 2024 16:36:14 +0100
Subject: [PATCH] Add comparisons with influxdb and postgresql and some fixes.

---
 README.md                                        | 31 +++++++++++++++++++
 ...aset_parquet_single_file_per_asset_part.py    |  8 +++--
 ...single_file_per_asset_part_with_carrier.py    |  7 +++--
 3 files changed, 40 insertions(+), 6 deletions(-)
 create mode 100644 README.md

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e1d9e90
--- /dev/null
+++ b/README.md
@@ -0,0 +1,31 @@
+# Comparison between InfluxDB v1, PostgreSQL & MinIO+Parquet
+250 assets, 35136 timesteps (1 year at 15 min resolution), 1 carrier, 10 KPIs
+
+100% CPU == 1 core
+
+| Action                                                | InfluxDB v1             | PostgreSQL                                   | MinIO+Parquet                                           |
+|-------------------------------------------------------|-------------------------|----------------------------------------------|---------------------------------------------------------|
+| Query single KPI for single asset and single carrier  | 0.30367 seconds         | 0.3081 seconds                               | 0.0186 seconds (using prefix, pyarrow)                  |
+| Query single KPI for all assets and single carrier    | 63.23 seconds           | 14.416 seconds (due to join on time columns) | 0.9208 seconds (using filters, pyarrow)                 |
+| Storage usage                                         | ~1.5GB                  | 661MB to ~1GB                                | 667MB (polars, partitioned by asset_id and carrier_id)  |
+| CPU usage                                             | Uses >100% on 2nd query | Uses 100% on 2nd query                       | Uses ~60% during benchmark                              |
+| Memory usage                                          | Uses 3.2GB on 2nd query | Uses 1.3GB on 2nd query                      | Uses 370MB during benchmark                             |
+
+
+# Latency
+To be tested between 2 VMs.
+
+# Difficulty when writing unpartitioned dataframes
+- Check if the dataframe can be partitioned when data is added.
+- Check if we can append to existing files while the simulator steps through new timesteps (e.g. a Python generator across timesteps),
+  - so the simulator does not need to keep all timesteps in memory throughout the computation.
+
+# Can we perform all queries from Ewoud/MapEditor?
+
+
+# Where are we going to keep the metadata?
+E.g. asset_class, asset_name, etc. for an asset_id. We can always store a JSON or similar file.
+Preferably something that we can also retrieve with pyarrow.
+
+# Hardware requirements
+NVMe/SSD storage (otherwise very slow)
diff --git a/generate_test_dataset_parquet_single_file_per_asset_part.py b/generate_test_dataset_parquet_single_file_per_asset_part.py
index 7587aba..e779489 100644
--- a/generate_test_dataset_parquet_single_file_per_asset_part.py
+++ b/generate_test_dataset_parquet_single_file_per_asset_part.py
@@ -2,7 +2,7 @@
 import random
 import uuid
 
-import pandas
+import polars
 
 NUM_OF_ASSETS = 250
 KPIs = ["HeatIn_Q1", "Heat_flow1", "PostProc_Velocity1", "HeatIn_Q2", "Heat_flow2", "PostProc_Velocity2", "HeatIn_Q3", "Heat_flow3", "PostProc_Velocity3", "HeatIn_Q4"]
@@ -20,6 +20,7 @@
 df_times = []
 df_kpis = {}
 df_asset_ids = []
+df_carrier_ids = []
 
 for asset_i, asset_id in enumerate(asset_ids):
     current_time = START_DATETIME
@@ -29,11 +30,12 @@
         for kpi in KPIs:
             df_kpis.setdefault(kpi, []).append(random.uniform(0, 10))
         df_asset_ids.append(asset_id)
+        df_carrier_ids.append(CARRIER_ID)
         current_time = current_time + RESOLUTION
 
     print(f'Done {asset_i}/{len(asset_ids)}')
 
 print('Writing out results')
-df = pandas.DataFrame({'time': df_times, 'asset_id': df_asset_ids, **df_kpis})
-df.to_parquet(f'single_file_per_asset_part/{esdl_id}', partition_cols=['asset_id'])
+df = polars.DataFrame({'time': df_times, 'asset_id': df_asset_ids, 'carrier_id': df_carrier_ids, **df_kpis})
+df.write_parquet(f'single_file_per_asset_part/{esdl_id}', partition_by=['asset_id', 'carrier_id'])
 print('Done!')
diff --git a/query_single_file_per_asset_part_with_carrier.py b/query_single_file_per_asset_part_with_carrier.py
index 40ab51b..e5b5670 100644
--- a/query_single_file_per_asset_part_with_carrier.py
+++ b/query_single_file_per_asset_part_with_carrier.py
@@ -365,14 +365,15 @@
 print(f'Pyarrow took {diff} seconds which means {per_q} second per q to retrieve one kpi for a single asset and all carriers')
 
 start_arrow = time.time()
-df = pa.parquet.read_table(f"test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/", filesystem=s3, partitioning=asset_carrier_partitioning, filters=[('asset_id', 'in', asset_ids), ('carrier_id', 'in', carrier_ids[:1])], columns=['HeatIn_Q1', 'time', 'asset_id', 'carrier_id']).to_pandas()
+for carrier_id in carrier_ids:
+    df = pa.parquet.read_table(f"test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/", filesystem=s3, partitioning=asset_carrier_partitioning, filters=[('asset_id', 'in', asset_ids), ('carrier_id', 'in', [carrier_id])], columns=['HeatIn_Q1', 'time', 'asset_id', 'carrier_id']).to_pandas()
 end_arrow = time.time()
-
 diff = end_arrow - start_arrow
-per_q = diff / len(asset_ids)
+per_q = diff / len(carrier_ids)
 
 
 print(f'Pyarrow took {diff} seconds which means {per_q} second per asset to retrieve one kpi for all assets and one carrier')
+
 start_arrow = time.time()
 for asset_id in asset_ids:
     pa.parquet.read_table(f"test-parquet/3f59be60-1597-46e0-9f1f-c5aa0a466a96/", filesystem=s3, partitioning=asset_carrier_partitioning, filters=[('asset_id', '=', asset_id), ('carrier_id', '=', carrier_ids[0])], columns=['HeatIn_Q1', 'time']).to_pandas()
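
The README above asks whether the simulator can append new timesteps to an already partitioned dataset instead of keeping a full year in memory (the generator script in this patch builds all rows first and writes once). Parquet files themselves cannot be appended to, but new row batches can be added as extra files inside the same asset_id/carrier_id partition directories. Below is a minimal sketch with pyarrow, not part of the patch: the output directory, asset ids and KPI subset are made up, and it assumes hive-style partition names.

```python
import datetime
import random

import pyarrow as pa
import pyarrow.dataset as ds

KPIS = ["HeatIn_Q1", "Heat_flow1"]                    # subset of the KPIs above
ASSET_IDS = ["asset-1", "asset-2"]                    # made-up ids for the sketch
CARRIER_ID = "carrier-1"                              # made-up carrier
BASE_DIR = "single_file_per_asset_part_incremental"  # illustrative output directory


def write_batch(batch_index: int, times: list) -> None:
    """Write one batch of timesteps as new files inside the existing partitions."""
    rows = {
        "time": [t for t in times for _ in ASSET_IDS],
        "asset_id": ASSET_IDS * len(times),
        "carrier_id": [CARRIER_ID] * len(times) * len(ASSET_IDS),
        **{kpi: [random.uniform(0, 10) for _ in times for _ in ASSET_IDS] for kpi in KPIS},
    }
    ds.write_dataset(
        pa.table(rows),
        BASE_DIR,
        format="parquet",
        partitioning=["asset_id", "carrier_id"],
        partitioning_flavor="hive",
        # A unique file name per batch, so earlier batches are kept rather than replaced.
        basename_template=f"batch-{batch_index}-{{i}}.parquet",
        existing_data_behavior="overwrite_or_ignore",
    )


# The simulator can call this per chunk of timesteps instead of holding a full year.
start = datetime.datetime(2024, 1, 1)
for batch_index in range(3):
    times = [start + datetime.timedelta(minutes=15 * i)
             for i in range(batch_index * 96, (batch_index + 1) * 96)]
    write_batch(batch_index, times)
```

Whether many small files per partition stay fast enough for the queries in the table above would still need to be measured; compacting each partition into a single file once a run finishes (the layout the current generator produces) remains possible afterwards.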
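
For the "Where are we going to keep the metadata?" question, one option that stays retrievable with pyarrow is a small Parquet (or JSON) sidecar next to the partitioned dataset. The sketch below is only an illustration: the file name, asset ids and classes are invented, and the local path stands in for the MinIO bucket (`pq.read_table` accepts the same `filesystem=s3` argument the query scripts already use).

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Invented per-asset metadata; asset_id matches the partition key of the
# time-series dataset, asset_class/asset_name are the fields named in the README.
metadata = pa.table({
    "asset_id": ["asset-1", "asset-2"],
    "asset_class": ["Pipe", "HeatExchanger"],
    "asset_name": ["pipe_01", "heat_exchanger_01"],
})

# One small sidecar file, e.g. next to the <esdl_id> directory (illustrative name).
pq.write_table(metadata, "metadata.parquet")

# Lookup with pyarrow, optionally filtered on asset_id; pass filesystem=s3 to read
# the same file from MinIO.
lookup = pq.read_table("metadata.parquet", filters=[("asset_id", "=", "asset-1")]).to_pandas()
print(lookup)
```

Keeping the sidecar in Parquet rather than JSON would let the KPI queries and the metadata lookups share a single pyarrow/S3 code path.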