Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
#include "storage/task/engine_storage_migration_task.h"
#include "storage/txn/txn_manager.h"
#include "storage/utils.h"
#include "udf/python/python_server.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/jni-util.h"
Expand Down Expand Up @@ -2596,6 +2597,7 @@ void clean_udf_cache_callback(const TAgentTaskRequest& req) {

if (clean_req.__isset.function_id && clean_req.function_id > 0) {
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
PythonServerManager::instance().clear_udaf_state_cache(clean_req.function_id);
}

LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
Expand Down
25 changes: 19 additions & 6 deletions be/src/udf/python/python_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "arrow/flight/client.h"
#include "common/config.h"
#include "common/status.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udtf_client.h"
Expand Down Expand Up @@ -413,7 +414,20 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
}

std::string body = fmt::format(R"({{"location": "{}"}})", location);
return _broadcast_action_to_processes("clear_module_cache", body,
fmt::format("location={}", location));
}

void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
WARN_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
fmt::format("function_id={}", function_id)),
"failed to clear Python UDAF state cache");
}

Status PythonServerManager::_broadcast_action_to_processes(const std::string& action_type,
const std::string& body,
const std::string& log_name) {
int success_count = 0;
int fail_count = 0;
bool has_active_process = false;
Expand Down Expand Up @@ -441,7 +455,7 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
auto client = std::move(*client_result);

arrow::flight::Action action;
action.type = "clear_module_cache";
action.type = action_type;
action.body = arrow::Buffer::FromString(body);

auto result_stream = client->DoAction(action);
Expand All @@ -467,13 +481,12 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
return Status::OK();
}

LOG(INFO) << "clear_module_cache completed for location=" << location
<< ", success=" << success_count << ", failed=" << fail_count;
LOG(INFO) << action_type << " completed for " << log_name << ", success=" << success_count
<< ", failed=" << fail_count;

if (fail_count > 0) {
return Status::InternalError(
"clear_module_cache failed for location={}, success={}, failed={}", location,
success_count, fail_count);
return Status::InternalError("{} failed for {}, success={}, failed={}", action_type,
log_name, success_count, fail_count);
}

