283 changes: 197 additions & 86 deletions server/src/index.ts
@@ -16,11 +16,31 @@ function isValidDate(value: string): boolean {
(value.includes('-') || value.includes('/'));
}

function guessSqlType(value: any): string {
if (value === null || value === undefined) return 'TEXT';
if (typeof value === 'string' && isValidDate(value)) return 'TIMESTAMP';
if (!isNaN(value) && value.toString().includes('.')) return 'NUMERIC';
if (!isNaN(value)) return 'INTEGER';
function guessSqlType(values: any[]): string {
let hasText = false;
let hasTimestamp = false;
let hasNumeric = false;
let hasInteger = false;

for (const value of values) {
if (value === null || value === undefined || value === '') continue; // skip empty CSV cells, which parse as ''

if (typeof value === 'string' && isValidDate(value)) {
hasTimestamp = true;
} else if (!isNaN(value) && value.toString().includes('.')) {
hasNumeric = true;
} else if (!isNaN(value)) {
hasInteger = true;
} else {
hasText = true;
break; // Text is most general, no need to check further
}
}

if (hasText) return 'TEXT';
if (hasTimestamp) return 'TIMESTAMP';
if (hasNumeric) return 'NUMERIC';
if (hasInteger) return 'INTEGER';
return 'TEXT';
}
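
// Illustrative behavior (reviewer sketch, values hypothetical): the widest
// type observed across the sample wins, and empty cells are skipped.
//
//   guessSqlType(['1', '2', '3']);        // 'INTEGER'
//   guessSqlType(['1', '2.5', '']);       // 'NUMERIC' (empty cell ignored)
//   guessSqlType(['2024-01-01', 'abc']);  // 'TEXT' (any non-parseable text wins)
//   guessSqlType(['', null]);             // 'TEXT' (default when no signal)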

@@ -51,7 +71,51 @@ function validateColumnNames(columns: string[]): boolean {
function validateFilePath(filePath: string, uploadDir: string): boolean {
const resolvedUploadDir = path.resolve(uploadDir);
const resolvedFilePath = path.resolve(filePath);
return resolvedFilePath.startsWith(resolvedUploadDir);
const relative = path.relative(resolvedUploadDir, resolvedFilePath);
return !!relative && !relative.startsWith('..') && !path.isAbsolute(relative);
}
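
// Why path.relative instead of startsWith (illustrative paths, assuming a
// POSIX filesystem): a sibling directory sharing the prefix defeated the old
// check, while the relative-path test rejects it.
//
//   '/srv/uploads-evil/x.csv'.startsWith('/srv/uploads')       // true -> old check passed
//   path.relative('/srv/uploads', '/srv/uploads-evil/x.csv')   // '../uploads-evil/x.csv' -> rejected
//   path.relative('/srv/uploads', '/srv/uploads/x.csv')        // 'x.csv' -> accepted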

// Table operation locks to prevent race conditions
const tableOperationLocks = new Map<string, Promise<void>>();

async function acquireTableLock(tableName: string): Promise<() => void> {
// Loop rather than a single await: several callers can be woken by the same
// promise, and each must re-check for a lock installed by an earlier waker.
while (tableOperationLocks.has(tableName)) {
await tableOperationLocks.get(tableName);
}

let releaseLock: () => void;
const lockPromise = new Promise<void>((resolve) => {
releaseLock = resolve;
});

tableOperationLocks.set(tableName, lockPromise);

return () => {
tableOperationLocks.delete(tableName);
releaseLock!();
};
}
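
// Hypothetical usage (not part of this diff): pair the returned release
// function with try/finally so an exception cannot leave the table locked.
//
//   const release = await acquireTableLock('my_table');
//   try {
//     // DROP / CREATE / INSERT against "my_table"
//   } finally {
//     release();
//   }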

async function insertBatch(records: any[], columns: string[], originalColumns: string[], tableName: string): Promise<void> {
const values: any[] = [];
const placeholders: string[] = [];

records.forEach((record, batchIndex) => {
const recordValues = originalColumns.map(c => record[c]);
values.push(...recordValues);
const recordPlaceholders = columns.map((_, colIndex) =>
`$${batchIndex * columns.length + colIndex + 1}`
).join(', ');
placeholders.push(`(${recordPlaceholders})`);
});

const insertSQL = `
INSERT INTO "${tableName}" (${columns.map(c => `"${c}"`).join(', ')})
VALUES ${placeholders.join(', ')}
`;

await query(insertSQL, values);
}
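
// Sizing note (reviewer sketch, numbers assumed): each batch binds
// records x columns parameters, so 100 rows of an 8-column CSV is 800
// placeholders -- comfortably under PostgreSQL's 65535-parameter limit,
// but very wide files would need a smaller batch:
//
//   const safeBatchSize = Math.max(1, Math.floor(65535 / columns.length));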

async function startServer() {
Expand Down Expand Up @@ -84,41 +148,108 @@ async function startServer() {
return res.status(400).json({ error: 'Invalid file path' });
}

// Single pass CSV processing with sampling and batch insertion
const csvStream = createReadStream(req.file.path);
const parser = parse({
columns: true,
skip_empty_lines: true
});

const sampleRows: any[] = [];
const columnTypes = new Map<string, string>();
const allRecords: any[] = [];
let isInitialized = false;
let columns: string[] = [];
// Acquire table lock to prevent race conditions
const releaseLock = await acquireTableLock(tableName);

for await (const record of csvStream.pipe(parser)) {
allRecords.push(record);
try {
// Streaming CSV processing with reduced memory usage
const csvStream = createReadStream(req.file.path);
const parser = parse({
columns: true,
skip_empty_lines: true
});

const sampleRows: any[] = [];
const columnSamples = new Map<string, any[]>();
let batchRecords: any[] = [];
let isInitialized = false;
let columns: string[] = [];
let originalColumns: string[] = [];
let recordCount = 0;
const batchSize = 100;

// Collect first 10 rows for sampling
if (sampleRows.length < 10) {
sampleRows.push(record);
for await (const record of csvStream.pipe(parser)) {
recordCount++;

// Collect first 10 rows for sampling
if (sampleRows.length < 10) {
sampleRows.push(record);

// Collect samples for each column
Object.keys(record).forEach(key => {
if (!columnSamples.has(key)) {
columnSamples.set(key, []);
}
columnSamples.get(key)!.push(record[key]);
});
}

// Initialize table after collecting enough samples
if (sampleRows.length === 10 && !isInitialized) {
originalColumns = Object.keys(sampleRows[0]);
columns = originalColumns.map(normalizeColumnName);

// Validate column names to prevent SQL injection
if (!validateColumnNames(columns)) {
return res.status(400).json({ error: 'Invalid column names detected' });
}

if (columns.length === 0) {
return res.status(400).json({ error: 'CSV file has no columns' });
}

// Determine column types from sample data using multiple rows
const columnTypes = new Map<string, string>();
originalColumns.forEach((originalColumn, index) => {
const column = columns[index];
const samples = columnSamples.get(originalColumn) || [];
columnTypes.set(column, guessSqlType(samples));
});

// Drop existing table if it exists
await query(`DROP TABLE IF EXISTS "${tableName}"`);

// Create new table with quoted identifiers
const createTableSQL = `
CREATE TABLE "${tableName}" (
${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')}
)
`;
console.log(createTableSQL);
await query(createTableSQL);
isInitialized = true;
}

// Add record to batch
batchRecords.push(record);

// Process batch when it reaches batch size
if (batchRecords.length >= batchSize && isInitialized) {
await insertBatch(batchRecords, columns, originalColumns, tableName);
batchRecords = [];
}
}
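
// Reviewer note on memory (assumption, not measured): the loop above keeps
// at most `batchSize` unflushed records plus the 10 sample rows in memory,
// unlike the removed single-pass version, which buffered every record in
// `allRecords` before inserting.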
// Initialize table after collecting first sample
if (sampleRows.length === 1 && !isInitialized) {
// Determine column types from sample data
columns = Object.keys(sampleRows[0]).map(normalizeColumnName);

// Handle case where we have fewer than 10 rows but still need to initialize
if (!isInitialized && sampleRows.length > 0) {
originalColumns = Object.keys(sampleRows[0]);
columns = originalColumns.map(normalizeColumnName);

// Validate column names to prevent SQL injection
if (!validateColumnNames(columns)) {
return res.status(400).json({ error: 'Invalid column names detected' });
}

columns.forEach((column, index) => {
const originalColumn = Object.keys(sampleRows[0])[index];
const value = sampleRows[0][originalColumn];
columnTypes.set(column, guessSqlType(value));
if (columns.length === 0) {
return res.status(400).json({ error: 'CSV file has no columns' });
}

// Determine column types from available sample data
const columnTypes = new Map<string, string>();
originalColumns.forEach((originalColumn, index) => {
const column = columns[index];
const samples = columnSamples.get(originalColumn) || [];
columnTypes.set(column, guessSqlType(samples));
});

// Drop existing table if it exists
@@ -134,65 +265,45 @@ async function startServer() {
await query(createTableSQL);
isInitialized = true;
}
}

if (allRecords.length === 0) {
return res.status(400).json({ error: 'CSV file is empty' });
}
if (recordCount === 0) {
return res.status(400).json({ error: 'CSV file is empty' });
}

// Batch insert records
const batchSize = 100;
const originalColumns = Object.keys(allRecords[0]);

for (let i = 0; i < allRecords.length; i += batchSize) {
const batch = allRecords.slice(i, i + batchSize);
const values: any[] = [];
const placeholders: string[] = [];

batch.forEach((record, batchIndex) => {
const recordValues = originalColumns.map(c => record[c]);
values.push(...recordValues);
const recordPlaceholders = columns.map((_, colIndex) =>
`$${batchIndex * columns.length + colIndex + 1}`
).join(', ');
placeholders.push(`(${recordPlaceholders})`);
});

const insertSQL = `
INSERT INTO "${tableName}" (${columns.map(c => `"${c}"`).join(', ')})
VALUES ${placeholders.join(', ')}
`;
if (!isInitialized || columns.length === 0) {
return res.status(400).json({ error: 'Failed to initialize table - CSV may be invalid' });
}

// Insert remaining records in batch
if (batchRecords.length > 0) {
await insertBatch(batchRecords, columns, originalColumns, tableName);
}

// After successful upload, analyze the table and store the results
const analysis = await analyzeTable(tableName);

await query(insertSQL, values);
}
// Store the analysis in TABLE_SCHEMA
await query(
`INSERT INTO TABLE_SCHEMA (table_name, analysis)
VALUES ($1, $2)
ON CONFLICT (table_name)
DO UPDATE SET
analysis = $2,
updated_at = CURRENT_TIMESTAMP`,
[tableName, analysis]
);
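
// Assumed shape of TABLE_SCHEMA (not shown in this diff); the ON CONFLICT
// clause above requires a unique constraint on table_name, e.g.:
//
//   CREATE TABLE IF NOT EXISTS table_schema (
//     table_name TEXT PRIMARY KEY,
//     analysis   JSONB,
//     updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
//   );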

// After successful upload, analyze the table and store the results
const analysis = await analyzeTable(tableName);

// Validate table name again before storing in schema
if (!validateTableName(tableName)) {
return res.status(400).json({ error: 'Invalid table name' });
res.json({
message: 'CSV data successfully imported to database',
tableName,
recordCount,
columnCount: columns.length,
analysis
});
} finally {
// Always release the table lock
releaseLock();
}

// Store the analysis in TABLE_SCHEMA
await query(
`INSERT INTO TABLE_SCHEMA (table_name, analysis)
VALUES ($1, $2)
ON CONFLICT (table_name)
DO UPDATE SET
analysis = $2,
updated_at = CURRENT_TIMESTAMP`,
[tableName, analysis]
);

res.json({
message: 'CSV data successfully imported to database',
tableName,
recordCount: allRecords.length,
columnCount: columns.length,
columnTypes: Object.fromEntries(columnTypes),
analysis
});
} catch (error) {
console.error('Error processing CSV:', error);
res.status(500).json({ error: 'Failed to process CSV file' });