Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/wallet/activity/fetched_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
// getFetchedEntriesByIDs fetches Alchemy transaction details by IDs
// Returns a map keyed by "chainID-hash-address" string
func getFetchedEntriesByIDs(ctx context.Context, deps FilterDependencies, txIDs []OrderedTransactionID) (map[string]Entry, error) {

if len(txIDs) == 0 {
return make(map[string]Entry), nil
}
Expand Down Expand Up @@ -176,6 +175,7 @@ func thirdpartyActivityEntriesToEntries(deps FilterDependencies, activityEntries
}

entry.symbolOut, entry.symbolIn = lookupAndFillInTokens(deps, entry.tokenOut, entry.tokenIn)

entries = append(entries, entry)
}

Expand Down
24 changes: 21 additions & 3 deletions services/wallet/activityfetcher/alchemy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"github.com/ethereum/go-ethereum/common"
geth_rpc "github.com/ethereum/go-ethereum/rpc"

"github.com/status-im/status-go/pkg/pubsub"
"github.com/status-im/status-go/services/wallet/activityfetcher"
wc "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
alchemy "github.com/status-im/status-go/services/wallet/thirdparty/activity/alchemy"
)

type Manager struct {
client *alchemy.Client
persistence *alchemy.Persistence
client *alchemy.Client
persistence *alchemy.Persistence
activityPublisher *pubsub.Publisher
}

func NewManager(client *alchemy.Client, persistence *alchemy.Persistence) *Manager {
Expand All @@ -26,6 +29,10 @@ func NewManager(client *alchemy.Client, persistence *alchemy.Persistence) *Manag
}
}

func (m *Manager) SetActivityPublisher(publisher *pubsub.Publisher) {
m.activityPublisher = publisher
}

