Skip to content

Commit

Permalink
Merge pull request #512 from yigitozkavci/fix-utc-timezone-in-cursor-…
Browse files Browse the repository at this point in the history
…resume

Introduce custom timezone configuration support for ActiveRecord enumerators
  • Loading branch information
Mangara authored Oct 11, 2024
2 parents 2b56a7a + 14f522a commit 46ec901
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### Main (unreleased)

- Added CSV batching functionality to EnumeratorBuilder with `build_csv_enumerator_on_batches` method and `csv_on_batches` alias.
- Added support for custom timezones on ActiveRecordEnumerator and ActiveRecordBatchEnumerator. This allows for using cursors with datetime columns where ActiveRecord.default_timezone is set to :local and ActiveRecord is not using the same timezone as the database.

## v1.6.0 (Sep 24, 2024)

Expand Down
8 changes: 6 additions & 2 deletions lib/job-iteration/active_record_batch_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ class ActiveRecordBatchEnumerator

SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N"

def initialize(relation, columns: nil, batch_size: 100, cursor: nil)
def initialize(relation, columns: nil, batch_size: 100, timezone: nil, cursor: nil)
@batch_size = batch_size
@timezone = timezone
@primary_key = "#{relation.table_name}.#{relation.primary_key}"
@columns = Array(columns&.map(&:to_s) || @primary_key)
@primary_key_index = @columns.index(@primary_key) || @columns.index(relation.primary_key)
Expand Down Expand Up @@ -114,7 +115,10 @@ def serialize_column_values!(column_values)
end

def column_value(value)
value.is_a?(Time) ? value.strftime(SQL_DATETIME_WITH_NSEC) : value
return value unless value.is_a?(Time)

value = value.in_time_zone(@timezone) unless @timezone.nil?
value.strftime(SQL_DATETIME_WITH_NSEC)
end
end
end
4 changes: 3 additions & 1 deletion lib/job-iteration/active_record_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ module JobIteration
class ActiveRecordEnumerator
SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N"

def initialize(relation, columns: nil, batch_size: 100, cursor: nil)
def initialize(relation, columns: nil, batch_size: 100, timezone: nil, cursor: nil)
@relation = relation
@batch_size = batch_size
@timezone = timezone
@columns = if columns
Array(columns)
else
Expand Down Expand Up @@ -61,6 +62,7 @@ def column_value(record, attribute)
value = record.read_attribute(attribute.to_sym)
case record.class.columns_hash.fetch(attribute).type
when :datetime
value = value.in_time_zone(@timezone) unless @timezone.nil?
value.strftime(SQL_DATETIME_WITH_NSEC)
else
value
Expand Down
28 changes: 23 additions & 5 deletions test/unit/active_record_batch_enumerator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest
enum = build_enumerator(columns: [:updated_at])
products = Product.order(:updated_at).take(2)

expected_product_cursor = products.last.updated_at.strftime(SQL_TIME_FORMAT)
expected_product_cursor = products.last.updated_at.utc.strftime(SQL_TIME_FORMAT)
assert_equal([products, expected_product_cursor], enum.first)
end

test "columns can be an array" do
enum = build_enumerator(columns: [:updated_at, :id])
products = Product.order(:updated_at, :id).take(2)

expected_product_cursor = [products.last.updated_at.strftime(SQL_TIME_FORMAT), products.last.id]
expected_product_cursor = [products.last.updated_at.utc.strftime(SQL_TIME_FORMAT), products.last.id]
assert_equal([products, expected_product_cursor], enum.first)
end

Expand All @@ -92,6 +92,14 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest
assert_match(/\A\s?`products`.`updated_at`, `products`.`id`\z/, queries.first[/SELECT (.*) FROM/, 1])
end

test "columns use UTC during serialization if they are Time" do
enum = build_enumerator(columns: [:updated_at])
products = Product.order(:updated_at).take(2)

expected_product_cursor = products.last.updated_at.utc.strftime(SQL_TIME_FORMAT)
assert_equal([products, expected_product_cursor], enum.first)
end

test "cursor can be used to resume" do
products = Product.order(:id).take(3)

