1
- from ocf_datapipes .load import OpenPVFromDB , OpenTopography , OpenGSP , OpenConfiguration , OpenNWP , OpenSatellite , OpenPVFromNetCDF
1
+ import logging
2
+ from datetime import timedelta
3
+ from typing import List
4
+
2
5
import xarray as xr
3
6
from torchdata .datapipes .iter import IterDataPipe
7
+
4
8
from ocf_datapipes .config .model import Configuration
5
- from typing import List
6
- import logging
7
- from datetime import timedelta
9
+ from ocf_datapipes .load import (
10
+ OpenConfiguration ,
11
+ OpenGSP ,
12
+ OpenNWP ,
13
+ OpenPVFromDB ,
14
+ OpenPVFromNetCDF ,
15
+ OpenSatellite ,
16
+ OpenTopography ,
17
+ )
8
18
9
19
logger = logging .getLogger (__name__ )
10
20
11
21
12
- def open_and_return_datapipes (configuration_filename : str , use_gsp : bool = True , use_nwp : bool = True , use_pv : bool = True , use_sat : bool = True , use_hrv : bool = True , use_topo : bool = True ) -> dict [IterDataPipe ]:
22
+ def open_and_return_datapipes (
23
+ configuration_filename : str ,
24
+ use_gsp : bool = True ,
25
+ use_nwp : bool = True ,
26
+ use_pv : bool = True ,
27
+ use_sat : bool = True ,
28
+ use_hrv : bool = True ,
29
+ use_topo : bool = True ,
30
+ ) -> dict [IterDataPipe ]:
13
31
"""
14
32
Open data sources given a configuration and return the list of datapipes
15
33
@@ -45,10 +63,10 @@ def open_and_return_datapipes(configuration_filename: str, use_gsp: bool = True,
45
63
True if configuration .input_data .topographic .topographic_filename != "" else False
46
64
)
47
65
if use_gsp :
48
- use_gsp = (
49
- True if configuration . input_data . gsp . gsp_zarr_path != "" else False
50
- )
51
- logger . debug ( f"GSP: { use_gsp } NWP: { use_nwp } Sat: { use_sat } , HRV: { use_hrv } PV: { use_pv } Topo: { use_topo } " )
66
+ use_gsp = True if configuration . input_data . gsp . gsp_zarr_path != "" else False
67
+ logger . debug (
68
+ f"GSP: { use_gsp } NWP: { use_nwp } Sat: { use_sat } , HRV: { use_hrv } PV: { use_pv } Topo: { use_topo } "
69
+ )
52
70
53
71
used_datapipes = {}
54
72
@@ -58,9 +76,9 @@ def open_and_return_datapipes(configuration_filename: str, use_gsp: bool = True,
58
76
gsp_datapipe = OpenGSP (
59
77
gsp_pv_power_zarr_path = configuration .input_data .gsp .gsp_zarr_path
60
78
).add_t0_idx_and_sample_period_duration (
61
- sample_period_duration = timedelta (minutes = 30 ),
62
- history_duration = timedelta (minutes = configuration .input_data .gsp .history_minutes ),
63
- )
79
+ sample_period_duration = timedelta (minutes = 30 ),
80
+ history_duration = timedelta (minutes = configuration .input_data .gsp .history_minutes ),
81
+ )
64
82
65
83
used_datapipes ["gsp" ] = gsp_datapipe
66
84
@@ -95,34 +113,31 @@ def open_and_return_datapipes(configuration_filename: str, use_gsp: bool = True,
95
113
96
114
if use_hrv :
97
115
logger .debug ("Opening HRV Satellite Data" )
98
- sat_hrv_datapipe = (
99
- OpenSatellite (configuration .input_data .hrvsatellite .hrvsatellite_zarr_path )
100
- .add_t0_idx_and_sample_period_duration (
101
- sample_period_duration = timedelta (minutes = 5 ),
102
- history_duration = timedelta (
103
- minutes = configuration .input_data .hrvsatellite .history_minutes
104
- ),
105
- )
116
+ sat_hrv_datapipe = OpenSatellite (
117
+ configuration .input_data .hrvsatellite .hrvsatellite_zarr_path
118
+ ).add_t0_idx_and_sample_period_duration (
119
+ sample_period_duration = timedelta (minutes = 5 ),
120
+ history_duration = timedelta (
121
+ minutes = configuration .input_data .hrvsatellite .history_minutes
122
+ ),
106
123
)
107
124
108
125
used_datapipes ["hrv" ] = sat_hrv_datapipe
109
126
110
127
if use_pv :
111
128
logger .debug ("Opening PV" )
112
- pv_datapipe = (
113
- OpenPVFromNetCDF ( pv = configuration .input_data .pv )
114
- .add_t0_idx_and_sample_period_duration (
115
- sample_period_duration = timedelta (minutes = 5 ),
116
- history_duration = timedelta (minutes = configuration .input_data .pv .history_minutes ),
117
- ) )
129
+ pv_datapipe = OpenPVFromNetCDF (
130
+ pv = configuration .input_data .pv
131
+ ) .add_t0_idx_and_sample_period_duration (
132
+ sample_period_duration = timedelta (minutes = 5 ),
133
+ history_duration = timedelta (minutes = configuration .input_data .pv .history_minutes ),
134
+ )
118
135
119
136
used_datapipes ["pv" ] = pv_datapipe
120
137
121
138
if use_topo :
122
139
logger .debug ("Opening Topographic Data" )
123
- topo_datapipe = OpenTopography (
124
- configuration .input_data .topographic .topographic_filename
125
- )
140
+ topo_datapipe = OpenTopography (configuration .input_data .topographic .topographic_filename )
126
141
127
142
used_datapipes ["topo" ] = topo_datapipe
128
143
0 commit comments