Skip to content

Commit

Permalink
Refactoring Twittie multi-threading
Browse files Browse the repository at this point in the history
  • Loading branch information
AFPMedialab committed Jul 17, 2020
1 parent f510633 commit 3fb7fc9
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 117 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</parent>
<groupId>com.afp.medialab.weverify.social</groupId>
<artifactId>twint-wrapper</artifactId>
<version>2.0.2-SNAPSHOT</version>
<version>2.0.4-SNAPSHOT</version>
<name>twint-wrapper</name>
<description>Twitter scraper wrapper</description>

Expand Down Expand Up @@ -155,7 +155,7 @@
<properties>
<docker.path>src/main/docker/delivery</docker.path>
<docker.image.name>twint-wrapper</docker.image.name>
<docker.image.tag>2.0.2.2</docker.image.tag>
<docker.image.tag>2.0.4</docker.image.tag>
</properties>
<build>
<plugins>
Expand Down
2 changes: 1 addition & 1 deletion src/main/docker/delivery/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM twintplus:0.3 as base
FROM twintplus:0.4 as base

# install openjre:8
RUN mkdir -p /usr/share/man/man1 && apt install openjdk-11-jre -y && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class CollectRequest {

private Boolean verified = false;

private Boolean cached = true;

private Boolean disableTimeRange = false;

@RetweetHandlingConstrain
Expand All @@ -62,6 +64,7 @@ public CollectRequest(CollectRequest collectRequest) {
this.verified = collectRequest.verified;
this.retweetsHandling = collectRequest.retweetsHandling;
this.disableTimeRange = collectRequest.disableTimeRange;
this.cached = collectRequest.cached;
}

public CollectRequest(Request request) {
Expand Down Expand Up @@ -153,6 +156,14 @@ public void setDisableTimeRange(boolean disableTimeRange) {
this.disableTimeRange = disableTimeRange;
}

public Boolean isCached() {
return cached;
}

public void setCached(boolean cached) {
this.cached = cached;
}

/**
* @param overObject
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import javax.transaction.Transactional;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -33,28 +31,23 @@
import org.springframework.data.elasticsearch.core.SearchHitsIterator;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import com.afp.medialab.weverify.social.model.CollectRequest;
import com.afp.medialab.weverify.social.model.twint.TwintModel;
import com.afp.medialab.weverify.social.model.twint.WordsInTweet;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
@Service
@Transactional
public class ESOperations {

@Autowired
private ElasticsearchOperations esOperation;

@Autowired
RestHighLevelClient highLevelClient;


@Autowired
private TweetsPostProcess twintModelAdapter;
private TwittieProcessing twittieProcessing;

// bulk number of request
private int bulkLimit = 5000;
private int bulkLimit = 1000;

// private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
// HH:mm:ss");
Expand All @@ -69,7 +62,7 @@ public class ESOperations {
* @param end
* @throws IOException
*/
public List<TwintModel> enrichWithTweetie(String essid) throws IOException {
public void enrichWithTweetie(String essid) throws IOException {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must(matchQuery("essid", essid));
builder.mustNot(existsQuery("wit"));
Expand All @@ -81,7 +74,7 @@ public List<TwintModel> enrichWithTweetie(String essid) throws IOException {
model.add(stream.next().getContent());
}
stream.close();
return model;
indexWordsSubList(model);
}

/**
Expand Down Expand Up @@ -173,10 +166,14 @@ public Date findWhereIndexingStopped(CollectRequest request) {
* @param tms
* @throws IOException
*/
public void indexWordsSubList(List<TwintModel> tms) throws IOException {
private void indexWordsSubList(List<TwintModel> tms) throws IOException {
if (tms.isEmpty())
return;
int listSize = tms.size();
Logger.debug("List size {}", listSize);
int nbSubList = listSize / bulkLimit;
Collection<Future<String>> results = new ArrayList<>();
Logger.debug("Nb List {}", nbSubList);
for (int i = 0; i <= nbSubList; i++) {
int fromIndex = i * bulkLimit;
int toIndex = fromIndex + bulkLimit;
Expand All @@ -186,55 +183,12 @@ public void indexWordsSubList(List<TwintModel> tms) throws IOException {
Logger.debug("index from {} to {}", fromIndex, toIndex);
List<TwintModel> subList = tms.subList(fromIndex, toIndex);
Logger.debug("sublist size {}", subList.size());
indexWordsObj(subList);
}
}

/**
* Add Twitie data
*
* @param tms Indexed document
* @throws IOException
*/
public void indexWordsObj(List<TwintModel> tms) throws IOException {

int i = 0;
Logger.info("call Twittie WS for {} extracted tweets", tms.size());
BulkRequest bulkRequest = new BulkRequest();

for (TwintModel tm : tms) {

try {

// Logger.debug("Builtin wit : " + i++ + "/" + tms.size());
List<WordsInTweet> wit = twintModelAdapter.buildWit(tm.getFull_text());

ObjectMapper mapper = new ObjectMapper();
String b = "{\"wit\": " + mapper.writeValueAsString(wit) + "}";

UpdateRequest updateRequest = new UpdateRequest("tsnatweets", tm.getId());
updateRequest.doc(b, XContentType.JSON);

bulkRequest.add(updateRequest);
i++;
} catch (Exception e) {
Logger.error("Error processing this tweet: {} with error : {}", tm.getId(), e.getMessage());
// e.printStackTrace();
}
results.add(twittieProcessing.indexWordsObj(subList));
}
Logger.debug("{}/{} process tweets ", i, tms.size());
// SocketTimeoutException
bulkQuery(bulkRequest);
Logger.debug("Update success");

CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[results.size()])).join();

}

private void bulkQuery(BulkRequest bulkRequest) {
try {
highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
Logger.error("Error in Bulk request {} {}", e.getMessage(), e.getCause());
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
Expand All @@ -19,15 +17,14 @@
import com.afp.medialab.weverify.social.dao.entity.CollectHistory;
import com.afp.medialab.weverify.social.dao.service.CollectService;
import com.afp.medialab.weverify.social.model.CollectRequest;
import com.afp.medialab.weverify.social.model.Status;
import com.afp.medialab.weverify.social.model.twint.TwintModel;

/**
* Run twint command in a asynchronous thread
*
* @author Medialab
*/
@Component("twintThread")
@Transactional
public class TwintThread {

private static final Logger Logger = LoggerFactory.getLogger(TwintThread.class);
Expand All @@ -43,9 +40,9 @@ public class TwintThread {

@Autowired
private ESOperations esOperation;

@Autowired
CollectService collectService;
private CollectService collectService;

// private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
// HH:mm:ss");
Expand All @@ -60,7 +57,7 @@ private boolean isDockerCommand(String twintCall) {
}

@Async(value = "twintCallTaskExecutor")
@Transactional
//@Transactional
public CompletableFuture<Integer> callTwint(CollectHistory collectHistory, CollectRequest request) {

Integer result = null;
Expand All @@ -70,16 +67,6 @@ public CompletableFuture<Integer> callTwint(CollectHistory collectHistory, Colle
e.printStackTrace();
Logger.error("Error calling twint process", e);
}
if (result > 0) {
try {
Thread.sleep(2000);
List<TwintModel> tms = esOperation.enrichWithTweetie(collectHistory.getSession());
esOperation.indexWordsSubList(tms);
} catch (IOException | InterruptedException e) {
Logger.error("error with tweeetie", e);
}
}
// update db to say this thread is finished
collectHistory.setFinished_threads(collectHistory.getFinished_threads() + 1);
Integer old_count = collectHistory.getCount();
if (old_count == null || old_count == -1)
Expand All @@ -92,31 +79,16 @@ public CompletableFuture<Integer> callTwint(CollectHistory collectHistory, Colle
collectHistory.setSuccessful_threads(collectHistory.getSuccessful_threads() + 1);
collectService.save_collectHistory(collectHistory);
}

int finished_threads = collectHistory.getFinished_threads();
int successful_threads = collectHistory.getSuccessful_threads();
int total_threads = collectHistory.getTotal_threads();
Logger.debug("FINISH THREAD {}, SUCCCESS {}, TOTAL THREAD {}", finished_threads, successful_threads,
total_threads);
if (finished_threads == total_threads) {

collectHistory.setStatus(Status.Done);
collectHistory.setProcessEnd(Calendar.getInstance().getTime());
if (successful_threads == finished_threads) {
collectHistory.setMessage("Finished successfully");
} else {
collectHistory.setStatus(Status.Error);
collectHistory.setMessage("Parts of this search could not be found");
}
collectService.save_collectHistory(collectHistory);
}
// }

return CompletableFuture.completedFuture(result);
}


private ProcessBuilder createProcessBuilder(CollectRequest request, String session) {
boolean isDocker = isDockerCommand(twintCall);
//String twintRequest = TwintRequestGenerator.getInstance().generateRequest(request, session, isDocker, esURL);
// String twintRequest =
// TwintRequestGenerator.getInstance().generateRequest(request, session,
// isDocker, esURL);
String twintRequest = TwintPlusRequestBuilder.getInstance().generateRequest(request, session, isDocker, esURL);

ProcessBuilder processBuilder = new ProcessBuilder("/bin/bash", "-c", twintCall + twintRequest);
Expand All @@ -135,21 +107,19 @@ private Integer callProcess(ProcessBuilder processBuilder, String got) throws IO
Integer nb_tweets = -1;
while ((LoggerString = stdError.readLine()) != null) {


if (LoggerString.contains("Tweets collected")) {
String str = LoggerString.split("Tweets collected: ")[1].split(" ")[0];
str = str.substring(0, str.length() -2);
str = str.substring(0, str.length() - 2);
nb_tweets = Integer.parseInt(str);
Logger.info("Successfully collected: " + nb_tweets + " " + got);

}
// error_occurred = true;
}


while ((LoggerString = stdInput.readLine()) != null) {
Logger.error(LoggerString);

}
stdInput.close();
stdError.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ public TaskExecutor twintCallTaskExecutor() {
executor.initialize();
return executor;
}
@Bean(name = "twittieCallTaskExecutor")
public TaskExecutor twittieCallTaskExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(nbCoreThreads);
executor.setMaxPoolSize(nbMaxThreads);
executor.setQueueCapacity(nbQueueThreads);
executor.setThreadNamePrefix("twittie-");
executor.initialize();
return executor;
}

@Bean(name = "twintCallGroupTaskExecutor")
public TaskExecutor twintCallGroutTaskExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.lang.Math.toIntExact;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Calendar;
Expand Down Expand Up @@ -42,6 +43,9 @@ public class TwintThreadGroup {
@Autowired
@Qualifier("twintThread")
private TwintThread tt;

@Autowired
private ESOperations esOperation;

private Date addDuration(Date date, Duration duration) {
Calendar calendar = Calendar.getInstance();
Expand Down Expand Up @@ -137,6 +141,29 @@ private void callTwintThreads(ArrayList<CollectRequest> collectRequestList, Coll
result.add(tt.callTwint(collectHistory, collectRequest));
}
CompletableFuture.allOf(result.toArray(new CompletableFuture<?>[result.size()])).join();
int finished_threads = collectHistory.getFinished_threads();
int successful_threads = collectHistory.getSuccessful_threads();
int total_threads = collectHistory.getTotal_threads();
if (finished_threads == total_threads) {
if (successful_threads == finished_threads) {
collectHistory.setStatus(Status.CountingWords);
collectService.save_collectHistory(collectHistory);
try {
esOperation.enrichWithTweetie(collectHistory.getSession());
} catch (IOException e) {
Logger.error("error with tweeetie", e);
}
collectHistory.setStatus(Status.Done);
collectHistory.setMessage("Finished successfully");
collectHistory.setProcessEnd(Calendar.getInstance().getTime());
collectService.save_collectHistory(collectHistory);
}else {
collectHistory.setStatus(Status.Error);
collectHistory.setMessage("Parts of this search could not be found");
collectHistory.setProcessEnd(Calendar.getInstance().getTime());
collectService.save_collectHistory(collectHistory);
}
}
}

}
Loading

0 comments on commit 3fb7fc9

Please sign in to comment.