Skip to content

Commit

Permalink
Rewrite assets_held_by_address SQL to not use balances_aggregated
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Jan 29, 2025
1 parent e51a633 commit 228b011
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 60 deletions.
88 changes: 56 additions & 32 deletions lib/sanbase/balances/balance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,28 +191,30 @@ defmodule Sanbase.Balance do
in Santiment's database it is not shown.
"""
@spec assets_held_by_address(address, Keyword.t()) ::
{:ok, list(%{slug: slug, balance: number()})} | {:error, String.t()}
{:ok, list(%{slug: slug, balance: number()})}
| {:error, String.t()}
def assets_held_by_address(address, opts \\ []) do
address = Sanbase.BlockchainAddress.to_internal_format(address)
tables = address_supported_tables(address)

hidden_projects_slugs = hidden_projects_slugs()
transform_fn = fn [slug, balance] -> %{slug: slug, balance: balance} end

assets_held_by_address_query(address, opts)
|> ClickhouseRepo.query_transform(fn [slug, balance] ->
%{
slug: slug,
balance: balance
}
Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} ->
query = assets_held_by_address_query(address, table, opts)

case ClickhouseRepo.query_transform(query, transform_fn) do
{:ok, data} -> {:cont, {:ok, acc ++ data}}
{:error, error} -> {:halt, {:error, error}}
end
end)
|> maybe_apply_function(fn data -> Enum.reject(data, &(&1.slug in hidden_projects_slugs)) end)
|> maybe_apply_function(&remove_hidden_projects/1)
end

def usd_value_address_change(address, datetime) do
address = Sanbase.BlockchainAddress.to_internal_format(address)
query_struct = usd_value_address_change_query(address, datetime)
tables = address_supported_tables(address)

ClickhouseRepo.query_transform(
query_struct,
transform_fn =
fn [
slug,
previous_balance,
Expand All @@ -234,33 +236,41 @@ defmodule Sanbase.Balance do
usd_value_change: previous_usd_value - current_usd_value
}
end
)
|> maybe_apply_function(fn data ->
hidden_projects_slugs = hidden_projects_slugs()
Enum.reject(data, &(&1.slug in hidden_projects_slugs))
end)
|> maybe_apply_function(fn data ->
Enum.sort_by(data, & &1.usd_value_change, :desc)

Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} ->
query_struct = usd_value_address_change_query(address, datetime, table)

case ClickhouseRepo.query_transform(query_struct, transform_fn) do
{:ok, data} -> {:cont, {:ok, acc ++ data}}
{:error, error} -> {:halt, {:error, error}}
end
end)
|> maybe_apply_function(&remove_hidden_projects/1)
|> maybe_apply_function(fn data -> Enum.sort_by(data, & &1.usd_value_change, :desc) end)
end

def usd_value_held_by_address(address) do
address = Sanbase.BlockchainAddress.to_internal_format(address)
query_struct = usd_value_held_by_address_query(address)
hidden_projects_slugs = hidden_projects_slugs()
tables = address_supported_tables(address)

ClickhouseRepo.query_transform(
query_struct,
fn [slug, current_balance, current_price_usd, current_usd_value] ->
%{
slug: slug,
current_balance: current_balance,
current_price_usd: current_price_usd,
current_usd_value: current_usd_value
}
transform_fn = fn [slug, current_balance, current_price_usd, current_usd_value] ->
%{
slug: slug,
current_balance: current_balance,
current_price_usd: current_price_usd,
current_usd_value: current_usd_value
}
end

Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} ->
query_struct = usd_value_held_by_address_query(address, table)

case ClickhouseRepo.query_transform(query_struct, transform_fn) do
{:ok, data} -> {:cont, {:ok, acc ++ data}}
{:error, error} -> {:halt, {:error, error}}
end
)
|> maybe_apply_function(fn data -> Enum.reject(data, &(&1.slug in hidden_projects_slugs)) end)
end)
|> maybe_apply_function(&remove_hidden_projects/1)
|> maybe_sort(:current_usd_value, :desc)
end

Expand Down Expand Up @@ -516,4 +526,18 @@ defmodule Sanbase.Balance do

hidden_projects_slugs
end

defp remove_hidden_projects(list) do
hidden_projects_slugs = hidden_projects_slugs()
Enum.reject(list, &(&1.slug in hidden_projects_slugs))
end

defp address_supported_tables(address) do
case Sanbase.BlockchainAddress.to_infrastructure(address) do
"ETH" -> ["erc20_balances_yearly_test", "eth_balances"]
"BTC" -> ["btc_balances", "ltc_balances", "doge_balances"]
"XRP" -> ["xrp_balances"]
_ -> []
end
end
end
152 changes: 124 additions & 28 deletions lib/sanbase/balances/balance_sql_query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ defmodule Sanbase.Balance.SqlQuery do
end
end

def table_to_slug(table) do
case table do
"eth_balances" -> "ethereum"
"btc_balances" -> "bitcoin"
"ltc_balances" -> "litecoin"
"doge_balances" -> "dogecoin"
"bch_balances" -> "bitcoin-cash"
end
end

def maybe_selector_clause("ethereum", "ethereum", _slug_key), do: ""

def maybe_selector_clause("ethereum", _, slug_key) do
Expand All @@ -24,6 +34,7 @@ defmodule Sanbase.Balance.SqlQuery do
def maybe_selector_clause(_, _, _), do: ""

def decimals("bitcoin", _), do: 1
def decimals("bitcoin-cash", _), do: 1
def decimals("litecoin", _), do: 1
def decimals("dogecoin", _), do: 1
def decimals(_, decimals), do: Integer.pow(10, decimals)
Expand All @@ -48,7 +59,7 @@ defmodule Sanbase.Balance.SqlQuery do
slug: slug,
from: DateTime.to_unix(from),
to: DateTime.to_unix(to),
decimals: Integer.pow(10, decimals),
decimals: decimals(blockchain, decimals),
table: blockchain_to_table(blockchain, slug)
}

Expand Down Expand Up @@ -207,7 +218,7 @@ defmodule Sanbase.Balance.SqlQuery do
params = %{
addresses: addresses,
slug: slug,
decimals: Integer.pow(10, decimals),
decimals: decimals(blockchain, decimals),
table: blockchain_to_table(blockchain, slug)
}

Expand Down Expand Up @@ -338,7 +349,7 @@ defmodule Sanbase.Balance.SqlQuery do
sql = """
SELECT address, balance
FROM (
SELECT address, argMax(balance, dt) / {{decimals}}AS balance
SELECT address, argMax(balance, dt) / {{decimals}} AS balance
FROM #{table}
PREWHERE
assetRefId = (SELECT asset_ref_id FROM asset_metadata FINAL WHERE name = {{slug}} LIMIT 1) AND
Expand Down Expand Up @@ -378,54 +389,105 @@ defmodule Sanbase.Balance.SqlQuery do
{join_str, params}
end

def assets_held_by_address_changes_query(address, datetime) do
def assets_held_by_address_changes_query(address, datetime, table, opts \\ [])

def assets_held_by_address_changes_query(address, datetime, "erc20_balances" <> _ = table, opts) do
sql = """
SELECT
name,
greatest(argMaxIf(value, dt, dt <= toDateTime({{datetime}})) / pow(10, decimals), 0) AS previous_balance,
greatest(argMax(value, dt) / pow(10, decimals), 0) AS current_balance,
greatest(
argMaxIf(balance, (dt, txID, computedAt), dt <= toDateTime({{datetime}})) / pow(10, decimals),
0
) AS previous_balance,
greatest(
argMax(balance, (dt, txID, computedAt)) / pow(10, decimals),
0
) AS current_balance,
current_balance - previous_balance AS balance_change
FROM (
SELECT
dt,
address,
asset_ref_id,
arrayJoin(groupArrayMerge(values)) AS values_merged,
values_merged.1 AS dt,
values_merged.2 AS value
FROM balances_aggregated
assetRefId AS asset_ref_id,
balance,
txID,
computedAt
FROM {{table}}
WHERE
#{address_clause(address, argument_name: "address")}
GROUP BY address, blockchain, asset_ref_id
)
INNER JOIN (
SELECT asset_ref_id, name, decimals
FROM asset_metadata FINAL
) USING (asset_ref_id)
GROUP BY address, asset_ref_id, name, decimals
HAVING previous_balance > 0 AND current_balance > 0
#{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"}
"""

params = %{address: address, datetime: DateTime.to_unix(datetime)}
params = %{
address: address,
table: table,
datetime: DateTime.to_unix(datetime)
}

Sanbase.Clickhouse.Query.new(sql, params)
end

def assets_held_by_address_query(address, opts \\ []) do
def assets_held_by_address_changes_query(address, datetime, table, opts)
when table in [
"eth_balances",
"btc_balances",
"bch_balances",
"ltc_balances",
"doge_balances"
] do
sql = """
SELECT
{{slug}} AS name,
greatest(
argMaxIf(balance, (dt, txID, computedAt), dt <= toDateTime({{datetime}})) / {{decimals}},
0
) AS previous_balance,
greatest(
argMax(balance, (dt, txID, computedAt)) / {{decimals}},
0
) AS current_balance,
current_balance - previous_balance AS balance_change
FROM {{table}}
WHERE
#{address_clause(address, argument_name: "address")}
#{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"}
"""

params = %{
decimals: decimals(table_to_slug(table), 18),
slug: table_to_slug(table),
address: address,
table: table,
datetime: DateTime.to_unix(datetime)
}

Sanbase.Clickhouse.Query.new(sql, params)
end

def assets_held_by_address_query(address, table, opts \\ [])

def assets_held_by_address_query(address, "erc20_balances_" <> _ = table, opts) do
sql = """
SELECT
name,
argMax(value, dt) / pow(10, decimals) AS balance
argMax(balance, (dt, txID, computedAt)) / pow(10, decimals) AS balance
FROM (
SELECT
dt,
address,
asset_ref_id,
arrayJoin(groupArrayMerge(values)) AS values_merged,
values_merged.1 AS dt,
values_merged.2 AS value
FROM balances_aggregated
assetRefId AS asset_ref_id,
balance,
txID,
computedAt
FROM {{table}}
WHERE
#{address_clause(address, argument_name: "address")}
GROUP BY address, blockchain, asset_ref_id
)
INNER JOIN (
SELECT asset_ref_id, name, decimals
Expand All @@ -435,13 +497,44 @@ defmodule Sanbase.Balance.SqlQuery do
#{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"}
"""

params = %{address: address}
params = %{address: address, table: table}

Sanbase.Clickhouse.Query.new(sql, params)
end

def usd_value_address_change_query(address, datetime) do
query_struct = assets_held_by_address_changes_query(address, datetime)
def assets_held_by_address_query(address, table, opts)
when table in [
"eth_balances",
"btc_balances",
"bch_balances",
"ltc_balances",
"doge_balances"
] do
# These tables hold info for only 1 asset
sql = """
SELECT
{{slug}} AS name,
argMax(balance, (dt, txID, computedAt)) / {{decimals}} AS balance
FROM {{table}}
WHERE
#{address_clause(address, argument_name: "address")}
#{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"}
"""

params = %{
decimals: decimals(table_to_slug(table), 18),
slug: table_to_slug(table),
table: table,
address: address
}

Sanbase.Clickhouse.Query.new(sql, params)
end

def usd_value_address_change_query(address, datetime, table, opts \\ [])

def usd_value_address_change_query(address, datetime, table, opts) do
query_struct = assets_held_by_address_changes_query(address, datetime, table, opts)

sql = """
SELECT
Expand Down Expand Up @@ -471,11 +564,14 @@ defmodule Sanbase.Balance.SqlQuery do
) USING (name)
"""

# The params are already in the query_struct
Sanbase.Clickhouse.Query.put_sql(query_struct, sql)
end

def usd_value_held_by_address_query(address) do
query_struct = assets_held_by_address_query(address)
def usd_value_held_by_address_query(address, table, opts \\ [])

def usd_value_held_by_address_query(address, table, opts) do
query_struct = assets_held_by_address_query(address, table, opts)

sql = """
SELECT
Expand Down Expand Up @@ -516,7 +612,7 @@ defmodule Sanbase.Balance.SqlQuery do
params = %{
addresses: addresses,
slug: slug,
decimals: Integer.pow(10, decimals),
decimals: decimals(blockchain, decimals),
datetime: DateTime.to_unix(datetime),
table: blockchain_to_table(blockchain, slug)
}
Expand Down

0 comments on commit 228b011

Please sign in to comment.