Skip to content

Commit 17b0f97

Browse files
robcole, grepsedawk, and robacarp
committed
Add support for Bulk Upserting records
Adds an `upsert` overload for Arrays to create large numbers of records at the same time. Uses PG's UNNEST behavior to allow for a near-infinite (buyer beware) number of insertions rather than being limited by PG's bind parameter restrictions (64k total binds, which would prevent more than a few thousand upserts at a time depending on the number of column inserts). Co-authored-by: Alex Piechowski <[email protected]> Co-authored-by: robacarp <[email protected]>
1 parent 36e2f87 commit 17b0f97

File tree

7 files changed

+292
-3
lines changed

7 files changed

+292
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Migration that adds a unique composite index on `users.name` and
# `users.nickname`.
class AddUniqueConstraintToUsers::V20220113043033 < Avram::Migrator::Migration::V1
  # Creates the unique index over (name, nickname).
  def migrate
    create_index(:users, [:name, :nickname], unique: true)
  end

  # Removes the (name, nickname) index again.
  def rollback
    drop_index(:users, [:name, :nickname])
  end
end

spec/avram/operations/save_operation_spec.cr

+148
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ private class ParamKeySaveOperation < ValueColumnModel::SaveOperation
6666
end
6767

6868
# Spec-only `User` save operation with both upsert macros configured on
# `name` and `nickname`. Includes `QuerySpy`, whose `times_called` counter
# the `.upsert` specs reset and read.
private class UpsertUserOperation < User::SaveOperation
  include QuerySpy

  upsert_lookup_columns(:name, :nickname)
  upsert_unique_on(:name, :nickname)
end
74+
75+
# Spec-only `User` save operation that declares lookup columns but does not
# call `upsert_unique_on`.
private class UpsertWithoutUniqueKeys < User::SaveOperation
  upsert_lookup_columns(:name, :nickname)
end
7178

@@ -307,6 +314,147 @@ describe "Avram::SaveOperation" do
307314
end
308315
end
309316

317+
# Specs for the bulk `.upsert(Array(NamedTuple))` overload defined by
# `upsert_unique_on`.
describe ".upsert" do
  it "should only proc one query" do
    UpsertUserOperation.times_called = 0
    some_time = Time.utc(2016, 2, 15, 10, 20, 30)

    # Two rows with identical keys, differing only in name/nickname.
    updates = (1..2).map do |n|
      {
        name:       "Name #{n}",
        nickname:   "Nickname #{n}",
        age:        42,
        joined_at:  some_time,
        created_at: some_time,
        updated_at: some_time,
      }
    end

    records = UpsertUserOperation.upsert(updates)

    # Both rows must come back from the single statement that was issued.
    records.size.should eq 2
    UpsertUserOperation.times_called.should eq 1
  end

  context "when a record already exists" do
    before_each do
      UserFactory.create do |factory|
        factory.name("Name 1")
        factory.nickname("Nickname 1")
        factory.age(42)
        factory.year_born(1960)
        factory.joined_at(Time.utc)
      end
    end

    it "allows manual passing of updated_at, but ignores created_at" do
      some_time = Time.utc(2016, 2, 15, 10, 20, 30)

      attrs = {
        name:       "Name 1",
        nickname:   "Nickname 1",
        age:        42,
        joined_at:  some_time,
        created_at: some_time,
        updated_at: some_time,
      }

      upserted = UpsertUserOperation.upsert([attrs]).first
      upserted.created_at.should_not eq some_time
      upserted.updated_at.should eq some_time
    end

    it "should create one, and update the other record" do
      # Conflicts with the row created in `before_each`.
      existing_attrs = {
        name:      "Name 1",
        nickname:  "Nickname 1",
        year_born: nil,
        age:       42,
        joined_at: Time.utc,
      }

      # No matching row yet, so this one is inserted.
      new_attrs = {
        name:      "Name 2",
        nickname:  "Nickname 2",
        year_born: 1980_i16,
        age:       64,
        joined_at: Time.utc,
      }

      records = UpsertUserOperation.upsert([existing_attrs, new_attrs])
      updated = records.first
      created = records.last

      updated.id.should_not be_nil
      created.id.should_not be_nil
      updated.year_born.should be_nil
      created.year_born.should eq 1980_i16
    end
  end

  context "when no records exist" do
    it "allows manual passing of id" do
      attrs = {
        id:        42_i64,
        name:      "Name 1",
        nickname:  "Nickname 1",
        age:       42,
        joined_at: Time.utc,
      }

      UpsertUserOperation.upsert([attrs]).first.id.should eq 42_i64
    end

    it "allows manual passing of updated_at and created_at" do
      some_time = Time.utc(2016, 2, 15, 10, 20, 30)

      attrs = {
        name:       "Name 1",
        nickname:   "Nickname 1",
        age:        42,
        joined_at:  some_time,
        created_at: some_time,
        updated_at: some_time,
      }

      created = UpsertUserOperation.upsert([attrs]).first
      created.id.should_not be_nil
      created.created_at.should eq some_time
      created.updated_at.should eq some_time
    end
  end

  context "when the tuple values are passed in different orders" do
    it "should upsert records" do
      # The second tuple deliberately lists its keys in a different order.
      record_args = [
        {
          name:      "Name 1",
          nickname:  "Nickname 1",
          year_born: nil,
          age:       42,
          joined_at: Time.utc,
        },
        {
          nickname:  "Nickname 2",
          name:      "Name 2",
          age:       42,
          joined_at: Time.utc,
          year_born: nil,
        },
      ]

      second = UpsertUserOperation.upsert(record_args).last
      second.nickname.should eq "Nickname 2"
      second.name.should eq "Name 2"
    end
  end
