diff --git a/CHANGELOG.md b/CHANGELOG.md index 79e2fb89..45429d42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ### Main (unreleased) - Added CSV batching functionality to EnumeratorBuilder with `build_csv_enumerator_on_batches` method and `csv_on_batches` alias. +- Added support for custom timezones on ActiveRecordEnumerator and ActiveRecordBatchEnumerator. This allows for using cursors with datetime columns where ActiveRecord.default_timezone is set to :local and ActiveRecord is not using the same timezone as the database. ## v1.6.0 (Sep 24, 2024) diff --git a/lib/job-iteration/active_record_batch_enumerator.rb b/lib/job-iteration/active_record_batch_enumerator.rb index 3972693f..81d4585a 100644 --- a/lib/job-iteration/active_record_batch_enumerator.rb +++ b/lib/job-iteration/active_record_batch_enumerator.rb @@ -8,8 +8,9 @@ class ActiveRecordBatchEnumerator SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N" - def initialize(relation, columns: nil, batch_size: 100, cursor: nil) + def initialize(relation, columns: nil, batch_size: 100, timezone: nil, cursor: nil) @batch_size = batch_size + @timezone = timezone @primary_key = "#{relation.table_name}.#{relation.primary_key}" @columns = Array(columns&.map(&:to_s) || @primary_key) @primary_key_index = @columns.index(@primary_key) || @columns.index(relation.primary_key) @@ -114,7 +115,10 @@ def serialize_column_values!(column_values) end def column_value(value) - value.is_a?(Time) ? value.strftime(SQL_DATETIME_WITH_NSEC) : value + return value unless value.is_a?(Time) + + value = value.in_time_zone(@timezone) unless @timezone.nil? + value.strftime(SQL_DATETIME_WITH_NSEC) end end end diff --git a/lib/job-iteration/active_record_enumerator.rb b/lib/job-iteration/active_record_enumerator.rb index 6633e708..7c721ff5 100644 --- a/lib/job-iteration/active_record_enumerator.rb +++ b/lib/job-iteration/active_record_enumerator.rb @@ -7,9 +7,10 @@ module JobIteration class ActiveRecordEnumerator SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N" - def initialize(relation, columns: nil, batch_size: 100, cursor: nil) + def initialize(relation, columns: nil, batch_size: 100, timezone: nil, cursor: nil) @relation = relation @batch_size = batch_size + @timezone = timezone @columns = if columns Array(columns) else @@ -61,6 +62,7 @@ def column_value(record, attribute) value = record.read_attribute(attribute.to_sym) case record.class.columns_hash.fetch(attribute).type when :datetime + value = value.in_time_zone(@timezone) unless @timezone.nil? value.strftime(SQL_DATETIME_WITH_NSEC) else value diff --git a/test/unit/active_record_batch_enumerator_test.rb b/test/unit/active_record_batch_enumerator_test.rb index 79da7793..d16e76a0 100644 --- a/test/unit/active_record_batch_enumerator_test.rb +++ b/test/unit/active_record_batch_enumerator_test.rb @@ -72,7 +72,7 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:updated_at]) products = Product.order(:updated_at).take(2) - expected_product_cursor = products.last.updated_at.strftime(SQL_TIME_FORMAT) + expected_product_cursor = products.last.updated_at.utc.strftime(SQL_TIME_FORMAT) assert_equal([products, expected_product_cursor], enum.first) end @@ -80,7 +80,7 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:updated_at, :id]) products = Product.order(:updated_at, :id).take(2) - expected_product_cursor = [products.last.updated_at.strftime(SQL_TIME_FORMAT), products.last.id] + expected_product_cursor = [products.last.updated_at.utc.strftime(SQL_TIME_FORMAT), products.last.id] assert_equal([products, expected_product_cursor], enum.first) end @@ -92,6 +92,14 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest assert_match(/\A\s?`products`.`updated_at`, `products`.`id`\z/, queries.first[/SELECT (.*) FROM/, 1]) end + test "columns use UTC during serialization if they are Time" do + enum = build_enumerator(columns: [:updated_at]) + products = Product.order(:updated_at).take(2) + + expected_product_cursor = products.last.updated_at.utc.strftime(SQL_TIME_FORMAT) + assert_equal([products, expected_product_cursor], enum.first) + end + test "cursor can be used to resume" do products = Product.order(:id).take(3) @@ -100,17 +108,26 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest assert_equal([products, products.last.id], enum.first) end + test "using custom timezone results in a cursor with the correct offset" do + custom_timezone = "Eastern Time (US & Canada)" + enum = build_enumerator(columns: [:created_at, :id], timezone: custom_timezone) + shops = Product.order(:created_at, :id).take(2) + + cursor = [shops.last.created_at.in_time_zone(custom_timezone).strftime(SQL_TIME_FORMAT), shops.last.id] + assert_equal([shops, cursor], enum.first) + end + test "cursor can be used to resume on multiple columns" do enum = build_enumerator(columns: [:created_at, :id]) products = Product.order(:created_at, :id).take(2) - cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id] + cursor = [products.last.created_at.utc.strftime(SQL_TIME_FORMAT), products.last.id] assert_equal([products, cursor], enum.first) enum = build_enumerator(columns: [:created_at, :id], cursor: cursor) products = Product.order(:created_at, :id).offset(2).take(2) - cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id] + cursor = [products.last.created_at.utc.strftime(SQL_TIME_FORMAT), products.last.id] assert_equal([products, cursor], enum.first) end @@ -133,10 +150,11 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest private - def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil) + def build_enumerator(relation: Product.all, batch_size: 2, timezone: nil, columns: nil, cursor: nil) JobIteration::ActiveRecordBatchEnumerator.new( relation, batch_size: batch_size, + timezone: timezone, columns: columns, cursor: cursor, ) diff --git a/test/unit/active_record_enumerator_test.rb b/test/unit/active_record_enumerator_test.rb index 724bbaae..724f8779 100644 --- a/test/unit/active_record_enumerator_test.rb +++ b/test/unit/active_record_enumerator_test.rb @@ -61,14 +61,14 @@ class ActiveRecordEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:updated_at]).batches shops = Product.order(:updated_at).take(2) - assert_equal([shops, shops.last.updated_at.strftime(SQL_TIME_FORMAT)], enum.first) + assert_equal([shops, shops.last.updated_at.utc.strftime(SQL_TIME_FORMAT)], enum.first) end test "columns can be an array" do enum = build_enumerator(columns: [:updated_at, :id]).batches shops = Product.order(:updated_at, :id).take(2) - assert_equal([shops, [shops.last.updated_at.strftime(SQL_TIME_FORMAT), shops.last.id]], enum.first) + assert_equal([shops, [shops.last.updated_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id]], enum.first) end test "cursor can be used to resume" do @@ -83,13 +83,22 @@ class ActiveRecordEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:created_at, :id]).batches shops = Product.order(:created_at, :id).take(2) - cursor = [shops.last.created_at.strftime(SQL_TIME_FORMAT), shops.last.id] + cursor = [shops.last.created_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id] assert_equal([shops, cursor], enum.first) enum = build_enumerator(columns: [:created_at, :id], cursor: cursor).batches shops = Product.order(:created_at, :id).offset(2).take(2) - cursor = [shops.last.created_at.strftime(SQL_TIME_FORMAT), shops.last.id] + cursor = [shops.last.created_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id] + assert_equal([shops, cursor], enum.first) + end + + test "using custom timezone results in a cursor with the correct offset" do + custom_timezone = "Eastern Time (US & Canada)" + enum = build_enumerator(columns: [:created_at, :id], timezone: custom_timezone).batches + shops = Product.order(:created_at, :id).take(2) + + cursor = [shops.last.created_at.in_time_zone(custom_timezone).strftime(SQL_TIME_FORMAT), shops.last.id] assert_equal([shops, cursor], enum.first) end @@ -135,10 +144,11 @@ class ActiveRecordEnumeratorTest < IterationUnitTest private - def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil) + def build_enumerator(relation: Product.all, batch_size: 2, timezone: nil, columns: nil, cursor: nil) JobIteration::ActiveRecordEnumerator.new( relation, batch_size: batch_size, + timezone: timezone, columns: columns, cursor: cursor, )