4 changes: 2 additions & 2 deletions datasets/met-office/scripts/ingest-items.sh
@@ -71,7 +71,7 @@ SINCE_ARG=""
SUBMIT_MODE="--upsert"
if [ -n "$SINCE" ]; then
SINCE_ARG="-a since $SINCE"
SUBMIT_MODE="--submit -a year-prefix 2026"
SUBMIT_MODE="--submit"
fi

for collection in $COLLECTIONS
@@ -82,5 +82,5 @@ do
--is-update-workflow \
--workflow-id $collection-update \
-c "$collection" ingest \
--confirm $SUBMIT_MODE $SINCE_ARG
--confirm $SUBMIT_MODE $SINCE_ARG -a year-prefix 2026
done
4 changes: 4 additions & 0 deletions datasets/met-office/test_met_office.py
@@ -27,6 +27,10 @@
f"blob://{test_storage_account}/{test_container}/global/near-surface/update/2026/01/06/1200Z/20260113T1200Z-PT0168H00M.updated",
MetOfficeGlobalNearSurfaceCollection,
),
(
f"blob://{test_storage_account}/{test_container}/global/pressure/update/2026/01/24/0600Z/20260124T0600Z-PT0000H00M.updated",
MetOfficeGlobalPressureCollection,
),
(
f"blob://{test_storage_account}/{test_container}/global/pressure/update/2026/01/06/1200Z/20260113T1200Z-PT0168H00M.updated",
MetOfficeGlobalPressureCollection,
6 changes: 4 additions & 2 deletions datasets/sentinel-2/Dockerfile
@@ -27,7 +27,8 @@ RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/downloa
&& rm -rf "Miniforge3-$(uname)-$(uname -m).sh"

ENV PATH=/opt/conda/bin:$PATH
ENV LD_LIBRARY_PATH=/opt/conda/lib/:$LD_LIBRARY_PATH
ARG LD_LIBRARY_PATH=""
ENV LD_LIBRARY_PATH=/opt/conda/lib/:${LD_LIBRARY_PATH}

RUN mamba install -y -c conda-forge python=3.11 gdal pip setuptools cython numpy

@@ -75,6 +76,7 @@ COPY ./datasets/sentinel-2/requirements.txt /opt/src/datasets/sentinel-2/require
RUN python3 -m pip install -r /opt/src/datasets/sentinel-2/requirements.txt

# Setup Python Path to allow import of test modules
ENV PYTHONPATH=/opt/src:$PYTHONPATH
ARG PYTHONPATH=""
ENV PYTHONPATH=/opt/src:${PYTHONPATH}

WORKDIR /opt/src
54 changes: 50 additions & 4 deletions datasets/sentinel-2/README.md
@@ -3,8 +3,8 @@
## Chunk creation for dynamic ingest

- Using the same chunking split level and options as ETL
- Listing the `manifest.safe` files
- Generates about 1000 tasks
- Runs in 5-6 hours with a `--since` option on the `pctasksteststaging` batch account
- No faster set of chunking options was found.

@@ -16,8 +16,54 @@ az acr build -r {the registry} --subscription {the subscription} -t pctasks-sent

## Update Workflow

### Testing (creates a new workflow, does not affect production)

To create or update a test workflow:

```shell
pctasks dataset process-items sentinel-2-l2a-update --is-update-workflow -d datasets/sentinel-2/dataset.yaml -u
```

To submit the test workflow:

```shell
pctasks workflow submit sentinel-2-kn1-sentinel-2-l2a-process-items \
-a registry pccomponents.azurecr.io \
-a since "2026-01-16T08:30:00Z"
```

### Production

> ⚠️ **Warning**: The following command will update the production workflow. Only run this if you intend to deploy changes to production.

To update the production workflow:

```shell
pctasks dataset process-items sentinel-2-l2a-update \
--is-update-workflow \
--workflow-id sentinel-2-sentinel-2-l2a-update \
-d datasets/sentinel-2/dataset.yaml \
-u
```

**Parameters:**

- `sentinel-2-l2a-update` - The chunkset ID for this workflow
- `--is-update-workflow` - Creates an update workflow with `since` as a runtime argument
- `--workflow-id` - Specifies the exact workflow ID to upsert (required for production)
- `-d` - Path to the dataset configuration file
- `-u` - Upsert the workflow through the API


## Submit ingestion jobs on demand

```bash
pctasks dataset process-items \
--is-update-workflow sentinel-2-l2a-update \
-d datasets/sentinel-2/dataset.yaml -u --submit \
-a registry pccomponents.azurecr.io \
-a since "2025-06-16T00:00:00Z"
```
10 changes: 7 additions & 3 deletions datasets/sentinel-2/dataset.yaml
@@ -1,8 +1,8 @@
id: sentinel-2
image: ${{ args.registry }}/pctasks-sentinel-2:2024.5.28.0
image: ${{ args.registry }}/pctasks-sentinel-2:2026.02.02

args:
- registry

code:
src: ${{ local.path(./sentinel2.py) }}
@@ -28,7 +28,7 @@ collections:
- uri: blob://sentinel2l2a01/sentinel2-l2/
chunks:
splits:
- depth: 2 # depth: 1 --> 60 tasks; depth: 2 --> ~1000 tasks
options:
# extensions: [.safe]
ends_with: manifest.safe
@@ -37,5 +37,9 @@ collections:
min_depth: 5
max_depth: 5
chunk_length: 20000
# Filter to only walk into 2026 folders
# Year is at depth 2 from the split point (Grid2/Year/Month/Day)
folder_matches: "^2026$"
folder_matches_at_depth: 2
chunk_storage:
uri: blob://sentinel2l2a01/sentinel2-l2-info/pctasks-chunks/
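Based on the comment above and the 1-indexed depth described in the `walk` docstring, the depth of each path segment below the split point can be sketched as follows (a standalone illustration, not pctasks code; the `Grid2` path is the hypothetical layout named in the comment):

```python
# Depth is counted from the walk's starting prefix (the split point),
# 1-indexed per level descended: Grid2 is depth 1, the year folder is
# depth 2, and so on - hence folder_matches_at_depth: 2 for the year.
path = "Grid2/2026/01/24"
segments = path.split("/")
depths = {segment: i + 1 for i, segment in enumerate(segments)}
print(depths)  # {'Grid2': 1, '2026': 2, '01': 3, '24': 4}
```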
6 changes: 6 additions & 0 deletions pctasks/core/pctasks/core/storage/base.py
@@ -73,6 +73,8 @@ def walk(
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
folder_matches: Optional[str] = None,
folder_matches_at_depth: Optional[int] = None,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
"""
Recursively walk storage.
@@ -91,6 +93,10 @@ def walk(
match_full_path: bool, default False
Whether to match on just the file name segment of the path (the default) or
the entire path, including the base path.
folder_matches: Optional regex pattern to filter which folders to descend into.
Only folders matching this pattern will be walked.
folder_matches_at_depth: Optional depth (1-indexed) at which to apply folder_matches.
If None, the filter applies at all depths.

Returns:
Generator of (path, files, folders) tuples. Similar to os.walk. Lists
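The documented semantics of the two new parameters can be sketched as a small standalone helper (this is an illustration of the contract, not the pctasks implementation; `filter_folders` is a hypothetical name):

```python
import re
from typing import List, Optional

def filter_folders(
    folders: List[str],
    next_depth: int,
    folder_matches: Optional[str] = None,
    folder_matches_at_depth: Optional[int] = None,
) -> List[str]:
    """Return only the folders a walk should descend into.

    When folder_matches_at_depth is None the regex applies at every
    level; otherwise only at that (1-indexed) depth.
    """
    if folder_matches is None:
        return folders
    if folder_matches_at_depth is not None and next_depth != folder_matches_at_depth:
        return folders  # filter not active at this depth
    pattern = re.compile(folder_matches)
    return [f for f in folders if pattern.search(f)]

# Only the year folder at depth 2 is filtered; other levels pass through.
print(filter_folders(["2025", "2026"], 2, r"^2026$", 2))  # ['2026']
print(filter_folders(["2025", "2026"], 1, r"^2026$", 2))  # ['2025', '2026']
print(filter_folders(["a", "ab"], 3, r"^a$", None))       # ['a']
```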
21 changes: 20 additions & 1 deletion pctasks/core/pctasks/core/storage/blob.py
@@ -3,6 +3,7 @@
import logging
import multiprocessing
import os
import re
import sys
from datetime import datetime as Datetime
from datetime import timedelta, timezone
@@ -484,6 +485,8 @@ def walk(
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
folder_matches: Optional[str] = None,
folder_matches_at_depth: Optional[int] = None,
max_concurrency: int = 32,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
# Ensure UTC set
@@ -531,6 +534,9 @@ def _get_prefix_content(
extensions=extensions, ends_with=ends_with, matches=matches
)

# Compile folder filter regex once if provided
folder_pattern = re.compile(folder_matches) if folder_matches else None

walk_count = 0
file_count = 0
limit_break = False
@@ -585,10 +591,23 @@ def _get_prefix_content(
root = self._strip_prefix(full_prefix or "") or "."
walk_count += 1

# Filter folders before descending
filtered_folders = folders
if folder_pattern:
# Apply filter at specific depth or all depths
next_depth = prefix_depth + 1
if (
folder_matches_at_depth is None
or next_depth == folder_matches_at_depth
):
filtered_folders = [
f for f in folders if folder_pattern.search(f)
]

next_level_prefixes.extend(
map(
lambda f: f"{os.path.join(full_prefix, f)}/",
folders,
filtered_folders,
)
)
file_count += len(files)
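Because the pattern is applied with `re.search`, an unanchored pattern matches anywhere in the folder name; this is why the dataset.yaml above anchors its pattern as `^2026$`. A quick comparison:

```python
import re

# Compile once, as the walk implementation does.
loose = re.compile("2026")
exact = re.compile(r"^2026$")

folders = ["2026", "12026", "2026-archive"]

# search() matches anywhere in the name, so an unanchored pattern over-matches.
print([f for f in folders if loose.search(f)])  # ['2026', '12026', '2026-archive']
# Anchoring gives an exact folder-name match.
print([f for f in folders if exact.search(f)])  # ['2026']
```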
13 changes: 13 additions & 0 deletions pctasks/core/pctasks/core/storage/local.py
@@ -1,5 +1,6 @@
import logging
import os
import re
import shutil
from datetime import datetime as Datetime
from pathlib import Path
@@ -83,6 +84,8 @@ def walk(
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
folder_matches: Optional[str] = None,
folder_matches_at_depth: Optional[int] = None,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
def _get_depth(path: str) -> int:
relpath = os.path.relpath(path, self.base_dir)
@@ -108,7 +111,17 @@ def _filter_file(root: str, p: str) -> bool:
else:
return path_filter(p)

# Compile folder filter regex if provided
folder_pattern = None
if folder_matches:
folder_pattern = re.compile(folder_matches)

for root, folders, files in os.walk(self.base_dir):
# Filter folders before descending (modifies in-place for os.walk)
if folder_pattern:
depth = _get_depth(root) + 1 # Next level depth
if folder_matches_at_depth is None or depth == folder_matches_at_depth:
folders[:] = [f for f in folders if folder_pattern.search(f)]

files = [f for f in files if _filter_file(root, f)]

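The `folders[:] = ...` slice assignment here is the standard `os.walk` pruning idiom: it mutates the list `os.walk` holds, so filtered folders are never descended into, whereas plain rebinding (`folders = ...`) would have no effect on the traversal. A minimal standalone sketch of the same idiom (not the pctasks code):

```python
import os
import re
import tempfile

# Build a small tree: base/{2025,2026}/01
base = tempfile.mkdtemp()
for year in ("2025", "2026"):
    os.makedirs(os.path.join(base, year, "01"))

pattern = re.compile(r"^2026$")
at_depth = 1  # apply the filter when choosing the first level below base

visited = []
for root, folders, files in os.walk(base):
    depth = 0 if root == base else len(os.path.relpath(root, base).split(os.sep))
    if depth + 1 == at_depth:
        # Slice assignment mutates the list os.walk is iterating from,
        # pruning the 2025 subtree from the traversal entirely.
        folders[:] = [f for f in folders if pattern.search(f)]
    visited.append(os.path.relpath(root, base))

# Only base itself, 2026, and 2026/01 are visited; 2025 is pruned.
print(sorted(visited))
```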
1 change: 1 addition & 0 deletions pctasks/core/pyproject.toml
@@ -26,6 +26,7 @@ classifiers = [
dependencies = [
"aiohttp>=3.9",
"azure-cosmos>=4.5.0",
"cachetools==5.3.3",
"azure-data-tables>=12.0.0,<13",
"azure-identity>=1.0.0,<2",
"azure-storage-blob",
2 changes: 2 additions & 0 deletions pctasks/dataset/pctasks/dataset/chunks/task.py
@@ -58,6 +58,8 @@ def create_chunks(
max_depth=input.options.max_depth,
min_depth=input.options.min_depth,
match_full_path=input.options.match_full_path,
folder_matches=input.options.folder_matches,
folder_matches_at_depth=input.options.folder_matches_at_depth,
):
if input.options.list_folders:
gen = folders
6 changes: 6 additions & 0 deletions pctasks/dataset/pctasks/dataset/models.py
@@ -78,6 +78,12 @@ class ChunkOptions(PCBaseModel):
list_folders: Optional[bool] = False
"""Whether to list files (the default) or folders instead of files."""

folder_matches: Optional[str] = None
"""Regex pattern to filter folders during walk. Only folders matching this pattern will be descended into."""

folder_matches_at_depth: Optional[int] = None
"""Apply folder_matches filter only at this depth. If None, applies at all depths."""

chunk_file_name: str = "uris-list"
"""Chunk file name."""

1 change: 1 addition & 0 deletions pctasks/dev/pyproject.toml
@@ -24,6 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.8",
]
dependencies = [
"cachetools==5.3.3",
"pctasks.cli @ {root:parent:uri}/cli",
"pctasks.client @ {root:parent:uri}/client",
"pctasks.ingest @ {root:parent:uri}/ingest",
5 changes: 1 addition & 4 deletions pctasks/ingest/tests/test_models.py
@@ -1,10 +1,7 @@
import pytest
import yaml

from pctasks.ingest.models import (
IngestNdjsonInput,
IngestTaskInput,
)
from pctasks.ingest.models import IngestNdjsonInput, IngestTaskInput


def test_ingest_task_input_model_validate_from_yaml() -> None:
1 change: 1 addition & 0 deletions pctasks/notify/pyproject.toml
@@ -24,6 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.8",
]
dependencies = [
"cachetools==5.3.3",
"pctasks.core @ {root:parent:uri}/core",
]

1 change: 1 addition & 0 deletions pctasks/router/pyproject.toml
@@ -24,6 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.8",
]
dependencies = [
"cachetools==5.3.3",
"pctasks.core @ {root:parent:uri}/core",
]

1 change: 1 addition & 0 deletions pctasks/run/pyproject.toml
@@ -26,6 +26,7 @@ classifiers = [
dependencies = [
"argo-workflows>=6.3.0,<6.4",
"azure-batch==15.0.0b2",
"cachetools==5.3.3",
"azure-keyvault-secrets>=4.0.0,<5",
"kubernetes",
"networkx>=2.0.0,<3",