Skip to content

Commit 278b28b

Browse files
robcolegrepsedawk
andcommitted
Add support for Bulk Upserting records
🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀 It's alive! It's alive! Adds the ability to insert large numbers of records all at once. (don't worry, will rebase once I'm sane again) 🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀 Co-authored-by: Alex Piechowski <[email protected]>
1 parent 36e2f87 commit 278b28b

7 files changed

+174
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
class AddUniqueConstraintToUsers::V20220113043033 < Avram::Migrator::Migration::V1
2+
def migrate
3+
create_index :users, [:name, :nickname], unique: true
4+
end
5+
6+
def rollback
7+
drop_index :users, [:name, :nickname]
8+
end
9+
end

spec/avram/bulk_upsert_spec.cr

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
require "../spec_helper"
2+
3+
describe Avram::BulkUpsert do
4+
describe "bulk upserts" do
5+
# Ideally, compiler can catch this / this should be impossible..
6+
context "when collections mismatch" do
7+
end
8+
9+
context "when mixed new records and updated records" do
10+
# Insert spec example.
11+
it "inserts with a hash of String" do
12+
# params = {:first_name => "Paul", :last_name => "Smith"}
13+
# insert = Avram::Insert.new(table: :users, params: params)
14+
# insert.statement.should eq "insert into users(first_name, last_name) values($1, $2) returning *"
15+
# insert.args.should eq ["Paul", "Smith"]
16+
end
17+
end
18+
end
19+
end

spec/avram/operations/save_operation_spec.cr

+31
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ end
6767

6868
private class UpsertUserOperation < User::SaveOperation
6969
upsert_lookup_columns :name, :nickname
70+
upsert_unique_on :name, :nickname
7071
end
7172

7273
private class OverrideDefaults < ModelWithDefaultValues::SaveOperation
@@ -307,6 +308,36 @@ describe "Avram::SaveOperation" do
307308
end
308309
end
309310

311+
describe ".bulk_upsert" do
312+
context "when the records are persisted" do
313+
it "should upsert records" do
314+
user = UserFactory.create do |u|
315+
u.name("Test 1")
316+
u.nickname("Test Nickname 1")
317+
u.age(64)
318+
u.year_born(1942)
319+
u.joined_at(Time.utc)
320+
end
321+
322+
record_args = (1..2).to_a.map do |i|
323+
{
324+
name: "Test #{i}",
325+
nickname: "Test Nickname #{i}",
326+
year_born: nil,
327+
age: 42,
328+
joined_at: Time.utc,
329+
}
330+
end
331+
332+
records = UpsertUserOperation.bulk_upsert(record_args)
333+
records.map(&.year_born).flatten.uniq.should eq [nil]
334+
end
335+
end
336+
337+
context "when the records are persisted" do
338+
end
339+
end
340+
310341
describe "#errors" do
311342
it "includes errors for all operation attributes" do
312343
operation = SaveUser.new

spec/avram/view_spec.cr

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ describe "views" do
1212
end
1313

1414
it "works without a primary key" do
15-
UserFactory.new.nickname("Johnny").create
16-
UserFactory.new.nickname("Johnny").create
17-
UserFactory.new.nickname("Johnny").create
15+
UserFactory.new.name("P1").nickname("Johnny").create
16+
UserFactory.new.name("P2").nickname("Johnny").create
17+
UserFactory.new.name("P3").nickname("Johnny").create
1818
nickname_info = NicknameInfo::BaseQuery.first
1919

2020
nickname_info.nickname.should eq "Johnny"

src/avram/bulk_upsert.cr

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
class Avram::BulkUpsert(T)
2+
def initialize(@records : Array(T), @column_names : Array(Symbol))
3+
@records = set_timestamps(records)
4+
end
5+
6+
def statement
7+
<<-SQL
8+
INSERT INTO #{table}(#{fields})
9+
(select * from unnest(#{value_placeholders}))
10+
ON CONFLICT (#{conflicts}) DO UPDATE SET #{updates}
11+
RETURNING #{returning}
12+
SQL
13+
end
14+
15+
def args
16+
@records.map do |record|
17+
record.changed_attributes.map(&.value)
18+
end.transpose
19+
end
20+
21+
private def conflicts
22+
@column_names.join(", ")
23+
end
24+
25+
private def set_timestamps(collection)
26+
collection.map do |record|
27+
record.created_at.value ||= Time.utc if record.responds_to?(:created_at)
28+
record.updated_at.value = Time.utc if record.responds_to?(:updated_at)
29+
record
30+
end
31+
end
32+
33+
private def table
34+
@records.first.table_name
35+
end
36+
37+
private def updates
38+
update_keys = changed_attributes.flat_map(&.name)
39+
(update_keys - [:created_at]).map do |column|
40+
"#{column}=EXCLUDED.#{column}"
41+
end.join(", ")
42+
end
43+
44+
private def returning
45+
T.column_names.join(", ")
46+
end
47+
48+
private def changed_attributes
49+
@records.first.changed_attributes
50+
end
51+
52+
private def fields
53+
changed_attributes.map do |key|
54+
<<-TEXT
55+
"#{key.name.to_s}"
56+
TEXT
57+
end.join(", ")
58+
end
59+
60+
private def column_types
61+
T.database_table_info.not_nil!.columns.map do |column_info|
62+
[
63+
column_info.column_name,
64+
column_info.data_type,
65+
]
66+
end.to_h
67+
end
68+
69+
private def cast(column)
70+
"#{column_types[column.name.to_s]}[]"
71+
end
72+
73+
private def cast(column : Avram::Attribute(Time))
74+
"timestamptz[]"
75+
end
76+
77+
private def value_placeholders
78+
changed_attributes.map_with_index(1) do |k, index|
79+
"$#{index}::#{cast(k)}"
80+
end.join(", ")
81+
end
82+
end

src/avram/save_operation.cr

+12
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,10 @@ abstract class Avram::SaveOperation(T)
367367
{{ T.constant(:PRIMARY_KEY_NAME).id }}.value.nil?
368368
end
369369

370+
def changed_attributes
371+
column_attributes.select(&.changed?)
372+
end
373+
370374
private def insert_or_update
371375
if persisted?
372376
update record_id
@@ -379,6 +383,14 @@ abstract class Avram::SaveOperation(T)
379383
@record.try &.id
380384
end
381385

386+
def self.column_names
387+
T.column_names
388+
end
389+
390+
def self.database_table_info
391+
T.database_table_info
392+
end
393+
382394
def before_save; end
383395

384396
def after_save(_record : T); end

src/avram/upsert.cr

+18-1
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,26 @@ module Avram::Upsert
9090
end
9191
end
9292

93+
macro upsert_unique_on(*attribute_names)
94+
def self.bulk_upsert(upserts)
95+
operations = upserts.map do |upsert_args|
96+
new(**upsert_args)
97+
end
98+
99+
upsert = Avram::BulkUpsert(self).new(
100+
operations,
101+
{{ attribute_names }}.to_a
102+
)
103+
104+
new.database.query upsert.statement, args: upsert.args do |rs|
105+
T.from_rs(rs)
106+
end
107+
end
108+
end
109+
93110
# :nodoc:
94111
macro included
95-
{% for method in ["upsert", "upsert!"] %}
112+
{% for method in ["upsert", "upsert!", "bulk_upsert"] %}
96113
# Performs a create or update depending on if there is a conflicting row in the database.
97114
#
98115
# See `Avram::Upsert.upsert_lookup_columns` for full documentation and examples.

0 commit comments

Comments
 (0)