Skip to content

added the primary key field to source streams #1366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 87 additions & 2 deletions src/components/Connections/CreateConnectionForm.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import React, { useContext, useEffect, useMemo, useRef, useState } from 'react';
import useSWR from 'swr';
import CustomDialog from '../Dialog/CustomDialog';
import { Autocomplete, Box, Button, Switch, Select, MenuItem, TextField } from '@mui/material';
import {
Autocomplete,
Box,
Button,
Switch,
Select,
MenuItem,
TextField,
Checkbox,
} from '@mui/material';
import { Table, TableBody, TableCell, TableHead, TableRow } from '@mui/material';
import { Controller, useForm } from 'react-hook-form';
import { httpGet, httpPost, httpPut } from '@/helpers/http';
Expand All @@ -25,6 +34,10 @@
sourceDefinedCursor: boolean;
cursorFieldOptions: string[];
};
type PrimayKeyConfig = {
sourceDefinedPrimaryKey: boolean;
primaryKeyOptions: string[];
};

interface SourceStream {
name: string;
Expand All @@ -34,6 +47,8 @@
destinationSyncMode: string; // append | overwrite | append_dedup
cursorFieldConfig: CursorFieldConfig; // this will not be posted to backend
cursorField: string;
primaryKeyConfig: PrimayKeyConfig;
primaryKey: string;
}

const CreateConnectionForm = ({
Expand Down Expand Up @@ -61,7 +76,6 @@
const isAnyCursorAbsent = useMemo(() => {
return filteredSourceStreams.some((stream) => !stream.cursorField);
}, [filteredSourceStreams]);
console.log(filteredSourceStreams, 'filtered source stream');
const [loading, setLoading] = useState<boolean>(false);
const [someStreamSelected, setSomeStreamSelected] = useState<boolean>(false);
const [normalize, setNormalize] = useState<boolean>(false);
Expand Down Expand Up @@ -91,6 +105,11 @@
cursorFieldOptions: [],
},
cursorField: '',
primaryKeyConfig: {
sourceDefinedPrimaryKey: false,
primaryKeyOptions: [],
},
primaryKey: [], // eg.[[id]], [[id], [airbyte_raw]]etc. this can be multiple hence we have to make it an array. This can be a composite primary key.
};

const cursorFieldObj = stream.cursorFieldConfig;
Expand Down Expand Up @@ -119,6 +138,26 @@
stream.cursorField = el.config.cursorField.length > 0 ? el.config.cursorField[0] : '';
}

const primaryKeyObj = stream.primaryKeyConfig;

if ('sourceDefinedPrimaryKey' in el.stream && el.stream.sourceDefinedPrimaryKey.length > 0)
primaryKeyObj.sourceDefinedPrimaryKey = true;

Check warning on line 144 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L144

Added line #L144 was not covered by tests

if (primaryKeyObj.sourceDefinedPrimaryKey) {
stream.primaryKey = el.config.primaryKey.flat();
primaryKeyObj.primaryKeyOptions = el.config.primaryKey.flat();

Check warning on line 148 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L147-L148

Added lines #L147 - L148 were not covered by tests
} else {
// user needs to define the primary key
// available options are picked from the stream's jsonSchema (cols)
if ('jsonSchema' in el.stream)
primaryKeyObj.primaryKeyOptions = Object.keys(el.stream.jsonSchema.properties) as any;

Check warning on line 153 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L153

Added line #L153 was not covered by tests

// overwrite default if the primary key field is set
if ('primaryKey' in el.config) {
stream.primaryKey = el.config.primaryKey.length > 0 ? el.config.primaryKey.flat() : [];
}
}

return stream;
});

Expand Down Expand Up @@ -240,6 +279,7 @@
syncMode: stream.syncMode, // incremental | full_refresh
destinationSyncMode: stream.destinationSyncMode, // append | overwrite | append_dedup
cursorField: stream.cursorField,
primaryKey: stream.primaryKey,
};
}),
normalize,
Expand Down Expand Up @@ -326,6 +366,10 @@
updateThisStreamTo_(stream, { ...stream, cursorField: value });
};

const updatePrimaryKey = (value: string, stream: SourceStream) => {
updateThisStreamTo_(stream, { ...stream, primaryKey: value });

Check warning on line 370 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L370

Added line #L370 was not covered by tests
};

const handleSyncAllStreams = (checked: boolean) => {
setSelectAllStreams(checked);
if (!checked && incrementalAllStreams) {
Expand Down Expand Up @@ -477,6 +521,9 @@
<TableCell key="cursorfield" align="center">
Cursor Field
</TableCell>
<TableCell key="primarykey" align="center">
Primary Key
</TableCell>
</TableRow>
<TableRow>
<TableCell key="searchstream" align="center">
Expand Down Expand Up @@ -593,6 +640,44 @@
)}
</Select>
</TableCell>
<TableCell key="primarykey" align="center">
<Select
data-testid={`stream-primarykey-${idx}`}
disabled={
!stream.selected ||
!stream.supportsIncremental ||
stream.syncMode !== 'incremental' ||
stream.destinationSyncMode !== 'append_dedup'

Check warning on line 650 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L650

Added line #L650 was not covered by tests
}
required={ifIncremental}
onInvalid={(e: any) =>
e.target.setCustomValidity(

Check warning on line 654 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L654

Added line #L654 was not covered by tests
'Primary Key is required for incremental streams'
)
}
multiple
value={stream.primaryKey}
onChange={(event) => {

Check warning on line 660 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L660

Added line #L660 was not covered by tests
if (!stream.primaryKeyConfig.sourceDefinedPrimaryKey) {
updatePrimaryKey(event.target.value, stream);

Check warning on line 662 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L662

Added line #L662 was not covered by tests
}
}}
renderValue={(selected: any) => selected.join(', ')}

