diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index 1774e218..f61c3d2f 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -4,64 +4,106 @@ import com.google.protobuf.Empty; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import io.numaproj.numaflow.batchmap.v1.BatchMapGrpc; -import io.numaproj.numaflow.batchmap.v1.Batchmap; +import io.numaproj.numaflow.map.v1.MapGrpc; +import io.numaproj.numaflow.map.v1.MapOuterClass; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - @Slf4j @AllArgsConstructor -class Service extends BatchMapGrpc.BatchMapImplBase { +class Service extends MapGrpc.MapImplBase { - // batchMapTaskExecutor is the executor for the batchMap. It is a fixed size thread pool + // Executor service for the batch mapper. It is a fixed size thread pool // with the number of threads equal to the number of cores on the machine times 2. - private final ExecutorService batchMapTaskExecutor = Executors + // We use 2 times the number of cores because the batch mapper is a CPU intensive task. + private final ExecutorService mapTaskExecutor = Executors .newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); - // SHUTDOWN_TIME is the time to wait for the sinker to shut down, in seconds. + // Time to wait for the batch mapper to shut down, in seconds. // We use 30 seconds as the default value because it provides a balance between giving tasks enough time to complete // and not delaying program termination unduly. private final long SHUTDOWN_TIME = 30; + // BatchMapper instance to process the messages private final BatchMapper batchMapper; + // Applies a map function to each datum element in the stream. @Override - public StreamObserver batchMapFn(StreamObserver responseObserver) { + public StreamObserver mapFn(StreamObserver responseObserver) { + // If the batchMapper is null, return an unimplemented call if (this.batchMapper == null) { return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall( - BatchMapGrpc.getBatchMapFnMethod(), + MapGrpc.getMapFnMethod(), responseObserver); } - DatumIteratorImpl datumStream = new DatumIteratorImpl(); - - Future result = batchMapTaskExecutor.submit(() -> this.batchMapper.processMessage( - datumStream)); - + // Return a new StreamObserver to handle the incoming requests return new StreamObserver<>() { + private boolean startOfStream = true; + private boolean handshakeDone = false; + private DatumIteratorImpl datumStream; + private CompletableFuture result; + + // Called for each incoming request @Override - public void onNext(Batchmap.BatchMapRequest mapRequest) { + public void onNext(MapOuterClass.MapRequest mapRequest) { try { - datumStream.writeMessage(constructHandlerDatum(mapRequest)); - } catch (InterruptedException e) { - Thread.interrupted(); - onError(e); + // Make sure the handshake is done before processing the messages + if (!handshakeDone) { + if (!mapRequest.hasHandshake() || !mapRequest.getHandshake().getSot()) { + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription("Handshake request not received") + .asException()); + return; + } + responseObserver.onNext(MapOuterClass.MapResponse.newBuilder() + .setHandshake(mapRequest.getHandshake()) + .build()); + handshakeDone = true; + return; + } + + // Create a DatumIterator to write the messages to the batch mapper + // and start the batch mapper if it is the start of the stream + if (startOfStream) { + datumStream = new DatumIteratorImpl(); + result = CompletableFuture.supplyAsync( + () -> batchMapper.processMessage(datumStream), + mapTaskExecutor); + startOfStream = false; + } + + // If end of transmission, write EOF datum to the stream + // Wait for the result and send the response back to the client + if (mapRequest.hasStatus() && mapRequest.getStatus().getEot()) { + datumStream.writeMessage(HandlerDatum.EOF_DATUM); + BatchResponses responses = result.join(); + buildAndStreamResponse(responses, responseObserver); + startOfStream = true; + } else { + datumStream.writeMessage(constructHandlerDatum(mapRequest)); + } + } catch (Exception e) { + log.error("Encountered an error in batch map", e); + responseObserver.onError(Status.UNKNOWN + .withDescription(e.getMessage()) + .withCause(e) + .asException()); } } + // Called when an error occurs @Override public void onError(Throwable throwable) { - // We close the stream and let the sender retry the messages log.error("Error Encountered in batchMap Stream", throwable); var status = Status.UNKNOWN .withDescription(throwable.getMessage()) @@ -69,40 +111,23 @@ public void onError(Throwable throwable) { responseObserver.onError(status.asException()); } + // Called when the client has finished sending requests @Override public void onCompleted() { - try { - // We Fire off the call to the client from here and stream the response back - datumStream.writeMessage(HandlerDatum.EOF_DATUM); - BatchResponses responses = result.get(); - log.debug( - "Finished the call Result size is :{} and iterator count is :{}", - responses.getItems().size(), - datumStream.getCount()); - // Crash if the number of responses from the users don't match the input requests ignoring the EOF message - if (responses.getItems().size() != datumStream.getCount() - 1) { - throw new RuntimeException("Number of results did not match expected " + ( - datumStream.getCount() - 1) + " but got " + responses - .getItems() - .size()); - } - buildAndStreamResponse(responses, responseObserver); - } catch (Exception e) { - log.error("Error Encountered in batchMap Stream onCompleted", e); - onError(e); - } + responseObserver.onCompleted(); } }; } + // Build and stream the response back to the client private void buildAndStreamResponse( BatchResponses responses, - StreamObserver responseObserver) { + StreamObserver responseObserver) { responses.getItems().forEach(message -> { - List batchMapResponseResult = new ArrayList<>(); + List mapResponseResult = new ArrayList<>(); message.getItems().forEach(res -> { - batchMapResponseResult.add( - Batchmap.BatchMapResponse.Result + mapResponseResult.add( + MapOuterClass.MapResponse.Result .newBuilder() .setValue(res.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( @@ -114,48 +139,48 @@ private void buildAndStreamResponse( .build() ); }); - Batchmap.BatchMapResponse singleRequestResponse = Batchmap.BatchMapResponse + MapOuterClass.MapResponse singleRequestResponse = MapOuterClass.MapResponse .newBuilder() .setId(message.getId()) - .addAllResults(batchMapResponseResult) + .addAllResults(mapResponseResult) .build(); - // Stream the response back to the sender responseObserver.onNext(singleRequestResponse); }); responseObserver.onCompleted(); } - + // IsReady is the heartbeat endpoint for gRPC. @Override public void isReady( Empty request, - StreamObserver responseObserver) { - responseObserver.onNext(Batchmap.ReadyResponse.newBuilder().setReady(true).build()); + StreamObserver responseObserver) { + responseObserver.onNext(MapOuterClass.ReadyResponse.newBuilder().setReady(true).build()); responseObserver.onCompleted(); } - private HandlerDatum constructHandlerDatum(Batchmap.BatchMapRequest d) { + // Construct a HandlerDatum from a MapRequest + private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest d) { return new HandlerDatum( - d.getKeysList().toArray(new String[0]), - d.getValue().toByteArray(), + d.getRequest().getKeysList().toArray(new String[0]), + d.getRequest().getValue().toByteArray(), Instant.ofEpochSecond( - d.getWatermark().getSeconds(), - d.getWatermark().getNanos()), + d.getRequest().getWatermark().getSeconds(), + d.getRequest().getWatermark().getNanos()), Instant.ofEpochSecond( - d.getEventTime().getSeconds(), - d.getEventTime().getNanos()), + d.getRequest().getEventTime().getSeconds(), + d.getRequest().getEventTime().getNanos()), d.getId(), - d.getHeadersMap() + d.getRequest().getHeadersMap() ); } - // shuts down the executor service which is used for reduce + // Shuts down the executor service which is used for batch map public void shutDown() { - this.batchMapTaskExecutor.shutdown(); + this.mapTaskExecutor.shutdown(); try { - if (!batchMapTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { + if (!mapTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { log.error("BatchMap executor did not terminate in the specified time."); - List droppedTasks = batchMapTaskExecutor.shutdownNow(); + List droppedTasks = mapTaskExecutor.shutdownNow(); log.error("BatchMap executor was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); } else { @@ -166,5 +191,4 @@ public void shutDown() { e.printStackTrace(); } } - } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 9c18ed97..d60fac1a 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -67,7 +67,7 @@ public void onNext(SinkOuterClass.SinkRequest request) { } try { - if (request.getStatus().getEot()) { + if (request.hasStatus() && request.getStatus().getEot()) { // End of transmission, write EOF datum to the stream // Wait for the result and send the response back to the client datumStream.writeMessage(HandlerDatum.EOF_DATUM); diff --git a/src/main/proto/batchmap/v1/batchmap.proto b/src/main/proto/batchmap/v1/batchmap.proto deleted file mode 100644 index 1d2526e5..00000000 --- a/src/main/proto/batchmap/v1/batchmap.proto +++ /dev/null @@ -1,52 +0,0 @@ -syntax = "proto3"; - -option java_package = "io.numaproj.numaflow.batchmap.v1"; - -import "google/protobuf/empty.proto"; -import "google/protobuf/timestamp.proto"; - -package batchmap.v1; - -service BatchMap { - // IsReady is the heartbeat endpoint for gRPC. - rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); - - // BatchMapFn is a bi-directional streaming rpc which applies a - // map function on each BatchMapRequest element of the stream and then streams - // back BatchMapResponse elements. - rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse); -} - -/** - * BatchMapRequest represents a request element. - */ -message BatchMapRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; - map headers = 5; - // This ID is used uniquely identify a map request - string id = 6; -} - -/** - * BatchMapResponse represents a response element. - */ -message BatchMapResponse { - message Result { - repeated string keys = 1; - bytes value = 2; - repeated string tags = 3; - } - repeated Result results = 1; - // This ID is used to refer the responses to the request it corresponds to. - string id = 2; -} - -/** - * ReadyResponse is the health check result. - */ -message ReadyResponse { - bool ready = 1; -} diff --git a/src/main/proto/map/v1/map.proto b/src/main/proto/map/v1/map.proto index f256c157..f702cc33 100644 --- a/src/main/proto/map/v1/map.proto +++ b/src/main/proto/map/v1/map.proto @@ -26,10 +26,14 @@ message MapRequest { google.protobuf.Timestamp watermark = 4; map headers = 5; } + message Status { + bool eot = 1; + } Request request = 1; // This ID is used to uniquely identify a map request string id = 2; optional Handshake handshake = 3; + optional Status status = 4; } /* diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/BatchMapOutputStreamObserver.java b/src/test/java/io/numaproj/numaflow/batchmapper/BatchMapOutputStreamObserver.java index 4d8da634..66e5135e 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/BatchMapOutputStreamObserver.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/BatchMapOutputStreamObserver.java @@ -1,38 +1,40 @@ package io.numaproj.numaflow.batchmapper; import io.grpc.stub.StreamObserver; -import io.numaproj.numaflow.batchmap.v1.Batchmap; -import lombok.extern.slf4j.Slf4j; +import io.numaproj.numaflow.map.v1.MapOuterClass; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CompletableFuture; -@Slf4j -public class BatchMapOutputStreamObserver implements StreamObserver { - public AtomicReference completed = new AtomicReference<>(false); - public AtomicReference> resultDatum = new AtomicReference<>( - new ArrayList<>()); - public Throwable t; +public class BatchMapOutputStreamObserver implements StreamObserver { + List mapResponses = new ArrayList<>(); + CompletableFuture done = new CompletableFuture<>(); + Integer responseCount; + + public BatchMapOutputStreamObserver(Integer responseCount) { + this.responseCount = responseCount; + } @Override - public void onNext(Batchmap.BatchMapResponse batchMapResponse) { - List receivedResponses = resultDatum.get(); - receivedResponses.add(batchMapResponse); - resultDatum.set(receivedResponses); - log.info( - "Received BatchMapResponse with id {} and message count {}", - batchMapResponse.getId(), - batchMapResponse.getResultsCount()); + public void onNext(MapOuterClass.MapResponse mapResponse) { + mapResponses.add(mapResponse); + if (mapResponses.size() == responseCount) { + done.complete(null); + } } @Override public void onError(Throwable throwable) { - t = throwable; + done.completeExceptionally(throwable); } @Override public void onCompleted() { - this.completed.set(true); + done.complete(null); + } + + public List getMapResponses() { + return mapResponses; } } diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java index 93af2400..937031d2 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java @@ -13,14 +13,15 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; -import io.numaproj.numaflow.batchmap.v1.BatchMapGrpc; -import io.numaproj.numaflow.batchmap.v1.Batchmap; +import io.numaproj.numaflow.map.v1.MapGrpc; +import io.numaproj.numaflow.map.v1.MapOuterClass; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import java.util.concurrent.atomic.AtomicReference; +import java.util.List; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -106,105 +107,67 @@ public void tearDown() throws Exception { @Test public void testErrorFromUDF() { - BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(); - StreamObserver inputStreamObserver = BatchMapGrpc + BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(2); + StreamObserver inputStreamObserver = MapGrpc .newStub(inProcessChannel) - .batchMapFn(outputStreamObserver); - - // we need to maintain a reference to any exceptions thrown inside the thread, otherwise even if the assertion failed in the thread, - // the test can still succeed. - AtomicReference exceptionInThread = new AtomicReference<>(); - - Thread t = new Thread(() -> { - while (outputStreamObserver.t == null) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - exceptionInThread.set(e); - } - } - try { - assertEquals( - "UNKNOWN: java.lang.RuntimeException: unknown exception", - outputStreamObserver.t.getMessage()); - } catch (Throwable e) { - exceptionInThread.set(e); - } - }); - t.start(); + .mapFn(outputStreamObserver); String message = "message"; - Batchmap.BatchMapRequest request = Batchmap.BatchMapRequest.newBuilder() - .setValue(ByteString.copyFromUtf8(message)) - .addKeys("exception") + MapOuterClass.MapRequest handshakeRequest = MapOuterClass.MapRequest + .newBuilder() + .setHandshake(MapOuterClass.Handshake.newBuilder().setSot(true)) + .build(); + inputStreamObserver.onNext(handshakeRequest); + MapOuterClass.MapRequest request = MapOuterClass.MapRequest.newBuilder() + .setRequest(MapOuterClass.MapRequest.Request + .newBuilder() + .setValue(ByteString.copyFromUtf8(message)) + .addKeys("exception")) .setId("exception") .build(); inputStreamObserver.onNext(request); + inputStreamObserver.onNext(MapOuterClass.MapRequest + .newBuilder() + .setStatus(MapOuterClass.MapRequest.Status.newBuilder().setEot(true)) + .build()); inputStreamObserver.onCompleted(); - try { - t.join(); - } catch (InterruptedException e) { - fail("Thread got interrupted before test assertion."); - } - // Fail the test if any exception caught in the thread - if (exceptionInThread.get() != null) { - fail("Assertion failed in the thread: " + exceptionInThread.get().getMessage()); + outputStreamObserver.done.get(); + fail("Expected exception not thrown"); + } catch (InterruptedException | ExecutionException e) { + assertEquals( + "UNKNOWN: java.lang.RuntimeException: unknown exception", + e.getCause().getMessage()); } } @Test - public void testMismatchSizeError() { + public void testMapperWithoutHandshake() { + ByteString inValue = ByteString.copyFromUtf8("invalue"); + MapOuterClass.MapRequest inDatum = MapOuterClass.MapRequest + .newBuilder() + .setRequest(MapOuterClass.MapRequest.Request + .newBuilder() + .setValue(inValue) + .addAllKeys(List.of("test-map-key")) + .build()).build(); - BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(); - StreamObserver inputStreamObserver = BatchMapGrpc - .newStub(inProcessChannel) - .batchMapFn(outputStreamObserver); + BatchMapOutputStreamObserver responseObserver = new BatchMapOutputStreamObserver(1); - // we need to maintain a reference to any exceptions thrown inside the thread, otherwise even if the assertion failed in the thread, - // the test can still succeed. - AtomicReference exceptionInThread = new AtomicReference<>(); + var stub = MapGrpc.newStub(inProcessChannel); + var requestStreamObserver = stub + .mapFn(responseObserver); - Thread t = new Thread(() -> { - while (outputStreamObserver.t == null) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - exceptionInThread.set(e); - } - } - try { - assertEquals( - "UNKNOWN: Number of results did not match expected 2 but got 1", - outputStreamObserver.t.getMessage()); - } catch (Throwable e) { - exceptionInThread.set(e); - } - }); - t.start(); - String message = "message"; - Batchmap.BatchMapRequest request = Batchmap.BatchMapRequest.newBuilder() - .setValue(ByteString.copyFromUtf8(message)) - .addKeys("drop") - .setId("drop") - .build(); - inputStreamObserver.onNext(request); - Batchmap.BatchMapRequest request1 = Batchmap.BatchMapRequest.newBuilder() - .setValue(ByteString.copyFromUtf8(message)) - .addKeys("test") - .setId("test") - .build(); - inputStreamObserver.onNext(request1); - inputStreamObserver.onCompleted(); + requestStreamObserver.onNext(inDatum); try { - t.join(); - } catch (InterruptedException e) { - fail("Thread got interrupted before test assertion."); - } - // Fail the test if any exception caught in the thread - if (exceptionInThread.get() != null) { - fail("Assertion failed in the thread: " + exceptionInThread.get().getMessage()); + responseObserver.done.get(); + fail("Expected an exception to be thrown"); + } catch (InterruptedException | ExecutionException e) { + assertEquals( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Handshake request not received", + e.getMessage()); } + requestStreamObserver.onCompleted(); } private static class TestMapFn extends BatchMapper { @@ -213,7 +176,7 @@ private static class TestMapFn extends BatchMapper { public BatchResponses processMessage(DatumIterator datumStream) { BatchResponses batchResponses = new BatchResponses(); while (true) { - Datum datum = null; + Datum datum; try { datum = datumStream.next(); } catch (InterruptedException e) { diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java index ed68bc78..a139e8b0 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java @@ -6,8 +6,8 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; -import io.numaproj.numaflow.batchmap.v1.BatchMapGrpc; -import io.numaproj.numaflow.batchmap.v1.Batchmap; +import io.numaproj.numaflow.map.v1.MapGrpc; +import io.numaproj.numaflow.map.v1.MapOuterClass; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; @@ -15,13 +15,14 @@ import org.junit.Test; import java.util.List; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @Slf4j public class ServerTest { - private final static String PROCESSED_KEY_SUFFIX = "-key-processed"; - private final static String PROCESSED_VALUE_SUFFIX = "-value-processed"; private final static String key = "key"; @Rule @@ -62,27 +63,49 @@ public void tearDown() throws Exception { @Test public void testBatchMapHappyPath() { - BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(); - StreamObserver inputStreamObserver = BatchMapGrpc + BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(11); + StreamObserver inputStreamObserver = MapGrpc .newStub(inProcessChannel) - .batchMapFn(outputStreamObserver); + .mapFn(outputStreamObserver); + MapOuterClass.MapRequest handshakeRequest = MapOuterClass.MapRequest + .newBuilder() + .setHandshake(MapOuterClass.Handshake.newBuilder().setSot(true)) + .build(); + + inputStreamObserver.onNext(handshakeRequest); for (int i = 1; i <= 10; i++) { String uuid = Integer.toString(i); String message = i + "," + (i + 10); - Batchmap.BatchMapRequest request = Batchmap.BatchMapRequest.newBuilder() - .setValue(ByteString.copyFromUtf8(message)) - .addKeys(key) + MapOuterClass.MapRequest request = MapOuterClass.MapRequest.newBuilder() + .setRequest(MapOuterClass.MapRequest.Request + .newBuilder() + .setValue(ByteString.copyFromUtf8(message)) + .addKeys(key) + .build()) .setId(uuid) .build(); - log.info("Sending request with ID : {} and msg: {}", uuid, message); inputStreamObserver.onNext(request); } + inputStreamObserver.onNext(MapOuterClass.MapRequest + .newBuilder() + .setStatus(MapOuterClass.MapRequest.Status.newBuilder().setEot(true)) + .build()); inputStreamObserver.onCompleted(); - while (outputStreamObserver.resultDatum.get().size() != 10) ; - List result = outputStreamObserver.resultDatum.get(); - assertEquals(10, result.size()); + + try { + outputStreamObserver.done.get(); + } catch (InterruptedException | ExecutionException e) { + fail("Error in getting done signal from the observer " + e.getMessage()); + } + List result = outputStreamObserver.getMapResponses(); + assertEquals(11, result.size()); + + // first response is handshake + assertTrue(result.get(0).hasHandshake()); + + result = result.subList(1, result.size()); for (int i = 0; i < 10; i++) { assertEquals(result.get(i).getId(), String.valueOf(i + 1)); } @@ -93,7 +116,7 @@ private static class TestMapFn extends BatchMapper { public BatchResponses processMessage(DatumIterator datumStream) { BatchResponses batchResponses = new BatchResponses(); while (true) { - Datum datum = null; + Datum datum; try { datum = datumStream.next(); } catch (InterruptedException e) { @@ -106,13 +129,11 @@ public BatchResponses processMessage(DatumIterator datumStream) { String msg = new String(datum.getValue()); String[] strs = msg.split(","); BatchResponse batchResponse = new BatchResponse(datum.getId()); - log.info("Processing message with id: {}", datum.getId()); for (String str : strs) { batchResponse.append(new Message(str.getBytes())); } batchResponses.append(batchResponse); } - log.info("Returning respose list with size {}", batchResponses.getItems().size()); return batchResponses; } }