diff --git a/adapters/klab.adapter.copernicus/src/main/java/org/integratedmodelling/adapter/copernicus/datacubes/CopernicusCDSDatacube.java b/adapters/klab.adapter.copernicus/src/main/java/org/integratedmodelling/adapter/copernicus/datacubes/CopernicusCDSDatacube.java index d9d90806b..88d8d3992 100644 --- a/adapters/klab.adapter.copernicus/src/main/java/org/integratedmodelling/adapter/copernicus/datacubes/CopernicusCDSDatacube.java +++ b/adapters/klab.adapter.copernicus/src/main/java/org/integratedmodelling/adapter/copernicus/datacubes/CopernicusCDSDatacube.java @@ -25,6 +25,8 @@ import kong.unirest.HttpResponse; import kong.unirest.JsonNode; import kong.unirest.Unirest; +import kong.unirest.json.JSONException; +import kong.unirest.json.JSONObject; import ucar.nc2.dt.grid.GridDataset; /** @@ -37,12 +39,12 @@ public abstract class CopernicusCDSDatacube extends ChunkedDatacubeRepository { private String dataset; private String apiKey; - private String user; - public static final String CDS_USER_NUMBER_PROPERTY = "klab.copernicus.cds.user"; public static final String CDS_API_KEY_PROPERTY = "klab.copernicus.cds.apikey"; public static final String CDS_API_VERSION = "1_1"; public static final String CDS_API_FORMAT = "zip"; + public static final String CDS_API_KEY_HEADER = "PRIVATE-TOKEN"; + private int TIMEOUT_SECONDS = 30; private static Pattern pattern = Pattern.compile(".*(_[0-9]{8}_).*"); @@ -62,7 +64,7 @@ public CopernicusCDSDatacube(String dataset, ITimeInstant dataStart, double noDa super(Time.resolution(1, Type.DAY), Time.resolution(3, Type.MONTH), dataStart, Configuration.INSTANCE.getDataPath("copernicus/" + dataset), noDataValue); this.dataset = dataset; - this.user = Configuration.INSTANCE.getProperties().getProperty(CDS_USER_NUMBER_PROPERTY); + this.apiKey = Configuration.INSTANCE.getProperties().getProperty(CDS_API_KEY_PROPERTY); if (this.apiKey == null) { setOnline(false, "Copernicus CDS datacube: no CDS credentials provided in configuration"); @@ -94,7 +96,9 @@ protected Geoserver initializeGeoserver() { @Override protected boolean downloadChunk(int chunk, String variable, File destinationDirectory) { - Map body = new HashMap<>(); + Map bodyWrapper = new HashMap<>(); + Map body = new HashMap<>(); + boolean ret = false; ITimeInstant date = getChunkStart(chunk); @@ -123,105 +127,110 @@ protected boolean downloadChunk(int chunk, String variable, File destinationDire FileUtils.deleteQuietly(destinationDirectory); destinationDirectory.mkdirs(); } - + body.put("year", "" + date.getYear()); body.put("month", this.monts[(date.getMonth() - 1) / 3]); body.put("day", this.days); body.put("version", CDS_API_VERSION); - body.put("download_format", CDS_API_FORMAT); + //body.put("download_format", CDS_API_FORMAT); configureRequest(variable, body); - - String jsonBody = JsonUtils.printAsJson(body); + + bodyWrapper.put("inputs", body); + String jsonBody = JsonUtils.printAsJson(bodyWrapper); Logging.INSTANCE.info("requesting chunk " + chunk + " of " + variable + " to CDS API: " + jsonBody); - HttpResponse response = Unirest.post(getEndpointUrl("/resources/" + this.dataset)) - .header("PRIVATE-TOKEN", apiKey).header("Accept", "application/json").body(jsonBody).asJson(); + // retrieve the job id + + HttpResponse response = Unirest.post(getEndpointUrl("/processes/" + this.dataset + "/execute")) + .header(CDS_API_KEY_HEADER, apiKey).header("Accept", "application/json").body(jsonBody).asJson(); if (response.isSuccess()) { - - System.out.println(response.getBody().getObject()); - - if (response.getBody().getObject().has("state")) { - - int time = 0; - int tryafter = 5; - String url = null; - - while (time < TIMEOUT_SECONDS && !"completed".equals(response.getBody().getObject().get("state"))) { - - String requestId = response.getBody().getObject().has("request_id") - ? response.getBody().getObject().getString("request_id") - : null; - - if (requestId == null || response.getBody().getObject().has("error")) { - break; - } - - try { - Thread.sleep(tryafter * 1000); - } catch (InterruptedException e) { - break; - } - - time += tryafter; - - /* - * inquire about task - */ - response = Unirest.get(getEndpointUrl("tasks/" + requestId)).basicAuth(user, apiKey).asJson(); - - System.out.println(response.getBody().getObject()); - - if (response.getBody().getObject().has("error")) { - break; - } - - // heed their fucking advice - if (response.getHeaders().containsKey("Retry-After")) { - tryafter = (int) Math.ceil(Double.parseDouble(response.getHeaders().get("Retry-After").get(0))); - } - } - - if (response.getBody().getObject().has("location")) { - - url = response.getBody().getObject().getString("location"); - if (url.endsWith(".zip")) { - - Logging.INSTANCE.info("chunk " + chunk + " data for " + variable + " ready: downloading...."); - - /* - * Download the zip and unzip in chunk directory - */ - try { - URL uurl = new URL(url); - File zipFile = File.createTempFile("agera", ".zip"); - URLUtils.copyChanneled(uurl, zipFile); - ZipUtils.unzip(zipFile, destinationDirectory); - FileUtils.deleteQuietly(zipFile); - ret = true; - Logging.INSTANCE.info("download of chunk " + chunk + " data for " + variable + " successful"); - } catch (Throwable e) { - Logging.INSTANCE.warn("Download of CDS chunk " + variable + "/" + chunk - + " threw exception: " + e.getMessage()); - } - } - } - - } else { - Logging.INSTANCE.warn("Retrieval of CDS chunk " + variable + "/" + chunk + " threw exception: " - + response.getBody().getObject().get("message")); - } + + if (response.getBody().getObject().has("status") && "accepted".equals(response.getBody().getObject().get("status"))) { + // check the status of job + int time = 0; + int tryafter = 5; + String url = null; + String requestId = response.getBody().getObject().has("jobID") + ? response.getBody().getObject().getString("jobID") + : null; + if (requestId == null) { + Logging.INSTANCE.warn("Retrieval of CDS chunk " + variable + "/" + chunk + " didn't return a job ID"); + Logging.INSTANCE.warn(response.getBody().toPrettyString()); + return ret; + } + String status = null; + do { + try { + Thread.sleep(tryafter * 1000); + } catch (InterruptedException e) { + break; + } + + time += tryafter; + /* + * inquire about task + */ + response = Unirest.get(getEndpointUrl("jobs/" + requestId)).header("PRIVATE-TOKEN", apiKey).asJson(); + status = response.getBody().getObject().getString("status"); + Logging.INSTANCE.info("Status of retrieval of CDS chunk " + variable + "/" + chunk + ": " + status); + if ("failed".equals(status)){ + break; + } + } while (time < TIMEOUT_SECONDS && !"successful".equals(status) && !"failed".equals(status)); + + // retrieve the job results + response = Unirest.get(getEndpointUrl("jobs/" + requestId + "/results")).header("PRIVATE-TOKEN", apiKey).asJson(); + if (response.isSuccess()) { + // retrieve the url + String href = null; + JSONObject rBody = response.getBody().getObject(); + try { + href = rBody.getJSONObject("asset").getJSONObject("value").getString("href"); + } catch (JSONException e) { + Logging.INSTANCE.warn("The result is not API compliant: " + response.getBody().toPrettyString()); + return false; + } + if (href.endsWith(".zip")) { + Logging.INSTANCE.info("chunk " + chunk + " data for " + variable + " ready: downloading...."); + /* + * Download the zip and unzip in chunk directory + */ + try { + URL uurl = new URL(url); + File zipFile = File.createTempFile("agera", ".zip"); + URLUtils.copyChanneled(uurl, zipFile); + ZipUtils.unzip(zipFile, destinationDirectory); + FileUtils.deleteQuietly(zipFile); + ret = true; + Logging.INSTANCE.info("download of chunk " + chunk + " data for " + variable + " successful"); + } catch (Throwable e) { + Logging.INSTANCE.warn("Download of CDS chunk " + variable + "/" + chunk + + " threw exception: " + e.getMessage()); + } + } else { + Logging.INSTANCE.warn("The returned file is not .zip" + href); + return false; + } + } else { + Logging.INSTANCE.warn("The job has failed\n" + response.getBody().getObject().getString("status")+" - " + response.getBody().getObject().getString("traceback")); + return false; + } + } else { + Logging.INSTANCE.error("API request made to CDS Service didn't get accepted: " + response.getBody().toPrettyString()); + return false; + } } else { - Logging.INSTANCE.error("API request to CDS service returned error " + response.getStatusText()); - } - + Logging.INSTANCE.error("API request to CDS service returned error " + response.getStatusText()); + return false; + } return ret; } public String getEndpointUrl(String request) { - return "https://cds.climate.copernicus.eu/api/" + request; + return "https://cds.climate.copernicus.eu/api/retrieve/v1/" + request; } @Override