From e79c0268952886f1dd7391c1a9e3e07575eab71c Mon Sep 17 00:00:00 2001 From: Ivan Beloborodov Date: Tue, 5 Nov 2019 13:13:51 +0300 Subject: [PATCH] getting linked lexical entry group info optimizations IMPORTANT: requires 'alembic upgrade head' to work --- alembic.ini | 23 +- .../eb70cc55b178_entry_group_optimization.py | 839 ++++++++++++++++++ lingvodoc/schema/gql_dictionaryperspective.py | 6 +- lingvodoc/schema/gql_search.py | 241 ++++- lingvodoc/schema/query.py | 217 +++-- lingvodoc/scripts/save_dictionary.py | 789 ++++++++++++++-- lingvodoc/utils/__init__.py | 99 +++ 7 files changed, 2059 insertions(+), 155 deletions(-) create mode 100644 alembic/versions/eb70cc55b178_entry_group_optimization.py diff --git a/alembic.ini b/alembic.ini index ecf9357b8..785ab3119 100644 --- a/alembic.ini +++ b/alembic.ini @@ -1,31 +1,31 @@ [app:main] -sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5433/lingvodoc_v_2_0_tmp +sqlalchemy.url = postgresql+psycopg2://postgres@/lingvodoc [alembic] script_location = alembic -sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5433/lingvodoc_v_2_0_tmp +sqlalchemy.url = postgresql+psycopg2://postgres@/lingvodoc [loggers] keys = root,sqlalchemy,alembic [handlers] -keys = console +keys = console, filelog [formatters] keys = generic [logger_root] -level = WARN -handlers = console +level = DEBUG +handlers = console, filelog qualname = [logger_sqlalchemy] -level = WARN +level = DEBUG handlers = qualname = sqlalchemy.engine [logger_alembic] -level = INFO +level = DEBUG handlers = qualname = alembic @@ -35,6 +35,11 @@ args = (sys.stderr,) level = NOTSET formatter = generic +[handler_filelog] +class = handlers.RotatingFileHandler +args = ('alembic.log', 'a', 16777216, 8) +level = NOTSET +formatter = generic + [formatter_generic] -format = %(levelname)-5.5s [%(name)s] %(message)s -datefmt = %H:%M:%S +format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s diff --git a/alembic/versions/eb70cc55b178_entry_group_optimization.py b/alembic/versions/eb70cc55b178_entry_group_optimization.py new file mode 100644 index 000000000..28db91612 --- /dev/null +++ b/alembic/versions/eb70cc55b178_entry_group_optimization.py @@ -0,0 +1,839 @@ +"""Entry group optimization + +Revision ID: eb70cc55b178 +Revises: 2b852140e36e +Create Date: 2019-11-05 09:40:55.615947 + +""" + + +# revision identifiers, used by Alembic. +revision = 'eb70cc55b178' +down_revision = '2b852140e36e' +branch_labels = None +depends_on = None + + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + + op.execute(''' + + /* Gathers lexical entries linked through a specified link field. */ + + create or replace function + + linked_cycle( + entity_field_client_id BIGINT, + entity_field_object_id BIGINT, + publish BOOLEAN = true, + accept BOOLEAN = true) + + returns void as $$ + + begin + + -- Gathering all entries until no unprocessed tags are left. 
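+        -- Tags queued for processing alternate between tag_list_a and tag_list_b:
+        -- each pass inserts every entry carrying one of the queued tags into
+        -- entry_id_table, collects previously unseen tags of those entries into the
+        -- other list, and truncates the list it has just processed. The loop stops
+        -- once a pass produces no new tags.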
+ + while exists ( + select 1 from tag_list_a) loop + + with + + entry_id_cte as ( + + insert into entry_id_table + + select + L.client_id, + L.object_id + + from + lexicalentry L, + public.entity E, + publishingentity P + + where + L.marked_for_deletion = false and + E.parent_client_id = L.client_id and + E.parent_object_id = L.object_id and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + E.content in ( + select * from tag_list_a) and + P.client_id = E.client_id and + P.object_id = E.object_id and + (accept is null or P.accepted = accept) and + (publish is null or P.published = publish) + + on conflict do nothing + returning *), + + tag_cte as ( + + insert into tag_table + select distinct E.content + + from + public.entity E, + publishingentity P + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from entry_id_cte) and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + P.client_id = E.client_id and + P.object_id = E.object_id and + (accept is null or P.accepted = accept) and + (publish is null or P.published = publish) + + on conflict do nothing + returning *) + + insert into tag_list_b + select * from tag_cte; + + truncate table tag_list_a; + + -- The next batch of additional tags. + + if exists ( + select 1 from tag_list_b) then + + with + + entry_id_cte as ( + + insert into entry_id_table + + select + L.client_id, + L.object_id + + from + lexicalentry L, + public.entity E, + publishingentity P + + where + L.marked_for_deletion = false and + E.parent_client_id = L.client_id and + E.parent_object_id = L.object_id and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + E.content in ( + select * from tag_list_b) and + P.client_id = E.client_id and + P.object_id = E.object_id and + (accept is null or P.accepted = accept) and + (publish is null or P.published = publish) + + on conflict do nothing + returning *), + + tag_cte as ( + + insert into tag_table + select distinct E.content + + from + public.entity E, + publishingentity P + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from entry_id_cte) and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + P.client_id = E.client_id and + P.object_id = E.object_id and + (accept is null or P.accepted = accept) and + (publish is null or P.published = publish) + + on conflict do nothing + returning *) + + insert into tag_list_a + select * from tag_cte; + + truncate table tag_list_b; + + end if; + + end loop; + + end; + + $$ language plpgsql; + + ''') + + op.execute(''' + + /* + * Like linked_cycle(), but does not join with publishingentity, so is + * equivalent to linked_cycle(_, _, null, null), but should be faster. + */ + + create or replace function + + linked_cycle_no_publishing( + entity_field_client_id BIGINT, + entity_field_object_id BIGINT) + + returns void as $$ + + begin + + -- Gathering all entries until no unprocessed tags are left. 
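+        -- Same alternating tag_list_a / tag_list_b expansion as in linked_cycle(),
+        -- only without the publishingentity acceptance / publishing checks.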
+ + while exists ( + select 1 from tag_list_a) loop + + with + + entry_id_cte as ( + + insert into entry_id_table + + select + L.client_id, + L.object_id + + from + lexicalentry L, + public.entity E + + where + L.marked_for_deletion = false and + E.parent_client_id = L.client_id and + E.parent_object_id = L.object_id and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + E.content in ( + select * from tag_list_a) + + on conflict do nothing + returning *), + + tag_cte as ( + + insert into tag_table + + select distinct E.content + from public.entity E + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from entry_id_cte) and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false + + on conflict do nothing + returning *) + + insert into tag_list_b + select * from tag_cte; + + truncate table tag_list_a; + + -- The next batch of additional tags. + + if exists ( + select 1 from tag_list_b) then + + with + + entry_id_cte as ( + + insert into entry_id_table + + select + L.client_id, + L.object_id + + from + lexicalentry L, + public.entity E + + where + L.marked_for_deletion = false and + E.parent_client_id = L.client_id and + E.parent_object_id = L.object_id and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + E.content in ( + select * from tag_list_b) + + on conflict do nothing + returning *), + + tag_cte as ( + + insert into tag_table + + select distinct E.content + from public.entity E + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from entry_id_cte) and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false + + on conflict do nothing + returning *) + + insert into tag_list_a + select * from tag_cte; + + truncate table tag_list_b; + + end if; + + end loop; + + end; + + $$ language plpgsql; + + ''') + + op.execute(''' + + /* + * Finds a group of lexical entries linked through a specified link + * field, starting from a given entry. + */ + + create or replace function + + linked_group( + entity_field_client_id BIGINT, + entity_field_object_id BIGINT, + entry_client_id BIGINT, + entry_object_id BIGINT, + publish BOOLEAN = true, + accept BOOLEAN = true) + + returns table ( + client_id BIGINT, + object_id BIGINT) as $$ + + begin + + -- Temporary table for lexical entry ids. + + create temporary table + if not exists + + entry_id_table ( + client_id BIGINT, + object_id BIGINT, + primary key (client_id, object_id)) + + on commit drop; + + insert into entry_id_table + values (entry_client_id, entry_object_id); + + -- Temporary table for etymological tags. + + create temporary table + if not exists + + tag_table ( + tag TEXT primary key) + + on commit drop; + + -- Temporary tables for tags to be processed. + + create temporary table + if not exists + + tag_list_a ( + tag TEXT) + + on commit drop; + + create temporary table + if not exists + + tag_list_b ( + tag TEXT) + + on commit drop; + + -- Initial batch of additional tags. 
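+        -- The link tags of the starting entry seed both tag_table (all tags seen
+        -- so far) and tag_list_a (tags still to be processed); linked_cycle() below
+        -- then expands the group from these tags.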
+ + with + tag_cte as ( + + insert into tag_table + select distinct E.content + + from + public.entity E, + publishingentity P + + where + E.parent_client_id = entry_client_id and + E.parent_object_id = entry_object_id and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + P.client_id = E.client_id and + P.object_id = E.object_id and + (accept is null or P.accepted = accept) and + (publish is null or P.published = publish) + + on conflict do nothing + returning *) + + insert into tag_list_a + select * from tag_cte; + + -- Gathering and returning linked lexical entries. + + perform linked_cycle( + entity_field_client_id, + entity_field_object_id, + publish, + accept); + + return query + select * from entry_id_table; + + truncate table entry_id_table; + truncate table tag_table; + + end; + + $$ language plpgsql; + + ''') + + op.execute(''' + + /* + * Like linked_group(), but does not join with publishingentity, so is + * equivalent to linked_group(_, _, _, _, null, null), but should be + * faster. + */ + + create or replace function + + linked_group_no_publishing( + entity_field_client_id BIGINT, + entity_field_object_id BIGINT, + entry_client_id BIGINT, + entry_object_id BIGINT, + publish BOOLEAN = true, + accept BOOLEAN = true) + + returns table ( + client_id BIGINT, + object_id BIGINT) as $$ + + begin + + -- Temporary table for lexical entry ids. + + create temporary table + if not exists + + entry_id_table ( + client_id BIGINT, + object_id BIGINT, + primary key (client_id, object_id)) + + on commit drop; + + insert into entry_id_table + values (entry_client_id, entry_object_id); + + -- Temporary table for etymological tags. + + create temporary table + if not exists + + tag_table ( + tag TEXT primary key) + + on commit drop; + + -- Temporary tables for tags to be processed. + + create temporary table + if not exists + + tag_list_a ( + tag TEXT) + + on commit drop; + + create temporary table + if not exists + + tag_list_b ( + tag TEXT) + + on commit drop; + + -- Initial batch of additional tags. + + with + tag_cte as ( + + insert into tag_table + + select distinct E.content + from public.entity E + + where + E.parent_client_id = entry_client_id and + E.parent_object_id = entry_object_id and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false + + on conflict do nothing + returning *) + + insert into tag_list_a + select * from tag_cte; + + -- Gathering and returning linked lexical entries. + + perform linked_cycle_no_publishing( + entity_field_client_id, + entity_field_object_id); + + return query + select * from entry_id_table; + + truncate table entry_id_table; + truncate table tag_table; + + end; + + $$ language plpgsql; + + ''') + + op.execute(''' + + /* + * Finds a group of lexical entries linked through a specified link + * field, starting from a link tag. + */ + + create or replace function + + linked_group( + entity_field_client_id BIGINT, + entity_field_object_id BIGINT, + tag TEXT, + publish BOOLEAN = true, + accept BOOLEAN = true) + + returns table ( + client_id BIGINT, + object_id BIGINT) as $$ + + begin + + -- Temporary table for lexical entry ids. 
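+        -- Temporary tables are created 'if not exists' with 'on commit drop', so
+        -- repeated calls inside one transaction reuse them; entry_id_table is
+        -- seeded with every entry that directly carries the starting tag, and both
+        -- it and tag_table are truncated again before the function finishes.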
+ + create temporary table + if not exists + + entry_id_table ( + client_id BIGINT, + object_id BIGINT, + primary key (client_id, object_id)) + + on commit drop; + + insert into entry_id_table + + select + L.client_id, + L.object_id + + from + lexicalentry L, + public.entity E, + publishingentity P + + where + L.marked_for_deletion = false and + E.parent_client_id = L.client_id and + E.parent_object_id = L.object_id and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + E.content = tag and + P.client_id = E.client_id and + P.object_id = E.object_id and + (accept is null or P.accepted = accept) and + (publish is null or P.published = publish) + + on conflict do nothing; + + -- Temporary table for etymological tags. + + create temporary table + if not exists + + tag_table ( + tag TEXT primary key) + + on commit drop; + + insert into tag_table + values (tag); + + -- Temporary tables for tags to be processed. + + create temporary table + if not exists + + tag_list_a ( + tag TEXT) + + on commit drop; + + create temporary table + if not exists + + tag_list_b ( + tag TEXT) + + on commit drop; + + -- Initial batch of additional tags. + + with + tag_cte as ( + + insert into tag_table + select distinct E.content + + from + public.entity E, + publishingentity P + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from entry_id_table) and + E.field_client_id = entity_field_client_id and + E.field_object_id = entity_field_object_id and + E.marked_for_deletion = false and + P.client_id = E.client_id and + P.object_id = E.object_id and + (accept is null or P.accepted = accept) and + (publish is null or P.published = publish) + + on conflict do nothing + returning *) + + insert into tag_list_a + select * from tag_cte; + + -- Gathering and returning linked lexical entries. + + perform linked_cycle( + entity_field_client_id, + entity_field_object_id, + publish, + accept); + + return query + select * from entry_id_table; + + truncate table entry_id_table; + truncate table tag_table; + + end; + + $$ language plpgsql; + + ''') + + op.execute(''' + + /* + * Non-deleted text fields, used for getting etymology text info, see + * etymology_text() and etymology_group_text(). + */ + + create materialized view + text_field_id_view as + + select + client_id, + object_id + + from field + + where + data_type_translation_gist_client_id = 1 and + data_type_translation_gist_object_id = 47 and + marked_for_deletion = false; + + create unique index + text_field_id_view_idx on + + text_field_id_view ( + client_id, object_id); + + ''') + + op.execute(''' + + /* + * Returns aggregated text data of an etymologically linked lexical + * entry group. + */ + + create or replace function + + etymology_text( + tag TEXT, + publish BOOLEAN = true) + + returns table ( + content TEXT) as $$ + + begin + + -- Returning data of each linked lexical entry. 
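+        -- One output row per linked lexical entry: its non-deleted text field
+        -- contents (text fields as listed in text_field_id_view) aggregated with
+        -- '; '. Field (66, 25) is the etymology link field passed to linked_group().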
+ + return query + + select + string_agg(E.content, '; ') + + from + public.entity E, + publishingentity P + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from linked_group(66, 25, tag, publish)) and + (E.field_client_id, E.field_object_id) in ( + select * from text_field_id_view) and + E.marked_for_deletion = false and + E.content is not null and + P.client_id = E.client_id and + P.object_id = E.object_id and + P.accepted = true and + (publish is null or P.published = publish) + + group by ( + E.parent_client_id, E.parent_object_id); + + end; + + $$ language plpgsql; + + ''') + + op.execute(''' + + /* + * Returns aggregated text data and lexical entry ids of an + * etymologically linked lexical entry group. + */ + + create or replace function + + etymology_group_text( + tag TEXT, + publish BOOLEAN = true) + + returns table ( + client_id BIGINT, + object_id BIGINT, + content TEXT) as $$ + + begin + + -- Returning data of each linked lexical entry. + + return query + + select + E.parent_client_id, + E.parent_object_id, + string_agg(E.content, '; ') + + from + public.entity E, + publishingentity P + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from linked_group(66, 25, tag, publish)) and + (E.field_client_id, E.field_object_id) in ( + select * from text_field_id_view) and + E.marked_for_deletion = false and + E.content is not null and + P.client_id = E.client_id and + P.object_id = E.object_id and + P.accepted = true and + (publish is null or P.published = publish) + + group by ( + E.parent_client_id, E.parent_object_id); + + truncate table entry_id_table; + truncate table tag_table; + + end; + + $$ language plpgsql; + + ''') + + +def downgrade(): + + op.execute( + 'drop function if exists linked_cycle(bigint, bigint, boolean, boolean);') + + op.execute( + 'drop function if exists linked_cycle_no_publishing(bigint, bigint);') + + op.execute( + 'drop function if exists linked_group(bigint, bigint, bigint, bigint, boolean, boolean);') + + op.execute( + 'drop function if exists linked_group_no_publishing(bigint, bigint, bigint, bigint, boolean, boolean);') + + op.execute( + 'drop function if exists linked_group(bigint, bigint, text, boolean, boolean);') + + op.execute( + 'drop materialized view if exists text_field_id_view;') + + op.execute( + 'drop function if exists etymology_text(text, boolean);') + + op.execute( + 'drop function if exists etymology_group_text(text, boolean);') + diff --git a/lingvodoc/schema/gql_dictionaryperspective.py b/lingvodoc/schema/gql_dictionaryperspective.py index 7d85276fb..69f441ff7 100644 --- a/lingvodoc/schema/gql_dictionaryperspective.py +++ b/lingvodoc/schema/gql_dictionaryperspective.py @@ -98,7 +98,11 @@ def entries_with_entities(lexes, accept, delete, mode, publish): return [gql_lexicalentry(lex, None) for lex in lexes] lex_id_to_obj = dict() lexes_composite_list = list() - for lex_obj in lexes.yield_per(100).all(): + + for lex_obj in ( + lexes if isinstance(lexes, list) else + lexes.yield_per(100).all()): + lexes_composite_list.append((lex_obj.client_id, lex_obj.object_id, lex_obj.parent_client_id, lex_obj.parent_object_id)) lex_id_to_obj[(lex_obj.client_id, lex_obj.object_id)] = lex_obj diff --git a/lingvodoc/schema/gql_search.py b/lingvodoc/schema/gql_search.py index f619ed3c7..bc24fb002 100644 --- a/lingvodoc/schema/gql_search.py +++ b/lingvodoc/schema/gql_search.py @@ -1,12 +1,16 @@ import collections import datetime +import gzip +import hashlib import io import itertools import logging import os import 
os.path +import pickle import pprint import shutil +import time import graphene import xlsxwriter @@ -48,6 +52,8 @@ from lingvodoc.scripts.save_dictionary import Save_Context from lingvodoc.utils.search import translation_gist_search +import lingvodoc.utils as utils + from sqlalchemy import ( func, and_, @@ -161,7 +167,8 @@ def save_xlsx_data( for lexical_entry in lexical_entry_dict[ (perspective.client_id, perspective.object_id)]: - xlsx_context.save_lexical_entry(lexical_entry, published = True, accepted = True) + xlsx_context.save_lexical_entry( + lexical_entry, published = True, accepted = True) def search_mechanism( dictionaries, @@ -173,7 +180,8 @@ def search_mechanism( etymology, yield_batch_count, category_fields, - xlsx_context = None): + xlsx_context = None, + __debug_flag__ = False): """ 1) published dictionaries @@ -183,6 +191,117 @@ def search_mechanism( """ # 1) old filter + if __debug_flag__: + + # If we are in debug mode, we try to load already computed search data to reduce debugging time. + + dictionary_id_list = sorted(dictionaries.all()) + field_id_list = sorted(category_fields) + + search_list = [ + + [sorted(search_string.items()) + for search_string in search_block] + + for search_block in search_strings] + + search_digest = hashlib.md5( + + repr([ + dictionary_id_list, + category, + search_list, + publish, + accept, + adopted, + etymology, + field_id_list]) + + .encode('utf-8')).hexdigest() + + search_data_file_name = ( + '__search_data_{0}__.gz'.format(search_digest)) + + # Checking if we have saved data. + + if os.path.exists(search_data_file_name): + + with gzip.open(search_data_file_name, 'rb') as search_data_file: + + (lexical_entry_id_list, + perspective_id_list, + dictionary_id_list) = pickle.load(search_data_file) + + # Loading search data. + + lexical_entry_list = ( + + DBSession.query(dbLexicalEntry) + + .filter( + tuple_(dbLexicalEntry.client_id, dbLexicalEntry.object_id).in_( + lexical_entry_id_list)) + + .all()) + + perspective_list = ( + + DBSession.query(dbDictionaryPerspective) + + .filter( + tuple_(dbDictionaryPerspective.client_id, dbDictionaryPerspective.object_id).in_( + perspective_id_list)) + + .all()) + + dictionary_list = ( + + DBSession.query(dbDictionary) + + .filter( + tuple_(dbDictionary.client_id, dbDictionary.object_id).in_( + dictionary_id_list)) + + .all()) + + # Compiling search results. + + result_lexical_entries = entries_with_entities( + lexical_entry_list, accept = True, delete = False, mode = None, publish = True) + + def graphene_obj(dbobj, cur_cls): + obj = cur_cls(id=(dbobj.client_id, dbobj.object_id)) + obj.dbObject = dbobj + return obj + + res_perspectives = [ + graphene_obj(dbpersp, DictionaryPerspective) for dbpersp in perspective_list] + + res_dictionaries = [ + graphene_obj(dbdict, Dictionary) for dbdict in dictionary_list] + + # Exporting search results as an XLSX data, if required. 
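+            # When a Save_Context is supplied, the cached entries are written into
+            # it here as well, and the elapsed time and resident memory of the
+            # export are logged.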
+ + if xlsx_context is not None: + + start_time = time.time() + + save_xlsx_data( + xlsx_context, + dictionary_list, + perspective_list, + [lexical_entry.dbObject for lexical_entry in result_lexical_entries]) + + elapsed_time = time.time() - start_time + resident_memory = utils.get_resident_memory() + + log.debug( + '\nelapsed_time, resident_memory: {0:.3f}s, {1:.3f}m'.format( + elapsed_time, + resident_memory / 1048576.0)) + + return [], result_lexical_entries, res_perspectives, res_dictionaries + lexes = DBSession.query(dbLexicalEntry.client_id, dbLexicalEntry.object_id).join(dbLexicalEntry.parent) \ .join(dbDictionaryPerspective.parent).filter( dbLexicalEntry.parent_client_id==dbDictionaryPerspective.client_id, @@ -376,6 +495,7 @@ def search_mechanism( inner_and.append(or_(*bs_or_block_list)) elif matching_type == 'regexp': inner_and.append(func.lower(cur_dbEntity.content).op('~*')(search_string["search_string"])) + and_lexes = DBSession.query(all_entities_cte.c.parent_client_id, all_entities_cte.c.parent_object_id)\ .filter(and_(*inner_and).self_group())\ @@ -391,7 +511,18 @@ def search_mechanism( all_results = all_results.union(or_element) if not all_results: + + # Saving search results data, if required. + + if __debug_flag__: + + with gzip.open( + search_data_file_name, 'wb') as search_data_file: + + pickle.dump(([], [], []), search_data_file) + return [], [], [], [] + resolved_search = DBSession.query(dbLexicalEntry)\ .filter(dbLexicalEntry.marked_for_deletion==False, tuple_(dbLexicalEntry.client_id, @@ -409,6 +540,27 @@ def graphene_obj(dbobj, cur_cls): tmp_dictionaries = set([le.dbObject.parent for le in res_perspectives]) res_dictionaries = [graphene_obj(dbdict, Dictionary) for dbdict in tmp_dictionaries] + # Saving search results data, if required. + + if __debug_flag__: + + with gzip.open(search_data_file_name, 'wb') as search_data_file: + + search_data = ( + + sorted(list(all_results)), + + sorted( + (perspective.client_id, perspective.object_id) + for perspective in tmp_perspectives), + + sorted( + (dictionary.client_id, dictionary.object_id) + for dictionary in tmp_dictionaries)) + + pickle.dump( + search_data, search_data_file) + # Exporting search results as an XLSX data, if required. if xlsx_context is not None: @@ -629,19 +781,24 @@ def get_sound_field_ids(): # return sound_field_id_list -def save_xlsx(info, xlsx_context): +def save_xlsx(info, xlsx_context, xlsx_filename): xlsx_context.workbook.close() storage = info.context.request.registry.settings['storage'] - storage_dir = os.path.join(storage['path'], 'map_search') + time_str = '{0:.6f}'.format(time.time()) - os.makedirs(storage_dir, exist_ok = True) + storage_dir = ( + + os.path.join( + storage['path'], + 'map_search', + time_str)) - xlsx_filename = '{0:.6f}.xlsx'.format( - datetime.datetime.now(datetime.timezone.utc).timestamp()) + os.makedirs(storage_dir, exist_ok = True) - xlsx_path = os.path.join(storage_dir, xlsx_filename) + xlsx_path = os.path.join( + storage_dir, xlsx_filename) with open(xlsx_path, 'wb') as xlsx_file: @@ -652,6 +809,7 @@ def save_xlsx(info, xlsx_context): storage['prefix'], storage['static_route'], 'map_search', '/', + time_str, '/', xlsx_filename]) @@ -676,7 +834,8 @@ def constructor( publish, accept, search_metadata, - xlsx_export = False): + xlsx_export = False, + __debug_flag__ = False): yield_batch_count = 200 dictionaries = DBSession.query(dbDictionary.client_id, dbDictionary.object_id).filter_by( @@ -800,8 +959,13 @@ def constructor( # Setting up export to an XLSX file, if required. 
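+        # A single Save_Context is shared by the normal-dictionary and corpora
+        # search passes below; search_mechanism() writes matching entries into it
+        # as they are found.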
xlsx_context = ( + None if not xlsx_export else - Save_Context(info.context.get('locale_id'), DBSession)) + + Save_Context( + info.context.get('locale_id'), + DBSession, + __debug_flag__)) # normal dictionaries if category != 1: @@ -815,10 +979,10 @@ def constructor( etymology=etymology, category_fields=text_fields, yield_batch_count=yield_batch_count, - xlsx_context=xlsx_context + xlsx_context=xlsx_context, + __debug_flag__=__debug_flag__ ) - # corpora if category != 0: tmp_entities, tmp_lexical_entries, tmp_perspectives, tmp_dictionaries = search_mechanism( @@ -831,7 +995,8 @@ def constructor( etymology=etymology, category_fields=markup_fields, yield_batch_count=yield_batch_count, - xlsx_context=xlsx_context + xlsx_context=xlsx_context, + __debug_flag__=__debug_flag__ ) res_entities += tmp_entities res_lexical_entries += tmp_lexical_entries @@ -840,9 +1005,28 @@ def constructor( # Saving XLSX-exported search results, if required. - xlsx_url = ( - None if not xlsx_export else - save_xlsx(info, xlsx_context)) + xlsx_url = None + + if xlsx_export: + + query_str = '_'.join([ + search_string["search_string"] + for search_block in search_strings + for search_string in search_block]) + + xlsx_filename = ('Search_' + query_str)[:64] + '.xlsx' + + xlsx_url = save_xlsx( + info, xlsx_context, xlsx_filename) + + # Saving resulting Excel workbook for debug purposes, if required. + + if __debug_flag__: + + xlsx_context.stream.seek(0) + + with open(xlsx_filename, 'wb') as xlsx_file: + shutil.copyfileobj(xlsx_context.stream, xlsx_file) return cls( entities=res_entities, @@ -960,9 +1144,28 @@ def constructor( # Saving XLSX-exported search results, if required. - xlsx_url = ( - None if not xlsx_export else - save_xlsx(info, xlsx_context)) + xlsx_url = None + + if xlsx_export: + + query_str = '_'.join([ + search_string["search_string"] + for search_block in search_strings + for search_string in search_block]) + + xlsx_filename = ('Search_' + query_str)[:64] + '.xlsx' + + xlsx_url = save_xlsx( + info, xlsx_context, xlsx_filename) + + # Saving resulting Excel workbook for debug purposes, if required. 
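+        # The workbook copy goes to the current working directory under the same
+        # search-derived file name, purely for debug inspection.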
+ + if __debug_flag__: + + xlsx_context.stream.seek(0) + + with open(xlsx_filename, 'wb') as xlsx_file: + shutil.copyfileobj(xlsx_context.stream, xlsx_file) return cls( entities=res_entities, diff --git a/lingvodoc/schema/query.py b/lingvodoc/schema/query.py index bb0f1d55e..718190d55 100644 --- a/lingvodoc/schema/query.py +++ b/lingvodoc/schema/query.py @@ -20,6 +20,7 @@ import urllib.parse import graphene +import lingvodoc.utils as utils from lingvodoc.utils.elan_functions import tgt_to_eaf import requests from lingvodoc.schema.gql_entity import ( @@ -185,6 +186,8 @@ from pyramid.request import Request from lingvodoc.utils.proxy import try_proxy, ProxyPass + +import sqlalchemy from sqlalchemy import ( func, and_, @@ -233,7 +236,10 @@ from lingvodoc.utils.merge import merge_suggestions import tempfile -from lingvodoc.scripts.save_dictionary import save_dictionary as sync_save_dictionary +from lingvodoc.scripts.save_dictionary import ( + find_group_by_tags, + save_dictionary as sync_save_dictionary) + from lingvodoc.views.v2.save_dictionary.core import async_save_dictionary import json @@ -1992,7 +1998,7 @@ def resolve_phonology_link_perspective_data(self, info, perspective_id, field_id # return True def resolve_connected_words(self, info, id, field_id, mode=None): - response = list() + client_id = id[0] object_id = id[1] field_client_id = field_id[0] @@ -2006,21 +2012,71 @@ def resolve_connected_words(self, info, id, field_id, mode=None): elif mode == 'not_accepted': publish = None accept = False + + # NOTE: modes 'deleted' and 'all_with_deleted' are currently not implemented. + elif mode == 'deleted': publish = None accept = None elif mode == 'all_with_deleted': publish = None accept = None + else: raise ResponseError(message="mode: ") - lexical_entry = DBSession.query(dbLexicalEntry).filter_by(client_id=client_id, object_id=object_id).first() - if not lexical_entry or lexical_entry.marked_for_deletion: - raise ResponseError(message="No such lexical entry in the system") - tags = find_all_tags(lexical_entry, field_client_id, field_object_id, accept, publish) - lexes = find_lexical_entries_by_tags(tags, field_client_id, field_object_id, accept, publish) - lexes_composite_list = [(lex.client_id, lex.object_id, lex.parent_client_id, lex.parent_object_id) - for lex in lexes] + + # Getting lexical entry group info. 
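+        # The group of linked entries is computed inside the database by the
+        # linked_group() / linked_group_no_publishing() PL/pgSQL functions added in
+        # the eb70cc55b178 migration; the _no_publishing variant is used when
+        # neither publishing nor acceptance filtering is requested.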
+ + marked_for_deletion = ( + + DBSession + .query(dbLexicalEntry.marked_for_deletion) + + .filter_by( + client_id = client_id, + object_id = object_id) + + .scalar()) + + if (marked_for_deletion is None or + marked_for_deletion): + + raise ResponseError(message = 'No such lexical entry in the system') + + entry_query = ( + + DBSession + .query(dbLexicalEntry) + .filter( + + tuple_( + dbLexicalEntry.client_id, + dbLexicalEntry.object_id) + + .in_(sqlalchemy.text(''' + + select * from linked_group{0}( + :field_client_id, + :field_object_id, + :client_id, + :object_id) + + '''.format( + '_no_publishing' if publish is None and accept is None else + '')))) + + .params({ + 'field_client_id': field_client_id, + 'field_object_id': field_object_id, + 'client_id': client_id, + 'object_id': object_id})) + + lexes = entry_query.all() + + lexes_composite_list = [ + (entry.client_id, entry.object_id, entry.parent_client_id, entry.parent_object_id) + for entry in lexes] + entities = dbLexicalEntry.graphene_track_multiple(lexes_composite_list, publish=publish, accept=accept) @@ -2730,63 +2786,11 @@ def find_group( tag_list = list(tag_query.all()) tag_set = set(tag_list) - # While we have tags we don't have all lexical entries for, - # we get these all entries of these tags... - - while tag_list: - - entry_id_query = ( - - DBSession.query( - dbLexicalEntry.client_id, - dbLexicalEntry.object_id) - - .filter( - dbLexicalEntry.marked_for_deletion == False, - dbEntity.parent_client_id == dbLexicalEntry.client_id, - dbEntity.parent_object_id == dbLexicalEntry.object_id, - dbEntity.field_client_id == field_client_id, - dbEntity.field_object_id == field_object_id, - dbEntity.marked_for_deletion == False, - dbEntity.content.in_(tag_list), - dbPublishingEntity.client_id == dbEntity.client_id, - dbPublishingEntity.object_id == dbEntity.object_id, - dbPublishingEntity.published == True, - dbPublishingEntity.accepted == True)) - - entry_id_list = [] - - for entry_id in entry_id_query.all(): - if entry_id not in entry_id_set: - - entry_id_set.add(entry_id) - entry_id_list.append(entry_id) - - # And then get all tags for entries we haven't already done it for. - - tag_query = ( - - DBSession.query( - dbEntity.content) - - .filter( - tuple_(dbEntity.parent_client_id, dbEntity.parent_object_id) - .in_(entry_id_list), - dbEntity.field_client_id == field_client_id, - dbEntity.field_object_id == field_object_id, - dbEntity.marked_for_deletion == False, - dbPublishingEntity.client_id == dbEntity.client_id, - dbPublishingEntity.object_id == dbEntity.object_id, - dbPublishingEntity.published == True, - dbPublishingEntity.accepted == True)) - - tag_list = [] - - for tag in tag_query.all(): - if tag not in tag_set: - - tag_set.add(tag) - tag_list.append(tag) + find_group_by_tags( + DBSession, + entry_id_set, tag_set, tag_list, + field_client_id, field_object_id, + True) return entry_id_set @@ -3156,6 +3160,85 @@ def tag_data_aggregated( return entry_already_set, group_list, time.time() - start_time + @staticmethod + def tag_data_plpgsql( + perspective_info_list, + tag_field_id, + statistics_flag = False, + optimize_flag = False): + """ + Gets lexical entry grouping data using stored PL/pgSQL functions, computes elapsed time. + """ + + start_time = time.time() + + # Getting lexical entries with tag data of the specified tag field from all perspectives. 
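+        # Only published and accepted tag entities are considered; each entry found
+        # here is later expanded into its whole group through the linked_group()
+        # stored function.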
+ + perspective_id_list = [ + perspective_id + for perspective_id, _, _ in perspective_info_list] + + entry_id_query = ( + + DBSession.query( + dbLexicalEntry.client_id, + dbLexicalEntry.object_id) + + .filter( + tuple_( + dbLexicalEntry.parent_client_id, + dbLexicalEntry.parent_object_id) + .in_(perspective_id_list), + dbLexicalEntry.marked_for_deletion == False, + dbEntity.parent_client_id == dbLexicalEntry.client_id, + dbEntity.parent_object_id == dbLexicalEntry.object_id, + dbEntity.field_client_id == tag_field_id[0], + dbEntity.field_object_id == tag_field_id[1], + dbEntity.marked_for_deletion == False, + dbPublishingEntity.client_id == dbEntity.client_id, + dbPublishingEntity.object_id == dbEntity.object_id, + dbPublishingEntity.published == True, + dbPublishingEntity.accepted == True) + + .group_by( + dbLexicalEntry.client_id, + dbLexicalEntry.object_id)) + + # Grouping lexical entries using stored PL/pgSQL function. + + entry_already_set = set() + group_list = [] + + sql_str = ''' + select * from linked_group( + :field_client_id, + :field_object_id, + :client_id, + :object_id)''' + + for entry_id in entry_id_query: + + if entry_id in entry_already_set: + continue + + row_list = ( + + DBSession.execute(sql_str, { + 'field_client_id': tag_field_id[0], + 'field_object_id': tag_field_id[1], + 'client_id': entry_id[0], + 'object_id': entry_id[1]}) + + .fetchall()) + + entry_id_set = set( + map(tuple, row_list)) + + entry_already_set.update(entry_id_set) + group_list.append(entry_id_set) + + return entry_already_set, group_list, time.time() - start_time + @staticmethod def export_xlsx( language_str, @@ -3886,7 +3969,7 @@ def perform_cognate_analysis( entry_already_set, group_list, group_time = ( - CognateAnalysis.tag_data_aggregated( + CognateAnalysis.tag_data_plpgsql( perspective_info_list, group_field_id)) else: diff --git a/lingvodoc/scripts/save_dictionary.py b/lingvodoc/scripts/save_dictionary.py index c3f92566e..1df2cf7c5 100644 --- a/lingvodoc/scripts/save_dictionary.py +++ b/lingvodoc/scripts/save_dictionary.py @@ -9,6 +9,7 @@ import os import base64 import hashlib +import pprint import shutil import transaction import tempfile @@ -17,9 +18,10 @@ from pathvalidate import sanitize_filename from urllib import request +import sqlalchemy from sqlalchemy.orm.exc import NoResultFound -from sqlalchemy import create_engine -from sqlalchemy import and_ +from sqlalchemy import and_, create_engine, func + from lingvodoc.models import ( Client, DBSession as SyncDBSession, @@ -37,12 +39,13 @@ BaseGroup, Group, PublishingEntity - ) + from lingvodoc.cache.caching import TaskStatus, initialize_cache +from lingvodoc.utils import explain_analyze from sqlalchemy.orm import ( - sessionmaker, + aliased, sessionmaker, ) from sqlalchemy import tuple_, and_, or_ @@ -57,6 +60,7 @@ from os import path, makedirs from errno import EEXIST from shutil import copyfileobj +import uuid EAF_TIERS = { @@ -66,57 +70,166 @@ "transcription": "Transcription", "translation": "Translation" } + log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) -field_ids_to_str = lambda x: str(x.field_client_id) + '_' + str(x.field_object_id) - -def is_empty(row): - for col in row: - if col: - return False - return True +field_ids_to_str = lambda x: str(x.field_client_id) + '_' + str(x.field_object_id) def find_lexical_entries_by_tags(tags, session, published): result = session.query(LexicalEntry) \ .join(Entity) \ .join(PublishingEntity) \ - .filter(Entity.content.in_(tags), - PublishingEntity.accepted == True, - 
Entity.field_client_id == 66, - Entity.field_object_id == 25) + .filter( + LexicalEntry.marked_for_deletion == False, + Entity.content.in_(tags), + Entity.marked_for_deletion == False, + Entity.field_client_id == 66, + Entity.field_object_id == 25, + PublishingEntity.accepted == True) if published is not None: result = result.filter(PublishingEntity.published == published) return result.all() def find_all_tags(tag, session, published): - tags = [tag] - new_tags = [tag] + tags = {tag} + new_tags = {tag} while new_tags: lexical_entries = find_lexical_entries_by_tags(new_tags, session, published) - new_tags = list() + new_tags = set() for lex in lexical_entries: entities = session.query(Entity) \ .join(PublishingEntity) \ - .filter(Entity.parent == lex, - PublishingEntity.accepted == True, - Entity.field_client_id == 66, - Entity.field_object_id == 25) + .filter( + Entity.marked_for_deletion == False, + Entity.parent == lex, + Entity.field_client_id == 66, + Entity.field_object_id == 25, + PublishingEntity.accepted == True) if published is not None: entities = entities.filter(PublishingEntity.published == published) for entity in entities: if entity.content not in tags: - tags.append(entity.content) - new_tags.append(entity.content) + tags.add(entity.content) + new_tags.add(entity.content) return tags -def generate_massive_cell(tag, session, text_fields, published): - result = list() +def find_group_by_tags( + session, + entry_id_set, + tag_set, + tag_list, + field_client_id, + field_object_id, + published = None): + """ + Optimized retrieval of a group of lexical entries linked by a set of tags from a specified field. + """ + + # While we have tags we don't have all lexical entries for, + # we get these all entries of these tags... + + while tag_list: + + entry_id_query = ( + + session.query( + LexicalEntry.client_id, + LexicalEntry.object_id) + + .filter( + LexicalEntry.marked_for_deletion == False, + Entity.parent_client_id == LexicalEntry.client_id, + Entity.parent_object_id == LexicalEntry.object_id, + Entity.field_client_id == field_client_id, + Entity.field_object_id == field_object_id, + Entity.marked_for_deletion == False, + Entity.content.in_(tag_list), + PublishingEntity.client_id == Entity.client_id, + PublishingEntity.object_id == Entity.object_id, + PublishingEntity.accepted == True)) + + if published is not None: + + entry_id_query = entry_id_query.filter( + PublishingEntity.published == published) + + entry_id_list = [] + + for entry_id in entry_id_query.distinct(): + + if entry_id not in entry_id_set: + + entry_id_set.add(entry_id) + entry_id_list.append(entry_id) + + if not entry_id_list: + break + + # And then get all tags for entries we haven't already done it for. + + tag_query = ( + + session.query( + Entity.content) + + .filter( + tuple_(Entity.parent_client_id, Entity.parent_object_id) + .in_(entry_id_list), + Entity.field_client_id == field_client_id, + Entity.field_object_id == field_object_id, + Entity.marked_for_deletion == False, + PublishingEntity.client_id == Entity.client_id, + PublishingEntity.object_id == Entity.object_id, + PublishingEntity.accepted == True)) + + if published is not None: + + tag_query = tag_query.filter( + PublishingEntity.published == published) + + tag_list = [] + + for tag in tag_query.distinct(): + + if tag not in tag_set: + + tag_set.add(tag) + tag_list.append(tag) + + # Showing what we've found. 
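+    # Logging final group sizes; both accumulated sets are returned so callers
+    # such as generate_massive_cell_optimized() can reuse them.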
+ + log.debug( + '\nfind_group_by_tags({0}, {1}):' + '\nlen(tag_set): {2}' + '\nlen(entry_id_set): {3}'.format( + field_client_id, + field_object_id, + len(tag_set), + len(entry_id_set))) + + return entry_id_set, tag_set + + +def generate_massive_cell_simple( + tag, + session, + text_fields, + published): + """ + Gathers info of etymologically linked lexical entries via standard simple method. + """ + + result_list = [] + + # Current standard method. + tags = find_all_tags(tag, session, published) lexical_entries = find_lexical_entries_by_tags(tags, session, published) + for lex in lexical_entries: entities = session.query(Entity).join(PublishingEntity) \ .filter(Entity.parent_client_id == lex.client_id, @@ -130,11 +243,463 @@ def generate_massive_cell(tag, session, text_fields, published): if (entity.field_client_id, entity.field_object_id) in text_fields and entity.content is not None: subres.append(entity.content) if len(subres) > 0: - result.append("; ".join(subres)) - if len(result) > 0: - return "\n".join(result) - else: - return "" + result_list.append("; ".join(subres)) + + return "\n".join(result_list) + + +def generate_massive_cell_optimized( + tag, + session, + text_fields, + published): + """ + Optimized gathering of etymologically linked lexical entries info. + """ + + entry_id_set, tag_set = ( + + find_group_by_tags( + session, set(), {tag}, [tag], 66, 25, published)) + + query = ( + + session + + .query() + + .filter( + Entity.marked_for_deletion == False, + tuple_(Entity.parent_client_id, Entity.parent_object_id) + .in_(entry_id_set), + tuple_(Entity.field_client_id, Entity.field_object_id) + .in_(text_fields), + Entity.content != None, + PublishingEntity.client_id == Entity.client_id, + PublishingEntity.object_id == Entity.object_id, + PublishingEntity.accepted == True) + + .add_columns( + func.string_agg(Entity.content, '; ') + .label('content')) + + .group_by( + Entity.parent_client_id, Entity.parent_object_id)) + + if published is not None: + + query = query.filter( + PublishingEntity.published == published) + + return '\n'.join( + row[0] for row in query) + + +def generate_massive_cell_cte( + tag, + session, + text_fields, + published): + """ + Gathering of etymologically linked lexical entries info using recursive CTE. + + Takes much more time then the back-and-forth implementation in generate_massive_cell_optimized() due to + inefficiences stemming from the recursive CTE query restrictions. + """ + + entry_id_query = ( + + session.query( + LexicalEntry.client_id, + LexicalEntry.object_id) + + .filter( + LexicalEntry.marked_for_deletion == False, + Entity.parent_client_id == LexicalEntry.client_id, + Entity.parent_object_id == LexicalEntry.object_id, + Entity.field_client_id == 66, + Entity.field_object_id == 25, + Entity.marked_for_deletion == False, + Entity.content == tag, + PublishingEntity.client_id == Entity.client_id, + PublishingEntity.object_id == Entity.object_id, + PublishingEntity.accepted == True)) + + if published is not None: + + entry_id_query = entry_id_query.filter( + PublishingEntity.published == published) + + # See https://stackoverflow.com/questions/53186429/sqlalchemy-simple-recursive-cte-query for Sqlalchemy + # resursive CTE example. 
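+    # The non-recursive term above selects entries that carry the tag directly;
+    # the recursive term joins tags of already-found entries (E2) against link
+    # entities of other entries (E1), adding entries until a fixed point is
+    # reached.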
+ + cte_query = entry_id_query.cte( + recursive = True, name = 'entry_id') + + E_entry = aliased(Entity, name = 'E1') + E_tag = aliased(Entity, name = 'E2') + + P_entry = aliased(PublishingEntity, name = 'P1') + P_tag = aliased(PublishingEntity, name = 'P2') + + recursive_query = ( + + session.query( + LexicalEntry.client_id, + LexicalEntry.object_id) + + .filter( + LexicalEntry.marked_for_deletion == False, + E_entry.parent_client_id == LexicalEntry.client_id, + E_entry.parent_object_id == LexicalEntry.object_id, + E_entry.field_client_id == 66, + E_entry.field_object_id == 25, + E_entry.marked_for_deletion == False, + P_entry.client_id == E_entry.client_id, + P_entry.object_id == E_entry.object_id, + P_entry.accepted == True, + E_entry.content == E_tag.content, + E_tag.parent_client_id == cte_query.c.client_id, + E_tag.parent_object_id == cte_query.c.object_id, + E_tag.field_client_id == 66, + E_tag.field_object_id == 25, + E_tag.marked_for_deletion == False, + P_tag.client_id == E_tag.client_id, + P_tag.object_id == E_tag.object_id, + P_tag.accepted == True)) + + if published is not None: + + recursive_query = ( + + recursive_query.filter( + P_entry.published == published, + P_tag.published == published)) + + cte_query = cte_query.union( + recursive_query.distinct()) + + # Query for required cell data. + + content_query = ( + + session + + .query() + + .filter( + Entity.marked_for_deletion == False, + tuple_(Entity.parent_client_id, Entity.parent_object_id) + .in_(session.query(cte_query)), + tuple_(Entity.field_client_id, Entity.field_object_id) + .in_(text_fields), + Entity.content != None, + PublishingEntity.client_id == Entity.client_id, + PublishingEntity.object_id == Entity.object_id, + PublishingEntity.accepted == True) + + .add_columns( + func.string_agg(Entity.content, '; ') + .label('content')) + + .group_by( + Entity.parent_client_id, Entity.parent_object_id)) + + if published is not None: + + content_query = content_query.filter( + PublishingEntity.published == published) + + log.debug( + '\ncontent_query:\n' + + str(content_query)) + + return '\n'.join( + row[0] for row in content_query) + + +def generate_massive_cell_temp_table( + tag, + session, + text_field_id_table_name, + published, + __debug_flag__ = False): + """ + Gathering of etymologically linked lexical entries info using temporary tables. + """ + + uuid_str = str(uuid.uuid4()).replace('-', '_') + + entry_id_table_name = 'entry_id_table_' + uuid_str + tag_table_name = 'tag_table_' + uuid_str + + tag_list_name = 'tag_list_' + uuid_str + tag_list_prev_name = 'tag_list_prev_' + uuid_str + + published_str = ( + '' if published is None else + ' and P.published = ' + str(published).lower()) + + # Temporary table for lexical entry ids. 
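+    # Table names carry a per-call uuid suffix so repeated calls within the same
+    # session / transaction do not collide; all tables are dropped on commit.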
+ + sql_str = (''' + + create temporary table + + {entry_id_table_name} ( + client_id BIGINT, + object_id BIGINT, + primary key (client_id, object_id)) + + on commit drop; + + insert into {entry_id_table_name} + + select + L.client_id, + L.object_id + + from + lexicalentry L, + public.entity E, + publishingentity P + + where + L.marked_for_deletion = false and + E.parent_client_id = L.client_id and + E.parent_object_id = L.object_id and + E.field_client_id = 66 and + E.field_object_id = 25 and + E.marked_for_deletion = false and + E.content = :tag and + P.client_id = E.client_id and + P.object_id = E.object_id and + P.accepted = true{0} + + on conflict do nothing; + + '''.format( + published_str, + entry_id_table_name = entry_id_table_name)) + + session.execute( + sql_str, {'tag': tag}) + + # Temporary table for tags. + + sql_str = (''' + + create temporary table + + {tag_table_name} ( + tag TEXT primary key) + + on commit drop; + + insert into {tag_table_name} + values (:tag); + + create temporary table + + {tag_list_name} + + on commit drop as + with + + tag_cte as ( + + insert into {tag_table_name} + select distinct E.content + + from + public.entity E, + publishingentity P + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from {entry_id_table_name}) and + E.field_client_id = 66 and + E.field_object_id = 25 and + E.marked_for_deletion = false and + P.client_id = E.client_id and + P.object_id = E.object_id and + P.accepted = true{0} + + on conflict do nothing + returning *) + + select * from tag_cte; + + '''.format( + published_str, + entry_id_table_name = entry_id_table_name, + tag_table_name = tag_table_name, + tag_list_name = tag_list_name)) + + session.execute( + sql_str, {'tag': tag}) + + # While we have tags we don't have all lexical entries for, we get these all new entries of these tags + # and then all new tags of this new entries. 
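+    # Each pass renames the current tag list to its *_prev counterpart, inserts
+    # entries matching those tags into the entry id table, gathers their not yet
+    # seen tags into a fresh list and drops the previous one; the loop below stops
+    # when a pass yields no new tags.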
+ + exists_str = ''' + + select exists ( + select 1 from {tag_list_name}); + + '''.format( + tag_list_name = tag_list_name) + + while session.execute(exists_str).scalar(): + + sql_str = (''' + + alter table {tag_list_name} + rename to {tag_list_prev_name}; + + create temporary table + + {tag_list_name} + + on commit drop as + with + + entry_id_cte as ( + + insert into {entry_id_table_name} + + select distinct + L.client_id, + L.object_id + + from + lexicalentry L, + public.entity E, + publishingentity P + + where + L.marked_for_deletion = false and + E.parent_client_id = L.client_id and + E.parent_object_id = L.object_id and + E.field_client_id = 66 and + E.field_object_id = 25 and + E.marked_for_deletion = false and + E.content in ( + select * from {tag_list_prev_name}) and + P.client_id = E.client_id and + P.object_id = E.object_id and + P.accepted = true{0} + + on conflict do nothing + returning *), + + tag_cte as ( + + insert into {tag_table_name} + select distinct E.content + + from + public.entity E, + publishingentity P + + where + (E.parent_client_id, E.parent_object_id) in ( + select * from entry_id_cte) and + E.field_client_id = 66 and + E.field_object_id = 25 and + E.marked_for_deletion = false and + P.client_id = E.client_id and + P.object_id = E.object_id and + P.accepted = true{0} + + on conflict do nothing + returning *) + + select * from tag_cte; + + drop table {tag_list_prev_name}; + + '''.format( + published_str, + entry_id_table_name = entry_id_table_name, + tag_table_name = tag_table_name, + tag_list_name = tag_list_name, + tag_list_prev_name = tag_list_prev_name)) + + session.execute( + sql_str) + + # And now we get required info of the linked lexical entries. + + query = ( + + session + + .query() + + .filter( + Entity.marked_for_deletion == False, + + tuple_( + Entity.parent_client_id, Entity.parent_object_id) + + .in_(sqlalchemy.text( + 'select * from ' + entry_id_table_name)), + + tuple_( + Entity.field_client_id, Entity.field_object_id) + + .in_(sqlalchemy.text( + 'select * from ' + text_field_id_table_name)), + + Entity.content != None, + PublishingEntity.client_id == Entity.client_id, + PublishingEntity.object_id == Entity.object_id, + PublishingEntity.accepted == True) + + .add_columns( + func.string_agg(Entity.content, '; ') + .label('content')) + + .group_by( + Entity.parent_client_id, Entity.parent_object_id)) + + if published is not None: + + query = query.filter( + PublishingEntity.published == published) + + if __debug_flag__: + + # Showing PostgreSQL's explain analyze, if required. + + row_list = ( + self.session.execute( + explain_analyze(query)).fetchall()) + + log.debug(''.join( + '\n' + row[0] for row in row_list)) + + return '\n'.join( + row[0] for row in query) + + +def generate_massive_cell_plpgsql( + tag, + session, + published): + """ + Gathering of etymologically linked lexical entries info using stored PL/pgSQL procedure. + """ + + row_list = ( + + session.execute( + 'select etymology_text(:tag, :publish)', + {'tag': tag, 'publish': published}).fetchall()) + + return '\n'.join( + row[0] for row in row_list) class Save_Context(object): @@ -144,7 +709,11 @@ class Save_Context(object): Also used in advanced_search. 
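+
+    Rough usage sketch (mirrors save() below and the advanced_search export;
+    'perspectives' and 'entries' here are placeholders):
+
+        context = Save_Context(locale_id, session)
+
+        for perspective in perspectives:
+            context.ready_perspective(perspective)
+
+            for entry in entries:
+                context.save_lexical_entry(entry, published = True, accepted = True)
+
+        context.workbook.close()
+        # context.stream now holds the compiled XLSX data.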
""" - def __init__(self, locale_id, session): + def __init__( + self, + locale_id, + session, + __debug_flag__ = False): self.locale_id = locale_id self.session = session @@ -152,16 +721,58 @@ def __init__(self, locale_id, session): self.stream = io.BytesIO() self.workbook = xlsxwriter.Workbook(self.stream, {'in_memory': True}) - self.text_fields = ( - session.query(Field.client_id, Field.object_id).filter_by( + # Getting up-to-date text field info. + + self.session.execute( + 'refresh materialized view text_field_id_view;') + + self.etymology_dict = {} + + if __debug_flag__: + + # Simple text field ids query. + + self.text_field_query = session.query( + Field.client_id, Field.object_id).filter_by( data_type_translation_gist_client_id = 1, - data_type_translation_gist_object_id = 47).all()) + data_type_translation_gist_object_id = 47, + marked_for_deletion = False) + + # Simple list of text field ids. + + self.text_fields = self.text_field_query.all() + + # Temporary table with text field ids. + + self.text_field_id_table_name = ( + 'text_field_id_table_' + + str(uuid.uuid4()).replace('-', '_')) + + self.session.execute(''' + + create temporary table {text_field_id_table_name} ( + client_id BIGINT, + object_id BIGINT, + primary key (client_id, object_id)) + on commit drop; + + insert into {text_field_id_table_name} + select client_id, object_id + from field + where + data_type_translation_gist_client_id = 1 and + data_type_translation_gist_object_id = 47 and + marked_for_deletion = false; + + '''.format( + text_field_id_table_name = self.text_field_id_table_name)) def ready_perspective( self, perspective, dictionary = None, - list_flag = False): + list_flag = False, + __debug_flag__ = False): """ Prepares for saving data of lexical entries of another perspective. """ @@ -200,7 +811,7 @@ def ready_perspective( # Getting field data. - self.fields = ( + field_query = ( self.session .query(DictionaryPerspectiveToField) @@ -213,9 +824,24 @@ def ready_perspective( .filter(tuple_( DictionaryPerspectiveToField.field_client_id, DictionaryPerspectiveToField.field_object_id) - .in_(self.text_fields)) + .in_(sqlalchemy.text('select * from text_field_id_view'))) + + .order_by(DictionaryPerspectiveToField.position)) + + if __debug_flag__: + + # Showing PostgreSQL's explain analyze, if required. + + row_list = ( + self.session.execute( + explain_analyze(field_query)).fetchall()) - .order_by(DictionaryPerspectiveToField.position).all()) + log.debug(''.join( + '\n' + row[0] for row in row_list)) + + self.fields = field_query.all() + + # Etymology field. self.etymology_field = ( self.session.query(DictionaryPerspectiveToField).filter_by( @@ -247,7 +873,40 @@ def ready_perspective( self.row += 1 - def save_lexical_entry(self, entry, published = None, accepted = True): + def get_etymology_text( + self, + tag, + published): + """ + Gets info of etymologycally linked lexical entries, caches results. + """ + + row_list = ( + + self.session.execute( + 'select * from etymology_group_text(:tag, :publish)', + {'tag': tag, 'publish': published}).fetchall()) + + entry_id_list = [] + text_list = [] + + for client_id, object_id, text in row_list: + + entry_id_list.append((client_id, object_id)) + text_list.append(text) + + etymology_text = '\n'.join(text_list) + + for entry_id in entry_id_list: + self.etymology_dict[entry_id] = etymology_text + + return etymology_text + + def save_lexical_entry( + self, + entry, + published = None, + accepted = True): """ Save data of a lexical entry of the current perspective. 
""" @@ -281,14 +940,25 @@ def save_lexical_entry(self, entry, published = None, accepted = True): else: row_to_write[self.field_to_column[ent_field_ids]] += "\n" + entity.content - if (self.etymology_field and - len(row_to_write) == len(self.fields) and - ent_field_ids == "66_25"): + elif ( + ent_field_ids == "66_25" and + self.etymology_field and + len(row_to_write) == len(self.fields)): + + entry_id = (entry.client_id, entry.object_id) - row_to_write.append(generate_massive_cell( - entity.content, self.session, self.text_fields, published)) + if entry_id in self.etymology_dict: - if not is_empty(row_to_write): + row_to_write.append( + self.etymology_dict[entry_id]) + + else: + + row_to_write.append( + self.get_etymology_text( + entity.content, published)) + + if any(row_to_write): self.worksheet.write_row(self.row, 0, row_to_write) self.row += 1 @@ -305,6 +975,7 @@ def compile_workbook(context, client_id, object_id, session, locale_id, publishe perspectives = session.query(DictionaryPerspective).filter_by(parent_client_id=client_id, parent_object_id=object_id, marked_for_deletion=False).all() + for perspective in perspectives: context.ready_perspective(perspective) @@ -326,18 +997,17 @@ def compile_workbook(context, client_id, object_id, session, locale_id, publishe # @profile() def save( - client_id, - object_id, - storage, - sqlalchemy_url, - task_key, - cache_kwargs, - dict_name, - locale_id, - published -): # :( - + client_id, + object_id, + storage, + sqlalchemy_url, + task_key, + cache_kwargs, + dict_name, + locale_id, + published, __debug_flag__ = False +): # :( initialize_cache(cache_kwargs) task_status = TaskStatus.get_from_cache(task_key) if task_key else None @@ -351,7 +1021,8 @@ def save( if task_status: task_status.set(3, 20, 'Running async process') - save_context = Save_Context(locale_id, session) + save_context = Save_Context( + locale_id, session, __debug_flag__) try: compile_workbook(save_context, client_id, object_id, session, locale_id, published) @@ -450,7 +1121,7 @@ def save( save_context.stream.seek(0) copyfileobj(save_context.stream, xlsx_file) - # Successfully compiled phonology, finishing and returning links to files with results. + # Successfully saved dictionary, finishing and returning links to files with results. url_list = [ diff --git a/lingvodoc/utils/__init__.py b/lingvodoc/utils/__init__.py index c886c4c45..16d02af3f 100644 --- a/lingvodoc/utils/__init__.py +++ b/lingvodoc/utils/__init__.py @@ -1 +1,100 @@ + __author__ = 'student' + + +import os + +try: + PAGE_SIZE = os.sysconf('SC_PAGE_SIZE') +except: + pass + + +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql.expression import Executable, ClauseElement, _literal_as_text + + +class explain(Executable, ClauseElement): + """ + PostgreSQL EXPLAIN [ANALYZE] for queries, example: + + query = DBSession.query(...) + + log.debug(''.join( + '\n' + row[0] for row in + session.execute(explain(query)).fetchall())) + + See also in lingvodoc/scripts/save_dictionary.py. + + Mostly copied from + https://github.com/sqlalchemy/sqlalchemy/wiki/Explain. + """ + + def __init__( + self, + statement, + analyze = False): + + self.statement = _literal_as_text(statement) + self.analyze = analyze + + # Apparently helps with INSERT statements. + + self.inline = getattr( + statement, 'inline', None) + + +@compiles(explain, 'postgresql') +def pg_explain(element, compiler, **kwargs): + """ + Compilation of EXPLAIN [ANALYZE] query for PostgreSQL backend. 
+ """ + + text = "EXPLAIN " + + if element.analyze: + text += "ANALYZE " + + text += compiler.process(element.statement, **kwargs) + + # Allow EXPLAIN for INSERT / UPDATE / DELETE, turn off compiler flags that would otherwise start + # treating this like INSERT / UPDATE / DELETE (gets confused with RETURNING or autocloses cursor + # which we don't want). + + compiler.isinsert = False + compiler.isupdate = False + compiler.isdelete = False + + return text + + +@compiles(explain) +def default_explain(element, compiler, **kwargs): + """ + Default compilation handler, e.g. for str(explain(query)). + """ + + return pg_explain(element, compiler, **kwargs) + + +def explain_analyze(statement): + """ + Helper wrapper for EXPLAIN ANALYZE. + """ + + return explain(statement, analyze = True) + + +def get_resident_memory(): + """ + Returns curren resident memory size of the process. + + See + https://stackoverflow.com/questions/938733/total-memory-used-by-python-process, + http://fa.bianp.net/blog/2013/different-ways-to-get-memory-consumption-or-lessons-learned-from-memory_profiler/, + https://github.com/giampaolo/psutil/blob/386a9288fc854626c96eb32d1a5bdd3f7f260b12/psutil/_pslinux.py#L1733. + """ + + with open('/proc/self/statm', 'rb') as statm_file: + return int(statm_file.readline().split()[1]) * PAGE_SIZE +
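+
+
+# get_resident_memory() is used e.g. in gql_search.py for logging memory usage:
+#
+#     resident_memory = utils.get_resident_memory()
+#     log.debug('resident_memory: {0:.3f}m'.format(resident_memory / 1048576.0))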