Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/Database/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ abstract class Adapter

protected bool $alterLocks = false;

protected bool $skipDuplicates = false;

/**
* @var array<string, mixed>
*/
Expand Down Expand Up @@ -392,6 +394,27 @@ public function inTransaction(): bool
return $this->inTransaction > 0;
}

/**
 * Execute $callback while this adapter's skipDuplicates flag is switched on.
 *
 * The flag's prior value is captured before it is enabled and written back in
 * a finally block, so it is restored whether the callback returns or throws.
 * Because the previous value (not a hard-coded false) is restored, nested
 * calls leave an enclosing scope's setting intact. While the flag is on,
 * createDocuments() is meant to skip duplicate-key rows instead of throwing.
 *
 * @template T
 * @param callable(): T $callback
 * @return T
 */
public function skipDuplicates(callable $callback): mixed
{
    $restoreTo = $this->skipDuplicates;

    try {
        $this->skipDuplicates = true;

        return $callback();
    } finally {
        $this->skipDuplicates = $restoreTo;
    }
}

/**
* @template T
* @param callable(): T $callback
Expand Down
41 changes: 41 additions & 0 deletions src/Database/Adapter/Mongo.php
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ public function withTransaction(callable $callback): mixed
return $callback();
}

// upsert + $setOnInsert hits WriteConflict (E112) under txn snapshot isolation.
if ($this->skipDuplicates) {
return $callback();
}

try {
$this->startTransaction();
$result = $callback();
Expand Down Expand Up @@ -1492,6 +1497,42 @@ public function createDocuments(Document $collection, array $documents): array
$records[] = $record;
}

// insertMany aborts the txn on any duplicate; upsert + $setOnInsert no-ops instead.
if ($this->skipDuplicates) {
if (empty($records)) {
return [];
}

$operations = [];
foreach ($records as $record) {
$filter = ['_uid' => $record['_uid'] ?? ''];
if ($this->sharedTables) {
$filter['_tenant'] = $record['_tenant'] ?? $this->getTenant();
}

// Filter fields can't reappear in $setOnInsert (mongo path-conflict error).
$setOnInsert = $record;
unset($setOnInsert['_uid'], $setOnInsert['_tenant']);

if (empty($setOnInsert)) {
continue;
}

$operations[] = [
'filter' => $filter,
'update' => ['$setOnInsert' => $setOnInsert],
];
}

try {
$this->client->upsert($name, $operations, $options);
} catch (MongoException $e) {
throw $this->processException($e);
}

return $documents;
}

try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
Expand Down
15 changes: 15 additions & 0 deletions src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public function __construct(UtopiaPool $pool)
public function delegate(string $method, array $args): mixed
{
if ($this->pinnedAdapter !== null) {
if ($this->skipDuplicates) {
return $this->pinnedAdapter->skipDuplicates(
fn () => $this->pinnedAdapter->{$method}(...$args)
);
}
return $this->pinnedAdapter->{$method}(...$args);
}

Expand All @@ -66,6 +71,11 @@ public function delegate(string $method, array $args): mixed
$adapter->setMetadata($key, $value);
}

if ($this->skipDuplicates) {
return $adapter->skipDuplicates(
fn () => $adapter->{$method}(...$args)
);
}
return $adapter->{$method}(...$args);
});
}
Expand Down Expand Up @@ -146,6 +156,11 @@ public function withTransaction(callable $callback): mixed

$this->pinnedAdapter = $adapter;
try {
if ($this->skipDuplicates) {
return $adapter->skipDuplicates(
fn () => $adapter->withTransaction($callback)
);
}
return $adapter->withTransaction($callback);
} finally {
$this->pinnedAdapter = null;
Expand Down
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -2350,6 +2350,35 @@ public function getSupportForOptionalSpatialAttributeWithExistingRows(): bool
return false;
}

/**
* Returns the INSERT keyword for this adapter.
* Always the plain form, regardless of the skipDuplicates flag: this class
* expresses duplicate skipping through the ON CONFLICT clauses returned by
* getInsertSuffix() and getInsertPermissionsSuffix() instead.
*/
protected function getInsertKeyword(): string
{
return 'INSERT INTO';
}

