Skip to content
This repository was archived by the owner on May 20, 2025. It is now read-only.

Commit 2825ed6

Browse files
committed
Add common opening of datapipes
1 parent 86339e7 commit 2825ed6

File tree

1 file changed

+129
-0
lines changed

1 file changed

+129
-0
lines changed

ocf_datapipes/training/common.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from ocf_datapipes.load import OpenPVFromDB, OpenTopography, OpenGSP, OpenConfiguration, OpenNWP, OpenSatellite, OpenPVFromNetCDF
2+
import xarray as xr
3+
from torchdata.datapipes.iter import IterDataPipe
4+
from ocf_datapipes.config.model import Configuration
5+
from typing import List
6+
import logging
7+
from datetime import timedelta
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
def open_and_return_datapipes(configuration_filename: str, use_gsp: bool = True, use_nwp: bool = True, use_pv: bool = True, use_sat: bool = True, use_hrv: bool = True, use_topo: bool = True) -> dict[IterDataPipe]:
13+
"""
14+
Open data sources given a configuration and return the list of datapipes
15+
16+
17+
Args:
18+
configuration_filename: Path to file to open
19+
use_nwp: Whether to use NWP data or not
20+
use_topo: Whether to use topographic data
21+
use_pv: Whether to open PV data
22+
use_hrv: Whether to open HRV satellite data
23+
use_sat: Whether to open non-HRV satellite data
24+
25+
Returns:
26+
List of datapipes corresponding to the datapipes to open
27+
"""
28+
# load configuration
29+
config_datapipe = OpenConfiguration(configuration_filename)
30+
configuration: Configuration = next(iter(config_datapipe))
31+
32+
# Check which modalities to use
33+
if use_nwp:
34+
use_nwp = True if configuration.input_data.nwp.nwp_zarr_path != "" else False
35+
if use_pv:
36+
use_pv = True if configuration.input_data.pv.pv_files_groups[0].pv_filename != "" else False
37+
if use_sat:
38+
use_sat = True if configuration.input_data.satellite.satellite_zarr_path != "" else False
39+
if use_hrv:
40+
use_hrv = (
41+
True if configuration.input_data.hrvsatellite.hrvsatellite_zarr_path != "" else False
42+
)
43+
if use_topo:
44+
use_topo = (
45+
True if configuration.input_data.topographic.topographic_filename != "" else False
46+
)
47+
if use_gsp:
48+
use_gsp = (
49+
True if configuration.input_data.gsp.gsp_zarr_path != "" else False
50+
)
51+
logger.debug(f"GSP: {use_gsp} NWP: {use_nwp} Sat: {use_sat}, HRV: {use_hrv} PV: {use_pv} Topo: {use_topo}")
52+
53+
used_datapipes = {}
54+
55+
# Load GSP national data
56+
if use_gsp:
57+
logger.debug("Opening GSP Data")
58+
gsp_datapipe = OpenGSP(
59+
gsp_pv_power_zarr_path=configuration.input_data.gsp.gsp_zarr_path
60+
).add_t0_idx_and_sample_period_duration(
61+
sample_period_duration=timedelta(minutes=30),
62+
history_duration=timedelta(minutes=configuration.input_data.gsp.history_minutes),
63+
)
64+
65+
used_datapipes["gsp"] = gsp_datapipe
66+
67+
# Load NWP data
68+
if use_nwp:
69+
logger.debug("Opening NWP Data")
70+
nwp_datapipe = (
71+
OpenNWP(configuration.input_data.nwp.nwp_zarr_path)
72+
.select_channels(configuration.input_data.nwp.nwp_channels)
73+
.add_t0_idx_and_sample_period_duration(
74+
sample_period_duration=timedelta(hours=1),
75+
history_duration=timedelta(minutes=configuration.input_data.nwp.history_minutes),
76+
)
77+
)
78+
79+
used_datapipes["nwp"] = nwp_datapipe
80+
81+
if use_sat:
82+
logger.debug("Opening Satellite Data")
83+
sat_datapipe = (
84+
OpenSatellite(configuration.input_data.satellite.satellite_zarr_path)
85+
.select_channels(configuration.input_data.satellite.satellite_channels)
86+
.add_t0_idx_and_sample_period_duration(
87+
sample_period_duration=timedelta(minutes=5),
88+
history_duration=timedelta(
89+
minutes=configuration.input_data.satellite.history_minutes
90+
),
91+
)
92+
)
93+
94+
used_datapipes["sat"] = sat_datapipe
95+
96+
if use_hrv:
97+
logger.debug("Opening HRV Satellite Data")
98+
sat_hrv_datapipe = (
99+
OpenSatellite(configuration.input_data.hrvsatellite.hrvsatellite_zarr_path)
100+
.add_t0_idx_and_sample_period_duration(
101+
sample_period_duration=timedelta(minutes=5),
102+
history_duration=timedelta(
103+
minutes=configuration.input_data.hrvsatellite.history_minutes
104+
),
105+
)
106+
)
107+
108+
used_datapipes["hrv"] = sat_hrv_datapipe
109+
110+
if use_pv:
111+
logger.debug("Opening PV")
112+
pv_datapipe = (
113+
OpenPVFromNetCDF(pv=configuration.input_data.pv)
114+
.add_t0_idx_and_sample_period_duration(
115+
sample_period_duration=timedelta(minutes=5),
116+
history_duration=timedelta(minutes=configuration.input_data.pv.history_minutes),
117+
))
118+
119+
used_datapipes["pv"] = pv_datapipe
120+
121+
if use_topo:
122+
logger.debug("Opening Topographic Data")
123+
topo_datapipe = OpenTopography(
124+
configuration.input_data.topographic.topographic_filename
125+
)
126+
127+
used_datapipes["topo"] = topo_datapipe
128+
129+
return used_datapipes

0 commit comments

Comments
 (0)