Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ Initial release of the Frequenz Market Metering API client for Python.

- `MarketMeteringApiClient`: Main client class for connecting to the Market Metering service
- `upsert_samples()`: Bidirectional streaming for upserting metering samples.
- `create_market_location()`: Create a new Market Location.
- `update_market_location()`: Update an existing Market Location.
- `activate_market_location()`: Activate a Market Location.
- `deactivate_market_location()`: Deactivate a Market Location.
- `create_market_location()`: Create a new Market Location. Returns `MarketLocationDetail`.
- `update_market_location()`: Update an existing Market Location. Returns `MarketLocationDetail`.
- `activate_market_locations()`: Activate one or more Market Locations (batch). Returns per-location `MarketLocationOperationResult`.
- `deactivate_market_locations()`: Deactivate one or more Market Locations (batch). Returns per-location `MarketLocationOperationResult`.
- `list_market_locations()`: List Market Locations with filtering and pagination.
- `stream()`: Channel-based receiver for streaming with automatic reconnection
- `MarketLocationDetail`: Server-managed metadata (revision, is_active, create_time, update_time, last_deactivated_time).
- `MarketLocationOperationResult` / `MarketLocationOperationErrorCode`: Per-location results for activate/deactivate operations.
- `UpsertResult.ingest_time`: Server-side timestamp when the sample was ingested.
- `revision_strategy` parameter on `stream_samples()` and `stream()`.
- CLI tool (`marketmetering-cli`) for quick access to metering data
- Support for multiple market identifier types:
- MaLo-ID (Germany)
Expand All @@ -32,6 +36,20 @@ Initial release of the Frequenz Market Metering API client for Python.
- Multiple metric types (active energy, active power, reactive energy/power)
- Optional resampling for time-series aggregation

## Improvements

- Add mock tests for all client RPC methods.
- Add integration tests (excluded from CI, run with `uv run pytest -m integration`).

## Breaking Changes

- `activate_market_location()` renamed to `activate_market_locations()` (batch support, returns results).
- `deactivate_market_location()` renamed to `deactivate_market_locations()` (batch support, returns results).
- `create_market_location()` now returns `MarketLocationDetail` instead of `None`.
- `update_market_location()` now returns `MarketLocationDetail` instead of `None`.
- `MarketLocationEntry` now wraps `MarketLocationDetail` via `market_location_detail` field (convenience properties `market_location` and `market_location_ref` preserved).
- `UpsertResult` has a new required field `ingest_time`.

## Bug Fixes

