Skip to content

Commit 5e6a842

Browse files
authored
Initial commit
1 parent 3b5f9eb commit 5e6a842

File tree

4 files changed

+264
-0
lines changed

4 files changed

+264
-0
lines changed

config.sample.json

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"project_id": "bigquery-public-data",
3+
"dataset_id": "samples",
4+
"table_id": "github_timeline"
5+
}

setup.cfg

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[metadata]
2+
description-file = README.md

setup.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/usr/bin/env python
2+
3+
from setuptools import setup
4+
5+
setup(name='target-bigquery',
6+
version='0.0.1',
7+
description='Singer.io target for writing data to Google BigQuery',
8+
author='Grayson Williams',
9+
url='https://realself.com',
10+
classifiers=['Programming Language :: Python :: 3 :: Only'],
11+
py_modules=['target_bigquery'],
12+
install_requires=[
13+
'jsonschema==2.6.0',
14+
'singer-python==1.5.0',
15+
'google-api-python-client==1.6.2'
16+
],
17+
entry_points='''
18+
[console_scripts]
19+
target-bigquery=target_bigquery:main
20+
''',
21+
)

target_bigquery.py

+236
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
#!/usr/bin/env python3
2+
3+
import argparse
4+
import functools
5+
import io
6+
import os
7+
import sys
8+
import json
9+
import logging
10+
import collections
11+
import threading
12+
import http.client
13+
import urllib
14+
import pkg_resources
15+
16+
from jsonschema import validate
17+
import singer
18+
19+
import httplib2
20+
21+
from apiclient import discovery
22+
from oauth2client import client
23+
from oauth2client import tools
24+
from oauth2client.file import Storage
25+
26+
from google.cloud import bigquery
27+
from google.cloud.bigquery import Dataset
28+
from google.cloud.bigquery import SchemaField
29+
from google.api_core import exceptions
30+
31+
try:
32+
parser = argparse.ArgumentParser(parents=[tools.argparser])
33+
parser.add_argument('-c', '--config', help='Config file', required=True)
34+
flags = parser.parse_args()
35+
36+
except ImportError:
37+
flags = None
38+
39+
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
40+
logger = singer.get_logger()
41+
42+
SCOPES = 'https://www.googleapis.com/auth/bigquery'
43+
CLIENT_SECRET_FILE = 'client_secret.json'
44+
APPLICATION_NAME = 'Singer BigQuery Target'
45+
46+
StreamMeta = collections.namedtuple('StreamMeta', ['schema', 'key_properties', 'bookmark_properties'])
47+
48+
def get_credentials():
49+
"""Gets valid user credentials from storage.
50+
51+
If nothing has been stored, or if the stored credentials are invalid,
52+
the OAuth2 flow is completed to obtain the new credentials.
53+
54+
Returns:
55+
Credentials, the obtained credential.
56+
"""
57+
58+
home_dir = os.path.expanduser('~')
59+
credential_dir = os.path.join(home_dir, '.credentials')
60+
if not os.path.exists(credential_dir):
61+
os.makedirs(credential_dir)
62+
credential_path = os.path.join(credential_dir,
63+
'bigquery.googleapis.com-singer-target.json')
64+
65+
store = Storage(credential_path)
66+
credentials = store.get()
67+
if not credentials or credentials.invalid:
68+
flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
69+
flow.user_agent = APPLICATION_NAME
70+
if flags:
71+
credentials = tools.run_flow(flow, store, flags)
72+
else: # Needed only for compatibility with Python 2.6
73+
credentials = tools.run(flow, store)
74+
print('Storing credentials to ' + credential_path)
75+
return credentials
76+
77+
78+
def emit_state(state):
79+
if state is not None:
80+
line = json.dumps(state)
81+
logger.debug('Emitting state {}'.format(line))
82+
sys.stdout.write("{}\n".format(line))
83+
sys.stdout.flush()
84+
85+
def build_schema(schema):
86+
SCHEMA = []
87+
for key in schema['properties'].keys():
88+
schema_name = key
89+
schema_type = "STRING"
90+
schema_mode = "NULLABLE"
91+
schema_fields = None
92+
93+
if type(schema['properties'][key]['type']) is list:
94+
if schema['properties'][key]['type'][0] == "null":
95+
schema_mode = 'NULLABLE'
96+
else:
97+
schema_mode = 'required'
98+
schema_type = schema['properties'][key]['type'][1]
99+
else:
100+
schema_type = schema['properties'][key]['type']
101+
if schema_type == schema['properties'][key]['type'] == "array":
102+
schema_mode = "repeated"
103+
if "items" in schema['properties'][key]:
104+
schema_fields = build_schema(schema['properties'][key])
105+
106+
if schema_type == "string":
107+
if "format" in schema['properties'][key]:
108+
if schema['properties'][key]['format'] == "date-time":
109+
schema_type = "timestamp"
110+
111+
SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_fields))
112+
113+
return SCHEMA
114+
115+
def persist_lines(project_id, dataset_id, table_id, lines):
116+
state = None
117+
schemas = {}
118+
key_properties = {}
119+
120+
headers_by_stream = {}
121+
122+
for line in lines:
123+
try:
124+
msg = singer.parse_message(line)
125+
except json.decoder.JSONDecodeError:
126+
logger.error("Unable to parse:\n{}".format(line))
127+
raise
128+
129+
if isinstance(msg, singer.RecordMessage):
130+
if msg.stream not in schemas:
131+
raise Exception("A record for stream {} was encountered before a corresponding schema".format(msg.stream))
132+
133+
schema = schemas[msg.stream]
134+
validate(msg.record, schema)
135+
136+
bigquery_client = bigquery.Client(project=project_id)
137+
138+
dataset_ref = bigquery_client.dataset(dataset_id)
139+
dataset = Dataset(dataset_ref)
140+
141+
try:
142+
dataset = bigquery_client.create_dataset(Dataset(dataset_ref)) or Dataset(dataset_ref)
143+
except exceptions.Conflict:
144+
pass
145+
146+
table_ref = dataset.table(table_id)
147+
table_schema = build_schema(schema)
148+
149+
table = bigquery.Table(table_ref, schema=table_schema)
150+
try:
151+
table = bigquery_client.create_table(table)
152+
except exceptions.Conflict:
153+
pass
154+
155+
rows = [msg.record]
156+
errors = bigquery_client.create_rows(table, rows)
157+
158+
if not errors:
159+
print('Loaded 1 row into {}:{}'.format(dataset_id, table_id))
160+
else:
161+
print('Errors:')
162+
pprint(errors)
163+
164+
state = None
165+
elif isinstance(msg, singer.StateMessage):
166+
logger.debug('Setting state to {}'.format(msg.value))
167+
state = msg.value
168+
elif isinstance(msg, singer.SchemaMessage):
169+
schemas[msg.stream] = msg.schema
170+
key_properties[msg.stream] = msg.key_properties
171+
elif isinstance(msg, singer.ActivateVersionMessage):
172+
# This is experimental and won't be used yet
173+
pass
174+
else:
175+
raise Exception("Unrecognized message {}".format(msg))
176+
177+
#print("Schemas: ", schemas[list(schemas.keys())[0]]['properties'])
178+
#print("\n\n")
179+
#print("Schema keys: ", schemas.keys())
180+
return state
181+
182+
183+
def collect():
184+
try:
185+
version = pkg_resources.get_distribution('target-bigquery').version
186+
conn = http.client.HTTPSConnection('collector.stitchdata.com', timeout=10)
187+
conn.connect()
188+
params = {
189+
'e': 'se',
190+
'aid': 'singer',
191+
'se_ca': 'target-bigquery',
192+
'se_ac': 'open',
193+
'se_la': version,
194+
}
195+
conn.request('GET', '/i?' + urllib.parse.urlencode(params))
196+
response = conn.getresponse()
197+
conn.close()
198+
except:
199+
logger.debug('Collection request failed')
200+
201+
202+
def main():
203+
with open(flags.config) as input:
204+
config = json.load(input)
205+
206+
"""
207+
if not config.get('disable_collection', False):
208+
logger.info('Sending version information to stitchdata.com. ' +
209+
'To disable sending anonymous usage data, set ' +
210+
'the config parameter "disable_collection" to true')
211+
threading.Thread(target=collect).start()
212+
"""
213+
214+
"""
215+
credentials = get_credentials()
216+
http = credentials.authorize(httplib2.Http())
217+
discoveryUrl = ('https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest')
218+
service = discovery.build('bigquery', 'v2', http=http,
219+
discoveryServiceUrl=discoveryUrl)
220+
"""
221+
222+
input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
223+
224+
#state = persist_lines(service, config['project_id'], config['dataset_id'], config['table_id'], input)
225+
state = persist_lines(config['project_id'], config['dataset_id'], config['table_id'], input)
226+
emit_state(state)
227+
logger.debug("Exiting normally")
228+
229+
230+
if __name__ == '__main__':
231+
main()
232+
233+
234+
"""
235+
schema: {'properties': {'consult_cost': {'inclusion': 'available', 'minimum': -2147483648, 'maximum': 2147483647, 'type': ['null', 'integer']}, 'primary_address_id': {'inclusion': 'available', 'minimum': -2147483648, 'maximum': 2147483647, 'type': ['null', 'integer']}, 'is_core_override': {'inclusion': 'available', 'type': ['null', 'boolean']}, 'primary_specialty_id': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'last_change_by': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'website_vendor': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'list_flags': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'certification': {'inclusion': 'available', 'maxLength': 65535, 'type': ['null', 'string']}, 'postdoc_training': {'inclusion': 'available', 'maxLength': 65535, 'type': ['null', 'string']}, 'city': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'auto_publish': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'address': {'inclusion': 'available', 'maxLength': 100, 'type': ['null', 'string']}, 'began_aesthetic_medicine': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'website': {'inclusion': 'available', 'maxLength': 200, 'type': ['null', 'string']}, 'education': {'inclusion': 'available', 'maxLength': 65535, 'type': ['null', 'string']}, 'linked_url': {'inclusion': 'available', 'maxLength': 200, 'type': ['null', 'string']}, 'anonymous_votes': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'primary_twilio_id': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'zipcode': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'initial_contact_date': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'modified': {'inclusion': 'available', 'format': 'date-time', 'type': ['null', 'string']}, 'additional_addresses': {'inclusion': 'available', 'maxLength': 500, 'type': ['null', 'string']}, 'business': {'inclusion': 'available', 'maxLength': 100, 'type': ['null', 'string']}, 'gplus': {'inclusion': 'available', 'maxLength': 200, 'type': ['null', 'string']}, 'last_name': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'status': {'inclusion': 'available', 'type': ['null', 'boolean']}, 'enable_activity_stream': {'inclusion': 'available', 'type': ['null', 'boolean']}, 'pro_purchase_date': {'inclusion': 'available', 'format': 'date-time', 'type': ['null', 'string']}, 'awards': {'inclusion': 'available', 'maxLength': 65535, 'type': ['null', 'string']}, 'uri': {'inclusion': 'available', 'maxLength': 128, 'type': ['null', 'string']}, 'inactive': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'realcare_promise': {'inclusion': 'available', 'type': ['null', 'boolean']}, 'tier': {'inclusion': 'available', 'maxLength': 14, 'type': ['null', 'string']}, 'statement': {'inclusion': 'available', 'maxLength': 65535, 'type': ['null', 'string']}, 'user_id': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'country': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'photo': {'inclusion': 'available', 'maxLength': 128, 'type': ['null', 'string']}, 'hide_gplus_msg': {'inclusion': 'available', 'type': ['null', 'boolean']}, 'directory_link': {'inclusion': 'available', 'maxLength': 150, 'type': ['null', 'string']}, 'name': {'inclusion': 'available', 'maxLength': 60, 'type': ['null', 'string']}, 'consult_notes': {'inclusion': 'available', 'maxLength': 300, 'type': ['null', 'string']}, 'disclosures': {'inclusion': 'available', 'maxLength': 500, 'type': ['null', 'string']}, 'lead_email': {'inclusion': 'available', 'maxLength': 255, 'type': ['null', 'string']}, 'address_suite': {'inclusion': 'available', 'maxLength': 100, 'type': ['null', 'string']}, 'spotlight': {'inclusion': 'available', 'type': ['null', 'boolean']}, 'opinion_leader': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'clinical_privileges': {'inclusion': 'available', 'maxLength': 500, 'type': ['null', 'string']}, 'state': {'inclusion': 'available', 'maxLength': 50, 'type': ['null', 'string']}, 'primary_location_id': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'twitter_facebook': {'inclusion': 'available', 'type': ['null', 'boolean']}, 'advisor_user_id': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'gender': {'inclusion': 'available', 'maxLength': 10, 'type': ['null', 'string']}, 'rid': {'inclusion': 'automatic', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'salesforce_id': {'inclusion': 'available', 'maxLength': 15, 'type': ['null', 'string']}, 'phone': {'inclusion': 'available', 'maxLength': 100, 'type': ['null', 'string']}, 'id': {'inclusion': 'automatic', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'promote': {'inclusion': 'available', 'minimum': 0, 'maximum': 4294967295, 'type': ['null', 'integer']}, 'created': {'inclusion': 'available', 'format': 'date-time', 'type': ['null', 'string']}, 'premier_status': {'inclusion': 'available', 'maxLength': 20, 'type': ['null', 'string']}}, 'type': 'object'}
236+
"""

0 commit comments

Comments
 (0)