diff --git a/pom.xml b/pom.xml index f33010e..d93364d 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.afp.medialab.weverify.social twint-wrapper - 2.0.4-SNAPSHOT + 2.0.5-SNAPSHOT twint-wrapper Twitter scraper wrapper @@ -155,7 +155,7 @@ src/main/docker/delivery twint-wrapper - 2.0.4 + 2.0.5 diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java b/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java index 4dd90ed..7ba7562 100644 --- a/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java +++ b/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java @@ -11,8 +11,12 @@ import java.util.Date; import java.util.List; import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.StreamSupport; import javax.transaction.Transactional; @@ -33,8 +37,16 @@ import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.stereotype.Service; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; + +import com.fasterxml.jackson.databind.ObjectMapper; + import com.afp.medialab.weverify.social.model.CollectRequest; import com.afp.medialab.weverify.social.model.twint.TwintModel; +import com.afp.medialab.weverify.social.model.twint.WordsInTweet; @Service @Transactional @@ -44,13 +56,10 @@ public class ESOperations { private ElasticsearchOperations esOperation; @Autowired - private TwittieProcessing twittieProcessing; - - // bulk number of request - private int bulkLimit = 1000; + private TweetsPostProcess twintModelAdapter; - // private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd - // HH:mm:ss"); + @Autowired + RestHighLevelClient highLevelClient; private static Logger Logger = LoggerFactory.getLogger(ESOperations.class); @@ -68,13 +77,58 @@ public void enrichWithTweetie(String essid) throws IOException { builder.mustNot(existsQuery("wit")); NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(builder) .withPageable(PageRequest.of(0, 10)).build(); - SearchHitsIterator stream = esOperation.searchForStream(searchQuery, TwintModel.class); - List model = new ArrayList(); - while (stream.hasNext()) { - model.add(stream.next().getContent()); + + // an AtomicInteger so that we can count the number of successfully processed Tweets + // from within the code running on multiple threads + AtomicInteger successful = new AtomicInteger(); + + // use a try with resources to ensure the iterator is closed no matter what + try (SearchHitsIterator stream = esOperation.searchForStream(searchQuery, TwintModel.class)) { + + // create a Spliterator over the normal ES iterator so we can work on the + // elements in parallel. Specifying the size ensures that the data is split + // sensibly across multiple threads and we can start calling TwitIE while + // we are still pulling results from ES + Spliterator> it = Spliterators.spliterator(stream, + stream.getTotalHits(), + Spliterator.IMMUTABLE | Spliterator.CONCURRENT); + + // now we work our way through all the hits allowing the JVM to work + // out how many threads we should use given where it's being run etc. + StreamSupport.stream(it,true).forEach(hit -> { + + // get the TwintModel object out of the ES search result hit + TwintModel tm = hit.getContent(); + + try { + // this is the time consuming bit that eventually runs TwitIE + // but now we are using multiple threads this should be a bit quicker + List wit = twintModelAdapter.buildWit(tm.getFull_text()); + + // convert the result of running TwitIE into a JSON version + ObjectMapper mapper = new ObjectMapper(); + String b = "{\"wit\": " + mapper.writeValueAsString(wit) + "}"; + + // build a request to update the Tweet with the info from TwitIE + UpdateRequest updateRequest = new UpdateRequest("tsnatweets", tm.getId()); + updateRequest.doc(b, XContentType.JSON); + + // pass the update to ES + highLevelClient.update(updateRequest, RequestOptions.DEFAULT); + + // if we've got this far we've successfully processed the tweet + // and stored the result so update the counter + successful.incrementAndGet(); + } catch (Exception e) { + Logger.error("Error processing this tweet: {} with error : {}", tm.getId(), e.getMessage()); + } + + }); + + // and we are back to single threaded processing so let's log how + // many of the tweets we pulled from ES that we managed to process + Logger.debug("successfully processed {} of {} tweets", successful.get(), stream.getTotalHits()); } - stream.close(); - indexWordsSubList(model); } /** @@ -159,36 +213,6 @@ public Date findWhereIndexingStopped(CollectRequest request) { Date date = Date.from(instant); return date; - } - - /** - * - * @param tms - * @throws IOException - */ - private void indexWordsSubList(List tms) throws IOException { - if (tms.isEmpty()) - return; - int listSize = tms.size(); - Logger.debug("List size {}", listSize); - int nbSubList = listSize / bulkLimit; - Collection> results = new ArrayList<>(); - Logger.debug("Nb List {}", nbSubList); - for (int i = 0; i <= nbSubList; i++) { - int fromIndex = i * bulkLimit; - int toIndex = fromIndex + bulkLimit; - if (toIndex > listSize) { - toIndex = listSize; - } - Logger.debug("index from {} to {}", fromIndex, toIndex); - List subList = tms.subList(fromIndex, toIndex); - Logger.debug("sublist size {}", subList.size()); - results.add(twittieProcessing.indexWordsObj(subList)); - } - CompletableFuture.allOf(results.toArray(new CompletableFuture[results.size()])).join(); - - } - - + } } diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java index 2d9e679..61fcc97 100644 --- a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java +++ b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java @@ -35,17 +35,6 @@ public TaskExecutor twintCallTaskExecutor() { executor.initialize(); return executor; } - @Bean(name = "twittieCallTaskExecutor") - public TaskExecutor twittieCallTaskExecutor() { - - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(nbCoreThreads); - executor.setMaxPoolSize(nbMaxThreads); - executor.setQueueCapacity(nbQueueThreads); - executor.setThreadNamePrefix("twittie-"); - executor.initialize(); - return executor; - } @Bean(name = "twintCallGroupTaskExecutor") public TaskExecutor twintCallGroutTaskExecutor() { diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/TwittieProcessing.java b/src/main/java/com/afp/medialab/weverify/social/twint/TwittieProcessing.java deleted file mode 100644 index 4e6d6d4..0000000 --- a/src/main/java/com/afp/medialab/weverify/social/twint/TwittieProcessing.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.afp.medialab.weverify.social.twint; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; - -import com.afp.medialab.weverify.social.model.twint.TwintModel; -import com.afp.medialab.weverify.social.model.twint.WordsInTweet; -import com.fasterxml.jackson.databind.ObjectMapper; - -@Service -public class TwittieProcessing { - - @Autowired - RestHighLevelClient highLevelClient; - - @Autowired - private TweetsPostProcess twintModelAdapter; - - private static Logger Logger = LoggerFactory.getLogger(TwittieProcessing.class); - - /** - * Add Twittie data - * - * @param tms Indexed document - * @throws IOException - */ - @Async(value = "twittieCallTaskExecutor") - public CompletableFuture indexWordsObj(List tms) throws IOException { - - int i = 0; - Logger.info("call Twittie WS for {} extracted tweets", tms.size()); - BulkRequest bulkRequest = new BulkRequest(); - - for (TwintModel tm : tms) { - - try { - - // Logger.debug("Builtin wit : " + i++ + "/" + tms.size()); - List wit = twintModelAdapter.buildWit(tm.getFull_text()); - - ObjectMapper mapper = new ObjectMapper(); - String b = "{\"wit\": " + mapper.writeValueAsString(wit) + "}"; - - UpdateRequest updateRequest = new UpdateRequest("tsnatweets", tm.getId()); - updateRequest.doc(b, XContentType.JSON); - - bulkRequest.add(updateRequest); - i++; - } catch (Exception e) { - Logger.error("Error processing this tweet: {} with error : {}", tm.getId(), e.getMessage()); - // e.printStackTrace(); - } - } - Logger.debug("{}/{} process tweets ", i, tms.size()); - // SocketTimeoutException - bulkQuery(bulkRequest); - Logger.debug("Update success"); - return CompletableFuture.completedFuture("ok"); - } - - private void bulkQuery(BulkRequest bulkRequest) { - try { - highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); - } catch (IOException e) { - Logger.error("Error in Bulk request {} {}", e.getMessage(), e.getCause()); - } - } -}