Skip to content

Commit

Permalink
Initial Commit. The library can be used to build an Elixir cluster
Browse files Browse the repository at this point in the history
automatically through Tailscale.
  • Loading branch information
arjunbajaj committed Aug 7, 2023
0 parents commit 8e18997
Show file tree
Hide file tree
Showing 21 changed files with 917 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
tailscale-*.tar

# Temporary files, for example, from tests.
/tmp/
8 changes: 8 additions & 0 deletions lib/tailscale.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Tailscale do
defdelegate child_spec(opts \\ []), to: Tailscale.Supervisor
defdelegate start_link(opts \\ []), to: Tailscale.Supervisor

def status do
Tailscale.Local.Status.get!()
end
end
117 changes: 117 additions & 0 deletions lib/tailscale/change_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
defmodule Tailscale.ChangeServer do
use GenServer
alias Tailscale.Event
require Logger

@initial_subscribers_map Event.events_available_for_subscription()

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(opts) do
refresh_interval = Application.get_env(:tailscale, :refresh_interval, 30_000)
refresh_interval = Keyword.get(opts, :refresh_interval, refresh_interval)

state = %{
refresh_interval: refresh_interval,
status_map: nil,
subscribers: @initial_subscribers_map
}

{:ok, state, {:continue, :refresh}}
end

def handle_continue(:refresh, %{refresh_interval: refresh_interval} = state) do
Logger.debug("Refreshing Tailscale Status")

old_status = state.status_map
new_status = Tailscale.Status.get!()

if old_status != nil do
events = Tailscale.Status.diff(old_status, new_status)
fire_events(events, state)
end

state = %{state | status_map: new_status}
Process.send_after(self(), :refresh, refresh_interval)
{:noreply, state}
end

def handle_info(:refresh, state), do: {:noreply, state, {:continue, :refresh}}

# ---------------------------
# --- GENSERVER CALLBACKS ---
# ---------------------------

def handle_call(:get_status, _from, state) do
reply = if state.status_map == nil, do: Tailscale.Local.Status.get!(), else: state.status_map
{:reply, reply, state}
end

def handle_call({:subscribe, target, event}, {pid, _ref}, state) do
state =
update_in(state.subscribers[target][event], fn pids -> MapSet.put(pids, pid) end)

{:reply, :ok, state}
end

def handle_call(:subscribe_all, {pid, _ref}, state) do
state = update_in(state.subscribers.all, fn pids -> MapSet.put(pids, pid) end)
{:reply, :ok, state}
end

# ------------------
# --- PUBLIC API ---
# ------------------

def get_status, do: GenServer.call(__MODULE__, :get_status)

def subscribe(target), do: subscribe(target, :all)

@spec subscribe(Event.target(), Event.events()) :: :ok
def subscribe(target, event) when target in [:self, :peer, :tailnet, :user] do
GenServer.call(__MODULE__, {:subscribe, target, event})
end

def subscribe_all, do: GenServer.call(__MODULE__, :subscribe_all)

# ---------------------
# --- PRIVATE FUNCS ---
# ---------------------

defp fire_events(:no_change, _), do: nil

defp fire_events(events, state) do
events
|> Enum.map(fn
%Event.Tailnet{} = event -> {:tailnet, event.event, event}
%Event.Self{} = event -> {:self, event.event, event}
%Event.User{} = event -> {:user, event.event, event}
%Event.Peer{} = event -> {:peer, event.event, event}
end)
|> Enum.each(fn
# peer#changed and self#changed are composite events.
# they're emitted along with other change events.
# so here we prevent the :all firehoses from getting these events.
{:peer, :changed, payload} ->
trigger(state, :peer, :changed, payload)

{:self, :changed, payload} ->
trigger(state, :peer, :changed, payload)

{target, event, payload} ->
trigger(state, target, event, payload)
trigger(state, target, :all, payload)
trigger_all(state, payload)
end)
end

defp trigger(state, target, event, payload) do
Enum.each(state.subscribers[target][event], fn pid -> send(pid, {:tailscale, payload}) end)
end

defp trigger_all(state, payload) do
Enum.each(state.subscribers.all, fn pid -> send(pid, {:tailscale, payload}) end)
end
end
215 changes: 215 additions & 0 deletions lib/tailscale/cluster.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
defmodule Tailscale.Cluster do
use GenServer
alias Tailscale.ChangeServer
alias Tailscale.Event
require Logger

@ensure_interval 30_000

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(opts) do
tags = opts[:tags]
match = opts[:match_tags] || :all

if opts[:start_distribution] != false do
start_distribution(Tailscale.ChangeServer.get_status().self)
end

cond do
match not in [:all, :any] ->
raise ArgumentError, "Option `match_tags` needs to be either `:all` or `:any`."

tags == nil ->
raise ArgumentError, "Option `tags` is required."

not is_list(tags) ->
raise ArgumentError,
"Option `tags` needs to be a list of strings (without the Tailscale \"tag:\" qualifier)."

true ->
nil
end

