Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
brianzinn committed Oct 7, 2020
1 parent c385c9d commit a815e11
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 2 deletions.
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
node_modules
coverage
.nyc_output
.DS_Store
*.log
.vscode
.idea
dist
compiled
.awcache
.rpt2_cache
docs
yarn.lock
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# -snowflake-ingest-node
For integrating with Snowflake snowpipe API
# snowflake-ingest-node
simple API wrapper for Snowpipe at time of writing only Python and Java were available SDKs
61 changes: 61 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"name": "snowflake-ingest-node",
"version": "0.0.1",
"description": "snowflake snowpipe API ingest for Node",
"keywords": [
"snowflake",
"snowpipe",
"ingest",
"node",
"typescript"
],
"main": "dist/lib/snowflake-ingest-node.js",
"module": "dist/snowflake-ingest-node.es5.js",
"typings": "dist/types/snowflake-ingest-node.d.ts",
"files": [
"dist"
],
"author": "Brian Zinn <[email protected]>",
"repository": {
"type": "git",
"url": "https://github.com/brianzinn/snowflake-ingest-node"
},
"scripts": {
"lint": "tslint --project tsconfig.json -t codeFrame 'src/**/*.ts'",
"prebuild": "rimraf dist",
"build": "tsc --module commonjs && rollup -c rollup.config.ts",
"start": "rollup -c rollup.config.ts -w",
"precommit": "lint-staged"
},
"lint-staged": {
"src/**/*.ts": [
"prettier --write"
]
},
"prettier": {
"semi": false,
"singleQuote": true,
"trailingComma": "es5"
},
"devDependencies": {
"@types/node": "^12.0.8",
"cross-env": "^6.0.0",
"lint-staged": "^9.0.0",
"prettier": "^1.14.3",
"rimraf": "^3.0.0",
"rollup": "^1.15.6",
"rollup-plugin-commonjs": "^10.0.0",
"rollup-plugin-json": "^4.0.0",
"rollup-plugin-node-resolve": "^5.0.3",
"rollup-plugin-sourcemaps": "^0.4.2",
"rollup-plugin-typescript2": "^0.23.0",
"ts-node": "^8.3.0",
"tslint": "^5.11.0",
"tslint-config-prettier": "^1.15.0",
"tslint-config-standard": "^8.0.1",
"typescript": "^3.0.3"
},
"dependencies": {
"jwt-simple": "^0.5.6"
}
}
36 changes: 36 additions & 0 deletions rollup.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import resolve from 'rollup-plugin-node-resolve'
import commonjs from 'rollup-plugin-commonjs'
import sourceMaps from 'rollup-plugin-sourcemaps'
import typescript from 'rollup-plugin-typescript2'
import json from 'rollup-plugin-json'

const pkg = require('./package.json')

const libraryName = 'snowflake-ingest-node'

export default {
input: `src/${libraryName}.ts`,
output: [
{ file: pkg.module, format: 'es', sourcemap: true },
],
// Indicate here external modules you don't wanna include in your bundle (i.e.: 'lodash')
external: [],
watch: {
include: 'src/**',
},
plugins: [
// Allow json resolution
json(),
// Compile TypeScript files
typescript({ useTsconfigDeclarationDir: true }),
// Allow bundling cjs modules (unlike webpack, rollup doesn't understand cjs)
commonjs(),
// Allow node_modules resolution, so you can use 'external' to control
// which external modules to include in the bundle
// https://github.com/rollup/rollup-plugin-node-resolve#usage
resolve({ preferBuiltins: true}),

// Resolve source maps to the original source
sourceMaps(),
],
}
231 changes: 231 additions & 0 deletions src/snowflake-ingest-node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
import * as crypto from 'crypto';
import { ClientRequest, IncomingMessage } from 'http';
import https, { RequestOptions } from 'https' // chosen over ie: axios to not bring in extra dep.
import jwt from 'jwt-simple';

const USER_AGENT = 'snowpipe-ingest-node/0.0.1/node/npm';

export type SnowpipeAPIOptions = {
recordHistory: boolean
}

export type RecordedCallResponse = {
error?: Error
statusCode?: number
messageBody?: string
}

export type RecordedCall = {
request: RequestOptions
response: RecordedCallResponse
}

export type APIEndpointHistory = {
insertFile: RecordedCall[]
insertReport: RecordedCall[]
loadHistoryScan: RecordedCall[]
}