Expand All @@ -100,17 +108,26 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest
assert_equal([products, products.last.id], enum.first)
end

test "using custom timezone results in a cursor with the correct offset" do
custom_timezone = "Eastern Time (US & Canada)"
enum = build_enumerator(columns: [:created_at, :id], timezone: custom_timezone)
shops = Product.order(:created_at, :id).take(2)

cursor = [shops.last.created_at.in_time_zone(custom_timezone).strftime(SQL_TIME_FORMAT), shops.last.id]
assert_equal([shops, cursor], enum.first)
end

test "cursor can be used to resume on multiple columns" do
enum = build_enumerator(columns: [:created_at, :id])
products = Product.order(:created_at, :id).take(2)

cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id]
cursor = [products.last.created_at.utc.strftime(SQL_TIME_FORMAT), products.last.id]
assert_equal([products, cursor], enum.first)

enum = build_enumerator(columns: [:created_at, :id], cursor: cursor)
products = Product.order(:created_at, :id).offset(2).take(2)

cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id]
cursor = [products.last.created_at.utc.strftime(SQL_TIME_FORMAT), products.last.id]
assert_equal([products, cursor], enum.first)
end

Expand All @@ -133,10 +150,11 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest

private

def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil)
def build_enumerator(relation: Product.all, batch_size: 2, timezone: nil, columns: nil, cursor: nil)
JobIteration::ActiveRecordBatchEnumerator.new(
relation,
batch_size: batch_size,
timezone: timezone,
columns: columns,
cursor: cursor,
)
Expand Down
20 changes: 15 additions & 5 deletions test/unit/active_record_enumerator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ class ActiveRecordEnumeratorTest < IterationUnitTest
enum = build_enumerator(columns: [:updated_at]).batches
shops = Product.order(:updated_at).take(2)

assert_equal([shops, shops.last.updated_at.strftime(SQL_TIME_FORMAT)], enum.first)
assert_equal([shops, shops.last.updated_at.utc.strftime(SQL_TIME_FORMAT)], enum.first)
end

test "columns can be an array" do
enum = build_enumerator(columns: [:updated_at, :id]).batches
shops = Product.order(:updated_at, :id).take(2)

assert_equal([shops, [shops.last.updated_at.strftime(SQL_TIME_FORMAT), shops.last.id]], enum.first)
assert_equal([shops, [shops.last.updated_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id]], enum.first)
end

test "cursor can be used to resume" do
Expand All @@ -83,13 +83,22 @@ class ActiveRecordEnumeratorTest < IterationUnitTest
enum = build_enumerator(columns: [:created_at, :id]).batches
shops = Product.order(:created_at, :id).take(2)

cursor = [shops.last.created_at.strftime(SQL_TIME_FORMAT), shops.last.id]
cursor = [shops.last.created_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id]
assert_equal([shops, cursor], enum.first)

enum = build_enumerator(columns: [:created_at, :id], cursor: cursor).batches
shops = Product.order(:created_at, :id).offset(2).take(2)

cursor = [shops.last.created_at.strftime(SQL_TIME_FORMAT), shops.last.id]
cursor = [shops.last.created_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id]
assert_equal([shops, cursor], enum.first)
end

test "using custom timezone results in a cursor with the correct offset" do
custom_timezone = "Eastern Time (US & Canada)"
enum = build_enumerator(columns: [:created_at, :id], timezone: custom_timezone).batches
shops = Product.order(:created_at, :id).take(2)

cursor = [shops.last.created_at.in_time_zone(custom_timezone).strftime(SQL_TIME_FORMAT), shops.last.id]
assert_equal([shops, cursor], enum.first)
end

Expand Down Expand Up @@ -135,10 +144,11 @@ class ActiveRecordEnumeratorTest < IterationUnitTest

private

def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil)
def build_enumerator(relation: Product.all, batch_size: 2, timezone: nil, columns: nil, cursor: nil)
JobIteration::ActiveRecordEnumerator.new(
relation,
batch_size: batch_size,
timezone: timezone,
columns: columns,
cursor: cursor,
)
Expand Down

0 comments on commit 46ec901

Please sign in to comment.