Skip to content

Commit a8849b4

Browse files
authored
Use SearchIterator instead of ElasticSource and fromGraph (#2842)
* Use SearchIterator instead of ElasticSource and fromGraph * a little clarity * typo
1 parent 05b1b41 commit a8849b4

File tree

6 files changed

+49
-54
lines changed

6 files changed

+49
-54
lines changed

pipeline/relation_embedder/relation_embedder/docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ services:
99
ports:
1010
- "4566:4566"
1111
elasticsearch:
12-
image: "docker.elastic.co/elasticsearch/elasticsearch:8.4.0"
12+
image: "docker.elastic.co/elasticsearch/elasticsearch:8.8.2"
1313
ports:
1414
- "9200:9200"
1515
- "9300:9300"
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
{"rootPath":"GC103","selectors":[{"path":"GC103","type":"Tree"}]}
2-
2+
{"rootPath":"MS7826","selectors":[{"path":"MS7826","type":"Tree"}]}

pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package weco.pipeline.relation_embedder
33
import com.sksamuel.elastic4s.Index
44
import grizzled.slf4j.Logging
55
import org.apache.pekko.NotUsed
6-
import org.apache.pekko.actor.ActorSystem
76
import org.apache.pekko.stream.Materializer
87
import org.apache.pekko.stream.scaladsl.{Sink, Source}
98
import weco.catalogue.internal_model.work.Work
@@ -79,6 +78,7 @@ class BatchProcessor(
7978
.getAffectedWorks(batch)
8079
.map {
8180
work =>
81+
debug(s"transitioning ${work.id}")
8282
val relations = relationsCache(work)
8383
work.transition[Denormalised](relations)
8484
}
@@ -105,8 +105,7 @@ object BatchProcessor {
105105
def apply(
106106
config: RelationEmbedderConfig
107107
)(
108-
implicit actorSystem: ActorSystem,
109-
ec: ExecutionContext,
108+
implicit ec: ExecutionContext,
110109
materializer: Materializer
111110
): BatchProcessor = {
112111

Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
package weco.pipeline.relation_embedder
22

33
import org.apache.pekko.NotUsed
4-
import org.apache.pekko.actor.ActorSystem
54
import org.apache.pekko.stream.scaladsl.Source
65
import com.sksamuel.elastic4s.ElasticDsl._
76
import com.sksamuel.elastic4s.circe._
87
import com.sksamuel.elastic4s.{ElasticClient, Index}
98
import grizzled.slf4j.Logging
109
import weco.json.JsonUtil._
1110
import weco.catalogue.internal_model.Implicits._
12-
import com.sksamuel.elastic4s.pekko.streams._
11+
import com.sksamuel.elastic4s.requests.searches.SearchIterator
1312
import weco.catalogue.internal_model.work.WorkState.Merged
1413
import weco.catalogue.internal_model.work.Work
1514
import weco.pipeline.relation_embedder.models.{Batch, RelationWork}
1615

16+
import scala.concurrent.duration.{Duration, DurationInt}
17+
1718
trait RelationsService {
1819
def getRelationTree(batch: Batch): Source[RelationWork, NotUsed]
1920

@@ -25,8 +26,7 @@ class PathQueryRelationsService(
2526
index: Index,
2627
completeTreeScroll: Int = 1000,
2728
affectedWorksScroll: Int = 250
28-
)(implicit as: ActorSystem)
29-
extends RelationsService
29+
) extends RelationsService
3030
with Logging {
3131

3232
private val requestBuilder = RelationsRequestBuilder(index)
@@ -36,34 +36,33 @@ class PathQueryRelationsService(
3636
debug(
3737
s"Querying affected works with ES request: ${elasticClient.show(request)}"
3838
)
39-
val sourceSettings = SourceSettings(
40-
search = request,
41-
maxItems = Long.MaxValue,
42-
fetchThreshold = affectedWorksScroll,
43-
warm = true
39+
// Arbitrary timeout value, it has to exist for SearchIterator,
40+
// but it has not been derived either through experimentation or calculation,
41+
implicit val timeout: Duration = 5 minutes
42+
43+
Source.fromIterator(
44+
() =>
45+
SearchIterator.iterate[Work[Merged]](
46+
elasticClient,
47+
request
48+
)
4449
)
45-
Source
46-
.fromGraph(
47-
new ElasticSource(elasticClient, sourceSettings)(as.dispatcher)
48-
)
49-
.map(searchHit => searchHit.safeTo[Work[Merged]].get)
5050
}
5151

5252
def getRelationTree(batch: Batch): Source[RelationWork, NotUsed] = {
5353
val request = requestBuilder.completeTree(batch, completeTreeScroll)
5454
debug(
5555
s"Querying complete tree with ES request: ${elasticClient.show(request)}"
5656
)
57-
val sourceSettings = SourceSettings(
58-
search = request,
59-
maxItems = Long.MaxValue,
60-
fetchThreshold = affectedWorksScroll,
61-
warm = true
57+
58+
implicit val timeout: Duration = 5 minutes
59+
60+
Source.fromIterator(
61+
() =>
62+
SearchIterator.iterate[RelationWork](
63+
elasticClient,
64+
request
65+
)
6266
)
63-
Source
64-
.fromGraph(
65-
new ElasticSource(elasticClient, sourceSettings)(as.dispatcher)
66-
)
67-
.map(searchHit => searchHit.safeTo[RelationWork].get)
6867
}
6968
}

pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala

+21-24
Original file line numberDiff line numberDiff line change
@@ -56,33 +56,30 @@ class BatchProcessorTest
5656
)
5757
withUpstreamIndex(workList) {
5858
mergedIndex =>
59-
withActorSystem {
60-
implicit actorSystem =>
61-
val relationsService = new PathQueryRelationsService(
62-
elasticClient,
63-
mergedIndex,
64-
10
59+
val relationsService = new PathQueryRelationsService(
60+
elasticClient,
61+
mergedIndex,
62+
10
63+
)
64+
withMaterializer {
65+
implicit materializer: Materializer =>
66+
val downstream = new MemoryDownstream
67+
val processor = new BatchProcessor(
68+
relationsService = relationsService,
69+
bulkWriter = bulkWriter,
70+
downstream = downstream
6571
)
66-
withMaterializer {
67-
implicit materializer: Materializer =>
68-
val downstream = new MemoryDownstream
69-
val processor = new BatchProcessor(
70-
relationsService = relationsService,
71-
bulkWriter = bulkWriter,
72-
downstream = downstream
73-
)
74-
75-
whenReady(processor(batch)) {
76-
_ =>
77-
testWith(
78-
(
79-
downstream.msgSender.messages.map(_.body),
80-
denormalisedIndex
81-
)
82-
)
83-
}
8472

73+
whenReady(processor(batch)) {
74+
_ =>
75+
testWith(
76+
(
77+
downstream.msgSender.messages.map(_.body),
78+
denormalisedIndex
79+
)
80+
)
8581
}
82+
8683
}
8784
}
8885
}

pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationsServiceTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class RelationsServiceTest
2525
index: Index,
2626
completeTreeScroll: Int = 20,
2727
affectedWorksScroll: Int = 20
28-
)(implicit as: ActorSystem) =
28+
) =
2929
new PathQueryRelationsService(
3030
elasticClient = elasticClient,
3131
index = index,

0 commit comments

Comments
 (0)