tags =
Enum.map(tags, fn
"tag:" <> tag -> tag
tag -> tag
end)
|> Enum.uniq()

state = %{
tags: tags,
match: match,
cluster_topology: %{},
disconnect_self_handler: opts[:disconnect_self_handler]
}

{:ok, state, {:continue, :start}}
end

def handle_continue(:start, state) do
# Subscribe to peer changes
ChangeServer.subscribe(:peer, :added)
ChangeServer.subscribe(:peer, :removed)
ChangeServer.subscribe(:peer, :online)
ChangeServer.subscribe(:peer, :offline)
ChangeServer.subscribe(:peer, :tags_changed)

# Subscribe to self changes
ChangeServer.subscribe(:self, :offline)
ChangeServer.subscribe(:self, :tags_changed)
ChangeServer.subscribe(:self, :node_changed)

# Connect to all nodes that are currently online
state = ensure_connected(state)

# Ensure connected repeatedly
Process.send_after(self(), :ensure_connected, @ensure_interval)

{:noreply, state}
end

# ---------------------------
# --- EVENT SUBSCRIPTIONS ---
# ---------------------------

def handle_info({:tailscale, %Event.Self{event: :offline}}, state) do
disconnect_self(:offline, "Tailscale is offline", state)
end

def handle_info({:tailscale, %Event.Self{event: :tags_changed, self: self}}, state) do
case check_if_peer_matches_tags(self, state) do
true -> nil
false -> disconnect_self(:tags_changed, "Machine tags changed on Tailscale", state)
end
end

def handle_info({:tailscale, %Event.Self{event: :node_changed}}, state) do
disconnect_self(:hostname_changed, "Machine hostname changed on Tailscale", state)
end

def handle_info({:tailscale, %Event.Peer{event: :added, peer: peer}}, state) do
connect_to_node(peer, state)
end

def handle_info({:tailscale, %Event.Peer{event: :online, peer: peer}}, state) do
connect_to_node(peer, state)
end

def handle_info({:tailscale, %Event.Peer{event: :removed, peer: peer}}, state) do
disconnect_from_node(peer, state)
end

def handle_info({:tailscale, %Event.Peer{event: :offline, peer: peer}}, state) do
disconnect_from_node(peer, state)
end

def handle_info({:tailscale, %Event.Peer{event: :tags_changed, peer: peer}}, state) do
case check_if_peer_matches_tags(peer, state) do
true -> connect_to_node(peer, state)
false -> disconnect_from_node(peer, state)
end
end

def handle_info(:ensure_connected, state) do
Process.send_after(self(), :ensure_connected, @ensure_interval)
{:noreply, ensure_connected(state)}
end

def terminate(reason, _state) do
Logger.debug("Tailscale.Cluster is terminating: #{inspect(reason)}.")
Node.stop()
end

# ---------------------
# --- PRIVATE FUNCS ---
# ---------------------

defp start_distribution(%Tailscale.Self{} = self) do
case Node.stop() do
{:error, :not_allowed} ->
raise """
Elixir was configured to start the distribution.
It cannot be stopped.
Do not pass --sname or --name to avoid starting the distribution.
Tailscale.Cluster will automatically setup the distribution for you.
"""

_ ->
case Node.start(self.node, :longnames) do
{:ok, _pid} ->
:ok

{:error, _} ->
raise "Failed to start Erlang distribution."
end
end
end

defp ensure_connected(state) do
Tailscale.ChangeServer.get_status()
|> Map.get(:peers)
|> Enum.filter(&check_if_peer_matches_tags(&1, state))
|> Enum.filter(fn peer -> peer.online == true end)
|> Enum.map(fn peer ->
case Node.connect(peer.node) do
true -> {peer.id, peer}
false -> nil
end
end)
|> Enum.filter(&(&1 != nil))
|> Enum.into(%{})
|> then(fn cluster_topology ->
%{state | cluster_topology: cluster_topology}
end)
end

defp disconnect_self(reason, msg, state) do
if state.disconnect_self_handler != nil do
state.disconnect_self_handler.(reason)
else
Logger.debug("Restarting Application: #{msg}")
System.stop(1)
end

{:noreply, state}
end

defp connect_to_node(%Tailscale.Peer{} = peer, state) do
state =
case Node.connect(peer.node) do
true ->
update_in(state.cluster_topology, fn topology -> Map.put(topology, peer.id, peer) end)

false ->
state
end

{:noreply, state}
end

defp disconnect_from_node(%Tailscale.Peer{} = peer, state) do
state =
case Node.disconnect(peer.node) do
true ->
update_in(state.cluster_topology, fn topology -> Map.delete(topology, peer.id) end)

false ->
state
end

{:noreply, state}
end

defp check_if_peer_matches_tags(%{tags: nil} = _peer, _state), do: false

defp check_if_peer_matches_tags(peer, state) do
case state.match do
:all -> peer.tags |> Enum.all?(fn tag -> tag in state.tags end)
:any -> peer.tags |> Enum.any?(fn tag -> tag in state.tags end)
end
end
end
Loading

0 comments on commit 8e18997

Please sign in to comment.