Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module code.houdinigraphql.com
go 1.23.2

require (
github.com/gorilla/websocket v1.5.3
github.com/joho/godotenv v1.5.1
github.com/spf13/afero v1.12.0
github.com/stretchr/testify v1.10.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
Expand Down
2 changes: 1 addition & 1 deletion packages/houdini-core/plugin/documents/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ func ValidateDuplicateArgumentInField(
err := db.StepQuery(ctx, query, nil, func(row *sqlite.Stmt) {
selectionID := row.ColumnText(0)
argName := row.ColumnText(1)
filepath := row.ColumnText(2)
filepath := row.ColumnText(3)
rowNum := row.ColumnInt(4)
colNum := row.ColumnInt(5)

Expand Down
3 changes: 2 additions & 1 deletion packages/houdini/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
"minimatch": "^5.1.0",
"node-fetch": "^3.2.10",
"npx-import": "^1.1.3",
"recast": "^0.23.1"
"recast": "^0.23.1",
"ws": "^8.18.0"
},
"peerDependencies": {
"vite": "^5.3.3"
Expand Down
219 changes: 159 additions & 60 deletions packages/houdini/src/lib/codegen.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { type ChildProcess, spawn } from 'node:child_process'
import { randomUUID } from 'node:crypto'
import path from 'node:path'
import sqlite, { type DatabaseSync } from 'node:sqlite'
import { WebSocket } from 'ws'

import type { ProjectManifest } from '../runtime'
import { db_path, houdini_root } from './conventions.js'
import type * as routerConventions from './conventions.js'
import { create_schema, write_config } from './database.js'
import { format_hook_error, type HookError } from './error.js'
import type { HookError } from './error.js'
import { format_hook_error } from './error.js'
import * as fs from './fs.js'
import type { Config } from './project.js'

Expand All @@ -27,9 +30,7 @@ export type Adapter = ((args: {
manifest: ProjectManifest
adapterPath: string
}) => void | Promise<void>) & {
includePaths?:
| Record<string, string>
| ((args: { config: Config }) => Record<string, string>)
includePaths?: Record<string, string> | ((args: { config: Config }) => Record<string, string>)
disableServer?: boolean
pre?: (args: {
config: Config
Expand Down Expand Up @@ -76,7 +77,7 @@ export type CompilerProxy = {
close: () => Promise<void>
trigger_hook: (
name: PipelineHook,
opts?: { parallel_safe?: boolean; payload?: {}; task_id?: string },
opts?: { parallel_safe?: boolean; payload?: {}; task_id?: string }
) => Promise<Record<string, any> | null>
database_path: string
}
Expand All @@ -87,7 +88,7 @@ export async function codegen_setup(
config: Config,
mode: string,
db: DatabaseSync,
db_file: string,
db_file: string
): Promise<CompilerProxy> {
// We need the root dir before we get to the exciting stuff
await fs.mkdirpSync(houdini_root(config))
Expand Down Expand Up @@ -119,10 +120,8 @@ export async function codegen_setup(

// update the plugin spec with the user provided config
db.prepare('UPDATE plugins set config = ? where name = ?').run(
JSON.stringify(
config.plugins.find((p) => p.name === name)?.config ?? {},
),
name,
JSON.stringify(config.plugins.find((p) => p.name === name)?.config ?? {}),
name
)

// create the plugin spec
Expand All @@ -131,8 +130,7 @@ export async function codegen_setup(
port: row.port,
hooks: new Set(JSON.parse(row.hooks)),
order: row.plugin_order as 'before' | 'after' | 'core',
directory:
config.plugins.find((p) => p.name === name)?.directory || '',
directory: config.plugins.find((p) => p.name === name)?.directory || '',
}

// store the spec
Expand Down Expand Up @@ -197,53 +195,137 @@ export async function codegen_setup(
...(await wait_for_plugin(plugin.name)),
}
console.timeEnd(`Spawn ${plugin.name}`)
}),
})
)
console.timeEnd('Start Plugins')

const invoke_hook = async (
name: string,
hook: string,
payload: Record<string, any> = {},
task_id?: string,
task_id?: string
) => {
const { port, directory } = plugin_specs[name]

// make the request
const response = await fetch(
`http://localhost:${port}/${hook.toLowerCase()}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Task-ID': task_id?.toString() ?? '',
'X-Plugin-Directory': directory,
},
body: JSON.stringify(payload),
},
)

// if the request failed, throw an error
if (!response.ok) {
if (response.status === 404) {
throw new Error(`Plugin ${name} does not support hook ${hook}`)
}
const responseJSON = await response.json()
const errors: HookError[] = Array.isArray(responseJSON)
? responseJSON
: [responseJSON]
errors.forEach((error) => {
format_hook_error(config.root_dir, error, name)
})
// errors
throw new Error(`Failed to call ${name}/${hook.toLowerCase()}`)
// All hooks now use WebSocket
return await invoke_hook_websocket(name, hook, payload, task_id, port, directory)
}

const wsConnections = new Map<string, WebSocket>()
const pendingRequests = new Map<
string,
{
resolve: (value: any) => void
reject: (reason: any) => void
timeout: NodeJS.Timeout
}
// look at the response headers, and if the content type is application/json, parse the body
const contentType = response.headers.get('content-type')
if (contentType && contentType.includes('application/json')) {
return await response.json()
>()

async function getOrCreateWS(name: string, port: number): Promise<WebSocket> {
const existing = wsConnections.get(name)
if (existing && existing.readyState === WebSocket.OPEN) {
return existing
}
return await response.text()

return new Promise((resolve, reject) => {
// get a new connection
const wsUrl = `ws://localhost:${port}/ws`
const ws = new WebSocket(wsUrl)

ws.on('open', () => {
// clear any pending requests
wsConnections.set(name, ws)
return resolve(ws)
})

// Set up message handler for this connection
ws.on('message', (data: Buffer) => {
try {
const response = JSON.parse(data.toString())
const pending = pendingRequests.get(response.id)
if (!pending) return

clearTimeout(pending.timeout)
pendingRequests.delete(response.id)

switch (response.type) {
case 'error':
// Non-fatal error - log and continue listening
console.error(`! [${name}] ${response.error}`)
pending.reject(new Error(`${name}: ${response.error}`))
break

case 'response':
if (response.error) {
// Handle errors like the old HTTP implementation
const errors: HookError[] = Array.isArray(response.error)
? response.error
: [response.error]

errors.forEach((error) => {
format_hook_error(config.root_dir, error, name)
})

pending.reject(new Error(`Failed to call ${name}`))
} else {
pending.resolve(response.result)
}
break

default:
console.warn(`[${name}] Unknown message type: ${response.type}`)
pending.reject(new Error(`Unknown message type: ${response.type}`))
}
} catch (err) {
// if parsing the message fails, then we are not sure which pending request it belongs to
// just log and move on
console.error(`Error processing WebSocket message for ${name}:`, err)
}
})

ws.on('error', (err) => {
console.error(`WebSocket error for ${name}:`, err)
wsConnections.delete(name)
reject(new Error(`WebSocket error for ${name}: ${err}`))
})

ws.on('close', () => {
// Remove from pool so next request creates new connection
// requestsd will eventually timeout and be rejected, we can agressively remove them but it's not a big deal
wsConnections.delete(name)
})
})
}

const invoke_hook_websocket = async (
name: string,
hook: string,
payload: Record<string, any> = {},
task_id: string | undefined,
port: number,
directory: string
): Promise<any> => {
const ws = await getOrCreateWS(name, port)

return new Promise((resolve, reject) => {
const messageId = `${hook}-${Date.now()}-${randomUUID()}`

const timeout = setTimeout(() => {
pendingRequests.delete(messageId)
reject(new Error(`WebSocket request timeout for ${name}/${hook}`))
}, 30000)
pendingRequests.set(messageId, { resolve, reject, timeout })

const message = {
id: messageId,
type: 'request',
hook: hook,
payload: payload,
taskId: task_id,
pluginDirectory: directory,
}
ws.send(JSON.stringify(message))
})
}

const trigger_hook = async (
Expand All @@ -256,14 +338,12 @@ export async function codegen_setup(
parallel_safe?: boolean
payload?: Record<string, any>
task_id?: string
} = {},
} = {}
) => {
const timeName = hook + (task_id ? ` (${task_id})` : '')
console.time(timeName)
// look for all of the plugins that have registered for this hook
const plugins = Object.entries(plugin_specs).filter(([, { hooks }]) =>
hooks.has(hook),
)
const plugins = Object.entries(plugin_specs).filter(([, { hooks }]) => hooks.has(hook))

const result: Record<string, any> = {}

Expand All @@ -272,7 +352,7 @@ export async function codegen_setup(
await Promise.all(
plugins.map(async ([plugin]) => {
result[plugin] = await invoke_hook(plugin, hook, payload, task_id)
}),
})
)
} else {
// if the hook isn't parallel safe, we need to run the plugins in order
Expand Down Expand Up @@ -301,6 +381,27 @@ export async function codegen_setup(
database_path: db_file,
trigger_hook,
close: async () => {
// close ws connections first, this will trigger plugin processes to exit gracefully
for (const [name, ws] of wsConnections.entries()) {
try {
if (ws.readyState === WebSocket.OPEN) {
ws.close()
}
} catch (err) {
console.error(`Error closing WebSocket for ${name}:`, err)
}
}
wsConnections.clear()

// clear pending requests
for (const [, { timeout }] of pendingRequests.entries()) {
clearTimeout(timeout)
}
pendingRequests.clear()

// give plugins a moment to exit gracefully
await new Promise((resolve) => setTimeout(resolve, 100))

// Close our connection to the database
try {
db.close()
Expand All @@ -327,10 +428,12 @@ export async function codegen_setup(
try {
// The child was spawned with detached: true so that it is its own process group.
process.kill(-plugin.process.pid, 'SIGINT')
} catch (err) {}
} catch (err) {
console.error(`Error killing plugin ${plugin.name}:`, err)
}
}
}
}),
})
)
},
}
Expand Down Expand Up @@ -363,7 +466,7 @@ export type RunPipelineOptions = {

export async function run_pipeline(
trigger_hook: CompilerProxy['trigger_hook'],
options: RunPipelineOptions = {},
options: RunPipelineOptions = {}
): Promise<Record<PipelineHook, Record<string, any>>> {
const { task_id, after, start, through } = options
const results: Record<string, any> = {}
Expand Down Expand Up @@ -405,11 +508,7 @@ export async function run_pipeline(
const opts: any = { task_id }

// Set parallel_safe for hooks that support it
if (
hook === 'Validate' ||
hook === 'GenerateDocuments' ||
hook === 'GenerateRuntime'
) {
if (hook === 'Validate' || hook === 'GenerateDocuments' || hook === 'GenerateRuntime') {
opts.parallel_safe = true
}

Expand Down
Loading
Loading