Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't sync MotherDuck metadata forever #582

Merged
merged 6 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions include/pgduckdb/pgduckdb_background_worker.hpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#pragma once

void DuckdbInitBackgroundWorker(void);

namespace pgduckdb {

void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade);
void InitBackgroundWorker(void);
void TriggerActivity(void);

extern bool is_background_worker;
extern bool doing_motherduck_sync;
extern char *current_duckdb_database_name;
extern char *current_motherduck_catalog_version;
Expand Down
1 change: 1 addition & 0 deletions include/pgduckdb/pgduckdb_guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ extern int duckdb_motherduck_enabled;
extern char *duckdb_motherduck_token;
extern char *duckdb_postgres_role;
extern char *duckdb_motherduck_default_database;
extern char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout;
8 changes: 7 additions & 1 deletion src/pgduckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO;
char *duckdb_motherduck_token = strdup("");
char *duckdb_motherduck_postgres_database = strdup("postgres");
char *duckdb_motherduck_default_database = strdup("");
char *duckdb_motherduck_background_catalog_refresh_inactivity_timeout = strdup("5 minutes");
char *duckdb_postgres_role = strdup("");

int duckdb_maximum_threads = -1;
Expand All @@ -44,7 +45,7 @@ _PG_init(void) {
DuckdbInitGUC();
DuckdbInitHooks();
DuckdbInitNode();
DuckdbInitBackgroundWorker();
pgduckdb::InitBackgroundWorker();
pgduckdb::RegisterDuckdbXactCallback();
}
} // extern "C"
Expand Down Expand Up @@ -186,4 +187,9 @@ DuckdbInitGUC(void) {
DefineCustomVariable("duckdb.motherduck_default_database",
"Which database in MotherDuck to designate as default (in place of my_db)",
&duckdb_motherduck_default_database, PGC_POSTMASTER, GUC_SUPERUSER_ONLY);

DefineCustomVariable("duckdb.motherduck_background_catalog_refresh_inactivity_timeout",
"When to stop syncing of the motherduck catalog when no activity has taken place",
&duckdb_motherduck_background_catalog_refresh_inactivity_timeout, PGC_POSTMASTER,
GUC_SUPERUSER_ONLY);
}
159 changes: 136 additions & 23 deletions src/pgduckdb_background_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,28 @@ extern "C" {
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/pgduckdb_metadata_cache.hpp"

static bool is_background_worker = false;
static std::unordered_map<std::string, std::string> last_known_motherduck_catalog_versions;
static uint64 initial_cache_version = 0;

namespace pgduckdb {

bool is_background_worker = false;

static void SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context);
static void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *context);

/*
 * State kept in shared memory so that regular backends can notify the
 * background worker about activity. The struct is zeroed when it is first
 * created, so bgw_latch is NULL until the background worker stores its own
 * latch in it.
 */
typedef struct BackgoundWorkerShmemStruct {
Latch *bgw_latch; /* The latch of the background worker; also read and written with `lock` held */

slock_t lock; /* protects activity_count, and is held around accesses to bgw_latch as well */

int64 activity_count; /* the number of times activity was triggered by other backends */
} BackgoundWorkerShmemStruct;

static BackgoundWorkerShmemStruct *BgwShmemStruct;

} // namespace pgduckdb

