Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/scala/encry/network/BlackList.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ object BlackList {
final case class InvalidResponseManifestMessage(error: String) extends BanReason
final case class InvalidChunkMessage(error: String) extends BanReason
final case class InvalidManifestHasChangedMessage(error: String) extends BanReason
case object NotAllChunksSentMessage extends BanReason
case object UnrequestedChunksSentMessage extends BanReason
case object ExpiredNumberOfReRequestAttempts extends BanReason
case object ExpiredNumberOfRequests extends BanReason
final case class InvalidStateAfterFastSync(error: String) extends BanReason
Expand Down
58 changes: 29 additions & 29 deletions src/main/scala/encry/view/fast/sync/SnapshotHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,29 @@ package encry.view.fast.sync

import SnapshotChunkProto.SnapshotChunkMessage
import SnapshotManifestProto.SnapshotManifestProtoMessage
import akka.actor.{ Actor, ActorRef, Cancellable, Props }
import akka.actor.{Actor, ActorRef, Cancellable, Props}
import cats.syntax.either._
import cats.syntax.option._
import com.google.protobuf.ByteString
import com.typesafe.scalalogging.StrictLogging
import encry.network.BlackList.BanReason._
import encry.network.Broadcast
import encry.network.NetworkController.ReceivableMessages.{ DataFromPeer, RegisterMessagesHandler }
import encry.network.PeersKeeper.{ BanPeer, SendToNetwork }
import encry.network.NetworkController.ReceivableMessages.{DataFromPeer, RegisterMessagesHandler}
import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, SemanticallySuccessfulModifier}
import encry.network.PeersKeeper.{BanPeer, SendToNetwork}
import encry.settings.EncryAppSettings
import SnapshotHolder._
import encry.storage.VersionalStorage.{StorageKey, StorageValue}
import encry.view.fast.sync.FastSyncExceptions.{ApplicableChunkIsAbsent, FastSyncException}
import encry.view.fast.sync.SnapshotHolder._
import encry.view.history.History
import encry.view.state.UtxoState
import encry.view.state.avlTree.utils.implicits.Instances._
import encry.view.state.avlTree.{Node, NodeSerilalizer}
import org.encryfoundation.common.modifiers.history.Block
import org.encryfoundation.common.network.BasicMessagesRepo._
import org.encryfoundation.common.utils.Algos
import cats.syntax.option._
import encry.network.BlackList.BanReason.{
ExpiredNumberOfReRequestAttempts,
ExpiredNumberOfRequests,
InvalidChunkMessage,
InvalidResponseManifestMessage,
InvalidStateAfterFastSync
}
import encry.network.NodeViewSynchronizer.ReceivableMessages.{ ChangedHistory, SemanticallySuccessfulModifier }
import encry.storage.VersionalStorage.{ StorageKey, StorageValue }
import encry.view.fast.sync.FastSyncExceptions.{ ApplicableChunkIsAbsent, FastSyncException }
import encry.view.history.History
import encry.view.state.avlTree.{ Node, NodeSerilalizer }
import cats.syntax.either._

import scala.util.Try
import encry.view.state.avlTree.utils.implicits.Instances._