/**
 * Build the clause appended after the VALUES list of a document INSERT.
 *
 * @param string $table Not read by this implementation.
 * @return string Empty string unless skipDuplicates is on; otherwise an
 *                ON CONFLICT ... DO NOTHING clause targeting "_uid", with
 *                "_tenant" added when shared tables are enabled.
 */
protected function getInsertSuffix(string $table): string
{
    if ($this->skipDuplicates) {
        $columns = '("_uid")';
        if ($this->sharedTables) {
            $columns = '("_uid", "_tenant")';
        }

        return "ON CONFLICT {$columns} DO NOTHING";
    }

    return '';
}

/**
 * Build the clause appended after the VALUES list of the permissions INSERT.
 *
 * @return string Empty string unless skipDuplicates is on; otherwise an
 *                ON CONFLICT ... DO NOTHING clause targeting
 *                ("_type", "_permission", "_document"), with "_tenant"
 *                added when shared tables are enabled.
 */
protected function getInsertPermissionsSuffix(): string
{
    if ($this->skipDuplicates) {
        $columns = '("_type", "_permission", "_document")';
        if ($this->sharedTables) {
            $columns = '("_type", "_permission", "_document", "_tenant")';
        }

        return "ON CONFLICT {$columns} DO NOTHING";
    }

    return '';
}

