Skip to content

Commit

Permalink
chore: rename classes and use lombok when possible (#16)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
KeranYang and vigith authored Feb 2, 2023
1 parent 5cdbde6 commit ed96f42
Show file tree
Hide file tree
Showing 23 changed files with 80 additions and 97 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ ij_java_method_call_chain_wrap = on_every_item
ij_java_method_parameters_new_line_after_left_paren = true
ij_java_method_parameters_wrap = on_every_item
ij_java_names_count_to_use_import_on_demand = 9999
ij_java_spaces_around_equality_operators = true
ij_java_variable_annotation_wrap = normal
ij_java_wrap_first_method_in_call_chain = true
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ option java_package = "io.numaproj.numaflow.function.v1";
```protobuf
option java_package = "io.numaproj.numaflow.sink.v1";
```

## Code Style
Use [Editor Config](https://www.jetbrains.com/help/idea/editorconfig.html).
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<dependency>
<groupId>io.numaproj.numaflow</groupId>
<artifactId>numaflow-java</artifactId>
<version>0.0.0-SNAPSHOT</version>
<version>0.3.2</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.io.IOException;
import java.util.logging.Logger;

public class Forward {
private static final Logger logger = Logger.getLogger(Forward.class.getName());
public class EvenOddFunction {
private static final Logger logger = Logger.getLogger(EvenOddFunction.class.getName());

private static Message[] process(String key, Datum data) {
int value = 0;
Expand All @@ -27,6 +27,6 @@ private static Message[] process(String key, Datum data) {

public static void main(String[] args) throws IOException {
logger.info("Forward invoked");
new FunctionServer().registerMapper(new MapFunc(Forward::process)).start();
new FunctionServer().registerMapper(new MapFunc(EvenOddFunction::process)).start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.io.IOException;
import java.util.logging.Logger;

public class FlatMap {
private static final Logger logger = Logger.getLogger(FlatMap.class.getName());
public class FlatMapFunction {
private static final Logger logger = Logger.getLogger(FlatMapFunction.class.getName());

private static Message[] process(String key, Datum data) {
String msg = new String(data.getValue());
Expand All @@ -24,6 +24,6 @@ private static Message[] process(String key, Datum data) {

public static void main(String[] args) throws IOException {
logger.info("Flatmap invoked");
new FunctionServer().registerMapper(new MapFunc(FlatMap::process)).start();
new FunctionServer().registerMapper(new MapFunc(FlatMapFunction::process)).start();
}
}
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
<scope>import</scope>
</dependency>
</dependencies>

</dependencyManagement>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package io.numaproj.numaflow.common;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
public class GrpcServerConfig {
private String socketPath;
private int maxMessageSize;

public GrpcServerConfig(String socketPath, int maxMessageSize) {
this.socketPath = socketPath;
this.maxMessageSize = maxMessageSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
}

/**
* Stop serving requests and shutdown resources.
* Await termination on the main thread since the grpc library uses daemon threads.
* Stop serving requests and shutdown resources. Await termination on the main thread since the
* grpc library uses daemon threads.
*/
public void stop() {
if (server != null) {
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/io/numaproj/numaflow/function/FunctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -31,7 +32,6 @@
import static io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc.getReduceFnMethod;

class FunctionService extends UserDefinedFunctionGrpc.UserDefinedFunctionImplBase {

private static final Logger logger = Logger.getLogger(FunctionService.class.getName());
// it will never be smaller than one
private final ExecutorService reduceTaskExecutor = Executors
Expand Down Expand Up @@ -179,7 +179,9 @@ public void isReady(Empty request, StreamObserver<Udfunction.ReadyResponse> resp
responseObserver.onCompleted();
}

// shuts down the executor service which is used for reduce
/**
* shuts down the executor service which is used for reduce.
*/
public void shutDown() {
this.reduceTaskExecutor.shutdown();
try {
Expand Down Expand Up @@ -217,14 +219,12 @@ private Udfunction.DatumList buildDatumListResponse(List<Future<Message[]>> resu

private Udfunction.DatumList buildDatumListResponse(Message[] messages) {
Udfunction.DatumList.Builder datumListBuilder = Udfunction.DatumList.newBuilder();
for (Message message : messages) {
Udfunction.Datum d = Udfunction.Datum.newBuilder()
Arrays.stream(messages).forEach(message -> {
datumListBuilder.addElements(Udfunction.Datum.newBuilder()
.setKey(message.getKey())
.setValue(ByteString.copyFrom(message.getValue()))
.build();

datumListBuilder.addElements(d);
}
.build());
});
return datumListBuilder.build();
}
}
18 changes: 5 additions & 13 deletions src/main/java/io/numaproj/numaflow/function/Message.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package io.numaproj.numaflow.function;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public class Message {
public static final String ALL = "U+005C__ALL__";
public static final String DROP = "U+005C__DROP__";

private final String key;
private final byte[] value;

public Message(String key, byte[] value) {
this.key = key;
this.value = value;
}

// creates a Message to be dropped
public static Message toDrop() {
return new Message(DROP, new byte[0]);
Expand All @@ -26,12 +26,4 @@ public static Message toAll(byte[] value) {
public static Message to(String to, byte[] value) {
return new Message(to, value);
}

public String getKey() {
return key;
}

public byte[] getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package io.numaproj.numaflow.function.metadata;

import lombok.AllArgsConstructor;

import java.time.Instant;

/**
* IntervalWindowImpl implements IntervalWindow interface which will be passed
* as metadata to reduce handlers
* IntervalWindowImpl implements IntervalWindow interface which will be passed as metadata to reduce
* handlers
*/
@AllArgsConstructor
public class IntervalWindowImpl implements IntervalWindow {

private final Instant startTime;
private final Instant endTime;

public IntervalWindowImpl(Instant startTime, Instant endTime) {
this.startTime = startTime;
this.endTime = endTime;
}

@Override
public Instant GetStartTime() {
return this.startTime;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package io.numaproj.numaflow.function.metadata;

import lombok.AllArgsConstructor;

/**
* MetadataImpl implements Metadata interface which will be passed to reduce handlers
*/
@AllArgsConstructor
public class MetadataImpl implements Metadata {
private final IntervalWindow intervalWindow;

public MetadataImpl(IntervalWindow intervalWindow) {
this.intervalWindow = intervalWindow;
}

@Override
public IntervalWindow GetIntervalWindow() {
return intervalWindow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
import io.numaproj.numaflow.function.HandlerDatum;

/**
* ReduceDatumStream is an interface which will be passed to
* reduce handlers to read input messages.
* ReduceDatumStream is an interface which will be passed to reduce handlers to read input
* messages.
*/
public interface ReduceDatumStream {
// EOF indicates the end of input
HandlerDatum EOF = HandlerDatum.EOF();

/* ReadMessage can be used to read message from the stream
* returns null if there are no more messages to consume.*/
/**
* Reads message from the stream.
*
* @return the message read from the stream. null if there are no more messages to consume.
*/
Datum ReadMessage();

/**
* Writes message to the stream.
*
* @throws InterruptedException if writing gets interrupted.
*/
void WriteMessage(Datum datum) throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
import java.util.logging.Logger;

/**
* Implementation of ReduceDatumStream, exposes two methods
* read and write, it is an unbounded queue, which blocks
* the reads if the queue is empty and blocks the writes if
* the queue is full
* Implementation of ReduceDatumStream, exposes two methods read and write, it is an unbounded
* queue, which blocks the reads if the queue is empty and blocks the writes if the queue is full
*/
public class ReduceDatumStreamImpl implements ReduceDatumStream {
private static final Logger logger = Logger.getLogger(ReduceDatumStreamImpl.class.getName());
private final BlockingQueue<Datum> blockingQueue = new LinkedBlockingDeque<>();

// TODO - unit test this function.
// blocking call, returns EOF if there are no messages to be read
@Override
public Datum ReadMessage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import io.numaproj.numaflow.function.metadata.Metadata;
import io.numaproj.numaflow.utils.TriFunction;


/**
* Implementation of ReduceHandler instantiated from a function
*/
public class ReduceFunc implements ReduceHandler {

private final TriFunction<String, ReduceDatumStream, Metadata, Message[]> reduceFn;

public ReduceFunc(TriFunction<String, ReduceDatumStream, Metadata, Message[]> reduceFn) {
Expand Down
23 changes: 5 additions & 18 deletions src/main/java/io/numaproj/numaflow/sink/Response.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
package io.numaproj.numaflow.sink;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public class Response {
private final String id;
private final Boolean success;
private final String err;

public Response(String id, Boolean success, String err) {
this.id = id;
this.success = success;
this.err = err;
}

public String getId() {
return id;
}

public Boolean getSuccess() {
return success;
}

public String getErr() {
return err;
}
}
17 changes: 13 additions & 4 deletions src/main/java/io/numaproj/numaflow/sink/SinkDatumStream.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
package io.numaproj.numaflow.sink;

/**
* SinkDatumStream is an interface which will be passed to
* sink handlers to read input messages.
* SinkDatumStream is an interface which will be passed to sink handlers to read input messages.
*/
public interface SinkDatumStream {
// EOF indicates the end of input
HandlerDatum EOF = HandlerDatum.EOF();

/* ReadMessage can be used to read message from the stream
* returns null if there are no more messages to consume.*/
/**
* Reads message from the stream.
*
* @return the message read from the stream. null if there are no more messages to consume.
*/
HandlerDatum ReadMessage();

/**
* Writes message to the stream.
*
* @throws InterruptedException if writing gets interrupted.
*/
void WriteMessage(HandlerDatum datum) throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import java.util.logging.Logger;

/**
* Implementation of SinkDatumStream, exposes two methods
* read and write, it is an unbounded queue, which blocks
* the reads if the queue is empty and blocks the writes if
* the queue is full
* Implementation of SinkDatumStream, exposes two methods read and write, it is an unbounded queue,
* which blocks the reads if the queue is empty and blocks the writes if the queue is full
*/
public class SinkDatumStreamImpl implements SinkDatumStream {
private static final Logger logger = Logger.getLogger(SinkDatumStreamImpl.class.getName());
Expand All @@ -29,6 +27,7 @@ public HandlerDatum ReadMessage() {
}

// blocking call, waits until the write operation is successful
@Override
public void WriteMessage(HandlerDatum datum) throws InterruptedException {
blockingQueue.put(datum);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/numaproj/numaflow/sink/SinkServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public void start() throws IOException {
}

/**
* Stop serving requests and shutdown resources.
* Await termination on the main thread since the grpc library uses daemon threads.
* Stop serving requests and shutdown resources. Await termination on the main thread since the
* grpc library uses daemon threads.
*/
public void stop() throws InterruptedException {
if (server != null) {
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/io/numaproj/numaflow/sink/SinkService.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,13 @@ public void isReady(Empty request, StreamObserver<Udsink.ReadyResponse> response

public Udsink.ResponseList buildResponseList(List<Response> responses) {
var responseListBuilder = Udsink.ResponseList.newBuilder();
for (Response response : responses) {
Udsink.Response r = Udsink.Response.newBuilder()
responses.stream().forEach(response -> {
responseListBuilder.addResponses(Udsink.Response.newBuilder()
.setId(response.getId())
.setSuccess(response.getSuccess())
.setErrMsg(response.getErr())
.build();

responseListBuilder.addResponses(r);
}
.build());
});
return responseListBuilder.build();
}
}
Loading

0 comments on commit ed96f42

Please sign in to comment.