Skip to content

Commit

Permalink
Flattening: Reduce memory footprint.
Browse files Browse the repository at this point in the history
* Use ijson
* Use openpyxl write_only mode
* Store sheet lines in an embedded btree ZODB index

#316
  • Loading branch information
kindly committed Jan 28, 2021
1 parent 83a4ef2 commit 1b40acc
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 140 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ jobs:
- run: py.test --cov .
- env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: coveralls
run: coveralls --service=github
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

- flattening: Uses much less memory by storing data in an embedded ZODB database, using ijson, and using write-only mode in openpyxl.
- use-titles: Use $ref'erring title if available https://github.com/OpenDataServices/flatten-tool/pull/368
- create-template --no-deprecated-fields: Did not work if deprecated element at same level as a $ref https://github.com/OpenDataServices/flatten-tool/issues/185#issuecomment-719587348

Expand Down
50 changes: 25 additions & 25 deletions flattentool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def flatten(
else:
schema_parser = None

parser = JSONParser(
with JSONParser(
json_filename=input_name,
root_list_path=None if root_is_list else root_list_path,
schema_parser=schema_parser,
Expand All @@ -126,33 +126,33 @@ def flatten(
preserve_fields=preserve_fields,
remove_empty_schema_columns=remove_empty_schema_columns,
truncation_length=truncation_length,
)
parser.parse()

def spreadsheet_output(spreadsheet_output_class, name):
spreadsheet_output = spreadsheet_output_class(
parser=parser,
main_sheet_name=main_sheet_name,
output_name=name,
sheet_prefix=sheet_prefix,
)
spreadsheet_output.write_sheets()

if output_format == "all":
if not output_name:
output_name = "flattened"
for format_name, spreadsheet_output_class in OUTPUT_FORMATS.items():
spreadsheet_output(
spreadsheet_output_class, output_name + FORMATS_SUFFIX[format_name]
persist=True,
) as parser:

def spreadsheet_output(spreadsheet_output_class, name):
spreadsheet_output = spreadsheet_output_class(
parser=parser,
main_sheet_name=main_sheet_name,
output_name=name,
sheet_prefix=sheet_prefix,
)
spreadsheet_output.write_sheets()

if output_format == "all":
if not output_name:
output_name = "flattened"
for format_name, spreadsheet_output_class in OUTPUT_FORMATS.items():
spreadsheet_output(
spreadsheet_output_class, output_name + FORMATS_SUFFIX[format_name]
)

elif output_format in OUTPUT_FORMATS.keys(): # in dictionary of allowed formats
if not output_name:
output_name = "flattened" + FORMATS_SUFFIX[output_format]
spreadsheet_output(OUTPUT_FORMATS[output_format], output_name)
elif output_format in OUTPUT_FORMATS.keys(): # in dictionary of allowed formats
if not output_name:
output_name = "flattened" + FORMATS_SUFFIX[output_format]
spreadsheet_output(OUTPUT_FORMATS[output_format], output_name)

else:
raise Exception("The requested format is not available")
else:
raise Exception("The requested format is not available")


# From http://bugs.python.org/issue16535
Expand Down
108 changes: 83 additions & 25 deletions flattentool/json_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@

import codecs
import copy
import json
import os
import tempfile
import uuid
from collections import OrderedDict
from decimal import Decimal
from warnings import warn

import BTrees.OOBTree
import ijson
import transaction
import xmltodict
import zc.zlibstorage
import ZODB.FileStorage

from flattentool.i18n import _
from flattentool.input import path_search
from flattentool.schema import make_sub_sheet_name
from flattentool.sheet import Sheet
from flattentool.sheet import PersistentSheet

BASIC_TYPES = [str, bool, int, Decimal, type(None)]

Expand Down Expand Up @@ -112,9 +118,26 @@ def __init__(
remove_empty_schema_columns=False,
rollup=False,
truncation_length=3,
persist=False,
):
if persist:
self.zodb_db_location = (
tempfile.gettempdir() + "/flattentool-" + str(uuid.uuid4())
)
zodb_storage = zc.zlibstorage.ZlibStorage(
ZODB.FileStorage.FileStorage(self.zodb_db_location)
)
self.db = ZODB.DB(zodb_storage)
else:
# If None, in memory storage is used.
self.db = ZODB.DB(None)

self.connection = self.db.open()
root = self.connection.root
root.sheet_store = BTrees.OOBTree.BTree()

self.sub_sheets = {}
self.main_sheet = Sheet()
self.main_sheet = PersistentSheet(connection=self.connection, name="")
self.root_list_path = root_list_path
self.root_id = root_id
self.use_titles = use_titles
Expand All @@ -125,9 +148,17 @@ def __init__(
self.filter_value = filter_value
self.remove_empty_schema_columns = remove_empty_schema_columns
self.seen_paths = set()
self.persist = persist

if schema_parser:
self.main_sheet = copy.deepcopy(schema_parser.main_sheet)
self.main_sheet = PersistentSheet.from_sheet(
schema_parser.main_sheet, self.connection
)
for sheet_name, sheet in list(self.sub_sheets.items()):
self.sub_sheets[sheet_name] = PersistentSheet.from_sheet(
sheet, self.connection
)

self.sub_sheets = copy.deepcopy(schema_parser.sub_sheets)
if remove_empty_schema_columns:
# Don't use columns from the schema parser
Expand Down Expand Up @@ -194,18 +225,13 @@ def __init__(
_("Only one of json_file or root_json_dict should be supplied")
)

if json_filename:
with codecs.open(json_filename, encoding="utf-8") as json_file:
try:
self.root_json_dict = json.load(
json_file, object_pairs_hook=OrderedDict, parse_float=Decimal
)
except UnicodeError as err:
raise BadlyFormedJSONErrorUTF8(*err.args)
except ValueError as err:
raise BadlyFormedJSONError(*err.args)
else:
self.root_json_dict = root_json_dict
if not json_filename:
if self.root_list_path is None:
self.root_json_list = root_json_dict
else:
self.root_json_list = path_search(
root_json_dict, self.root_list_path.split("/")
)

if preserve_fields:
# Extract fields to be preserved from input file (one path per line)
Expand Down Expand Up @@ -240,19 +266,37 @@ def __init__(
self.preserve_fields = None
self.preserve_fields_input = None

if json_filename:
if self.root_list_path is None:
path = "item"
else:
path = root_list_path.replace("/", ".") + ".item"

json_file = codecs.open(json_filename, encoding="utf-8")

self.root_json_list = ijson.items(json_file, path)

try:
self.parse()
except ijson.common.IncompleteJSONError as err:
raise BadlyFormedJSONError(*err.args)
except UnicodeDecodeError as err:
raise BadlyFormedJSONErrorUTF8(*err.args)
finally:
if json_filename:
json_file.close()

def parse(self):
if self.root_list_path is None:
root_json_list = self.root_json_dict
else:
root_json_list = path_search(
self.root_json_dict, self.root_list_path.split("/")
)
for json_dict in root_json_list:
for num, json_dict in enumerate(self.root_json_list):
if json_dict is None:
# This is particularly useful for IATI XML, in order to not
# fall over on empty activity, e.g. <iati-activity/>
continue
self.parse_json_dict(json_dict, sheet=self.main_sheet)
if num % 2000 == 0 and num != 0:
transaction.commit()

transaction.commit()

if self.remove_empty_schema_columns:
# Remove sheets with no lines of data
Expand Down Expand Up @@ -501,7 +545,9 @@ def parse_json_dict(
parent_name, key, truncation_length=self.truncation_length
)
if sub_sheet_name not in self.sub_sheets:
self.sub_sheets[sub_sheet_name] = Sheet(name=sub_sheet_name)
self.sub_sheets[sub_sheet_name] = PersistentSheet(
name=sub_sheet_name, connection=self.connection
)

for json_dict in value:
if json_dict is None:
Expand All @@ -518,4 +564,16 @@ def parse_json_dict(
raise ValueError(_("Unsupported type {}").format(type(value)))

if top:
sheet.lines.append(flattened_dict)
sheet.append_line(flattened_dict)

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
if self.persist:
self.connection.close()
self.db.close()
os.remove(self.zodb_db_location)
os.remove(self.zodb_db_location + ".lock")
os.remove(self.zodb_db_location + ".index")
os.remove(self.zodb_db_location + ".tmp")
3 changes: 1 addition & 2 deletions flattentool/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def close(self):

class XLSXOutput(SpreadsheetOutput):
def open(self):
self.workbook = openpyxl.Workbook()
self.workbook = openpyxl.Workbook(write_only=True)

def write_sheet(self, sheet_name, sheet):
sheet_header = list(sheet)
Expand All @@ -75,7 +75,6 @@ def write_sheet(self, sheet_name, sheet):
worksheet.append(line)

def close(self):
self.workbook.remove(self.workbook.active)
self.workbook.save(self.output_name)


Expand Down
47 changes: 46 additions & 1 deletion flattentool/sheet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import copy

import BTrees.IOBTree


class Sheet(object):
"""
An abstract representation of a single sheet of a spreadsheet.
Expand All @@ -8,10 +13,14 @@ def __init__(self, columns=None, root_id="", name=None):
self.id_columns = []
self.columns = columns if columns else []
self.titles = {}
self.lines = []
self._lines = []
self.root_id = root_id
self.name = name

@property
def lines(self):
return self._lines

def add_field(self, field, id_field=False):
columns = self.id_columns if id_field else self.columns
if field not in columns:
Expand All @@ -27,3 +36,39 @@ def __iter__(self):
yield column
for column in self.columns:
yield column

def append_line(self, flattened_dict):
self._lines.append(flattened_dict)


class PersistentSheet(Sheet):
    """
    A sheet whose lines are persisted in a ZODB database.

    Lines are stored in an integer-keyed BTree under
    ``connection.root.sheet_store[self.name]`` instead of an in-memory
    list, so very large inputs do not have to fit in RAM.  Callers are
    responsible for committing the enclosing transaction; this class
    never commits.
    """

    def __init__(self, columns=None, root_id="", name=None, connection=None):
        super().__init__(columns=columns, root_id=root_id, name=name)
        # Open ZODB connection used for all reads and writes.
        self.connection = connection
        # Next BTree key to assign; in-memory only, so a second
        # PersistentSheet for the same name restarts at 0.
        self.index = 0
        # NOTE(review): this replaces any existing BTree stored under the
        # same name, discarding previously appended lines — presumably
        # intentional for fresh parses; confirm against callers.
        connection.root.sheet_store[self.name] = BTrees.IOBTree.BTree()

    @property
    def lines(self):
        # Unlike Sheet.lines (a list), this is a generator: it can only
        # be iterated once per call and yields in ascending key order.
        for key, value in self.connection.root.sheet_store[self.name].items():
            # Periodically shrink the ZODB object cache so iterating a
            # huge sheet keeps memory use bounded.
            if key % 5000 == 0:
                self.connection.cacheMinimize()
            yield value

    def append_line(self, flattened_dict):
        # Store the line under the next sequential key and advance it.
        self.connection.root.sheet_store[self.name][self.index] = flattened_dict
        self.index += 1

    @classmethod
    def from_sheet(cls, sheet, connection):
        """Build a PersistentSheet copying metadata (not lines) from *sheet*.

        Columns, id_columns and titles are deep-copied; any lines on the
        source sheet are NOT carried over, and the backing BTree starts
        empty.
        """
        instance = cls(name=sheet.name, connection=connection)
        instance.id_columns = copy.deepcopy(sheet.id_columns)
        instance.columns = copy.deepcopy(sheet.columns)
        instance.titles = copy.deepcopy(sheet.titles)
        instance.root_id = sheet.root_id
        return instance
Loading

0 comments on commit 1b40acc

Please sign in to comment.