/**
*
* @param username user that you created in Snowflake (and added a private key auth)
* @param privateKey private key for provided user (for generating bearer token)
* @param account account provided by Snowflake
* @param regionId needed for non-default (AWS)?
* @param cloudProvider needed for non-AWS?
*/
export const createSnowpipeAPI = (username: string, privateKey: string, account: string, regionId?: string, cloudProvider?: string, snowpipeAPIOptions?: SnowpipeAPIOptions) => {
// `{account}.snowflakecomputing.com` is for default US AWS.
// for GCP you need `{account}.{regionId}.gcp.snowflakecomputing.com`
const domainParts = [account, regionId, cloudProvider];

const apiEndpointHistory: APIEndpointHistory = {
insertFile: [],
insertReport: [],
loadHistoryScan: []
}

const config = {
username: username.toUpperCase(),
privateKey,
account: account.toUpperCase(),
hostname: `${domainParts.filter(p => p !== undefined).join('.')}.snowflakecomputing.com`
}

const getBearerToken = async (): Promise<string> => {
const publicKeyBytes = crypto.createPublicKey(privateKey).export({ type: 'spki', format: 'der' });
// matches FP (fingerprint) on `desc user <username>`in snowflake.
// ie: SHA256:g....I=
const signature = 'SHA256:' + crypto.createHash('sha256').update(publicKeyBytes).digest().toString('base64');

const ISSUER = 'iss';
const ISSUED_AT_TIME = 'iat';
const EXPIRY_TIME = 'exp';
const SUBJECT = 'sub';

const payload = {
[ISSUER]: `${config.account}.${config.username}.${signature}`,
[SUBJECT]: `${config.account}.${config.username}`,
[ISSUED_AT_TIME]: Math.round(new Date().getTime() / 1000),
[EXPIRY_TIME]: Math.round(new Date().getTime() / 1000 + 60 * 59)
}

const bearer = jwt.encode(payload, privateKey, 'RS256');
return bearer;
}

const makeRequest = async (options: RequestOptions, endpointCallHistory: RecordedCall[], postBody?: string): Promise<string> => {
return new Promise<string>((resolve, reject) => {
const req: ClientRequest = https.request(
options,
(response: IncomingMessage) => {
const body: string[] = [];
response.on('data', (chunk: any) => {
body.push(chunk);
})

response.on('end', () => {
const messageBody = body.join('');
if (snowpipeAPIOptions?.recordHistory === true) {
endpointCallHistory.push({
request: options,
response: {
statusCode: response.statusCode,
messageBody
}
});
}

if (response.statusCode !== undefined && (response.statusCode < 200 || response.statusCode > 299)) {
reject(new Error(`status code: ${response.statusCode}. '${messageBody}'`));
} else {
resolve(messageBody);
}
})
}
);

req.on('error', (error: Error) => {
if (snowpipeAPIOptions?.recordHistory === true) {
endpointCallHistory.push({
request: options,
response: {
error
}
});
}
reject(error);
})

if (postBody) {
req.write(postBody);
}

req.end();
});
}

/**
* Snowflake recommends providing a random string with each request, e.g. a UUID.
*/
const getRequestId = () => {
return crypto.randomBytes(16).toString("hex");
}

/**
* https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#data-file-ingestion
*
* @param filenames list of files to be ingested by snowflake
* @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
*/
const insertFile = async (filenames: string[], pipeName: string): Promise<string> => {
const postBody = JSON.stringify({
"files": filenames.map(filename => ({path: filename}))
});

const path = `/v1/data/pipes/${pipeName}/insertFiles?requestId=${getRequestId()}`;

const jwt_token: string = await getBearerToken();

const options = {
hostname: config.hostname,
port: 443,
path,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': postBody.length,
'Authorization': `Bearer ${jwt_token}`,
'User-Agent': USER_AGENT,
Accept: 'application/json'
}
};
return await makeRequest(options, apiEndpointHistory.insertFile, postBody);
}

/**
* https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html#endpoint-insertreport
*/
const insertReport = async (pipeName: string, beginMark?: string) => {
// https://<account>.snowflakecomputing.com/v1/data/pipes/<pipeName>/insertReport?requestId=
let path = `/v1/data/pipes/${pipeName}/insertReport?requestId=${getRequestId()}`;
if (beginMark) {
path += `&beginMark=${beginMark}`;
}

const jwt_token: string = await getBearerToken();

const options = {
hostname: config.hostname,
port: 443,
path,
method: 'GET',
headers: {
'Authorization': `Bearer ${jwt_token}`,
'User-Agent': USER_AGENT,
Accept: 'application/json'
}
};
return await makeRequest(options, apiEndpointHistory.insertReport);
}

/**
*
* @param pipeName Case-sensitive, fully-qualified pipe name. For example, myDatabase.mySchema.myPipe.
* @param startTimeInclusive Timestamp in ISO-8601 format. Start of the time range to retrieve load history data.
* @param endTimeExclusive Timestamp in ISO-8601 format. End of the time range to retrieve load history data. If omitted, then CURRENT_TIMESTAMP() is used as the end of the range.
*/
const loadHistoryScan = async (pipeName: string, startTimeInclusive: string, endTimeExclusive?: string) => {
// /v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>
let path = `/v1/data/pipes/${pipeName}/loadHistoryScan?startTimeInclusive=${startTimeInclusive}&requestId=${getRequestId()}`;
if (endTimeExclusive) {
path += `&endTimeExclusive=${endTimeExclusive}`;
}

const jwt_token: string = await getBearerToken();

const options = {
hostname: config.hostname,
port: 443,
path,
method: 'GET',
headers: {
'Authorization': `Bearer ${jwt_token}`,
'User-Agent': USER_AGENT,
Accept: 'application/json'
}
};
return await makeRequest(options, apiEndpointHistory.loadHistoryScan);
}

const endpointHistory = (): APIEndpointHistory => {
return apiEndpointHistory
}

return {
loadHistoryScan,
insertReport,
insertFile,
endpointHistory,
}
}
19 changes: 19 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"compilerOptions": {
"moduleResolution": "node",
"target": "ES2017", // for node10. Use "ES5" for node 6 or 8
"module": "es2015",
"lib": ["es2015", "es2016", "es2017"],
"strict": true,
"sourceMap": true,
"declaration": true,
"allowSyntheticDefaultImports": true,
"experimentalDecorators": true,
"emitDecoratorMetadata": true,
"noImplicitAny": true,
"declarationDir": "dist/types",
"outDir": "dist/lib",
"typeRoots": ["node_modules/@types"]
},
"include": ["src"]
}
3 changes: 3 additions & 0 deletions tslint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"extends": ["tslint-config-standard", "tslint-config-prettier"]
}

0 comments on commit a815e11

Please sign in to comment.