Skip to content

Commit

Permalink
Make matrix request chunk by chunk to avoid hitting max_location limits
Browse files Browse the repository at this point in the history
  • Loading branch information
inigo-cobian committed Feb 20, 2025
1 parent 6e3adc0 commit b76c113
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package org.integratedmodelling.klab.components.geospace.routing;

import java.util.List;
import java.util.Map;

import org.geotools.data.geojson.GeoJSONReader;
import org.integratedmodelling.klab.api.observations.IDirectObservation;
import org.integratedmodelling.klab.api.observations.scale.space.IShape;
import org.integratedmodelling.klab.components.geospace.routing.ValhallaConfiguration.GeometryCollapser;
import org.integratedmodelling.klab.exceptions.KlabException;
import org.integratedmodelling.klab.exceptions.KlabRemoteException;
import org.locationtech.jts.geom.Geometry;
import edu.uci.ics.jung.graph.DirectedSparseGraph;
import edu.uci.ics.jung.graph.Graph;
import edu.uci.ics.jung.graph.util.EdgeType;
import kong.unirest.HttpResponse;
import kong.unirest.JsonNode;
import kong.unirest.Unirest;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.integratedmodelling.klab.components.geospace.routing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -60,14 +61,14 @@ static public class Matrix {

public String algorithm;
public String units;
public ArrayList<Coordinates> sources;
public ArrayList<Coordinates> targets;
public List<List<Coordinates>> sources;
public List<List<Coordinates>> targets;
public Collection<List<PairwiseDistance>> sourcesToTargets;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public Matrix(@JsonProperty("algorithm") String algorithm, @JsonProperty("units") String units,
@JsonProperty("sources") ArrayList<Coordinates> sources,
@JsonProperty("targets") ArrayList<Coordinates> targets,
@JsonProperty("sources") List<List<Coordinates>> sources,
@JsonProperty("targets") List<List<Coordinates>> targets,
@JsonProperty("sources_to_targets") Collection<List<PairwiseDistance>> sourcesToTargets) {
this.algorithm = algorithm;
this.units = units;
Expand All @@ -87,12 +88,12 @@ public String units() {
}

@JsonProperty("sources")
public ArrayList<Coordinates> sources() {
public List<List<Coordinates>> sources() {
return sources;
}

@JsonProperty("targets")
public ArrayList<Coordinates> targets() {
public List<List<Coordinates>> targets() {
return targets;
}

Expand All @@ -110,17 +111,17 @@ public String getUnits() {
}

public List<Map<String, Double>> getSources() {
return this.sources.stream().map(Coordinates::exportAsMap).collect(Collectors.toList());
return this.sources.get(0).stream().map(Coordinates::exportAsMap).collect(Collectors.toList());
}

public List<Map<String, Double>> getTargets() {
return this.targets.stream().map(Coordinates::exportAsMap).collect(Collectors.toList());
return this.targets.get(0).stream().map(Coordinates::exportAsMap).collect(Collectors.toList());
}

// public List<Map<String, Number>> getAdjacencyList() {
// return this.sourcesToTargets.stream().flatMap(x -> x.stream().map(PairwiseDistance::exportAsMap))
// .collect(Collectors.toList());
// }
public List<Map<String, Number>> getAdjacencyList() {
return this.sourcesToTargets.stream().flatMap(x -> x.stream().map(PairwiseDistance::exportAsMap))
.collect(Collectors.toList());
}

public static class Coordinates {
public double lon;
Expand All @@ -147,6 +148,7 @@ public Map<String, Double> exportAsMap() {
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
public static class PairwiseDistance {
public double distance;
public double time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.integratedmodelling.klab.Observables;
import org.integratedmodelling.klab.api.data.artifacts.IObjectArtifact;
import org.integratedmodelling.klab.api.data.general.IExpression;
Expand All @@ -11,6 +16,7 @@
import org.integratedmodelling.klab.api.observations.IDirectObservation;
import org.integratedmodelling.klab.api.observations.IObservation;
import org.integratedmodelling.klab.api.observations.IObservationGroup;
import org.integratedmodelling.klab.api.observations.scale.space.IShape;
import org.integratedmodelling.klab.api.provenance.IArtifact;
import org.integratedmodelling.klab.api.provenance.IArtifact.Type;
import org.integratedmodelling.klab.api.runtime.IContextualizationScope;
Expand All @@ -36,13 +42,14 @@ public class MatrixRelationshipInstantiator extends AbstractContextualizer imple

private Double timeThresholdInSeconds = null;
private Double distanceThresholdInKilometers = null;
private Integer maxLocations = 1000;

private TransportType transportType = TransportType.Auto;
private GeometryCollapser geometryCollapser = GeometryCollapser.Centroid;
private String server;
private IContextualizationScope scope;
private Valhalla valhalla;
private Graph<IObjectArtifact, MatrixEdge> graph;
private Graph<IObjectArtifact, DefaultEdge> graph;

public MatrixRelationshipInstantiator() {
/* to instantiate as expression - do not remove (or use) */}
Expand All @@ -53,6 +60,9 @@ public MatrixRelationshipInstantiator(Parameters<Object> parameters, IContextual
this.targetArtifact = parameters.get("target", String.class);
this.timeThresholdInSeconds = parameters.get("time_limit", Double.class);
this.distanceThresholdInKilometers = parameters.get("distance_limit", Double.class);
if (parameters.contains("max_locations")) {
this.maxLocations = parameters.get("max_locations", Integer.class);
}

if (parameters.containsKey("transport")) {
this.transportType = TransportType.fromValue(Utils.removePrefix(parameters.get("transport", String.class)));
Expand All @@ -79,7 +89,6 @@ public Type getType() {

@Override
public List<IObjectArtifact> instantiate(IObservable semantics, IContextualizationScope context) throws KlabException {
int i = 1;
List<IObjectArtifact> ret = new ArrayList<>();

// TODO Lo que viene aquí es algo que también se da en RoutingRelationshipInstantiator -> fusionar
Expand Down Expand Up @@ -110,52 +119,67 @@ public List<IObjectArtifact> instantiate(IObservable semantics, IContextualizati
}
}

List<double[]> sourcesCoordinates = sources.stream().map(s -> Valhalla.getCoordinates((IDirectObservation) s, geometryCollapser)).toList();
List<double[]> targetsCoordinates = targets.stream().map(t -> Valhalla.getCoordinates((IDirectObservation) t, geometryCollapser)).toList();

String matrixRequest = Valhalla.buildValhallaMatrixInput(sourcesCoordinates, targetsCoordinates, transportType.getType());
ValhallaOutputDeserializer.Matrix response = valhalla.matrix(matrixRequest);

graph = new DefaultDirectedGraph<>(MatrixEdge.class);
for (List<PairwiseDistance> connections : response.sourcesToTargets) {
for (PairwiseDistance connection : connections) {
IObservation source = sources.get(connection.sourceId);
IObservation target = targets.get(connection.targetId);

Parameters<String> routeParameters = new Parameters<String>();
if (distanceThresholdInKilometers != null && connection.distance > distanceThresholdInKilometers) {
continue;
graph = new DefaultDirectedGraph<>(DefaultEdge.class);
AtomicLong nConnections = new AtomicLong(0);
int iterationSize = maxLocations/2;
IntStream.range(0, (sources.size() + iterationSize - 1) / iterationSize)
.mapToObj(i -> sources.subList(i * iterationSize, Math.min((i + 1) * iterationSize, sources.size())))
.forEach(sourcesChunk -> {
Map<double[], IObservation> sourceCoordinates = sourcesChunk.stream().collect(Collectors.toMap(s -> Valhalla.getCoordinates((IDirectObservation)s, geometryCollapser), s -> s));

IntStream.range(0, (targets.size() + iterationSize - 1) / iterationSize)
.mapToObj(j -> targets.subList(j * iterationSize, Math.min((j + 1) * iterationSize, targets.size())))
.forEach(targetsChunk -> {
Map<double[], IObservation> targetCoordinates = targetsChunk.stream().collect(Collectors.toMap(t -> Valhalla.getCoordinates((IDirectObservation)t, geometryCollapser), t -> t));
String matrixRequest = Valhalla.buildValhallaMatrixInput(sourceCoordinates.keySet().stream().toList(), targetCoordinates.keySet().stream().toList(), transportType.getType());
ValhallaOutputDeserializer.Matrix response = valhalla.matrix(matrixRequest);

for (List<PairwiseDistance> connections : response.sourcesToTargets) {
for (PairwiseDistance connection : connections) {
Parameters<String> routeParameters = new Parameters<String>();
if (distanceThresholdInKilometers != null && connection.distance > distanceThresholdInKilometers) {
continue;
}
if (timeThresholdInSeconds != null && connection.time > timeThresholdInSeconds) {
continue;
}
routeParameters.put("distance", connection.distance);
routeParameters.put("time", connection.time);

IObservation source = sourcesChunk.get(connection.sourceId);
IObservation target = sourcesChunk.get(connection.targetId);
IShape sourceShape = source.getSpace().getShape();
IShape targetShape = target.getSpace().getShape();
graph.addVertex((IDirectObservation)source);
graph.addVertex((IDirectObservation)target);
graph.addEdge((IDirectObservation)source, (IDirectObservation)target,
new MatrixEdge(sourceShape, targetShape, routeParameters));

// TEST ISSUE java.lang.IllegalArgumentException: no such edge in graph: {Relationship o7qrbzuamg: [green_areas_connection_77 = testing.valencia:GreenAreasConnection]}
ret.add(scope.newRelationship(semantics, semantics.getName() + "_" + nConnections, scope.getScale(), (IDirectObservation)source, (IDirectObservation)target,
new Metadata(routeParameters)));
nConnections.incrementAndGet();
}
}
if (timeThresholdInSeconds != null && connection.time > timeThresholdInSeconds) {
continue;
}
routeParameters.put("distance", connection.distance);
routeParameters.put("time", connection.time);

graph.addVertex((IDirectObservation)source);
graph.addVertex((IDirectObservation)target);
graph.addEdge((IDirectObservation)source, (IDirectObservation)target,
new MatrixEdge(routeParameters));

ret.add(scope.newRelationship(semantics, semantics.getName() + "_" + i, scope.getScale(), (IDirectObservation)source, (IDirectObservation)target,
new Metadata(routeParameters)));
i++;
}
}

});
});
return ret;
}

class MatrixEdge extends DefaultEdge {
private static final long serialVersionUID = 964984629774455337L;

Parameters<String> routeParameters;
IShape sourceShape;
IShape targetShape;

MatrixEdge() {
}

MatrixEdge(Parameters<String> rp) {
MatrixEdge(IShape s, IShape t, Parameters<String> rp) {
this.routeParameters = rp;
this.sourceShape = s;
this.targetShape = t;
}

public Parameters<String> getParameters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ export object matrix
"Method used to collapse line and polygon sources and targets to a single point or a set of points that allow route
finding."
values Centroid
default "Centroid"
default "Centroid"

optional number max_locations
"Valhalla servers set a maximum number of locations that can be analyzed with each query.
Set this value accordingly."
default 1000

optional text server
"Specify the endpoint for the Valhalla server."
Expand Down

0 comments on commit b76c113

Please sign in to comment.