Skip to content

Commit

Permalink
add: strongly typed API responses and comments from API.
Browse files Browse the repository at this point in the history
  • Loading branch information
brianzinn committed Oct 9, 2020
1 parent 0caf2ab commit fa0f632
Showing 1 changed file with 214 additions and 30 deletions.
244 changes: 214 additions & 30 deletions src/snowflake-ingest-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,199 @@ export type APIEndpointHistory = {
loadHistoryScan: RecordedCall[]
}

/**
* Have seen the response return a 403 when things were misconfigured in HTML response.
*/
export type InsertFileResponse = {
/**
* This is the one we send. (Could try with an empty requestId in request to see if one is assigned)
*/
requestId: string,
/**
* When response code is "SUCCESS" it was received.
* Haven't seen other codes and not documented on snowpipe API page.
*/
responseCode: 'SUCCESS'
}

/**
* A success response (200) contains information about files that have recently been added to the table. Note that this report may only represent a portion of a large file.
*/
export type InsertReportResponse = {
/**
* The fully-qualified name of the pipe.
*/
pipe: string
/**
* false if an event was missed between the supplied beginMark and the first event in this report history. Otherwise, true.
*/
completeResult: boolean
/**
* beginMark to use on the next request to avoid seeing duplicate records. Note that this value is a hint. Duplicates can still occasionally occur.
*/
nextBeginMark: string
/**
* An array of JSON objects, one object for each file that is part of the history response.
*/
files: SnowpipeFile[]
}

export enum LoadStatus {
/**
* The entire file has been loaded into the table.
*/
LOADED= 'LOADED',

/**
* Part of the file has been loaded into the table, but the load process has not completed yet.
*/
LOAD_IN_PROGRESS='LOAD_IN_PROGRESS',

/**
* The file load failed.
*/
LOAD_FAILED= 'LOAD_FAILED',

/**
* Some rows from this file were loaded successfully, but others were not loaded due to errors. Processing of this file is completed.
*/
PARTIALLY_LOADED= 'PARTIALLY_LOADED',
}

export type SnowpipeFile = {
/**
* The file path relative to the stage location.
*/
path: string,
/**
* Either the stage ID (internal stage) or the S3 bucket (external stage) defined in the pipe.
*/
stageLocation: string,
/**
* File size, in bytes.
*/
fileSize: number,
/**
* Time that this file was received for processing. Format is ISO-8601 in UTC time zone.
*/
timeReceived: string,
/**
* Time that data from this file was last inserted into the table. Format is ISO-8601 in UTC time zone.
*/
lastInsertTime: string,
/**
* Number of rows inserted into the target table from the file.
*/
rowsInserted: number,
/**
* Number of rows parsed from the file. Rows with errors may be skipped.
*/
rowsParsed: number,
/**
* Number of errors seen in the file
*/
errorsSeen: number,
/**
* Number of errors allowed in the file before it is considered failed (based on ON_ERROR copy option).
*/
errorLimit: number,
/**
* Error message for the first error encountered in this file.
*/
firstError?: string,
/**
* Line number of the first error.
*/
firstErrorLineNum?: number,
/**
* Column name where the first error occurred.
*/
firstErrorColumnName?: string,
/**
* General error describing why the file was not processed.
*/
systemError?: string,
/**
* Indicates whether the file was completely processed successfully.
*/
complete: boolean,
/**
* Load status for the file:
*/
status: LoadStatus
}

/**
* A success response (200) contains information about files that have recently been added to the table. Note that this report may only represent a portion of a large file.
*/
export type LoadHistoryScanResponse = {
/**
* An array of JSON objects, one object for each file that is part of the history response.
*/
files: SnowpipeFile[],
/**
* Starting timestamp (in ISO-8601 format) provided in the request.
*/
startTimeInclusive: string,
/**
* Ending timestamp (in ISO-8601 format) provided in the request.
*/
endTimeExclusive: string,
/**
* Timestamp (in ISO-8601 format) of the oldest entry in the files included in the response.
*/
rangeStartTime: string,
/**
* Timestamp (in ISO-8601 format) of the latest entry in the files included in the response.
*/
rangeEndTime: string,
/**
* Fully-qualified name of the pipe.
*/
pipe: string,
/**
* false if the report is incomplete (i.e. the number of entries in the specified time range exceeds the 10,000 entry limit).
* If false, the user can specify the current rangeEndTime value as the startTimeInclusive value for the next request to proceed to the next set of entries.
*/
completeResult: "true" | "false"
}

export type SnowpipeAPIResponse<T> = {
json: T | null
rawResponse: string
statusCode?: number
}

export interface SnowpipeAPI {
/**
* Fetches a report about ingested files whose contents have been added to table. Note that for large files, this may only be part of the file.
* This endpoint differs from insertReport in that it views the history between two points in time. There is a maximum of 10,000 items returned, but multiple calls can be issued to cover the desired time range.
*
* @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
* @param startTimeInclusive Timestamp in ISO-8601 format. Start of the time range to retrieve load history data.
* @param endTimeExclusive Timestamp in ISO-8601 format. End of the time range to retrieve load history data. If omitted, then CURRENT_TIMESTAMP() is used as the end of the range.
*/
loadHistoryScan: (pipeName: string, startTimeInclusive: string, endTimeExclusive?: string) => Promise<SnowpipeAPIResponse<LoadHistoryScanResponse>>;
/**
* https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#endpoint-insertreport
*
* @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
* @param beginMark optional (see docs)
*/
insertReport: (pipeName: string, beginMark?: string) => Promise<SnowpipeAPIResponse<InsertReportResponse>>;
/**
* https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#data-file-ingestion
*
* @param filenames list of files to be ingested by snowflake
* @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
*/
insertFile: (pipeName: string, filenames: string[], postJSON?: boolean) => Promise<SnowpipeAPIResponse<InsertFileResponse>>;
/**
* full call history with response/response pairs.
*/
readonly endpointHistory: APIEndpointHistory;
}