return Status::OK();
Expand Down
5 changes: 5 additions & 0 deletions be/src/udf/python/python_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class PythonServerManager {
// Clear Python module cache for a specific UDF location across all processes
Status clear_module_cache(const std::string& location);

// Clear Python UDAF runtime state after DROP FUNCTION
void clear_udaf_state_cache(int64_t function_id);

void shutdown();

#ifdef BE_TEST
Expand Down Expand Up @@ -108,6 +111,8 @@ class PythonServerManager {
std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const PythonVersion& version);
std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>>
_snapshot_process_pools();
Status _broadcast_action_to_processes(const std::string& action_type, const std::string& body,
const std::string& log_name);

std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> _process_pools;
// Protects the version -> pool handle map only. Per-version process operations are guarded
Expand Down
79 changes: 72 additions & 7 deletions be/src/udf/python/python_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ class PythonUDFMeta:

def __init__(
self,
function_id: int,
name: str,
symbol: str,
location: str,
Expand All @@ -470,6 +471,7 @@ def __init__(
Initialize Python UDF metadata.

Args:
function_id: FE catalog function id
name: UDF function name
symbol: Symbol to load (function name or module.function)
location: File path or directory containing the UDF
Expand All @@ -481,6 +483,7 @@ def __init__(
output_type: PyArrow data type for return value
client_type: 0 for UDF, 1 for UDAF, 2 for UDTF
"""
self.id = function_id
self.name = name
self.symbol = symbol
self.location = location
Expand Down Expand Up @@ -508,7 +511,7 @@ def __str__(self) -> str:
"""Returns a string representation of the UDF metadata."""
udf_load_type_str = "INLINE" if self.udf_load_type == 0 else "MODULE"
return (
f"PythonUDFMeta(name={self.name}, symbol={self.symbol}, "
f"PythonUDFMeta(id={self.id}, name={self.name}, symbol={self.symbol}, "
f"location={self.location}, udf_load_type={udf_load_type_str}, runtime_version={self.runtime_version}, "
f"always_nullable={self.always_nullable}, client_type={self.client_type.name}, "
f"input_types={self.input_types}, output_type={self.output_type})"
Expand Down Expand Up @@ -1575,8 +1578,9 @@ def __init__(self, location: str):
location: Unix socket path for the server
"""
super().__init__(location)
# Use a dictionary to maintain separate state managers for each UDAF function
# Key: function signature (name + input_types), Value: UDAFStateManager instance
# Use a dictionary to maintain separate state managers for each UDAF function.
# Key includes function_id so DROP/CREATE with the same name and signature
# cannot reuse a class loaded from old inline code.
self.udaf_state_managers: Dict[str, UDAFStateManager] = {}
self.udaf_managers_lock = threading.Lock()

Expand All @@ -1593,19 +1597,50 @@ def _get_udaf_state_manager(
Returns:
UDAFStateManager instance for this specific UDAF
"""
# Create a unique key based on function name and argument types
type_names = [str(field.type) for field in python_udaf_meta.input_types]
func_key = f"{python_udaf_meta.name}({','.join(type_names)})"
func_key = (
f"{python_udaf_meta.id}:{python_udaf_meta.name}({','.join(type_names)})"
)

with self.udaf_managers_lock:
if func_key not in self.udaf_state_managers:
manager = self.udaf_state_managers.get(func_key)
if manager is None:
manager = UDAFStateManager()
# Load and set the UDAF class for this manager using UDAFClassLoader
udaf_class = UDAFClassLoader.load_udaf_class(python_udaf_meta)
manager.set_udaf_class(udaf_class)
self.udaf_state_managers[func_key] = manager

return self.udaf_state_managers[func_key]
# Return the manager while holding the lock so a concurrent DROP cleanup
# cannot pop the key between lookup and return.
return manager

def _clear_udaf_state_cache_by_function_id(self, function_id: int) -> int:
"""
Clear UDAF managers for a dropped function id.

DROP FUNCTION cache cleanup is asynchronous. The runtime key still includes
function_id for correctness, while this action detaches dropped functions
from the manager registry so new exchanges cannot reuse the old UDAF class.
"""
prefix = f"{function_id}:"
cleared = 0

with self.udaf_managers_lock:
keys_to_remove = [
key for key in self.udaf_state_managers if key.startswith(prefix)
]
for key in keys_to_remove:
# Do not clear manager.states here. An already-started Flight
# exchange may still hold this manager and continue with later
# SERIALIZE/FINALIZE/DESTROY calls for its place_ids.
self.udaf_state_managers.pop(key, None)
cleared += 1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clearing manager.states can invalidate in-flight UDAF queries. The DROP cleanup task is submitted asynchronously after FE removes the function, while an already-started query can still have a Flight exchange using this same UDAFStateManager for the old function id. If this action runs between that query's CREATE/ACCUMULATE and later SERIALIZE/FINALIZE/DESTROY calls, those operations will find their place_id entries removed and fail with KeyError/failed UDAF results. Since adding function_id to the key already prevents a recreated function from reusing the old class, cleanup should detach the manager from udaf_state_managers without clearing a manager that active exchanges may still reference, or add explicit lifecycle/ref-count coordination. Also consider returning the manager from _get_udaf_state_manager() while still under the lock so a concurrent pop cannot occur between lookup and return.


if cleared:
gc.collect()

return cleared

@staticmethod
def parse_python_udf_meta(
Expand All @@ -1623,6 +1658,7 @@ def parse_python_udf_meta(
return None

cmd_json = json.loads(descriptor.command)
function_id = cmd_json["id"]
name = cmd_json["name"]
symbol = cmd_json["symbol"]
location = cmd_json["location"]
Expand All @@ -1648,6 +1684,7 @@ def parse_python_udf_meta(
output_type = output_schema.field(0).type

python_udf_meta = PythonUDFMeta(
function_id=function_id,
name=name,
symbol=symbol,
location=location,
Expand Down Expand Up @@ -2513,14 +2550,42 @@ def do_action(
Supported actions:
- "clear_module_cache": Clear Python module cache for a specific location
Body: JSON with "location" field (the UDF cache directory path)
- "clear_udaf_state_cache": Clear UDAF runtime state for a dropped function id
Body: JSON with "function_id" field
"""
action_type = action.type

if action_type == "clear_module_cache":
yield from self._handle_clear_module_cache(action.body.to_pybytes())
elif action_type == "clear_udaf_state_cache":
yield from self._handle_clear_udaf_state_cache(action.body.to_pybytes())
else:
raise flight.FlightUnavailableError(f"Unknown action: {action_type}")

def _handle_clear_udaf_state_cache(self, body: bytes):
"""
Clear cached UDAF state managers for a dropped function id.
"""
try:
params = json.loads(body.decode("utf-8"))
function_id = int(params["function_id"])

cleared_managers = self._clear_udaf_state_cache_by_function_id(function_id)

result = {
"success": True,
"cleared_managers": cleared_managers,
"function_id": function_id,
}
yield flight.Result(json.dumps(result).encode("utf-8"))

except Exception as e:
logging.error("clear_udaf_state_cache failed: %s", e)
yield flight.Result(json.dumps({
"success": False,
"error": str(e)
}).encode("utf-8"))

def _handle_clear_module_cache(self, body: bytes):
"""
Clear Python module cache for a specific UDF location.
Expand Down
2 changes: 2 additions & 0 deletions be/src/udf/python/python_udf_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Status PythonUDFMeta::serialize_arrow_schema(const std::shared_ptr<arrow::Schema
json format:
{
"name": "xxx",
"id": 123,
"symbol": "xxx",
"location": "xxx",
"udf_load_type": 0 or 1,
Expand All @@ -72,6 +73,7 @@ Status PythonUDFMeta::serialize_to_json(std::string* json_str) const {
doc.SetObject();
auto& allocator = doc.GetAllocator();
doc.AddMember("name", rapidjson::Value().SetString(name.c_str(), allocator), allocator);
doc.AddMember("id", rapidjson::Value().SetInt64(id), allocator);
doc.AddMember("symbol", rapidjson::Value().SetString(symbol.c_str(), allocator), allocator);
doc.AddMember("location", rapidjson::Value().SetString(location.c_str(), allocator), allocator);
doc.AddMember("udf_load_type", rapidjson::Value().SetInt(static_cast<int>(type)), allocator);
Expand Down
24 changes: 23 additions & 1 deletion be/test/udf/python/python_server_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
#include <sys/socket.h>
#include <sys/un.h>

#include <boost/process.hpp>
#include <filesystem>
#include <fstream>
#include <future>
#include <string>
#include <vector>

#include "common/config.h"
#include "common/status.h"
Expand All @@ -36,6 +37,7 @@
namespace doris {

namespace fs = std::filesystem;
namespace bp = boost::process;

class PythonServerTest : public ::testing::Test {
protected:
Expand Down Expand Up @@ -136,6 +138,13 @@ class PythonServerTest : public ::testing::Test {
ofs << "# fake server\n";
ofs.close();
}

ProcessPtr create_sleep_process() {
bp::ipstream output_stream;
std::string sleep_path = fs::exists("/bin/sleep") ? "/bin/sleep" : "/usr/bin/sleep";
bp::child child(sleep_path, "60", bp::std_out > output_stream, bp::std_err > bp::null);
return std::make_shared<PythonUDFProcess>(std::move(child), std::move(output_stream));
}
};

// ============================================================================
Expand Down Expand Up @@ -304,6 +313,19 @@ TEST_F(PythonServerTest, ShutdownAfterFailedInitializationDoesNotCrash) {
EXPECT_NO_THROW(mgr.shutdown());
}

TEST_F(PythonServerTest, ClearUdafStateCacheWithoutProcessesIsNoOp) {
PythonServerManager mgr;

EXPECT_NO_THROW(mgr.clear_udaf_state_cache(12345));
}

TEST_F(PythonServerTest, ClearModuleCacheWithoutProcessesIsNoOp) {
PythonServerManager mgr;

auto status = mgr.clear_module_cache("/tmp/python_udf_cache");
EXPECT_TRUE(status.ok()) << status.to_string();
}

// ============================================================================
// PythonServerManager::get_client() - client retrieval test
// ============================================================================
Expand Down
3 changes: 3 additions & 0 deletions be/test/udf/python/python_udf_meta_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ TEST_F(PythonUDFMetaTest, SerializeToJsonBasic) {
doc.Parse(json_str.c_str());
EXPECT_FALSE(doc.HasParseError());

EXPECT_TRUE(doc.HasMember("id"));
EXPECT_EQ(doc["id"].GetInt64(), 1);

EXPECT_TRUE(doc.HasMember("name"));
EXPECT_STREQ(doc["name"].GetString(), "test_udf");

Expand Down
Loading
Loading