184 changes: 111 additions & 73 deletions server/src/index.ts
@@ -75,26 +75,48 @@ function validateFilePath(filePath: string, uploadDir: string): boolean {
   return !!relative && !relative.startsWith('..') && !path.isAbsolute(relative);
 }
 
-// Table operation locks to prevent race conditions
-const tableOperationLocks = new Map<string, Promise<void>>();
+// Table operation locks to prevent race conditions using proper mutex
+interface TableMutex {
+  locked: boolean;
+  queue: Array<() => void>;
+}
+
+const tableMutexes = new Map<string, TableMutex>();
 
 async function acquireTableLock(tableName: string): Promise<() => void> {
-  const existingLock = tableOperationLocks.get(tableName);
-  if (existingLock) {
-    await existingLock;
+  if (!tableMutexes.has(tableName)) {
+    tableMutexes.set(tableName, { locked: false, queue: [] });
   }
 
-  let releaseLock: () => void;
-  const lockPromise = new Promise<void>((resolve) => {
-    releaseLock = resolve;
-  });
+  const mutex = tableMutexes.get(tableName)!;
 
-  tableOperationLocks.set(tableName, lockPromise);
+  if (!mutex.locked) {
+    mutex.locked = true;
+    return () => {
+      mutex.locked = false;
+      const next = mutex.queue.shift();
+      if (next) {
+        next();
+      } else if (mutex.queue.length === 0) {
+        tableMutexes.delete(tableName);
+      }
+    };
+  }
 
-  return () => {
-    tableOperationLocks.delete(tableName);
-    releaseLock!();
-  };
+  return new Promise<() => void>((resolve) => {
+    mutex.queue.push(() => {
+      mutex.locked = true;
+      resolve(() => {
+        mutex.locked = false;
+        const next = mutex.queue.shift();
+        if (next) {
+          next();
+        } else if (mutex.queue.length === 0) {
+          tableMutexes.delete(tableName);
+        }
+      });
+    });
+  });
 }
 
 async function insertBatch(records: any[], columns: string[], originalColumns: string[], tableName: string): Promise<void> {
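The call sites for `acquireTableLock` sit outside this hunk, but the contract it exposes (await the lock, call the returned function to release) implies a try/finally wrapper at every caller; unlike the old map of shared promises, which woke every waiter at once when the promise resolved, the queue hands the lock to exactly one waiter per release. A minimal consumption sketch, assuming the function above; the `withTableLock` helper is hypothetical and not part of this PR:

```typescript
// Hypothetical helper showing the intended acquire/release discipline.
// Not part of the PR; assumes acquireTableLock as defined above.
async function withTableLock<T>(
  tableName: string,
  operation: () => Promise<T>
): Promise<T> {
  const release = await acquireTableLock(tableName);
  try {
    return await operation();
  } finally {
    release(); // wakes exactly one queued waiter, or deletes the idle mutex
  }
}
```

Releasing in `finally` matters: an operation that throws would otherwise leave `locked` set and strand every queued waiter.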
@@ -159,78 +181,92 @@ async function startServer() {
       skip_empty_lines: true
     });
 
+    // Add error handling for parser
+    parser.on('error', (err) => {
+      console.error('CSV parser error:', err);
+      throw new Error('Invalid or corrupted CSV file');
+    });
+
     const sampleRows: any[] = [];
     const columnSamples = new Map<string, any[]>();
     let batchRecords: any[] = [];
     let isInitialized = false;
     let columns: string[] = [];
     let originalColumns: string[] = [];
     let recordCount = 0;
-    const batchSize = 100;
+    const batchSize = 500; // Increased batch size for better performance
+    const sampleSize = 100; // Increased sample size for better schema inference
 
-    for await (const record of csvStream.pipe(parser)) {
-      recordCount++;
-
-      // Collect first 10 rows for sampling
-      if (sampleRows.length < 10) {
-        sampleRows.push(record);
-
-        // Collect samples for each column
-        Object.keys(record).forEach(key => {
-          if (!columnSamples.has(key)) {
-            columnSamples.set(key, []);
-          }
-          columnSamples.get(key)!.push(record[key]);
-        });
-      }
-
-      // Initialize table after collecting enough samples
-      if (sampleRows.length === 10 && !isInitialized) {
-        originalColumns = Object.keys(sampleRows[0]);
-        columns = originalColumns.map(normalizeColumnName);
-
-        // Validate column names to prevent SQL injection
-        if (!validateColumnNames(columns)) {
-          return res.status(400).json({ error: 'Invalid column names detected' });
-        }
-
-        if (columns.length === 0) {
-          return res.status(400).json({ error: 'CSV file has no columns' });
-        }
-
-        // Determine column types from sample data using multiple rows
-        const columnTypes = new Map<string, string>();
-        originalColumns.forEach((originalColumn, index) => {
-          const column = columns[index];
-          const samples = columnSamples.get(originalColumn) || [];
-          columnTypes.set(column, guessSqlType(samples));
-        });
-
-        // Drop existing table if it exists
-        await query(`DROP TABLE IF EXISTS "${tableName}"`);
-
-        // Create new table with quoted identifiers
-        const createTableSQL = `
-          CREATE TABLE "${tableName}" (
-            ${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')}
-          )
-        `;
-        console.log(createTableSQL);
-        await query(createTableSQL);
-        isInitialized = true;
-      }
-
-      // Add record to batch
-      batchRecords.push(record);
-
-      // Process batch when it reaches batch size
-      if (batchRecords.length >= batchSize && isInitialized) {
-        await insertBatch(batchRecords, columns, originalColumns, tableName);
-        batchRecords = [];
-      }
-    }
+    try {
+      for await (const record of csvStream.pipe(parser)) {
+        recordCount++;
+
+        // Collect sample rows for schema inference (increased from 10 to 100)
+        if (sampleRows.length < sampleSize) {
+          sampleRows.push(record);
+
+          // Collect samples for each column
+          Object.keys(record).forEach(key => {
+            if (!columnSamples.has(key)) {
+              columnSamples.set(key, []);
+            }
+            columnSamples.get(key)!.push(record[key]);
+          });
+        }
+
+        // Initialize table after collecting enough samples
+        if (sampleRows.length === sampleSize && !isInitialized) {
+          originalColumns = Object.keys(sampleRows[0]);
+          columns = originalColumns.map(normalizeColumnName);
+
+          // Validate column names to prevent SQL injection
+          if (!validateColumnNames(columns)) {
+            return res.status(400).json({ error: 'Invalid column names detected' });
+          }
+
+          if (columns.length === 0) {
+            return res.status(400).json({ error: 'CSV file has no columns' });
+          }
+
+          // Determine column types from sample data using multiple rows
+          const columnTypes = new Map<string, string>();
+          originalColumns.forEach((originalColumn, index) => {
+            const column = columns[index];
+            const samples = columnSamples.get(originalColumn) || [];
+            columnTypes.set(column, guessSqlType(samples));
+          });
+
+          // Drop existing table if it exists (done only once)
+          await query(`DROP TABLE IF EXISTS "${tableName}"`);
+
+          // Create new table with quoted identifiers (done only once)
+          const createTableSQL = `
+            CREATE TABLE "${tableName}" (
+              ${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')}
+            )
+          `;
+          console.log(createTableSQL);
+          await query(createTableSQL);
+          isInitialized = true;
+        }
+
+        // Add record to batch only after table is initialized or if still sampling
+        if (isInitialized || sampleRows.length < sampleSize) {
+          batchRecords.push(record);
+        }
+
+        // Process batch when it reaches batch size
+        if (batchRecords.length >= batchSize && isInitialized) {
+          await insertBatch(batchRecords, columns, originalColumns, tableName);
+          batchRecords = []; // Clear batch to reduce memory usage
+        }
+      }
+    } catch (err) {
+      console.error('Error reading CSV:', err);
+      return res.status(400).json({ error: 'Invalid or corrupted CSV file' });
+    }
 
-    // Handle case where we have fewer than 10 rows but still need to initialize
+    // Handle case where we have fewer than sampleSize rows but still need to initialize
    if (!isInitialized && sampleRows.length > 0) {
      originalColumns = Object.keys(sampleRows[0]);
      columns = originalColumns.map(normalizeColumnName);
@@ -252,10 +288,10 @@ async function startServer() {
         columnTypes.set(column, guessSqlType(samples));
       });
 
-      // Drop existing table if it exists
+      // Drop existing table if it exists (done only once)
       await query(`DROP TABLE IF EXISTS "${tableName}"`);
 
-      // Create new table with quoted identifiers
+      // Create new table with quoted identifiers (done only once)
       const createTableSQL = `
         CREATE TABLE "${tableName}" (
           ${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')}
@@ -282,15 +318,17 @@ async function startServer() {
     // After successful upload, analyze the table and store the results
     const analysis = await analyzeTable(tableName);
 
-    // Store the analysis in TABLE_SCHEMA
+    // Store the analysis in TABLE_SCHEMA with safe table name handling
+    // tableName is already validated with validateTableName function
+    const safeTableName = tableName.replace(/[^a-zA-Z0-9_]/g, '');
     await query(
       `INSERT INTO TABLE_SCHEMA (table_name, analysis)
        VALUES ($1, $2)
        ON CONFLICT (table_name)
        DO UPDATE SET
         analysis = $2,
         updated_at = CURRENT_TIMESTAMP`,
-      [tableName, analysis]
+      [safeTableName, analysis]
     );
 
     res.json({
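The body of `insertBatch` is collapsed in this view. Assuming it builds one parameterized multi-row INSERT per batch, which matches how the loop above calls it, a sketch could look like the following; the `query` helper is the one used elsewhere in the file, and the exact null handling is an assumption:

```typescript
// Hypothetical sketch of the collapsed insertBatch body: one parameterized
// multi-row INSERT per batch. Identifiers are quoted; values go through
// placeholders, so only identifiers need the earlier validation.
async function insertBatchSketch(
  records: any[],
  columns: string[],
  originalColumns: string[],
  tableName: string
): Promise<void> {
  if (records.length === 0) return;

  const values: any[] = [];
  const rowPlaceholders = records.map((record, rowIndex) => {
    const placeholders = originalColumns.map((originalColumn, colIndex) => {
      values.push(record[originalColumn] ?? null);
      return `$${rowIndex * originalColumns.length + colIndex + 1}`;
    });
    return `(${placeholders.join(', ')})`;
  });

  const sql = `
    INSERT INTO "${tableName}" (${columns.map(c => `"${c}"`).join(', ')})
    VALUES ${rowPlaceholders.join(',\n')}
  `;
  await query(sql, values);
}
```

Because the values travel as placeholders, only the identifiers (table and column names) rely on the validateTableName and validateColumnNames checks performed earlier.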