formatted the code with black
mikaelene committed Nov 4, 2019
1 parent bb51f50 commit 5bb6ef8
Showing 16 changed files with 1,819 additions and 945 deletions.
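Black is an opinionated Python formatter, and the changes below are the kind it produces automatically: string quotes normalized to double quotes, long signatures and expressions wrapped one argument per line, and trailing commas added. The commit does not record the exact command that was run; the sketch below is a hypothetical, minimal way to reproduce this kind of pass (the eneel/ path and the sample line are taken from this diff, everything else is an assumption).

# Hypothetical sketch of a Black pass; the actual invocation used for this commit is not recorded.
# From a shell, the usual form is simply:
#     pip install black
#     black eneel/
#
# The same normalization can also be checked programmatically:
import black

src = "logger = logging.getLogger('main_logger')\n"
print(black.format_str(src, mode=black.FileMode()))
# -> logger = logging.getLogger("main_logger")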
eneel/adapters/oracle.py: 130 changes (87 additions, 43 deletions)
@@ -5,10 +5,13 @@
 import os
 
 import logging
-logger = logging.getLogger('main_logger')
+
+logger = logging.getLogger("main_logger")
 
 
-def run_export_query(server, user, password, database, port, query, file_path, delimiter, rows=5000):
+def run_export_query(
+    server, user, password, database, port, query, file_path, delimiter, rows=5000
+):
     try:
         db = Database(server, user, password, database, port)
         export = db.cursor.execute(query)
@@ -29,18 +32,27 @@ def run_export_query(server, user, password, database, port, query, file_path, d
         logger.error(e)
 
 
-def NumbersAsDecimal(cursor, name, defaultType, size, precision,
-                     scale):
+def NumbersAsDecimal(cursor, name, defaultType, size, precision, scale):
     if defaultType == cx_Oracle.NUMBER:
-        return cursor.var(str, 100, cursor.arraysize,
-                          outconverter = decimal.Decimal)
+        return cursor.var(str, 100, cursor.arraysize, outconverter=decimal.Decimal)
 
 
 class Database:
-    def __init__(self, server, user, password, database, port=None, limit_rows=None, table_where_clause=None,
-                 read_only=False, table_parallel_loads=10, table_parallel_batch_size=1000000):
+    def __init__(
+        self,
+        server,
+        user,
+        password,
+        database,
+        port=None,
+        limit_rows=None,
+        table_where_clause=None,
+        read_only=False,
+        table_parallel_loads=10,
+        table_parallel_batch_size=1000000,
+    ):
         try:
-            server_db = '{}:{}/{}'.format(server, port, database)
+            server_db = "{}:{}/{}".format(server, port, database)
             self._server = server
             self._user = user
             self._password = password
@@ -54,7 +66,7 @@ def __init__(self, server, user, password, database, port=None, limit_rows=None,
             self._table_parallel_loads = table_parallel_loads
             self._table_parallel_batch_size = table_parallel_batch_size
 
-            os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.WE8ISO8859P1'
+            os.environ["NLS_LANG"] = "AMERICAN_AMERICA.WE8ISO8859P1"
             self._conn = cx_Oracle.connect(user, password, server_db)
 
             self._conn.outputtypehandler = NumbersAsDecimal
@@ -112,7 +124,7 @@ def fetchone(self):
 
     def fetchmany(self, rows):
         try:
-            print('Entering fetchmany')
+            print("Entering fetchmany")
             return self.cursor.fetchmany(rows)
         except cx_Oracle.Error as e:
             logger.error(e)
@@ -126,7 +138,7 @@ def query(self, sql, params=None):
 
     def schemas(self):
         try:
-            q = 'SELECT DISTINCT OWNER FROM ALL_TABLES'
+            q = "SELECT DISTINCT OWNER FROM ALL_TABLES"
             schemas = self.query(q)
             return schemas
         except:
@@ -164,10 +176,14 @@ def table_columns(self, schema, table):
 
     def check_table_exist(self, table_name):
         try:
-            check_statement = """
+            check_statement = (
+                """
             SELECT 1
             FROM ALL_TABLES
-            WHERE OWNER || '.' || TABLE_NAME = '""" + table_name + "'"
+            WHERE OWNER || '.' || TABLE_NAME = '"""
+                + table_name
+                + "'"
+            )
             exists = self.query(check_statement)
             if exists:
                 return True
@@ -177,13 +193,13 @@ def check_table_exist(self, table_name):
             logger.error("Failed checking table exist")
 
     def truncate_table(self, table_name):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
     def create_schema(self, schema):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
     def get_max_column_value(self, table_name, column):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
     def get_min_max_column_value(self, table_name, column):
         try:
@@ -199,7 +215,13 @@ def get_min_max_batch(self, table_name, column):
         try:
             sql = "SELECT MIN(" + column + "), MAX(" + column
             sql += "), ceil((max( " + column + ") - min("
-            sql += column + ")) / (count(*)/" + str(self._table_parallel_batch_size) + ".0)) FROM " + table_name
+            sql += (
+                column
+                + ")) / (count(*)/"
+                + str(self._table_parallel_batch_size)
+                + ".0)) FROM "
+                + table_name
+            )
             res = self.query(sql)
             min_value = int(res[0][0])
             max_value = int(res[0][1])
@@ -208,8 +230,15 @@ def get_min_max_batch(self, table_name, column):
         except:
             logger.debug("Failed getting min, max and batch column value")
 
-    def generate_export_query(self, columns, schema, table, replication_key=None, max_replication_key=None,
-                              parallelization_where=None):
+    def generate_export_query(
+        self,
+        columns,
+        schema,
+        table,
+        replication_key=None,
+        max_replication_key=None,
+        parallelization_where=None,
+    ):
         # Generate SQL statement for extract
         select_stmt = "SELECT "
         # Add columns
@@ -218,11 +247,13 @@ def generate_export_query(self, columns, schema, table, replication_key=None, ma
             select_stmt += column_name + ", "
         select_stmt = select_stmt[:-2]
 
-        select_stmt += ' FROM ' + schema + "." + table
+        select_stmt += " FROM " + schema + "." + table
 
         # Where-claues for incremental replication
         if replication_key:
-            replication_where = replication_key + " > " + "'" + max_replication_key + "'"
+            replication_where = (
+                replication_key + " > " + "'" + max_replication_key + "'"
+            )
         else:
             replication_where = None
 
@@ -239,36 +270,49 @@ def generate_export_query(self, columns, schema, table, replication_key=None, ma
         return select_stmt
 
     def export_query(self, query, file_path, delimiter, rows=5000):
-        rowcounts = run_export_query(self._server, self._user, self._password, self._database, self._port, query, file_path,
-                                     delimiter, rows=5000)
+        rowcounts = run_export_query(
+            self._server,
+            self._user,
+            self._password,
+            self._database,
+            self._port,
+            query,
+            file_path,
+            delimiter,
+            rows=5000,
+        )
         return rowcounts
 
     def insert_from_table_and_drop(self, schema, to_table, from_table):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
     def switch_tables(self, schema, old_table, new_table):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
-    def import_table(self, schema, table, file, delimiter=','):
-        return 'Not implemented for this adapter'
+    def import_table(self, schema, table, file, delimiter=","):
+        return "Not implemented for this adapter"
 
     def generate_create_table_ddl(self, schema, table, columns):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
     def create_table_from_columns(self, schema, table, columns):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
     def create_log_table(self, schema, table):
-        return 'Not implemented for this adapter'
+        return "Not implemented for this adapter"
 
-    def log(self, schema, table,
-            project=None,
-            project_started_at=None,
-            source_table=None,
-            target_table=None,
-            started_at=None,
-            ended_at=None,
-            status=None,
-            exported_rows=None,
-            imported_rows=None):
-        return 'Not implemented for this adapter'
+    def log(
+        self,
+        schema,
+        table,
+        project=None,
+        project_started_at=None,
+        source_table=None,
+        target_table=None,
+        started_at=None,
+        ended_at=None,
+        status=None,
+        exported_rows=None,
+        imported_rows=None,
+    ):
+        return "Not implemented for this adapter"