Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script for removing outdated STAC entries #7

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,18 @@ configurable parameters: `./register_stack.py -h`

**Authentication**: Basic auth is resolved automatically by the Requests library by reading a **~/.netrc** file. Make sure
to set up the correct entries (Sentinel and STAC host URL) there.

# Remove STAC entry
Sometimes, data get removed in the odata catalogue (either expire, or were found invalid).
Upon their deletion in the odata catalogue, we need to remove their STAC entry from a STAC catalogue.
This script takes the product title (including prefix which was set on creation of the entry) and removes it
from the STAC catalogue.

### Configuration
**Configuration file** is again in the **sentinel_config.yml**, where STAC host and possible prefix needs to be set.

**Authentication**: Basic auth is resolved automatically by the Requests library by reading a **~/.netrc** file. Make sure
to set up the correct entry (STAC host URL) there.

**Command line arguments**: The command line options supersede the configuration file settings. Run help to list all
configurable parameters: `./remove_stac.py -h`
134 changes: 27 additions & 107 deletions register_stac.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,23 @@
#!/usr/bin/python3

import argparse
import netrc
import os
import re
import sys
import tempfile
from datetime import datetime
from urllib.parse import urlparse

import defusedxml.ElementTree
import pystac
import requests
import stactools.sentinel1.grd.stac
import stactools.sentinel1.slc.stac
import stactools.sentinel2.stac
import stactools.sentinel3.stac
from stactools.sentinel3 import constants
import stactools.sentinel5p.stac
import yaml
from requests import Session
from stactools.sentinel3.file_extension_updated import FileExtensionUpdated
from tqdm import tqdm

import sentinel_stac
from sentinel_stac import *

CONFIG_FILE = "sentinel_config.yml"
ERR_PREFIX = ""
SUCC_PREFIX = ""
PRODUCT_ID = None
COLLECTION = None
Expand Down Expand Up @@ -88,39 +79,10 @@ def parse_arguments():

args = parser.parse_args()
if not args.push and not args.save:
die_with_error('--push or --save required to take any action')
die_with_error(PRODUCT_ID, '--push or --save required to take any action')
return args


def die_with_error(msg, detailed_msg="", code=-1):
"""
Before terminating with exception, writes message to error file.
Known HTTP error code should be used, otherwise -1 is used.
"""
rundate = datetime.now().strftime('%Y-%m-%d')
err_file = ERR_PREFIX + rundate
create_missing_dir(os.path.dirname(err_file))
with open(err_file, 'a') as f:
f.write(f"{COLLECTION},{PRODUCT_ID},{code}:{msg}\n")
raise Exception("\n".join([f"{code}: {msg}", detailed_msg]))


def read_configuration():
"""
Read configuration file.
"""
with open(CONFIG_FILE, "r") as f:
return yaml.safe_load(f)


def create_missing_dir(dir_path):
"""
Creates directory, if it does not exist yet (including all missing directories in the path).
"""
if not os.path.exists(dir_path):
os.makedirs(dir_path, exist_ok=True)