class SnapshotHolder(settings: EncryAppSettings,
networkController: ActorRef,
Expand Down Expand Up @@ -124,17 +119,22 @@ class SnapshotHolder(settings: EncryAppSettings,
(controller, chunk) = controllerAndChunk
validChunk <- snapshotProcessor.validateChunkId(chunk)
processor = snapshotProcessor.updateCache(validChunk)
newProcessor <- processor.processNextApplicableChunk(processor).leftFlatMap {
case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException]
case t => t.asLeft[SnapshotProcessor]
}
newProcessor <- processor.processNextApplicableChunk(processor).leftFlatMap {
case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException]
case t => t.asLeft[SnapshotProcessor]
}
} yield (newProcessor, controller)) match {
case Left(error) =>
nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage(error.error))
restartFastSync(history)
case Right((processor, controller))
if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty && processor.chunksCache.nonEmpty =>
nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("For request is empty, buffer is nonEmpty"))
if controller.requestedChunks.isEmpty &&
controller.notYetRequested.isEmpty &&
(processor.chunksCache.nonEmpty || processor.applicableChunks.nonEmpty) =>
if (processor.chunksCache.nonEmpty)
nodeViewSynchronizer ! BanPeer(remote, UnrequestedChunksSentMessage)
else
nodeViewSynchronizer ! BanPeer(remote, NotAllChunksSentMessage)
restartFastSync(history)
case Right((processor, controller))
if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty =>
Expand Down Expand Up @@ -233,10 +233,10 @@ class SnapshotHolder(settings: EncryAppSettings,
def workMod(history: History): Receive = {
case TreeChunks(chunks, id) =>
//todo add collection with potentialManifestsIds to NVH
val manifestIds: Seq[Array[Byte]] = snapshotProcessor.potentialManifestsIds
if (!manifestIds.exists(_.sameElements(id))) {
snapshotProcessor.createNewSnapshot(id, manifestIds, chunks)
} else logger.info(s"Doesn't need to create snapshot")
snapshotProcessor.createNewSnapshot(id, chunks).fold( err =>
logger.warn(s"Failed to create new snapshot due to ${err.error}"),
newProc => snapshotProcessor = newProc
)

case SemanticallySuccessfulModifier(block: Block) if history.isFullChainSynced =>
logger.info(s"Snapshot holder got semantically successful modifier message. Started processing it.")
Expand Down
92 changes: 40 additions & 52 deletions src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala
Original file line number Diff line number Diff line change
@@ -1,46 +1,30 @@
package encry.view.fast.sync

import java.io.File
import encry.storage.VersionalStorage.{ StorageKey, StorageValue, StorageVersion }
import encry.view.state.UtxoState
import org.encryfoundation.common.modifiers.history.Block

import cats.syntax.either._
import com.google.common.primitives.Ints
import com.typesafe.scalalogging.StrictLogging
import encry.settings.{ EncryAppSettings, LevelDBSettings }
import encry.settings.{EncryAppSettings, LevelDBSettings}
import encry.storage.VersionalStorage
import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion}
import encry.storage.iodb.versionalIODB.IODBWrapper
import encry.storage.levelDb.versionalLevelDB.{ LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion }
import encry.view.fast.sync.SnapshotHolder.{
SnapshotChunk,
SnapshotChunkSerializer,
SnapshotManifest,
SnapshotManifestSerializer
}
import encry.view.state.avlTree.{ AvlTree, InternalNode, LeafNode, Node, NodeSerilalizer, ShadowNode }
import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion}
import encry.view.fast.sync.FastSyncExceptions._
import encry.view.fast.sync.SnapshotHolder.{SnapshotChunk, SnapshotChunkSerializer, SnapshotManifest, SnapshotManifestSerializer}
import encry.view.history.History
import encry.view.state.UtxoState
import encry.view.state.avlTree.utils.implicits.Instances._
import encry.view.state.avlTree._
import io.iohk.iodb.{ByteArrayWrapper, LSMStore}
import org.encryfoundation.common.modifiers.history.Block
import org.encryfoundation.common.utils.Algos
import org.encryfoundation.common.utils.TaggedTypes.Height
import org.iq80.leveldb.{DB, Options}
import scorex.utils.Random
import encry.view.state.avlTree.utils.implicits.Instances._
import io.iohk.iodb.{ ByteArrayWrapper, LSMStore }
import org.iq80.leveldb.{ DB, Options }
import cats.syntax.either._

import scala.collection.immutable.{HashMap, HashSet}
import scala.language.postfixOps
import com.google.common.primitives.Ints
import encry.view.fast.sync.FastSyncExceptions.{
ApplicableChunkIsAbsent,
BestHeaderAtHeightIsAbsent,
ChunkApplyError,
ChunkValidationError,
EmptyHeightKey,
EmptyRootNodeError,
FastSyncException,
InconsistentChunkId,
InitializeHeightAndRootKeysException,
ProcessNewBlockError,
ProcessNewSnapshotError,
UtxoCreationError
}
import encry.view.history.History
import org.encryfoundation.common.utils.TaggedTypes.Height
import scala.collection.immutable.{ HashMap, HashSet }