Check warning on line 665 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L665

Added line #L665 was not covered by tests
>
{stream.primaryKeyConfig?.primaryKeyOptions?.length > 0 &&
stream.primaryKeyConfig.primaryKeyOptions.map(

Check warning on line 668 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L668

Added line #L668 was not covered by tests
(option: string, index: number) => (
<MenuItem key={option} value={option}>

Check warning on line 670 in src/components/Connections/CreateConnectionForm.tsx

View check run for this annotation

Codecov / codecov/patch

src/components/Connections/CreateConnectionForm.tsx#L670

Added line #L670 was not covered by tests
<Checkbox
checked={stream.primaryKey.indexOf(option) > -1}
disabled={stream.primaryKeyConfig.sourceDefinedPrimaryKey}
/>
{option}
</MenuItem>
)
)}
</Select>
</TableCell>
</TableRow>
);
})}
Expand Down
29 changes: 20 additions & 9 deletions src/components/Connections/__tests__/CreateConnection.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ jest.mock('@/helpers/websocket', () => ({
generateWebsocketUrl: jest.fn(),
}));
describe('Create connection', () => {
let mockServer: Server;

afterEach(async () => {
if (mockServer) {
await new Promise((resolve) => {
mockServer.stop(resolve); // Fully stop the WebSocket server
});
}
});

const mockSession: Session = {
expires: 'false',
user: { email: 'a' },
Expand Down Expand Up @@ -47,7 +57,7 @@ describe('Create connection', () => {
];

it('mocks generateWebsocketUrl and tests WebSocket connection', async () => {
const mockServer = new Server('wss://mock-websocket-url');
mockServer = new Server('wss://mock-websocket-url');
const mockGenerateWebsocketUrl = generateWebsocketUrl;

// Mock the generateWebsocketUrl function
Expand Down Expand Up @@ -98,7 +108,7 @@ describe('Create connection', () => {
);

// Clean up the mock WebSocket server
mockServer.close();
//mockServer.close();
});

it('renders the form', () => {
Expand Down Expand Up @@ -132,7 +142,7 @@ describe('Create connection', () => {
});

it('checks source stream selection and WebSocket interactions', async () => {
const mockServer = new Server('wss://mock-websocket-url');
mockServer = new Server('wss://mock-websocket-url');
const mockGenerateWebsocketUrl = generateWebsocketUrl;

// Mock the generateWebsocketUrl function
Expand Down Expand Up @@ -224,20 +234,21 @@ describe('Create connection', () => {
expect(sourceStreamTableRows.length).toBe(STREAMS.length + 2); // Header + Streams
// Step 4: Validate table headers
const headerCells = within(sourceStreamTableRows[0]).getAllByRole('columnheader');
expect(headerCells.length).toBe(5);
expect(headerCells.length).toBe(6);
expect(headerCells[0].textContent).toBe('Stream');
expect(headerCells[1].textContent).toBe('Sync?');
expect(headerCells[2].textContent).toBe('Incremental?');
expect(headerCells[3].textContent).toBe('Destination');
expect(headerCells[4].textContent).toBe('Cursor Field');
expect(headerCells[5].textContent).toBe('Primary Key');

// Clean up the mock WebSocket server
mockServer.close();
//mockServer.close();
});
});

it('create connection success with WebSocket and fetch interactions', async () => {
const mockServer = new Server('wss://mock-websocket-url');
mockServer = new Server('wss://mock-websocket-url');
const mockGenerateWebsocketUrl = generateWebsocketUrl;

// Mock the generateWebsocketUrl function
Expand Down Expand Up @@ -345,11 +356,11 @@ describe('Create connection', () => {
expect(mockCreateConnectionFetch).toHaveBeenCalledTimes(1);

// Clean up mock server
mockServer.close();
//mockServer.close();
});

it('create connection failed with WebSocket and fetch interactions', async () => {
const mockServer = new Server('wss://mock-websocket-url');
mockServer = new Server('wss://mock-websocket-url');
const mockGenerateWebsocketUrl = generateWebsocketUrl;

// Mock the generateWebsocketUrl function
Expand Down Expand Up @@ -457,6 +468,6 @@ describe('Create connection', () => {
expect(mockCreateConnectionFetch).toHaveBeenCalledTimes(1);

//check how to show this test failed.
mockServer.close();
//mockServer.close();
});
});