def request_with_progress(url, output_path):
"""
Downloads a file from a URL and saves it to the specified output path, with a progress bar.
Expand All @@ -131,7 +93,7 @@ def request_with_progress(url, output_path):
block_size = 1024 # Size of each block (1 KB)

if not response.ok:
die_with_error(f"Request to fetch file {url} failed.", response.text, response.status_code)
die_with_error(PRODUCT_ID, f"Request to fetch file {url} failed.", response.text, response.status_code)

progress_bar = tqdm(total=total_size,
unit='iB',
Expand Down Expand Up @@ -170,7 +132,7 @@ def fetch_product_data(sentinel_host, metadata_dir):
COLLECTION = map_to_collection(title)

if not title or not product_url:
die_with_error("Missing required title or product url for product.")
die_with_error(PRODUCT_ID, "Missing required title or product url for product.")

print(f"Parsed product data for product (UUID {PRODUCT_ID}):\n"
f"* Title ID: {title}\n"
Expand All @@ -184,46 +146,25 @@ def check_hosts(sentinel_host, stac_host, push):
"""
Checks sentinel_host and stac_host variables were resolved and .netrc file contains authentication credentials.
"""
if not sentinel_host:
die_with_error("Sentinel host not configured properly!")
if not stac_host and push:
die_with_error("STAC host not configured properly!")

try:
auth_info = netrc.netrc()
if not auth_info.authenticators(urlparse(sentinel_host).netloc):
die_with_error(
f"Host {urlparse(sentinel_host)} not found in authentication credentials in the .netrc file!")
if push and not auth_info.authenticators(urlparse(stac_host).netloc):
die_with_error(f"Host {urlparse(stac_host)} not found in authentication credentials in the .netrc file!")
except (FileNotFoundError, netrc.NetrcParseError) as e:
die_with_error(f"Error parsing authentication file .netrc in the home directory.")


def map_to_collection(product_name):
"""
Returns the normalized collection name for a given product.
"""
for pattern, collection in sentinel_stac.product_collection_mapping.items():
if re.match(pattern, product_name):
return collection
die_with_error("Could not match product to collection name! Probably missing in the sentinel_stac.py mappings.")
check_host(PRODUCT_ID, sentinel_host)
if push:
check_host(PRODUCT_ID, stac_host)


def fetch_platform_metadata(product_url, metadata_dir, platform):
"""
Fetches metadata from product's /Nodes data and stores them in the metadata directory.
"""
if platform.lower() == "s1":
platform_files = sentinel_stac.s1_files
platform_files = S1_FILES
elif platform.lower() == "s2":
platform_files = sentinel_stac.s2_files
platform_files = S2_FILES
elif platform.lower() == "s3":
platform_files = sentinel_stac.s3_files
platform_files = S3_FILES
elif platform.lower() == "s5":
platform_files = sentinel_stac.s5_files
platform_files = S5_FILES
else:
die_with_error(f"Platform {platform} not supported!")
die_with_error(PRODUCT_ID, f"Platform {platform} not supported!")
for file in platform_files:
source_url = f"{product_url}/Nodes('{file}')/$value"
output_file = os.path.join(metadata_dir, file)
Expand Down Expand Up @@ -302,39 +243,19 @@ def regenerate_href_links(stacfile_path, metadata_dir, product_url, salt):
os.replace(new_file, stacfile_path)


def get_auth_token(token_url):
"""
Gets token for communication with API from token url.
"""
response = requests.get(token_url)
if not response.ok:
die_with_error(f"Could not obtain API token from {token_url}", response.text, response.status_code)
return response.json()["token"]


def get_auth_session(token):
"""
Creates session which overwrites the BA credentials set in the ~/.netrc file by auth token.
"""
token_session = Session()
token_session.trust_env = False # need to overwrite the authorization header, otherwise BA is used
token_session.headers.update({"Authorization": f"Bearer {token}"})
return token_session


def update_catalogue_entry(stac_host, entry_id, json_data, auth_token=None):
"""
Updates stac entry by fully rewriting it
"""
url = f"{stac_host}/collections/{COLLECTION}/items/{entry_id}"
print(f"Overwriting existing product entry in STAC catalogue.")

token = auth_token or get_auth_token(f"{stac_host}/auth")
token = auth_token or get_auth_token(f"{stac_host}/auth", PRODUCT_ID)
token_session = get_auth_session(token)

response = token_session.put(url, data=json_data)
if not response.ok:
die_with_error(f"Could not remove existing product from catalogue.", response.text, response.status_code)
die_with_error(PRODUCT_ID, f"Could not remove existing product from catalogue.", response.text, response.status_code)


def upload_to_catalogue(stac_host, stac_filepath, overwrite=False):
Expand All @@ -345,7 +266,7 @@ def upload_to_catalogue(stac_host, stac_filepath, overwrite=False):
url = f"{stac_host}/collections/{COLLECTION}/items"
print(f"Uploading STAC data to {url}")

token = get_auth_token(f"{stac_host}/auth")
token = get_auth_token(f"{stac_host}/auth", PRODUCT_ID)

with open(stac_filepath, 'r') as file:
json_data = file.read()
Expand All @@ -361,52 +282,51 @@ def upload_to_catalogue(stac_host, stac_filepath, overwrite=False):
elif response.status_code == 409:
if not overwrite:
# don't die
err_file = ERR_PREFIX + rundate
create_missing_dir(os.path.dirname(err_file))
with open(err_file, 'a') as f:
create_missing_dir(os.path.dirname(ERR_FILE))
with open(ERR_FILE, 'a') as f:
f.write(f"{COLLECTION},{PRODUCT_ID},0,Skipped existing product\n")
print("Product already registered, skipping.")
else:
if response.text and "Feature" in response.text and "ErrorMessage" in response.text:
stac_product_id = response.json().get("ErrorMessage").split(" ")[1]
update_catalogue_entry(stac_host, COLLECTION, stac_product_id, json_data, token)
update_catalogue_entry(stac_host, stac_product_id, json_data, token)
else:
die_with_error("Cannot update existing entry, feature id expected in response not found.")
die_with_error(PRODUCT_ID, "Cannot update existing entry, feature id expected in response not found.")
elif response.status_code == 404:
die_with_error("Wrong URL, or collection does not exist.", response.text, response.status_code)
die_with_error(PRODUCT_ID, "Wrong URL, or collection does not exist.", response.text, response.status_code)
else:
die_with_error(f"Request to upload STAC file failed", response.text, response.status_code)
die_with_error(PRODUCT_ID, f"Request to upload STAC file failed", response.text, response.status_code)


def main():
args = parse_arguments()
config = read_configuration()
config = read_configuration(CONFIG_FILE)
global PRODUCT_ID
PRODUCT_ID = args.productId

sentinel_host = args.sentinelHost or config.get("SENTINEL_HOST")
stac_host = args.stacHost or config.get("STAC_HOST")

if args.save and config.get("LOCAL_DIR") is None and args.localDir is None:
die_with_error("Flag --save was provided, but LOCAL_DIR option not configured and not specified "
die_with_error(PRODUCT_ID, "Flag --save was provided, but LOCAL_DIR option not configured and not specified "
"in the --localDir argument!")

stac_storage = args.localDir or os.path.join(config.get("LOCAL_DIR"), "register_stac")
if stac_storage is not None:
if not os.path.isabs(stac_storage):
die_with_error("Valid path not used for the stac storage argument - expected an absolute directory path!")
die_with_error(PRODUCT_ID, "Valid path not used for the stac storage argument - expected an absolute directory path!")
create_missing_dir(os.path.dirname(stac_storage))

global SUCC_PREFIX, ERR_PREFIX
SUCC_PREFIX = config.get("SUCC_PREFIX")
ERR_PREFIX = config.get("ERR_PREFIX")
if args.push and (SUCC_PREFIX is None or ERR_PREFIX is None):
die_with_error("Flag --push was provided, but SUCC_PREFIX and ERR_PREFIX need to be set in the configuration "
die_with_error(PRODUCT_ID, "Flag --push was provided, but SUCC_PREFIX and ERR_PREFIX need to be set in the configuration "
"file for logging!")

salt = config.get("SALT")
if args.push and not stac_host:
die_with_error('--push requires --stacHost argument or STAC_HOST configuration option to be set!')
die_with_error(PRODUCT_ID, '--push requires --stacHost argument or STAC_HOST configuration option to be set!')

check_hosts(sentinel_host, stac_host, args.push)

Expand Down Expand Up @@ -443,7 +363,7 @@ def main():
else:
raise Exception(f"Unknown platform {platform}")
except Exception as e:
die_with_error(e.args[0] if e.args and len(str(e.args[0])) > 5 else str(e))
die_with_error(PRODUCT_ID, e.args[0] if e.args and len(str(e.args[0])) > 5 else str(e))

stac_storage = stac_storage if args.save else metadata_dir
stac_filepath = os.path.join(stac_storage, "{}.json".format(item.id))
Expand Down
86 changes: 86 additions & 0 deletions remove_stac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/python3
import argparse
import uuid

from sentinel_stac import *

NMSPC = b'\x92\x70\x80\x59\x20\x77\x45\xa3\xa4\xf3\x1e\xb4\x28\x78\x9c\xff'
SUCC_PREFIX = ""
PRODUCT_NAME = ""
COLLECTION = ""


def parse_arguments():
"""
Parse command line arguments.
"""
parser = argparse.ArgumentParser(
description='Remove entry from a stac catalogue by product ID.'
'The product ID needs to include a prefix, if used to create the stac entry.'
'The STAC feature id is computed from the provided product title.'
'Example:'
'./remove_stac.py -p dhr1S3B_SY_2_VG1____20240701T000000_20240701T235959_20240702T122854_EUROPE____________PS2_O_ST_002')
parser.add_argument('-p',
'--productName',
required=True,
help='Title of product to remove')
parser.add_argument('-t',
'--stacHost',
required=False,
help='URL of server to push data to, for example https://stac.cesnet.cz.'
'Overwrites STAC_HOST configuration option.')

args = parser.parse_args()
return args


def get_stac_id(product_title):
"""
Convert product name to STAC feature id. The defined namespace is required.
"""
namespace = uuid.UUID(bytes=NMSPC)
generated_uuid = uuid.uuid5(namespace, product_title)
return generated_uuid


def remove_from_catalogue(stac_host, feature_id):
"""
Removes single entry from a STAC catalogue. Obtains token first.
"""
url = f"{stac_host}/collections/{COLLECTION}/items/{feature_id}"
print(f"Removing STAC entry {feature_id} from {url}")

token = get_auth_token(f"{stac_host}/auth", PRODUCT_NAME)
token_session = get_auth_session(token)
response = token_session.delete(url)

rundate = datetime.now().strftime('%Y-%m-%d')

if response.ok:
succ_file = SUCC_PREFIX + rundate
create_missing_dir(os.path.dirname(succ_file))
with open(succ_file, 'a') as f:
f.write(f"{COLLECTION},{PRODUCT_NAME}\n")
elif response.status_code == 404:
die_with_error(PRODUCT_NAME, f"Wrong URL, or feature {feature_id} under collection {COLLECTION} not found!")
elif response.status_code == 403:
die_with_error(PRODUCT_NAME, f"Insufficient permissions to remove feature {feature_id}!")
else:
die_with_error(PRODUCT_NAME, f"Request to upload STAC file failed", response.text, response.status_code)

def main():
args = parse_arguments()
config = read_configuration(CONFIG_FILE)
global PRODUCT_NAME, COLLECTION, SUCC_PREFIX
SUCC_PREFIX = config.get("SUCC_PREFIX_REMOVAL")
PRODUCT_NAME = args.productName
prefix = config.get("SALT")
unsalted_title = PRODUCT_NAME.split(prefix)[1] if prefix and prefix in PRODUCT_NAME else PRODUCT_NAME
COLLECTION = map_to_collection(unsalted_title)
stac_host = args.stacHost or config.get("STAC_HOST")
check_host(PRODUCT_NAME, stac_host)
feature_id = get_stac_id(PRODUCT_NAME)
remove_from_catalogue(stac_host, feature_id)

if __name__ == "__main__":
main()
8 changes: 6 additions & 2 deletions sentinel_config.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Common scripts configuration
SENTINEL_HOST: "https://dhr1.cesnet.cz"
LOCAL_DIR: "/var/tmp/sentinel/"
SALT: "dhr1" # if defined, will be used to prefix the ID (to resolve possible conflicts in the STAC catalogue)

# register_stac.py variables
SUCC_PREFIX: "/var/tmp/sentinel/register-stac-success-"
ERR_PREFIX: "/var/tmp/sentinel/register-stac-error-"
STAC_HOST: "https://stac.cesnet.cz"
SALT: "dhr1" # if defined, will be used to prefix the ID
STAC_HOST: "https://resto-test.c-scale.zcu.cz"

# remove_stac.py variables
SUCC_PREFIX_REMOVAL: "/var/tmp/sentinel/remove-stac-success-"
ERR_PREFIX_REMOVAL: "/var/tmp/sentinel/remove-stac-error-"
Loading