[Fix](pyudf) clear stale UDAF state cache on drop #63062

Open
linrrzqqq wants to merge 2 commits into apache:master from linrrzqqq:pyudf-clear-udaf-state

Conversation

@linrrzqqq
Collaborator

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

Fix Python UDAF stale cache reuse after dropping and recreating an inline UDAF with the same name/signature.

The Python server previously keyed UDAF state managers by function name and argument types, so a recreated inline UDAF could reuse the old loaded Python class. This fix includes the FE function id in the Python UDAF metadata/cache key and clears UDAF state manager cache during DROP FUNCTION cleanup.
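
The keying change described above can be sketched as follows. This is a minimal illustration of the idea, not the actual Doris Python server code; the class and method names here are hypothetical:

```python
import threading

class UDAFStateManagerCache:
    """Illustrative cache of UDAF state managers, keyed per function id."""

    def __init__(self):
        self._lock = threading.Lock()
        self._managers = {}

    def get_or_create(self, function_id, name, arg_types, factory):
        # Old (buggy) key: (name, tuple(arg_types)) -- a dropped and
        # recreated UDAF with the same signature silently reused the old
        # loaded class. Adding the FE-assigned function id makes the
        # recreated function miss the cache and load its new class body.
        key = (function_id, name, tuple(arg_types))
        with self._lock:
            if key not in self._managers:
                self._managers[key] = factory()
            return self._managers[key]
```

With this key, the V2 function created after DROP has a new function id, so it cannot hit the V1 entry even though its name and argument types are identical.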

set enable_sql_cache = 0;
DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
drop database if exists db001;
create database db001;
use db001;

-- 0. Prepare test data
DROP TABLE IF EXISTS t_udaf_cache_bug_test;
CREATE TABLE t_udaf_cache_bug_test (
    id INT,
    val INT
) DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num"="1");
INSERT INTO t_udaf_cache_bug_test VALUES (1, 10), (2, 20), (3, 30);
-- At this moment, the total of the entire table val is 60.

-- 1. Create V1 version of UDAF (Logic: Accumulate and multiply by 10)
DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
select sleep(10);
CREATE AGGREGATE FUNCTION py_udaf_bug_repro(INT)
RETURNS BIGINT
PROPERTIES (
    "type"="PYTHON_UDF",
    "symbol"="RecreateUDAF",
    "runtime_version"="3.12.11", 
    "always_nullable"="true"
)
AS $$
class RecreateUDAF:
    def __init__(self):
        self.total = 0
    @property
    def aggregate_state(self):
        return self.total
    def accumulate(self, val):
        if val is not None:
            self.total += val
    def merge(self, other):
        self.total += other
    def finish(self):
        return self.total * 10  # V1: multiply by 10
$$;

-- 2. Verify V1 Logic
SELECT py_udaf_bug_repro(val) FROM t_udaf_cache_bug_test;
-- Expected Return: 600 (60 * 10)
-- Actual Return: 600 (Correct)

-- 3. Drop the old function and create a V2 version of the UDAF with the same name (logic: accumulate and multiply by 100)
DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
select sleep(10);
select sleep(10);
CREATE AGGREGATE FUNCTION py_udaf_bug_repro(INT)
RETURNS BIGINT
PROPERTIES (
    "type"="PYTHON_UDF",
    "symbol"="RecreateUDAF",
    "runtime_version"="3.12.11",
    "always_nullable"="true"
)
AS $$
class RecreateUDAF:
    def __init__(self):
        self.total = 0
    @property
    def aggregate_state(self):
        return self.total
    def accumulate(self, val):
        if val is not None:
            self.total += val
    def merge(self, other):
        self.total += other
    def finish(self):
        return self.total * 100  # V2: Logic modified to multiply by 100
$$;

-- 4. Verify V2 Logic
SELECT py_udaf_bug_repro(val) FROM t_udaf_cache_bug_test;
-- Expected Return: 6000 (60 * 100)
-- Actual Return: 600  ([Bug occurs] Still outputs the old cached 600)

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (ideally with the specific error message) and how it was fixed.
  2. Which behaviors were modified: what the previous behavior was, what it is now, why it was modified, and what possible impacts there may be.
  3. What features were added and why.
  4. Which code was refactored and why.
  5. Which functions were optimized and what the difference is before and after the optimization.

@linrrzqqq
Collaborator Author

run buildall

@linrrzqqq linrrzqqq force-pushed the pyudf-clear-udaf-state branch from ee34297 to 1edf152 Compare May 8, 2026 02:15
@linrrzqqq
Collaborator Author

/review

Contributor

@github-actions bot left a comment


I found two blocking issues in the new Python UDAF drop cleanup path.

Critical checkpoint conclusions:

  • Goal/test: The PR addresses stale inline UDAF class reuse after DROP/CREATE with the same signature, and adds a targeted regression test. The function-id cache key covers the recreate case, but the cleanup path introduces failure and lifecycle risks.
  • Scope: The change is mostly focused, but the cleanup action should preserve existing best-effort cache cleanup semantics and in-flight query lifecycles.
  • Concurrency: The new DROP cleanup can run concurrently with active Python Flight UDAF exchanges. Current code can clear states still needed by those exchanges.
  • Lifecycle: Function-id keys avoid reuse by new functions, but removing/clearing old managers needs reference-safe lifecycle handling.
  • Compatibility/config: No new config or storage/protocol incompatibility identified for the BE/Python process pair built from this PR.
  • Parallel paths: Module cache cleanup remains best-effort; the new UDAF cleanup should follow the same non-crashing error handling model.
  • Tests: Regression coverage exercises stale reuse, but does not cover cleanup failure or concurrent DROP while an old UDAF query is still running.
  • Observability/performance/data correctness: No additional observability requirement found; the main correctness risks are in error propagation and active-state invalidation.
  • User focus: No additional user-provided review focus was specified.

Comment thread on be/src/udf/python/python_server.cpp (Outdated)
void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
    std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
    THROW_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
                                                  fmt::format("function_id={}", function_id)));
}
Contributor


This can crash the BE from a DROP FUNCTION cleanup task. _broadcast_action_to_processes() returns an error whenever an active Python process fails DoAction/Next, and THROW_IF_ERROR converts that into a doris::Exception. The caller is clean_udf_cache_callback(), which is run by TaskWorkerPool via _callback(task) without any catch boundary, so an uncaught exception terminates the worker thread/process instead of reporting a best-effort cache cleanup failure. The existing module-cache cleanup path logs and continues; this should return Status and be handled with a warning (or use the existing cleanup-style WARN_IF_ERROR) rather than throwing out of the task callback.
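
The best-effort error-handling model the review asks for can be sketched as follows. This is an illustration in Python (the actual path is C++ using Status and WARN_IF_ERROR); the function and parameter names are hypothetical. The point is that a cleanup failure is logged and swallowed, so the task-worker callback never sees an exception from a cache purge:

```python
import logging

log = logging.getLogger("udf_cleanup")

def clear_udaf_state_cache_best_effort(broadcast, function_id):
    """Return True on success; log a warning and return False on failure.

    `broadcast` stands in for the action-broadcast call that may fail if
    an active Python process rejects DoAction/Next.
    """
    try:
        broadcast("clear_udaf_state_cache", {"function_id": function_id})
        return True
    except Exception as exc:  # best-effort: never propagate to the worker
        log.warning("clear_udaf_state_cache failed for function_id=%s: %s",
                    function_id, exc)
        return False
```

This mirrors the existing module-cache cleanup path, which logs and continues rather than terminating the worker on a transient process failure.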

for key in keys_to_remove:
    manager = self.udaf_state_managers.pop(key)
    manager.states.clear()
    cleared += 1
Contributor


Clearing manager.states can invalidate in-flight UDAF queries. The DROP cleanup task is submitted asynchronously after FE removes the function, while an already-started query can still have a Flight exchange using this same UDAFStateManager for the old function id. If this action runs between that query's CREATE/ACCUMULATE and later SERIALIZE/FINALIZE/DESTROY calls, those operations will find their place_id entries removed and fail with KeyError/failed UDAF results. Since adding function_id to the key already prevents a recreated function from reusing the old class, cleanup should detach the manager from udaf_state_managers without clearing a manager that active exchanges may still reference, or add explicit lifecycle/ref-count coordination. Also consider returning the manager from _get_udaf_state_manager() while still under the lock so a concurrent pop cannot occur between lookup and return.
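
The reference-safe lifecycle suggested above can be sketched like this. All names are illustrative, not the actual server code: the registry pops the manager under the lock but does not clear `manager.states`, so an in-flight Flight exchange that already holds a reference keeps its per-place state until its own DESTROY call; lookups also return the manager while still holding the lock, so a concurrent detach cannot slip in between lookup and return:

```python
import threading

class Registry:
    def __init__(self):
        self._lock = threading.Lock()
        self.udaf_state_managers = {}  # (function_id, ...) -> manager

    def get_manager(self, key, factory):
        # Return the manager while still under the lock, so a concurrent
        # pop cannot occur between lookup and return.
        with self._lock:
            mgr = self.udaf_state_managers.get(key)
            if mgr is None:
                mgr = factory()
                self.udaf_state_managers[key] = mgr
            return mgr

    def detach_for_function(self, function_id):
        with self._lock:
            keys = [k for k in self.udaf_state_managers if k[0] == function_id]
            detached = [self.udaf_state_managers.pop(k) for k in keys]
        # Detached managers are left intact; they become garbage once the
        # last in-flight exchange drops its reference.
        return len(detached)
```

Detaching rather than clearing is sufficient here because the function-id cache key already guarantees a recreated function can never reach the old manager.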

@linrrzqqq
Collaborator Author

/review

Contributor

@github-actions bot left a comment


Automated review completed for PR head 7bdde75837b919220cc8758638b8bdaf61dd14b7.

No new blocking issues found. I also reviewed the existing inline threads and did not duplicate them; the current head appears to address the previously raised UDAF cleanup exception and in-flight state invalidation concerns by using WARN_IF_ERROR, returning the manager under lock, and detaching managers without clearing active states.

Critical checkpoint conclusions:

  • Goal/test: The PR targets stale Python UDAF state/class reuse after DROP/CREATE. The function-id keying plus drop cleanup path matches that goal, with added BE unit tests and regression coverage for inline drop/recreate.
  • Scope: The change is small and focused on Python UDF/UDAF metadata, manager cleanup, and related tests.
  • Concurrency/lifecycle: Reviewed the async DROP cleanup versus active Flight exchanges. The current implementation avoids clearing in-flight state and prevents lookup/pop races for manager retrieval. No additional concurrency blocker found.
  • Configuration: No new configuration items.
  • Compatibility/storage: No storage-format or persisted metadata change. The new id field is carried in the internal BE-to-Python descriptor and all reviewed production builders populate it.
  • Parallel paths: UDF, UDAF, and UDTF metadata serialization now include the function id; only UDAF manager keys use it, which is appropriate for this bug.
  • Error handling: Cleanup failures are best-effort warnings instead of uncaught task-worker exceptions. Status handling in the reviewed changed C++ paths is acceptable.
  • Tests: Added BE unit coverage for no-process, failed-action, action broadcast, and JSON id serialization, plus regression coverage for inline UDAF recreation. I did not run tests locally in this review runner.
  • Observability: Existing INFO/WARNING logs around cleanup/broadcast are sufficient for this change.
  • Transaction/persistence/data writes: Not applicable.
  • Performance: Cleanup scans only the per-process UDAF manager registry under a lock; no obvious performance blocker found.
  • User focus: No additional user-provided review focus was specified.

@linrrzqqq linrrzqqq force-pushed the pyudf-clear-udaf-state branch from 7bdde75 to 7bff96e Compare May 8, 2026 09:11
@linrrzqqq linrrzqqq force-pushed the pyudf-clear-udaf-state branch from 7bff96e to e321868 Compare May 8, 2026 16:05
@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

TPC-H: Total hot run time: 29888 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit e32186890f425e43b7bdd68a0e28a0509cdeff0f, data reload: false

------ Round 1 ----------------------------------
============================================
q1	17680	4018	3996	3996
q2	q3	10718	881	617	617
q4	4657	462	353	353
q5	7448	1318	1149	1149
q6	195	171	139	139
q7	902	937	750	750
q8	9342	1394	1298	1298
q9	6050	5367	5366	5366
q10	6291	2083	1813	1813
q11	475	274	257	257
q12	697	415	300	300
q13	18212	3352	2721	2721
q14	302	283	262	262
q15	q16	913	879	807	807
q17	968	1049	782	782
q18	6433	5710	5627	5627
q19	1376	1296	1136	1136
q20	530	411	260	260
q21	4645	2349	1925	1925
q22	457	384	330	330
Total cold run time: 98291 ms
Total hot run time: 29888 ms

----- Round 2, with runtime_filter_mode=off -----
============================================
q1	4874	4773	4739	4739
q2	q3	4671	4781	4234	4234
q4	2148	2180	1391	1391
q5	5003	4970	5259	4970
q6	205	172	138	138
q7	2076	1831	1648	1648
q8	3316	3078	3073	3073
q9	8416	8599	8561	8561
q10	4536	4516	4254	4254
q11	625	420	419	419
q12	680	754	551	551
q13	3458	3542	2933	2933
q14	312	319	287	287
q15	q16	760	779	766	766
q17	1460	1384	1364	1364
q18	8036	7219	7161	7161
q19	1152	1156	1204	1156
q20	2289	2254	1959	1959
q21	6232	5705	4900	4900
q22	526	484	399	399
Total cold run time: 60775 ms
Total hot run time: 54903 ms

@hello-stephen
Contributor

TPC-DS: Total hot run time: 169707 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit e32186890f425e43b7bdd68a0e28a0509cdeff0f, data reload: false

query5	4316	660	513	513
query6	336	229	208	208
query7	4240	564	300	300
query8	320	232	217	217
query9	8863	4007	4009	4007
query10	468	346	306	306
query11	5781	2365	2188	2188
query12	186	133	130	130
query13	1342	596	436	436
query14	6414	5366	5064	5064
query14_1	4400	4379	4369	4369
query15	217	208	186	186
query16	1004	456	457	456
query17	1159	771	642	642
query18	2747	499	366	366
query19	233	216	178	178
query20	143	138	132	132
query21	219	142	120	120
query22	13599	13525	13440	13440
query23	17129	16358	16025	16025
query23_1	16201	16125	16206	16125
query24	7453	1788	1355	1355
query24_1	1340	1361	1344	1344
query25	599	520	475	475
query26	1300	317	181	181
query27	2690	597	343	343
query28	4365	1958	1986	1958
query29	1029	650	539	539
query30	300	242	198	198
query31	1138	1065	938	938
query32	89	80	75	75
query33	555	352	307	307
query34	1190	1127	651	651
query35	774	810	682	682
query36	1378	1360	1138	1138
query37	152	106	85	85
query38	3235	3160	3086	3086
query39	931	934	886	886
query39_1	870	857	917	857
query40	234	159	142	142
query41	69	62	60	60
query42	111	115	108	108
query43	330	327	282	282
query44	
query45	216	203	202	202
query46	1126	1202	747	747
query47	2500	2521	2217	2217
query48	402	430	306	306
query49	645	532	424	424
query50	735	295	221	221
query51	4375	4302	4262	4262
query52	107	106	97	97
query53	255	301	211	211
query54	308	291	275	275
query55	96	91	88	88
query56	331	317	305	305
query57	1411	1447	1343	1343
query58	298	276	271	271
query59	1581	1660	1433	1433
query60	344	351	328	328
query61	165	150	152	150
query62	672	632	567	567
query63	241	201	207	201
query64	2358	827	679	679
query65	
query66	1689	520	384	384
query67	30052	30032	29223	29223
query68	
query69	460	337	303	303
query70	1031	1011	970	970
query71	299	273	268	268
query72	2912	2744	2417	2417
query73	880	756	403	403
query74	5081	4900	4751	4751
query75	2772	2669	2354	2354
query76	2333	1134	739	739
query77	420	430	348	348
query78	12988	12955	12484	12484
query79	1481	996	752	752
query80	1385	579	513	513
query81	525	281	243	243
query82	1261	157	118	118
query83	329	282	247	247
query84	274	144	114	114
query85	907	510	446	446
query86	450	351	325	325
query87	3430	3389	3217	3217
query88	3601	2677	2663	2663
query89	452	396	341	341
query90	1944	176	182	176
query91	178	165	135	135
query92	78	82	76	76
query93	980	937	551	551
query94	699	297	315	297
query95	656	380	337	337
query96	1054	783	332	332
query97	2705	2699	2555	2555
query98	236	231	248	231
query99	1155	1148	974	974
Total cold run time: 255203 ms
Total hot run time: 169707 ms

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 70.59% (12/17) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.79% (27780/37648)
Line Coverage 57.67% (300875/521723)
Region Coverage 54.95% (251028/456811)
Branch Coverage 56.44% (108485/192210)
