From e7371ab7d6ed00d6d17d1d34dc72869b56c447f3 Mon Sep 17 00:00:00 2001 From: Tim Roes Date: Thu, 27 Apr 2023 18:34:33 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=9F=20=F0=9F=94=A7=20Refactor=20Jobs?= =?UTF-8?q?=20APIs=20(#6204)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../components/JobItem/components/JobLogs.tsx | 2 +- .../ConnectionSync/ConnectionSyncContext.tsx | 2 +- airbyte-webapp/src/core/api/hooks/index.ts | 1 + airbyte-webapp/src/core/api/hooks/jobs.ts | 60 ++++++++++++ airbyte-webapp/src/core/domain/job/Job.ts | 9 -- .../src/core/domain/job/JobsService.tsx | 20 ---- airbyte-webapp/src/core/domain/job/index.ts | 2 - .../src/hooks/services/useSourceHook.tsx | 5 +- .../ConnectionJobHistoryPage.tsx | 2 +- .../StreamStatusPage/StreamStatusPage.tsx | 2 +- .../src/services/job/JobService.tsx | 92 ------------------- 11 files changed, 67 insertions(+), 130 deletions(-) create mode 100644 airbyte-webapp/src/core/api/hooks/jobs.ts delete mode 100644 airbyte-webapp/src/core/domain/job/Job.ts delete mode 100644 airbyte-webapp/src/core/domain/job/JobsService.tsx delete mode 100644 airbyte-webapp/src/core/domain/job/index.ts delete mode 100644 airbyte-webapp/src/services/job/JobService.tsx diff --git a/airbyte-webapp/src/components/JobItem/components/JobLogs.tsx b/airbyte-webapp/src/components/JobItem/components/JobLogs.tsx index d6b63b8e7ae..839d8ac4bbf 100644 --- a/airbyte-webapp/src/components/JobItem/components/JobLogs.tsx +++ b/airbyte-webapp/src/components/JobItem/components/JobLogs.tsx @@ -8,8 +8,8 @@ import { StatusIcon } from "components/ui/StatusIcon"; import { StatusIconStatus } from "components/ui/StatusIcon/StatusIcon"; import { Text } from "components/ui/Text"; +import { useGetDebugInfoJob } from "core/api"; import { AttemptRead, AttemptStatus, SynchronousJobRead } from "core/request/AirbyteClient"; -import { useGetDebugInfoJob } from "services/job/JobService"; import styles from "./JobLogs.module.scss"; import { LogsDetails } from "./LogsDetails"; diff --git a/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx b/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx index 975182ce25f..1239e757a61 100644 --- a/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx +++ b/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx @@ -4,6 +4,7 @@ import { createContext, useCallback, useContext, useEffect, useMemo, useState } import { Status as ConnectionSyncStatus } from "components/EntityTable/types"; import { getConnectionSyncStatus } from "components/EntityTable/utils"; +import { useCancelJob } from "core/api"; import { JobRead, ConnectionStatus, @@ -15,7 +16,6 @@ import { } from "core/request/AirbyteClient"; import { useConnectionEditService } from "hooks/services/ConnectionEdit/ConnectionEditService"; import { useResetConnection, useResetConnectionStream, useSyncConnection } from "hooks/services/useConnectionHook"; -import { useCancelJob } from "services/job/JobService"; import { moveTimeToFutureByPeriod } from "utils/time"; interface ConnectionSyncContext { diff --git a/airbyte-webapp/src/core/api/hooks/index.ts b/airbyte-webapp/src/core/api/hooks/index.ts index 9da3724cadf..f7c8e05a4ae 100644 --- a/airbyte-webapp/src/core/api/hooks/index.ts +++ b/airbyte-webapp/src/core/api/hooks/index.ts @@ -1,5 +1,6 @@ export * from "./geographies"; export * from "./health"; +export * from "./jobs"; export * from "./logs"; export * from "./notifications"; export * from "./operations"; diff --git a/airbyte-webapp/src/core/api/hooks/jobs.ts b/airbyte-webapp/src/core/api/hooks/jobs.ts new file mode 100644 index 00000000000..efe3567919b --- /dev/null +++ b/airbyte-webapp/src/core/api/hooks/jobs.ts @@ -0,0 +1,60 @@ +import { useMutation, useQuery } from "react-query"; + +import { SCOPE_WORKSPACE } from "services/Scope"; + +import { cancelJob, getJobDebugInfo, listJobsFor } from "../generated/AirbyteClient"; +import { JobDebugInfoRead, JobListRequestBody, JobReadList } from "../types/AirbyteClient"; +import { useRequestOptions } from "../useRequestOptions"; +import { useSuspenseQuery } from "../useSuspenseQuery"; + +export const useListJobs = (listParams: JobListRequestBody, keepPreviousData = true) => { + const requestOptions = useRequestOptions(); + const result = useQuery( + [SCOPE_WORKSPACE, "jobs", "list", listParams.configId, listParams.includingJobId, listParams.pagination], + () => listJobsFor(listParams, requestOptions), + { + // 2.5 second refresh + refetchInterval: 2500, + keepPreviousData, + suspense: true, + } + ); + // cast to JobReadList because (suspense: true) means we will never get undefined + const jobReadList = result.data as JobReadList; + return { jobs: jobReadList.jobs, totalJobCount: jobReadList.totalJobCount, isPreviousData: result.isPreviousData }; +}; + +export const useGetDebugInfoJob = ( + id: number, + enabled = true, + refetchWhileRunning = false +): JobDebugInfoRead | undefined => { + const requestOptions = useRequestOptions(); + + return useSuspenseQuery( + [SCOPE_WORKSPACE, "jobs", "getDebugInfo", id], + () => getJobDebugInfo({ id }, requestOptions), + { + refetchInterval: !refetchWhileRunning + ? false + : (data) => { + // If refetchWhileRunning was true, we keep refetching debug info (including logs), while the job is still + // running or hasn't ended too long ago. We need some time after the last attempt has stopped, since logs + // keep incoming for some time after the job has already been marked as finished. + const lastAttemptEndTimestamp = + data?.attempts.length && data.attempts[data.attempts.length - 1].attempt.endedAt; + // While no attempt ended timestamp exists yet (i.e. the job is still running) or it hasn't ended + // more than 2 minutes (2 * 60 * 1000ms) ago, keep refetching + return lastAttemptEndTimestamp && Date.now() - lastAttemptEndTimestamp * 1000 > 2 * 60 * 1000 + ? false + : 2500; + }, + enabled, + } + ); +}; + +export const useCancelJob = () => { + const requestOptions = useRequestOptions(); + return useMutation((id: number) => cancelJob({ id }, requestOptions)); +}; diff --git a/airbyte-webapp/src/core/domain/job/Job.ts b/airbyte-webapp/src/core/domain/job/Job.ts deleted file mode 100644 index a5fb52f07b9..00000000000 --- a/airbyte-webapp/src/core/domain/job/Job.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { SynchronousJobRead } from "../../request/AirbyteClient"; - -export interface Logs { - logLines: string[]; -} - -export interface JobInfo extends SynchronousJobRead { - logs: Logs; -} diff --git a/airbyte-webapp/src/core/domain/job/JobsService.tsx b/airbyte-webapp/src/core/domain/job/JobsService.tsx deleted file mode 100644 index fbf7c4e6004..00000000000 --- a/airbyte-webapp/src/core/domain/job/JobsService.tsx +++ /dev/null @@ -1,20 +0,0 @@ -import { cancelJob, getJobDebugInfo, getJobInfo, JobListRequestBody, listJobsFor } from "../../request/AirbyteClient"; -import { AirbyteRequestService } from "../../request/AirbyteRequestService"; - -export class JobsService extends AirbyteRequestService { - public list(listParams: JobListRequestBody) { - return listJobsFor(listParams, this.requestOptions); - } - - public get(id: number) { - return getJobInfo({ id }, this.requestOptions); - } - - public cancel(id: number) { - return cancelJob({ id }, this.requestOptions); - } - - public getDebugInfo(id: number) { - return getJobDebugInfo({ id }, this.requestOptions); - } -} diff --git a/airbyte-webapp/src/core/domain/job/index.ts b/airbyte-webapp/src/core/domain/job/index.ts deleted file mode 100644 index 5a312af26f1..00000000000 --- a/airbyte-webapp/src/core/domain/job/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from "./Job"; -export * from "./JobsService"; diff --git a/airbyte-webapp/src/hooks/services/useSourceHook.tsx b/airbyte-webapp/src/hooks/services/useSourceHook.tsx index 3c53afe6768..a6ce8bba314 100644 --- a/airbyte-webapp/src/hooks/services/useSourceHook.tsx +++ b/airbyte-webapp/src/hooks/services/useSourceHook.tsx @@ -6,7 +6,6 @@ import { Action, Namespace } from "core/analytics"; import { SyncSchema } from "core/domain/catalog"; import { ConnectionConfiguration } from "core/domain/connection"; import { SourceService } from "core/domain/connector/SourceService"; -import { JobInfo } from "core/domain/job"; import { useInitService } from "services/useInitService"; import { isDefined } from "utils/common"; @@ -14,7 +13,7 @@ import { useAnalyticsService } from "./Analytics"; import { useRemoveConnectionsFromList } from "./useConnectionHook"; import { useRequestErrorHandler } from "./useRequestErrorHandler"; import { useCurrentWorkspace } from "./useWorkspace"; -import { SourceRead, WebBackendConnectionListItem } from "../../core/request/AirbyteClient"; +import { SourceRead, SynchronousJobRead, WebBackendConnectionListItem } from "../../core/request/AirbyteClient"; import { useSuspenseQuery } from "../../services/connector/useSuspenseQuery"; import { SCOPE_WORKSPACE } from "../../services/Scope"; import { useDefaultRequestMiddlewares } from "../../services/useDefaultRequestMiddlewares"; @@ -164,7 +163,7 @@ const useUpdateSource = () => { ); }; -export type SchemaError = (Error & { status: number; response: JobInfo }) | null; +export type SchemaError = (Error & { status: number; response: SynchronousJobRead }) | null; const useDiscoverSchema = ( sourceId: string, diff --git a/airbyte-webapp/src/pages/connections/ConnectionJobHistoryPage/ConnectionJobHistoryPage.tsx b/airbyte-webapp/src/pages/connections/ConnectionJobHistoryPage/ConnectionJobHistoryPage.tsx index 22b66bedb79..d3425d21c1d 100644 --- a/airbyte-webapp/src/pages/connections/ConnectionJobHistoryPage/ConnectionJobHistoryPage.tsx +++ b/airbyte-webapp/src/pages/connections/ConnectionJobHistoryPage/ConnectionJobHistoryPage.tsx @@ -12,9 +12,9 @@ import { Link } from "components/ui/Link"; import { Action, Namespace } from "core/analytics"; import { getFrequencyFromScheduleData } from "core/analytics/utils"; +import { useListJobs } from "core/api"; import { useTrackPage, PageTrackingCodes, useAnalyticsService } from "hooks/services/Analytics"; import { useConnectionEditService } from "hooks/services/ConnectionEdit/ConnectionEditService"; -import { useListJobs } from "services/job/JobService"; import styles from "./ConnectionJobHistoryPage.module.scss"; import JobsList from "./JobsList"; diff --git a/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamStatusPage.tsx b/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamStatusPage.tsx index d4b0adb79b5..9fa02d64984 100644 --- a/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamStatusPage.tsx +++ b/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamStatusPage.tsx @@ -1,8 +1,8 @@ import { ConnectionSyncContextProvider } from "components/connection/ConnectionSync/ConnectionSyncContext"; +import { useListJobs } from "core/api"; import { useConnectionEditService } from "hooks/services/ConnectionEdit/ConnectionEditService"; import { useExperiment } from "hooks/services/Experiment"; -import { useListJobs } from "services/job/JobService"; import { StreamsList } from "./StreamsList"; import { StreamsListContextProvider } from "./StreamsListContext"; diff --git a/airbyte-webapp/src/services/job/JobService.tsx b/airbyte-webapp/src/services/job/JobService.tsx deleted file mode 100644 index 1a369a72906..00000000000 --- a/airbyte-webapp/src/services/job/JobService.tsx +++ /dev/null @@ -1,92 +0,0 @@ -import { useMutation, useQuery, useQueryClient } from "react-query"; - -import { useConfig } from "config"; -import { JobsService } from "core/domain/job/JobsService"; -import { useDefaultRequestMiddlewares } from "services/useDefaultRequestMiddlewares"; -import { useInitService } from "services/useInitService"; - -import { - JobDebugInfoRead, - JobInfoRead, - JobListRequestBody, - JobReadList, - Pagination, -} from "../../core/request/AirbyteClient"; -import { useSuspenseQuery } from "../connector/useSuspenseQuery"; - -export const jobsKeys = { - all: ["jobs"] as const, - lists: () => [...jobsKeys.all, "list"] as const, - list: (filters: string, includingJobId?: number, pagination?: Pagination) => - [...jobsKeys.lists(), { filters, includingJobId, pagination }] as const, - detail: (jobId: number) => [...jobsKeys.all, "details", jobId] as const, - getDebugInfo: (jobId: number) => [...jobsKeys.all, "getDebugInfo", jobId] as const, - cancel: (jobId: string) => [...jobsKeys.all, "cancel", jobId] as const, -}; - -function useGetJobService() { - const config = useConfig(); - const middlewares = useDefaultRequestMiddlewares(); - return useInitService(() => new JobsService(config.apiUrl, middlewares), [config.apiUrl, middlewares]); -} - -export const useListJobs = (listParams: JobListRequestBody, keepPreviousData = true) => { - const service = useGetJobService(); - const result = useQuery( - jobsKeys.list(listParams.configId, listParams.includingJobId, listParams.pagination), - () => service.list(listParams), - { - // 2.5 second refresh - refetchInterval: 2500, - keepPreviousData, - suspense: true, - } - ); - // cast to JobReadList because (suspense: true) means we will never get undefined - const jobReadList = result.data as JobReadList; - return { jobs: jobReadList.jobs, totalJobCount: jobReadList.totalJobCount, isPreviousData: result.isPreviousData }; -}; - -export const useGetJob = (id: number, enabled = true) => { - const service = useGetJobService(); - - return useSuspenseQuery(jobsKeys.detail(id), () => service.get(id), { - refetchInterval: 2500, // every 2,5 seconds, - enabled, - }); -}; - -export const useGetDebugInfoJob = ( - id: number, - enabled = true, - refetchWhileRunning = false -): JobDebugInfoRead | undefined => { - const service = useGetJobService(); - - return useSuspenseQuery(jobsKeys.getDebugInfo(id), () => service.getDebugInfo(id), { - refetchInterval: !refetchWhileRunning - ? false - : (data) => { - // If refetchWhileRunning was true, we keep refetching debug info (including logs), while the job is still - // running or hasn't ended too long ago. We need some time after the last attempt has stopped, since logs - // keep incoming for some time after the job has already been marked as finished. - const lastAttemptEndTimestamp = - data?.attempts.length && data.attempts[data.attempts.length - 1].attempt.endedAt; - // While no attempt ended timestamp exists yet (i.e. the job is still running) or it hasn't ended - // more than 2 minutes (2 * 60 * 1000ms) ago, keep refetching - return lastAttemptEndTimestamp && Date.now() - lastAttemptEndTimestamp * 1000 > 2 * 60 * 1000 ? false : 2500; - }, - enabled, - }); -}; - -export const useCancelJob = () => { - const service = useGetJobService(); - const queryClient = useQueryClient(); - - return useMutation((id: number) => service.cancel(id), { - onSuccess: (data) => { - queryClient.setQueryData(jobsKeys.detail(data.job.id), data); - }, - }); -};