Commit 1c145f7

ref(store) Update post_process to use a cache key round 2 (#20620)
This reverts commit 9018680 and restores post_process to using a cache key instead of an AMQP payload. I've added additional logic to ensure we don't fail when duplicate events are processed and we don't have a cache_key, and included tests for those scenarios. Lastly, I'm always passing cache_key to post_process now to ensure that the processing store is cleared when the new paths are not run, since the delete previously done in tasks/store has been removed.
1 parent fbfc6dc commit 1c145f7
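
For context, the rollout in this change gates the new code path on an option compared against a random draw per event. A minimal sketch of that pattern, assuming the option holds a float between 0 and 1 (the function name here is illustrative, not part of the change):

import random

# Illustrative sketch of the sampled rollout gate used in eventstream/base.py
# below. `rollout_rate` stands in for options.get("postprocess.use-cache-key"):
# 0.0 keeps all traffic on the old path, 1.0 moves all of it to the new one.
def should_use_cache_key(rollout_rate):
    # Each event draws a fresh random number, so the option value is the
    # fraction of events routed through the cache-key path.
    return rollout_rate > random.random()

print(should_use_cache_key(0.25))  # True for roughly 25% of calls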

File tree: 4 files changed, 349 additions and 59 deletions

src/sentry/eventstream/base.py

Lines changed: 27 additions, 7 deletions

@@ -1,9 +1,12 @@
 from __future__ import absolute_import

+import random
 import logging

-from sentry.utils.services import Service
+from sentry import options
 from sentry.tasks.post_process import post_process_group
+from sentry.utils.services import Service
+from sentry.utils.cache import cache_key_for_event


 logger = logging.getLogger(__name__)

@@ -43,13 +46,30 @@ def _dispatch_post_process_group_task(
         if skip_consume:
             logger.info("post_process.skip.raw_event", extra={"event_id": event.event_id})
         else:
-            post_process_group.delay(
-                event=event,
-                is_new=is_new,
-                is_regression=is_regression,
-                is_new_group_environment=is_new_group_environment,
-                primary_hash=primary_hash,
+            random_val = random.random()
+            cache_key = cache_key_for_event(
+                {"project": event.project_id, "event_id": event.event_id}
             )
+            if options.get("postprocess.use-cache-key") > random_val:
+                post_process_group.delay(
+                    event=None,
+                    is_new=is_new,
+                    is_regression=is_regression,
+                    is_new_group_environment=is_new_group_environment,
+                    primary_hash=primary_hash,
+                    cache_key=cache_key,
+                    group_id=event.group_id,
+                )
+            else:
+                # Pass the cache key here to ensure that the processing cache is removed.
+                post_process_group.delay(
+                    event=event,
+                    is_new=is_new,
+                    is_regression=is_regression,
+                    is_new_group_environment=is_new_group_environment,
+                    primary_hash=primary_hash,
+                    cache_key=cache_key,
+                )

     def insert(
         self,
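
The key handed to post_process_group comes from cache_key_for_event. A plausible sketch of such a helper, assuming the key is derived deterministically from the event's identity (the exact format lives in sentry.utils.cache and may differ):

# Hedged sketch: a deterministic key built from the event id and project, so
# the producer (eventstream) and consumer (post_process) agree on the slot.
def cache_key_for_event(data):
    return "e:{}:{}".format(data["event_id"], data["project"])

print(cache_key_for_event({"project": 1, "event_id": "adc852ff"}))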

src/sentry/tasks/post_process.py

Lines changed: 33 additions, 11 deletions

@@ -116,38 +116,57 @@ def handle_owner_assignment(project, group, event):


 @instrumented_task(name="sentry.tasks.post_process.post_process_group")
-def post_process_group(event, is_new, is_regression, is_new_group_environment, **kwargs):
+def post_process_group(
+    event, is_new, is_regression, is_new_group_environment, cache_key=None, group_id=None, **kwargs
+):
     """
     Fires post processing hooks for a group.
     """
-    set_current_project(event.project_id)
-
+    from sentry.eventstore.models import Event
+    from sentry.eventstore.processing import event_processing_store
     from sentry.utils import snuba
     from sentry.reprocessing2 import is_reprocessed_event

     with snuba.options_override({"consistent": True}):
-        if is_reprocessed_event(event.data):
+        # The event parameter will be removed after transitioning to
+        # event_processing_store is complete.
+        if cache_key and event is None:
+            data = event_processing_store.get(cache_key)
+            if not data:
+                logger.info(
+                    "post_process.skipped",
+                    extra={"cache_key": cache_key, "reason": "missing_cache"},
+                )
+                return
+            event = Event(
+                project_id=data["project"], event_id=data["event_id"], group_id=group_id, data=data
+            )
+        elif check_event_already_post_processed(event):
+            if cache_key:
+                event_processing_store.delete_by_key(cache_key)
             logger.info(
                 "post_process.skipped",
                 extra={
                     "project_id": event.project_id,
                     "event_id": event.event_id,
-                    "reason": "reprocessed",
+                    "reason": "duplicate",
                 },
             )
             return

-        if check_event_already_post_processed(event):
+        if is_reprocessed_event(event.data):
             logger.info(
                 "post_process.skipped",
                 extra={
                     "project_id": event.project_id,
                     "event_id": event.event_id,
-                    "reason": "duplicate",
+                    "reason": "reprocessed",
                 },
             )
             return

+        set_current_project(event.project_id)
+
         # NOTE: we must pass through the full Event object, and not an
         # event_id since the Event object may not actually have been stored
         # in the database due to sampling.

@@ -161,13 +180,13 @@ def post_process_group(event, is_new, is_regression, is_new_group_environment, **kwargs):
         event.data = EventDict(event.data, skip_renormalization=True)

         if event.group_id:
-            # Re-bind Group since we're pickling the whole Event object
-            # which may contain a stale Project.
+            # Re-bind Group since we're reading the Event object
+            # from cache, which may contain a stale group and project
             event.group, _ = get_group_with_redirect(event.group_id)
             event.group_id = event.group.id

-        # Re-bind Project and Org since we're pickling the whole Event object
-        # which may contain stale parent models.
+        # Re-bind Project and Org since we're reading the Event object
+        # from cache which may contain stale parent models.
         event.project = Project.objects.get_from_cache(id=event.project_id)
         event.project._organization_cache = Organization.objects.get_from_cache(
             id=event.project.organization_id

@@ -232,6 +251,9 @@ def post_process_group(event, is_new, is_regression, is_new_group_environment, **kwargs):
             event=event,
             primary_hash=kwargs.get("primary_hash"),
         )
+        if cache_key:
+            with metrics.timer("tasks.post_process.delete_event_cache"):
+                event_processing_store.delete_by_key(cache_key)


 def process_snoozes(group):
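
The reworked task is idempotent under redelivery: a cache miss means another delivery already consumed the payload, and the key is deleted only after processing finishes. A self-contained sketch of that consume-or-skip pattern (ProcessingStore is a stand-in for Sentry's event_processing_store, not its real API):

class ProcessingStore(object):
    """In-memory stand-in for the event processing store."""

    def __init__(self):
        self._data = {}

    def store(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def delete_by_key(self, key):
        self._data.pop(key, None)


def post_process(store, cache_key):
    data = store.get(cache_key)
    if data is None:
        # Duplicate delivery: the payload was already consumed, so skip.
        print("post_process.skipped reason=missing_cache")
        return
    print("processing event %s" % data["event_id"])
    # Delete last, so a crash before this point lets a retry re-read the data.
    store.delete_by_key(cache_key)


store = ProcessingStore()
store.store("e:abc123:1", {"event_id": "abc123", "project": 1})
post_process(store, "e:abc123:1")  # processes the event
post_process(store, "e:abc123:1")  # duplicate delivery becomes a no-op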

src/sentry/tasks/store.py

Lines changed: 8 additions, 5 deletions

@@ -667,6 +667,7 @@ def create_failed_event(
     # modifications to take place.
     delete_raw_event(project_id, event_id)
     data = event_processing_store.get(cache_key)
+
     if data is None:
         metrics.incr("events.failed", tags={"reason": "cache", "stage": "raw"}, skip_internal=False)
         error_logger.error("process.failed_raw.empty", extra={"cache_key": cache_key})

@@ -690,7 +691,6 @@
             type=issue["type"],
             data=issue["data"],
         )
-
     event_processing_store.delete_by_key(cache_key)

     return True

@@ -762,15 +762,18 @@ def _do_save_event(
         manager.save(
             project_id, assume_normalized=True, start_time=start_time, cache_key=cache_key
         )
-
+        # Put the updated event back into the cache so that post_process
+        # has the most recent data.
+        data = manager.get_data()
+        if isinstance(data, CANONICAL_TYPES):
+            data = dict(data.items())
+        with metrics.timer("tasks.store.do_save_event.write_processing_cache"):
+            event_processing_store.store(data)
     except HashDiscarded:
         pass

     finally:
         if cache_key:
-            with metrics.timer("tasks.store.do_save_event.delete_cache"):
-                event_processing_store.delete_by_key(cache_key)
-
             with metrics.timer("tasks.store.do_save_event.delete_attachment_cache"):
                 attachment_cache.delete(cache_key)
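
With the delete removed from _do_save_event, the save path now only writes: it re-stores the freshly normalized payload so post_process reads current data, and ownership of the delete moves to the consumer. A rough producer-side sketch of that handoff (the two-argument store signature is an assumption for illustration; the real store derives the key from the data):

# Hedged sketch of the write-back: after a successful save, republish the
# updated payload under the same key instead of deleting it.
def save_and_republish(store, cache_key, data):
    if not isinstance(data, dict):
        data = dict(data.items())  # unwrap canonical wrapper types
    store.store(cache_key, data)  # assumed signature, for illustration only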