public function decodePoint(string $wkb): array
{
if (str_starts_with(strtoupper($wkb), 'POINT(')) {
Expand Down
36 changes: 33 additions & 3 deletions src/Database/Adapter/SQL.php
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,33 @@ public function getSupportForHostname(): bool
return true;
}

/**
 * Pick the keyword that opens an INSERT statement.
 *
 * With skipDuplicates on this is the IGNORE variant; otherwise the plain
 * keyword. Adapter subclasses override this when their database needs a
 * different syntax.
 */
protected function getInsertKeyword(): string
{
    if ($this->skipDuplicates) {
        return 'INSERT IGNORE INTO';
    }

    return 'INSERT INTO';
}

/**
* Returns a suffix appended after the VALUES clause of the document INSERT for duplicate handling.
* This base implementation contributes nothing (empty string) and does not read $table.
* Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING).
*
* @param string $table Collection/table name passed by the caller; unused here.
* @return string Always '' in this base class.
*/
protected function getInsertSuffix(string $table): string
{
return '';
}

/**
* Returns a suffix for the permissions INSERT statement when ignoring duplicates.
* This base implementation contributes nothing (empty string).
* Override in adapter subclasses for DB-specific syntax.
*
* @return string Always '' in this base class.
*/
protected function getInsertPermissionsSuffix(): string
{
return '';
}

/**
* Get current attribute count from collection document
*
Expand Down Expand Up @@ -2476,6 +2503,7 @@ public function createDocuments(Document $collection, array $documents): array
if (empty($documents)) {
return $documents;
}

$spatialAttributes = $this->getSpatialAttributes($collection);
$collection = $collection->getId();
try {
Expand Down Expand Up @@ -2573,8 +2601,9 @@ public function createDocuments(Document $collection, array $documents): array
$batchKeys = \implode(', ', $batchKeys);

$stmt = $this->getPDO()->prepare("
INSERT INTO {$this->getSQLTable($name)} {$columns}
{$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns}
VALUES {$batchKeys}
{$this->getInsertSuffix($name)}
");

foreach ($bindValues as $key => $value) {
Expand All @@ -2588,8 +2617,9 @@ public function createDocuments(Document $collection, array $documents): array
$permissions = \implode(', ', $permissions);

$sqlPermissions = "
INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions};
{$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions}
{$this->getInsertPermissionsSuffix()}
";

$stmtPermissions = $this->getPDO()->prepare($sqlPermissions);
Expand Down
5 changes: 5 additions & 0 deletions src/Database/Adapter/SQLite.php
Original file line number Diff line number Diff line change
Expand Up @@ -1936,4 +1936,9 @@ public function getSupportForTTLIndexes(): bool
{
return false;
}

/**
 * Returns the INSERT keyword for SQLite.
 *
 * Always the plain form. `INSERT OR IGNORE` is deliberately not used for
 * skipDuplicates: besides uniqueness conflicts it also silently drops rows
 * that fail other constraints such as NOT NULL, which would hide real data
 * errors. Duplicate skipping is expressed through the ON CONFLICT suffixes
 * below instead.
 */
protected function getInsertKeyword(): string
{
    return 'INSERT INTO';
}

/**
 * Returns the clause appended after the VALUES list of the document INSERT.
 *
 * With skipDuplicates on, `ON CONFLICT DO NOTHING` skips only rows that hit
 * a UNIQUE / PRIMARY KEY conflict; other constraint violations still raise.
 * The conflict target is omitted on purpose so the clause does not depend
 * on the exact column list of the table's unique indexes.
 * NOTE(review): requires SQLite 3.24.0+ (UPSERT support) — confirm the
 * minimum supported runtime.
 *
 * @param string $table Not read by this implementation.
 * @return string '' when skipDuplicates is off, otherwise the ON CONFLICT clause.
 */
protected function getInsertSuffix(string $table): string
{
    return $this->skipDuplicates ? 'ON CONFLICT DO NOTHING' : '';
}

/**
 * Returns the clause appended after the VALUES list of the permissions INSERT.
 *
 * Mirrors getInsertSuffix(): only uniqueness conflicts are skipped while
 * skipDuplicates is on.
 *
 * @return string '' when skipDuplicates is off, otherwise the ON CONFLICT clause.
 */
protected function getInsertPermissionsSuffix(): string
{
    return $this->skipDuplicates ? 'ON CONFLICT DO NOTHING' : '';
}
Comment on lines +1940 to +1943
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Does SQLite INSERT OR IGNORE suppress NOT NULL and CHECK violations in addition to UNIQUE/PRIMARY KEY conflicts, and since which SQLite version is INSERT ... ON CONFLICT(column-list) DO NOTHING supported?

💡 Result:

Yes, SQLite INSERT OR IGNORE suppresses NOT NULL violations (skips the row silently) in addition to UNIQUE/PRIMARY KEY conflicts, but it does NOT suppress CHECK violations (those cause ABORT like default). INSERT ... ON CONFLICT(column-list) DO NOTHING (the UPSERT syntax specifying columns) has been supported since SQLite version 3.24.0 (2018-06-04).

Citations:


INSERT OR IGNORE suppresses more than just duplicate constraints—fix to target only _uid conflicts.

SQLite's INSERT OR IGNORE silently suppresses NOT NULL and UNIQUE/PRIMARY KEY violations alike. This means rows failing unrelated constraints (e.g., NOT NULL on a required field) are dropped without error, violating the design intent that only duplicate _uid conflicts should be silently skipped.

Use INSERT ... ON CONFLICT(_uid) DO NOTHING instead to suppress only the intended unique keys. This syntax is supported in SQLite 3.24.0+ and applies the constraint exception only to the specified columns.

Suggested direction
 protected function getInsertKeyword(): string
 {
-    return $this->skipDuplicates ? 'INSERT OR IGNORE INTO' : 'INSERT INTO';
+    return 'INSERT INTO';
+}
+
+protected function getInsertSuffix(string $table): string
+{
+    if (!$this->skipDuplicates) {
+        return '';
+    }
+
+    $conflictTarget = $this->sharedTables ? '(`_tenant`, `_uid`)' : '(`_uid`)';
+
+    return "ON CONFLICT {$conflictTarget} DO NOTHING";
+}
+
+protected function getInsertPermissionsSuffix(): string
+{
+    if (!$this->skipDuplicates) {
+        return '';
+    }
+
+    $conflictTarget = $this->sharedTables
+        ? '(`_tenant`, `_document`, `_type`, `_permission`)'
+        : '(`_document`, `_type`, `_permission`)';
+
+    return "ON CONFLICT {$conflictTarget} DO NOTHING";
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Database/Adapter/SQLite.php` around lines 1940 - 1943, The current
getInsertKeyword() returns 'INSERT OR IGNORE INTO', which silences all
constraint errors; change the logic so when $this->skipDuplicates is true the
generated SQL uses SQLite's targeted conflict clause for the _uid column (e.g.,
use an INSERT ... ON CONFLICT(_uid) DO NOTHING form) instead of INSERT OR
IGNORE; update getInsertKeyword() (and any place that composes the INSERT
statement if it only expects a keyword) to emit the ON CONFLICT(_uid) DO NOTHING
variant so only _uid uniqueness conflicts are suppressed while other constraint
violations still raise errors.

}
90 changes: 76 additions & 14 deletions src/Database/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ class Database

protected bool $preserveDates = false;

protected bool $skipDuplicates = false;

protected bool $preserveSequence = false;

protected int $maxQueryValues = 5000;
Expand Down Expand Up @@ -842,6 +844,29 @@ public function skipRelationshipsExistCheck(callable $callback): mixed
}
}

/**
 * Execute $callback while this Database's skipDuplicates flag is switched on.
 *
 * The prior value is captured before the flag is enabled and written back in
 * a finally block, so it is restored whether the callback returns or throws,
 * and nested calls leave an enclosing scope's setting intact.
 *
 * @template T
 * @param callable(): T $callback
 * @return T
 */
public function skipDuplicates(callable $callback): mixed
{
    $restoreTo = $this->skipDuplicates;

    try {
        $this->skipDuplicates = true;

        return $callback();
    } finally {
        $this->skipDuplicates = $restoreTo;
    }
}

/**
 * Derive the lookup key that identifies a document within a batch.
 *
 * When the adapter reports both shared tables and tenant-per-document mode,
 * the key is "<tenant>:<id>" so equal ids under different tenants stay
 * distinct; in every other configuration the key is the document id alone.
 */
private function tenantKey(Document $document): string
{
    $id = $document->getId();

    if (!$this->adapter->getSharedTables() || !$this->adapter->getTenantPerDocument()) {
        return $id;
    }

    return $document->getTenant() . ':' . $id;
}

/**
* Trigger callback for events
*
Expand Down Expand Up @@ -5700,9 +5725,11 @@ public function createDocuments(
}

foreach (\array_chunk($documents, $batchSize) as $chunk) {
$batch = $this->withTransaction(function () use ($collection, $chunk) {
return $this->adapter->createDocuments($collection, $chunk);
});
$insert = fn () => $this->withTransaction(fn () => $this->adapter->createDocuments($collection, $chunk));
// Set adapter flag before withTransaction so Mongo can opt out of a real txn.
$batch = $this->skipDuplicates
? $this->adapter->skipDuplicates($insert)
: $insert();
Comment on lines +5728 to +5732
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

skipDuplicates() does not cover relationship side effects.

Line 5728 only guards the adapter insert. By then, createDocumentRelationships() has already run for every input document, so a duplicate parent can still create/update related rows—or hit a duplicate on an existing junction record—before the guarded insert is reached. That makes skipDuplicates(createDocuments(...)) non-idempotent for documents carrying relationship payloads.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Database/Database.php` around lines 5728 - 5732, The adapter's
skipDuplicates guard is applied only around createDocuments, but
createDocumentRelationships runs earlier and can mutate related rows before the
insert; change the flow so the skipDuplicates context (adapter->skipDuplicates
or equivalent flag) and the withTransaction are established before calling
createDocumentRelationships and createDocuments, i.e., wrap both
createDocumentRelationships(...) and the $insert closure inside the
adapter->skipDuplicates(...) (or set the adapter flag prior to relationship
handling) so relationship side effects are subject to the same dedupe guard and
transaction; update references around createDocumentRelationships, $insert,
withTransaction, adapter->skipDuplicates and adapter->createDocuments
accordingly.


$batch = $this->adapter->getSequences($collection->getId(), $batch);

Expand Down Expand Up @@ -7116,18 +7143,53 @@ public function upsertDocumentsWithIncrease(
$created = 0;
$updated = 0;
$seenIds = [];
foreach ($documents as $key => $document) {
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
$old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),
))));
} else {
$old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),

