generated from outerbounds/flowproject
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathforecastflow.py
85 lines (73 loc) · 2.47 KB
/
forecastflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from datetime import datetime, timedelta
from metaflow import (
step,
trigger_on_finish,
current,
project,
Config,
config_expr,
card,
Parameter,
pypi,
resources,
)
from metaflow.cards import Markdown, VegaChart, ProgressBar
from flowproject import BaseFlow, snowflake
@project(name=config_expr("flowconfig.project_name"))
@trigger_on_finish(flow="SensorFlow")
class ForecastFlow(BaseFlow):
    """Forecast per-zipcode series from recently queried history.

    The flow is triggered when ``SensorFlow`` finishes. It queries the
    ``historical_temperature`` template for the last ``window`` days,
    produces one forecast per zipcode, and renders a sanity-check card for
    ``qa_zipcode``.
    """

    # Number of days of history to query, counted back from the sensor timestamp.
    window = Parameter("window", default=10, help="past time window for forecast")
    # Zipcode whose forecast is charted in the ``end`` step card.
    qa_zipcode = Parameter(
        "qa_zipcode", default="02108", help="visualize this zipcode for sanity check"
    )

    @snowflake
    @step
    def start(self):
        """Query the history that falls inside the forecast window."""
        # `sensor_value` is not defined in this class; presumably BaseFlow
        # supplies it from the triggering SensorFlow run -- TODO confirm.
        self.timestamp = self.sensor_value
        # NOTE(review): the last character is dropped unconditionally before
        # parsing. This looks like it strips a trailing "Z" that
        # `datetime.fromisoformat` rejects on older Pythons -- confirm that
        # the timestamp always carries that suffix.
        event_time = datetime.fromisoformat(self.timestamp[:-1])
        self.since = event_time - timedelta(days=self.window)
        print("Querying data since", self.since)
        self.data = self.query_snowflake(
            template=("historical_temperature", [self.since])
        )
        self.next(self.forecast)

    @card(type="blank", refresh_interval=1)
    @pypi(
        python="3.11",
        packages={"pandas": "2.2.3", "sktime": "0.35.0", "statsmodels": "0.14.4"},
    )
    @resources(cpu=2, memory=8000)
    @step
    def forecast(self):
        """Forecast every zipcode series, reporting progress on the card.

        Forecasting is best-effort: a zipcode whose series cannot be
        processed is logged and skipped, so ``self.forecasts`` only holds
        the zipcodes that succeeded, mapped to ``(past, future)``.
        """
        # Imported inside the step, presumably because its dependencies are
        # only installed in this step's @pypi environment.
        from weather import forecast

        self.forecasts = {}
        zipcode_series = forecast.make_series(self.data)
        progress = ProgressBar(max=len(zipcode_series), label="Forecasts done")
        current.card.append(progress)
        for done, (zipcode, series) in enumerate(zipcode_series, start=1):
            try:
                past = forecast.series_to_list(series)
                future = forecast.forecast(series)
            except Exception as err:
                # Keep going on a per-zipcode failure, but leave a trace in
                # the task log. Catching Exception (rather than a bare
                # except) lets KeyboardInterrupt/SystemExit stop the task.
                print(f"Skipping zipcode {zipcode}: forecast failed with {err!r}")
            else:
                self.forecasts[zipcode] = (past, future)
            # Advance the bar for failures too, so it reaches its maximum.
            progress.update(done)
            current.card.refresh()
        self.next(self.end)

    @card(type="blank")
    @step
    def end(self):
        """Render the QA card: a chart for ``qa_zipcode`` or a not-found note."""
        if self.qa_zipcode in self.forecasts:
            from weather import forecastviz

            past, future = self.forecasts[self.qa_zipcode]
            current.card.append(Markdown(f"# Forecast for {self.qa_zipcode}"))
            current.card.append(
                Markdown("**Blue** is historical data, **orange** is our forecast")
            )
            current.card.append(VegaChart(forecastviz.vegaspec(past, future)))
        else:
            # Either the zipcode was absent from the data or its forecast failed.
            current.card.append(Markdown(f"# {self.qa_zipcode} not found"))
if __name__ == "__main__":
    # Script entry point: instantiate the flow only when this file is
    # executed directly, never when it is imported.
    ForecastFlow()