Skip to content

Commit 979eee2

Browse files
committed
For the "transactions" table, insert rather than upsert
We generally always upsert, since that matches best the Datomic semantics, but transactions are never updated, so a simple INSERT will do, and it ensures that when importing a transaction twice that the process throws (the primary key constraint will be violated), which acts as a consistency guarantee (the caller can fetch the latest t value and retry.)
1 parent d1796e6 commit 979eee2

File tree

7 files changed

+88
-40
lines changed

7 files changed

+88
-40
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ resources/public/ui
1212
.store
1313
out
1414
.#*
15+
deps.local.edn

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
## Added
44

5-
## Fixed
5+
- Added a `find-max-t` helper function, for picking up work where it was left off
66

77
## Changed
88

9+
- Throw when trying to reprocess an earlier transaction
10+
911
# 0.2.37 (2022-09-14 / 5b770a2)
1012

1113
## Added
@@ -20,4 +22,4 @@
2022

2123
## Fixed
2224

23-
## Changed
25+
## Changed

deps.local.edn

Lines changed: 0 additions & 3 deletions
This file was deleted.

repl_sessions/find_prev_tx.clj

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,17 @@
2020
(d/log conn)
2121
1000
2222
9010))
23+
24+
;; Given we are currently processing a certain transaction (say t=1012), can we
25+
;; find the t value of the previous transaction? This could be useful to add an
26+
;; additional guarantee that transactions are processed exactly in order, by
27+
;; adding a postgresql trigger that validates that transactions form an unbroken
28+
;; chain.
29+
30+
(let [max-t 1012
31+
db (d/as-of (d/db conn) (dec max-t))]
32+
(d/q '[:find (max ?t) .
33+
:where
34+
[?i :db/txInstant]
35+
[(datomic.api/tx->t ?i) ?t]]
36+
db))

src/lambdaisland/plenish.clj

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -269,21 +269,24 @@
269269
[:delete
270270
{:table (table-name ctx mem-attr)
271271
:values {"db__id" eid}}]
272-
[:upsert
273-
{:table (table-name ctx mem-attr)
274-
:by #{"db__id"}
275-
:values (into (cond-> {"db__id" eid}
276-
;; Bit of manual fudgery to also get the "t"
277-
;; value of each transaction into
278-
;; our "transactions" table.
279-
(= :db/txInstant mem-attr)
280-
(assoc "t" (d/tx->t (-t (first datoms)))))
281-
(map (juxt #(column-name ctx mem-attr (ctx-ident ctx (-a %)))
282-
#(when (-added? %)
283-
(encode-value ctx
284-
(ctx-valueType ctx (-a %))
285-
(-v %)))))
286-
datoms)}])))))
272+
(let [table (table-name ctx mem-attr)]
273+
[(if (= "transactions" table)
274+
:insert
275+
:upsert)
276+
{:table table
277+
:by #{"db__id"}
278+
:values (into (cond-> {"db__id" eid}
279+
;; Bit of manual fudgery to also get the "t"
280+
;; value of each transaction into
281+
;; our "transactions" table.
282+
(= :db/txInstant mem-attr)
283+
(assoc "t" (d/tx->t (-t (first datoms)))))
284+
(map (juxt #(column-name ctx mem-attr (ctx-ident ctx (-a %)))
285+
#(when (-added? %)
286+
(encode-value ctx
287+
(ctx-valueType ctx (-a %))
288+
(-v %)))))
289+
datoms)}]))))))
287290

288291
(defn card-many-entity-ops
289292
"Add operations `:ops` to the context for all the `cardinality/many` datoms in a
@@ -435,6 +438,10 @@
435438
:if-not-exists]}))
436439
columns))
437440

