Calculate which FTP files to transfer to object storage for ICON-EU #331
Conversation
This command runs, although it does not do anything useful yet:

```
DYNAMICAL_ENV=prod uv run main dwd-icon-eu-forecast archive-grib-files --nwp-run=00z
```
Replacing with paths to the actual S3 bucket.
I haven't yet manually reviewed the code. I will do that in the next commit.
I need to simplify the third test a lot.
… verbose! I'll simplify it manually later...
OK, I think this is finally ready for review! BUT it's Christmas. So please don't feel ANY pressure to look at this code any time soon. I'm gonna be on holiday until 5th Jan 2026 and plan to not look at any work stuff until then. I've tested the code on my laptop for several days. It seems to work. I've been using this command to run on my laptop (to only download the first 5 steps):

```
uv run main \
    dwd-icon-eu-forecast \
    archive-grib-files \
    --nwp-init-hour=6 \
    --filename-filter=single-level_.*_00[0-5]_ \
    --local-dst-root-path=/home/jack/data/ICON-EU/grib/download_and_compress_and_concat_script/icon-eu/regular-lat-lon
```

I'll update the comment at the top of this PR with a quick overview of how the code works.
Thanks for all this!
Two overarching comments:

- There are a couple of assumptions about the file path structure at the source that are likely not to be general. It's nice to aim for generality, but we don't technically need this to be general out of the box. I know it'll work well for any DWD dataset, so what about moving all the `common/ftp_*` and `common/obstore_manager.py` files into `reformatters/dwd/ftp_obstore_sync` for now, and then we can generalize if/when we need to? The two main assumptions I see that are likely to hold this back from being general are:
  - Directories at the source starting with the init hour only
  - One variable per file, and the variable name in the file name

  If you see a way to remove those assumptions without much work, even better, but let's still move them all into their own directory (e.g. `reformatters/common/ftp_obstore_sync`); one possible layout is sketched after this list.
- There are a lot of related classes to keep track of here. Will you add a brief README in the new dir that lists each module/class and how they relate?
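For concreteness, one possible layout under the suggested directory. The directory name comes from the comment above, but the file names are assumptions based on the modules and classes mentioned elsewhere in this PR:

```
reformatters/dwd/ftp_obstore_sync/
    README.md                     # brief outline of each module/class and how they relate
    ftp_manager.py                # FtpManager (ABC) and DwdFtpManager
    ftp_transfer_coordinator.py   # FtpTransferCoordinator
    obstore_manager.py            # ObstoreManager
```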
| "pytest-asyncio~=1.3", | ||
| "pytest-sugar~=1.0", | ||
| "pytest-xdist~=3.8", | ||
| "pytest-mock~=3.14", # Added for mocker fixture |
you can remove this comment, it's clear enough why
```
bucket=bucket,
region=region,
access_key_id=secret["key"],
secret_access_key=secret["secret"],
```
I'd add retries to this.
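A possible shape, assuming this is constructing an obstore `S3Store` and that the constructor accepts a `retry_config` mapping; the exact key names below are assumptions to verify against the obstore documentation:

```python
from datetime import timedelta

from obstore.store import S3Store

# Sketch only: bucket, region, and secret come from the surrounding code,
# and the retry_config keys should be checked against obstore's docs.
store = S3Store(
    bucket=bucket,
    region=region,
    access_key_id=secret["key"],
    secret_access_key=secret["secret"],
    retry_config={
        "max_retries": 5,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=1),
            "max_backoff": timedelta(seconds=30),
        },
        "retry_timeout": timedelta(minutes=5),
    },
)
```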
| cpu="14", | ||
| memory="30G", | ||
| shared_memory="12G", | ||
| ephemeral_storage="30G", |
Since this will be doing very little CPU work, let's try making this smaller. Fine to increase if we need to. We also don't need shared memory (we just use that for sharing an array w/ multiprocessing).
| cpu="14", | |
| memory="30G", | |
| shared_memory="12G", | |
| ephemeral_storage="30G", | |
| cpu="3", | |
| memory="7G", | |
| ephemeral_storage="30G", |
```
raise NotImplementedError(
    f"Implement `operational_kubernetes_resources` on {self.__class__.__name__}"

ftp_manager = DwdFtpManager(
    ftp_host="opendata.dwd.de", filename_filter=filename_filter
```
Setting a default ftp host on the DwdFtpManager makes sense to me
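For illustration, the default could live on the subclass. This sketch assumes `DwdFtpManager` declares its fields as class attributes (pydantic-style), which is an assumption about the PR's actual class definition:

```python
class DwdFtpManager(FtpManager):
    # Default to DWD's public open-data server so call sites don't repeat it.
    ftp_host: str = "opendata.dwd.de"
```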
```
class ArchiveGribFilesCronJob(CronJob):
    command: Sequence[str] = ["archive-grib-files"]
    workers_total: int = 1
    parallelism: int = 1
```
In the interest of creating fewer things for readers to keep track of, let's remove this and have `operational_kubernetes_resources` use `CronJob` directly, passing the command, workers, etc. there.
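Roughly like this; the `operational_kubernetes_resources` signature and the extra `CronJob` fields are assumptions (the resource sizes reuse the values suggested above):

```python
# Sketch: build the CronJob inline rather than defining a one-off subclass.
# Assumes CronJob accepts these fields and this lives on the dataset class.
def operational_kubernetes_resources(self, image_tag: str) -> Sequence[CronJob]:
    return [
        CronJob(
            command=["archive-grib-files"],
            workers_total=1,
            parallelism=1,
            cpu="3",
            memory="7G",
            ephemeral_storage="30G",
        )
    ]
```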
```
# We have to use NamedTuple because we want to use PathAndSize objects in a `set`.
path: str  # No slash at the start of this string!
```
if you subclass our FrozenBaseModel the frozen aspect will make the object hashable. You could then have pydantic enforce the no-leading-slash rule for you:

```python
from typing import Annotated

from pydantic import StringConstraints

from reformatters.common.pydantic import FrozenBaseModel


class PathAndSize(FrozenBaseModel):
    path: Annotated[str, StringConstraints(pattern=r"^[^/].*")]
```
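A quick usage sketch, assuming `FrozenBaseModel` is a frozen pydantic model (so instances are hashable) and using only the `path` field shown above:

```python
# Frozen models are hashable, so they work in sets, and the pattern
# rejects a leading slash at construction time.
seen = {PathAndSize(path="icon-eu/some_file.grib2")}
PathAndSize(path="/leading-slash")  # raises pydantic.ValidationError
```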
```
if TYPE_CHECKING:
    from reformatters.common.ftp_manager import FtpManager
    from reformatters.common.obstore_manager import ObstoreManager
```
I don't expect any slow or cyclic imports, so you can import these without a TYPE_CHECKING guard.
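That is, simply:

```python
from reformatters.common.ftp_manager import FtpManager
from reformatters.common.obstore_manager import ObstoreManager
```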
```
ftp_path = self.convert_nwp_init_hour_to_ftp_path(init_hour)
ftp_host_and_path = f"ftp://{self.ftp_host}{ftp_path}"
```
What happens if the paths at the source don't begin with an init hour only? e.g. a lot of nwps do something like "2025-12-01/00/..."?
```
async def calc_new_files_for_multiple_nwp_init_hours(
    self,
    nwp_inits_to_transfer: Sequence[int] = (0, 6, 12, 18),
```
I'd have this be a required argument (no default) so we don't bake in assumptions about init frequency
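In other words (the return type here is an assumption):

```python
# Sketch: drop the default so callers must state the init hours explicitly.
async def calc_new_files_for_multiple_nwp_init_hours(
    self,
    nwp_inits_to_transfer: Sequence[int],
) -> set[PathAndSize]: ...
```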
```
@staticmethod
@abstractmethod
def sanity_check_ftp_path(ftp_path: PurePosixPath) -> None:
```
slight preference for "validate" or "check" vs sanity
Outline of the classes:

- `ObstoreManager`: Contains the `ObjectStore` and `dst_root_path`, and the logic for calculating paths on the destination object store and listing files on the object store.
- `FtpManager`: Knows how to list just the files on the FTP server for a given NWP initialisation (e.g. 00z), and filter those FTP files. And extract the NWP variable name and init datetime from the FTP path.
- `DwdFtpManager`: Subclasses `FtpManager` and implements the DWD-specific logic.
- `FtpTransferCoordinator`: Contains an `ObstoreManager` and an `FtpManager` and coordinates them 🙂. It computes the set difference ftp_files - files_already_on_object_storage so we only download new files. It considers the file size. (See the sketch below.)
- `DwdIconEuForecastDataset`: Now has an `archive_grib_files` method, and `operational_kubernetes_resources` returns an `ArchiveGribFilesCronJob` which, if we're very lucky 🙂, will run 4 times per day and copy grib files to Source Co-Op. (The code should know how to get the credentials to upload to Source Co-Op.)

You can run the new code locally like this:

```
uv run main \
    dwd-icon-eu-forecast \
    archive-grib-files \
    --nwp-init-hour=6 \
    --filename-filter=single-level_.*_00[0-5]_ \
    --local-dst-root-path=/home/jack/data/ICON-EU/grib/download_and_compress_and_concat_script/icon-eu/regular-lat-lon
```
Tasks:

- `class FtpTransferCalculator(ABC)`
- `DwdIconEuForecastDataset.archive_grib_files` (following Alden's instructions)
- `DwdFtpTransferCalculator` for the FTP -> Source CoOp transfers
- Set `ftp_host` and `dst_store` once (instead of when calling `copy_files_from_ftp_to_obstore` and in `DwdFtpTransferCalculator`)
- Move `object_store` and `dst_path` into the calculator (don't define it in the subclass)