-
Notifications
You must be signed in to change notification settings - Fork 9
feat: connector development server #534
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
feat: adding a connector development server with direct access to connectors functions Signed-off-by: Clément Salaün <[email protected]>
WalkthroughAdds a new in-process connector development HTTP server with REST endpoints, per-connector state caching, and lifecycle via Uber Fx. Provides routes to list providers, create/list/get connectors, fetch data (accounts, balances, external-accounts, payments) with reset/caching options. Documents internal tools in tools/README.md. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Dev as Developer
participant Fx as fx.App
participant Router as HTTP Router
participant Server as http.Server
Dev->>Fx: Start application
Fx->>Fx: Provide(debug, addr, router)
Fx->>Server: Construct http.Server(addr, router)
Fx->>Server: OnStart -> ListenAndServe (goroutine)
Note right of Server: Prints "dev server listening on :8081"
Dev->>Server: HTTP requests
Server->>Router: Dispatch handlers
Dev->>Fx: Stop application
Fx->>Server: OnStop -> Shutdown(ctx)
sequenceDiagram
autonumber
participant Client as Client
participant API as Dev-Server API
participant Registry as Plugin Registry
participant Plugin as Connector Plugin
participant Cache as StateCache (in-memory)
Client->>API: POST /connectors {provider,name,config}
API->>Registry: GetConfig/GetPlugin
Registry-->>API: Plugin instance
API->>Plugin: instantiate with connectorId
API->>API: Store plugin by connectorId
Client->>API: POST /connectors/{id}/fetch/accounts {Reset,UseCachedState,State}
alt Reset
API->>Cache: Clear "fetch/accounts"
end
opt UseCachedState and no State
API->>Cache: Read cached state
Cache-->>API: cached State?
end
API->>Plugin: FetchNextAccounts(request)
Plugin-->>API: {State, HasMore, PageSize, Data}
API->>Cache: Upsert cachedState "fetch/accounts"
API-->>Client: Response JSON
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (6)
tools/README.md (1)
8-8
: Use headings instead of bold for section titles (MD036).Promote emphasized lines to proper subheadings.
-**Shell script that auto-generates connector import list** +#### Shell script that auto-generates connector import list -**Go tool that generates OpenAPI schemas from connector configs** +#### Go tool that generates OpenAPI schemas from connector configs -**Go tool that extracts connector capabilities** +#### Go tool that extracts connector capabilities -**Simple dev server for testing individual connectors** +#### Simple dev server for testing individual connectors -**Code generator for new connectors** +#### Code generator for new connectors -**Temporal schedule management** +#### Temporal schedule management -**Temporal workflow management** +#### Temporal workflow managementAlso applies to: 14-14, 21-21, 30-30, 36-36, 44-44, 50-50
tools/connector-dev-server/main.go (2)
15-20
: Drop nil ServeMux provider and log ListenAndServe errors.Avoid injecting a nil dependency and surface server errors in dev.
fx.Provide( func() bool { return true /* debug */ }, func() string { return ":8081" /* listen addr */ }, - func(debug bool) *http.ServeMux { return nil }, // placeholder to satisfy goimports func(debug bool) http.Handler { return newRouter(debug) }, ), @@ OnStart: func(ctx context.Context) error { go func() { - _ = server.ListenAndServe() + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + fmt.Printf("dev server error: %v\n", err) + } }()Also applies to: 25-27
16-18
: Make debug flag and listen addr configurable via env.Consider CONNECTOR_DEV_DEBUG and CONNECTOR_DEV_ADDR to avoid code edits during local testing.
tools/connector-dev-server/api.go (3)
140-140
: Update stale comment to match routes.- // removed singular /plugin endpoint; use /plugins and /plugins/{connectorId} + // removed singular /plugin endpoint; use /connectors and /connectors/{connectorId}
153-154
: Return “connectors” key for consistency with path naming.Optional, but aligns response shape with /connectors.
- _ = json.NewEncoder(w).Encode(map[string]any{"plugins": list}) + _ = json.NewEncoder(w).Encode(map[string]any{"connectors": list})
100-109
: Validate provider consistency when client supplies connectorId.Reject requests where connectorId.provider differs from body.provider to avoid surprising cross-provider IDs.
var cid models.ConnectorID if strings.TrimSpace(body.ConnectorID) != "" { parsed, err := models.ConnectorIDFromString(body.ConnectorID) if err != nil { w.WriteHeader(http.StatusBadRequest) _ = json.NewEncoder(w).Encode(map[string]any{"message": "invalid connectorId", "details": err.Error()}) return } + if parsed.Provider != strings.ToLower(body.Provider) { + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(map[string]any{ + "message": "connectorId provider mismatch", + "details": "connectorId provider " + parsed.Provider + " != body provider " + strings.ToLower(body.Provider), + }) + return + } cid = parsed } else {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
tools/README.md
(1 hunks)tools/connector-dev-server/api.go
(1 hunks)tools/connector-dev-server/main.go
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tools/connector-dev-server/api.go (1)
internal/connectors/plugins/registry/plugins.go (3)
GetConfigs
(147-157)GetConfig
(159-166)GetPlugin
(112-125)
🪛 LanguageTool
tools/README.md
[grammar] ~3-~3: Ensure spelling is correct
Context: ... contains a series of tools helping the develoment and build process. ## Core Build Tools...
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
[grammar] ~17-~17: There might be a mistake here.
Context: ...ags to generate OpenAPI YAML - Outputs: openapi/v3/v3-connectors-config.yaml
- Run: just compile-connector-configs
#...
(QB_NEW_EN)
[grammar] ~22-~22: There might be a mistake here.
Context: ...lities** - Reads connector capabilities from registry - Generates JSON mapping of pr...
(QB_NEW_EN)
[grammar] ~30-~30: There might be a mistake here.
Context: ...server for testing individual connectors** - Imports single connector for isolated te...
(QB_NEW_EN)
[grammar] ~31-~31: There might be a mistake here.
Context: ...ts single connector for isolated testing - Provides basic HTTP API for connector op...
(QB_NEW_EN)
[grammar] ~32-~32: There might be a mistake here.
Context: ... basic HTTP API for connector operations - Useful for development and debugging ##...
(QB_NEW_EN)
🪛 markdownlint-cli2 (0.17.2)
tools/README.md
8-8: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
14-14: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
21-21: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
30-30: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
36-36: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
44-44: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
50-50: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
cid = models.ConnectorID{Reference: uuid.Must(uuid.NewUUID()), Provider: strings.ToLower(body.Provider)} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid panic on UUID generation.
Use v4 UUID (uuid.New()) instead of Must(NewUUID()) to prevent process panic in dev.
- cid = models.ConnectorID{Reference: uuid.Must(uuid.NewUUID()), Provider: strings.ToLower(body.Provider)}
+ cid = models.ConnectorID{Reference: uuid.New(), Provider: strings.ToLower(body.Provider)}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
cid = models.ConnectorID{Reference: uuid.Must(uuid.NewUUID()), Provider: strings.ToLower(body.Provider)} | |
} | |
cid = models.ConnectorID{Reference: uuid.New(), Provider: strings.ToLower(body.Provider)} |
🤖 Prompt for AI Agents
In tools/connector-dev-server/api.go around lines 110-111, the code uses
uuid.Must(uuid.NewUUID()) which can panic; replace this with a safe v4
generation using uuid.New() (or the package's v4 equivalent) and check the
returned error, assigning the UUID only on success and returning/handling an
error (e.g., respond with a 500 and log the failure) instead of letting the
process panic.
pluginsMu.RLock() | ||
e, ok := pluginsByID[id] | ||
pluginsMu.RUnlock() | ||
if !ok { | ||
w.WriteHeader(http.StatusNotFound) | ||
_ = json.NewEncoder(w).Encode(map[string]any{"message": "plugin not found"}) | ||
return | ||
} | ||
_ = json.NewEncoder(w).Encode(map[string]any{ | ||
"fetch": map[string]cachedState{ | ||
"accounts": e.StateCache["fetch/accounts"], | ||
"external-accounts": e.StateCache["fetch/external-accounts"], | ||
"balances": e.StateCache["fetch/balances"], | ||
"payments": e.StateCache["fetch/payments"], | ||
}, | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix data race: reading StateCache map after unlocking.
You copy pluginEntry, unlock, then read its map — can panic with “concurrent map read and map write”. Snapshot under the lock, then unlock.
- pluginsMu.RLock()
- e, ok := pluginsByID[id]
- pluginsMu.RUnlock()
- if !ok {
+ pluginsMu.RLock()
+ e, ok := pluginsByID[id]
+ var accounts, external, balances, payments cachedState
+ if ok {
+ accounts = e.StateCache["fetch/accounts"]
+ external = e.StateCache["fetch/external-accounts"]
+ balances = e.StateCache["fetch/balances"]
+ payments = e.StateCache["fetch/payments"]
+ }
+ pluginsMu.RUnlock()
+ if !ok {
w.WriteHeader(http.StatusNotFound)
_ = json.NewEncoder(w).Encode(map[string]any{"message": "plugin not found"})
return
}
- _ = json.NewEncoder(w).Encode(map[string]any{
- "fetch": map[string]cachedState{
- "accounts": e.StateCache["fetch/accounts"],
- "external-accounts": e.StateCache["fetch/external-accounts"],
- "balances": e.StateCache["fetch/balances"],
- "payments": e.StateCache["fetch/payments"],
- },
- })
+ _ = json.NewEncoder(w).Encode(map[string]any{
+ "fetch": map[string]cachedState{
+ "accounts": accounts,
+ "external-accounts": external,
+ "balances": balances,
+ "payments": payments,
+ },
+ })
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pluginsMu.RLock() | |
e, ok := pluginsByID[id] | |
pluginsMu.RUnlock() | |
if !ok { | |
w.WriteHeader(http.StatusNotFound) | |
_ = json.NewEncoder(w).Encode(map[string]any{"message": "plugin not found"}) | |
return | |
} | |
_ = json.NewEncoder(w).Encode(map[string]any{ | |
"fetch": map[string]cachedState{ | |
"accounts": e.StateCache["fetch/accounts"], | |
"external-accounts": e.StateCache["fetch/external-accounts"], | |
"balances": e.StateCache["fetch/balances"], | |
"payments": e.StateCache["fetch/payments"], | |
}, | |
}) | |
}) | |
pluginsMu.RLock() | |
e, ok := pluginsByID[id] | |
var accounts, external, balances, payments cachedState | |
if ok { | |
accounts = e.StateCache["fetch/accounts"] | |
external = e.StateCache["fetch/external-accounts"] | |
balances = e.StateCache["fetch/balances"] | |
payments = e.StateCache["fetch/payments"] | |
} | |
pluginsMu.RUnlock() | |
if !ok { | |
w.WriteHeader(http.StatusNotFound) | |
_ = json.NewEncoder(w).Encode(map[string]any{"message": "plugin not found"}) | |
return | |
} | |
_ = json.NewEncoder(w).Encode(map[string]any{ | |
"fetch": map[string]cachedState{ | |
"accounts": accounts, | |
"external-accounts": external, | |
"balances": balances, | |
"payments": payments, | |
}, | |
}) |
🤖 Prompt for AI Agents
In tools/connector-dev-server/api.go around lines 185 to 201, the handler reads
e.StateCache after releasing pluginsMu which can cause a concurrent map
read/write panic; snapshot the needed cached values for "fetch/accounts",
"fetch/external-accounts", "fetch/balances", and "fetch/payments" while holding
the lock (e.g., copy them into local variables or a small map), then release the
lock and encode the copied snapshot to the response so no map access occurs
after unlocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@altitude see what I was saying with coderabbit getting better? 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
@@ -0,0 +1,63 @@ | |||
# Internal Development Support Tools | |||
|
|||
This directory contains a series of tools helping the develoment and build process. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix typo and remove stray trailing line.
- "develoment" → "development".
- Remove the dangling "64" at EOF.
-This directory contains a series of tools helping the develoment and build process.
+This directory contains a series of tools helping the development and build process.
-
-64
Also applies to: 64-64
🧰 Tools
🪛 LanguageTool
[grammar] ~3-~3: Ensure spelling is correct
Context: ... contains a series of tools helping the develoment and build process. ## Core Build Tools...
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
🤖 Prompt for AI Agents
In tools/README.md around line 3 (and also at line 64), fix the misspelling
"develoment" to "development" and remove the stray trailing "64" at EOF; update
the sentence to read "This directory contains a series of tools helping the
development and build process." and delete the dangling "64" so the file ends
cleanly without extraneous characters.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #534 +/- ##
==========================================
- Coverage 66.71% 66.57% -0.15%
==========================================
Files 761 763 +2
Lines 39754 39887 +133
==========================================
+ Hits 26522 26554 +32
- Misses 11846 11942 +96
- Partials 1386 1391 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might want to rebase to main because the refactor I did to improve config validation recently played around with the registry a bit.
All tools are orchestrated via the main `Justfile`: | ||
|
||
```bash | ||
just pre-commit # Runs all build tools |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No changes were made to the Justfile, so I don't think this is the case?
models.FetchNextExternalAccountsRequest | ||
UseCachedState bool `json:"useCachedState"` | ||
Reset bool `json:"reset"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an incredibly counter intuitive API... if the state is held internally in a cache it should be able use its own cache instead of requiring the user to use the /state
endpoint to fetch the state and then reformulate it into the request payload, right?
All tools are orchestrated via the main `Justfile`: | ||
|
||
```bash | ||
just pre-commit # Runs all build tools |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this shouldn't run all tools right?
}) | ||
}) | ||
|
||
// removed singular /plugin endpoint; use /plugins and /plugins/{connectorId} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftover comment
**Simple dev server for testing individual connectors** | ||
- Imports single connector for isolated testing | ||
- Provides basic HTTP API for connector operations | ||
- Useful for development and debugging |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add how to run it
pluginsMu.RLock() | ||
e, ok := pluginsByID[id] | ||
pluginsMu.RUnlock() | ||
if !ok { | ||
w.WriteHeader(http.StatusNotFound) | ||
_ = json.NewEncoder(w).Encode(map[string]any{"message": "plugin not found"}) | ||
return | ||
} | ||
_ = json.NewEncoder(w).Encode(map[string]any{ | ||
"fetch": map[string]cachedState{ | ||
"accounts": e.StateCache["fetch/accounts"], | ||
"external-accounts": e.StateCache["fetch/external-accounts"], | ||
"balances": e.StateCache["fetch/balances"], | ||
"payments": e.StateCache["fetch/payments"], | ||
}, | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@altitude see what I was saying with coderabbit getting better? 😂
// Only works when exactly one plugin is instantiated | ||
pluginsMu.RLock() | ||
var p models.Plugin | ||
if len(pluginsByID) == 1 { | ||
for _, e := range pluginsByID { | ||
p = e.Plugin | ||
} | ||
} | ||
pluginsMu.RUnlock() | ||
|
||
if p == nil { | ||
w.WriteHeader(http.StatusConflict) | ||
_ = json.NewEncoder(w).Encode(map[string]any{"message": "requires exactly one plugin; use /connectors/{connectorId}/fetch/balances"}) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe move that in a separate method as it's repeted twice (+ we'll want the same for payment & transactions I assume)
entry := pluginsByID[id] | ||
c, ok := entry.StateCache["fetch/accounts"] | ||
pluginsMu.RUnlock() | ||
if ok && len(c.State) != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the updated body state is not used for accounts; it is for balances though
}) | ||
|
||
r.Post("/fetch/accounts", func(w http.ResponseWriter, req *http.Request) { | ||
// Only works when exactly one plugin is instantiated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally I'd just forward that to connectors//fetch/accounts, same for balances
c, ok := entry.StateCache["fetch/external-accounts"] | ||
pluginsMu.RUnlock() | ||
if ok && len(c.State) != 0 { | ||
body.State = c.State |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state ignored here too
This PR introduces a minimal HTTP dev server to run connector plugins locally without the full stack. It tightens the feedback loop for developing and debugging connector code by exposing thin HTTP wrappers around plugin methods and providing simple pagination state handling.
What’s included
tools/connector-dev-server
usingchi
+fx
./providers
: array of registered providers/providers/{provider}/config-schema
: provider config schema/connectors
: instantiate a plugin (in-memory)/connectors
: list instantiated connectors/connectors/{id}
: connector info/connectors/{id}/state
: last cached pagination states/connectors/{id}/fetch/accounts
/connectors/{id}/fetch/external-accounts
/connectors/{id}/fetch/balances
(acceptsaccount
orfromPayload
)/connectors/{id}/fetch/payments
useCachedState
: auto-continue from lastnewState
reset
: clear cached state before calling