final case class SnapshotProcessor(settings: EncryAppSettings,
storage: VersionalStorage,
Expand Down Expand Up @@ -193,26 +177,30 @@ final case class SnapshotProcessor(settings: EncryAppSettings,

def createNewSnapshot(
id: Array[Byte],
manifestIds: Seq[Array[Byte]],
newChunks: List[SnapshotChunk]
): Either[ProcessNewSnapshotError, SnapshotProcessor] = {
//todo add only exists chunks
val manifest: SnapshotManifest = SnapshotManifest(id, newChunks.map(_.id))
val snapshotToDB: List[(StorageKey, StorageValue)] = newChunks.map { elem =>
val bytes: Array[Byte] = SnapshotChunkSerializer.toProto(elem).toByteArray
StorageKey @@ elem.id -> StorageValue @@ bytes
}
val manifestToDB: (StorageKey, StorageValue) =
StorageKey @@ manifest.manifestId -> StorageValue @@ SnapshotManifestSerializer
.toProto(manifest)
.toByteArray
val updateList: (StorageKey, StorageValue) =
PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: manifestIds.toList).flatten.toArray
val toApply: List[(StorageKey, StorageValue)] = manifestToDB :: updateList :: snapshotToDB
logger.info(s"A new snapshot created successfully. Insertion started.")
Either.catchNonFatal(storage.insert(StorageVersion @@ Random.randomBytes(), toApply, List.empty)) match {
case Left(value) => ProcessNewSnapshotError(value.getMessage).asLeft[SnapshotProcessor]
case Right(_) => this.asRight[ProcessNewSnapshotError]
val potential = potentialManifestsIds
if (potential.exists(_.sameElements(id)))
ProcessNewSnapshotError(s"Potential manifest with id ${Algos.encode(id)} already exists").asLeft[SnapshotProcessor]
else {
//todo add only exists chunks
val manifest: SnapshotManifest = SnapshotManifest(id, newChunks.map(_.id))
val snapshotToDB: List[(StorageKey, StorageValue)] = newChunks.map { elem =>
val bytes: Array[Byte] = SnapshotChunkSerializer.toProto(elem).toByteArray
StorageKey @@ elem.id -> StorageValue @@ bytes
}
val manifestToDB: (StorageKey, StorageValue) =
StorageKey @@ manifest.manifestId -> StorageValue @@ SnapshotManifestSerializer
.toProto(manifest)
.toByteArray
val updateList: (StorageKey, StorageValue) =
PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: potential.toList).flatten.toArray
val toApply: List[(StorageKey, StorageValue)] = manifestToDB :: updateList :: snapshotToDB
logger.info(s"A new snapshot created successfully. Insertion started.")
Either.catchNonFatal(storage.insert(StorageVersion @@ Random.randomBytes(), toApply, List.empty)) match {
case Left(value) => ProcessNewSnapshotError(value.getMessage).asLeft[SnapshotProcessor]
case Right(_) => this.asRight[ProcessNewSnapshotError]
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/encry/view/state/avlTree/AvlTree.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage
kSer: Serializer[K],
vSer: Serializer[V]
): (Option[Node[K, V]]) = node match {
case _: EmptyNode[K, V] => None
case shadowNode: ShadowNode[K, V] =>
val restoredNode = shadowNode.restoreFullNode(storage)
delete(restoredNode, key)
Expand Down Expand Up @@ -498,6 +499,7 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage
}

def selfInspectionAfterFastSync(implicit kSer: Serializer[K]): Boolean = {

@scala.annotation.tailrec
def loop(nodesToProcess: List[Node[K, V]], keysToInspect: List[Array[Byte]]): List[Array[Byte]] =
if (nodesToProcess.nonEmpty) nodesToProcess.head match {
Expand All @@ -520,10 +522,13 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage
loop(updatedNodeToProcess ::: next, Algos.hash(kSer.toBytes(i.key).reverse) :: i.hash :: current ::: keysToInspect)
case l: LeafNode[K, V] => loop(nodesToProcess.drop(1), Algos.hash(kSer.toBytes(l.key).reverse) :: l.hash :: keysToInspect)
} else keysToInspect

val keys: Set[ByteArrayWrapper] = loop(List(rootNode), List.empty).map(ByteArrayWrapper(_)).toSet

val allKeysFromDB: Set[ByteArrayWrapper] = storage.getAllKeys(-1)
.map(ByteArrayWrapper(_)).toSet - ByteArrayWrapper(UtxoState.bestHeightKey) - ByteArrayWrapper(AvlTree.rootNodeKey)
logger.debug(s"${keys.map(l => Algos.encode(l.data))}")

(allKeysFromDB -- keys).isEmpty
}

Expand Down
57 changes: 57 additions & 0 deletions src/test/scala/encry/modifiers/InstanceFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import encry.consensus.EncrySupplyController
import encry.modifiers.mempool._
import encry.modifiers.state.Keys
import encry.settings.{EncryAppSettings, NodeSettings}
import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion}
import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion}
import encry.utils.implicits.UTXO.combineAll
import encry.utils.{EncryGenerator, FileHelper, NetworkTimeProvider, TestHelper}
import encry.view.history.History
import encry.view.history.storage.HistoryStorage
import encry.view.state.UtxoState
import encry.view.state.avlTree.AvlTree
import io.iohk.iodb.LSMStore
import org.encryfoundation.common.modifiers.history.{Block, Header, Payload}
import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address
import org.encryfoundation.common.modifiers.mempool.transaction.{Input, Transaction}
import org.encryfoundation.common.modifiers.state.box.{AssetBox, EncryProposition}
import org.encryfoundation.common.modifiers.state.box.Box.Amount
Expand Down Expand Up @@ -170,6 +175,58 @@ trait InstanceFactory extends Keys with EncryGenerator {
Block(header, Payload(header.id, txs))
}

def generateNextBlockAndInsert(history: History,
tree: AvlTree[StorageKey, StorageValue],
insert: Boolean,
difficultyDiff: BigInt = 0,
prevId: Option[ModifierId] = None,
txsQty: Int = 100,
additionalDifficulty: BigInt = 0,
addressOpt: Option[Address] = None): (Block, AvlTree[StorageKey, StorageValue]) = {

import encry.utils.implicits.UTXO._
import encry.view.state.avlTree.utils.implicits.Instances._

val previousHeaderId: ModifierId =
prevId.getOrElse(history.getBestHeader.map(_.id).getOrElse(Header.GenesisParentId))
val requiredDifficulty: Difficulty = history.getBestHeader.map(parent =>
history.requiredDifficultyAfter(parent).getOrElse(Difficulty @@ BigInt(0)))
.getOrElse(history.settings.constants.InitialDifficulty)

val txs = { addressOpt match {
case Some(address) => if (txsQty != 0) genValidPaymentTxsToAddr(Scarand.nextInt(txsQty), address) else Seq.empty
case None => if (txsQty != 0) genValidPaymentTxs(Scarand.nextInt(txsQty)) else Seq.empty
}

} ++ Seq(coinbaseAt(history.getBestHeaderHeight + 1))

val combinedStateChange: UtxoState.StateChange = combineAll(txs.map(UtxoState.tx2StateChange).toList)
val newStateRoot = tree.getOperationsRootHash(
combinedStateChange.outputsToDb.toList, combinedStateChange.inputsToDb.toList
).get

val header = genHeader.copy(
parentId = previousHeaderId,
height = history.getBestHeaderHeight + 1,
difficulty = Difficulty @@ (requiredDifficulty + difficultyDiff + additionalDifficulty),
transactionsRoot = Payload.rootHash(txs.map(_.id)),
stateRoot = newStateRoot
)

val block = Block(header, Payload(header.id, txs))

val newTree: AvlTree[StorageKey, StorageValue] = if (insert)
tree.insertAndDeleteMany(
StorageVersion !@@ block.id,
combinedStateChange.outputsToDb.toList,
combinedStateChange.inputsToDb.toList,
Height @@ block.header.height
) else tree

(block, newTree)

}

def genForkOn(qty: Int,
addDifficulty: BigInt = 0,
from: Int,
Expand Down
40 changes: 40 additions & 0 deletions src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package encry.network.DeliveryManagerTests

import java.io.File
import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestProbe}
import encry.local.miner.Miner.{DisableMining, StartMining}
Expand All @@ -10,10 +12,18 @@ import encry.network.DeliveryManager.FullBlockChainIsSynced
import encry.network.NodeViewSynchronizer.ReceivableMessages.UpdatedHistory
import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming}
import encry.settings.EncryAppSettings
import encry.storage.VersionalStorage.{StorageKey, StorageValue}
import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion}
import encry.utils.FileHelper
import encry.view.fast.sync.FastSyncTestsUtils.settings
import encry.view.history.History
import encry.view.state.avlTree.AvlTree
import org.encryfoundation.common.modifiers.history.Block
import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address
import org.encryfoundation.common.network.BasicMessagesRepo.Handshake
import org.encryfoundation.common.utils.TaggedTypes.ModifierId
import org.iq80.leveldb.Options

