Skip to content

Commit

Permalink
First try
Browse files Browse the repository at this point in the history
  • Loading branch information
euskalhenriko committed Feb 10, 2025
1 parent 6860d3e commit d6038ff
Showing 1 changed file with 96 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import kong.unirest.HttpResponse;
import kong.unirest.JsonNode;
import kong.unirest.Unirest;
import kong.unirest.json.JSONException;
import kong.unirest.json.JSONObject;
import ucar.nc2.dt.grid.GridDataset;

/**
Expand All @@ -37,12 +39,12 @@ public abstract class CopernicusCDSDatacube extends ChunkedDatacubeRepository {

private String dataset;
private String apiKey;
private String user;

public static final String CDS_USER_NUMBER_PROPERTY = "klab.copernicus.cds.user";
public static final String CDS_API_KEY_PROPERTY = "klab.copernicus.cds.apikey";
public static final String CDS_API_VERSION = "1_1";
public static final String CDS_API_FORMAT = "zip";
public static final String CDS_API_KEY_HEADER = "PRIVATE-TOKEN";

private int TIMEOUT_SECONDS = 30;
private static Pattern pattern = Pattern.compile(".*(_[0-9]{8}_).*");

Expand All @@ -62,7 +64,7 @@ public CopernicusCDSDatacube(String dataset, ITimeInstant dataStart, double noDa
super(Time.resolution(1, Type.DAY), Time.resolution(3, Type.MONTH), dataStart,
Configuration.INSTANCE.getDataPath("copernicus/" + dataset), noDataValue);
this.dataset = dataset;
this.user = Configuration.INSTANCE.getProperties().getProperty(CDS_USER_NUMBER_PROPERTY);

this.apiKey = Configuration.INSTANCE.getProperties().getProperty(CDS_API_KEY_PROPERTY);
if (this.apiKey == null) {
setOnline(false, "Copernicus CDS datacube: no CDS credentials provided in configuration");
Expand Down Expand Up @@ -94,7 +96,9 @@ protected Geoserver initializeGeoserver() {
@Override
protected boolean downloadChunk(int chunk, String variable, File destinationDirectory) {

Map<String, Object> body = new HashMap<>();
Map<String, Object> bodyWrapper = new HashMap<>();
Map<String, Object> body = new HashMap<>();

boolean ret = false;
ITimeInstant date = getChunkStart(chunk);

Expand Down Expand Up @@ -123,105 +127,110 @@ protected boolean downloadChunk(int chunk, String variable, File destinationDire
FileUtils.deleteQuietly(destinationDirectory);
destinationDirectory.mkdirs();
}

body.put("year", "" + date.getYear());
body.put("month", this.monts[(date.getMonth() - 1) / 3]);
body.put("day", this.days);
body.put("version", CDS_API_VERSION);
body.put("download_format", CDS_API_FORMAT);
//body.put("download_format", CDS_API_FORMAT);

configureRequest(variable, body);

String jsonBody = JsonUtils.printAsJson(body);

bodyWrapper.put("inputs", body);
String jsonBody = JsonUtils.printAsJson(bodyWrapper);

Logging.INSTANCE.info("requesting chunk " + chunk + " of " + variable + " to CDS API: " + jsonBody);

HttpResponse<JsonNode> response = Unirest.post(getEndpointUrl("/resources/" + this.dataset))
.header("PRIVATE-TOKEN", apiKey).header("Accept", "application/json").body(jsonBody).asJson();
// retrieve the job id

HttpResponse<JsonNode> response = Unirest.post(getEndpointUrl("/processes/" + this.dataset + "/execute"))
.header(CDS_API_KEY_HEADER, apiKey).header("Accept", "application/json").body(jsonBody).asJson();

if (response.isSuccess()) {

System.out.println(response.getBody().getObject());

if (response.getBody().getObject().has("state")) {

int time = 0;
int tryafter = 5;
String url = null;

while (time < TIMEOUT_SECONDS && !"completed".equals(response.getBody().getObject().get("state"))) {

String requestId = response.getBody().getObject().has("request_id")
? response.getBody().getObject().getString("request_id")
: null;

if (requestId == null || response.getBody().getObject().has("error")) {
break;
}

try {
Thread.sleep(tryafter * 1000);
} catch (InterruptedException e) {
break;
}

time += tryafter;

/*
* inquire about task
*/
response = Unirest.get(getEndpointUrl("tasks/" + requestId)).basicAuth(user, apiKey).asJson();

System.out.println(response.getBody().getObject());

if (response.getBody().getObject().has("error")) {
break;
}

