Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions benchmarks/src/test/scala/benches/HistoryBenches.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit

import benches.HistoryBenches.HistoryBenchState
import benches.Utils._
import encry.network.DownloadedModifiersValidator.ModifierWithBytes
import encry.view.history.History
import encryBenchmark.BenchSettings
import org.encryfoundation.common.modifiers.history.Block
Expand All @@ -21,8 +22,8 @@ class HistoryBenches {
bh.consume {
val history: History = generateHistory(benchStateHistory.settings, getRandomTempDir)
benchStateHistory.blocks.foldLeft(history) { case (historyL, block) =>
historyL.append(block.header)
historyL.append(block.payload)
historyL.append(ModifierWithBytes(block.header))
historyL.append(ModifierWithBytes(block.payload))
historyL.reportModifierIsValid(block)
}
history.closeStorage()
Expand Down Expand Up @@ -70,8 +71,8 @@ object HistoryBenches extends BenchSettings {
case ((prevHistory, prevBlock, vector), _) =>
val block: Block =
generateNextBlockValidForHistory(prevHistory, 0, prevBlock, Seq(coinbaseTransaction(0)))
prevHistory.append(block.header)
prevHistory.append(block.payload)
prevHistory.append(ModifierWithBytes(block.header))
prevHistory.append(ModifierWithBytes(block.payload))
(prevHistory.reportModifierIsValid(block), Some(block), vector :+ block)
}
resultedHistory._1.closeStorage()
Expand Down
24 changes: 17 additions & 7 deletions src/main/scala/encry/network/DownloadedModifiersValidator.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package encry.network

import TransactionProto.TransactionProtoMessage
import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props }
import akka.dispatch.{ PriorityGenerator, UnboundedStablePriorityMailbox }
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox}
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import encry.modifiers.history.HeaderUtils
import encry.network.BlackList.BanReason._
import encry.network.DownloadedModifiersValidator.{ InvalidModifier, ModifiersForValidating }
import encry.network.DownloadedModifiersValidator.{InvalidModifier, ModifierWithBytes, ModifiersForValidating}
import encry.network.NodeViewSynchronizer.ReceivableMessages.UpdatedHistory
import encry.network.PeerConnectionHandler.ConnectedPeer
import encry.network.PeersKeeper.BanPeer
Expand All @@ -16,9 +16,12 @@ import encry.stats.StatsSender.ValidatedModifierFromNetwork
import encry.view.NodeViewHolder.ReceivableMessages.ModifierFromRemote
import encry.view.history.History
import encry.view.mempool.MemoryPool.NewTransaction
import org.encryfoundation.common.modifiers.mempool.transaction.{ Transaction, TransactionProtoSerializer }
import org.encryfoundation.common.utils.TaggedTypes.{ ModifierId, ModifierTypeId }
import scala.util.{ Failure, Success, Try }
import org.encryfoundation.common.modifiers.PersistentModifier
import org.encryfoundation.common.modifiers.history.HistoryModifiersProtoSerializer
import org.encryfoundation.common.modifiers.mempool.transaction.{Transaction, TransactionProtoSerializer}
import org.encryfoundation.common.utils.TaggedTypes.{ModifierId, ModifierTypeId}

import scala.util.{Failure, Success, Try}