end
457+
310458
describe "#errors" do
311459
it "includes errors for all operation attributes" do
312460
operation = SaveUser.new

spec/avram/view_spec.cr

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ describe "views" do
1212
end
1313

1414
it "works without a primary key" do
15-
UserFactory.new.nickname("Johnny").create
16-
UserFactory.new.nickname("Johnny").create
17-
UserFactory.new.nickname("Johnny").create
15+
UserFactory.new.name("P1").nickname("Johnny").create
16+
UserFactory.new.name("P2").nickname("Johnny").create
17+
UserFactory.new.name("P3").nickname("Johnny").create
1818
nickname_info = NicknameInfo::BaseQuery.first
1919

2020
nickname_info.nickname.should eq "Johnny"

src/avram/bulk_upsert.cr

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Builds a single `INSERT ... ON CONFLICT ... DO UPDATE` statement that
# upserts many records at once.
#
# Values are passed as one array bind parameter per column and expanded
# server-side with `unnest`, so the number of binds depends on the number of
# columns rather than the number of records.
#
# `T` is the operation type; it must provide `database_table_info` and
# `column_names`, and its instances `attributes` and `table_name`.
class Avram::BulkUpsert(T)
  # Column name => database type name, used to cast each array placeholder.
  @column_types : Hash(String, String)
  # Attribute names that are written by the statement.
  @permitted_fields : Array(Symbol)

  # *records* are the operations to upsert (must be non-empty, since the
  # first one is used as a sample), *conflicts* the columns of the
  # `ON CONFLICT` target, and *permitted_fields* the attribute names to
  # write. Timestamp columns are added automatically when present.
  def initialize(@records : Array(T),
                 @conflicts : Array(Symbol),
                 permitted_fields : Array(Symbol))
    set_timestamps
    # NOTE: `.as(T)` is kept on purpose; it presumably lets the compiler
    # infer the type of this otherwise undeclared instance variable.
    @sample_record = @records.first.as(T)
    @permitted_fields = permitted_fields_for(permitted_fields)

    @column_types = T.database_table_info.columns.map do |col_info|
      {
        col_info.column_name,
        col_info.data_type,
      }
    end.to_h
  end

  # The SQL text of the upsert, with one `$n::type[]` placeholder per
  # permitted column.
  def statement
    <<-SQL
    INSERT INTO #{table}(#{fields})
    (SELECT * FROM unnest(#{value_placeholders}))
    ON CONFLICT (#{conflicts}) DO UPDATE SET #{updates}
    RETURNING #{returning}
    SQL
  end

  # Bind arguments for `statement`: one array per permitted column, each
  # holding that column's value for every record (rows transposed to columns).
  def args
    @records.map do |record|
      permitted_attributes(record).map(&.value)
    end.transpose
  end

  # Returns *fields* plus the timestamp columns the sample record supports,
  # without duplicates. Works on a copy so the caller's array is untouched.
  private def permitted_fields_for(fields : Array(Symbol))
    permitted = fields.dup
    permitted << :created_at if @sample_record.responds_to?(:created_at)
    permitted << :updated_at if @sample_record.responds_to?(:updated_at)
    permitted.uniq
  end

  # Attributes of *record* whose names are permitted, in attribute order.
  private def permitted_attributes(record)
    record
      .attributes
      .select { |attr| @permitted_fields.includes?(attr.name) }
  end

  private def permitted_attributes
    permitted_attributes(@sample_record)
  end

  private def conflicts
    @conflicts.join(", ")
  end

  # Fills in missing timestamps. A single instant is used for the whole batch
  # so every defaulted row (and both columns) carries the same value.
  private def set_timestamps
    now = Time.utc

    @records.each do |record|
      record.created_at.value ||= now if record.responds_to?(:created_at)
      record.updated_at.value ||= now if record.responds_to?(:updated_at)
    end
  end

  private def table
    @sample_record.table_name
  end

  # `SET` clause for the conflict branch. `created_at` is excluded so an
  # existing row keeps its original creation time.
  private def updates
    (permitted_attribute_column_names - [:created_at]).map do |column|
      "#{column}=EXCLUDED.#{column}"
    end.join(", ")
  end

  private def returning
    T.column_names.join(", ")
  end

  private def permitted_attribute_column_names
    permitted_attributes.map(&.name)
  end

  private def fields
    permitted_attribute_column_names.map(&.to_s).join(", ")
  end

  # One typed array placeholder per permitted column, numbered from `$1`.
  private def value_placeholders
    permitted_attributes.map_with_index(1) do |column, index|
      "$#{index}::#{@column_types[column.name.to_s]}[]"
    end.join(", ")
  end
end

src/avram/save_operation.cr

+8
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,14 @@ abstract class Avram::SaveOperation(T)
379379
@record.try &.id
380380
end
381381

382+
# Column names of the model `T`, delegated so callers holding only the
# operation class can ask for them.
def self.column_names
  T.column_names
end
385+
386+
# Table info of the model `T`. Raises (via `not_nil!`) if the model reports
# no table info.
def self.database_table_info
  table_info = T.database_table_info
  table_info.not_nil!
end
389+
382390
def before_save; end
383391

384392
def after_save(_record : T); end

src/avram/upsert.cr

+27
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,29 @@ module Avram::Upsert
9090
end
9191
end
9292

93+
# Defines `.upsert(Array)` for bulk upserts, using *attribute_names* as the
# `ON CONFLICT` target columns.
#
# The generated method builds one operation per NamedTuple, runs a single
# `Avram::BulkUpsert` statement and returns the rows read back via
# `T.from_rs`. The fields written are taken from the keys of the first
# tuple. An empty array returns an empty result without touching the
# database.
macro upsert_unique_on(*attribute_names)
  def self.upsert(upserts : Array(X)) forall X
    \{%
      if X > NamedTuple
        raise("All array elements for #{@type}.upsert must be NamedTuples. You provided: #{X}")
      elsif X.union?
        keys = X.union_types.map(&.keys).join(", ")
        raise("All tuples for #{@type}.upsert must have the same keys. Given: " + keys)
      end
    %}

    # Nothing to write, and calling `first` on an empty array would raise.
    return [] of T if upserts.empty?

    bulk = Avram::BulkUpsert(self).new(
      records: upserts.map { |upsert_args| new(**upsert_args) },
      conflicts: {{ attribute_names }}.to_a,
      permitted_fields: upserts.first.keys.to_a
    )

    new.database.query bulk.statement, args: bulk.args do |rs|
      T.from_rs(rs)
    end
  end
end
115+
93116
# :nodoc:
94117
macro included
95118
{% for method in ["upsert", "upsert!"] %}
@@ -100,5 +123,9 @@ module Avram::Upsert
100123
\{% raise "Please use the 'upsert_lookup_columns' macro in #{@type} before using '{{ method.id }}'" %}
101124
end
102125
{% end %}
126+
127+
# Fallback for the Array overload: fails compilation with a hint when
# `.upsert` is used and this definition has not been replaced by
# `upsert_unique_on`.
def self.upsert(_upserts : Array)
  \{%
    raise "Please use the 'upsert_unique_on' macro in #{@type} before using '.upsert'"
  %}
end
103130
end
104131
end

src/ext/db/param.cr

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Can be removed once https://github.com/will/crystal-pg/pull/244 is merged.
#
# Adds a `Nil` overload of `PQ::Param.encode_array` so a nil array element
# is written as the literal `NULL`.
module PQ
  record Param, slice : Slice(UInt8), size : Int32, format : Int16 do
    # Appends `NULL` to *io* for a nil *value*.
    def self.encode_array(io, value : Nil)
      io << "NULL"
    end
  end
end

0 commit comments

Comments
 (0)