Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do not merge] POC: Use SDK component graph and formula engine with reporting client #102

Draft
wants to merge 9 commits into
base: v0.x.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from frequenz.client.reporting.component_graph import ComponentGraph, Component, Connection, ComponentCategory, InverterType
import json
from frequenz.client.reporting import ReportingApiClient
from frequenz.client.reporting.sdk_reporting_bridge import list_microgrid_components_data_receiver
import asyncio
from datetime import datetime

from frequenz.client.reporting.formula_engine._formula_generators._pv_power_formula import PVPowerFormula, FormulaGeneratorConfig

from frequenz.client.common.metric import Metric
def build_graph(json_data: dict) -> ComponentGraph:
components = []
connections = []
for component in json_data["components"]:
component_id = int(component["id"])
category = ComponentCategory(component["category"])
component_type = None
if category == ComponentCategory.INVERTER and "inverter" in component and "type" in component["inverter"]:
component_type = InverterType(component["inverter"]["type"])

components.append(
Component(
component_id=component_id,
category=category,
type=component_type,
)
)
for connection in json_data["connections"]:
connections.append(
Connection(
start=int(connection["start"]),
end=int(connection["end"]),
)
)
return ComponentGraph(components, connections)


async def main():
# Read JSON data from file
with open("comps13.json", "r") as file:
data = json.load(file)

# Build component graph
component_graph = build_graph(data)

# Print component graph
print(component_graph)

key = open("key.txt", "r").read().strip()
client = ReportingApiClient(server_url="grpc://reporting.api.frequenz.com:443?ssl=true", key=key)

def get_receiver(component_id, metric_id):
microgrid_id = 13
component_ids = [component_id]
microgrid_components = [
(microgrid_id, component_ids),
]

start_dt = datetime(2024, 9, 17)
end_dt = datetime(2024, 9, 18)
resolution = 900
receiver = list_microgrid_components_data_receiver(
client,
microgrid_components=microgrid_components,
metrics=[Metric.AC_ACTIVE_POWER],
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
)
return receiver

#async for sample in get_receiver(256, 4711):
# print(sample)

formula = PVPowerFormula(
get_receiver=get_receiver,
config=FormulaGeneratorConfig(),
component_graph=component_graph,
)

engine = formula.generate()
recv = engine.new_receiver()
async for sample in recv:
print(sample)

if __name__ == "__main__":
asyncio.run(main())
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"grpcio-tools >= 1.54.2, < 2",
"protobuf >= 4.25.3, < 6",
"frequenz-client-base >= 0.6.0, < 0.7.0",
"networkx",
]
dynamic = ["version"]

Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/client/reporting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
This package provides a low-level interface for interacting with the reporting API.
"""


from ._base_types import *
from ._client import ReportingApiClient

__all__ = ["ReportingApiClient"]
223 changes: 223 additions & 0 deletions src/frequenz/client/reporting/_base_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Timeseries basic types."""

import dataclasses
import functools
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Generic, Protocol, Self, TypeVar, cast, overload

from ._quantities import Power, QuantityT

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
"""The UNIX epoch (in UTC)."""


@dataclass(frozen=True, order=True)
class Sample(Generic[QuantityT]):
"""A measurement taken at a particular point in time.

The `value` could be `None` if a component is malfunctioning or data is
lacking for another reason, but a sample still needs to be sent to have a
coherent view on a group of component metrics for a particular timestamp.
"""

timestamp: datetime
"""The time when this sample was generated."""

value: QuantityT | None = None
"""The value of this sample."""


@dataclass(frozen=True)
class Sample3Phase(Generic[QuantityT]):
"""A 3-phase measurement made at a particular point in time.

Each of the `value` fields could be `None` if a component is malfunctioning
or data is lacking for another reason, but a sample still needs to be sent
to have a coherent view on a group of component metrics for a particular
timestamp.
"""

timestamp: datetime
"""The time when this sample was generated."""
value_p1: QuantityT | None
"""The value of the 1st phase in this sample."""

value_p2: QuantityT | None
"""The value of the 2nd phase in this sample."""

value_p3: QuantityT | None
"""The value of the 3rd phase in this sample."""