// Batch-fetch existing documents in one query instead of N individual getDocument() calls.
// tenantPerDocument: group ids by tenant and run one find() per tenant under withTenant,
// so cross-tenant batches (e.g. StatsUsage worker) don't get silently scoped to the
// session tenant and miss rows belonging to other tenants.
$existingDocs = [];

if ($this->getSharedTables() && $this->getTenantPerDocument()) {
$idsByTenant = [];
foreach ($documents as $doc) {
if ($doc->getId() !== '') {
$idsByTenant[$doc->getTenant()][] = $doc->getId();
}
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique($tenantIds));
$found = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(
fn () => $this->find($collection->getId(), [
Query::equal('$id', $tenantIds),
Query::limit(\count($tenantIds)),
])
)));
foreach ($found as $doc) {
$existingDocs[$tenant . ':' . $doc->getId()] = $doc;
}
}
} else {
$docIds = \array_values(\array_unique(\array_filter(
\array_map(fn (Document $doc) => $doc->getId(), $documents),
fn ($id) => $id !== ''
)));

if (!empty($docIds)) {
$existing = $this->authorization->skip(fn () => $this->silent(
fn () => $this->find($collection->getId(), [
Query::equal('$id', $docIds),
Query::limit(\count($docIds)),
])
));
foreach ($existing as $doc) {
$existingDocs[$this->tenantKey($doc)] = $doc;
}
}
Comment on lines +7153 to 7188
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Chunk these pre-read find() calls by maxQueryValues.

Both branches build a single Query::equal('$id', ...) over the full ID set. Once a tenant batch or the global batch exceeds $this->maxQueryValues, find() can fail validation before the upsert loop even starts, which regresses large upserts. Please split each ID list with array_chunk(..., max(1, $this->maxQueryValues)) and merge the results into $existingDocs. Based on learnings, createDocuments() and upsertDocumentsWithIncrease() should prefetch existing document IDs in array_chunk(..., max(1, $this->maxQueryValues)) batches to satisfy DocumentsValidator limits on Query::equal('$id', ...).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Database/Database.php` around lines 7153 - 7188, The pre-read find()
calls build Query::equal('$id', ...) over full ID lists which can exceed
DocumentsValidator limits; modify both branches (the tenant-batched branch that
builds $idsByTenant and the else branch that builds $docIds) to split each ID
array into chunks using array_chunk($ids, max(1, $this->maxQueryValues)) and
call find() for each chunk (preserving the same withTenant/authorization/silent
wrappers), collecting/merging all returned documents into $existingDocs (use the
same keys: "$tenant:$id" in the tenant branch and $this->tenantKey($doc) in the
else branch). Apply the same chunking approach to the related callers
createDocuments() and upsertDocumentsWithIncrease() when they prefetch existing
document IDs so no single Query::equal('$id', ...) exceeds
$this->maxQueryValues.

}

foreach ($documents as $key => $document) {
$old = $existingDocs[$this->tenantKey($document)] ?? new Document();

// Extract operators early to avoid comparison issues
$documentArray = $document->getArrayCopy();
Expand Down Expand Up @@ -7294,7 +7356,7 @@ public function upsertDocumentsWithIncrease(
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
}

$seenIds[] = $document->getId();
$seenIds[] = $this->tenantKey($document);
$old = $this->adapter->castingBefore($collection, $old);
$document = $this->adapter->castingBefore($collection, $document);

Expand Down
Loading
Loading