Skip to content

Commit 4adc47b

Browse files
feat(taskworker): Make relocation taskworker compatible (#88458)
This work is required to migrate tasks from celery to the new taskbroker system. The sentry option will be used to control the rollout of these tasks. The full migration plan is described in this [document](https://www.notion.so/sentry/Rollout-Planning-1bd8b10e4b5d80aeaaa7dba0efca83bc). ### Notable changes/callouts - Most of relocation tasks use retry_backoff, retry_backoff_jitter, and soft_time_limit which taskworker does not support - LastAction (action taken when retries are exhausted) is set to discard - `sentry.relocation.fulfill_cross_region_export_request` uses reject_on_worker_lost which is already the default behaviour --------- Co-authored-by: Mark Story <[email protected]>
1 parent d50e281 commit 4adc47b

File tree

5 files changed

+147
-0
lines changed

5 files changed

+147
-0
lines changed

src/sentry/conf/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1427,6 +1427,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
14271427
"sentry.monitors.tasks.clock_pulse",
14281428
"sentry.monitors.tasks.detect_broken_monitor_envs",
14291429
"sentry.notifications.utils.tasks",
1430+
"sentry.relocation.tasks.process",
1431+
"sentry.relocation.tasks.transfer",
14301432
"sentry.replays.tasks",
14311433
"sentry.sentry_apps.tasks.sentry_apps",
14321434
"sentry.snuba.tasks",

src/sentry/options/defaults.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3196,6 +3196,16 @@
31963196
default={},
31973197
flags=FLAG_AUTOMATOR_MODIFIABLE,
31983198
)
3199+
register(
3200+
"taskworker.relocation.rollout",
3201+
default={},
3202+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3203+
)
3204+
register(
3205+
"taskworker.relocation.control.rollout",
3206+
default={},
3207+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3208+
)
31993209
register(
32003210
"taskworker.auth.rollout",
32013211
default={},
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from sentry.taskworker.registry import TaskNamespace, taskregistry
2+
3+
relocation_tasks: TaskNamespace = taskregistry.create_namespace("relocation")
4+
relocation_control_tasks: TaskNamespace = taskregistry.create_namespace("relocation.control")

src/sentry/relocation/tasks/process.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
RegionRelocationTransfer,
5959
RelocationTransferState,
6060
)
61+
from sentry.relocation.tasks import relocation_tasks
6162
from sentry.relocation.tasks.transfer import process_relocation_transfer_region
6263
from sentry.relocation.utils import (
6364
TASK_TO_STEP,
@@ -74,6 +75,8 @@
7475
from sentry.signals import relocated, relocation_redeem_promo_code
7576
from sentry.silo.base import SiloMode
7677
from sentry.tasks.base import instrumented_task
78+
from sentry.taskworker.config import TaskworkerConfig
79+
from sentry.taskworker.retry import LastAction, Retry
7780
from sentry.types.region import get_local_region
7881
from sentry.users.models.lostpasswordhash import LostPasswordHash
7982
from sentry.users.models.user import User
@@ -160,6 +163,13 @@
160163
retry_backoff=RETRY_BACKOFF,
161164
retry_backoff_jitter=True,
162165
soft_time_limit=FAST_TIME_LIMIT,
166+
taskworker_config=TaskworkerConfig(
167+
namespace=relocation_tasks,
168+
processing_deadline_duration=FAST_TIME_LIMIT,
169+
retry=Retry(
170+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
171+
),
172+
),
163173
)
164174
def uploading_start(uuid: UUID, replying_region_name: str | None, org_slug: str | None) -> None:
165175
"""
@@ -320,6 +330,11 @@ def uploading_start(uuid: UUID, replying_region_name: str | None, org_slug: str
320330
# 10 minutes per try.
321331
soft_time_limit=60 * 10,
322332
silo_mode=SiloMode.REGION,
333+
taskworker_config=TaskworkerConfig(
334+
namespace=relocation_tasks,
335+
processing_deadline_duration=60 * 10,
336+
retry=Retry(times=4, on=(Exception,), times_exceeded=LastAction.Discard),
337+
),
323338
)
324339
def fulfill_cross_region_export_request(
325340
uuid_str: str,
@@ -431,6 +446,13 @@ def fulfill_cross_region_export_request(
431446
retry_backoff_jitter=True,
432447
soft_time_limit=FAST_TIME_LIMIT,
433448
silo_mode=SiloMode.REGION,
449+
taskworker_config=TaskworkerConfig(
450+
namespace=relocation_tasks,
451+
processing_deadline_duration=FAST_TIME_LIMIT,
452+
retry=Retry(
453+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
454+
),
455+
),
434456
)
435457
def cross_region_export_timeout_check(
436458
uuid: UUID,
@@ -488,6 +510,13 @@ def cross_region_export_timeout_check(
488510
retry_backoff=RETRY_BACKOFF,
489511
retry_backoff_jitter=True,
490512
soft_time_limit=FAST_TIME_LIMIT,
513+
taskworker_config=TaskworkerConfig(
514+
namespace=relocation_tasks,
515+
processing_deadline_duration=FAST_TIME_LIMIT,
516+
retry=Retry(
517+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
518+
),
519+
),
491520
)
492521
def uploading_complete(uuid: UUID) -> None:
493522
"""
@@ -535,6 +564,13 @@ def uploading_complete(uuid: UUID) -> None:
535564
retry_backoff_jitter=True,
536565
soft_time_limit=MEDIUM_TIME_LIMIT,
537566
silo_mode=SiloMode.REGION,
567+
taskworker_config=TaskworkerConfig(
568+
namespace=relocation_tasks,
569+
processing_deadline_duration=MEDIUM_TIME_LIMIT,
570+
retry=Retry(
571+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
572+
),
573+
),
538574
)
539575
def preprocessing_scan(uuid: UUID) -> None:
540576
"""
@@ -709,6 +745,13 @@ def preprocessing_scan(uuid: UUID) -> None:
709745
retry_backoff_jitter=True,
710746
soft_time_limit=MEDIUM_TIME_LIMIT,
711747
silo_mode=SiloMode.REGION,
748+
taskworker_config=TaskworkerConfig(
749+
namespace=relocation_tasks,
750+
processing_deadline_duration=MEDIUM_TIME_LIMIT,
751+
retry=Retry(
752+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
753+
),
754+
),
712755
)
713756
def preprocessing_transfer(uuid: UUID) -> None:
714757
"""
@@ -797,6 +840,13 @@ def preprocessing_transfer(uuid: UUID) -> None:
797840
retry_backoff_jitter=True,
798841
soft_time_limit=MEDIUM_TIME_LIMIT,
799842
silo_mode=SiloMode.REGION,
843+
taskworker_config=TaskworkerConfig(
844+
namespace=relocation_tasks,
845+
processing_deadline_duration=MEDIUM_TIME_LIMIT,
846+
retry=Retry(
847+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
848+
),
849+
),
800850
)
801851
def preprocessing_baseline_config(uuid: UUID) -> None:
802852
"""
@@ -848,6 +898,13 @@ def preprocessing_baseline_config(uuid: UUID) -> None:
848898
retry_backoff_jitter=True,
849899
soft_time_limit=MEDIUM_TIME_LIMIT,
850900
silo_mode=SiloMode.REGION,
901+
taskworker_config=TaskworkerConfig(
902+
namespace=relocation_tasks,
903+
processing_deadline_duration=MEDIUM_TIME_LIMIT,
904+
retry=Retry(
905+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
906+
),
907+
),
851908
)
852909
def preprocessing_colliding_users(uuid: UUID) -> None:
853910
"""
@@ -897,6 +954,13 @@ def preprocessing_colliding_users(uuid: UUID) -> None:
897954
retry_backoff_jitter=True,
898955
soft_time_limit=MEDIUM_TIME_LIMIT,
899956
silo_mode=SiloMode.REGION,
957+
taskworker_config=TaskworkerConfig(
958+
namespace=relocation_tasks,
959+
processing_deadline_duration=MEDIUM_TIME_LIMIT,
960+
retry=Retry(
961+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
962+
),
963+
),
900964
)
901965
def preprocessing_complete(uuid: UUID) -> None:
902966
"""
@@ -1115,6 +1179,13 @@ def _update_relocation_validation_attempt(
11151179
retry_backoff_jitter=True,
11161180
soft_time_limit=FAST_TIME_LIMIT,
11171181
silo_mode=SiloMode.REGION,
1182+
taskworker_config=TaskworkerConfig(
1183+
namespace=relocation_tasks,
1184+
processing_deadline_duration=FAST_TIME_LIMIT,
1185+
retry=Retry(
1186+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1187+
),
1188+
),
11181189
)
11191190
def validating_start(uuid: UUID) -> None:
11201191
"""
@@ -1194,6 +1265,11 @@ def camel_to_snake_keep_underscores(value):
11941265
retry_backoff_jitter=True,
11951266
soft_time_limit=FAST_TIME_LIMIT,
11961267
silo_mode=SiloMode.REGION,
1268+
taskworker_config=TaskworkerConfig(
1269+
namespace=relocation_tasks,
1270+
processing_deadline_duration=FAST_TIME_LIMIT,
1271+
retry=Retry(times=MAX_VALIDATION_POLLS, on=(Exception,), times_exceeded=LastAction.Discard),
1272+
),
11971273
)
11981274
def validating_poll(uuid: UUID, build_id: str) -> None:
11991275
"""
@@ -1292,6 +1368,13 @@ def validating_poll(uuid: UUID, build_id: str) -> None:
12921368
retry_backoff_jitter=True,
12931369
soft_time_limit=FAST_TIME_LIMIT,
12941370
silo_mode=SiloMode.REGION,
1371+
taskworker_config=TaskworkerConfig(
1372+
namespace=relocation_tasks,
1373+
processing_deadline_duration=FAST_TIME_LIMIT,
1374+
retry=Retry(
1375+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1376+
),
1377+
),
12951378
)
12961379
def validating_complete(uuid: UUID, build_id: str) -> None:
12971380
"""
@@ -1380,6 +1463,13 @@ def validating_complete(uuid: UUID, build_id: str) -> None:
13801463
acks_late=True,
13811464
soft_time_limit=SLOW_TIME_LIMIT,
13821465
silo_mode=SiloMode.REGION,
1466+
taskworker_config=TaskworkerConfig(
1467+
namespace=relocation_tasks,
1468+
processing_deadline_duration=SLOW_TIME_LIMIT,
1469+
retry=Retry(
1470+
times=MAX_SLOW_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1471+
),
1472+
),
13831473
)
13841474
def importing(uuid: UUID) -> None:
13851475
"""
@@ -1442,6 +1532,13 @@ def importing(uuid: UUID) -> None:
14421532
retry_backoff_jitter=True,
14431533
soft_time_limit=FAST_TIME_LIMIT,
14441534
silo_mode=SiloMode.REGION,
1535+
taskworker_config=TaskworkerConfig(
1536+
namespace=relocation_tasks,
1537+
processing_deadline_duration=FAST_TIME_LIMIT,
1538+
retry=Retry(
1539+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1540+
),
1541+
),
14451542
)
14461543
def postprocessing(uuid: UUID) -> None:
14471544
"""
@@ -1534,6 +1631,13 @@ def postprocessing(uuid: UUID) -> None:
15341631
retry_backoff_jitter=True,
15351632
soft_time_limit=FAST_TIME_LIMIT,
15361633
silo_mode=SiloMode.REGION,
1634+
taskworker_config=TaskworkerConfig(
1635+
namespace=relocation_tasks,
1636+
processing_deadline_duration=FAST_TIME_LIMIT,
1637+
retry=Retry(
1638+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1639+
),
1640+
),
15371641
)
15381642
def notifying_unhide(uuid: UUID) -> None:
15391643
"""
@@ -1580,6 +1684,13 @@ def notifying_unhide(uuid: UUID) -> None:
15801684
retry_backoff_jitter=True,
15811685
soft_time_limit=FAST_TIME_LIMIT,
15821686
silo_mode=SiloMode.REGION,
1687+
taskworker_config=TaskworkerConfig(
1688+
namespace=relocation_tasks,
1689+
processing_deadline_duration=FAST_TIME_LIMIT,
1690+
retry=Retry(
1691+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1692+
),
1693+
),
15831694
)
15841695
def notifying_users(uuid: UUID) -> None:
15851696
"""
@@ -1654,6 +1765,13 @@ def notifying_users(uuid: UUID) -> None:
16541765
retry_backoff_jitter=True,
16551766
soft_time_limit=FAST_TIME_LIMIT,
16561767
silo_mode=SiloMode.REGION,
1768+
taskworker_config=TaskworkerConfig(
1769+
namespace=relocation_tasks,
1770+
processing_deadline_duration=FAST_TIME_LIMIT,
1771+
retry=Retry(
1772+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1773+
),
1774+
),
16571775
)
16581776
def notifying_owner(uuid: UUID) -> None:
16591777
"""
@@ -1702,6 +1820,13 @@ def notifying_owner(uuid: UUID) -> None:
17021820
retry_backoff_jitter=True,
17031821
soft_time_limit=FAST_TIME_LIMIT,
17041822
silo_mode=SiloMode.REGION,
1823+
taskworker_config=TaskworkerConfig(
1824+
namespace=relocation_tasks,
1825+
processing_deadline_duration=FAST_TIME_LIMIT,
1826+
retry=Retry(
1827+
times=MAX_FAST_TASK_RETRIES, on=(Exception,), times_exceeded=LastAction.Discard
1828+
),
1829+
),
17051830
)
17061831
def completed(uuid: UUID) -> None:
17071832
"""

