# -*- coding: utf-8 -*-
'''
Copyright (C) Thibault Francois

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, version 3.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''

import sys
import csv

from time import time
from itertools import islice, chain
from xmlrpclib import Fault  # Python 2; Python 3 moved this to xmlrpc.client

from lib import conf_lib
from lib.conf_lib import log_error, log_info, log
from lib.internal.rpc_thread import RpcThread
from lib.internal.io import ListWriter
from lib.internal.csv_reader import UnicodeReader, UnicodeWriter

csv.field_size_limit(sys.maxsize)


def batch(iterable, size):
    """Yield successive iterators over at most `size` elements of `iterable`."""
    sourceiter = iter(iterable)
    while True:
        batchiter = islice(sourceiter, size)
        # batchiter.next() raises StopIteration once sourceiter is exhausted,
        # which ends this generator (Python 2 semantics, pre-PEP 479).
        yield chain([batchiter.next()], batchiter)
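
# For illustration only: each yielded item is an iterator over at most
# `size` elements of the source, consumed in order, e.g.
#   >>> [list(b) for b in batch([1, 2, 3, 4, 5], 2)]
#   [[1, 2], [3, 4], [5]]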

class RPCThreadImport(RpcThread):

    def __init__(self, max_connection, model, header, writer, batch_size=20, context=None):
        super(RPCThreadImport, self).__init__(max_connection)
        self.model = model
        self.header = header
        self.batch_size = batch_size
        self.writer = writer
        self.context = context


    def launch_batch(self, data_lines, batch_number, check=False):
        def launch_batch_fun(lines, batch_number, check=False):
            i = 0
            for lines_batch in batch(lines, self.batch_size):
                lines_batch = [l for l in lines_batch]
                self.sub_batch_run(lines_batch, batch_number, i, len(lines), check=check)
                i += 1

        self.spawn_thread(launch_batch_fun, [data_lines, batch_number], {'check': check})

    def sub_batch_run(self, lines, batch_number, sub_batch_number, total_line_nb, check=False):
        success = False

        st = time()
        try:
            success = self._send_rpc(lines, batch_number, sub_batch_number, check=check)
        except Fault as e:
            log_error("Batch %s, sub-batch %s failed" % (batch_number, sub_batch_number))
            log_error(e.faultString)
        except ValueError:
            log_error("Batch %s, sub-batch %s failed with a value error" % (batch_number, sub_batch_number))
        except Exception:
            log_info("Unknown problem")
            exc_type, exc_value, _ = sys.exc_info()
            # traceback.print_tb(exc_traceback, file=sys.stdout)
            log_error(exc_type)
            log_error(exc_value)

        # Lines that failed to import are written back out so they can be fixed and retried.
        if not success:
            self.writer.writerows(lines)

        log_info("time for batch %s - %s of %s : %s" % (batch_number, (sub_batch_number + 1) * self.batch_size, total_line_nb, time() - st))


    def _send_rpc(self, lines, batch_number, sub_batch_number, check=False):
        # model.load() follows Odoo's load() API: it returns a dict with the
        # created record ids and any import messages.
        res = self.model.load(self.header, lines, context=self.context)
        if res['messages']:
            for msg in res['messages']:
                log_error('batch %s, %s' % (batch_number, sub_batch_number))
                log_error(msg)
                log_error(lines[msg['record']])
            return False
        if len(res['ids']) != len(lines) and check:
            log_error("The number of records imported differs from the number of records to import, probably because of a duplicate xml_id")
            return False

        return True

def do_not_split(split, previous_split_value, split_index, line):
    if not split:  # Splitting is disabled, nothing to check
        return False

    split_value = line[split_index]
    if split_value != previous_split_value:  # The value changed, so the batch may be cut here
        return False

    return True
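
# For illustration only ('state' is a hypothetical split column at index 0):
#   do_not_split('state', 'draft', 0, ['draft']) -> True   (same value, keep batching)
#   do_not_split('state', 'draft', 0, ['done'])  -> False  (value changed, safe to cut)
#   do_not_split(False, 'draft', 0, ['draft'])   -> False  (splitting disabled)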

def filter_line_ignore(ignore, header, line):
    """ Keep only the values whose column is not in the ignore list """
    new_line = []
    for k, val in zip(header, line):
        if k not in ignore:
            new_line.append(val)
    return new_line

def filter_header_ignore(ignore, header):
    """ Keep only the columns that are not in the ignore list """
    new_header = []
    for val in header:
        if val not in ignore:
            new_header.append(val)
    return new_header
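
# For illustration only, with a hypothetical 'notes' column to ignore:
#   filter_header_ignore(['notes'], ['id', 'name', 'notes'])                      -> ['id', 'name']
#   filter_line_ignore(['notes'], ['id', 'name', 'notes'], ['1', 'Foo', 'blah'])  -> ['1', 'Foo']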

def read_file(file_to_read, delimiter=';', encoding='utf-8-sig', skip=0):
    def get_real_header(header):
        """ Get the real header, cut at the first empty column """
        new_header = []
        for head in header:
            if head:
                new_header.append(head)
            else:
                break
        return new_header

    def check_id_column(header):
        try:
            header.index('id')
        except ValueError as ve:
            log_error("No External Id (id) column defined, please add one")
            raise ve

    def skip_line(reader):
        log_info("Skipping lines until line %s (excluded)" % skip)
        for _ in xrange(1, skip):
            reader.next()

    log('open %s' % file_to_read)
    file_ref = open(file_to_read, 'r')
    reader = UnicodeReader(file_ref, delimiter=delimiter, encoding=encoding)
    header = reader.next()
    header = get_real_header(header)
    check_id_column(header)
    skip_line(reader)
    data = [l for l in reader]
    return header, data
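
# For illustration only: given a file containing (with ';' as the delimiter)
#   id;name;value
#   my_module.record_1;Foo;10
# read_file() returns (['id', 'name', 'value'], [['my_module.record_1', 'Foo', '10']]).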

def split_sort(split, header, data):
    """Sort the data on the split column so rows sharing a value stay contiguous."""
    split_index = 0
    if split:
        try:
            split_index = header.index(split)
        except ValueError as ve:
            log("column %s not defined" % split)
            raise ve
        data = sorted(data, key=lambda d: d[split_index])
    return data, split_index

def import_data(config_file, model, header=None, data=None, file_csv=None, context=None, fail_file=False, encoding='utf-8-sig', separator=";", ignore=False, split=False, check=True, max_connection=1, batch_size=10, skip=0):
    """
    header and data are mandatory if file_csv is not provided
    """
    ignore = ignore or []
    context = context or {}

    if file_csv:
        header, data = read_file(file_csv, delimiter=separator, encoding=encoding, skip=skip)
        fail_file = fail_file or file_csv + ".fail"
        file_result = open(fail_file, "wb")

    if not header or data is None:
        raise ValueError("Please provide either a data file or a header and data")

    object_registry = conf_lib.get_server_connection(config_file).get_model(model)

    if file_csv:
        writer = UnicodeWriter(file_result, delimiter=separator, encoding=encoding, quoting=csv.QUOTE_ALL)
    else:
        writer = ListWriter()

    writer.writerow(filter_header_ignore(ignore, header))
    if file_csv:
        file_result.flush()
    rpc_thread = RPCThreadImport(int(max_connection), object_registry, filter_header_ignore(ignore, header), writer, batch_size, context)
    st = time()

    data, split_index = split_sort(split, header, data)

    i = 0
    previous_split_value = False
    while i < len(data):
        lines = []
        j = 0
        # Fill the batch up to batch_size, but never cut between consecutive
        # lines that share the same split value.
        while i < len(data) and (j < batch_size or do_not_split(split, previous_split_value, split_index, data[i])):
            line = data[i][:len(header)]
            lines.append(filter_line_ignore(ignore, header, line))
            previous_split_value = line[split_index]
            j += 1
            i += 1
        batch_number = "[%s] - [%s]" % (rpc_thread.thread_number(), previous_split_value) if split else "[%s]" % rpc_thread.thread_number()
        rpc_thread.launch_batch(lines, batch_number, check)

    rpc_thread.wait()
    if file_csv:
        file_result.close()

    log_info("%s %s imported, total time %s second(s)" % (len(data), model, (time() - st)))
    if file_csv:
        return False, False
    else:
        return writer.header, writer.data
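
if __name__ == '__main__':
    # Minimal usage sketch, not part of the library: the connection file,
    # model name, CSV path and column names below are illustrative assumptions.
    import_data('connection.conf', 'res.partner',
                file_csv='partners.csv',
                separator=';',
                ignore=['notes'],
                max_connection=4,
                batch_size=50)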