
Commit d328773

docstring enforcement for sparklib

Author: Maxwell Dylla (committed)
1 parent d5f6e5d commit d328773

File tree

5 files changed: +26 -10 lines changed


pyproject.toml (+12 -1)
@@ -34,6 +34,7 @@ select = [
     # pyflakes
     "F",
     # pycodestyle
+    "D",
     "E",
     "W",
     # flake8-2020
@@ -86,8 +87,18 @@ ignore = [
     "B006",
     # recommended by Ruff to disable to avoid issues with formatter
     "COM812", "ISC001",
+    # extra rules on documentation strings
+    "D100", "D401", "D203", "D213",
+
 ]
-per-file-ignores = {"__init__.py" = ["F401"]}
+
+[tool.ruff.lint.per-file-ignores]
+"__init__.py" = ["F401"]
+
+# requires docstring rules in sparklib
+"src/pulse_telemetry/sparklib/__init__.py" = ["D"]
+"!src/pulse_telemetry/sparklib/**/*.py" = ["D"]
+

 [tool.ruff.lint.flake8-type-checking]
 quote-annotations = true
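Taken together, these changes select pydocstyle ("D") globally, silence a few convention-level rules (D100, D401, D203, D213), and then use a negated glob in per-file-ignores so that the remaining docstring rules are only enforced inside sparklib. A minimal annotated sketch of the same pattern follows; the comments are explanatory additions, not part of the commit, and the exact table layout is an assumption:

```toml
[tool.ruff.lint]
# pydocstyle ("D") selected alongside the existing pyflakes/pycodestyle rules
select = ["F", "D", "E", "W"]
# convention-level docstring rules disabled project-wide
ignore = ["D100", "D401", "D203", "D213"]

[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]                              # re-exports are not "unused imports"
"src/pulse_telemetry/sparklib/__init__.py" = ["D"]    # the package __init__ is exempt
"!src/pulse_telemetry/sparklib/**/*.py" = ["D"]       # negated glob: ignore D everywhere except sparklib
```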

src/pulse_telemetry/sparklib/iceberg.py (+11 -8)
@@ -48,6 +48,7 @@ def create_table_if_not_exists(
     Returns
     -------
     None
+
     """
     # Creates the database if it does not exist
     spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{database_name}")
@@ -91,6 +92,7 @@ def read_table(spark: "SparkSession", catalog_name: str, database_name: str, tab
     -------
     DataFrame
         A PySpark DataFrame representing the table.
+
     """
     return spark.sql(f"SELECT * FROM {catalog_name}.{database_name}.{table_name}")

@@ -125,6 +127,7 @@ def merge_into_table(
     Returns
     -------
     None
+
     """
     source_df.createOrReplaceTempView("source")
     match_condition = " AND ".join([f"target.{col} = source.{col}" for col in match_columns])
@@ -149,8 +152,7 @@ def expire_snapshots(
     retain_last: int,
     max_concurrent_deletes: int = 8,
 ) -> int:
-    """
-    Removes old snapshots from the specified Iceberg table.
+    """Removes old snapshots from the specified Iceberg table.

     Parameters
     ----------
@@ -173,6 +175,7 @@ def expire_snapshots(
     -------
     int
         The number of data files deleted during snapshot expiration.
+
     """
     older_than_str = older_than.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
     return spark.sql(f"""
@@ -193,8 +196,7 @@ def remove_orphan_files(
     older_than: "datetime.datetime",
     max_concurrent_deletes: int = 8,
 ) -> int:
-    """
-    Removes orphaned files from the specified Iceberg table.
+    """Removes orphaned files from the specified Iceberg table.

     Parameters
     ----------
@@ -215,6 +217,7 @@ def remove_orphan_files(
     -------
     int
         The count of orphaned files removed during the operation.
+
     """
     older_than_str = older_than.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
     return spark.sql(f"""
@@ -232,8 +235,7 @@ def rewrite_data_files(
     database_name: str,
     table_name: str,
 ) -> int:
-    """
-    Rewrites data files from the specified Iceberg table.
+    """Rewrites data files from the specified Iceberg table.

     Uses the 'sort' strategy and defaults to the table's sort-order.

@@ -252,6 +254,7 @@ def rewrite_data_files(
     -------
     int
         The sum of the rewritten and new data files.
+
     """
     result = spark.sql(f"""
         CALL {catalog_name}.system.rewrite_data_files(
@@ -268,8 +271,7 @@ def rewrite_manifests(
     database_name: str,
     table_name: str,
 ) -> int:
-    """
-    Rewrites manifest files from the specified Iceberg table.
+    """Rewrites manifest files from the specified Iceberg table.

     Parameters
     ----------
@@ -286,6 +288,7 @@ def rewrite_manifests(
     -------
     int
         The sum of the rewritten and new manifest files.
+
     """
     result = spark.sql(f"""
         CALL {catalog_name}.system.rewrite_manifests(
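The docstring edits in this file, and in the files below, are all instances of the same two fixes required by the newly enabled rules: the summary moves onto the same line as the opening quotes (with D213 ignored, the first-line placement is what the checker accepts), and a blank line is added between the last section and the closing quotes. A minimal sketch of the resulting style on a hypothetical helper, not a function from this module:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pyspark.sql import SparkSession


def drop_table(spark: "SparkSession", catalog_name: str, database_name: str, table_name: str) -> None:
    """Drops the specified Iceberg table if it exists.

    Parameters
    ----------
    spark : SparkSession
        The active Spark session.
    catalog_name : str
        Catalog containing the table.
    database_name : str
        Database containing the table.
    table_name : str
        Table to drop.

    Returns
    -------
    None

    """
    # The blank line before the closing quotes mirrors the change applied
    # throughout this commit; the summary sits on the opening-quote line.
    spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.{database_name}.{table_name}")
```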

src/pulse_telemetry/sparklib/processing_incremental.py (+1 -1)
@@ -51,8 +51,8 @@ def processing_incremental(
     -------
     DataFrame
         The DataFrame resulting from applying the aggregation function to the filtered source records.
-    """

+    """
     # Get the adjusted last processed timestamp from the sink DataFrame
     watermark = _adjusted_watermark(
         sink=sink,

src/pulse_telemetry/sparklib/statistics_cycle.py (+1)
@@ -85,6 +85,7 @@ def statistics_cycle(df: "DataFrame") -> "DataFrame":
     ```
     df = df.withWatermark("update_ts", "14 days")
     ```
+
     """
     # Calculating weighted averages using the duration__s column
     time_weighted_avg = lambda col: (F.sum(F.col(col) * F.col("duration__s")) / F.sum("duration__s"))  # noqa: E731

src/pulse_telemetry/sparklib/statistics_step.py (+1)
@@ -88,6 +88,7 @@ def statistics_step(df: "DataFrame") -> "DataFrame":
     ```
     df = df.withWatermark("update_ts", "14 days")
     ```
+
     """
     # Calculating weighted averages using the duration__s column
     time_weighted_avg = lambda col: (F.sum(F.col(col) * F.col("duration__s")) / F.sum("duration__s"))  # noqa: E731
