Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time-series support for websockets #3889

Merged
merged 1 commit into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ public void onText(String message) {
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "response/success",
"", null, eventDTO.eventId);
break;
case "ItemTimeSeriesEvent":
Event itemTimeseriesEvent = itemEventUtility.createTimeSeriesEvent(eventDTO);
eventPublisher.post(itemTimeseriesEvent);
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "response/success",
"", null, eventDTO.eventId);
break;
case WEBSOCKET_EVENT_TYPE:
if ((WEBSOCKET_TOPIC_PREFIX + "heartbeat").equals(eventDTO.topic)
&& "PING".equals(eventDTO.payload)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.openhab.core.io.websocket.event;

import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -26,6 +28,7 @@
import org.openhab.core.types.Command;
import org.openhab.core.types.RefreshType;
import org.openhab.core.types.State;
import org.openhab.core.types.TimeSeries;
import org.openhab.core.types.Type;
import org.openhab.core.types.TypeParser;
import org.openhab.core.types.UnDefType;
Expand Down Expand Up @@ -78,6 +81,13 @@ public Event createStateEvent(EventDTO eventDTO) throws EventProcessingException
throw new EventProcessingException("Incompatible datatype, rejected.");
}

public Event createTimeSeriesEvent(EventDTO eventDTO) throws EventProcessingException {
Matcher matcher = getTopicMatcher(eventDTO.topic, "timeseries");
Item item = getItem(matcher.group("entity"));
TimeSeries timeSeries = parseTimeSeries(eventDTO.payload);
return ItemEventFactory.createTimeSeriesEvent(item.getName(), timeSeries, eventDTO.source);
}

private Matcher getTopicMatcher(@Nullable String topic, String action) throws EventProcessingException {
if (topic == null) {
throw new EventProcessingException("Topic must not be null");
Expand All @@ -102,6 +112,36 @@ private Item getItem(String itemName) throws EventProcessingException {
}
}

private TimeSeries parseTimeSeries(@Nullable String payload) throws EventProcessingException {
ItemTimeSeriesEventPayloadBean bean = null;
try {
bean = gson.fromJson(payload, ItemTimeSeriesEventPayloadBean.class);
} catch (JsonParseException ignored) {
}
if (bean == null) {
throw new EventProcessingException("Failed to deserialize payload '" + payload + "'.");
}

TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.valueOf(bean.policy));

for (ItemTimeSeriesEventPayloadBean.TimeSeriesPayload timeSeriesPayload : bean.timeSeries) {
Type value = parseType(timeSeriesPayload.type, timeSeriesPayload.value);
if (value instanceof State state) {
try {
Instant timestamp = Instant.parse(timeSeriesPayload.timestamp);
timeSeries.add(timestamp, state);
} catch (DateTimeParseException e) {
throw new EventProcessingException(
"Could not parse '" + timeSeriesPayload.timestamp + "' to an instant.");
}
} else {
throw new EventProcessingException("Only states are allowed in timeseries events.");
}
}

return timeSeries;
}

private Type parseType(@Nullable String payload) throws EventProcessingException {
ItemEventPayloadBean bean = null;
try {
Expand All @@ -112,20 +152,24 @@ private Type parseType(@Nullable String payload) throws EventProcessingException
throw new EventProcessingException("Failed to deserialize payload '" + payload + "'.");
}

String simpleClassName = bean.type + TYPE_POSTFIX;
return parseType(bean.type, bean.value);
}

private Type parseType(String type, String value) throws EventProcessingException {
String simpleClassName = type + TYPE_POSTFIX;
Type returnType;

if (simpleClassName.equals(UnDefType.class.getSimpleName())) {
returnType = UnDefType.valueOf(bean.value);
returnType = UnDefType.valueOf(value);
} else if (simpleClassName.equals(RefreshType.class.getSimpleName())) {
returnType = RefreshType.valueOf(bean.value);
returnType = RefreshType.valueOf(value);
} else {
returnType = TypeParser.parseType(simpleClassName, bean.value);
returnType = TypeParser.parseType(simpleClassName, value);
}

if (returnType == null) {
throw new EventProcessingException(
"Error parsing simpleClassName '" + simpleClassName + "' with value '" + bean.value + "'.");
"Error parsing simpleClassName '" + simpleClassName + "' with value '" + value + "'.");
}

return returnType;
Expand All @@ -135,4 +179,15 @@ private static class ItemEventPayloadBean {
public @NonNullByDefault({}) String type;
public @NonNullByDefault({}) String value;
}

private static class ItemTimeSeriesEventPayloadBean {
private @NonNullByDefault({}) List<ItemTimeSeriesEventPayloadBean.TimeSeriesPayload> timeSeries;
private @NonNullByDefault({}) String policy;

private static class TimeSeriesPayload {
private @NonNullByDefault({}) String type;
private @NonNullByDefault({}) String value;
private @NonNullByDefault({}) String timestamp;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

import java.time.Instant;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -34,10 +36,13 @@
import org.openhab.core.items.ItemRegistry;
import org.openhab.core.items.events.ItemEvent;
import org.openhab.core.items.events.ItemEventFactory;
import org.openhab.core.items.events.ItemTimeSeriesEvent;
import org.openhab.core.library.items.StringItem;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.HSBType;
import org.openhab.core.library.types.OnOffType;
import org.openhab.core.library.types.StringType;
import org.openhab.core.types.TimeSeries;

import com.google.gson.Gson;

Expand Down Expand Up @@ -171,4 +176,17 @@ public void invalidCommandEventPayload() {
() -> itemEventUtility.createCommandEvent(eventDTO));
assertThat(e.getMessage(), is("Failed to deserialize payload 'invalidNoJson'."));
}

@Test
public void validTimeSeriesEvent() throws EventProcessingException {
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.REPLACE);
timeSeries.add(Instant.now(), OnOffType.ON);
timeSeries.add(Instant.now().plusSeconds(5), OnOffType.OFF);
ItemTimeSeriesEvent event = ItemEventFactory.createTimeSeriesEvent(EXISTING_ITEM_NAME, timeSeries, null);
EventDTO eventDTO = new EventDTO(event);

Event itemEvent = itemEventUtility.createTimeSeriesEvent(eventDTO);

assertThat(itemEvent, is(event));
}
}