class DownloadedModifiersValidator(modifierIdSize: Int,
nodeViewHolder: ActorRef,
Expand Down Expand Up @@ -50,7 +53,7 @@ class DownloadedModifiersValidator(modifierIdSize: Int,
s"Sending validated modifier to NodeViewHolder"
)
influxRef.foreach(_ ! ValidatedModifierFromNetwork(typeId))
nodeViewHolder ! ModifierFromRemote(modifier)
nodeViewHolder ! ModifierFromRemote(ModifierWithBytes(modifier, bytes))
} else {
logger.info(
s"Modifier with id: ${modifier.encodedId} of type: $typeId invalid cause of:" +
Expand Down Expand Up @@ -97,6 +100,13 @@ class DownloadedModifiersValidator(modifierIdSize: Int,

object DownloadedModifiersValidator {

final case class ModifierWithBytes(modifier: PersistentModifier, bytes: Array[Byte])

object ModifierWithBytes {
def apply(modifier: PersistentModifier): ModifierWithBytes =
new ModifierWithBytes(modifier, HistoryModifiersProtoSerializer.toProto(modifier).tail)
}

final case class ModifiersForValidating(remote: ConnectedPeer,
typeId: ModifierTypeId,
modifiers: Map[ModifierId, Array[Byte]])
Expand Down
33 changes: 18 additions & 15 deletions src/main/scala/encry/view/ModifiersCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ import org.encryfoundation.common.modifiers.PersistentModifier
import org.encryfoundation.common.modifiers.history.Header
import org.encryfoundation.common.utils.Algos
import org.encryfoundation.common.utils.TaggedTypes.ModifierId

import scala.annotation.tailrec
import scala.collection.immutable.SortedMap
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import encry.EncryApp.settings
import encry.network.DownloadedModifiersValidator.ModifierWithBytes

object ModifiersCache extends StrictLogging {

private type Key = mutable.WrappedArray[Byte]

val cache: TrieMap[Key, PersistentModifier] = TrieMap[Key, PersistentModifier]()
val cache: TrieMap[Key, ModifierWithBytes] = TrieMap[Key, ModifierWithBytes]()
private var headersCollection: SortedMap[Int, List[ModifierId]] = SortedMap[Int, List[ModifierId]]()

private var isChainSynced = false
Expand All @@ -30,10 +32,10 @@ object ModifiersCache extends StrictLogging {

def contains(key: Key): Boolean = cache.contains(key)

def put(key: Key, value: PersistentModifier, history: History): Unit = if (!contains(key)) {
logger.debug(s"Put ${value.encodedId} of type ${value.modifierTypeId} to cache.")
def put(key: Key, value: ModifierWithBytes, history: History): Unit = if (!contains(key)) {
logger.debug(s"Put ${value.modifier.encodedId} of type ${value.modifier.modifierTypeId} to cache.")
cache.put(key, value)
value match {
value.modifier match {
case header: Header =>
val possibleHeadersAtCurrentHeight: List[ModifierId] = headersCollection.getOrElse(header.height, List())
logger.debug(s"possibleHeadersAtCurrentHeight(${header.height}): ${possibleHeadersAtCurrentHeight.map(Algos.encode).mkString(",")}")
Expand All @@ -43,31 +45,32 @@ object ModifiersCache extends StrictLogging {
case _ =>
}

if (size > history.settings.node.modifiersCacheSize) cache.find { case (_, modifier) =>
history.testApplicable(modifier) match {
if (size > history.settings.node.modifiersCacheSize) cache.find { case (_, modifierWithBytes) =>
history.testApplicable(modifierWithBytes.modifier) match {
case Right(_) | Left(_: NonFatalValidationError) => false
case _ => true
}
}.map(mod => remove(mod._1))
}

def remove(key: Key): Option[PersistentModifier] = {
def remove(key: Key): Option[ModifierWithBytes] = {
logger.debug(s"Going to delete ${Algos.encode(key.toArray)}. Cache contains: ${cache.get(key).isDefined}.")
cache.remove(key)
}

def popCandidate(history: History): List[PersistentModifier] = synchronized {
def popCandidate(history: History): List[ModifierWithBytes] = synchronized {
findCandidateKey(history).flatMap(k => remove(k))
}

override def toString: String = cache.keys.map(key => Algos.encode(key.toArray)).mkString(",")

def findCandidateKey(history: History): List[Key] = {

def isApplicable(key: Key): Boolean = cache.get(key).exists(modifier => history.testApplicable(modifier) match {
case Left(_: FatalValidationError) => remove(key); false
case Right(_) => true
case Left(_) => false
def isApplicable(key: Key): Boolean = cache.get(key)
.exists(modifierWithBytes => history.testApplicable(modifierWithBytes.modifier) match {
case Left(_: FatalValidationError) => remove(key); false
case Right(_) => true
case Left(_) => false
})

def getHeadersKeysAtHeight(height: Int): List[Key] = {
Expand All @@ -90,8 +93,8 @@ object ModifiersCache extends StrictLogging {
}.toList

def exhaustiveSearch: List[Key] = List(cache.find { case (k, v) =>
v match {
case _: Header if history.getBestHeaderId.exists(headerId => headerId sameElements v.parentId) => true
v.modifier match {
case _: Header if history.getBestHeaderId.exists(headerId => headerId sameElements v.modifier.parentId) => true
case _ =>
val isApplicableMod: Boolean = isApplicable(k)
logger.debug(s"Try to apply: ${Algos.encode(k.toArray)} and result is: $isApplicableMod")
Expand All @@ -113,7 +116,7 @@ object ModifiersCache extends StrictLogging {
logger.debug(s"HeadersCollection size is: ${headersCollection.size}")
logger.debug(s"Drop height ${history.getBestHeaderHeight + 1} in HeadersCollection")
val res = value.map(cache.get(_)).collect {
case Some(v: Header)
case Some(ModifierWithBytes(v: Header, _))
if ((v.parentId sameElements history.getBestHeaderId.getOrElse(Array.emptyByteArray)) ||
(history.getBestHeaderHeight == history.settings.constants.PreGenesisHeight &&
(v.parentId sameElements Header.GenesisParentId)
Expand Down
Loading