/**
*
* @param username user that you created in Snowflake (and added a private key auth)
Expand All @@ -34,7 +227,7 @@ export type APIEndpointHistory = {
* @param regionId needed for non-default (AWS)?
* @param cloudProvider needed for non-AWS?
*/
export const createSnowpipeAPI = (username: string, privateKey: string, account: string, regionId?: string, cloudProvider?: string, snowpipeAPIOptions?: SnowpipeAPIOptions) => {
export const createSnowpipeAPI = (username: string, privateKey: string, account: string, regionId?: string, cloudProvider?: string, snowpipeAPIOptions?: SnowpipeAPIOptions): SnowpipeAPI => {
// `{account}.snowflakecomputing.com` is for default US AWS.
// for GCP you need `{account}.{regionId}.gcp.snowflakecomputing.com`
const domainParts = [account, regionId, cloudProvider];
Expand Down Expand Up @@ -78,8 +271,8 @@ export const createSnowpipeAPI = (username: string, privateKey: string, account:
return bearer;
}

const makeRequest = async (options: https.RequestOptions, endpointCallHistory: RecordedCall[], postBody?: string): Promise<string> => {
return new Promise<string>((resolve, reject) => {
const makeRequest = async <T>(options: https.RequestOptions, endpointCallHistory: RecordedCall[], postBody?: string): Promise<SnowpipeAPIResponse<T>> => {
return new Promise<SnowpipeAPIResponse<T>>((resolve, reject) => {
const req: ClientRequest = https.request(
options,
(response: IncomingMessage) => {
Expand All @@ -103,7 +296,17 @@ export const createSnowpipeAPI = (username: string, privateKey: string, account:
if (response.statusCode !== undefined && (response.statusCode < 200 || response.statusCode > 299)) {
reject(new Error(`status code: ${response.statusCode}. '${messageBody}'`));
} else {
resolve(messageBody);
let json: T | null = null;
try {
json = JSON.parse(messageBody) as T;
} catch (e) {
console.warn(`unable to parse response (expecting valid JSON on a ${response.statusCode} status code`, messageBody);
}
resolve({
json,
rawResponse: messageBody,
statusCode: response.statusCode
} as SnowpipeAPIResponse<T>);
}
})
}
Expand Down Expand Up @@ -136,13 +339,7 @@ export const createSnowpipeAPI = (username: string, privateKey: string, account:
return crypto.randomBytes(16).toString("hex");
}

/**
* https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#data-file-ingestion
*
* @param filenames list of files to be ingested by snowflake
* @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
*/
const insertFile = async (pipeName: string, filenames: string[], postJSON: boolean = false): Promise<string> => {
const insertFile = async (pipeName: string, filenames: string[], postJSON: boolean = false): Promise<SnowpipeAPIResponse<InsertFileResponse>> => {
let contentType;
let postBody;
if (postJSON === true) {
Expand Down Expand Up @@ -172,13 +369,10 @@ export const createSnowpipeAPI = (username: string, privateKey: string, account:
Accept: 'application/json'
}
};
return await makeRequest(options, apiEndpointHistory.insertFile, postBody);
return await makeRequest<InsertFileResponse>(options, apiEndpointHistory.insertFile, postBody);
}

/**
* https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#endpoint-insertreport
*/
const insertReport = async (pipeName: string, beginMark?: string) => {
const insertReport = async (pipeName: string, beginMark?: string): Promise<SnowpipeAPIResponse<InsertReportResponse>> => {
// https://<account>.snowflakecomputing.com/v1/data/pipes/<pipeName>/insertReport?requestId=
let path = `/v1/data/pipes/${pipeName}/insertReport?requestId=${getRequestId()}`;
if (beginMark) {
Expand All @@ -198,16 +392,10 @@ export const createSnowpipeAPI = (username: string, privateKey: string, account:
Accept: 'application/json'
}
};
return await makeRequest(options, apiEndpointHistory.insertReport);
return await makeRequest<InsertReportResponse>(options, apiEndpointHistory.insertReport);
}

/**
*
* @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
* @param startTimeInclusive Timestamp in ISO-8601 format. Start of the time range to retrieve load history data.
* @param endTimeExclusive Timestamp in ISO-8601 format. End of the time range to retrieve load history data. If omitted, then CURRENT_TIMESTAMP() is used as the end of the range.
*/
const loadHistoryScan = async (pipeName: string, startTimeInclusive: string, endTimeExclusive?: string) => {
const loadHistoryScan = async (pipeName: string, startTimeInclusive: string, endTimeExclusive?: string): Promise<SnowpipeAPIResponse<LoadHistoryScanResponse>> => {
// /v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>
let path = `/v1/data/pipes/${pipeName}/loadHistoryScan?startTimeInclusive=${startTimeInclusive}&requestId=${getRequestId()}`;
if (endTimeExclusive) {
Expand All @@ -227,17 +415,13 @@ export const createSnowpipeAPI = (username: string, privateKey: string, account:
Accept: 'application/json'
}
};
return await makeRequest(options, apiEndpointHistory.loadHistoryScan);
}

const endpointHistory = (): APIEndpointHistory => {
return apiEndpointHistory
return await makeRequest<LoadHistoryScanResponse>(options, apiEndpointHistory.loadHistoryScan);
}

return {
loadHistoryScan,
insertReport,
insertFile,
endpointHistory,
endpointHistory: apiEndpointHistory,
}
}

0 comments on commit fa0f632

Please sign in to comment.