-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathframework.py
executable file
·185 lines (147 loc) · 6.72 KB
/
framework.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import re
from datetime import datetime, timedelta
from enum import Enum
from clean_api import get_request
from typing import Callable, List, Literal, Optional, TypeVar, Union
from io import StringIO
import pandas as pd
import esdl
import logging
from termcolor import colored
from os import environ
# Module-level handle on the root logger (getLogger() without a name returns the root logger).
logger = logging.getLogger()
class MarketKind(Enum):
    """Enumerates the kinds of market a strategy can be invoked for."""

    DAY_AHEAD = 1
    INTRADAY = 2
    AFRR = 3
class BidKind(Enum):
    """Enumerates the two directions a bid can have: demand or supply."""

    DEMAND = 1
    SUPPLY = 2
class MarketInfo:
    """Plain value holder describing the market a strategy is evaluated for.

    Attributes:
        kind: Which market this is (a ``MarketKind`` member).
        start_time: Start of the time window; used elsewhere in this module
            as the lower bound when slicing profiles and building plans.
        end_time: End of that same time window.
        real_start_time: Stored as given; not read anywhere in this module.
    """

    def __init__(self, kind: MarketKind, start_time: datetime, end_time: datetime, real_start_time: datetime):
        self.kind = kind
        self.start_time, self.end_time = start_time, end_time
        self.real_start_time = real_start_time
        # self.current_time = current_time
        # self.clearing_for_time = clearing_for_time
class EnergySystemInfo:
    """Bundle an ESDL energy system with the time-series profiles it references.

    On construction every ``esdl.InfluxDBProfile`` instance that has a name is
    downloaded through the ``readFile`` request and cached in ``self.profiles``,
    keyed by the profile name.
    """

    def __init__(self, energy_system: esdl.EnergySystem):
        """Store ``energy_system`` and eagerly load all named profiles.

        Args:
            energy_system: The ESDL energy system this object describes.
        """
        self.energy_system = energy_system
        # Maps profile name -> data parsed from the CSV, indexed by the
        # datetime-converted "timestep" column.
        self.profiles = {}

        logger.info(colored("Retrieving profiles from DWH", "cyan"))
        for profile in esdl.InfluxDBProfile.allInstances():
            if profile.name is None:
                # Without a name there is nothing to request; skip it.
                logger.info(colored(f"Skipping profile: {profile.id}", "yellow"))
                continue

            result = get_request("readFile", {"Name": profile.name, "IncludeMetadata": True})
            frame = pd.read_csv(StringIO(result["Content"]), index_col="timestep")
            frame.index = pd.to_datetime(frame.index)
            # Squeeze the column axis only. A bare squeeze() would also
            # collapse a single-row profile (to a scalar, or to a Series
            # indexed by column name), which get_profile() could no longer
            # slice by time.
            self.profiles[profile.name] = frame.squeeze("columns")

    def get_profile(self, name: str, market_info: MarketInfo) -> pd.Series:
        """Return the cached profile ``name`` sliced to the market's time window.

        Args:
            name: Profile name, as used as key in ``self.profiles``.
            market_info: Supplies ``start_time`` and ``end_time`` for the slice.

        Raises:
            KeyError: If no profile called ``name`` was loaded.
        """
        window = slice(market_info.start_time, market_info.end_time)
        return self.profiles[name][window]

    def get_forecast(self, commodity: esdl.CommodityEnum, time: datetime) -> float:
        """Return a forecast value for ``commodity`` at ``time``.

        Both arguments are currently ignored and a constant is returned.
        """
        # WARNING: dummy value
        return 2