import scala.collection.mutable
import scala.collection.mutable.WrappedArray

Expand Down Expand Up @@ -44,6 +54,36 @@ object DMUtils extends InstanceFactory {
(a, blocks :+ block)
}

def generateBlocksWithTree(qty: Int,
history: History,
prevTreeOpt: Option[AvlTree[StorageKey, StorageValue]] = None,
addressOpt: Option[Address] = None): (History, List[Block], AvlTree[StorageKey, StorageValue]) = {

import encry.view.state.avlTree.utils.implicits.Instances._

val avl: AvlTree[StorageKey, StorageValue] = prevTreeOpt match {
case Some(t) => t
case None =>
val dir: File = FileHelper.getRandomTempDir
val storage: VLDBWrapper = {
val levelDBInit = LevelDbFactory.factory.open(dir, new Options)
VLDBWrapper(VersionalLevelDBCompanion(levelDBInit, settings.levelDB, keySize = 32))
}

AvlTree[StorageKey, StorageValue](storage)
}

(0 until qty).foldLeft(history, List.empty[Block], avl) {
case ((prevHistory, blocks, tree), _) =>
val (block: Block, newTree: AvlTree[StorageKey, StorageValue]) =
generateNextBlockAndInsert(prevHistory, tree, prevTreeOpt.isEmpty, addressOpt = addressOpt)
prevHistory.append(block.header)
prevHistory.append(block.payload)
val newHist = prevHistory.reportModifierIsValid(block)
(newHist, blocks :+ block, newTree)
}
}

def toKey(id: ModifierId): WrappedArray.ofByte = new mutable.WrappedArray.ofByte(id)

def createPeer(port: Int,
Expand Down
Loading