diff --git a/include/pgduckdb/pgduckdb_background_worker.hpp b/include/pgduckdb/pgduckdb_background_worker.hpp index f6ab19c7..8752404e 100644 --- a/include/pgduckdb/pgduckdb_background_worker.hpp +++ b/include/pgduckdb/pgduckdb_background_worker.hpp @@ -1,10 +1,11 @@ #pragma once -void DuckdbInitBackgroundWorker(void); - namespace pgduckdb { -void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade); +void InitBackgroundWorker(void); +void TriggerActivity(void); + +extern bool is_background_worker; extern bool doing_motherduck_sync; extern char *current_duckdb_database_name; extern char *current_motherduck_catalog_version; diff --git a/include/pgduckdb/pgduckdb_guc.h b/include/pgduckdb/pgduckdb_guc.h index 20e965f4..8682ca77 100644 --- a/include/pgduckdb/pgduckdb_guc.h +++ b/include/pgduckdb/pgduckdb_guc.h @@ -15,3 +15,4 @@ extern int duckdb_motherduck_enabled; extern char *duckdb_motherduck_token; extern char *duckdb_postgres_role; extern char *duckdb_motherduck_default_database; +extern char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout; diff --git a/src/pgduckdb.cpp b/src/pgduckdb.cpp index 973798b8..9fcb3290 100644 --- a/src/pgduckdb.cpp +++ b/src/pgduckdb.cpp @@ -20,6 +20,7 @@ int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO; char *duckdb_motherduck_token = strdup(""); char *duckdb_motherduck_postgres_database = strdup("postgres"); char *duckdb_motherduck_default_database = strdup(""); +char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout = strdup(""); char *duckdb_postgres_role = strdup(""); int duckdb_maximum_threads = -1; @@ -44,7 +45,7 @@ _PG_init(void) { DuckdbInitGUC(); DuckdbInitHooks(); DuckdbInitNode(); - DuckdbInitBackgroundWorker(); + pgduckdb::InitBackgroundWorker(); pgduckdb::RegisterDuckdbXactCallback(); } } // extern "C" @@ -186,4 +187,9 @@ DuckdbInitGUC(void) { DefineCustomVariable("duckdb.motherduck_default_database", "Which database in MotherDuck to designate as default (in place of 
my_db)", &duckdb_motherduck_default_database, PGC_POSTMASTER, GUC_SUPERUSER_ONLY); + + DefineCustomVariable("duckdb.motherduck_background_catalog_refresh_inactivity_timeout", + "When to stop syncing of the motherduck catalog when no activity has taken place", + &duckdb_motherduck_background_catalog_refresh_inactivity_timeout, PGC_POSTMASTER, + GUC_SUPERUSER_ONLY); } diff --git a/src/pgduckdb_background_worker.cpp b/src/pgduckdb_background_worker.cpp index 49a00422..01610ae4 100644 --- a/src/pgduckdb_background_worker.cpp +++ b/src/pgduckdb_background_worker.cpp @@ -53,10 +53,28 @@ extern "C" { #include "pgduckdb/pgduckdb_background_worker.hpp" #include "pgduckdb/pgduckdb_metadata_cache.hpp" -static bool is_background_worker = false; static std::unordered_map last_known_motherduck_catalog_versions; static uint64 initial_cache_version = 0; +namespace pgduckdb { + +bool is_background_worker = false; + +static void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context); +static void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *context); + +typedef struct BackgoundWorkerShmemStruct { + Latch *bgw_latch; /* The latch of the background worker */ + + slock_t lock; /* protects all the fields below */ + + int64 activity_count; /* the number of times activity was triggered by other backends */ +} BackgoundWorkerShmemStruct; + +static BackgoundWorkerShmemStruct *BgwShmemStruct; + +} // namespace pgduckdb + extern "C" { PGDLLEXPORT void pgduckdb_background_worker_main(Datum /* main_arg */) { @@ -66,9 +84,15 @@ pgduckdb_background_worker_main(Datum /* main_arg */) { BackgroundWorkerUnblockSignals(); BackgroundWorkerInitializeConnection(duckdb_motherduck_postgres_database, NULL, 0); + SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock); + pgduckdb::BgwShmemStruct->bgw_latch = MyLatch; + int64 last_activity_count = pgduckdb::BgwShmemStruct->activity_count; + SpinLockRelease(&pgduckdb::BgwShmemStruct->lock); 
pgduckdb::doing_motherduck_sync = true; - is_background_worker = true; + pgduckdb::is_background_worker = true; + + duckdb::unique_ptr connection; while (true) { // Initialize SPI (Server Programming Interface) and connect to the database @@ -78,12 +102,23 @@ pgduckdb_background_worker_main(Datum /* main_arg */) { PushActiveSnapshot(GetTransactionSnapshot()); if (pgduckdb::IsExtensionRegistered()) { + if (!connection) { + connection = pgduckdb::DuckDBManager::Get().CreateConnection(); + } + SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock); + int64 new_activity_count = pgduckdb::BgwShmemStruct->activity_count; + SpinLockRelease(&pgduckdb::BgwShmemStruct->lock); + if (last_activity_count != new_activity_count) { + last_activity_count = new_activity_count; + /* A backend triggered activity since the last check, so trigger some activity in DuckDB to restart the syncing */ + pgduckdb::DuckDBQueryOrThrow(*connection, "FROM duckdb_tables() limit 0"); + } /* * If the extension is not registerid this loop is a no-op, which * means we essentially keep polling until the extension is * installed */ - pgduckdb::SyncMotherDuckCatalogsWithPg(false); + pgduckdb::SyncMotherDuckCatalogsWithPg(false, *connection->context); } // Commit the transaction @@ -108,11 +143,20 @@ force_motherduck_sync(PG_FUNCTION_ARGS) { Datum drop_with_cascade = PG_GETARG_BOOL(0); /* clear the cache of known catalog versions to force a full sync */ last_known_motherduck_catalog_versions.clear(); + + /* + * We don't use GetConnection, because we want to be able to precisely + * control the transaction lifecycle. We commit Postgres transactions + * throughout this function, and the lifecycle of the cached connection + * that GetConnection returns would be linked to those Postgres + * transactions, which we don't want. 
+ */ + auto connection = pgduckdb::DuckDBManager::Get().CreateConnection(); SPI_connect_ext(SPI_OPT_NONATOMIC); PG_TRY(); { pgduckdb::doing_motherduck_sync = true; - pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade); + pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade, *connection->context); } PG_FINALLY(); { @@ -124,8 +168,60 @@ force_motherduck_sync(PG_FUNCTION_ARGS) { } } +namespace pgduckdb { +#if PG_VERSION_NUM >= 150000 +static shmem_request_hook_type prev_shmem_request_hook = NULL; +#endif +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +/* + * shmem_request hook: request additional shared resources. We'll allocate or + * attach to the shared resources in ShmemStartup(). + */ +static void +ShmemRequest(void) { +#if PG_VERSION_NUM >= 150000 + if (prev_shmem_request_hook) + prev_shmem_request_hook(); +#endif + + RequestAddinShmemSpace(sizeof(BackgoundWorkerShmemStruct)); +} + +/* + * ShmemStartup + * Allocate (or attach to) and initialize the background worker shared memory + */ +static void +ShmemStartup(void) { + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + Size size = sizeof(BackgoundWorkerShmemStruct); + bool found; + + /* + * Create or attach to the shared memory state + */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + BgwShmemStruct = (BackgoundWorkerShmemStruct *)ShmemInitStruct("DuckdbBackgroundWorker Data", size, &found); + + if (!found) { + /* + * First time through, so initialize. Zeroing the whole struct means + * that bgw_latch is NULL and activity_count is 0 until the background + * worker has started and backends have triggered activity. 
+ */ + MemSet(BgwShmemStruct, 0, size); + SpinLockInit(&BgwShmemStruct->lock); + } + + LWLockRelease(AddinShmemInitLock); +} + void -DuckdbInitBackgroundWorker(void) { +InitBackgroundWorker(void) { if (!pgduckdb::IsMotherDuckEnabledAnywhere()) { return; } @@ -143,9 +239,48 @@ DuckdbInitBackgroundWorker(void) { // Register the worker RegisterBackgroundWorker(&worker); + + /* Set up the shared memory hooks */ +#if PG_VERSION_NUM >= 150000 + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = ShmemRequest; +#else + ShmemRequest(); +#endif + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = ShmemStartup; +} + +/* + * Tells the background worker that a backend is using DuckDB: bumps the + * shared activity counter and sets the worker latch so that it wakes up. + */ +void +TriggerActivity(void) { + if (!IsMotherDuckEnabled()) { + return; + } + + /* The shared struct only exists once ShmemStartup has run */ + if (!BgwShmemStruct) { + return; + } + + SpinLockAcquire(&BgwShmemStruct->lock); + BgwShmemStruct->activity_count++; + Latch *bgw_latch = BgwShmemStruct->bgw_latch; + SpinLockRelease(&BgwShmemStruct->lock); + + /* + * Force wake up the background worker. This is done after releasing the + * spinlock, because a spinlock should only be held for a few instructions. + * The latch is still NULL as long as the worker has not started. + */ + if (bgw_latch) { + SetLatch(bgw_latch); + } } -namespace pgduckdb { /* Global variables that are used to communicate with our event triggers so * they handle DDL of syncing differently than user-initiated DDL */ bool doing_motherduck_sync; @@ -546,30 +681,25 @@ CreateSchemaIfNotExists(const char *postgres_schema_name, bool is_default_db) { return true; } -void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade); - -void -SyncMotherDuckCatalogsWithPg(bool drop_with_cascade) { - InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade); +static void +SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context) { + /* + * TODO: Passing a reference through InvokeCPPFunc doesn't work here + * for some reason. So to work around that we use a pointer instead. + * We should fix the underlying problem instead. 
+ */ + InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade, &context); } -void -SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) { +static void +SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *_context) { if (!pgduckdb::IsMotherDuckEnabled()) { throw std::runtime_error("MotherDuck support is not enabled"); } - initial_cache_version = pgduckdb::CacheVersion(); + auto &context = *_context; - /* - * We don't use GetConnection, because we want to be able to precisely - * control the transaction lifecycle. We commit Postgres connections - * throughout this function, and the GetConnect its cached connection its - * lifecycle would be linked to those postgres transactions, which we - * don't want. - */ - auto connection = pgduckdb::DuckDBManager::Get().CreateConnection(); - auto &context = *connection->context; + initial_cache_version = pgduckdb::CacheVersion(); auto &db_manager = duckdb::DatabaseManager::Get(context); const auto &default_db = db_manager.GetDefaultDatabase(context); diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index ac417fc3..bf32e3b4 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -6,6 +6,7 @@ #include "duckdb/main/extension_util.hpp" #include "duckdb/parser/parsed_data/create_table_function_info.hpp" +#include "pgduckdb/pgduckdb_background_worker.hpp" #include "pgduckdb/catalog/pgduckdb_storage.hpp" #include "pgduckdb/scan/postgres_scan.hpp" #include "pgduckdb/pg/transactions.hpp" @@ -190,15 +191,11 @@ DuckDBManager::Initialize() { pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE 'pgduckdb' (TYPE pgduckdb)"); pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE ':memory:' AS pg_temp;"); - if (pgduckdb::IsMotherDuckEnabled()) { - /* - * Workaround for MotherDuck catalog sync that turns off automatically, - * in case of no queries being sent to MotherDuck. Since the background - * worker never sends any query to MotherDuck we need to turn this off. 
- * So we set the timeout to an arbitrary very large value. - */ - pgduckdb::DuckDBQueryOrThrow(context, - "SET motherduck_background_catalog_refresh_inactivity_timeout='99 years'"); + if (pgduckdb::IsMotherDuckEnabled() && + strlen(duckdb_motherduck_background_catalog_refresh_inactivity_timeout) > 0) { /* an empty GUC value skips the SET below */ + pgduckdb::DuckDBQueryOrThrow(context, "SET motherduck_background_catalog_refresh_inactivity_timeout=" + + duckdb::KeywordHelper::WriteQuoted( + duckdb_motherduck_background_catalog_refresh_inactivity_timeout)); } LoadFunctions(context); diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index e9e0d1cb..c5583668 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -27,6 +27,7 @@ extern "C" { #include "pgduckdb/pgduckdb_metadata_cache.hpp" #include "pgduckdb/pgduckdb_ddl.hpp" #include "pgduckdb/pgduckdb_table_am.hpp" +#include "pgduckdb/pgduckdb_background_worker.hpp" #include "pgduckdb/utility/copy.hpp" #include "pgduckdb/vendor/pg_explain.hpp" #include "pgduckdb/vendor/pg_list.hpp" @@ -189,10 +190,12 @@ static PlannedStmt * DuckdbPlannerHook_Cpp(Query *parse, const char *query_string, int cursor_options, ParamListInfo bound_params) { if (pgduckdb::IsExtensionRegistered()) { if (NeedsDuckdbExecution(parse)) { + pgduckdb::TriggerActivity(); /* let the background worker know that DuckDB is being used */ IsAllowedStatement(parse, true); return DuckdbPlanNode(parse, query_string, cursor_options, bound_params, true); } else if (duckdb_force_execution && IsAllowedStatement(parse) && ContainsFromClause(parse)) { + pgduckdb::TriggerActivity(); PlannedStmt *duckdbPlan = DuckdbPlanNode(parse, query_string, cursor_options, bound_params, false); if (duckdbPlan) { return duckdbPlan; @@ -353,6 +356,18 @@ DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, Exp prev_explain_one_query_hook(query, cursorOptions, into, es, queryString, params, queryEnv); } +static bool +IsOutdatedMotherduckCatalogErrcode(int error_code) { /* true for undefined column, schema and table errors */ + switch (error_code) { + case ERRCODE_UNDEFINED_COLUMN: + case 
ERRCODE_UNDEFINED_SCHEMA: + case ERRCODE_UNDEFINED_TABLE: + return true; + default: + return false; + } +} + static void DuckdbEmitLogHook(ErrorData *edata) { if (prev_emit_log_hook) { @@ -385,6 +400,22 @@ DuckdbEmitLogHook(ErrorData *edata) { "If you use DuckDB functions like read_parquet, you need to use the r['colname'] syntax introduced " "in pg_duckdb 0.3.0. It seems like you might be using the outdated \"AS (colname coltype, ...)\" syntax"); } + + /* + * The background worker stops syncing the catalogs after the + * motherduck_background_catalog_refresh_inactivity_timeout has been + * reached. This means that the table metadata that Postgres knows about + * could be out of date, which could then easily result in errors about + * missing columns, schemas or tables from the Postgres parser because it cannot understand the query. + * + * This mitigates the impact of that by triggering a restart of the catalog + * syncing when one of those errors occurs AND the current user can + * actually use DuckDB. NOTE(review): this runs inside the emit_log hook, confirm that IsExtensionRegistered() is safe to call there. + */ + if (IsOutdatedMotherduckCatalogErrcode(edata->sqlerrcode) && pgduckdb::IsExtensionRegistered() && + pgduckdb::IsDuckdbExecutionAllowed()) { + pgduckdb::TriggerActivity(); + } } void