src/sentry/relocation/tasks/transfer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
control_relocation_export_service,
1919
region_relocation_export_service,
2020
)
21+
from sentry.relocation.tasks import relocation_control_tasks, relocation_tasks
2122
from sentry.silo.base import SiloMode
2223
from sentry.tasks.base import instrumented_task
24+
from sentry.taskworker.config import TaskworkerConfig
2325
from sentry.types.region import get_local_region
2426

2527
logger = logging.getLogger("sentry.relocation")
@@ -29,6 +31,7 @@
2931
name="sentry.relocation.transfer.find_relocation_transfer_control",
3032
queue="relocation.control",
3133
silo_mode=SiloMode.CONTROL,
34+
taskworker_config=TaskworkerConfig(namespace=relocation_control_tasks),
3235
)
3336
def find_relocation_transfer_control() -> None:
3437
_find_relocation_transfer(ControlRelocationTransfer, process_relocation_transfer_control)
@@ -38,6 +41,7 @@ def find_relocation_transfer_control() -> None:
3841
name="sentry.relocation.transfer.find_relocation_transfer_region",
3942
queue="relocation",
4043
silo_mode=SiloMode.REGION,
44+
taskworker_config=TaskworkerConfig(namespace=relocation_tasks),
4145
)
4246
def find_relocation_transfer_region() -> None:
4347
_find_relocation_transfer(RegionRelocationTransfer, process_relocation_transfer_region)
@@ -88,6 +92,7 @@ def _find_relocation_transfer(
8892
name="sentry.relocation.transfer.process_relocation_transfer_control",
8993
queue="relocation.control",
9094
silo_mode=SiloMode.CONTROL,
95+
taskworker_config=TaskworkerConfig(namespace=relocation_control_tasks),
9196
)
9297
def process_relocation_transfer_control(transfer_id: int) -> None:
9398
log_context = {"id": transfer_id, "silo": "control"}
@@ -178,6 +183,7 @@ def process_relocation_transfer_control(transfer_id: int) -> None:
178183
name="sentry.relocation.transfer.process_relocation_transfer_region",
179184
queue="relocation",
180185
silo_mode=SiloMode.REGION,
186+
taskworker_config=TaskworkerConfig(namespace=relocation_tasks),
181187
)
182188
def process_relocation_transfer_region(transfer_id: int) -> None:
183189
log_context = {"id": transfer_id, "silo": "region", "region": get_local_region().name}

0 commit comments

Comments
 (0)