diff --git a/lib/kafkaesque/query.ex b/lib/kafkaesque/query.ex index 04c2641..b964079 100644 --- a/lib/kafkaesque/query.ex +++ b/lib/kafkaesque/query.ex @@ -74,9 +74,16 @@ defmodule Kafkaesque.Query do end defp update_batch(repo, ids, new_state, query_opts) do + updates = + if new_state == "published" do + [set: [state: new_state, published_at: DateTime.utc_now()]] + else + [set: [state: new_state]] + end + from(Message) |> where([m], m.id in ^ids) - |> repo.update_all([set: [state: new_state]], query_opts) + |> repo.update_all(updates, query_opts) end @spec rescue_publishing(Ecto.Repo.t(), time_limit_ms :: pos_integer(), Keyword.t()) :: diff --git a/test/kafkaesque/query_test.exs b/test/kafkaesque/query_test.exs index 86f6e05..1d54de2 100644 --- a/test/kafkaesque/query_test.exs +++ b/test/kafkaesque/query_test.exs @@ -42,7 +42,10 @@ defmodule Kafkaesque.QueryTest do test "updates the state of the messages" do {:ok, %{id: id}} = Repo.insert(%Message{topic: "foobar", partition: 0}) assert {1, _} = Query.update_success_batch(Repo, [id]) - assert %{state: :published} = Repo.get(Message, id) + + message = Repo.get(Message, id) + assert message.state == :published + assert message.published_at != nil end end