
Commit

complete re-working of the calls to TwitIE which should ensure better performance. We now parallelise the reading from the ES stream and handle each tweet in turn. This not only means the tweets are processed in parallel, but that we can start processing them before we've read the full result set from ES. See issue #6 for more details.
greenwoodma committed Sep 22, 2020
1 parent 3fb7fc9 commit 5eb34a6
Showing 4 changed files with 69 additions and 136 deletions.
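
The core pattern behind this commit is wrapping the ES result iterator in a sized Spliterator and running it through a parallel Stream, so the per-tweet work starts while results are still being read. A minimal, self-contained sketch of that pattern (the list source, element type, and per-element work are illustrative stand-ins for SearchHitsIterator<TwintModel> and the TwitIE call):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.StreamSupport;

    public class ParallelIteratorSketch {
        public static void main(String[] args) {
            // stand-in for the ES stream: any ordinary Iterator works
            List<String> tweets = List.of("tweet-1", "tweet-2", "tweet-3", "tweet-4");
            Iterator<String> source = tweets.iterator();

            // thread-safe counter, updated from the worker threads
            AtomicInteger successful = new AtomicInteger();

            // a sized Spliterator lets the stream framework split the work
            // sensibly across threads as elements are pulled
            Spliterator<String> split =
                    Spliterators.spliterator(source, tweets.size(), Spliterator.IMMUTABLE);

            // 'true' asks for a parallel stream: elements are handed to worker
            // threads as they become available, not after the full read
            StreamSupport.stream(split, true).forEach(tweet -> {
                // placeholder for the expensive per-tweet work (the TwitIE call)
                successful.incrementAndGet();
            });

            System.out.printf("processed %d of %d tweets%n", successful.get(), tweets.size());
        }
    }
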
4 changes: 2 additions & 2 deletions pom.xml
@@ -11,7 +11,7 @@
</parent>
<groupId>com.afp.medialab.weverify.social</groupId>
<artifactId>twint-wrapper</artifactId>
<version>2.0.4-SNAPSHOT</version>
<version>2.0.5-SNAPSHOT</version>
<name>twint-wrapper</name>
<description>Twitter scraper wrapper</description>

@@ -155,7 +155,7 @@
<properties>
<docker.path>src/main/docker/delivery</docker.path>
<docker.image.name>twint-wrapper</docker.image.name>
<docker.image.tag>2.0.4</docker.image.tag>
<docker.image.tag>2.0.5</docker.image.tag>
</properties>
<build>
<plugins>
110 changes: 67 additions & 43 deletions src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java
@@ -11,8 +11,12 @@
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;

import javax.transaction.Transactional;

@@ -33,8 +37,16 @@
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.afp.medialab.weverify.social.model.CollectRequest;
import com.afp.medialab.weverify.social.model.twint.TwintModel;
import com.afp.medialab.weverify.social.model.twint.WordsInTweet;

@Service
@Transactional
@@ -44,13 +56,10 @@ public class ESOperations {
private ElasticsearchOperations esOperation;

@Autowired
private TwittieProcessing twittieProcessing;

// bulk number of request
private int bulkLimit = 1000;
private TweetsPostProcess twintModelAdapter;

// private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
// HH:mm:ss");
@Autowired
RestHighLevelClient highLevelClient;

private static Logger Logger = LoggerFactory.getLogger(ESOperations.class);

@@ -68,13 +77,58 @@ public void enrichWithTweetie(String essid) throws IOException {
builder.mustNot(existsQuery("wit"));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(builder)
.withPageable(PageRequest.of(0, 10)).build();
SearchHitsIterator<TwintModel> stream = esOperation.searchForStream(searchQuery, TwintModel.class);
List<TwintModel> model = new ArrayList<TwintModel>();
while (stream.hasNext()) {
model.add(stream.next().getContent());

// an AtomicInteger so that we can count the number of successfully processed Tweets
// from within the code running on multiple threads
AtomicInteger successful = new AtomicInteger();

// use a try with resources to ensure the iterator is closed no matter what
try (SearchHitsIterator<TwintModel> stream = esOperation.searchForStream(searchQuery, TwintModel.class)) {

// create a Spliterator over the normal ES iterator so we can work on the
// elements in parallel. Specifying the size ensures that the data is split
// sensibly across multiple threads and we can start calling TwitIE while
// we are still pulling results from ES
Spliterator<SearchHit<TwintModel>> it = Spliterators.spliterator(stream,
stream.getTotalHits(),
Spliterator.IMMUTABLE | Spliterator.CONCURRENT);

// now we work our way through all the hits allowing the JVM to work
// out how many threads we should use given where it's being run etc.
StreamSupport.stream(it,true).forEach(hit -> {

// get the TwintModel object out of the ES search result hit
TwintModel tm = hit.getContent();

try {
// this is the time consuming bit that eventually runs TwitIE
// but now we are using multiple threads this should be a bit quicker
List<WordsInTweet> wit = twintModelAdapter.buildWit(tm.getFull_text());

// convert the result of running TwitIE into a JSON version
ObjectMapper mapper = new ObjectMapper();
String b = "{\"wit\": " + mapper.writeValueAsString(wit) + "}";

// build a request to update the Tweet with the info from TwitIE
UpdateRequest updateRequest = new UpdateRequest("tsnatweets", tm.getId());
updateRequest.doc(b, XContentType.JSON);

// pass the update to ES
highLevelClient.update(updateRequest, RequestOptions.DEFAULT);

// if we've got this far we've successfully processed the tweet
// and stored the result so update the counter
successful.incrementAndGet();
} catch (Exception e) {
Logger.error("Error processing this tweet: {} with error : {}", tm.getId(), e.getMessage());
}

});

// and we are back to single-threaded processing, so let's log how many
// of the tweets we pulled from ES we managed to process
Logger.debug("successfully processed {} of {} tweets", successful.get(), stream.getTotalHits());
}
stream.close();
indexWordsSubList(model);
}

/**
@@ -159,36 +213,6 @@ public Date findWhereIndexingStopped(CollectRequest request) {
Date date = Date.from(instant);
return date;

}

/**
*
* @param tms
* @throws IOException
*/
private void indexWordsSubList(List<TwintModel> tms) throws IOException {
if (tms.isEmpty())
return;
int listSize = tms.size();
Logger.debug("List size {}", listSize);
int nbSubList = listSize / bulkLimit;
Collection<Future<String>> results = new ArrayList<>();
Logger.debug("Nb List {}", nbSubList);
for (int i = 0; i <= nbSubList; i++) {
int fromIndex = i * bulkLimit;
int toIndex = fromIndex + bulkLimit;
if (toIndex > listSize) {
toIndex = listSize;
}
Logger.debug("index from {} to {}", fromIndex, toIndex);
List<TwintModel> subList = tms.subList(fromIndex, toIndex);
Logger.debug("sublist size {}", subList.size());
results.add(twittieProcessing.indexWordsObj(subList));
}
CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[results.size()])).join();

}


}

}
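
The per-tweet write shown above replaces the old bulk indexing with a partial document update through the high-level REST client, touching only the wit field. As a self-contained illustration (a minimal sketch assuming an Elasticsearch 7.x client and a node on localhost:9200; the document id and empty payload are placeholders):

    import java.io.IOException;

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;

    public class PartialUpdateSketch {
        public static void main(String[] args) throws IOException {
            // assumes a local node; adjust host/port for a real cluster
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

                // partial update: only the 'wit' field is written, the rest of
                // the indexed tweet document is left untouched; this fails with
                // a document-missing error if the id does not exist
                UpdateRequest request = new UpdateRequest("tsnatweets", "some-tweet-id");
                request.doc("{\"wit\": []}", XContentType.JSON);

                client.update(request, RequestOptions.DEFAULT);
            }
        }
    }
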
@@ -35,17 +35,6 @@ public TaskExecutor twintCallTaskExecutor() {
executor.initialize();
return executor;
}
@Bean(name = "twittieCallTaskExecutor")
public TaskExecutor twittieCallTaskExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(nbCoreThreads);
executor.setMaxPoolSize(nbMaxThreads);
executor.setQueueCapacity(nbQueueThreads);
executor.setThreadNamePrefix("twittie-");
executor.initialize();
return executor;
}

@Bean(name = "twintCallGroupTaskExecutor")
public TaskExecutor twintCallGroutTaskExecutor() {

This file was deleted.