class Bid:
    """A single demand or supply bid for one commodity in one timeslot.

    Attributes:
        kind: ``BidKind.DEMAND`` or ``BidKind.SUPPLY``.
        price: Sent as both ``MinPrice`` and ``MaxPrice`` of the API bid.
        watts: Magnitude of the bid; scaled in ``to_api_bid``.
        commodity: One of ``"E"``, ``"G"`` or ``"H"``.
        timeslot: Start of the timeslot the bid applies to.
        asset_shortname: Optional asset identifier; only sent when not ``None``.
    """

    # Factor applied to ``watts``: the original literal (15 / 60).
    # NOTE(review): presumably the 15-minute timeslot length in hours - confirm.
    _TIMESLOT_HOURS = 15 / 60
    # Divisor applied after the timeslot factor.
    # NOTE(review): presumably converts watt-hours to kilowatt-hours - confirm.
    _WATTS_PER_KILOWATT = 1000

    def __init__(self, kind: BidKind, price: float, watts: float, commodity: Literal["E", "G", "H"], timeslot: datetime, asset_shortname: Optional[str] = None):
        self.kind = kind
        self.price = price
        self.watts = watts
        self.timeslot = timeslot
        self.commodity = commodity
        self.asset_shortname = asset_shortname

    def to_api_bid(self):
        """Build the dictionary representation of this bid for the API.

        Returns:
            A dict with the keys ``VirtualTimeslotStart``, ``CommodityName``,
            ``Quantity``, ``MinPrice``, ``MaxPrice`` and ``EnergyLabel``, plus
            ``AssetShortName`` when ``asset_shortname`` is set. ``Quantity`` is
            negative for supply bids and positive otherwise.
        """
        sign = -1 if self.kind == BidKind.SUPPLY else 1
        bid = {
            "VirtualTimeslotStart": self.timeslot.isoformat(),
            "CommodityName": self.commodity,
            # Same operand order as the original expression so the float
            # result is bit-for-bit identical.
            "Quantity": self._TIMESLOT_HOURS * self.watts / self._WATTS_PER_KILOWATT * sign,
            "MinPrice": self.price,
            "MaxPrice": self.price,
            "EnergyLabel": "green",
        }
        if self.asset_shortname is not None:
            bid["AssetShortName"] = self.asset_shortname
        return bid
T = TypeVar('T')


class AssetSet:
    """The assets belonging to one agent, looked up by asset type.

    Attributes:
        agent: Short name of the owning agent (used in log messages).
        assets: Mapping of asset id to asset.
    """

    def __init__(self, assets: List[esdl.EnergyAsset], agent_shortname: str):
        self.agent = agent_shortname
        self.assets = {asset.id: asset for asset in assets}

    def get(self, T: type[T]) -> Union[T, List[T]]:
        """Return the asset(s) that are instances of ``T``.

        Returns:
            The asset itself when exactly one matches, a list when several
            match, and an empty list (after logging a warning) when none do.
        """
        matches = [asset for asset in self.assets.values() if isinstance(asset, T)]
        if not matches:
            logger.warning(f"Could not find asset of type {T} for agent {self.agent}; this is probably a mistake in the strategy function")
            return []
        # A single match is unwrapped; multiple matches are returned as a list.
        return matches[0] if len(matches) == 1 else matches
# Registry filled by the ``strategy`` decorator: strategy id -> strategy function.
ALL_STRATEGIES = {}


def strategy(id: Union[str, List[str]]):
    """Decorator factory that registers a strategy function in ``ALL_STRATEGIES``.

    Args:
        id: A single strategy id, or a collection (list, tuple, set or
            frozenset) of ids that should all map to the decorated function.

    Returns:
        A decorator that stores the function under every given id and returns
        the function unchanged. A later registration under the same id
        overwrites the earlier one.

    Raises:
        TypeError: If ``id`` is neither a string nor one of the accepted
            collections. ``TypeError`` is an ``Exception`` subclass, so callers
            catching ``Exception`` keep working.
    """
    # TODO: allow strategies for a group of assets, and assets that have multiple agents
    if isinstance(id, str):
        # Checked first: a string is itself iterable and must stay one id.
        ids = (id,)
    elif isinstance(id, (list, tuple, set, frozenset)):
        ids = tuple(id)
    else:
        raise TypeError(f"Invalid data as strategy ID: {id}")

    def inner(func: Callable[["AssetSet", "MarketInfo", "EnergySystemInfo"], Optional[List["Bid"]]]):
        for strategy_id in ids:
            ALL_STRATEGIES[strategy_id] = func
        return func

    return inner
