Skip to content
This repository has been archived by the owner on Nov 21, 2023. It is now read-only.

Refactor plugin not to rely on historic export internals #19

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 3 additions & 65 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ import fetch from 'node-fetch'

export interface Migrator3000MetaInput {
global: {
startDate: string
debug: boolean
versionMinor: number
versionMajor: number
}
config: {
host: string
projectApiKey: string
startDate: string
debug: 'ON' | 'OFF'
posthogVersion: string
}
}
Expand All @@ -23,8 +19,6 @@ interface PluginEventExtra extends PluginEvent {
api_key?: string
}

const TEN_MINUTES = 10 * 60 * 1000

const ELEMENT_TRANSFORMATIONS: Record<string, string> = {
text: '$el_text',
attr_class: 'attr__class',
Expand Down Expand Up @@ -66,68 +60,12 @@ const parseAndSendEvents = async (events: PluginEventExtra[], { config, global }
body: JSON.stringify(batch),
headers: { 'Content-Type': 'application/json' },
})
if (global.debug) {
const textRes = await res.text()
console.log('RESPONSE:', textRes)
}
console.log(`Flushing ${batch.length} event${batch.length > 1 ? 's' : ''} to ${config.host}`)
} else if (global.debug) {
console.log('Skipping empty batch of events')
}
}

const plugin: Plugin<Migrator3000MetaInput> = {
jobs: {
'[ADVANCED] Force restart': async (_, { storage, jobs }) => {
await storage.del('is_export_running')
const cursor = await storage.get('timestamp_cursor', null)
if (cursor) {
const dateFrom = new Date(Number(cursor)).toISOString()
console.log(`Restarting export from ${dateFrom}`)
await jobs['Export historical events']({
dateFrom,
dateTo: new Date().toISOString(),
}).runNow()
} else {
throw new Error('Unable to restart correctly')
}
},
parseAndSendEvents: async (payload, meta) => {
await parseAndSendEvents(payload.events, meta)
}
},
runEveryMinute: async ({ global, jobs, storage, cache }) => {
const currentDate = new Date()
const lastRun = await cache.get('last_run', null)
if (!lastRun || currentDate.getTime() - Number(lastRun) > TEN_MINUTES) {
// this "magic" key is added via the historical export upgrade
const isExportRunning = await storage.get('is_export_running', false)
if (isExportRunning) {
return
}

const previousMaxDate = await storage.get('max_date', global.startDate)

await jobs['Export historical events']({
dateFrom: previousMaxDate,
dateTo: currentDate.toISOString(),
}).runNow()

console.log(`Now starting export of events from ${previousMaxDate} to ${currentDate.toISOString()}`)
await storage.set('max_date', currentDate.toISOString())
await cache.set('last_run', currentDate.getTime())
}
},

setupPlugin: async ({ config, global }) => {
try {
global.startDate = config.startDate ? new Date(config.startDate).toISOString() : null
} catch (e) {
console.log(`Failed to parse start date. Make sure to use the format YYYY-MM-DD`)
throw e
}
global.debug = config.debug === 'ON'

if (config.posthogVersion === "Latest" || config.posthogVersion === "1.30.0+") {
global.versionMajor = 1
global.versionMinor = 31
Expand All @@ -142,21 +80,21 @@ const plugin: Plugin<Migrator3000MetaInput> = {
throw new Error('Invalid PostHog version')
}
},
exportEvents: async (events: PluginEventExtra[], { global, jobs }) => {
exportEvents: async (events: PluginEventExtra[], meta) => {
if (events.length === 0) {
return
}

// dont export live events, only historical ones
if (global.versionMajor > 1 || (global.versionMajor === 1 && global.versionMinor > 29)) {
if (meta.global.versionMajor > 1 || (meta.global.versionMajor === 1 && meta.global.versionMinor > 29)) {
if (!events[0].properties || !events[0].properties['$$is_historical_export_event']) {
return
}
} else if (events[0].uuid) {
return
}

await jobs.parseAndSendEvents({ events }).runNow()
await parseAndSendEvents(events, meta)
},
}

Expand Down
21 changes: 1 addition & 20 deletions plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,6 @@
"type": "string",
"required": true
},
{
"key": "startDate",
"hint": "Use format YYYY-MM-DD e.g. 2021-10-26 = 26th October 2021. Not specifying a value will export all events.",
"name": "Date to start exporting events from",
"type": "string",
"required": false
},
{
"key": "debug",
"hint": "Turn this on to get additional logging from the plugin",
"name": "DEBUG",
"type": "choice",
"choices": ["ON", "OFF"],
"default": "OFF",
"required": false
},
{
"key": "posthogVersion",
"hint": "The PostHog version this instance is on. Format: x.y.z. Select 'Latest' when using this in PostHog Cloud.",
Expand All @@ -43,8 +27,5 @@
"default": "Latest",
"required": false
}
],
"publicJobs": {
"[ADVANCED] Force restart": {}
}
]
}