Commit d44a1af

added compression feature for circe-akka-serializer (#160)
* added compression feature for circe-akka-serializer
* Added newline character for checks
* removed trailing space from README
* few minor PR fixes
* replace real logs with random strings
* fix structure in circe-akka-serializer reference.conf
* PR fixes and improvements
* added useful comment on check
* added more comments
1 parent 1914525 commit d44a1af

File tree

10 files changed: +280 −36 lines changed


README.md

Lines changed: 1 addition & 0 deletions

@@ -251,6 +251,7 @@ class ExampleSerializer(actorSystem: ExtendedActorSystem)
   override lazy val packagePrefix = "org.project"
 }
 ```
+`CirceAkkaSerializer` can be configured to use Gzip compression when serializing payloads greater than a defined size (by default, compression is off). See the [default reference.conf file](circe-akka-serializer/src/main/resources/reference.conf) with comments for details.

 For more guidelines on how to use the serializer,
 read [Akka documentation about serialization](https://doc.akka.io/docs/akka/current/serialization.html),
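As an illustration of the new setting, here is a minimal sketch of how a user might enable it in their own configuration. The 64 KiB threshold and the `ConfigFactory` wiring are illustrative assumptions, not part of this changeset:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object GzipCompressionExample extends App {
  // Hypothetical application config enabling gzip for payloads above 64 KiB
  // ("compress-larger-than" accepts Typesafe Config memory-size syntax).
  val config = ConfigFactory
    .parseString("""
      org.virtuslab.ash.circe.compression {
        algorithm = gzip
        compress-larger-than = 64 KiB
      }
    """)
    .withFallback(ConfigFactory.load())

  val system = ActorSystem("example", config)
}
```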
circe-akka-serializer/src/main/resources/reference.conf

Lines changed: 17 additions & 1 deletion

@@ -1,5 +1,21 @@
 # Default configuration

 org.virtuslab.ash {
-  verbose-debug-logging = off
+  circe {
+    verbose-debug-logging = off
+
+    # Settings for compression of the payload - here we decide if compression should be possible
+    # when serializing an object. It does not affect the deserialization process - so even with
+    # `compression.algorithm = off`, deserialization of objects compressed with Gzip will be successful.
+    compression {
+      # Compression algorithm:
+      # - off : no compression
+      # - gzip : using common java gzip
+      algorithm = off
+
+      # If compression is enabled with the `algorithm` setting,
+      # the payload will be compressed if its size is bigger than this value.
+      compress-larger-than = 32 KiB
+    }
+  }
 }
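A side note on the `32 KiB` value: the serializer reads it with Typesafe Config's `getBytes`, which parses memory-size strings into a plain byte count. A small sketch (not part of the changeset) of that parsing:

```scala
import com.typesafe.config.ConfigFactory

object CompressLargerThanParsing extends App {
  val conf = ConfigFactory.parseString("""compress-larger-than = 32 KiB""")
  // getBytes understands memory-size suffixes such as KiB, MiB, kB, ...
  println(conf.getBytes("compress-larger-than")) // 32768
}
```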

circe-akka-serializer/src/main/scala/org/virtuslab/ash/circe/CirceAkkaSerializer.scala

Lines changed: 20 additions & 7 deletions

@@ -45,8 +45,18 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
     with AkkaCodecs {

   private lazy val log = Logging(system, getClass)
-  private lazy val conf = system.settings.config.getConfig("org.virtuslab.ash")
+  private lazy val conf = system.settings.config.getConfig("org.virtuslab.ash.circe")
   private lazy val isDebugEnabled = conf.getBoolean("verbose-debug-logging") && log.isDebugEnabled
+  private lazy val compressionAlgorithm: Compression.Algorithm = conf.getString("compression.algorithm") match {
+    case "off" =>
+      Compression.Off
+    case "gzip" =>
+      Compression.GZip(conf.getBytes("compression.compress-larger-than"))
+    case other =>
+      throw new IllegalArgumentException(
+        s"Unknown compression algorithm value: [$other], possible values are: 'off' and 'gzip'")
+  }
+  protected val bufferSize: Int = 1024 * 4

   override lazy val classTagEvidence: ClassTag[Ser] = implicitly[ClassTag[Ser]]
   override lazy val errorCallback: String => Unit = x => log.error(x)

@@ -58,9 +68,10 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
     val startTime = if (isDebugEnabled) System.nanoTime else 0L
     codecsMap.get(manifest(o)) match {
       case Some((encoder, _)) =>
-        val res = printer.print(encoder.asInstanceOf[Encoder[AnyRef]](o)).getBytes(UTF_8)
-        logDuration("Serialization", o, startTime, res)
-        res
+        val bytes = printer.print(encoder.asInstanceOf[Encoder[AnyRef]](o)).getBytes(UTF_8)
+        val result = Compression.compressIfNeeded(bytes, bufferSize, compressionAlgorithm)
+        logDuration("Serialization", o, startTime, result)
+        result
       case None =>
         throw new RuntimeException(
           s"Serialization of [${o.getClass.getName}] failed. Call Register[A] for this class or its supertype and append result to `def codecs`.")

@@ -71,9 +82,10 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
     val startTime = if (isDebugEnabled) System.nanoTime else 0L
     codecsMap.get(manifestMigrationsMap.getOrElse(manifest, manifest)) match {
       case Some((_, decoder)) =>
-        val res = parser.parseByteArray(bytes).flatMap(_.as(decoder)).fold(e => throw e, identity)
-        logDuration("Deserialization", res, startTime, bytes)
-        res
+        val decompressedBytes = Compression.decompressIfNeeded(bytes, bufferSize)
+        val result = parser.parseByteArray(decompressedBytes).flatMap(_.as(decoder)).fold(e => throw e, identity)
+        logDuration("Deserialization", result, startTime, bytes)
+        result
       case None =>
         throw new NotSerializableException(
           s"Manifest [$manifest] did not match any known codec. If you're not currently performing a rolling upgrade, you must add a manifest migration to correct codec.")

@@ -115,4 +127,5 @@ abstract class CirceAkkaSerializer[Ser <: AnyRef: ClassTag](system: ExtendedActo
         bytes.length)
     }
   }
+
 }
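Taken together, `toBinary` now emits either plain JSON bytes or a gzip stream, and `fromBinary` accepts both. Below is a standalone sketch of that behaviour using the new `Compression` helpers from this commit (placed in the same package because they are `private[circe]`); the threshold and the sample JSON strings are arbitrary:

```scala
package org.virtuslab.ash.circe

import java.nio.charset.StandardCharsets.UTF_8

object CompressionRoundTripSketch extends App {
  val bufferSize = 1024 * 4 // same default the serializer uses
  val algorithm = Compression.GZip(greaterThan = 32 * 1024)

  val smallJson = "{\"SmallEvent\":{\"id\":1}}".getBytes(UTF_8)
  val largeJson = ("{\"LargeEvent\":{\"payload\":\"" + "x" * 100000 + "\"}}").getBytes(UTF_8)

  // Below the threshold the bytes are passed through unchanged...
  assert(Compression.compressIfNeeded(smallJson, bufferSize, algorithm).sameElements(smallJson))

  // ...above it they are gzipped, and decompressIfNeeded restores the original payload.
  val compressed = Compression.compressIfNeeded(largeJson, bufferSize, algorithm)
  assert(Compression.isCompressedWithGzip(compressed))
  assert(Compression.decompressIfNeeded(compressed, bufferSize).sameElements(largeJson))
}
```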

circe-akka-serializer/src/main/scala/org/virtuslab/ash/circe/CirceTraitCodec.scala

Lines changed: 33 additions & 15 deletions

@@ -54,15 +54,6 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {

   val errorCallback: String => Unit

-  Seq(
-    (codecs, "codecs"),
-    (manifestMigrations, "manifestMigrations"),
-    (packagePrefix, "packagePrefix"),
-    (classTagEvidence, "classTagEvidence"),
-    (errorCallback, "errorCallback")).foreach { x =>
-    assert(x._1 != null, s"${x._2} must be declared as a def or a lazy val to work correctly")
-  }
-
   private val mirror = ru.runtimeMirror(getClass.getClassLoader)

   protected val codecsMap: Map[String, (Encoder[_ <: Ser], Decoder[_ <: Ser])] = codecs

@@ -85,8 +76,9 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
     .toMap
     .withDefaultValue("")

-  def manifest(o: AnyRef): String = parentsUpToRegisteredTypeMap(o.getClass.getName)
-
+  /**
+   * Decoder apply method - decodes from Json into an object of type Ser
+   */
   override def apply(c: HCursor): Result[Ser] = {
     c.value.asObject match {
       case Some(obj) =>

@@ -103,6 +95,9 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
     }
   }

+  /**
+   * Encoder apply method - encodes a given object of type Ser into Json
+   */
   override def apply(a: Ser): Json = {
     val manifestString = manifest(a)
     val encoder = codecsMap.get(manifestString) match {

@@ -114,6 +109,33 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
     Json.obj((manifestString, encoder.asInstanceOf[Encoder[Ser]](a)))
   }

+  def manifest(o: AnyRef): String = parentsUpToRegisteredTypeMap(o.getClass.getName)
+
+  /*
+   * All code below serves as a check - it verifies
+   * whether a class extending this trait is a valid implementation.
+   * doNeededChecksOnStart() gets invoked on object creation.
+   */
+  doNeededChecksOnStart()
+
+  private def doNeededChecksOnStart(): Unit = {
+    checkImplementationForInvalidMemberDeclarations()
+    checkSerializableTypesForMissingCodec(packagePrefix)
+    checkCodecsForNull()
+    checkCodecsForDuplication()
+  }
+
+  private def checkImplementationForInvalidMemberDeclarations(): Unit = {
+    Seq(
+      (codecs, "codecs"),
+      (manifestMigrations, "manifestMigrations"),
+      (packagePrefix, "packagePrefix"),
+      (classTagEvidence, "classTagEvidence"),
+      (errorCallback, "errorCallback")).foreach { x =>
+      assert(x._1 != null, s"${x._2} must be declared as a def or a lazy val to work correctly")
+    }
+  }
+
   private def checkSerializableTypesForMissingCodec(packagePrefix: String): Unit = {
     val reflections = new Reflections(packagePrefix)
     val foundSerializables = reflections.getSubTypesOf(classTag[Ser].runtimeClass).asScala.filterNot(_.isInterface)

@@ -142,8 +164,4 @@ trait CirceTraitCodec[Ser <: AnyRef] extends Codec[Ser] {
       errorCallback(s"Codec for class ${x._1} has been declared multiple times with types ${x._2.mkString(",")}.")
     }
   }
-
-  checkSerializableTypesForMissingCodec(packagePrefix)
-  checkCodecsForNull()
-  checkCodecsForDuplication()
 }
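For context on the `checkImplementationForInvalidMemberDeclarations` assertion: a strict `val` in an implementing class is initialized only after the trait body (and thus `doNeededChecksOnStart()`) has already run, so it would still be `null` at check time, whereas `def` and `lazy val` are evaluated on access. A minimal standalone sketch of that initialization-order pitfall (not code from this repository):

```scala
object InitializationOrderSketch extends App {
  trait Checked {
    val name: String
    // Runs while the trait is being constructed, before a subclass assigns its strict vals.
    assert(name != null, "name must be declared as a def or a lazy val to work correctly")
  }

  // Passes: the lazy val is computed on first access, even during trait construction.
  new Checked { override lazy val name = "ok" }

  // Throws AssertionError: the strict val is still null when the trait body runs.
  new Checked { override val name = "too late" }
}
```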
circe-akka-serializer/src/main/scala/org/virtuslab/ash/circe/Compression.scala

Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@
+package org.virtuslab.ash.circe
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.util.zip.GZIPInputStream
+import java.util.zip.GZIPOutputStream
+
+import scala.annotation.tailrec
+
+object Compression {
+  sealed trait Algorithm
+  case object Off extends Algorithm
+  case class GZip(greaterThan: Long) extends Algorithm
+  // TODO (#159): add support for LZ4 java compression
+  // case class LZ4(greaterThan: Long) extends Algorithm
+
+  private[circe] def compressIfNeeded(
+      bytes: Array[Byte],
+      bufferSize: Int,
+      compressionAlgorithm: Algorithm): Array[Byte] =
+    compressionAlgorithm match {
+      case Compression.Off => bytes
+      case Compression.GZip(largerThan) =>
+        if (bytes.length <= largerThan) {
+          bytes
+        } else {
+          val byteArrayOutputStream = new ByteArrayOutputStream(bufferSize)
+          val outputStream = new GZIPOutputStream(byteArrayOutputStream)
+          try outputStream.write(bytes)
+          finally outputStream.close()
+          byteArrayOutputStream.toByteArray
+        }
+    }
+
+  private[circe] def decompressIfNeeded(bytes: Array[Byte], bufferSize: Int): Array[Byte] =
+    if (isCompressedWithGzip(bytes)) {
+      val inputStream = new GZIPInputStream(new ByteArrayInputStream(bytes))
+      val outputStream = new ByteArrayOutputStream()
+      val buffer = new Array[Byte](bufferSize)
+
+      @tailrec def readChunk(): Unit =
+        inputStream.read(buffer) match {
+          case -1 => ()
+          case n =>
+            outputStream.write(buffer, 0, n)
+            readChunk()
+        }
+
+      try readChunk()
+      finally inputStream.close()
+      outputStream.toByteArray
+    } else {
+      bytes
+    }
+
+  /*
+   Since we are encoding JSON for Ser <: AnyRef types - they can start with:
+   a) '{' char for Json objects or
+   b) '[' char for Arrays or
+   c) '"' char for Strings
+   Thus, the first element of the `bytes` array can be one of the three below:
+   a) 123 Byte number - which is the decimal representation of the { character
+   b) 91 Byte number - which is the decimal representation of the [ character
+   c) 34 Byte number - which is the decimal representation of the " character
+
+   A quick note on why isCompressedWithGzip will not return false positives (for uncompressed JSON data):
+
+   bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte
+   gets evaluated to:
+   bytes(0) == 35615.toByte
+   which gets evaluated to:
+   bytes(0) == 31 // where 31 is of type Byte
+   And since bytes(0) holds a Byte with value equal to 123, 91 or 34 - this will never be true.
+   */
+  private[circe] def isCompressedWithGzip(bytes: Array[Byte]): Boolean =
+    (bytes != null) && (bytes.length >= 2) &&
+    (bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
+    (bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
+}
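The numbers in the comment above can be checked directly: a JSON payload's first byte is 123 ('{'), 91 ('[') or 34 ('"'), while a gzip stream always starts with the two magic bytes 0x1f 0x8b (GZIP_MAGIC = 35615, stored little-endian). A short sketch verifying this with plain java.util.zip (the sample JSON string is arbitrary):

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipMagicSketch extends App {
  val json = """{"Event":{"id":1}}""".getBytes(UTF_8)
  println(json(0)) // 123 ('{') - can never match the gzip header byte

  val out = new ByteArrayOutputStream()
  val gzip = new GZIPOutputStream(out)
  try gzip.write(json)
  finally gzip.close()
  val compressed = out.toByteArray

  println(compressed(0))              // 31   (0x1f) == GZIP_MAGIC.toByte
  println(compressed(1))              // -117 (0x8b) == (GZIP_MAGIC >> 8).toByte
  println(GZIPInputStream.GZIP_MAGIC) // 35615
}
```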

circe-akka-serializer/src/test/resources/application.conf

Lines changed: 19 additions & 5 deletions

@@ -9,9 +9,23 @@ akka.actor {
   allow-java-serialization = off
 }

-#Uncomment below to enable logging
-
+# Uncomment chosen settings below to enable debug logging OR change compression settings
 #akka.loglevel = "DEBUG"
-#org.virtuslab.ash {
-#  verbose-debug-logging = on
-#}
+org.virtuslab.ash {
+  circe {
+    # enables debug logging
+    #verbose-debug-logging = on
+
+    # settings for compression of the payload
+    compression {
+      # Compression algorithm.
+      # - off : no compression
+      # - gzip : using common java gzip
+      algorithm = off
+
+      # If compression is enabled with the `algorithm` setting,
+      # the payload will be compressed if its size is bigger than this value.
+      compress-larger-than = 1 KiB # value set for testing purposes - do not change
+    }
+  }
+}