def configure_logger():
    """(Re)configure the root logger with a daily log file and a console stream.

    Any handlers already attached to the root logger are removed and closed, so
    calling this function repeatedly does not leak open log files. Two handlers
    are then installed, both at DEBUG level and both dropping urllib3 records:

    * a UTF-8 file handler appending to ``<LOG_DIR>/<today>.out.log`` (the
      current directory when the ``LOG_DIR`` environment variable is unset),
      with ANSI colour codes stripped from the formatted line;
    * a stream handler using the ``[LEVEL] message`` format.
    """
    # Local import: the module only imports ``environ`` from ``os``.
    import os

    logger = logging.getLogger()
    # logging.basicConfig(filename="a.out")

    # Iterate over a copy because removeHandler mutates logger.handlers.
    # Closing releases the file descriptor held by a previous FileHandler.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    class NoUrlLib(logging.Filter):
        """Reject log records emitted by urllib3."""

        def filter(self, record: logging.LogRecord):
            # record.name is the logger name ("urllib3.connectionpool", ...);
            # record.module is only the file stem, so the name check is what
            # actually catches all urllib3 loggers.
            if record.name == "urllib3" or record.name.startswith("urllib3."):
                return False
            return not record.module.startswith(("urllib3", "connectionpool"))

    class NoColorsFormatter(logging.Formatter):
        """Format like ``base`` and strip ANSI colour escape sequences."""

        regex = re.compile(r"\033\[[0-9;]+m")
        base = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")

        def format(self, record: logging.LogRecord):
            result = self.base.format(record)
            return self.regex.sub('', result)

    log_dir = environ.get("LOG_DIR", "")
    if log_dir:
        # Treat LOG_DIR as a directory whether or not it ends with a separator.
        os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, f"{datetime.now().date().isoformat()}.out.log")

    file_handler = logging.FileHandler(log_path, mode="a", encoding="utf-8")
    file_handler.addFilter(NoUrlLib())
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(NoColorsFormatter())

    std_handler = logging.StreamHandler()
    std_handler.addFilter(NoUrlLib())
    std_handler.setLevel(logging.DEBUG)
    std_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))

    logger.addHandler(file_handler)
    logger.addHandler(std_handler)
    logger.setLevel(logging.DEBUG)
def _add_planned_quantities(data, plan, asset_short_name, accept_open_bids):
    """Add the planned quantities of one asset's plan entries onto ``data`` in place."""
    for datum in plan:
        status = datum["Status"]
        # Status 1 always counts; status 0 only when open bids are accepted.
        # NOTE(review): presumably 1 = accepted and 0 = still open - confirm against the API.
        bid_status_valid = status == 1 or (accept_open_bids and status == 0)
        if bid_status_valid and "AssetShortName" in datum and datum["AssetShortName"] == asset_short_name:
            data[datum["TimeslotStartTime"]] += float(datum["PlannedQuantity"])


def retrieve_plan(agent: str, commodity: Literal["E", "G", "H"], market_info: MarketInfo, asset_short_name: str, include_open_bids = False) -> pd.Series:
    """Return the combined intraday and day-ahead plan of one asset.

    Args:
        agent: Agent name sent as ``AgentName`` in both plan requests.
        commodity: Commodity sent as ``CommodityName``.
        market_info: Supplies the time window (``start_time`` to ``end_time``)
            and the market kind.
        asset_short_name: Only plan entries whose ``AssetShortName`` equals
            this value are counted.
        include_open_bids: When true, entries with status 0 are counted as
            well. For the day-ahead plan this only applies when
            ``market_info.kind`` is ``MarketKind.DAY_AHEAD``.

    Returns:
        A float series named ``"value"`` with a 15-minute ``"timestep"`` index
        spanning the market window, holding the summed ``PlannedQuantity`` per
        timeslot (0.0 where nothing is planned).
    """
    dates = pd.date_range(market_info.start_time, market_info.end_time, freq='15min')
    data = pd.Series(0.0, index=dates, name="value")
    data.index.name = "timestep"

    param = {
        "AgentName": agent,
        "CommodityName": commodity,
        "VirtualStartDatetime": market_info.start_time.isoformat(),
        "VirtualEndDatetime": market_info.end_time.isoformat(),
    }
    intraday = get_request("readIntradayPlan", param)
    dayahead = get_request("readDayAheadPlan", param)

    _add_planned_quantities(data, intraday, asset_short_name, include_open_bids)
    # Open day-ahead bids are only counted while planning the day-ahead market.
    dayahead_accepts_open = include_open_bids and market_info.kind == MarketKind.DAY_AHEAD
    _add_planned_quantities(data, dayahead, asset_short_name, dayahead_accepts_open)
    return data