|
| 1 | +# created by Etiënne Kras, dd 24-04-2024 |
| 2 | +# Script uses inspiration from https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/blob/main/data_loader.py, also to align with the STAC URL from STAC-fastAPI. |
| 3 | +# Note, this script needs a dataset to be formatted into a collection & item structure already.. |
| 4 | +# Note, .. |
| 5 | + |
| 6 | +import json |
| 7 | +from geojson import Feature, Point, FeatureCollection, Polygon |
| 8 | +import os |
| 9 | + |
| 10 | +import click |
| 11 | +import requests |
| 12 | + |
| 13 | + |
| 14 | +def load_data(data_dir, filename): |
| 15 | + """Load json data from a file within the specified data directory.""" |
| 16 | + filepath = os.path.join(data_dir, filename) |
| 17 | + if not os.path.exists(filepath): |
| 18 | + click.secho(f"File not found: {filepath}", fg="red", err=True) |
| 19 | + raise click.Abort() |
| 20 | + with open(filepath) as file: |
| 21 | + return json.load(file) |
| 22 | + |
| 23 | + |
| 24 | +def load_collection(base_url, collection_id, data_dir): |
| 25 | + """Load a STAC collection into the database.""" |
| 26 | + collection = load_data(data_dir, "collection.json") |
| 27 | + collection["id"] = collection_id |
| 28 | + try: |
| 29 | + resp = requests.post(f"{base_url}/collections", json=collection) |
| 30 | + if resp.status_code == 200: |
| 31 | + click.echo(f"Status code: {resp.status_code}") |
| 32 | + click.echo(f"Added collection: {collection['id']}") |
| 33 | + elif resp.status_code == 409: |
| 34 | + click.echo(f"Status code: {resp.status_code}") |
| 35 | + click.echo(f"Collection: {collection['id']} already exists") |
| 36 | + except requests.ConnectionError: |
| 37 | + click.secho("Failed to connect", fg="red", err=True) |
| 38 | + |
| 39 | + |
| 40 | +# Adjusted: added to remove collection from the catalog |
| 41 | +def load_items(base_url, collection_id, use_bulk, data_dir): |
| 42 | + """Load STAC items into the database based on the method selected.""" |
| 43 | + # # Attempt to dynamically find a suitable feature collection file |
| 44 | + # feature_files = [ |
| 45 | + # file |
| 46 | + # for file in os.listdir(data_dir) |
| 47 | + # if file.endswith(".json") and file != "collection.json" |
| 48 | + # ] |
| 49 | + # if not feature_files: |
| 50 | + # click.secho( |
| 51 | + # "No feature collection files found in the specified directory.", |
| 52 | + # fg="red", |
| 53 | + # err=True, |
| 54 | + # ) |
| 55 | + # # raise click.Abort() |
| 56 | + # feature_collection_file = feature_files[ |
| 57 | + # 0 |
| 58 | + # ] # Use the first found feature collection file |
| 59 | + featurecol = [] |
| 60 | + for dir, fol, files in os.walk(data_dir): |
| 61 | + for filename in files: |
| 62 | + if filename.endswith(".json") and filename != "collection.json": |
| 63 | + filepath = os.path.join(data_dir, dir, filename) |
| 64 | + with open(filepath) as file: |
| 65 | + data = json.load(file) |
| 66 | + del data["properties"][ |
| 67 | + "deltares:paint" |
| 68 | + ] # remove the paint property, will be mapbox (redundant) |
| 69 | + featurecol.append(data) |
| 70 | + feature_collection = FeatureCollection(featurecol) |
| 71 | + # feature_collection = load_data(data_dir, feature_collection_file) |
| 72 | + |
| 73 | + load_collection(base_url, collection_id, data_dir) |
| 74 | + if use_bulk: |
| 75 | + load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir) |
| 76 | + else: |
| 77 | + load_items_one_by_one(base_url, collection_id, feature_collection, data_dir) |
| 78 | + |
| 79 | + |
| 80 | +def load_items_one_by_one(base_url, collection_id, feature_collection, data_dir): |
| 81 | + """Load STAC items into the database one by one.""" |
| 82 | + for feature in feature_collection["features"]: |
| 83 | + try: |
| 84 | + feature["collection"] = collection_id |
| 85 | + resp = requests.post( |
| 86 | + f"{base_url}/collections/{collection_id}/items", json=feature |
| 87 | + ) |
| 88 | + if resp.status_code == 200: |
| 89 | + click.echo(f"Status code: {resp.status_code}") |
| 90 | + click.echo(f"Added item: {feature['id']}") |
| 91 | + elif resp.status_code == 409: |
| 92 | + click.echo(f"Status code: {resp.status_code}") |
| 93 | + click.echo(f"Item: {feature['id']} already exists") |
| 94 | + except requests.ConnectionError: |
| 95 | + click.secho("Failed to connect", fg="red", err=True) |
| 96 | + |
| 97 | + |
| 98 | +def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir): |
| 99 | + """Load STAC items into the database via bulk insert.""" |
| 100 | + try: |
| 101 | + for i, _ in enumerate(feature_collection["features"]): |
| 102 | + feature_collection["features"][i]["collection"] = collection_id |
| 103 | + resp = requests.post( |
| 104 | + f"{base_url}/collections/{collection_id}/items", json=feature_collection |
| 105 | + ) |
| 106 | + if resp.status_code == 200: |
| 107 | + click.echo(f"Status code: {resp.status_code}") |
| 108 | + click.echo("Bulk inserted items successfully.") |
| 109 | + elif resp.status_code == 204: |
| 110 | + click.echo(f"Status code: {resp.status_code}") |
| 111 | + click.echo("Bulk update successful, no content returned.") |
| 112 | + elif resp.status_code == 409: |
| 113 | + click.echo(f"Status code: {resp.status_code}") |
| 114 | + click.echo("Conflict detected, some items might already exist.") |
| 115 | + except requests.ConnectionError: |
| 116 | + click.secho("Failed to connect", fg="red", err=True) |
| 117 | + |
| 118 | + |
| 119 | +# Adjusted: added to remove collection from the catalog |
| 120 | +def remove_collection(base_url, collection_id): |
| 121 | + """Delete a STAC collection from the database.""" |
| 122 | + try: |
| 123 | + resp = requests.delete(f"{base_url}/collections/{collection_id}") |
| 124 | + except requests.ConnectionError: |
| 125 | + click.secho("Failed to connect", fg="red", err=True) |
| 126 | + |
| 127 | + |
| 128 | +# @click.command() |
| 129 | +# @click.option("--base-url", required=True, help="Base URL of the STAC API") |
| 130 | +# @click.option( |
| 131 | +# "--collection-id", |
| 132 | +# default="test-collection", |
| 133 | +# help="ID of the collection to which items are added", |
| 134 | +# ) |
| 135 | +# @click.option("--use-bulk", is_flag=True, help="Use bulk insert method for items") |
| 136 | +# @click.option( |
| 137 | +# "--data-dir", |
| 138 | +# type=click.Path(exists=True), |
| 139 | +# default="sample_data/", |
| 140 | +# help="Directory containing collection.json and feature collection file", |
| 141 | +# ) |
| 142 | +# def main(base_url, collection_id, use_bulk, data_dir): |
| 143 | +# """Load STAC items into the database.""" |
| 144 | +# load_items(base_url, collection_id, use_bulk, data_dir) |
| 145 | + |
| 146 | + |
| 147 | +if __name__ == "__main__": |
| 148 | + """Load STAC items into the database.""" |
| 149 | + base_url = "http://34.91.70.6:8080" # URL of the (fast)STAC API |
| 150 | + collection_id = "example-collection" |
| 151 | + use_bulk = False |
| 152 | + # data_dir = r"C:\Users\kras\Documents\GitHub\coclicodata\current\ssl" |
| 153 | + data_dir = r"C:\Users\kras\Documents\GitHub\coclicodata\current\ssl" |
| 154 | + # data_dir = r"C:\Users\kras\Documents\GitHub\stac-fastapi-elasticsearch-opensearch\sample_data" |
| 155 | + load_items(base_url, collection_id, use_bulk, data_dir) |
| 156 | + |
| 157 | + # remove_collection(base_url, collection_id) |
0 commit comments