Skip to content

dead letter serialization fix #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: fix-dead-letters-2021
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/src/main/protobuf/raw.data.events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ message LineAddedMessage {
string data = 2;
}

message LinesAddedMessage {
int64 timestamp = 1;
repeated string data = 2;
}

message HmdaRawDataStateMessage {
int32 size = 1;
}
1 change: 1 addition & 0 deletions common/src/main/resources/serialization.conf
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ akka {

"hmda.messages.submission.HmdaRawDataCommands$AddLines" = hmda-raw-commands
"hmda.messages.submission.HmdaRawDataEvents$LineAdded" = hmda-raw-events
"hmda.messages.submission.HmdaRawDataEvents$LinesAdded" = hmda-raw-events
"hmda.model.processing.state.HmdaRawDataState" = hmda-raw-events

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object HmdaRawDataCommands {

sealed trait HmdaRawDataCommand extends Command

case class AddLines(submissionId: SubmissionId, timestamp: Long, data: Seq[String], maybeReplyTo: Option[ActorRef[Seq[HmdaRawDataEvent]]])
case class AddLines(submissionId: SubmissionId, timestamp: Long, data: Seq[String], maybeReplyTo: Option[ActorRef[HmdaRawDataEvent]])
extends HmdaRawDataCommand

case object StopRawData extends HmdaRawDataCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import hmda.messages.CommonMessages.Event
object HmdaRawDataEvents {
sealed trait HmdaRawDataEvent extends Event
case class LineAdded(timestamp: Long, data: String) extends HmdaRawDataEvent
case class LinesAdded(timestamp: Long, data: Seq[String]) extends HmdaRawDataEvent
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package hmda.model.processing.state

import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded }
import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded, LinesAdded }

case class HmdaRawDataState(size: Int = 0) {
def update(event: HmdaRawDataEvent): HmdaRawDataState = event match {
case LineAdded(_, _) =>
HmdaRawDataState(size + 1)
case LinesAdded(_, data) =>
HmdaRawDataState(size + data.size)
case _ => this
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package hmda.serialization.submission

import hmda.messages.submission.HmdaRawDataEvents.LineAdded
import hmda.messages.submission.HmdaRawDataEvents.{ LineAdded, LinesAdded }
import hmda.model.processing.state.HmdaRawDataState
import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage }
import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage, LinesAddedMessage }

object HmdaRawDataEventsProtobufConverter {

Expand All @@ -12,12 +12,24 @@ object HmdaRawDataEventsProtobufConverter {
evt.data
)

def linesAddedToProtobuf(evt: LinesAdded): LinesAddedMessage =
LinesAddedMessage(
evt.timestamp,
evt.data
)

def lineAddedFromProtobuf(msg: LineAddedMessage): LineAdded =
LineAdded(
msg.timestamp,
msg.data
)

def linesAddedFromProtobuf(msg: LinesAddedMessage): LinesAdded =
LinesAdded(
msg.timestamp,
msg.data
)

def rawDataStateToProtobuf(evt: HmdaRawDataState): HmdaRawDataStateMessage =
HmdaRawDataStateMessage(
evt.size
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package hmda.serialization.submission

import java.io.NotSerializableException

import akka.serialization.SerializerWithStringManifest
import hmda.messages.submission.HmdaRawDataEvents.LineAdded
import hmda.messages.submission.HmdaRawDataEvents.{ LineAdded, LinesAdded }
import hmda.model.processing.state.HmdaRawDataState
import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage }
import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage, LinesAddedMessage }
import hmda.serialization.submission.HmdaRawDataEventsProtobufConverter._

class HmdaRawDataEventsSerializer extends SerializerWithStringManifest {
override def identifier: Int = 114

final val LineAddedManifest = classOf[LineAdded].getName
final val LinesAddedManifest = classOf[LinesAdded].getName
final val HmdaRawDataStateManifest = classOf[HmdaRawDataState].getName

override def manifest(o: AnyRef): String = o.getClass.getName

override def toBinary(o: AnyRef): Array[Byte] = o match {
case evt: LineAdded =>
lineAddedToProtobuf(evt).toByteArray
case evt: LinesAdded =>
linesAddedToProtobuf(evt).toByteArray
case evt: HmdaRawDataState =>
rawDataStateToProtobuf(evt).toByteArray
case _ =>
Expand All @@ -29,6 +31,8 @@ class HmdaRawDataEventsSerializer extends SerializerWithStringManifest {
manifest match {
case LineAddedManifest =>
lineAddedFromProtobuf(LineAddedMessage.parseFrom(bytes))
case LinesAddedManifest =>
linesAddedFromProtobuf(LinesAddedMessage.parseFrom(bytes))
case HmdaRawDataStateManifest =>
rawDataStateFromProtobuf(HmdaRawDataStateMessage.parseFrom(bytes))
case _ =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package hmda.api.http.filing.submissions

import java.time.Instant

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, EntityRef }
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{ StatusCodes, Uri }
Expand Down Expand Up @@ -168,13 +167,13 @@ private class UploadHttpApi(log: Logger, sharding: ClusterSharding)(
}
}

private def uploadFile(submissionId: SubmissionId, hmdaRaw: EntityRef[HmdaRawDataCommand]): Flow[String, Seq[HmdaRawDataEvent], NotUsed] =
private def uploadFile(submissionId: SubmissionId, hmdaRaw: EntityRef[HmdaRawDataCommand]): Flow[String, HmdaRawDataEvent, NotUsed] =
Flow[String]
.grouped(100)
.mapAsync(1)(lines => persistLines(hmdaRaw, submissionId, lines))

private def persistLines(entityRef: EntityRef[HmdaRawDataCommand], submissionId: SubmissionId, data: Seq[String]): Future[Seq[HmdaRawDataEvent]] = {
val response: Future[Seq[HmdaRawDataEvent]] = entityRef ? (ref => AddLines(submissionId, Instant.now.toEpochMilli, data, Some(ref)))
private def persistLines(entityRef: EntityRef[HmdaRawDataCommand], submissionId: SubmissionId, data: Seq[String]): Future[HmdaRawDataEvent] = {
val response: Future[HmdaRawDataEvent] = entityRef ? ((ref: ActorRef[HmdaRawDataEvent]) => AddLines(submissionId, Instant.now.toEpochMilli, data, Some(ref)))
response
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, RetentionCriteria }
import hmda.messages.submission.HmdaRawDataCommands.{ AddLines, HmdaRawDataCommand, StopRawData }
import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded }
import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded, LinesAdded }
import hmda.model.filing.submission.SubmissionId
import hmda.model.processing.state.HmdaRawDataState
import hmda.persistence.HmdaTypedPersistentActor
Expand Down Expand Up @@ -38,7 +38,7 @@ object HmdaRawData extends HmdaTypedPersistentActor[HmdaRawDataCommand, HmdaRawD
log.debug(s"Persisted: $data")
maybeReplyTo match {
case Some(replyTo) =>
replyTo ! evts
replyTo ! LinesAdded(timestamp, data)
case None => //Do Nothing
}
}
Expand All @@ -50,6 +50,7 @@ object HmdaRawData extends HmdaTypedPersistentActor[HmdaRawDataCommand, HmdaRawD

override def eventHandler: (HmdaRawDataState, HmdaRawDataEvent) => HmdaRawDataState = {
case (state, evt @ LineAdded(_, _)) => state.update(evt)
case (state, evt @ LinesAdded(_, _)) => state.update(evt)
}

def startShardRegion(sharding: ClusterSharding): ActorRef[ShardingEnvelope[HmdaRawDataCommand]] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package hmda.persistence.submission

import java.time.Instant

import akka.actor
import akka.actor.testkit.typed.scaladsl.TestProbe
import hmda.persistence.AkkaCassandraPersistenceSpec
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.typed.{ Cluster, Join }
import hmda.messages.submission.HmdaRawDataCommands.AddLines
import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded }
import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded, LinesAdded }
import hmda.model.filing.submission.SubmissionId
import hmda.utils.YearUtils.Period

Expand All @@ -20,7 +19,7 @@ class HmdaRawDataSpec extends AkkaCassandraPersistenceSpec {
val sharding = ClusterSharding(typedSystem)
HmdaRawData.startShardRegion(sharding)

val hmdaRawProbe = TestProbe[Seq[HmdaRawDataEvent]]
val hmdaRawProbe = TestProbe[HmdaRawDataEvent]

val submissionId = SubmissionId("12345", Period(2018, None), 1)

Expand All @@ -33,8 +32,7 @@ class HmdaRawDataSpec extends AkkaCassandraPersistenceSpec {
val timestamp = Instant.now.toEpochMilli

hmdaRawData ! AddLines(submissionId, timestamp, List("data1", "data2"), Some(hmdaRawProbe.ref))

hmdaRawProbe.expectMessage(Seq(LineAdded(timestamp, "data1"), LineAdded(timestamp, "data2")))
hmdaRawProbe.expectMessage(LinesAdded(timestamp, Seq("data1", "data2")))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ class HmdaValidationErrorSpec extends AkkaCassandraPersistenceSpec with ScalaFut
StreamConverters
.fromInputStream(() => getClass.getResourceAsStream("/error_test_files/trigger_s304_s305.txt"))
.via(framing("\n"))
.mapAsync(1)(data => hmdaRawData ? ((ref: ActorRef[Seq[HmdaRawDataEvent]]) =>
.mapAsync(1)(data => hmdaRawData ? ((ref: ActorRef[HmdaRawDataEvent]) =>
AddLines(submissionId, Instant.now.toEpochMilli, List(data.utf8String), Some(ref))))
.run()
.futureValue

import ValidationProgress._

// subscribe to progress updates; we expect an initial progress message, where no validation stage has yet started
Expand Down