extern "C" {
PGDLLEXPORT void
pgduckdb_background_worker_main(Datum /* main_arg */) {
Expand All @@ -66,9 +84,15 @@ pgduckdb_background_worker_main(Datum /* main_arg */) {
BackgroundWorkerUnblockSignals();

BackgroundWorkerInitializeConnection(duckdb_motherduck_postgres_database, NULL, 0);
SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock);
pgduckdb::BgwShmemStruct->bgw_latch = MyLatch;
int64 last_activity_count = pgduckdb::BgwShmemStruct->activity_count;
SpinLockRelease(&pgduckdb::BgwShmemStruct->lock);

pgduckdb::doing_motherduck_sync = true;
is_background_worker = true;
pgduckdb::is_background_worker = true;

duckdb::unique_ptr<duckdb::Connection> connection;

while (true) {
// Initialize SPI (Server Programming Interface) and connect to the database
Expand All @@ -78,12 +102,23 @@ pgduckdb_background_worker_main(Datum /* main_arg */) {
PushActiveSnapshot(GetTransactionSnapshot());

if (pgduckdb::IsExtensionRegistered()) {
if (!connection) {
connection = pgduckdb::DuckDBManager::Get().CreateConnection();
}
SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock);
int64 new_activity_count = pgduckdb::BgwShmemStruct->activity_count;
SpinLockRelease(&pgduckdb::BgwShmemStruct->lock);
if (last_activity_count != new_activity_count) {
last_activity_count = new_activity_count;
/* Trigger some activity to restart the syncing */
pgduckdb::DuckDBQueryOrThrow(*connection, "FROM duckdb_tables() limit 0");
}
/*
* If the extension is not registered this loop is a no-op, which
* means we essentially keep polling until the extension is
* installed
*/
pgduckdb::SyncMotherDuckCatalogsWithPg(false);
pgduckdb::SyncMotherDuckCatalogsWithPg(false, *connection->context);
}

// Commit the transaction
Expand All @@ -108,11 +143,20 @@ force_motherduck_sync(PG_FUNCTION_ARGS) {
Datum drop_with_cascade = PG_GETARG_BOOL(0);
/* clear the cache of known catalog versions to force a full sync */
last_known_motherduck_catalog_versions.clear();

/*
* We don't use GetConnection, because we want to be able to precisely
* control the transaction lifecycle. We commit Postgres transactions
* throughout this function, and the lifecycle of the cached connection
* returned by GetConnection would be linked to those Postgres
* transactions, which we don't want.
*/
auto connection = pgduckdb::DuckDBManager::Get().CreateConnection();
SPI_connect_ext(SPI_OPT_NONATOMIC);
PG_TRY();
{
pgduckdb::doing_motherduck_sync = true;
pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade);
pgduckdb::SyncMotherDuckCatalogsWithPg(drop_with_cascade, *connection->context);
}
PG_FINALLY();
{
Expand All @@ -124,8 +168,60 @@ force_motherduck_sync(PG_FUNCTION_ARGS) {
}
}

namespace pgduckdb {
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

/*
 * Reserve the shared memory needed for the background worker state. The
 * memory itself is created or attached to later, in ShmemStartup().
 *
 * On PostgreSQL 15 and newer this runs as the shmem_request hook and chains
 * to any previously installed hook first; on older versions it is called
 * directly.
 */
static void
ShmemRequest(void) {
#if PG_VERSION_NUM >= 150000
	if (prev_shmem_request_hook) {
		prev_shmem_request_hook();
	}
#endif

	const auto bgw_state_size = sizeof(BackgoundWorkerShmemStruct);
	RequestAddinShmemSpace(bgw_state_size);
}

/*
* CheckpointerShmemInit
* Allocate and initialize checkpointer-related shared memory
*/
static void
ShmemStartup(void) {
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();

Size size = sizeof(BackgoundWorkerShmemStruct);
bool found;

/*
* Create or attach to the shared memory state, including hash table
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

BgwShmemStruct = (BackgoundWorkerShmemStruct *)ShmemInitStruct("DuckdbBackgroundWorker Data", size, &found);

if (!found) {
/*
* First time through, so initialize. Note that we zero the whole
* requests array; this is so that CompactCheckpointerRequestQueue can
* assume that any pad bytes in the request structs are zeroes.
*/
MemSet(BgwShmemStruct, 0, size);
SpinLockInit(&BgwShmemStruct->lock);
}

LWLockRelease(AddinShmemInitLock);
}

void
DuckdbInitBackgroundWorker(void) {
InitBackgroundWorker(void) {
if (!pgduckdb::IsMotherDuckEnabledAnywhere()) {
return;
}
Expand All @@ -143,9 +239,31 @@ DuckdbInitBackgroundWorker(void) {

// Register the worker
RegisterBackgroundWorker(&worker);

/* Set up the shared memory hooks */
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = ShmemRequest;
#else
ShmemRequest();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = ShmemStartup;
}

/*
 * Record that a backend did something that needs an up-to-date MotherDuck
 * catalog, and wake up the background worker so it notices.
 *
 * Does nothing when MotherDuck is not enabled.
 */
void
TriggerActivity(void) {
	if (!IsMotherDuckEnabled()) {
		return;
	}

	/*
	 * Keep the critical section to plain loads and stores: spinlocks must
	 * only be held for a few instructions, and SetLatch() may end up
	 * signalling another process. So we only copy the latch pointer while
	 * holding the lock and set the latch after releasing it.
	 */
	SpinLockAcquire(&BgwShmemStruct->lock);
	BgwShmemStruct->activity_count++;
	auto *worker_latch = BgwShmemStruct->bgw_latch;
	SpinLockRelease(&BgwShmemStruct->lock);

	/*
	 * The shared struct starts out zeroed, so the latch pointer is NULL
	 * until the background worker has stored its latch. In that case there
	 * is nobody to wake up yet; the incremented activity_count is still
	 * visible to the worker once it reads the counter.
	 */
	if (worker_latch != nullptr) {
		/* Force wake up the background worker */
		SetLatch(worker_latch);
	}
}

namespace pgduckdb {
/* Global variables that are used to communicate with our event triggers so
* they handle DDL of syncing differently than user-initiated DDL */
bool doing_motherduck_sync;
Expand Down Expand Up @@ -546,30 +664,25 @@ CreateSchemaIfNotExists(const char *postgres_schema_name, bool is_default_db) {
return true;
}

void SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade);

void
SyncMotherDuckCatalogsWithPg(bool drop_with_cascade) {
InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade);
static void
SyncMotherDuckCatalogsWithPg(bool drop_with_cascade, duckdb::ClientContext &context) {
/*
* TODO: Passing a reference through InvokeCPPFunc doesn't work here
* for some reason. So to work around that we use a pointer instead.
* We should fix the underlying problem instead.
Comment on lines +670 to +672
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Y-- maybe something for your template magic to fix.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've seen something like that in the past. Never got a chance to investigate though... Let's open an issue and I'll try to look at it soon?

*/
InvokeCPPFunc(SyncMotherDuckCatalogsWithPg_Cpp, drop_with_cascade, &context);
}

void
SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) {
static void
SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *_context) {
if (!pgduckdb::IsMotherDuckEnabled()) {
throw std::runtime_error("MotherDuck support is not enabled");
}

initial_cache_version = pgduckdb::CacheVersion();
auto &context = *_context;

/*
* We don't use GetConnection, because we want to be able to precisely
* control the transaction lifecycle. We commit Postgres connections
* throughout this function, and the GetConnect its cached connection its
* lifecycle would be linked to those postgres transactions, which we
* don't want.
*/
auto connection = pgduckdb::DuckDBManager::Get().CreateConnection();
auto &context = *connection->context;
initial_cache_version = pgduckdb::CacheVersion();

auto &db_manager = duckdb::DatabaseManager::Get(context);
const auto &default_db = db_manager.GetDefaultDatabase(context);
Expand Down
12 changes: 4 additions & 8 deletions src/pgduckdb_duckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "duckdb/main/extension_util.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"

#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/catalog/pgduckdb_storage.hpp"
#include "pgduckdb/scan/postgres_scan.hpp"
#include "pgduckdb/pg/transactions.hpp"
Expand Down Expand Up @@ -191,14 +192,9 @@ DuckDBManager::Initialize() {
pgduckdb::DuckDBQueryOrThrow(context, "ATTACH DATABASE ':memory:' AS pg_temp;");

if (pgduckdb::IsMotherDuckEnabled()) {
/*
* Workaround for MotherDuck catalog sync that turns off automatically,
* in case of no queries being sent to MotherDuck. Since the background
* worker never sends any query to MotherDuck we need to turn this off.
* So we set the timeout to an arbitrary very large value.
*/
pgduckdb::DuckDBQueryOrThrow(context,
"SET motherduck_background_catalog_refresh_inactivity_timeout='99 years'");
pgduckdb::DuckDBQueryOrThrow(context, "SET motherduck_background_catalog_refresh_inactivity_timeout=" +
duckdb::KeywordHelper::WriteQuoted(
duckdb_motherduck_background_catalog_refresh_inactivity_timeout));
}

LoadFunctions(context);
Expand Down
31 changes: 31 additions & 0 deletions src/pgduckdb_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
#include "pgduckdb/pgduckdb_ddl.hpp"
#include "pgduckdb/pgduckdb_table_am.hpp"
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/utility/copy.hpp"
#include "pgduckdb/vendor/pg_explain.hpp"
#include "pgduckdb/vendor/pg_list.hpp"
Expand Down Expand Up @@ -189,10 +190,12 @@ static PlannedStmt *
DuckdbPlannerHook_Cpp(Query *parse, const char *query_string, int cursor_options, ParamListInfo bound_params) {
if (pgduckdb::IsExtensionRegistered()) {
if (NeedsDuckdbExecution(parse)) {
pgduckdb::TriggerActivity();
IsAllowedStatement(parse, true);

return DuckdbPlanNode(parse, query_string, cursor_options, bound_params, true);
} else if (duckdb_force_execution && IsAllowedStatement(parse) && ContainsFromClause(parse)) {
pgduckdb::TriggerActivity();
PlannedStmt *duckdbPlan = DuckdbPlanNode(parse, query_string, cursor_options, bound_params, false);
if (duckdbPlan) {
return duckdbPlan;
Expand Down Expand Up @@ -353,6 +356,18 @@ DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, Exp
prev_explain_one_query_hook(query, cursorOptions, into, es, queryString, params, queryEnv);
}

/*
 * Returns true when the given SQL error code is one of the "undefined
 * object" codes checked here: undefined column, undefined schema or
 * undefined table. Any other code yields false.
 */
static bool
IsOutdatedMotherduckCatalogErrcode(int error_code) {
	const bool is_undefined_object = error_code == ERRCODE_UNDEFINED_COLUMN ||
	                                 error_code == ERRCODE_UNDEFINED_SCHEMA ||
	                                 error_code == ERRCODE_UNDEFINED_TABLE;
	return is_undefined_object;
}

static void
DuckdbEmitLogHook(ErrorData *edata) {
if (prev_emit_log_hook) {
Expand Down Expand Up @@ -385,6 +400,22 @@ DuckdbEmitLogHook(ErrorData *edata) {
"If you use DuckDB functions like read_parquet, you need to use the r['colname'] syntax introduced "
"in pg_duckdb 0.3.0. It seems like you might be using the outdated \"AS (colname coltype, ...)\" syntax");
}

/*
* The background worker stops syncing the catalogs after the
* motherduck_background_catalog_refresh_inactivity_timeout has been
* reached. This means that the table metadata that Postgres knows about
* could be out of date, which could then easily result in errors about
* missing columns, schemas or tables from the Postgres parser, because it
* cannot understand the query.
*
* This mitigates the impact of that by triggering a restart of the catalog
* syncing when one of those errors occurs AND the current user can
* actually use DuckDB.
*/
if (IsOutdatedMotherduckCatalogErrcode(edata->sqlerrcode) && pgduckdb::IsExtensionRegistered() &&
pgduckdb::IsDuckdbExecutionAllowed()) {
pgduckdb::TriggerActivity();
}
}

void
Expand Down