diff --git a/pom.xml b/pom.xml index 21441f7..f33010e 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.afp.medialab.weverify.social twint-wrapper - 2.0.2-SNAPSHOT + 2.0.4-SNAPSHOT twint-wrapper Twitter scraper wrapper @@ -155,7 +155,7 @@ src/main/docker/delivery twint-wrapper - 2.0.2.2 + 2.0.4 diff --git a/src/main/docker/delivery/Dockerfile b/src/main/docker/delivery/Dockerfile index 85f7805..d802e01 100644 --- a/src/main/docker/delivery/Dockerfile +++ b/src/main/docker/delivery/Dockerfile @@ -1,4 +1,4 @@ -FROM twintplus:0.3 as base +FROM twintplus:0.4 as base # install openjre:8 RUN mkdir -p /usr/share/man/man1 && apt install openjdk-11-jre -y && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* diff --git a/src/main/java/com/afp/medialab/weverify/social/model/CollectRequest.java b/src/main/java/com/afp/medialab/weverify/social/model/CollectRequest.java index 31a6db0..7186a3b 100644 --- a/src/main/java/com/afp/medialab/weverify/social/model/CollectRequest.java +++ b/src/main/java/com/afp/medialab/weverify/social/model/CollectRequest.java @@ -43,6 +43,8 @@ public class CollectRequest { private Boolean verified = false; + private Boolean cached = true; + private Boolean disableTimeRange = false; @RetweetHandlingConstrain @@ -62,6 +64,7 @@ public CollectRequest(CollectRequest collectRequest) { this.verified = collectRequest.verified; this.retweetsHandling = collectRequest.retweetsHandling; this.disableTimeRange = collectRequest.disableTimeRange; + this.cached = collectRequest.cached; } public CollectRequest(Request request) { @@ -153,6 +156,14 @@ public void setDisableTimeRange(boolean disableTimeRange) { this.disableTimeRange = disableTimeRange; } + public Boolean isCached() { + return cached; + } + + public void setCached(boolean cached) { + this.cached = cached; + } + /** * @param overObject * @return diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java b/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java index 90ccc42..4dd90ed 100644 --- a/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java +++ b/src/main/java/com/afp/medialab/weverify/social/twint/ESOperations.java @@ -7,17 +7,15 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import javax.transaction.Transactional; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -33,28 +31,23 @@ import org.springframework.data.elasticsearch.core.SearchHitsIterator; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; import com.afp.medialab.weverify.social.model.CollectRequest; import com.afp.medialab.weverify.social.model.twint.TwintModel; -import com.afp.medialab.weverify.social.model.twint.WordsInTweet; -import com.fasterxml.jackson.databind.ObjectMapper; -@Component +@Service @Transactional public class ESOperations { @Autowired private ElasticsearchOperations esOperation; - - @Autowired - RestHighLevelClient highLevelClient; - + @Autowired - private TweetsPostProcess twintModelAdapter; + private TwittieProcessing twittieProcessing; // bulk number of request - private int bulkLimit = 5000; + private int bulkLimit = 1000; // private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd // HH:mm:ss"); @@ -69,7 +62,7 @@ public class ESOperations { * @param end * @throws IOException */ - public List enrichWithTweetie(String essid) throws IOException { + public void enrichWithTweetie(String essid) throws IOException { BoolQueryBuilder builder = QueryBuilders.boolQuery(); builder.must(matchQuery("essid", essid)); builder.mustNot(existsQuery("wit")); @@ -81,7 +74,7 @@ public List enrichWithTweetie(String essid) throws IOException { model.add(stream.next().getContent()); } stream.close(); - return model; + indexWordsSubList(model); } /** @@ -173,10 +166,14 @@ public Date findWhereIndexingStopped(CollectRequest request) { * @param tms * @throws IOException */ - public void indexWordsSubList(List tms) throws IOException { + private void indexWordsSubList(List tms) throws IOException { + if (tms.isEmpty()) + return; int listSize = tms.size(); Logger.debug("List size {}", listSize); int nbSubList = listSize / bulkLimit; + Collection> results = new ArrayList<>(); + Logger.debug("Nb List {}", nbSubList); for (int i = 0; i <= nbSubList; i++) { int fromIndex = i * bulkLimit; int toIndex = fromIndex + bulkLimit; @@ -186,55 +183,12 @@ public void indexWordsSubList(List tms) throws IOException { Logger.debug("index from {} to {}", fromIndex, toIndex); List subList = tms.subList(fromIndex, toIndex); Logger.debug("sublist size {}", subList.size()); - indexWordsObj(subList); - } - } - - /** - * Add Twitie data - * - * @param tms Indexed document - * @throws IOException - */ - public void indexWordsObj(List tms) throws IOException { - - int i = 0; - Logger.info("call Twittie WS for {} extracted tweets", tms.size()); - BulkRequest bulkRequest = new BulkRequest(); - - for (TwintModel tm : tms) { - - try { - - // Logger.debug("Builtin wit : " + i++ + "/" + tms.size()); - List wit = twintModelAdapter.buildWit(tm.getFull_text()); - - ObjectMapper mapper = new ObjectMapper(); - String b = "{\"wit\": " + mapper.writeValueAsString(wit) + "}"; - - UpdateRequest updateRequest = new UpdateRequest("tsnatweets", tm.getId()); - updateRequest.doc(b, XContentType.JSON); - - bulkRequest.add(updateRequest); - i++; - } catch (Exception e) { - Logger.error("Error processing this tweet: {} with error : {}", tm.getId(), e.getMessage()); - // e.printStackTrace(); - } + results.add(twittieProcessing.indexWordsObj(subList)); } - Logger.debug("{}/{} process tweets ", i, tms.size()); - // SocketTimeoutException - bulkQuery(bulkRequest); - Logger.debug("Update success"); - + CompletableFuture.allOf(results.toArray(new CompletableFuture[results.size()])).join(); + } - private void bulkQuery(BulkRequest bulkRequest) { - try { - highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); - } catch (IOException e) { - Logger.error("Error in Bulk request {} {}", e.getMessage(), e.getCause()); - } - } + } diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThread.java b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThread.java index 649e19e..43db279 100644 --- a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThread.java +++ b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThread.java @@ -3,9 +3,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.util.Calendar; import java.util.Date; -import java.util.List; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; @@ -19,8 +17,6 @@ import com.afp.medialab.weverify.social.dao.entity.CollectHistory; import com.afp.medialab.weverify.social.dao.service.CollectService; import com.afp.medialab.weverify.social.model.CollectRequest; -import com.afp.medialab.weverify.social.model.Status; -import com.afp.medialab.weverify.social.model.twint.TwintModel; /** * Run twint command in a asynchronous thread @@ -28,6 +24,7 @@ * @author Medialab */ @Component("twintThread") +@Transactional public class TwintThread { private static final Logger Logger = LoggerFactory.getLogger(TwintThread.class); @@ -43,9 +40,9 @@ public class TwintThread { @Autowired private ESOperations esOperation; - + @Autowired - CollectService collectService; + private CollectService collectService; // private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd // HH:mm:ss"); @@ -60,7 +57,7 @@ private boolean isDockerCommand(String twintCall) { } @Async(value = "twintCallTaskExecutor") - @Transactional + //@Transactional public CompletableFuture callTwint(CollectHistory collectHistory, CollectRequest request) { Integer result = null; @@ -70,16 +67,6 @@ public CompletableFuture callTwint(CollectHistory collectHistory, Colle e.printStackTrace(); Logger.error("Error calling twint process", e); } - if (result > 0) { - try { - Thread.sleep(2000); - List tms = esOperation.enrichWithTweetie(collectHistory.getSession()); - esOperation.indexWordsSubList(tms); - } catch (IOException | InterruptedException e) { - Logger.error("error with tweeetie", e); - } - } - // update db to say this thread is finished collectHistory.setFinished_threads(collectHistory.getFinished_threads() + 1); Integer old_count = collectHistory.getCount(); if (old_count == null || old_count == -1) @@ -92,31 +79,16 @@ public CompletableFuture callTwint(CollectHistory collectHistory, Colle collectHistory.setSuccessful_threads(collectHistory.getSuccessful_threads() + 1); collectService.save_collectHistory(collectHistory); } - - int finished_threads = collectHistory.getFinished_threads(); - int successful_threads = collectHistory.getSuccessful_threads(); - int total_threads = collectHistory.getTotal_threads(); - Logger.debug("FINISH THREAD {}, SUCCCESS {}, TOTAL THREAD {}", finished_threads, successful_threads, - total_threads); - if (finished_threads == total_threads) { - - collectHistory.setStatus(Status.Done); - collectHistory.setProcessEnd(Calendar.getInstance().getTime()); - if (successful_threads == finished_threads) { - collectHistory.setMessage("Finished successfully"); - } else { - collectHistory.setStatus(Status.Error); - collectHistory.setMessage("Parts of this search could not be found"); - } - collectService.save_collectHistory(collectHistory); - } -// } + return CompletableFuture.completedFuture(result); } + private ProcessBuilder createProcessBuilder(CollectRequest request, String session) { boolean isDocker = isDockerCommand(twintCall); - //String twintRequest = TwintRequestGenerator.getInstance().generateRequest(request, session, isDocker, esURL); + // String twintRequest = + // TwintRequestGenerator.getInstance().generateRequest(request, session, + // isDocker, esURL); String twintRequest = TwintPlusRequestBuilder.getInstance().generateRequest(request, session, isDocker, esURL); ProcessBuilder processBuilder = new ProcessBuilder("/bin/bash", "-c", twintCall + twintRequest); @@ -135,10 +107,9 @@ private Integer callProcess(ProcessBuilder processBuilder, String got) throws IO Integer nb_tweets = -1; while ((LoggerString = stdError.readLine()) != null) { - if (LoggerString.contains("Tweets collected")) { String str = LoggerString.split("Tweets collected: ")[1].split(" ")[0]; - str = str.substring(0, str.length() -2); + str = str.substring(0, str.length() - 2); nb_tweets = Integer.parseInt(str); Logger.info("Successfully collected: " + nb_tweets + " " + got); @@ -146,10 +117,9 @@ private Integer callProcess(ProcessBuilder processBuilder, String got) throws IO // error_occurred = true; } - while ((LoggerString = stdInput.readLine()) != null) { Logger.error(LoggerString); - + } stdInput.close(); stdError.close(); diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java index 61fcc97..2d9e679 100644 --- a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java +++ b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadExecutor.java @@ -35,6 +35,17 @@ public TaskExecutor twintCallTaskExecutor() { executor.initialize(); return executor; } + @Bean(name = "twittieCallTaskExecutor") + public TaskExecutor twittieCallTaskExecutor() { + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(nbCoreThreads); + executor.setMaxPoolSize(nbMaxThreads); + executor.setQueueCapacity(nbQueueThreads); + executor.setThreadNamePrefix("twittie-"); + executor.initialize(); + return executor; + } @Bean(name = "twintCallGroupTaskExecutor") public TaskExecutor twintCallGroutTaskExecutor() { diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadGroup.java b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadGroup.java index eb269e9..b55c1bf 100644 --- a/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadGroup.java +++ b/src/main/java/com/afp/medialab/weverify/social/twint/TwintThreadGroup.java @@ -2,6 +2,7 @@ import static java.lang.Math.toIntExact; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Calendar; @@ -42,6 +43,9 @@ public class TwintThreadGroup { @Autowired @Qualifier("twintThread") private TwintThread tt; + + @Autowired + private ESOperations esOperation; private Date addDuration(Date date, Duration duration) { Calendar calendar = Calendar.getInstance(); @@ -137,6 +141,29 @@ private void callTwintThreads(ArrayList collectRequestList, Coll result.add(tt.callTwint(collectHistory, collectRequest)); } CompletableFuture.allOf(result.toArray(new CompletableFuture[result.size()])).join(); + int finished_threads = collectHistory.getFinished_threads(); + int successful_threads = collectHistory.getSuccessful_threads(); + int total_threads = collectHistory.getTotal_threads(); + if (finished_threads == total_threads) { + if (successful_threads == finished_threads) { + collectHistory.setStatus(Status.CountingWords); + collectService.save_collectHistory(collectHistory); + try { + esOperation.enrichWithTweetie(collectHistory.getSession()); + } catch (IOException e) { + Logger.error("error with tweeetie", e); + } + collectHistory.setStatus(Status.Done); + collectHistory.setMessage("Finished successfully"); + collectHistory.setProcessEnd(Calendar.getInstance().getTime()); + collectService.save_collectHistory(collectHistory); + }else { + collectHistory.setStatus(Status.Error); + collectHistory.setMessage("Parts of this search could not be found"); + collectHistory.setProcessEnd(Calendar.getInstance().getTime()); + collectService.save_collectHistory(collectHistory); + } + } } } diff --git a/src/main/java/com/afp/medialab/weverify/social/twint/TwittieProcessing.java b/src/main/java/com/afp/medialab/weverify/social/twint/TwittieProcessing.java new file mode 100644 index 0000000..4e6d6d4 --- /dev/null +++ b/src/main/java/com/afp/medialab/weverify/social/twint/TwittieProcessing.java @@ -0,0 +1,80 @@ +package com.afp.medialab.weverify.social.twint; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import com.afp.medialab.weverify.social.model.twint.TwintModel; +import com.afp.medialab.weverify.social.model.twint.WordsInTweet; +import com.fasterxml.jackson.databind.ObjectMapper; + +@Service +public class TwittieProcessing { + + @Autowired + RestHighLevelClient highLevelClient; + + @Autowired + private TweetsPostProcess twintModelAdapter; + + private static Logger Logger = LoggerFactory.getLogger(TwittieProcessing.class); + + /** + * Add Twittie data + * + * @param tms Indexed document + * @throws IOException + */ + @Async(value = "twittieCallTaskExecutor") + public CompletableFuture indexWordsObj(List tms) throws IOException { + + int i = 0; + Logger.info("call Twittie WS for {} extracted tweets", tms.size()); + BulkRequest bulkRequest = new BulkRequest(); + + for (TwintModel tm : tms) { + + try { + + // Logger.debug("Builtin wit : " + i++ + "/" + tms.size()); + List wit = twintModelAdapter.buildWit(tm.getFull_text()); + + ObjectMapper mapper = new ObjectMapper(); + String b = "{\"wit\": " + mapper.writeValueAsString(wit) + "}"; + + UpdateRequest updateRequest = new UpdateRequest("tsnatweets", tm.getId()); + updateRequest.doc(b, XContentType.JSON); + + bulkRequest.add(updateRequest); + i++; + } catch (Exception e) { + Logger.error("Error processing this tweet: {} with error : {}", tm.getId(), e.getMessage()); + // e.printStackTrace(); + } + } + Logger.debug("{}/{} process tweets ", i, tms.size()); + // SocketTimeoutException + bulkQuery(bulkRequest); + Logger.debug("Update success"); + return CompletableFuture.completedFuture("ok"); + } + + private void bulkQuery(BulkRequest bulkRequest) { + try { + highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + Logger.error("Error in Bulk request {} {}", e.getMessage(), e.getCause()); + } + } +} diff --git a/src/main/java/com/afp/medialab/weverify/social/util/RequestCacheManager.java b/src/main/java/com/afp/medialab/weverify/social/util/RequestCacheManager.java index e5b1adc..c8c700c 100644 --- a/src/main/java/com/afp/medialab/weverify/social/util/RequestCacheManager.java +++ b/src/main/java/com/afp/medialab/weverify/social/util/RequestCacheManager.java @@ -31,7 +31,7 @@ public class RequestCacheManager { private static Logger Logger = LoggerFactory.getLogger(RequestCacheManager.class); - + @Autowired private CollectService collectService; @@ -60,7 +60,7 @@ public CollectResponse useCache(CollectRequest collectRequest) { CollectHistory collectHistory = collectService.createNewCollectHistory(); Set similarRequests = similarInCache(collectRequest); Set previousMatch = exactRequests(similarRequests, collectRequest); - if (previousMatch != null && !previousMatch.isEmpty()) { + if ((previousMatch != null && !previousMatch.isEmpty()) && collectRequest.isCached()) { // Requests exist in cache // Try to extend with new dateRange if so collectHistory = (collectRequest.isDisableTimeRange() ? reusePreviousRequest(previousMatch) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index afa3c05..8274835 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -63,7 +63,7 @@ spring: show-sql: true command: - twint: docker run --rm --network dev_esnet -i twintplus:0.3 + twint: docker run --rm --network dev_esnet -i twintplus:0.4 management: # server: diff --git a/src/scripts/prod/docker-compose.yml b/src/scripts/prod/docker-compose.yml index 0493cf0..ac7e11e 100644 --- a/src/scripts/prod/docker-compose.yml +++ b/src/scripts/prod/docker-compose.yml @@ -39,7 +39,7 @@ services: - esnet twint-wrapper: - image: twint-wrapper:2.0.2.2 + image: twint-wrapper:2.0.4 container_name: twint-wrapper restart: on-failure depends_on: diff --git a/src/test/java/com/afp/medialab/weverify/social/ElasticSearchQueryTest.java b/src/test/java/com/afp/medialab/weverify/social/ElasticSearchQueryTest.java index 8a7f657..2a26155 100644 --- a/src/test/java/com/afp/medialab/weverify/social/ElasticSearchQueryTest.java +++ b/src/test/java/com/afp/medialab/weverify/social/ElasticSearchQueryTest.java @@ -5,7 +5,6 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; import java.io.IOException; -import java.util.List; import org.elasticsearch.index.query.QueryBuilder; import org.junit.Test; @@ -64,9 +63,9 @@ public void testQueryCollectRequest() throws JsonParseException, JsonMappingExce CollectRequest collectRequest = objectMapper.readValue(donalTrumpQuery, CollectRequest.class); //List models = esOperations.enrichWithTweetie(collectRequest); - List models = esOperations.enrichWithTweetie(essid); - esOperations.indexWordsSubList(models); - System.out.println("ok " + models.size()); + esOperations.enrichWithTweetie(essid); + + //System.out.println("ok " + models.size()); }