func (m *Manager) ID() string {
return alchemy.AlchemyID
}
Expand All @@ -44,7 +51,6 @@ func (m *Manager) GetLastFetchedBlockAndTimestamp(ctx context.Context, chainID u

// FetchActivity orchestrates fetching, persistence, and type conversion.
func (m *Manager) FetchActivity(ctx context.Context, chainID uint64, parameters thirdparty.ActivityFetchParameters, cursor string, limit int) (thirdparty.ActivityEntryContainer, error) {

transfers, nextCursor, err := m.client.FetchTransfers(ctx, chainID, parameters, cursor, limit)
if err != nil {
return thirdparty.ActivityEntryContainer{}, err
Expand All @@ -55,6 +61,18 @@ func (m *Manager) FetchActivity(ctx context.Context, chainID uint64, parameters
return thirdparty.ActivityEntryContainer{}, err
}

// Emit event about ERC20 activity being fetched so interested components can react
if m.activityPublisher != nil {
for _, transfer := range transfers {
if transfer.Category == alchemy.TransferCategoryErc20 && transfer.RawContract.Address != nil {
pubsub.Publish(m.activityPublisher, activityfetcher.EventERC20ActivityFetched{
ChainID: chainID,
Address: *transfer.RawContract.Address,
})
}
}
}

items := alchemy.TransfersToThirdpartyActivityEntries(transfers, chainID, parameters.Address)

return thirdparty.ActivityEntryContainer{
Expand Down
13 changes: 13 additions & 0 deletions services/wallet/activityfetcher/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package activityfetcher

import (
"github.com/ethereum/go-ethereum/common"
)

// EventERC20ActivityFetched is emitted when ERC20 token transfers are fetched from activity sources.
// This event notifies subscribers that ERC20 activity has been discovered, allowing them to take
// appropriate actions such as fetching token metadata.
type EventERC20ActivityFetched struct {
ChainID uint64 // The chain ID where the activity was found
Address common.Address // The ERC20 token contract address
}
23 changes: 18 additions & 5 deletions services/wallet/activityfetcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
gethrpc "github.com/ethereum/go-ethereum/rpc"

"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/pkg/pubsub"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
)
Expand All @@ -23,17 +24,24 @@ type ManagerIface interface {
IsChainSupported(chainID uint64) bool
FetchActivity(ctx context.Context, chainID uint64, account gethcommon.Address, currentBlock uint64) (thirdparty.ActivityEntryContainer, error)
GetLastFetchedBlockAndTimestamp(ctx context.Context, chainID uint64, address gethcommon.Address) (*gethrpc.BlockNumber, *time.Time, error)
GetPublisher() *pubsub.Publisher
}

type Manager struct {
fetcher thirdparty.ActivityFetcher
logger *zap.Logger
fetcher thirdparty.ActivityFetcher
logger *zap.Logger
publisher *pubsub.Publisher
}

func NewManager(fetcher thirdparty.ActivityFetcher) *Manager {
publisher := pubsub.NewPublisher()

fetcher.SetActivityPublisher(publisher)

return &Manager{
fetcher: fetcher,
logger: logutils.ZapLogger().Named("ActivityFetcher"),
fetcher: fetcher,
logger: logutils.ZapLogger().Named("ActivityFetcher"),
publisher: publisher,
}
}

Expand Down Expand Up @@ -63,7 +71,7 @@ func (m *Manager) FetchActivity(ctx context.Context, chainID uint64, account get
parameters.ToBlock = &toBlock

if lastFetchedBlock == nil {
fromBlock := gethrpc.EarliestBlockNumber
fromBlock := gethrpc.BlockNumber(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still have my doubts about this :D
#7035 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlipicar, wondering if we are referring to the same gethrpc)
My lsp server jumps to this:

EarliestBlockNumber = BlockNumber(-5)

Screenshot 2025-11-03 at 11 26 48

parameters.FromBlock = &fromBlock
} else if uint64(lastFetchedBlock.Int64()) >= currentBlock {
// Nothing to fetch
Expand Down Expand Up @@ -104,3 +112,8 @@ func (m *Manager) FetchActivity(ctx context.Context, chainID uint64, account get

return activity, nil
}

// GetPublisher returns the publisher for activity fetcher events
func (m *Manager) GetPublisher() *pubsub.Publisher {
return m.publisher
}
7 changes: 7 additions & 0 deletions services/wallet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ func NewService(
activityFetcherManager := activityfetcher.NewManager(alchemyFetcherManager)
activityFetcherService := activityfetcher.NewService(activityFetcherManager, rpcClient.GetNetworkManager(), accountsDB, accountsPublisher, rpcClient, feed)

// connect activity fetcher publisher to token manager
if tokenManager != nil {
if activityPublisher := activityFetcherManager.GetPublisher(); activityPublisher != nil {
tokenManager.SetActivityFetcherPublisher(activityPublisher)
}
}

return &Service{
db: db,
accountsDB: accountsDB,
Expand Down
12 changes: 10 additions & 2 deletions services/wallet/thirdparty/activity/alchemy/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,30 @@ func extractTransferTokenAndValue(t Transfer, chainID uint64) []transferTokenAnd
Value: (*hexutil.Big)(t.RawContract.Value.Int),
})
case TransferCategoryErc721:
var tokenID *hexutil.Big
if t.TokenID != nil && t.TokenID.Int != nil {
tokenID = (*hexutil.Big)(t.TokenID.Int)
}
transfersData = append(transfersData, transferTokenAndValue{
Token: ac.Token{
TokenType: ac.Erc721,
ChainID: wCommon.ChainID(chainID),
Address: *t.RawContract.Address,
TokenID: (*hexutil.Big)(t.TokenID.Int),
TokenID: tokenID,
},
})
case TransferCategoryErc1155:
for _, m := range t.Erc1155Metadata {
var tokenID *hexutil.Big
if m.TokenID != nil && m.TokenID.Int != nil {
tokenID = (*hexutil.Big)(m.TokenID.Int)
}
transfersData = append(transfersData, transferTokenAndValue{
Token: ac.Token{
TokenType: ac.Erc1155,
ChainID: wCommon.ChainID(chainID),
Address: *t.RawContract.Address,
TokenID: (*hexutil.Big)(m.TokenID.Int),
TokenID: tokenID,
},
Value: (*hexutil.Big)(m.Value.Int),
})
Expand Down
34 changes: 32 additions & 2 deletions services/wallet/thirdparty/activity/alchemy/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package alchemy

import (
"encoding/json"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -14,6 +16,34 @@ import (
const getAssetTransfersMethod = "alchemy_getAssetTransfers"
const MaxAssetTransfersCount = 1000

// AlchemyBigInt wraps VarHexBigInt to handle Alchemy API inconsistency
// it sometimes returns plain decimal numbers instead of hexadecimal
type AlchemyBigInt struct {
bigint.VarHexBigInt
}

func (a *AlchemyBigInt) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}

// alchemy sometimes returns plain numbers like "0" or "1" without "0x" prefix
// so we need to take this into account
if len(str) >= 2 && str[0:2] == "0x" {
// hex string case - unmarshal directly into embedded struct
return a.VarHexBigInt.UnmarshalJSON(data)
} else {
// plain decimal number
val := new(big.Int)
if _, ok := val.SetString(str, 10); !ok {
return fmt.Errorf("invalid decimal number: %s", str)
}
a.VarHexBigInt.Int = val
return nil
}
}

type TransferCategory string

const (
Expand Down Expand Up @@ -58,7 +88,7 @@ type Transfer struct {
ToAddress *common.Address `json:"to,omitempty"`
Value float64 `json:"value,omitempty"`
Erc1155Metadata []Erc1155Metadata `json:"erc1155Metadata,omitempty"`
TokenID *bigint.VarHexBigInt `json:"tokenId"`
TokenID *AlchemyBigInt `json:"tokenId"`
Asset string `json:"asset"`
UniqueID string `json:"uniqueId"`
Hash common.Hash `json:"hash"`
Expand All @@ -79,7 +109,7 @@ func (t Transfer) IsIncoming(accountAddress common.Address) bool {
}

type Erc1155Metadata struct {
TokenID *bigint.VarHexBigInt `json:"tokenId"`
TokenID *AlchemyBigInt `json:"tokenId"`
Value *bigint.VarHexBigInt `json:"value"`
}

Expand Down
2 changes: 2 additions & 0 deletions services/wallet/thirdparty/activity_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"

"github.com/status-im/status-go/pkg/pubsub"
ac "github.com/status-im/status-go/services/wallet/activity/common"
)

Expand Down Expand Up @@ -72,6 +73,7 @@ type ActivityFetcher interface {
ActivityProvider
FetchActivity(ctx context.Context, chainID uint64, parameters ActivityFetchParameters, cursor string, limit int) (ActivityEntryContainer, error)
GetLastFetchedBlockAndTimestamp(ctx context.Context, chainID uint64, address common.Address) (*rpc.BlockNumber, *time.Time, error)
SetActivityPublisher(publisher *pubsub.Publisher)
}

func (e ActivityEntry) String() string {
Expand Down
67 changes: 55 additions & 12 deletions services/wallet/token/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import (
"github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/communitytokens/communitytokensdatabase"
"github.com/status-im/status-go/services/utils"
"github.com/status-im/status-go/services/wallet/activityfetcher"
"github.com/status-im/status-go/services/wallet/community"
tokenlists "github.com/status-im/status-go/services/wallet/token/token-lists"
"github.com/status-im/status-go/services/wallet/token/token-lists/fetcher"
tokenTypes "github.com/status-im/status-go/services/wallet/token/types"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/signal"
)

const (
Expand Down Expand Up @@ -80,17 +82,18 @@ type ManagerInterface interface {

// Manager is used for accessing token store. It changes the token store based on overridden tokens
type Manager struct {
db *sql.DB
ethClientGetter rpc.EthClientGetter
ContractMaker *contracts.ContractMaker
networkManager network.ManagerInterface
communityTokensDB *communitytokensdatabase.Database
communityManager *community.Manager
mediaServer *server.MediaServer
walletFeed *event.Feed
accountsDB *accounts.Database
accountsPublisher *pubsub.Publisher
tokenBalancesStorage TokenBalancesStorage
db *sql.DB
ethClientGetter rpc.EthClientGetter
ContractMaker *contracts.ContractMaker
networkManager network.ManagerInterface
communityTokensDB *communitytokensdatabase.Database
communityManager *community.Manager
mediaServer *server.MediaServer
walletFeed *event.Feed
accountsDB *accounts.Database
accountsPublisher *pubsub.Publisher
activityFetcherPublisher *pubsub.Publisher
tokenBalancesStorage TokenBalancesStorage

tokenLists *tokenlists.TokenLists

Expand Down Expand Up @@ -136,6 +139,7 @@ func NewTokenManager(
func (tm *Manager) Start(ctx context.Context, autoRefreshInterval time.Duration, autoRefreshCheckInterval time.Duration) {
tm.stopCh = make(chan struct{})
tm.startAccountsWatcher()
tm.startTokenDiscoveryWatcher()

// For now we don't have the list of tokens lists remotely set so we're uisng the harcoded default lists. Once we have it
//we will just need to update the empty string with the correct URL.
Expand Down Expand Up @@ -165,6 +169,35 @@ func (tm *Manager) startAccountsWatcher() {
}()
}

func (tm *Manager) startTokenDiscoveryWatcher() {
if tm.activityFetcherPublisher == nil {
return
}

ch, unsubFn := pubsub.Subscribe[activityfetcher.EventERC20ActivityFetched](tm.activityFetcherPublisher, 100)
go func() {
defer gocommon.LogOnPanic()
defer unsubFn()
for {
select {
case <-tm.stopCh:
return
case event, ok := <-ch:
if !ok {
return
}
tm.handleERC20ActivityFetched(context.Background(), event)
}
}
}()
}

func (tm *Manager) handleERC20ActivityFetched(ctx context.Context, event activityfetcher.EventERC20ActivityFetched) {
// when ERC20 activity is fetched, discover token metadata if needed
// this will fetch token metadata and send TokenListsUpdated signal to frontend
tm.FindOrCreateTokenByAddress(ctx, event.ChainID, event.Address)
}

func (tm *Manager) Stop() {
if tm.stopCh != nil {
close(tm.stopCh)
Expand All @@ -173,6 +206,14 @@ func (tm *Manager) Stop() {
tm.tokenLists.Stop()
}

func (tm *Manager) SetActivityFetcherPublisher(publisher *pubsub.Publisher) {
tm.activityFetcherPublisher = publisher
// Start watching for token discovery events if not already started
if tm.stopCh != nil {
tm.startTokenDiscoveryWatcher()
}
}

// overrideTokensInPlace overrides tokens in the store with the ones from the networks
// BEWARE: overridden tokens will have their original address removed and replaced by the one in networks
func overrideTokensInPlace(networks []params.Network, tokens []*tokenTypes.Token) {
Expand Down Expand Up @@ -298,6 +339,8 @@ func (tm *Manager) FindOrCreateTokenByAddress(ctx context.Context, chainID uint6
}

tm.discoverTokenCommunityID(ctx, token, address)
signal.SendWalletEvent(signal.TokenListsUpdated, nil)

return token
}

Expand Down Expand Up @@ -437,7 +480,7 @@ func (tm *Manager) getNativeTokens() ([]*tokenTypes.Token, error) {
}

func (tm *Manager) GetAllTokens() ([]*tokenTypes.Token, error) {
allTokens, err := tm.GetCustoms(true)
allTokens, err := tm.GetCustoms(false)
if err != nil {
logutils.ZapLogger().Error("can't fetch custom tokens", zap.Error(err))
}
Expand Down
Loading