Skip to content

Commit

Permalink
fix: set published_at when published (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
msutkowski authored Jan 18, 2024
1 parent 1b61bcd commit a94c235
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
9 changes: 8 additions & 1 deletion lib/kafkaesque/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,16 @@ defmodule Kafkaesque.Query do
end

defp update_batch(repo, ids, new_state, query_opts) do
updates =
if new_state == "published" do
[set: [state: new_state, published_at: DateTime.utc_now()]]
else
[set: [state: new_state]]
end

from(Message)
|> where([m], m.id in ^ids)
|> repo.update_all([set: [state: new_state]], query_opts)
|> repo.update_all(updates, query_opts)
end

@spec rescue_publishing(Ecto.Repo.t(), time_limit_ms :: pos_integer(), Keyword.t()) ::
Expand Down
5 changes: 4 additions & 1 deletion test/kafkaesque/query_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ defmodule Kafkaesque.QueryTest do
test "updates the state of the messages" do
{:ok, %{id: id}} = Repo.insert(%Message{topic: "foobar", partition: 0})
assert {1, _} = Query.update_success_batch(Repo, [id])
assert %{state: :published} = Repo.get(Message, id)

message = Repo.get(Message, id)
assert message.state == :published
assert message.published_at != nil
end
end

Expand Down

0 comments on commit a94c235

Please sign in to comment.