Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4746a45
feat: added fetchBalances capability to Tink
fguery Sep 29, 2025
2ef4944
chore: quick refacto of fetch_open_banking_data for clarity
fguery Sep 30, 2025
760b48d
chore: start on splitting fetch_open_banking_data between components
fguery Sep 30, 2025
bee9293
chore: revert change on handle_webhook
fguery Sep 30, 2025
fc11550
feat: added DataToFetch field to the fetchOpenBankingData to support …
fguery Sep 30, 2025
0231739
feat: fetch balance from accounts api
fguery Oct 3, 2025
87b6b92
fix: try to store psuID and connectionID in account
fguery Oct 3, 2025
7038c87
chore: improve docker env
fguery Oct 3, 2025
159ee07
feat: balances work now, but PSPBalance does not have psuID + connect…
fguery Oct 3, 2025
5fbbdb0
feat: make sure we store PSUID for balances as well
fguery Oct 3, 2025
e8be2f5
feat(EN-185): handle Plaid balances from accounts
fguery Oct 6, 2025
4f08c64
feat: fix up plaid balance fetching
fguery Oct 6, 2025
b214d06
fix: air rebuilding was sometime failing; this aims to fix it
fguery Oct 6, 2025
fece774
fix: linting
fguery Oct 6, 2025
d2f1b97
fix: update go version to be the same as the docker image
fguery Oct 6, 2025
4e8f1cc
chore: remove unused go install flags
fguery Oct 6, 2025
50f8e06
fix: make OpenBankingDataToFetchAccountsAndBalances test a bit more e…
fguery Oct 6, 2025
7f07adc
fix: added extra null checks as required
fguery Oct 6, 2025
b7e5bd4
chore: remove duplicated test
fguery Oct 6, 2025
4054324
fix: code review comments
fguery Oct 6, 2025
bf5135b
fix: improve translate_plaid_amount to also return asset
fguery Oct 6, 2025
3e117d8
chore: added tests for amount_utils
fguery Oct 6, 2025
45f7106
fix: make fetch open banking activity fail when child workflow fail
fguery Oct 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .air.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ tmp_dir = "tmp"
[build]
args_bin = []
bin = "./tmp/main"
cmd = "go build -o ./tmp/main ."
cmd = "go build -gcflags='all=-N -l' -o ./tmp/main . && chmod +x ./tmp/main"
full_bin = "dlv exec --headless --listen=:2345 --api-version=2 --accept-multiclient --continue ./tmp/main --"
delay = 1000
exclude_dir = ["assets", "tmp", "vendor", "testdata", "docs", "openapi", "pkg/client"]
Expand All @@ -20,7 +20,7 @@ tmp_dir = "tmp"
log = "build-errors.log"
poll = false
poll_interval = 0
rerun = false
rerun = true
rerun_delay = 500
send_interrupt = false
stop_on_root = false
Expand Down
6 changes: 6 additions & 0 deletions .idea/runConfigurations/debug_service.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/runConfigurations/debug_worker.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions dev.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ RUN apk add --no-cache \
make

# Install delve (debugger) and air (hot reload)
RUN go install github.com/go-delve/delve/cmd/dlv@latest
RUN go install github.com/air-verse/[email protected]
RUN CGO_ENABLED=0 go install github.com/go-delve/delve/cmd/dlv@v1.25.2
RUN CGO_ENABLED=0 go install github.com/air-verse/[email protected]

# Create a custom user with appropriate permissions
RUN addgroup -g 1001 -S appgroup && \
Expand All @@ -29,6 +29,9 @@ WORKDIR /app
# Set proper ownership and permissions for the working directory
RUN chown -R appuser:appgroup /app

# Create tmp directory and set permissions
RUN mkdir -p /app/tmp && chown -R appuser:appgroup /app/tmp && chmod 755 /app/tmp

# Copy go mod files first for better caching
COPY go.mod go.sum ./