- `update_market_location()`: Add missing `expected_revision` parameter required for optimistic concurrency control.
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,14 @@ filterwarnings = [
# chars as this is a regex
'ignore:Protobuf gencode version .*exactly one major version older.*:UserWarning',
]
addopts = "-vv"
addopts = "-vv -m 'not integration'"
testpaths = ["tests", "src"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
required_plugins = ["pytest-asyncio", "pytest-mock"]
markers = [
"integration: requires a running marketmetering service (deselected by default)",
]

[tool.mypy]
explicit_package_bases = true
Expand Down
62 changes: 61 additions & 1 deletion src/frequenz/client/marketmetering/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,65 @@
"""

from ._client import MarketMeteringApiClient
from .types import (
ActivationFilter,
DataQuality,
DownsamplingMethod,
EnergyFlowDirection,
MarketArea,
MarketLocation,
MarketLocationChangedField,
MarketLocationDetail,
MarketLocationEntry,
MarketLocationId,
MarketLocationIdType,
MarketLocationOperationErrorCode,
MarketLocationOperationResult,
MarketLocationRef,
MarketLocationSample,
MarketLocationSeries,
MarketLocationsFilter,
MarketLocationUpdate,
MetricType,
MetricUnit,
PaginationParams,
ResamplingMethod,
ResamplingOptions,
RevisionSelection,
RevisionStrategy,
SampleUpsertErrorCode,
TimeResolution,
UpsertResult,
)

__all__ = ["MarketMeteringApiClient"]
__all__ = [
"ActivationFilter",
"DataQuality",
"DownsamplingMethod",
"EnergyFlowDirection",
"MarketArea",
"MarketLocation",
"MarketLocationChangedField",
"MarketLocationDetail",
"MarketLocationEntry",
"MarketLocationId",
"MarketLocationIdType",
"MarketLocationOperationErrorCode",
"MarketLocationOperationResult",
"MarketLocationRef",
"MarketLocationSample",
"MarketLocationSeries",
"MarketLocationUpdate",
"MarketLocationsFilter",
"MarketMeteringApiClient",
"MetricType",
"MetricUnit",
"PaginationParams",
"ResamplingMethod",
"ResamplingOptions",
"RevisionSelection",
"RevisionStrategy",
"SampleUpsertErrorCode",
"TimeResolution",
"UpsertResult",
]
87 changes: 54 additions & 33 deletions src/frequenz/client/marketmetering/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from frequenz.api.common.v1alpha8.types.interval_pb2 import Interval as PBInterval
from frequenz.api.marketmetering.v1alpha1 import marketmetering_pb2 as pb
from frequenz.api.marketmetering.v1alpha1 import marketmetering_pb2_grpc
from google.protobuf.timestamp_pb2 import Timestamp

from frequenz import channels
from frequenz.client.base.channel import ChannelOptions, SslOptions
Expand All @@ -23,7 +22,9 @@
from .types import (
EnergyFlowDirection,
MarketLocation,
MarketLocationDetail,
MarketLocationEntry,
MarketLocationOperationResult,
MarketLocationRef,
MarketLocationSeries,
MarketLocationsFilter,
Expand All @@ -32,26 +33,14 @@
PaginationParams,
ResamplingOptions,
RevisionSelection,
RevisionStrategy,
UpsertResult,
_datetime_to_timestamp,
)

DEFAULT_PORT = 443


def _datetime_to_timestamp(dt: datetime) -> Timestamp:
"""Convert a datetime to a protobuf Timestamp.

Args:
dt: The datetime to convert.

Returns:
The protobuf timestamp representation.
"""
ts = Timestamp()
ts.FromDatetime(dt)
return ts


class MarketMeteringApiClient(
BaseApiClient[marketmetering_pb2_grpc.MarketMeteringServiceStub]
):
Expand Down Expand Up @@ -166,29 +155,33 @@ async def create_market_location(
*,
market_location_ref: MarketLocationRef,
market_location: MarketLocation,
) -> None:
) -> MarketLocationDetail:
"""Create a new Market Location.

Args:
market_location_ref: The reference ID for the new location.
market_location: The configuration of the new location.

Returns:
The created Market Location with server-assigned metadata.
"""
request = pb.CreateMarketLocationRequest(
market_location_ref=market_location_ref.to_protobuf(),
market_location=market_location.to_protobuf(),
)
await self.stub.CreateMarketLocation( # type: ignore[misc]
response = await self.stub.CreateMarketLocation( # type: ignore[misc]
request,
timeout=self._call_timeout_seconds,
)
return MarketLocationDetail.from_protobuf(response.market_location)

async def update_market_location(
self,
*,
market_location_ref: MarketLocationRef,
update: MarketLocationUpdate,
expected_revision: int,
) -> None:
) -> MarketLocationDetail:
"""Update an existing Market Location.

Args:
Expand All @@ -198,6 +191,9 @@ async def update_market_location(
latest. This prevents lost updates when multiple callers
modify the same Market Location concurrently. Pass the
revision from the most recent read of the location.

Returns:
The updated Market Location with server-assigned metadata.
"""
update_pb, update_mask_pb = update.to_protobuf()
request = pb.UpdateMarketLocationRequest(
Expand All @@ -206,46 +202,59 @@ async def update_market_location(
update_fields=update_pb,
update_mask=update_mask_pb,
)
await self.stub.UpdateMarketLocation( # type: ignore[misc]
response = await self.stub.UpdateMarketLocation( # type: ignore[misc]
request,
timeout=self._call_timeout_seconds,
)
return MarketLocationDetail.from_protobuf(response.market_location_detail)

async def activate_market_location(
async def activate_market_locations(
self,
*,
market_location_ref: MarketLocationRef,
) -> None:
"""Activate a Market Location.
market_location_refs: list[MarketLocationRef],
) -> list[MarketLocationOperationResult]:
"""Activate one or more Market Locations.

Args:
market_location_ref: The reference ID of the location to activate.
market_location_refs: References to the locations to activate.

Returns:
A list of operation results, one per requested location.
"""
request = pb.ActivateMarketLocationRequest(
market_location_refs=[market_location_ref.to_protobuf()],
market_location_refs=[ref.to_protobuf() for ref in market_location_refs],
)
await self.stub.ActivateMarketLocation( # type: ignore[misc]
response = await self.stub.ActivateMarketLocation( # type: ignore[misc]
request,
timeout=self._call_timeout_seconds,
)
return [
MarketLocationOperationResult.from_protobuf(r) for r in response.results
]

async def deactivate_market_location(
async def deactivate_market_locations(
self,
*,
market_location_ref: MarketLocationRef,
) -> None:
"""Deactivate a Market Location.
market_location_refs: list[MarketLocationRef],
) -> list[MarketLocationOperationResult]:
"""Deactivate one or more Market Locations.

Args:
market_location_ref: The reference ID of the location to deactivate.
market_location_refs: References to the locations to deactivate.

Returns:
A list of operation results, one per requested location.
"""
request = pb.DeactivateMarketLocationRequest(
market_location_refs=[market_location_ref.to_protobuf()],
market_location_refs=[ref.to_protobuf() for ref in market_location_refs],
)
await self.stub.DeactivateMarketLocation( # type: ignore[misc]
response = await self.stub.DeactivateMarketLocation( # type: ignore[misc]
request,
timeout=self._call_timeout_seconds,
)
return [
MarketLocationOperationResult.from_protobuf(r) for r in response.results
]

async def list_market_locations(
self,
Expand Down Expand Up @@ -342,6 +351,7 @@ async def stream_samples(
start_time: datetime | None = None,
end_time: datetime | None = None,
resampling: ResamplingOptions | None = None,
revision_strategy: RevisionStrategy | None = None,
) -> AsyncIterator[MarketLocationSeries]:
"""Stream metering samples for Market Locations.

Expand All @@ -357,6 +367,7 @@ async def stream_samples(
If omitted, stream starts from real-time data.
end_time: Optional end time. If omitted, stream continues in real-time.
resampling: Optional resampling options for aggregation.
revision_strategy: Optional revision strategy for the stream filter.

Yields:
MarketLocationSeries objects containing samples for each combination
Expand Down Expand Up @@ -397,6 +408,9 @@ async def stream_samples(
if resampling:
stream_filter.resampling_options.CopyFrom(resampling.to_protobuf())

if revision_strategy:
stream_filter.revision_strategy = revision_strategy.value

request.stream_filter.CopyFrom(stream_filter)

# Make the streaming call
Expand All @@ -422,6 +436,7 @@ def stream(
start_time: datetime | None = None,
end_time: datetime | None = None,
resampling: ResamplingOptions | None = None,
revision_strategy: RevisionStrategy | None = None,
) -> channels.Receiver[MarketLocationSeries]:
"""Get a receiver for streaming metering samples.

Expand All @@ -436,6 +451,7 @@ def stream(
start_time: Optional start time for historical data.
end_time: Optional end time. If omitted, stream continues in real-time.
resampling: Optional resampling options for aggregation.
revision_strategy: Optional revision strategy for the stream filter.

Returns:
A channel receiver for MarketLocationSeries objects.
Expand All @@ -458,6 +474,7 @@ def stream(
start_time=start_time,
end_time=end_time,
resampling=resampling,
revision_strategy=revision_strategy,
).new_receiver()

# pylint: disable=too-many-arguments
Expand All @@ -470,6 +487,7 @@ def _get_stream(
start_time: datetime | None = None,
end_time: datetime | None = None,
resampling: ResamplingOptions | None = None,
revision_strategy: RevisionStrategy | None = None,
) -> GrpcStreamBroadcaster[
pb.ReceiveMarketLocationSamplesStreamResponse, MarketLocationSeries
]:
Expand Down Expand Up @@ -509,6 +527,9 @@ def _get_stream(
if resampling:
stream_filter.resampling_options.CopyFrom(resampling.to_protobuf())

if revision_strategy:
stream_filter.revision_strategy = revision_strategy.value

request.stream_filter.CopyFrom(stream_filter)

def transform(
Expand Down
Loading
Loading