441+
(defmethod op->sql :insert [[_ {:keys [table by values]}]]
442+
[{:insert-into [(keyword table)]
443+
:values [values]}])
444+
438445
(defmethod op->sql :upsert [[_ {:keys [table by values]}]]
439446
(let [op {:insert-into [(keyword table)]
440447
:values [values]
@@ -510,27 +517,27 @@
510517
configuration regarding tables and target db, and eventually `:ops` that need
511518
to be processed."
512519
([conn metaschema]
513-
(initial-ctx conn metaschema 999))
520+
(initial-ctx conn metaschema nil))
514521
([conn metaschema t]
515-
;; Bootstrap, make sure we have info about idents that datomic creates itself
516-
;; at db creation time. d/as-of t=999 is basically an empty database with only
517-
;; metaschema attributes (:db/txInstant etc), since the first "real"
518-
;; transaction is given t=1000. Interesting to note that Datomic seems to
519-
;; bootstrap in pieces: t=0 most basic idents, t=57 add double, t=63 add
520-
;; docstrings, ...
521-
(let [idents (pull-idents (d/as-of (d/db conn) t))]
522+
;; Bootstrap, make sure we have info about idents that datomic creates itself
523+
;; at db creation time. d/as-of t=999 is basically an empty database with only
524+
;; metaschema attributes (:db/txInstant etc), since the first "real"
525+
;; transaction is given t=1000. Interesting to note that Datomic seems to
526+
;; bootstrap in pieces: t=0 most basic idents, t=57 add double, t=63 add
527+
;; docstrings, ...
528+
(let [idents (pull-idents (d/as-of (d/db conn) (or t 999)))]
522529
{;; Track datomic schema
523530
:entids (into {} (map (juxt :db/ident :db/id)) idents)
524531
:idents (into {} (map (juxt :db/id identity)) idents)
525-
;; Configure/track relational schema
532+
;; Configure/track relational schema
526533
:tables (-> metaschema
527534
:tables
528535
(update :db/txInstant assoc :name "transactions")
529536
(update :db/ident assoc :name "idents"))
530-
;; Mapping from datomic to relational type
537+
;; Mapping from datomic to relational type
531538
:db-types pg-type
532-
;; Create two columns that don't have a attribute as such in datomic, but
533-
;; which we still want to track
539+
;; Create two columns that don't have a attribute as such in datomic, but
540+
;; which we still want to track
534541
:ops [[:ensure-columns
535542
{:table "idents"
536543
:columns {:db/id {:name "db__id"
@@ -565,3 +572,16 @@
565572
(jdbc/execute! jdbc-tx %)) queries))
566573
(recur (dissoc ctx :ops) txs))
567574
ctx)))
575+
576+
(defn find-max-t
577+
"Find the highest value in the transactions table in postgresql. The sync should
578+
continue from `(inc (find-max-t ds))`"
579+
[ds]
580+
(:max
581+
(first
582+
(try
583+
(jdbc/execute! ds ["SELECT max(t) FROM transactions"])
584+
(catch Exception e
585+
;; If the transactions table doesn't yet exist, return `nil`, so we start
586+
;; from the beginning of the log
587+
nil)))))

test/lambdaisland/plenish_test.clj

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@
2626
@(d/transact *conn* factories/schema)
2727
(f))))
2828

29-
(defn import! [metaschema]
30-
(let [ctx (plenish/initial-ctx *conn* metaschema)]
31-
(plenish/import-tx-range
32-
ctx *conn* *ds*
33-
(d/tx-range (d/log *conn*) nil nil))))
29+
(defn import!
30+
([metaschema]
31+
(import! metaschema nil))
32+
([metaschema t]
33+
(let [ctx (plenish/initial-ctx *conn* metaschema t)]
34+
(plenish/import-tx-range
35+
ctx *conn* *ds*
36+
(d/tx-range (d/log *conn*) t nil)))))
3437

3538
(defn transact! [tx]
3639
@(d/transact *conn* tx))
@@ -110,7 +113,7 @@
110113
:idents/ident "fruit/orange"}]
111114
(jdbc/execute! *ds* ["SELECT fruit.db__id, idents.ident FROM fruit, idents WHERE fruit.type = idents.db__id;"]))))))
112115

113-
(deftest update-cardinality-one-attribute
116+
(deftest update-cardinality-one-attribute--membership
114117
(testing "membership attribute"
115118
(transact! [{:db/ident :fruit/type
116119
:db/valueType :db.type/string
@@ -125,8 +128,9 @@
125128
(import! {:tables {:fruit/type {}}})
126129
(is (= [{:fruit/db__id apple-id
127130
:fruit/type "orange"}]
128-
(jdbc/execute! *ds* ["SELECT * FROM fruit"])))))
131+
(jdbc/execute! *ds* ["SELECT * FROM fruit"]))))))
129132

133+
(deftest update-cardinality-one-attribute--regular
130134
(testing "regular attribute"
131135
(transact! [{:db/ident :veggie/type
132136
:db/valueType :db.type/string
@@ -174,6 +178,15 @@
174178
:veggie_x_rating/rating 5}]
175179
(jdbc/execute! *ds* ["SELECT * FROM veggie_x_rating"]))))))
176180

181+
(deftest duplicate-import-throws
182+
(testing "Trying to import a transaction that was already processed should throw"
183+
(fd/create! *conn* factories/cart)
184+
(import! factories/metaschema)
185+
186+
(let [max-t (plenish/find-max-t *ds*)]
187+
(is (thrown? com.impossibl.postgres.jdbc.PGSQLIntegrityConstraintViolationException
188+
(import! factories/metaschema max-t))))))
189+
177190
(comment
178191
;; REPL alternative to fixture
179192
(recreate-replica!)

tests.edn

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
#kaocha/v1
2-
{:plugins [:notifier :print-invocations :profiling]}
2+
{:plugins [:notifier :print-invocations :profiling]
3+
:capture-output? false}

0 commit comments

Comments
 (0)