Expand Down
4 changes: 2 additions & 2 deletions docker-compose.dev-override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
dockerfile: dev.Dockerfile
image: payments-dev:latest
pull_policy: never
command: air -c .air.toml -- server
command: sh -c "mkdir -p /app/tmp && chmod 755 /app/tmp && air -c .air.toml -- server"
ports:
- "8080:8080"
- "9090:9090"
Expand All @@ -39,7 +39,7 @@ services:
dockerfile: dev.Dockerfile
image: payments-dev:latest
pull_policy: never
command: air -c .air.toml -- worker
command: sh -c "mkdir -p /app/tmp && chmod 755 /app/tmp && air -c .air.toml -- worker"
ports:
- "9191:9090"
- "2346:2345" # Delve debug port for worker
Expand Down
2 changes: 1 addition & 1 deletion docs/other/connector-capabilities.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"adyen":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"atlar":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_OTHERS"],"bankingcircle":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"column":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"currencycloud":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"dummypay":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_ALLOW_FORMANCE_ACCOUNT_CREATION","CAPABILITY_ALLOW_FORMANCE_PAYMENT_CREATION","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"generic":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_ALLOW_FORMANCE_ACCOUNT_CREATION","CAPABILITY_ALLOW_FORMANCE_PAYMENT_CREATION"],"increase":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_TRANSLATE_WEBHOOKS","CAPABILITY_CREATE_WEBHOOKS"],"mangopay":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_OTHERS","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"modulr":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"moneycorp":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"plaid":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"powens":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"qonto":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS"],"stripe":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"tink":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"wise":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_OTHERS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"]}
{"adyen":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"atlar":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_OTHERS"],"bankingcircle":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"column":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"currencycloud":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"dummypay":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_ALLOW_FORMANCE_ACCOUNT_CREATION","CAPABILITY_ALLOW_FORMANCE_PAYMENT_CREATION","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"generic":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_ALLOW_FORMANCE_ACCOUNT_CREATION","CAPABILITY_ALLOW_FORMANCE_PAYMENT_CREATION"],"increase":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_TRANSLATE_WEBHOOKS","CAPABILITY_CREATE_WEBHOOKS"],"mangopay":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_OTHERS","CAPABILITY_CREATE_BANK_ACCOUNT","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"modulr":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"moneycorp":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"plaid":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"powens":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"qonto":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS"],"stripe":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT"],"tink":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"],"wise":["CAPABILITY_FETCH_ACCOUNTS","CAPABILITY_FETCH_BALANCES","CAPABILITY_FETCH_EXTERNAL_ACCOUNTS","CAPABILITY_FETCH_PAYMENTS","CAPABILITY_FETCH_OTHERS","CAPABILITY_CREATE_TRANSFER","CAPABILITY_CREATE_PAYOUT","CAPABILITY_CREATE_WEBHOOKS","CAPABILITY_TRANSLATE_WEBHOOKS"]}
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/formancehq/payments

go 1.24.4

toolchain go1.24.7
go 1.24.7

replace github.com/formancehq/payments/pkg/client => ./pkg/client

Expand Down
139 changes: 90 additions & 49 deletions internal/connectors/engine/workflow/fetch_open_banking_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflow

