From 7343cf1df1ac7ff79f991564df77b3c715c89c0b Mon Sep 17 00:00:00 2001 From: Yigit Ozkavci Date: Wed, 9 Oct 2024 12:06:15 +0100 Subject: [PATCH 1/7] Fix the UTC timezone issue with cursor resume --- .../active_record_batch_enumerator.rb | 2 +- test/test_helper.rb | 3 ++- test/unit/active_record_batch_enumerator_test.rb | 16 ++++++++++++---- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/lib/job-iteration/active_record_batch_enumerator.rb b/lib/job-iteration/active_record_batch_enumerator.rb index 3972693f..a6f1db08 100644 --- a/lib/job-iteration/active_record_batch_enumerator.rb +++ b/lib/job-iteration/active_record_batch_enumerator.rb @@ -114,7 +114,7 @@ def serialize_column_values!(column_values) end def column_value(value) - value.is_a?(Time) ? value.strftime(SQL_DATETIME_WITH_NSEC) : value + value.is_a?(Time) ? value.utc.strftime(SQL_DATETIME_WITH_NSEC) : value end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index d9733382..ca53cc4c 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -18,6 +18,7 @@ GlobalID.app = "iteration" ActiveRecord::Base.include(GlobalID::Identification) # https://github.com/rails/globalid/blob/main/lib/global_id/railtie.rb +ActiveRecord.default_timezone = :local ActiveJob::Base.queue_adapter = :test @@ -45,7 +46,7 @@ class Order < ActiveRecord::Base database: "job_iteration_test", username: "root", host: mysql_host, - port: mysql_port, + port: mysql_port } connection_config[:password] = "root" if ENV["CI"] diff --git a/test/unit/active_record_batch_enumerator_test.rb b/test/unit/active_record_batch_enumerator_test.rb index 79da7793..f5f5484c 100644 --- a/test/unit/active_record_batch_enumerator_test.rb +++ b/test/unit/active_record_batch_enumerator_test.rb @@ -72,7 +72,7 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:updated_at]) products = Product.order(:updated_at).take(2) - expected_product_cursor = products.last.updated_at.strftime(SQL_TIME_FORMAT) + expected_product_cursor = products.last.updated_at.utc.strftime(SQL_TIME_FORMAT) assert_equal([products, expected_product_cursor], enum.first) end @@ -80,7 +80,7 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:updated_at, :id]) products = Product.order(:updated_at, :id).take(2) - expected_product_cursor = [products.last.updated_at.strftime(SQL_TIME_FORMAT), products.last.id] + expected_product_cursor = [products.last.updated_at.utc.strftime(SQL_TIME_FORMAT), products.last.id] assert_equal([products, expected_product_cursor], enum.first) end @@ -92,6 +92,14 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest assert_match(/\A\s?`products`.`updated_at`, `products`.`id`\z/, queries.first[/SELECT (.*) FROM/, 1]) end + test "columns use UTC during serialization if they are Time" do + enum = build_enumerator(columns: [:updated_at]) + products = Product.order(:updated_at).take(2) + + expected_product_cursor = products.last.updated_at.utc.strftime(SQL_TIME_FORMAT) + assert_equal([products, expected_product_cursor], enum.first) + end + test "cursor can be used to resume" do products = Product.order(:id).take(3) @@ -104,13 +112,13 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:created_at, :id]) products = Product.order(:created_at, :id).take(2) - cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id] + cursor = [products.last.created_at.utc.strftime(SQL_TIME_FORMAT), products.last.id] assert_equal([products, cursor], enum.first) enum = build_enumerator(columns: [:created_at, :id], cursor: cursor) products = Product.order(:created_at, :id).offset(2).take(2) - cursor = [products.last.created_at.strftime(SQL_TIME_FORMAT), products.last.id] + cursor = [products.last.created_at.utc.strftime(SQL_TIME_FORMAT), products.last.id] assert_equal([products, cursor], enum.first) end From ccb8d29fcd07a55b459ff804223ca15fd8cc570d Mon Sep 17 00:00:00 2001 From: Yigit Ozkavci Date: Wed, 9 Oct 2024 12:13:30 +0100 Subject: [PATCH 2/7] Remove the no-longer-needed local default timezone --- test/test_helper.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_helper.rb b/test/test_helper.rb index ca53cc4c..f767c9d0 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -18,7 +18,6 @@ GlobalID.app = "iteration" ActiveRecord::Base.include(GlobalID::Identification) # https://github.com/rails/globalid/blob/main/lib/global_id/railtie.rb -ActiveRecord.default_timezone = :local ActiveJob::Base.queue_adapter = :test From 753fa6603d1e0ec9a496e31650526a216caf6009 Mon Sep 17 00:00:00 2001 From: Yigit Ozkavci Date: Wed, 9 Oct 2024 12:25:17 +0100 Subject: [PATCH 3/7] Fix timezone serialisation on ActiveRecordEnumerator too --- lib/job-iteration/active_record_enumerator.rb | 2 +- test/unit/active_record_enumerator_test.rb | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/job-iteration/active_record_enumerator.rb b/lib/job-iteration/active_record_enumerator.rb index 6633e708..6165df6e 100644 --- a/lib/job-iteration/active_record_enumerator.rb +++ b/lib/job-iteration/active_record_enumerator.rb @@ -61,7 +61,7 @@ def column_value(record, attribute) value = record.read_attribute(attribute.to_sym) case record.class.columns_hash.fetch(attribute).type when :datetime - value.strftime(SQL_DATETIME_WITH_NSEC) + value.utc.strftime(SQL_DATETIME_WITH_NSEC) else value end diff --git a/test/unit/active_record_enumerator_test.rb b/test/unit/active_record_enumerator_test.rb index 724bbaae..b727aa5c 100644 --- a/test/unit/active_record_enumerator_test.rb +++ b/test/unit/active_record_enumerator_test.rb @@ -61,14 +61,14 @@ class ActiveRecordEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:updated_at]).batches shops = Product.order(:updated_at).take(2) - assert_equal([shops, shops.last.updated_at.strftime(SQL_TIME_FORMAT)], enum.first) + assert_equal([shops, shops.last.updated_at.utc.strftime(SQL_TIME_FORMAT)], enum.first) end test "columns can be an array" do enum = build_enumerator(columns: [:updated_at, :id]).batches shops = Product.order(:updated_at, :id).take(2) - assert_equal([shops, [shops.last.updated_at.strftime(SQL_TIME_FORMAT), shops.last.id]], enum.first) + assert_equal([shops, [shops.last.updated_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id]], enum.first) end test "cursor can be used to resume" do @@ -83,13 +83,13 @@ class ActiveRecordEnumeratorTest < IterationUnitTest enum = build_enumerator(columns: [:created_at, :id]).batches shops = Product.order(:created_at, :id).take(2) - cursor = [shops.last.created_at.strftime(SQL_TIME_FORMAT), shops.last.id] + cursor = [shops.last.created_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id] assert_equal([shops, cursor], enum.first) enum = build_enumerator(columns: [:created_at, :id], cursor: cursor).batches shops = Product.order(:created_at, :id).offset(2).take(2) - cursor = [shops.last.created_at.strftime(SQL_TIME_FORMAT), shops.last.id] + cursor = [shops.last.created_at.utc.strftime(SQL_TIME_FORMAT), shops.last.id] assert_equal([shops, cursor], enum.first) end From e16df32e271e396e250c4f23d0d76cbcaf542b34 Mon Sep 17 00:00:00 2001 From: Yigit Ozkavci Date: Wed, 9 Oct 2024 12:33:22 +0100 Subject: [PATCH 4/7] Fix linter error --- test/test_helper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_helper.rb b/test/test_helper.rb index f767c9d0..d9733382 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -45,7 +45,7 @@ class Order < ActiveRecord::Base database: "job_iteration_test", username: "root", host: mysql_host, - port: mysql_port + port: mysql_port, } connection_config[:password] = "root" if ENV["CI"] From 9b7cea1f3fdf683217c64d1cffb9a032b79b9613 Mon Sep 17 00:00:00 2001 From: Yigit Ozkavci Date: Thu, 10 Oct 2024 09:50:10 +0100 Subject: [PATCH 5/7] Support custom timezones in ActiveRecord enumerators --- lib/job-iteration/active_record_batch_enumerator.rb | 7 +++++-- lib/job-iteration/active_record_enumerator.rb | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/job-iteration/active_record_batch_enumerator.rb b/lib/job-iteration/active_record_batch_enumerator.rb index a6f1db08..44110b57 100644 --- a/lib/job-iteration/active_record_batch_enumerator.rb +++ b/lib/job-iteration/active_record_batch_enumerator.rb @@ -8,8 +8,9 @@ class ActiveRecordBatchEnumerator SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N" - def initialize(relation, columns: nil, batch_size: 100, cursor: nil) + def initialize(relation, columns: nil, batch_size: 100, timezone: nil, cursor: nil) @batch_size = batch_size + @timezone = timezone @primary_key = "#{relation.table_name}.#{relation.primary_key}" @columns = Array(columns&.map(&:to_s) || @primary_key) @primary_key_index = @columns.index(@primary_key) || @columns.index(relation.primary_key) @@ -114,7 +115,9 @@ def serialize_column_values!(column_values) end def column_value(value) - value.is_a?(Time) ? value.utc.strftime(SQL_DATETIME_WITH_NSEC) : value + return value unless value.is_a?(Time) + value = value.in_time_zone(@timezone) unless @timezone.nil? + value.strftime(SQL_DATETIME_WITH_NSEC) end end end diff --git a/lib/job-iteration/active_record_enumerator.rb b/lib/job-iteration/active_record_enumerator.rb index 6165df6e..7c721ff5 100644 --- a/lib/job-iteration/active_record_enumerator.rb +++ b/lib/job-iteration/active_record_enumerator.rb @@ -7,9 +7,10 @@ module JobIteration class ActiveRecordEnumerator SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N" - def initialize(relation, columns: nil, batch_size: 100, cursor: nil) + def initialize(relation, columns: nil, batch_size: 100, timezone: nil, cursor: nil) @relation = relation @batch_size = batch_size + @timezone = timezone @columns = if columns Array(columns) else @@ -61,7 +62,8 @@ def column_value(record, attribute) value = record.read_attribute(attribute.to_sym) case record.class.columns_hash.fetch(attribute).type when :datetime - value.utc.strftime(SQL_DATETIME_WITH_NSEC) + value = value.in_time_zone(@timezone) unless @timezone.nil? + value.strftime(SQL_DATETIME_WITH_NSEC) else value end From 82ea0f75bccea2b677ae7bd2a0ff6eb8eb33e4e4 Mon Sep 17 00:00:00 2001 From: Yigit Ozkavci Date: Thu, 10 Oct 2024 10:12:56 +0100 Subject: [PATCH 6/7] Fix linter error --- lib/job-iteration/active_record_batch_enumerator.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/job-iteration/active_record_batch_enumerator.rb b/lib/job-iteration/active_record_batch_enumerator.rb index 44110b57..81d4585a 100644 --- a/lib/job-iteration/active_record_batch_enumerator.rb +++ b/lib/job-iteration/active_record_batch_enumerator.rb @@ -116,6 +116,7 @@ def serialize_column_values!(column_values) def column_value(value) return value unless value.is_a?(Time) + value = value.in_time_zone(@timezone) unless @timezone.nil? value.strftime(SQL_DATETIME_WITH_NSEC) end From 14f522a7bc6538683a65689630ea26d200eeb9f0 Mon Sep 17 00:00:00 2001 From: Yigit Ozkavci Date: Thu, 10 Oct 2024 10:55:19 +0100 Subject: [PATCH 7/7] Add the tests and update CHANGELOG.md --- CHANGELOG.md | 1 + test/unit/active_record_batch_enumerator_test.rb | 12 +++++++++++- test/unit/active_record_enumerator_test.rb | 12 +++++++++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 79e2fb89..45429d42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ### Main (unreleased) - Added CSV batching functionality to EnumeratorBuilder with `build_csv_enumerator_on_batches` method and `csv_on_batches` alias. +- Added support for custom timezones on ActiveRecordEnumerator and ActiveRecordBatchEnumerator. This allows for using cursors with datetime columns where ActiveRecord.default_timezone is set to :local and ActiveRecord is not using the same timezone as the database. ## v1.6.0 (Sep 24, 2024) diff --git a/test/unit/active_record_batch_enumerator_test.rb b/test/unit/active_record_batch_enumerator_test.rb index f5f5484c..d16e76a0 100644 --- a/test/unit/active_record_batch_enumerator_test.rb +++ b/test/unit/active_record_batch_enumerator_test.rb @@ -108,6 +108,15 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest assert_equal([products, products.last.id], enum.first) end + test "using custom timezone results in a cursor with the correct offset" do + custom_timezone = "Eastern Time (US & Canada)" + enum = build_enumerator(columns: [:created_at, :id], timezone: custom_timezone) + shops = Product.order(:created_at, :id).take(2) + + cursor = [shops.last.created_at.in_time_zone(custom_timezone).strftime(SQL_TIME_FORMAT), shops.last.id] + assert_equal([shops, cursor], enum.first) + end + test "cursor can be used to resume on multiple columns" do enum = build_enumerator(columns: [:created_at, :id]) products = Product.order(:created_at, :id).take(2) @@ -141,10 +150,11 @@ class ActiveRecordBatchEnumeratorTest < IterationUnitTest private - def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil) + def build_enumerator(relation: Product.all, batch_size: 2, timezone: nil, columns: nil, cursor: nil) JobIteration::ActiveRecordBatchEnumerator.new( relation, batch_size: batch_size, + timezone: timezone, columns: columns, cursor: cursor, ) diff --git a/test/unit/active_record_enumerator_test.rb b/test/unit/active_record_enumerator_test.rb index b727aa5c..724f8779 100644 --- a/test/unit/active_record_enumerator_test.rb +++ b/test/unit/active_record_enumerator_test.rb @@ -93,6 +93,15 @@ class ActiveRecordEnumeratorTest < IterationUnitTest assert_equal([shops, cursor], enum.first) end + test "using custom timezone results in a cursor with the correct offset" do + custom_timezone = "Eastern Time (US & Canada)" + enum = build_enumerator(columns: [:created_at, :id], timezone: custom_timezone).batches + shops = Product.order(:created_at, :id).take(2) + + cursor = [shops.last.created_at.in_time_zone(custom_timezone).strftime(SQL_TIME_FORMAT), shops.last.id] + assert_equal([shops, cursor], enum.first) + end + test "#size returns the number of items in the relation" do enum = build_enumerator(relation: Product.all) @@ -135,10 +144,11 @@ class ActiveRecordEnumeratorTest < IterationUnitTest private - def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil) + def build_enumerator(relation: Product.all, batch_size: 2, timezone: nil, columns: nil, cursor: nil) JobIteration::ActiveRecordEnumerator.new( relation, batch_size: batch_size, + timezone: timezone, columns: columns, cursor: cursor, )