Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add Nomad strategy #181

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
## Unreleased

- Use new cypher names
- Adds clustering strategy for Nomad orchestratory (see: github.com/hashicorp/nomad)

### 3.3.0

### Changed

- Default multicast address is now 233.252.1.32, was 230.1.1.251, [commit](https://github.com/bitwalker/libcluster/commit/449a65e14f152a83a0f8ee371f05743610cd292f)


### 2.3.0

### Added
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ You can find supporting documentation [here](https://hexdocs.pm/libcluster).
- Kubernetes via its metadata API using via a configurable label selector and
node basename; or alternatively, using DNS.
- Rancher, via its [metadata API][rancher-api]
- Nomad, via its [services API][nomad-api]
- Easy to provide your own custom clustering strategies for your specific environment.
- Easy to use provide your own distribution plumbing (i.e. something other than
Distributed Erlang), by implementing a small set of callbacks. This allows
Expand Down Expand Up @@ -120,7 +121,9 @@ You have a handful of choices with regards to cluster management out of the box:
nodes based on a label selector and basename.
- `Cluster.Strategy.Kubernetes.DNS`, which uses DNS to join nodes under a shared
headless service in a given namespace.
- `Cluster.Strategy.Rancher`, which like the Kubernetes strategy, uses a
- `Cluster.Strategy.Rancher`, which like the Kubernetes and Nomad strategies, uses a
metadata API to query nodes to cluster with.
- `Cluster.Strategy.Nomad`, which like the Kubernetes and Ranches strategies, uses a
metadata API to query nodes to cluster with.

You can also define your own strategy implementation, by implementing the
Expand Down Expand Up @@ -153,3 +156,4 @@ This library is MIT licensed. See the
[LICENSE.md](https://github.com/bitwalker/libcluster/blob/master/LICENSE.md) for details.

[rancher-api]: http://rancher.com/docs/rancher/latest/en/rancher-services/metadata-service/
[nomad-api]: https://www.nomadproject.io/api-docs/services
218 changes: 218 additions & 0 deletions lib/strategy/nomad.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
defmodule Cluster.Strategy.Nomad do
@moduledoc """
This clustering strategy works by querying Nomad for a service specified
by name. It will poll for new service addresses based on the polling interval
specified (in milliseconds).

## Options

* `service_name` - The name of the Nomad service you wish to get the addresses for (required; e.g. "my-elixir-app")
* `namespace` - The Nomad namespace to query (optional; default: "default")
* `nomad_server_url` - The short name of the nodes you wish to connect to (required; e.g. "https://127.0.0.1:4646")
* `node_basename` - The erland node basename (required; e.g. "app")
* `poll_interval` - How often to poll in milliseconds (optional; default: 5_000)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: must add "token" to docs


## Usage

config :libcluster,
topologies: [
dns_poll_example: [
strategy: #{__MODULE__},
config: [
service_name: "my-elixir-app",
nomad_server_url: "https://my-nomad-url:4646",
namespace: "engineering",
node_basename: "app",
polling_interval: 5_000]]]
"""

use GenServer
import Cluster.Logger

alias Cluster.Strategy.State
alias Cluster.Strategy

@default_polling_interval 5_000
@default_namespace "default"
@default_token ""

def start_link(args), do: GenServer.start_link(__MODULE__, args)

@impl true
def init([%State{meta: nil} = state]) do
init([%State{state | :meta => MapSet.new()}])
end

def init([%State{} = state]) do
{:ok, do_poll(state)}
end

@impl true
def handle_info(:timeout, state), do: handle_info(:poll, state)
def handle_info(:poll, state), do: {:noreply, do_poll(state)}
def handle_info(_, state), do: {:noreply, state}

defp do_poll(
%State{
topology: topology,
connect: connect,
disconnect: disconnect,
list_nodes: list_nodes
} = state
) do
new_nodelist = state |> get_nodes() |> MapSet.new()
removed = MapSet.difference(state.meta, new_nodelist)

new_nodelist =
case Strategy.disconnect_nodes(
topology,
disconnect,
list_nodes,
MapSet.to_list(removed)
) do
:ok ->
new_nodelist

{:error, bad_nodes} ->
# Add back the nodes which should have been removed, but which couldn't be for some reason
Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
MapSet.put(acc, n)
end)
end

new_nodelist =
case Strategy.connect_nodes(
topology,
connect,
list_nodes,
MapSet.to_list(new_nodelist)
) do
:ok ->
new_nodelist

{:error, bad_nodes} ->
# Remove the nodes which should have been added, but couldn't be for some reason
Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
MapSet.delete(acc, n)
end)
end

Process.send_after(self(), :poll, polling_interval(state))

%{state | :meta => new_nodelist}
end

defp polling_interval(%{config: config}) do
Keyword.get(config, :polling_interval, @default_polling_interval)
end

defp get_namespace(config) do
Keyword.get(config, :namespace, @default_namespace)
end

defp get_token(config) do
Keyword.get(config, :token, @default_token)
end

@spec get_nodes(State.t()) :: [atom()]
defp get_nodes(%State{config: config} = state) do
server_url = Keyword.fetch(config, :nomad_server_url)
service_name = Keyword.fetch(config, :service_name)
node_basename = Keyword.fetch(config, :node_basename)
namespace = get_namespace(config)
token = get_token(config)

fetch_nodes(server_url, service_name, node_basename, namespace, token, state)
end

defp fetch_nodes(
{:ok, server_url},
{:ok, service_name},
{:ok, node_basename},
namespace,
token,
%State{
topology: topology
}
)
when server_url != "" and service_name != "" and node_basename != "" do
debug(topology, "polling nomad for '#{service_name}' in namespace '#{namespace}'")

headers = [{'X-Nomad-Token', '#{token}'}]
http_options = []
url = '#{server_url}/v1/service/#{service_name}'

case :httpc.request(:get, {url, headers}, http_options, []) do
{:ok, {{_version, 200, _status}, _headers, body}} ->
Jason.decode!(body)
|> Enum.map(fn %{"Address" => addr} -> :"#{node_basename}@#{addr}" end)

{:ok, {{_version, 403, _status}, _headers, _body}} ->
warn(topology, "cannot query nomad (unauthorized)")
[]

{:ok, {{_version, code, status}, _headers, body}} ->
warn(topology, "cannot query nomad (#{code} #{status}): #{inspect(body)}")
[]

{:error, reason} ->
error(topology, "request to nomad failed!: #{inspect(reason)}")
[]

_ ->
error(topology, "unknown error fetching nomad service info")
[]
end
end

defp fetch_nodes(
{:ok, invalid_server_url},
{:ok, invalid_service_name},
{:ok, invalid_node_base_name},
_namespace,
_token,
%State{
topology: topology
}
) do
warn(
topology,
"nomad strategy is selected, but server_url, service_name, or node_base_name param is invalid: #{inspect(%{nomad_server_url: invalid_server_url, service_name: invalid_service_name, node_basename: invalid_node_base_name})}"
)

[]
end

defp fetch_nodes(:error, _service_name, _node_base_name, _namespace, _token, %State{
topology: topology
}) do
warn(
topology,
"nomad polling strategy is selected, but nomad_server_url param missed"
)

[]
end

defp fetch_nodes(_server_url, :error, _node_base_name, _namespace, _token, %State{
topology: topology
}) do
warn(
topology,
"nomad polling strategy is selected, but service_name param missed"
)

[]
end

defp fetch_nodes(_server_url, _service_name, :error, _namespace, _token, %State{
topology: topology
}) do
warn(
topology,
"nomad polling strategy is selected, but node_base_name param missed"
)

[]
end
end
Empty file added test/nomat_test.exs
Empty file.