Skip to content

Commit

Permalink
upgrade from rxjava to rxjava2.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin Duda committed Apr 1, 2019
1 parent 094ec6e commit 7e1d7a0
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ApplicationContext(CoreContext context) {
connectUsingIpc(endpoint);
}

node.ethSyncing().observable().subscribe(is -> {
node.ethSyncing().flowable().subscribe(is -> {
logger.event("synchronizing").put("is", is.isSyncing()).send();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@
import com.codingchili.ethereumingest.model.ImportListener;
import com.codingchili.ethereumingest.model.Importer;
import com.codingchili.ethereumingest.model.StorableBlock;
import io.reactivex.Flowable;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.response.EthBlock;
import rx.Observable;
import rx.Subscriber;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.*;
import java.util.function.Consumer;
import java.util.function.Function;

import static com.codingchili.core.configuration.CoreStrings.ID_TIME;
import static com.codingchili.ethereumingest.importer.ApplicationContext.TX_ADDR;
Expand All @@ -32,11 +34,11 @@
/**
* A service that reads block data from an Ethereum client IPC connection.
* <p>
* If #{@link ApplicationConfig#txImport} is set to true will request full
* If #{@link ApplicationConfig#isTxImport()} is set to true will request full
* transaction details in each block. The transaction data is forwarded to
* another handler that imports the transactions into the configured storage.
* <p>
* If #{@link ApplicationConfig#blockImport} this service will also import
* If #{@link ApplicationConfig#isBlockImport()} this service will also import
* block data into the configured storage.
*/
public class BlockService implements Importer {
Expand Down Expand Up @@ -72,33 +74,64 @@ public void start(Future<Void> future) {
Integer start = Integer.parseInt(config.getStartBlock());
Integer end = Integer.parseInt(config.getBlockEnd());

Observable.range(start, end - start).subscribe(new Subscriber<Integer>() {
Flowable.range(start, end - start).subscribe(new Subscriber<>() {
final AtomicReference<String> hash = new AtomicReference<>();
private Subscription subscription;

@Override
public void onStart() {
request(config.getBackpressureBlocks());
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(config.getBackpressureBlocks());
}

@Override
public void onCompleted() {
listener.onFinished();
public void onNext(Integer blockNum) {
// avoid blocking the event loop while waiting for the ipc.
context.blocking(exec -> {
if (!stopping.get()) {
EthBlock event = chain.apply(blockNum);
if (event.getBlock() == null) {
listener.onSourceDepleted();

// block does not exist yet, wait a bit.
context.periodic(() -> ONE_SECOND, BLOCK_RETRY_TIMER, done -> {
timerId.set(done);
EthBlock retry = chain.apply(blockNum);
if (retry.getBlock() != null) {
context.cancel(done);
importer.accept(retry);
}
});
} else {
importer.accept(event);
}
}
exec.complete();

}, (done) -> {
//
});
}

@Override
public void onError(Throwable e) {
public void onError(Throwable throwable) {
if (!stopping.get()) {
listener.onError(e, hash.get());
unsubscribe();
listener.onError(throwable, hash.get());
subscription.cancel();
stopping.set(true);
}
}

@Override
public void onComplete() {
listener.onFinished();
}

Consumer<EthBlock> importer = eth -> {
hash.set(eth.getBlock().getHash());
startImport(eth).setHandler(imported -> {
if (imported.succeeded()) {
request(1);
subscription.request(1);
} else {
onError(imported.cause());
}
Expand All @@ -121,35 +154,6 @@ public void onError(Throwable e) {
return null;
}
};

@Override
public void onNext(Integer blockNum) {
// avoid blocking the event loop while waiting for the ipc.
context.blocking(exec -> {
if (!stopping.get()) {
EthBlock event = chain.apply(blockNum);
if (event.getBlock() == null) {
listener.onSourceDepleted();

// block does not exist yet, wait a bit.
context.periodic(() -> ONE_SECOND, BLOCK_RETRY_TIMER, done -> {
timerId.set(done);
EthBlock retry = chain.apply(blockNum);
if (retry.getBlock() != null) {
context.cancel(done);
importer.accept(retry);
}
});
} else {
importer.accept(event);
}
}
exec.complete();

}, (done) -> {
//
});
}
});
} else {
future.fail(done.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
import com.codingchili.ethereumingest.model.Importer;
import com.codingchili.ethereumingest.model.StorableTransaction;
import com.codingchili.ethereumingest.model.TransactionLogListener;
import io.reactivex.Flowable;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import rx.Observable;
import rx.Subscriber;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -28,7 +29,7 @@
/**
* A service that receives a list of transactions from each block retrieved from the
* ipc connection in #{@link BlockService}. This service is only used if
* #{@link ApplicationConfig#txImport} is set to true.
* #{@link ApplicationConfig#isTxImport()} is set to true.
*/
public class TransactionService implements Importer {
private AtomicInteger queue = new AtomicInteger(0);
Expand Down Expand Up @@ -68,23 +69,27 @@ public void start(Future<Void> future) {
private void importTx(Message<?> request, Collection<StorableTransaction> list) {
final AtomicReference<String> hash = new AtomicReference<>();

Observable.from(list).subscribe(new Subscriber<StorableTransaction>() {
@Override
public void onStart() {
request(config.getBackPressureTx());
}
Flowable.fromIterable(list).subscribe(new Subscriber<>() {
private Subscription subscription;


@Override
public void onCompleted() {
request.reply(null);
listener.onFinished();
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(config.getBackPressureTx());
}

@Override
public void onError(Throwable e) {
listener.onError(e, hash.get());
request.reply(e);
unsubscribe();
subscription.cancel();
}

@Override
public void onComplete() {
request.reply(null);
listener.onFinished();
}

@Override
Expand All @@ -96,7 +101,7 @@ public void onNext(StorableTransaction tx) {

if (done.succeeded()) {
listener.onImported(tx.getHash(), tx.getBlockNumber().longValue());
request(1);
subscription.request(1);
} else {
hash.set(tx.getHash());
onError(done.cause());
Expand Down

0 comments on commit 7e1d7a0

Please sign in to comment.