diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..95f0c5b --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +node_modules +coverage +.nyc_output +.DS_Store +*.log +.vscode +.idea +dist +compiled +.awcache +.rpt2_cache +docs +yarn.lock diff --git a/README.md b/README.md index a8ae1fa..587848f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# -snowflake-ingest-node -For integrating with Snowflake snowpipe API +# snowflake-ingest-node +Simple API wrapper for Snowpipe. At the time of writing, only Python and Java SDKs were available. \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..fa213b0 --- /dev/null +++ b/package.json @@ -0,0 +1,61 @@ +{ + "name": "snowflake-ingest-node", + "version": "0.0.1", + "description": "snowflake snowpipe API ingest for Node", + "keywords": [ + "snowflake", + "snowpipe", + "ingest", + "node", + "typescript" + ], + "main": "dist/lib/snowflake-ingest-node.js", + "module": "dist/snowflake-ingest-node.es5.js", + "typings": "dist/types/snowflake-ingest-node.d.ts", + "files": [ + "dist" + ], + "author": "Brian Zinn ", + "repository": { + "type": "git", + "url": "https://github.com/brianzinn/snowflake-ingest-node" + }, + "scripts": { + "lint": "tslint --project tsconfig.json -t codeFrame 'src/**/*.ts'", + "prebuild": "rimraf dist", + "build": "tsc --module commonjs && rollup -c rollup.config.ts", + "start": "rollup -c rollup.config.ts -w", + "precommit": "lint-staged" + }, + "lint-staged": { + "src/**/*.ts": [ + "prettier --write" + ] + }, + "prettier": { + "semi": false, + "singleQuote": true, + "trailingComma": "es5" + }, + "devDependencies": { + "@types/node": "^12.0.8", + "cross-env": "^6.0.0", + "lint-staged": "^9.0.0", + "prettier": "^1.14.3", + "rimraf": "^3.0.0", + "rollup": "^1.15.6", + "rollup-plugin-commonjs": "^10.0.0", + "rollup-plugin-json": "^4.0.0", + "rollup-plugin-node-resolve": "^5.0.3", + "rollup-plugin-sourcemaps": "^0.4.2", + 
"rollup-plugin-typescript2": "^0.23.0", + "ts-node": "^8.3.0", + "tslint": "^5.11.0", + "tslint-config-prettier": "^1.15.0", + "tslint-config-standard": "^8.0.1", + "typescript": "^3.0.3" + }, + "dependencies": { + "jwt-simple": "^0.5.6" + } +} diff --git a/rollup.config.ts b/rollup.config.ts new file mode 100644 index 0000000..b2c9a6b --- /dev/null +++ b/rollup.config.ts @@ -0,0 +1,36 @@ +import resolve from 'rollup-plugin-node-resolve' +import commonjs from 'rollup-plugin-commonjs' +import sourceMaps from 'rollup-plugin-sourcemaps' +import typescript from 'rollup-plugin-typescript2' +import json from 'rollup-plugin-json' + +const pkg = require('./package.json') + +const libraryName = 'snowflake-ingest-node' + +export default { + input: `src/${libraryName}.ts`, + output: [ + { file: pkg.module, format: 'es', sourcemap: true }, + ], + // Indicate here external modules you don't wanna include in your bundle (i.e.: 'lodash') + external: [], + watch: { + include: 'src/**', + }, + plugins: [ + // Allow json resolution + json(), + // Compile TypeScript files + typescript({ useTsconfigDeclarationDir: true }), + // Allow bundling cjs modules (unlike webpack, rollup doesn't understand cjs) + commonjs(), + // Allow node_modules resolution, so you can use 'external' to control + // which external modules to include in the bundle + // https://github.com/rollup/rollup-plugin-node-resolve#usage + resolve({ preferBuiltins: true}), + + // Resolve source maps to the original source + sourceMaps(), + ], +} diff --git a/src/snowflake-ingest-node.ts b/src/snowflake-ingest-node.ts new file mode 100644 index 0000000..f8093ac --- /dev/null +++ b/src/snowflake-ingest-node.ts @@ -0,0 +1,231 @@ +import * as crypto from 'crypto'; +import { ClientRequest, IncomingMessage } from 'http'; +import https, { RequestOptions } from 'https' // chosen over ie: axios to not bring in extra dep. 
import jwt from 'jwt-simple';

/** Value sent in the `User-Agent` header of every Snowpipe request. */
const USER_AGENT = 'snowpipe-ingest-node/0.0.1/node/npm';

/** Behavior switches for the API object returned by `createSnowpipeAPI`. */
export type SnowpipeAPIOptions = {
  /** When `true`, every request and its outcome are appended to the endpoint history. */
  recordHistory: boolean
}

/** Outcome of one recorded call: either a transport error or the HTTP status and body. */
export type RecordedCallResponse = {
  error?: Error
  statusCode?: number
  messageBody?: string
}

/** One recorded call: the request options that were sent and what came back. */
export type RecordedCall = {
  request: RequestOptions
  response: RecordedCallResponse
}

/** Recorded calls, grouped by the endpoint that issued them. */
export type APIEndpointHistory = {
  insertFile: RecordedCall[]
  insertReport: RecordedCall[]
  loadHistoryScan: RecordedCall[]
}

/**
 * Creates a small client for the Snowpipe REST endpoints (`insertFiles`,
 * `insertReport`, `loadHistoryScan`) that authenticates with a key-pair JWT.
 *
 * @param username user that you created in Snowflake (and added a private key auth)
 * @param privateKey private key for provided user (for generating bearer token)
 * @param account account provided by Snowflake
 * @param regionId needed for non-default (AWS)?
 * @param cloudProvider needed for non-AWS?
 * @param snowpipeAPIOptions optional switches; `recordHistory: true` records every call
 * @returns an object exposing `insertFile`, `insertReport`, `loadHistoryScan` and `endpointHistory`
 */
export const createSnowpipeAPI = (username: string, privateKey: string, account: string, regionId?: string, cloudProvider?: string, snowpipeAPIOptions?: SnowpipeAPIOptions) => {
  // `{account}.snowflakecomputing.com` is for default US AWS.
  // for GCP you need `{account}.{regionId}.gcp.snowflakecomputing.com`
  // Empty or missing parts are dropped so the hostname never contains `..`.
  const domainParts = [account, regionId, cloudProvider]
    .filter((part): part is string => typeof part === 'string' && part.length > 0);

  const apiEndpointHistory: APIEndpointHistory = {
    insertFile: [],
    insertReport: [],
    loadHistoryScan: []
  };

  const config = {
    username: username.toUpperCase(),
    privateKey,
    account: account.toUpperCase(),
    hostname: `${domainParts.join('.')}.snowflakecomputing.com`
  };

  /**
   * Appends a call to `history`, but only when `recordHistory` was enabled.
   * NOTE: the recorded request options include the `Authorization` header.
   */
  const recordCall = (history: RecordedCall[], request: RequestOptions, response: RecordedCallResponse): void => {
    if (snowpipeAPIOptions?.recordHistory === true) {
      history.push({ request, response });
    }
  };

  /**
   * Builds the RS256-signed JWT used as bearer token.
   * Declared `async` so that an unusable private key surfaces as a rejected
   * promise of the calling endpoint instead of a synchronous throw.
   */
  const getBearerToken = async (): Promise<string> => {
    const publicKeyBytes = crypto.createPublicKey(privateKey).export({ type: 'spki', format: 'der' });
    // matches FP (fingerprint) on `desc user <username>` in snowflake.
    // ie: SHA256:g....I=
    const fingerprint = 'SHA256:' + crypto.createHash('sha256').update(publicKeyBytes).digest('base64');

    // Read the clock once and round down, so `iat` is never in the future and
    // `exp` is exactly 59 minutes after `iat`.
    const issuedAt = Math.floor(Date.now() / 1000);

    const payload = {
      iss: `${config.account}.${config.username}.${fingerprint}`,
      sub: `${config.account}.${config.username}`,
      iat: issuedAt,
      exp: issuedAt + 60 * 59
    };

    return jwt.encode(payload, privateKey, 'RS256');
  };

  /**
   * Sends one HTTPS request and resolves with the response body as text.
   * Rejects on transport errors and on any status code outside 200-299.
   *
   * @param options request options passed to `https.request`
   * @param endpointCallHistory history list of the calling endpoint
   * @param postBody optional request body
   */
  const makeRequest = (options: RequestOptions, endpointCallHistory: RecordedCall[], postBody?: string): Promise<string> => {
    return new Promise<string>((resolve, reject) => {
      // Guards against recording/settling twice (e.g. an error after 'end').
      let settled = false;

      const fail = (error: Error): void => {
        if (settled) {
          return;
        }
        settled = true;
        recordCall(endpointCallHistory, options, { error });
        reject(error);
      };

      const req: ClientRequest = https.request(options, (response: IncomingMessage) => {
        // Keep raw bytes and decode once at the end: decoding chunk by chunk
        // corrupts multi-byte UTF-8 characters that straddle a chunk boundary.
        const chunks: Buffer[] = [];

        response.on('data', (chunk: Buffer | string) => {
          chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
        });

        response.on('error', fail);

        response.on('end', () => {
          if (settled) {
            return;
          }
          settled = true;

          const messageBody = Buffer.concat(chunks).toString('utf8');
          const statusCode = response.statusCode;
          recordCall(endpointCallHistory, options, { statusCode, messageBody });

          if (statusCode !== undefined && (statusCode < 200 || statusCode > 299)) {
            reject(new Error(`status code: ${statusCode}. '${messageBody}'`));
          } else {
            resolve(messageBody);
          }
        });
      });

      req.on('error', fail);

      if (postBody) {
        req.write(postBody);
      }

      req.end();
    });
  };

  /**
   * Snowflake recommends providing a random string with each request, e.g. a UUID.
   */
  const getRequestId = (): string => {
    return crypto.randomBytes(16).toString('hex');
  };

  /** Path of a pipe endpoint (no query string); the pipe name is URL-encoded. */
  const pipePath = (pipeName: string, endpoint: string): string => {
    return `/v1/data/pipes/${encodeURIComponent(pipeName)}/${endpoint}`;
  };

  /**
   * Builds the request options shared by all endpoints, including a fresh
   * bearer token. Body headers are only added when a body is sent.
   */
  const buildRequestOptions = async (method: 'GET' | 'POST', path: string, postBody?: string): Promise<RequestOptions> => {
    const jwtToken = await getBearerToken();

    const headers: { [name: string]: string | number } = {
      'Authorization': `Bearer ${jwtToken}`,
      'User-Agent': USER_AGENT,
      Accept: 'application/json'
    };

    if (postBody !== undefined) {
      headers['Content-Type'] = 'application/json';
      // Content-Length counts bytes; `postBody.length` counts UTF-16 code
      // units and is wrong as soon as a file name is not pure ASCII.
      headers['Content-Length'] = Buffer.byteLength(postBody);
    }

    return {
      hostname: config.hostname,
      port: 443,
      path,
      method,
      headers
    };
  };

  /**
   * https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#data-file-ingestion
   *
   * @param filenames list of files to be ingested by snowflake
   * @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
   * @returns the response body as text; rejects on transport errors or non-2xx status
   */
  const insertFile = async (filenames: string[], pipeName: string): Promise<string> => {
    const postBody = JSON.stringify({
      "files": filenames.map(filename => ({ path: filename }))
    });

    const path = `${pipePath(pipeName, 'insertFiles')}?requestId=${getRequestId()}`;
    const options = await buildRequestOptions('POST', path, postBody);
    return makeRequest(options, apiEndpointHistory.insertFile, postBody);
  };

  /**
   * https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#endpoint-insertreport
   *
   * @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
   * @param beginMark optional marker; sent as the `beginMark` query parameter when provided
   * @returns the response body as text; rejects on transport errors or non-2xx status
   */
  const insertReport = async (pipeName: string, beginMark?: string): Promise<string> => {
    let path = `${pipePath(pipeName, 'insertReport')}?requestId=${getRequestId()}`;
    if (beginMark) {
      path += `&beginMark=${encodeURIComponent(beginMark)}`;
    }

    const options = await buildRequestOptions('GET', path);
    return makeRequest(options, apiEndpointHistory.insertReport);
  };

  /**
   *
   * @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
   * @param startTimeInclusive Timestamp in ISO-8601 format. Start of the time range to retrieve load history data.
   * @param endTimeExclusive Timestamp in ISO-8601 format. End of the time range to retrieve load history data. If omitted, then CURRENT_TIMESTAMP() is used as the end of the range.
   * @returns the response body as text; rejects on transport errors or non-2xx status
   */
  const loadHistoryScan = async (pipeName: string, startTimeInclusive: string, endTimeExclusive?: string): Promise<string> => {
    // Timestamps are URL-encoded: an unencoded `+` in a timezone offset such
    // as `+02:00` would otherwise be read as a space by the server.
    let path = `${pipePath(pipeName, 'loadHistoryScan')}?startTimeInclusive=${encodeURIComponent(startTimeInclusive)}&requestId=${getRequestId()}`;
    if (endTimeExclusive) {
      path += `&endTimeExclusive=${encodeURIComponent(endTimeExclusive)}`;
    }

    const options = await buildRequestOptions('GET', path);
    return makeRequest(options, apiEndpointHistory.loadHistoryScan);
  };

  /** Returns the live history object; it stays empty unless `recordHistory` is `true`. */
  const endpointHistory = (): APIEndpointHistory => {
    return apiEndpointHistory;
  };

  return {
    loadHistoryScan,
    insertReport,
    insertFile,
    endpointHistory,
  };
}
Use "ES5" for node 6 or 8 + "module": "es2015", + "lib": ["es2015", "es2016", "es2017"], + "strict": true, + "sourceMap": true, + "declaration": true, + "allowSyntheticDefaultImports": true, + "experimentalDecorators": true, + "emitDecoratorMetadata": true, + "noImplicitAny": true, + "declarationDir": "dist/types", + "outDir": "dist/lib", + "typeRoots": ["node_modules/@types"] + }, + "include": ["src"] +} diff --git a/tslint.json b/tslint.json new file mode 100644 index 0000000..6543a64 --- /dev/null +++ b/tslint.json @@ -0,0 +1,3 @@ +{ + "extends": ["tslint-config-standard", "tslint-config-prettier"] +}