import (
"fmt"
"slices"

"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/models"
Expand All @@ -15,17 +16,90 @@ type FetchOpenBankingData struct {
ConnectionID string
ConnectorID models.ConnectorID
Config models.Config
DataToFetch []models.OpenBankingDataToFetch
FromPayload *FromPayload
}

func (w Workflow) runFetchOpenBankingData(
ctx workflow.Context,
fetchOpenBankingData FetchOpenBankingData,
) error {
if len(fetchOpenBankingData.DataToFetch) == 0 {
return fmt.Errorf(
"no data to fetch for psu %s, connection %s connector %s",
fetchOpenBankingData.PsuID,
fetchOpenBankingData.ConnectionID,
fetchOpenBankingData.ConnectorID,
)
}

wg := workflow.NewWaitGroup(ctx)
var accountFetchErr, paymentFetchErr error

if slices.Contains(fetchOpenBankingData.DataToFetch, models.OpenBankingDataToFetchAccountsAndBalances) {
wg.Add(1)
workflow.Go(ctx, w.startFetchNextAccountWorkflow(wg, fetchOpenBankingData, &accountFetchErr))
}

if slices.Contains(fetchOpenBankingData.DataToFetch, models.OpenBankingDataToFetchPayments) {
wg.Add(1)
workflow.Go(ctx, w.startFetchNextPaymentsWorkflow(wg, fetchOpenBankingData, &paymentFetchErr))
}

wg.Wait(ctx)

// Check if any of the fetch workflows failed
if accountFetchErr != nil {
return fmt.Errorf("failed to fetch accounts: %w", accountFetchErr)
}
if paymentFetchErr != nil {
return fmt.Errorf("failed to fetch payments: %w", paymentFetchErr)
}

now := workflow.Now(ctx)

err := activities.StorageOpenBankingConnectionsLastUpdatedAtUpdate(
infiniteRetryContext(ctx),
fetchOpenBankingData.PsuID,
fetchOpenBankingData.ConnectorID,
fetchOpenBankingData.ConnectionID,
now,
)
if err != nil {
return fmt.Errorf("updating open banking connection last updated at: %w", err)
}

sendEvent := SendEvents{
UserConnectionDataSynced: &models.UserConnectionDataSynced{
PsuID: fetchOpenBankingData.PsuID,
ConnectorID: fetchOpenBankingData.ConnectorID,
ConnectionID: fetchOpenBankingData.ConnectionID,
At: now,
},
}

if err := workflow.ExecuteChildWorkflow(
workflow.WithChildOptions(
ctx,
workflow.ChildWorkflowOptions{
TaskQueue: w.getDefaultTaskQueue(),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
SearchAttributes: map[string]interface{}{
SearchAttributeStack: w.stack,
},
},
),
RunSendEvents,
sendEvent,
).Get(ctx, nil); err != nil {
return fmt.Errorf("sending events: %w", err)
}

wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
return nil
}

func (w Workflow) startFetchNextAccountWorkflow(wg workflow.WaitGroup, fetchOpenBankingData FetchOpenBankingData, errPtr *error) func(ctx workflow.Context) {
return func(ctx workflow.Context) {
defer wg.Done()

if err := workflow.ExecuteChildWorkflow(
Expand All @@ -46,14 +120,23 @@ func (w Workflow) runFetchOpenBankingData(
FromPayload: fetchOpenBankingData.FromPayload,
Periodically: false,
},
[]models.ConnectorTaskTree{},
[]models.ConnectorTaskTree{
{
TaskType: models.TASK_FETCH_BALANCES,
Name: "fetch_balances",
Periodically: false,
NextTasks: []models.ConnectorTaskTree{},
},
},
).Get(ctx, nil); err != nil {
workflow.GetLogger(ctx).Error("failed to fetch accounts", "error", err)
*errPtr = err
}
})
}
}

wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
func (w Workflow) startFetchNextPaymentsWorkflow(wg workflow.WaitGroup, fetchOpenBankingData FetchOpenBankingData, errPtr *error) func(ctx workflow.Context) {
return func(ctx workflow.Context) {
defer wg.Done()

if err := workflow.ExecuteChildWorkflow(
Expand All @@ -77,51 +160,9 @@ func (w Workflow) runFetchOpenBankingData(
[]models.ConnectorTaskTree{},
).Get(ctx, nil); err != nil {
workflow.GetLogger(ctx).Error("failed to fetch payments", "error", err)
*errPtr = err
}
})

wg.Wait(ctx)

now := workflow.Now(ctx)

err := activities.StorageOpenBankingConnectionsLastUpdatedAtUpdate(
infiniteRetryContext(ctx),
fetchOpenBankingData.PsuID,
fetchOpenBankingData.ConnectorID,
fetchOpenBankingData.ConnectionID,
now,
)
if err != nil {
return fmt.Errorf("updating open banking connection last updated at: %w", err)
}

sendEvent := SendEvents{
UserConnectionDataSynced: &models.UserConnectionDataSynced{
PsuID: fetchOpenBankingData.PsuID,
ConnectorID: fetchOpenBankingData.ConnectorID,
ConnectionID: fetchOpenBankingData.ConnectionID,
At: now,
},
}

if err := workflow.ExecuteChildWorkflow(
workflow.WithChildOptions(
ctx,
workflow.ChildWorkflowOptions{
TaskQueue: w.getDefaultTaskQueue(),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
SearchAttributes: map[string]interface{}{
SearchAttributeStack: w.stack,
},
},
),
RunSendEvents,
sendEvent,
).Get(ctx, nil); err != nil {
return fmt.Errorf("sending events: %w", err)
}

return nil
}

const RunFetchOpenBankingData = "RunFetchOpenBankingData"
Loading
Loading