// heed their fucking advice
if (response.getHeaders().containsKey("Retry-After")) {
tryafter = (int) Math.ceil(Double.parseDouble(response.getHeaders().get("Retry-After").get(0)));
}
}

if (response.getBody().getObject().has("location")) {

url = response.getBody().getObject().getString("location");
if (url.endsWith(".zip")) {

Logging.INSTANCE.info("chunk " + chunk + " data for " + variable + " ready: downloading....");

/*
* Download the zip and unzip in chunk directory
*/
try {
URL uurl = new URL(url);
File zipFile = File.createTempFile("agera", ".zip");
URLUtils.copyChanneled(uurl, zipFile);
ZipUtils.unzip(zipFile, destinationDirectory);
FileUtils.deleteQuietly(zipFile);
ret = true;
Logging.INSTANCE.info("download of chunk " + chunk + " data for " + variable + " successful");
} catch (Throwable e) {
Logging.INSTANCE.warn("Download of CDS chunk " + variable + "/" + chunk
+ " threw exception: " + e.getMessage());
}
}
}

} else {
Logging.INSTANCE.warn("Retrieval of CDS chunk " + variable + "/" + chunk + " threw exception: "
+ response.getBody().getObject().get("message"));
}

if (response.getBody().getObject().has("status") && "accepted".equals(response.getBody().getObject().get("status"))) {
// check the status of job
int time = 0;
int tryafter = 5;
String url = null;
String requestId = response.getBody().getObject().has("jobID")
? response.getBody().getObject().getString("jobID")
: null;
if (requestId == null) {
Logging.INSTANCE.warn("Retrieval of CDS chunk " + variable + "/" + chunk + " didn't return a job ID");
Logging.INSTANCE.warn(response.getBody().toPrettyString());
return ret;
}
String status = null;
do {
try {
Thread.sleep(tryafter * 1000);
} catch (InterruptedException e) {
break;
}

time += tryafter;
/*
* inquire about task
*/
response = Unirest.get(getEndpointUrl("jobs/" + requestId)).header("PRIVATE-TOKEN", apiKey).asJson();
status = response.getBody().getObject().getString("status");
Logging.INSTANCE.info("Status of retrieval of CDS chunk " + variable + "/" + chunk + ": " + status);
if ("failed".equals(status)){
break;
}
} while (time < TIMEOUT_SECONDS && !"successful".equals(status) && !"failed".equals(status));

// retrieve the job results
response = Unirest.get(getEndpointUrl("jobs/" + requestId + "/results")).header("PRIVATE-TOKEN", apiKey).asJson();
if (response.isSuccess()) {
// retrieve the url
String href = null;
JSONObject rBody = response.getBody().getObject();
try {
href = rBody.getJSONObject("asset").getJSONObject("value").getString("href");
} catch (JSONException e) {
Logging.INSTANCE.warn("The result is not API compliant: " + response.getBody().toPrettyString());
return false;
}
if (href.endsWith(".zip")) {
Logging.INSTANCE.info("chunk " + chunk + " data for " + variable + " ready: downloading....");
/*
* Download the zip and unzip in chunk directory
*/
try {
URL uurl = new URL(url);
File zipFile = File.createTempFile("agera", ".zip");
URLUtils.copyChanneled(uurl, zipFile);
ZipUtils.unzip(zipFile, destinationDirectory);
FileUtils.deleteQuietly(zipFile);
ret = true;
Logging.INSTANCE.info("download of chunk " + chunk + " data for " + variable + " successful");
} catch (Throwable e) {
Logging.INSTANCE.warn("Download of CDS chunk " + variable + "/" + chunk
+ " threw exception: " + e.getMessage());
}
} else {
Logging.INSTANCE.warn("The returned file is not .zip" + href);
return false;
}
} else {
Logging.INSTANCE.warn("The job has failed\n" + response.getBody().getObject().getString("status")+" - " + response.getBody().getObject().getString("traceback"));
return false;
}
} else {
Logging.INSTANCE.error("API request made to CDS Service didn't get accepted: " + response.getBody().toPrettyString());
return false;
}
} else {
Logging.INSTANCE.error("API request to CDS service returned error " + response.getStatusText());
}

Logging.INSTANCE.error("API request to CDS service returned error " + response.getStatusText());
return false;
}
return ret;
}

public String getEndpointUrl(String request) {
return "https://cds.climate.copernicus.eu/api/" + request;
return "https://cds.climate.copernicus.eu/api/retrieve/v1/" + request;
}

@Override
Expand Down

0 comments on commit d6038ff

Please sign in to comment.