diff --git a/lingvodoc/schema/gql_cognate.py b/lingvodoc/schema/gql_cognate.py
index 62928867..646a0caa 100644
--- a/lingvodoc/schema/gql_cognate.py
+++ b/lingvodoc/schema/gql_cognate.py
@@ -5630,7 +5630,6 @@ class Arguments:
         base_language_id = LingvodocID()
         input_pairs = ObjectVal()
         truth_threshold = graphene.Float()
-        stamp = graphene.Float()
         debug_flag = graphene.Boolean()
@@ -5640,27 +5639,25 @@ class Arguments:
     message = graphene.String()
     perspective_name_list = graphene.List(graphene.String)
     transcription_count = graphene.Int()
-    stamp = graphene.Float()

     @staticmethod
     def neuro_cognate_statistics(
-            #language_str,
-            #base_language_id,
-            #base_language_name,
             perspective_info_list,
             source_perspective_id,
             match_translations,
             input_pairs,
             locale_id,
+            user_id,
             truth_threshold,
-            stamp,
-            #storage,
+            storage,
+            host_url,
+            cache_kwargs,
             debug_flag = False):

         input_pairs_list = input_pairs or []
         compare_pairs_list = []
-        total_transcription_count = len(input_pairs) if input_pairs else 0
         input_index = None
+        dictionary_name_list = []
         perspective_name_list = []

         for (
@@ -5686,13 +5683,11 @@ def neuro_cognate_statistics(
             if perspective_id != source_perspective_id:
                 compare_pairs_list.append(current_pairs_list[:])
-                total_transcription_count += len(current_pairs_list)
             else:
                 input_index = idx
                 compare_pairs_list.append([])
                 if not input_pairs_list:
                     input_pairs_list = current_pairs_list[:]
-                    total_transcription_count += len(current_pairs_list)

             perspective = DBSession.query(dbPerspective).filter_by(
                 client_id = perspective_id[0], object_id = perspective_id[1]).first()
@@ -5700,38 +5695,45 @@ def neuro_cognate_statistics(
             perspective_name = perspective.get_translation(locale_id)
             dictionary_name = perspective.parent.get_translation(locale_id)
+            dictionary_name_list.append(dictionary_name)
             perspective_name_list.append(f"{perspective_name} - {dictionary_name}")

         message = ""
         triumph = True
-        prediction = None
+        input_len = len(input_pairs_list)
         compare_len = sum(map(len, compare_pairs_list))
-        stamp_file = f"/tmp/lingvodoc_stamps/{stamp}"
+        dictionaries = []

-        if not input_pairs_list or not compare_len:
+        if not input_len or not compare_len:
             triumph = False
             message = "No input words or words to compare were received"
         elif compare_len > 10 ** 4:
             triumph = False
-            message = "Too large dictionaries to compare"
+            message = f"Too many words to compare: {compare_len}"
         else:
+            for i, d in enumerate(dictionary_name_list, 1):
+                dictionaries.append(f"{i}. {d}")
+
+            task = TaskStatus(user_id, 'Neuro cognates computation', '\n\n'.join(dictionaries), input_len)
+            task.set(1, 0, "Processing first words...", "")
+
             NeuroCognatesEngine = NeuroCognates(
                 compare_pairs_list,
                 input_index,
+                source_perspective_id,
+                perspective_name_list,
+                storage,
+                host_url,
+                cache_kwargs,
                 match_translations,
-                truth_threshold,
-                stamp_file)
+                truth_threshold)

-            prediction = NeuroCognatesEngine.index(input_pairs_list)
+            NeuroCognatesEngine.index(input_pairs_list, task)

         result_dict = (
             dict(
                 triumph=triumph,
-                stamp=stamp,
-                suggestion_list=prediction,
-                message=message,
-                perspective_name_list=perspective_name_list,
-                transcription_count=total_transcription_count))
+                message=message))

         return NeuroCognateAnalysis(**result_dict)
@@ -5739,7 +5741,6 @@ def neuro_cognate_statistics(
     def mutate(
         self,
         info,
-        stamp,
         source_perspective_id,
         perspective_info_list,
         match_translations,
@@ -5802,18 +5803,12 @@ def mutate(
                     base_language_id[0], base_language_id[1]))

         try:
-
-            # Getting base language info.
-            locale_id = info.context.locale_id
-            #base_language = DBSession.query(dbLanguage).filter_by(
-                #client_id = base_language_id[0], object_id = base_language_id[1]).first()
-
-            #base_language_name = base_language.get_translation(locale_id)
-
-            #request = info.context.request
-            #storage = request.registry.settings['storage']
+            request = info.context.request
+            host_url = request.environ['HTTP_ORIGIN']
+            storage = request.registry.settings['storage']
+            cache_kwargs = request.registry.settings['cache_kwargs']

             # Transforming client/object pair ids from lists to 2-tuples.

@@ -5836,17 +5831,16 @@ def mutate(
                 _ in perspective_info_list]

             return NeuroCognateAnalysis.neuro_cognate_statistics(
-                #language_str,
-                #base_language_id,
-                #base_language_name,
                 perspective_info_list,
                 source_perspective_id,
                 match_translations,
                 input_pairs,
                 locale_id,
+                user.id,
                 truth_threshold,
-                stamp,
-                #storage,
+                storage,
+                host_url,
+                cache_kwargs,
                 debug_flag)

         # Exception occurred while we tried to perform swadesh analysis.
diff --git a/lingvodoc/schema/gql_sync.py b/lingvodoc/schema/gql_sync.py
index 21161132..b8a21d33 100644
--- a/lingvodoc/schema/gql_sync.py
+++ b/lingvodoc/schema/gql_sync.py
@@ -43,15 +43,15 @@
     HTTPUnauthorized
 )
 from lingvodoc.views.v2.desktop_sync.core import async_download_dictionary
+import os
 import json
 import requests
 from pyramid.request import Request
-from pathlib import Path
 from pyramid.response import Response
 from lingvodoc.utils.search import recursive_sort
 from pdb import set_trace as A
-from lingvodoc.cache.caching import CACHE
+from lingvodoc.cache.caching import CACHE, initialize_cache

 log = logging.getLogger(__name__)
@@ -383,7 +383,7 @@ def mutate(root, info, **args):

 class StopMutation(graphene.Mutation):

     class Arguments:
-        stamp = graphene.Float(required=True)
+        stamp = graphene.String(required=True)

     triumph = graphene.Boolean()
@@ -396,12 +396,23 @@ def mutate(root, info, stamp):

         if not client:
             return ResponseError('Only authorized users can stop running mutations.')

-        stamps_path = "/tmp/lingvodoc_stamps"
-
-        # Touch stamp file
-        Path(stamps_path).mkdir(exist_ok=True)
-        open(f"{stamps_path}/{stamp}", 'a').close()
-
-        print("!!! Stamp-to-stop")
+        request = info.context.request
+        storage = request.registry.settings['storage']
+        stamp_path = os.path.join(storage['path'], 'lingvodoc_stamps')
+        stamp_file = os.path.join(stamp_path, stamp)
+        os.makedirs(stamp_path, exist_ok=True)
+
+        '''
+        cache_kwargs = request.registry.settings['cache_kwargs']
+        initialize_cache(cache_kwargs)
+        task = TaskStatus.get_from_cache(stamp)
+        '''
+
+        # Touch the stamp file if it does not exist,
+        # otherwise delete it.
+        if not os.path.isfile(stamp_file):
+            open(stamp_file, 'a').close()
+        else:
+            os.remove(stamp_file)

         return StopMutation(triumph=True)
diff --git a/lingvodoc/schema/query.py b/lingvodoc/schema/query.py
index 9d506333..0ff00003 100644
--- a/lingvodoc/schema/query.py
+++ b/lingvodoc/schema/query.py
@@ -695,6 +695,11 @@ class Query(graphene.ObjectType):
         xcript_fid = LingvodocID(required = True),
         xlat_fid = LingvodocID(required = True)))

+    result_suggestions = (
+        graphene.Field(
+            ObjectVal,
+            result_file = graphene.String(required = True)))
+
     def resolve_fill_logs(self, info, worker=1):
         # Check if the current user is administrator
         client_id = info.context.client_id
@@ -5323,6 +5328,24 @@ def resolve_words(self,

         return result_pairs_list

+    def resolve_result_suggestions(self,
+        info,
+        result_file):
+        storage = (
+            info.context.request.registry.settings['storage'])
+
+        pickle_path = os.path.join(storage['path'], 'neuro_cognates', result_file)
+
+        try:
+            with gzip.open(pickle_path, 'rb') as pickle_file:
+                result_dict = pickle.load(pickle_file)
+
+        except Exception:
+            return ResponseError(f'Cannot access file \'{pickle_path}\'.')
+
+        return result_dict
+
+
 class PerspectivesAndFields(graphene.InputObjectType):
     perspective_id = LingvodocID()
     field_id = LingvodocID()
diff --git a/lingvodoc/utils/neuro_cognates/app.py b/lingvodoc/utils/neuro_cognates/app.py
index 36d4ea7b..1dcdd365 100644
--- a/lingvodoc/utils/neuro_cognates/app.py
+++ b/lingvodoc/utils/neuro_cognates/app.py
@@ -5,12 +5,17 @@
 import itertools
 import json
 import os
+import gzip
+import pickle
 import multiprocess
-from pdb import set_trace as A
+from time import time as now
+from lingvodoc.queue.celery import celery
+from lingvodoc.cache.caching import TaskStatus, initialize_cache


 class AbsDiffLayer(Layer):
-    def call(self, inputs):
+    @staticmethod
+    def call(inputs):
         x1, x2 = inputs
         return tf.math.abs(x1 - x2)
@@ -20,17 +25,31 @@ def get_config(self):

 class NeuroCognates:
-    def __init__(self, compare_lists, input_index, four_tensors, truth_threshold, stamp_file):
+    def __init__(self,
+            compare_lists,
+            input_index,
+            source_perspective_id,
+            perspective_name_list,
+            storage,
+            host_url,
+            cache_kwargs,
+            four_tensors=False,
+            truth_threshold=0.97):
         self.compare_lists = compare_lists
         self.input_index = input_index
+        self.source_perspective_id = source_perspective_id
         self.four_tensors = four_tensors
         self.truth_threshold = truth_threshold
-        self.stamp_file = stamp_file
+        self.perspective_name_list = perspective_name_list
+        self.storage = storage
+        self.host_url = host_url
+        self.cache_kwargs = cache_kwargs

-        abspath = os.path.abspath(__file__)
-        dname = os.path.dirname(abspath)
-        os.chdir(dname)
+        project_dir = os.path.abspath(os.getcwd())
+        script_path = os.path.abspath(__file__)
+        script_dir = os.path.dirname(script_path)
+        os.chdir(script_dir)

         if four_tensors:
             try:
@@ -84,111 +103,183 @@ def __init__(self,
                 tokenizer_data = json.load(f)
                 self.tokenizer = tokenizer_from_json(tokenizer_data)

-    @staticmethod
-    def split_items(items):
-        return (
-            list(map(lambda x: x[0], items)),
-            list(map(lambda x: x[1], items)),
-            list(map(lambda x: x[2], items)))
+        # Change dir back
+        os.chdir(project_dir)

-    @staticmethod
+    @celery.task
     def predict_cognates(
+            self,
             word_pairs,
-            compare_lists,
-            input_index,
+            task,
             tokenizer,
             model,
-            max_len,
-            stamp_file,
-            four_tensors=False,
-            truth_threshold=0.97):
+            max_len):
+
+        def split_items(items):
+            return (
+                list(map(lambda x: x[0], items)),
+                list(map(lambda x: x[1], items)),
+                list(map(lambda x: x[2], items)))

         # Split the input pairs into words and translations
-        input_words, input_translations, input_lex_ids = NeuroCognates.split_items(word_pairs)
+        input_words, input_translations, input_lex_ids = split_items(word_pairs)

         # Tokenize and pad the input data
         seq_input_words = [tokenizer.texts_to_sequences([word]) for word in input_words]
         X_input_words = [pad_sequences(seq, maxlen=max_len, padding='post') for seq in seq_input_words]

         X_input_translations = []
-        if four_tensors:
+        if self.four_tensors:
             seq_input_translations = [tokenizer.texts_to_sequences([trans]) for trans in input_translations]
-            X_input_translations = [pad_sequences(seq, maxlen=max_len, padding='post') for seq in seq_input_translations]
+            X_input_translations = [pad_sequences(seq, maxlen=max_len, padding='post') for seq in
+                                    seq_input_translations]

         X_compare_words = []
         X_compare_translations = []

         # Iterate over each list to compare
-        for compare_list in compare_lists:
+        for compare_list in self.compare_lists:

-            compare_words, compare_translations, compare_lex_ids = NeuroCognates.split_items(compare_list)
+            compare_words, compare_translations, compare_lex_ids = split_items(compare_list)

             # Tokenize and pad the comparison data
             seq_compare_words = [tokenizer.texts_to_sequences([word]) for word in compare_words]
             X_compare_words.append([pad_sequences(seq, maxlen=max_len, padding='post') for seq in seq_compare_words])

-            if four_tensors:
+            if self.four_tensors:
                 seq_compare_translations = [tokenizer.texts_to_sequences([trans]) for trans in compare_translations]
                 X_compare_translations.append([pad_sequences(seq, maxlen=max_len, padding='post')
-                    for seq in seq_compare_translations])
+                                               for seq in seq_compare_translations])
             else:
                 X_compare_translations.append([])

+        stamp_file = os.path.join(self.storage['path'], 'lingvodoc_stamps', str(task.id))
+
         # Calculate prediction
-        def get_prediction(input_word, input_trans, input_id, X_word, X_trans):
+        def get_prediction(input_word, input_trans, input_id, X_word, X_trans, event):
+
+            if event.is_set():
+                return None

             similarities = []
             result = []

             # Iterate over each list to compare
-            for i, compare_list in enumerate(compare_lists):
+            for i, compare_list in enumerate(self.compare_lists):

                 if not compare_list:
                     continue

-                compare_words, compare_translations, compare_lex_ids = NeuroCognates.split_items(compare_list)
+                compare_words, compare_translations, compare_lex_ids = split_items(compare_list)

                 count = 0
                 for compare_word, compare_trans, compare_id, X_comp_word, X_comp_trans in itertools.zip_longest(
-                        compare_words, compare_translations, compare_lex_ids, X_compare_words[i], X_compare_translations[i]):
+                        compare_words, compare_translations, compare_lex_ids, X_compare_words[i],
+                        X_compare_translations[i]):

                     # Check for the stamp-to-stop file every hundred comparisons
                     count += 1
                     if count % 100 == 0 and os.path.isfile(stamp_file):
-                        print("Killed process !!!")
-                        return result
+                        event.set()
+                        return None

                     # Feed 2 or 4 tensors into the model
                     pred = (model.predict([X_word, X_trans, X_comp_word, X_comp_trans])[0][0]
-                            if four_tensors else
+                            if self.four_tensors else
                             model.predict([X_word, X_comp_word])[0][0])

-                    if pred > truth_threshold:  # Filter by probability
+                    if pred > self.truth_threshold:  # Filter by probability
                         similarities.append((i, [compare_word, compare_trans], compare_id, f"{pred:.4f}"))

             if similarities:
                 result.append((
-                    input_index,
+                    self.input_index,
                     f"{input_word} '{input_trans}'",
                     input_id,
                     None,
                     similarities,
                     []))

+            if os.path.isfile(stamp_file):
+                event.set()
+                return None
+
             return result

-        with multiprocess.Pool(multiprocess.cpu_count() * 2) as p:
+        start_time = now()
+        results = []
+        current_stage = 0
+        flushed = 0
+        result_link = ""
+        input_len = len(word_pairs)
+        compare_len = sum(map(len, self.compare_lists))
+        initialize_cache(self.cache_kwargs)
+        task = TaskStatus.get_from_cache(task.key)
+
+        def add_result(res):
+
+            if res is None:
+                return
+
+            nonlocal current_stage, flushed, result_link
+            current_stage += 1
+            finished = (current_stage == input_len)
+            passed = now() - start_time
+            left = passed / current_stage * input_len - passed

-            results = p.starmap(get_prediction, itertools.zip_longest(
-                input_words, input_translations, input_lex_ids, X_input_words, X_input_translations))
+            days = int(left / 86400)
+            hours = int((left - days * 86400) / 3600)
+            minutes = int((left - days * 86400 - hours * 3600) / 60)

-            plain_results = []
-            for result in results:
-                plain_results.extend(result)
+            progress = 100 if finished else int(current_stage / input_len * 100)
+            status = "Finished" if finished else f"~ {days}d:{hours}h:{minutes}m left ~"
+
+            results.extend(res)
+
+            if passed - flushed > 300 or finished:
+                flushed = passed
+
+                result_dict = (
+                    dict(
+                        suggestion_list=results,
+                        perspective_name_list=self.perspective_name_list,
+                        transcription_count=compare_len * current_stage,
+                        source_perspective_id=self.source_perspective_id))
+
+                storage_dir = os.path.join(self.storage['path'], 'neuro_cognates')
+                pickle_path = os.path.join(storage_dir, str(task.id))
+                os.makedirs(storage_dir, exist_ok=True)
+
+                with gzip.open(pickle_path, 'wb') as result_data_file:
+                    pickle.dump(result_dict, result_data_file)
+
+                result_link = ''.join([self.host_url, '/suggestions/', str(task.id)])
+
+            task.set(current_stage, progress, status, result_link)
+
+            # Release the main thread once the last word has been processed.
+            if finished:
+                event.set()
+
+        # At most half of the CPUs, but at least one worker process
+        with multiprocess.Pool(max(1, multiprocess.cpu_count() // 2)) as p:
+
+            event = multiprocess.Manager().Event()
+
+            for args in itertools.zip_longest(
+                    input_words,
+                    input_translations,
+                    input_lex_ids,
+                    X_input_words,
+                    X_input_translations,
+                    [event] * input_len
+            ):
+                p.apply_async(get_prediction, args, callback=add_result,
+                              error_callback=(lambda e: print(e, flush=True)))

             p.close()
-            p.join()
+
+            # Wake up either on a manual stop or when all words are done
+            # (add_result() sets the event after the last word).
+            event.wait()
+
+            if current_stage < input_len:
+                print("Killed process !!!")
+                task.set(None, -1, "Stopped manually", result_link)
+
+            p.terminate()

         # Remove the stamp-to-stop file if it exists
         try:
@@ -196,30 +287,27 @@ def get_prediction(input_word, input_trans, input_id, X_word, X_trans):
         except OSError:
             pass

-        return plain_results
+        return results
+
+    def index(self, word_pairs, task):

-    def index(self, word_pairs):
         if self.four_tensors:
             # Call the comparison function (4-tensor model)
-            return NeuroCognates.predict_cognates(
+            # Celery task needs self as an explicit argument!
+            return NeuroCognates.predict_cognates.delay(
+                self,
                 word_pairs,
-                self.compare_lists,
-                self.input_index,
+                task,
                 self.tokenizer_dict,
                 self.model_dict,
-                self.max_len_dict,
-                self.stamp_file,
-                self.four_tensors,
-                self.truth_threshold)
+                self.max_len_dict)
         else:
             # Call the comparison function (2-tensor model)
-            return NeuroCognates.predict_cognates(
+            # Celery task needs self as an explicit argument!
+            return NeuroCognates.predict_cognates.delay(
+                self,
                 word_pairs,
-                self.compare_lists,
-                self.input_index,
+                task,
                 self.tokenizer,
                 self.model,
-                self.max_len,
-                self.stamp_file,
-                self.four_tensors,
-                self.truth_threshold)
+                self.max_len)
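Reviewer note: a minimal sketch of how a client might drive the new flow, assuming the usual /graphql endpoint; the host URL, the task id "12345", and the mutation field name stop_mutation are assumptions for illustration, not taken from this diff. Only the stamp argument type (now a string task id) and the result_suggestions(result_file) query come from the changes above.

    import requests

    API = 'http://localhost:6543/graphql'  # assumed endpoint

    # Stopping a running computation: `stamp` is now the task id string;
    # the server toggles the file under storage/lingvodoc_stamps/.
    # (`stop_mutation` is an assumed schema field name.)
    STOP = '''
    mutation {
        stop_mutation(stamp: "12345") {
            triumph
        }
    }
    '''

    # Fetching accumulated results: `result_file` is the same task id,
    # used as the gzipped-pickle file name under storage/neuro_cognates/.
    FETCH = '''
    query {
        result_suggestions(result_file: "12345")
    }
    '''

    for query in (STOP, FETCH):
        print(requests.post(API, json = {'query': query}).json())

Because predict_cognates now runs as a Celery task and flushes partial results to disk every five minutes, the same result_suggestions query also serves as a progress poll: it returns whatever suggestion_list has been pickled so far, alongside the perspective_name_list and transcription_count that were previously only available in the mutation response.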