Commit f3ac7da

Add es_os_refresh env var to refresh index, ensure refresh passed via kwargs (#370)
**Related Issue(s):**

- #315

**Description:**

- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible.
- Refactored CRUD methods in `TransactionsClient` to use the `_resolve_refresh` helper method for consistent and reusable handling of the `refresh` parameter.
- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations.

**PR Checklist:**

- [x] Code is formatted and linted (run `pre-commit run --all-files`)
- [x] Tests pass (run `make test`)
- [x] Documentation has been updated to reflect changes, if applicable
- [x] Changes are added to the changelog
1 parent 43f911e commit f3ac7da

11 files changed, +496 -83 lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -10,11 +10,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ### Added
 
 - Added configurable landing page ID `STAC_FASTAPI_LANDING_PAGE_ID` [#352](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/352)
+- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)
 
 ### Changed
 
+- Refactored CRUD methods in `TransactionsClient` to use the `validate_refresh` helper method for consistent and reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)
+
 ### Fixed
 
+- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)
+
 ## [v4.1.0] - 2025-05-09
 
 ### Added

README.md

Lines changed: 4 additions & 3 deletions
@@ -112,10 +112,11 @@ You can customize additional settings in your `.env` file:
 | `RELOAD` | Enable auto-reload for development. | `true` | Optional |
 | `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional |
 | `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
-| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
-| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
+| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | |
 | `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
-| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
+| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional
+| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` Optional |
+| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional |
 
 > [!NOTE]
 > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 15 additions & 13 deletions
@@ -712,10 +712,11 @@ async def create_item(
                     for feature in features
                 ]
                 attempted = len(processed_items)
+
                 success, errors = await self.database.bulk_async(
-                    collection_id,
-                    processed_items,
-                    refresh=kwargs.get("refresh", False),
+                    collection_id=collection_id,
+                    processed_items=processed_items,
+                    **kwargs,
                 )
                 if errors:
                     logger.error(
@@ -729,10 +730,7 @@ async def create_item(
 
         # Handle single item
         await self.database.create_item(
-            item_dict,
-            refresh=kwargs.get("refresh", False),
-            base_url=base_url,
-            exist_ok=False,
+            item_dict, base_url=base_url, exist_ok=False, **kwargs
         )
         return ItemSerializer.db_to_stac(item_dict, base_url)
 
@@ -757,11 +755,12 @@ async def update_item(
         """
         item = item.model_dump(mode="json")
         base_url = str(kwargs["request"].base_url)
+
         now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
         item["properties"]["updated"] = now
 
         await self.database.create_item(
-            item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
+            item, base_url=base_url, exist_ok=True, **kwargs
         )
 
         return ItemSerializer.db_to_stac(item, base_url)
@@ -777,7 +776,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
         Returns:
             None: Returns 204 No Content on successful deletion
         """
-        await self.database.delete_item(item_id=item_id, collection_id=collection_id)
+        await self.database.delete_item(
+            item_id=item_id, collection_id=collection_id, **kwargs
+        )
         return None
 
     @overrides
@@ -798,8 +799,9 @@ async def create_collection(
         """
         collection = collection.model_dump(mode="json")
         request = kwargs["request"]
+
         collection = self.database.collection_serializer.stac_to_db(collection, request)
-        await self.database.create_collection(collection=collection)
+        await self.database.create_collection(collection=collection, **kwargs)
         return CollectionSerializer.db_to_stac(
             collection,
             request,
@@ -835,7 +837,7 @@ async def update_collection(
 
         collection = self.database.collection_serializer.stac_to_db(collection, request)
         await self.database.update_collection(
-            collection_id=collection_id, collection=collection
+            collection_id=collection_id, collection=collection, **kwargs
         )
 
         return CollectionSerializer.db_to_stac(
@@ -860,7 +862,7 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None:
         Raises:
             NotFoundError: If the collection doesn't exist
         """
-        await self.database.delete_collection(collection_id=collection_id)
+        await self.database.delete_collection(collection_id=collection_id, **kwargs)
         return None
 
 
@@ -937,7 +939,7 @@ def bulk_item_insert(
         success, errors = self.database.bulk_sync(
             collection_id,
             processed_items,
-            refresh=kwargs.get("refresh", False),
+            **kwargs,
         )
         if errors:
             logger.error(f"Bulk sync operation encountered errors: {errors}")
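
The `core.py` hunks replace the hard-coded `refresh=kwargs.get("refresh", False)` with plain `**kwargs` forwarding, so the database layer decides what to do when no `refresh` is supplied. The following is a minimal sketch of that pattern, not the project's code: `FakeDatabase`, `FakeClient`, and `default_refresh` are hypothetical names, and the fallback rule (explicit value wins, otherwise a configured default) is an assumption made for illustration.

```python
import asyncio
from typing import Any, Dict


class FakeDatabase:
    """Hypothetical stand-in for the database layer; records the refresh it resolves."""

    def __init__(self, default_refresh: str = "false") -> None:
        self.default_refresh = default_refresh
        self.seen: Dict[str, Any] = {}

    async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any) -> None:
        # Assumption: an explicit `refresh` wins, otherwise the configured default applies.
        self.seen[item_id] = kwargs.get("refresh", self.default_refresh)


class FakeClient:
    """Hypothetical stand-in for a transactions client that forwards **kwargs."""

    def __init__(self, database: FakeDatabase) -> None:
        self.database = database

    async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any) -> None:
        # Forward everything instead of picking out `refresh` with a fixed default.
        await self.database.delete_item(
            item_id=item_id, collection_id=collection_id, **kwargs
        )


async def demo() -> Dict[str, Any]:
    db = FakeDatabase(default_refresh="false")
    client = FakeClient(db)
    await client.delete_item("a", "c1")  # no override: database default applies
    await client.delete_item("b", "c1", refresh="wait_for")  # caller override
    return db.seen


result = asyncio.run(demo())
```

One consequence of forwarding everything is that the receiving method has to tolerate unrelated keys (in the diff, `kwargs` also carries `request`), which is why the sketch's database method accepts `**kwargs` as well.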

stac_fastapi/core/stac_fastapi/core/utilities.py

Lines changed: 60 additions & 5 deletions
@@ -12,20 +12,75 @@
 MAX_LIMIT = 10000
 
 
-def get_bool_env(name: str, default: bool = False) -> bool:
+def validate_refresh(value: Union[str, bool]) -> str:
+    """
+    Validate the `refresh` parameter value.
+
+    Args:
+        value (Union[str, bool]): The `refresh` parameter value, which can be a string or a boolean.
+
+    Returns:
+        str: The validated value of the `refresh` parameter, which can be "true", "false", or "wait_for".
+    """
+    logger = logging.getLogger(__name__)
+
+    # Handle boolean-like values using get_bool_env
+    if isinstance(value, bool) or value in {
+        "true",
+        "false",
+        "1",
+        "0",
+        "yes",
+        "no",
+        "y",
+        "n",
+    }:
+        is_true = get_bool_env("DATABASE_REFRESH", default=value)
+        return "true" if is_true else "false"
+
+    # Normalize to lowercase for case-insensitivity
+    value = value.lower()
+
+    # Handle "wait_for" explicitly
+    if value == "wait_for":
+        return "wait_for"
+
+    # Log a warning for invalid values and default to "false"
+    logger.warning(
+        f"Invalid value for `refresh`: '{value}'. Expected 'true', 'false', or 'wait_for'. Defaulting to 'false'."
+    )
+    return "false"
+
+
+def get_bool_env(name: str, default: Union[bool, str] = False) -> bool:
     """
     Retrieve a boolean value from an environment variable.
 
     Args:
         name (str): The name of the environment variable.
-        default (bool, optional): The default value to use if the variable is not set or unrecognized. Defaults to False.
+        default (Union[bool, str], optional): The default value to use if the variable is not set or unrecognized. Defaults to False.
 
     Returns:
         bool: The boolean value parsed from the environment variable.
     """
-    value = os.getenv(name, str(default).lower())
     true_values = ("true", "1", "yes", "y")
     false_values = ("false", "0", "no", "n")
+
+    # Normalize the default value
+    if isinstance(default, bool):
+        default_str = "true" if default else "false"
+    elif isinstance(default, str):
+        default_str = default.lower()
+    else:
+        logger = logging.getLogger(__name__)
+        logger.warning(
+            f"The `default` parameter must be a boolean or string, got {type(default).__name__}. "
+            f"Falling back to `False`."
+        )
+        default_str = "false"
+
+    # Retrieve and normalize the environment variable value
+    value = os.getenv(name, default_str)
     if value.lower() in true_values:
         return True
     elif value.lower() in false_values:
@@ -34,9 +89,9 @@ def get_bool_env(name: str, default: bool = False) -> bool:
         logger = logging.getLogger(__name__)
         logger.warning(
             f"Environment variable '{name}' has unrecognized value '{value}'. "
-            f"Expected one of {true_values + false_values}. Using default: {default}"
+            f"Expected one of {true_values + false_values}. Using default: {default_str}"
         )
-        return default
+        return default_str in true_values
 
 
 def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
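
The `validate_refresh` helper in this file maps booleans and boolean-like strings, plus `wait_for`, onto the three strings `"true"`, `"false"`, and `"wait_for"`. The sketch below illustrates the same normalization idea in isolation. It is not the committed implementation: `normalize_refresh` is a hypothetical name, and, unlike `validate_refresh`, this version lowercases the input before matching and never consults the `DATABASE_REFRESH` environment variable.

```python
import logging
from typing import Union

logger = logging.getLogger(__name__)

_TRUE = {"true", "1", "yes", "y"}
_FALSE = {"false", "0", "no", "n"}


def normalize_refresh(value: Union[str, bool]) -> str:
    """Map a bool or string onto "true", "false", or "wait_for" (hypothetical helper)."""
    if isinstance(value, bool):
        return "true" if value else "false"

    # Lowercase first so that "TRUE" or "Wait_For" are matched as well.
    text = str(value).strip().lower()
    if text in _TRUE:
        return "true"
    if text in _FALSE:
        return "false"
    if text == "wait_for":
        return "wait_for"

    # Anything else is treated as invalid and falls back to "false".
    logger.warning("Invalid refresh value %r; defaulting to 'false'.", value)
    return "false"
```

Keeping the explicit argument separate from the environment lookup is a design choice of this sketch; whether that separation suits the project depends on how callers are expected to override the configured default.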

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py

Lines changed: 24 additions & 2 deletions
@@ -3,14 +3,14 @@
 import logging
 import os
 import ssl
-from typing import Any, Dict, Set
+from typing import Any, Dict, Set, Union
 
 import certifi
 from elasticsearch._async.client import AsyncElasticsearch
 
 from elasticsearch import Elasticsearch  # type: ignore[attr-defined]
 from stac_fastapi.core.base_settings import ApiBaseSettings
-from stac_fastapi.core.utilities import get_bool_env
+from stac_fastapi.core.utilities import get_bool_env, validate_refresh
 from stac_fastapi.types.config import ApiSettings
 
 
@@ -88,6 +88,17 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
     enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
     raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
 
+    @property
+    def database_refresh(self) -> Union[bool, str]:
+        """
+        Get the value of the DATABASE_REFRESH environment variable.
+
+        Returns:
+            Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
+        """
+        value = os.getenv("DATABASE_REFRESH", "false")
+        return validate_refresh(value)
+
     @property
     def create_client(self):
         """Create es client."""
@@ -109,6 +120,17 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
     enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
     raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
 
+    @property
+    def database_refresh(self) -> Union[bool, str]:
+        """
+        Get the value of the DATABASE_REFRESH environment variable.
+
+        Returns:
+            Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
+        """
+        value = os.getenv("DATABASE_REFRESH", "false")
+        return validate_refresh(value)
+
     @property
     def create_client(self):
         """Create async elasticsearch client."""

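In the `config.py` hunks, `database_refresh` is a property that calls `os.getenv` on each access, while neighbouring fields such as `raise_on_bulk_error` take their default from an expression evaluated once in the class body. The sketch below shows that timing difference with a plain Python class. `DemoSettings` and the `DEMO_REFRESH` variable are hypothetical, and the project's real settings classes may behave differently because of their base classes.

```python
import os

os.environ.pop("DEMO_REFRESH", None)  # start with the variable unset


class DemoSettings:
    """Hypothetical plain class, used only to show evaluation timing."""

    # Evaluated once, when the class body executes.
    refresh_at_definition: str = os.getenv("DEMO_REFRESH", "false")

    @property
    def refresh_on_access(self) -> str:
        # Evaluated every time the attribute is read.
        return os.getenv("DEMO_REFRESH", "false")


settings = DemoSettings()
os.environ["DEMO_REFRESH"] = "wait_for"  # changed after the class was created

definition_time_value = settings.refresh_at_definition
access_time_value = settings.refresh_on_access
```

Under these assumptions the class attribute keeps the value seen at definition time, while the property picks up later changes to the environment.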