Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 87 additions & 8 deletions lib/sage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ defmodule Sage do

defguardp is_transaction(value) when is_function(value, 2) or is_mfa(value)

@typedoc """
Just like a transaction, although it receives extra argument of the name of the previous transaction
"""
@type intermediate_transaction ::
(effects_so_far :: effects(), attrs :: any(), previous_stage :: stage_name() ->
{:ok | :error | :abort, any()})
| mfa()

defguardp is_intermediate_transaction(value) when is_function(value, 3) or is_mfa(value)

@typedoc """
Tracer callback, can be a module that implements `Sage.Tracer` behaviour, an anonymous function, or an
`{module, function, [arguments]}` tuple.
Expand Down Expand Up @@ -437,6 +447,52 @@ defmodule Sage do
def run_async(sage, name, transaction, compensation, opts \\ []),
do: add_stage(sage, name, build_operation!(:run_async, transaction, compensation, opts))

@doc """
For a given Sage S with transactions `:t1` -> `:t2` -> `:t3`, a call to `interleave(S, :name, f)`
will yield a saga with transactions `:t1` -> `{:interleave, :name, 1}` -> `:t2` -> `{:interleave, :name, 2}`
-> `:t3` -> `{:interleave, :name, 3}`.

This can be useful if you are trying to do a long computation and want to do something with
the intermediate results, such as logging or persistence.

Note:
- This isn't strict interleaving because a transaction is still appended at the end.
- Calling this function twice with the same name will give a `Sage.DuplicateStageError`, but
calling the function twice with a different name will compound its effects.
- Calling this function before the end of your saga definition will mean any stages you add after the
call to `interleave` will not have the intermediate stages added after them
"""
@spec interleave(
sage :: t(),
name :: stage_name(),
intermediate_transaction :: intermediate_transaction(),
compensation :: compensation()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want compensations to be reported in your use case? If so we'll need to add a type for it too or reuse intermediate_transaction here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. My use case actually doesn't require compensations at all, I just added them for completeness/generality.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add it for completeness. If there is some reporter on the saga progress, it should also be able to report compensation progress.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So are you saying that an interleaved compensation should be passed the previously run compensation, to mirror the intermediate_transaction?

) :: t()
def interleave(sage, name, intermediate_transaction, compensation \\ :noop)
when is_intermediate_transaction(intermediate_transaction) and is_compensation(compensation) do
new_stages =
sage.stages
|> Enum.reverse()
|> Enum.with_index(1)
|> Enum.flat_map(fn {stage, index} ->
name = {:interleave, name, index}
{stage_name, _} = stage

transaction =
case intermediate_transaction do
{m, f, a} ->
{m, f, [stage_name | a]}

_ ->
&intermediate_transaction.(&1, &2, stage_name)
end

[stage, {name, build_operation!(:run, transaction, compensation)}]
end)

add_stages(%{sage | stage_names: MapSet.new(), stages: []}, new_stages)
end

@doc """
Executes a Sage.

Expand Down Expand Up @@ -497,16 +553,39 @@ defmodule Sage do
end

defp add_stage(sage, name, operation) do
add_stages(sage, [{name, operation}])
end

defp add_stages(sage, name_operation_pairs) do
%{stages: stages, stage_names: names} = sage

if MapSet.member?(names, name) do
raise Sage.DuplicateStageError, sage: sage, name: name
else
%{
sage
| stages: [{name, operation} | stages],
stage_names: MapSet.put(names, name)
}
{names_to_add_list, _operations} =
Enum.unzip(name_operation_pairs)

names_to_add_set = MapSet.new(names_to_add_list)

duplicates =
names_to_add_set
|> MapSet.intersection(names)
|> MapSet.to_list()

cond do
# There is a duplicate between what was existing in the struct beforehand and
# what was passed to this function
!Enum.empty?(duplicates) ->
raise Sage.DuplicateStageError, sage: sage, name: hd(duplicates)

# There was a duplicate within name_operation_pairs
length(names_to_add_list) != MapSet.size(names_to_add_set) ->
duplicates = names_to_add_list -- MapSet.to_list(names_to_add_set)
raise Sage.DuplicateStageError, sage: sage, name: hd(duplicates)

true ->
%{
sage
| stages: Enum.reverse(name_operation_pairs) ++ stages,
stage_names: MapSet.union(names, names_to_add_set)
}
end
end

Expand Down
86 changes: 86 additions & 0 deletions test/sage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,92 @@ defmodule SageTest do
end
end

describe "interleaves/3" do
test "adds a step between every transaction" do
sage =
new()
|> run(:t1, transaction(:t1))
|> run(:t2, transaction(:t2))
|> run_async(:t_async, transaction(:t_async), :noop)
|> run(:t3, transaction(:t3))
|> interleave(:i, fn _effects, _args, previous_stage_name -> {:ok, previous_stage_name} end)

assert [
{{:interleave, :i, 4}, _},
{:t3, _},
{{:interleave, :i, 3}, _},
{:t_async, _},
{{:interleave, :i, 2}, _},
{:t2, _},
{{:interleave, :i, 1}, _},
t1: _
] = sage.stages

assert {:ok, _,
%{
{:interleave, :i, 4} => :t3,
{:interleave, :i, 3} => :t_async,
{:interleave, :i, 2} => :t2,
{:interleave, :i, 1} => :t1
}} = execute(sage)
end

test "adds nothing if there are no transactions" do
sage = interleave(new(), :i, fn _effects, _args, _previous_stage_name -> :ok end)

assert sage.stages == []
end

test "adds a transaction at the end if there is one transaction" do
assert [{{:interleave, :i, 1}, _}, t1: _] =
new()
|> run(:t1, transaction(:t1))
|> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end)
|> Map.get(:stages)
end

test "works with mfa" do
sage =
new()
|> run(:t1, transaction(:t1))
|> run(:t2, transaction(:t2))
|> run(:t3, transaction(:t3))
|> interleave(:i, {TestIntermediateTransactionHandler, :intermediate_transaction_handler, [:foo]})

assert {:ok, _,
%{
{:interleave, :i, 3} => {:t3, :foo},
{:interleave, :i, 2} => {:t2, :foo},
{:interleave, :i, 1} => {:t1, :foo}
}} = execute(sage)
end

test "can run a compensations" do
new()
|> run(:t1, transaction(:t1))
|> run(:t2, transaction(:t2))
|> run(:t3, transaction_with_error(:t3))
|> interleave(:i, fn _effects, _args, _previous_stage_name -> {:ok, nil} end, fn _errored_effect,
_effects_so_far,
_attrs ->
send(self(), :compensating)
:ok
end)
|> execute()

for _ <- 1..2, do: assert_received(:compensating)
end

test "errors if used more than once" do
assert_raise Sage.DuplicateStageError, fn ->
new()
|> run(:t1, transaction(:t1))
|> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end)
|> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end)
end
end
end

def dummy_transaction_for_mfa(_effects_so_far, _opts), do: raise("Not implemented")
def dummy_compensation_for_mfa(_effect_to_compensate, _opts), do: raise("Not implemented")
def dummy_final_cb(_status, _opts, _return), do: raise("Not implemented")
Expand Down
5 changes: 5 additions & 0 deletions test/support/test_intermediate_transaction_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule TestIntermediateTransactionHandler do
def intermediate_transaction_handler(_effects, _args, previous_stage_name, something_else) do
{:ok, {previous_stage_name, something_else}}
end
end