def __iter__(self) -> Iterator[QuantityT | None]:
"""Return an iterator that yields values from each of the phases.

Yields:
Per-phase measurements one-by-one.
"""
yield self.value_p1
yield self.value_p2
yield self.value_p3

@overload
def max(self, default: QuantityT) -> QuantityT: ...

@overload
def max(self, default: None = None) -> QuantityT | None: ...

def max(self, default: QuantityT | None = None) -> QuantityT | None:
"""Return the max value among all phases, or default if they are all `None`.

Args:
default: value to return if all phases are `None`.

Returns:
Max value among all phases, if available, default value otherwise.
"""
if not any(self):
return default
value: QuantityT = functools.reduce(
lambda x, y: x if x > y else y,
filter(None, self),
)
return value

@overload
def min(self, default: QuantityT) -> QuantityT: ...

@overload
def min(self, default: None = None) -> QuantityT | None: ...

def min(self, default: QuantityT | None = None) -> QuantityT | None:
"""Return the min value among all phases, or default if they are all `None`.

Args:
default: value to return if all phases are `None`.

Returns:
Min value among all phases, if available, default value otherwise.
"""
if not any(self):
return default
value: QuantityT = functools.reduce(
lambda x, y: x if x < y else y,
filter(None, self),
)
return value

def map(
self,
function: Callable[[QuantityT], QuantityT],
default: QuantityT | None = None,
) -> Self:
"""Apply the given function on each of the phase values and return the result.

If a phase value is `None`, replace it with `default` instead.

Args:
function: The function to apply on each of the phase values.
default: The value to apply if a phase value is `None`.

Returns:
A new instance, with the given function applied on values for each of the
phases.
"""
return self.__class__(
timestamp=self.timestamp,
value_p1=default if self.value_p1 is None else function(self.value_p1),
value_p2=default if self.value_p2 is None else function(self.value_p2),
value_p3=default if self.value_p3 is None else function(self.value_p3),
)


class Comparable(Protocol):
"""A protocol that requires the implementation of comparison methods.

This protocol is used to ensure that types can be compared using
the less than or equal to (`<=`) and greater than or equal to (`>=`)
operators.
"""

def __le__(self, other: Any, /) -> bool:
"""Return whether this instance is less than or equal to `other`."""

def __ge__(self, other: Any, /) -> bool:
"""Return whether this instance is greater than or equal to `other`."""


_T = TypeVar("_T", bound=Comparable | None)


@dataclass(frozen=True)
class Bounds(Generic[_T]):
"""Lower and upper bound values."""

lower: _T
"""Lower bound."""

upper: _T
"""Upper bound."""

def __contains__(self, item: _T) -> bool:
"""
Check if the value is within the range of the container.

Args:
item: The value to check.

Returns:
bool: True if value is within the range, otherwise False.
"""
if self.lower is None and self.upper is None:
return True
if self.lower is None:
return item <= self.upper
if self.upper is None:
return self.lower <= item

return cast(Comparable, self.lower) <= item <= cast(Comparable, self.upper)


@dataclass(frozen=True, kw_only=True)
class SystemBounds:
"""Internal representation of system bounds for groups of components."""

# compare = False tells the dataclass to not use name for comparison methods
timestamp: datetime = dataclasses.field(compare=False)
"""Timestamp of the metrics."""

inclusion_bounds: Bounds[Power] | None
"""Total inclusion power bounds for all components of a pool.

This is the range within which power requests would be allowed by the pool.

When exclusion bounds are present, they will exclude a subset of the inclusion
bounds.
"""

exclusion_bounds: Bounds[Power] | None
"""Total exclusion power bounds for all components of a pool.

This is the range within which power requests are NOT allowed by the pool.
If present, they will be a subset of the inclusion bounds.
"""

def __contains__(self, item: Power) -> bool:
"""
Check if the value is within the range of the container.

Args:
item: The value to check.

Returns:
bool: True if value is within the range, otherwise False.
"""
if not self.inclusion_bounds or item not in self.inclusion_bounds:
return False
if self.exclusion_bounds and item in self.exclusion_bounds:
return False
return True
Loading
Loading