diff --git a/HISTORY.rst b/HISTORY.rst index 66bafcd..fe5e10c 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -1,6 +1,12 @@ ======= History ======= +UNRELEASED +------------------ +* Fix issue with slow export duration (https://github.com/saritasa-nest/django-import-export-extensions/issues/79): + + * Add setting ``STATUS_UPDATE_ROW_COUNT`` (default: 100), which defines how many rows are imported/exported between task status updates; + * Add the ability to specify ``status_update_row_count`` for each resource. 1.1.0 (2024-12-06) ------------------ diff --git a/docs/installation.rst b/docs/installation.rst index 3558f15..a816655 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -80,6 +80,14 @@ Mapping file extensions to mime types to import files. By default, it uses the `mimetypes.types_map `_ from Python's mimetypes module. +``STATUS_UPDATE_ROW_COUNT`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Defines how many rows are imported/exported between updates of the task +status. Updating the status less often speeds up import/export. The default +value is 100. This parameter can be overridden for an individual resource by +adding ``status_update_row_count`` to its ``Meta``. + Settings from django-import-export ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Additionally, the package supports settings from the original django-import-export package. 
diff --git a/import_export_extensions/apps.py b/import_export_extensions/apps.py index 790fe41..9c531ad 100644 --- a/import_export_extensions/apps.py +++ b/import_export_extensions/apps.py @@ -7,6 +7,8 @@ # Default configuration # Maximum num of rows to be imported DEFAULT_MAX_DATASET_ROWS = 100000 +# After how many imported/exported rows celery task status will be updated +DEFAULT_STATUS_UPDATE_ROW_COUNT = 100 class CeleryImportExport(AppConfig): @@ -24,3 +26,8 @@ def ready(self): DEFAULT_MAX_DATASET_ROWS, ) settings.MIME_TYPES_MAP = mimetypes.types_map.copy() + settings.STATUS_UPDATE_ROW_COUNT = getattr( + settings, + "STATUS_UPDATE_ROW_COUNT", + DEFAULT_STATUS_UPDATE_ROW_COUNT, + ) diff --git a/import_export_extensions/resources.py b/import_export_extensions/resources.py index 1310d6c..bc3eda1 100644 --- a/import_export_extensions/resources.py +++ b/import_export_extensions/resources.py @@ -1,14 +1,16 @@ import collections +import enum +import functools import typing -from enum import Enum +from django.conf import settings from django.db.models import QuerySet from django.utils import timezone from django.utils.functional import classproperty from django.utils.translation import gettext_lazy as _ import tablib -from celery import current_task, result +from celery import current_task from django_filters import rest_framework as filters from django_filters.utils import translate_validation from import_export import fields, resources @@ -17,7 +19,7 @@ from .results import Error, Result, RowResult -class TaskState(Enum): +class TaskState(enum.Enum): """Class with possible task state values.""" IMPORTING = _("Importing") @@ -41,8 +43,19 @@ def __init__( """Remember init kwargs.""" self._filter_kwargs = filter_kwargs self.resource_init_kwargs: dict[str, typing.Any] = kwargs + self.total_objects_count = 0 + self.current_object_number = 0 super().__init__() + @functools.cached_property + def status_update_row_count(self): + """Rows count after which to update celery 
task status.""" + return getattr( + self._meta, + "status_update_row_count", + settings.STATUS_UPDATE_ROW_COUNT, + ) + def get_queryset(self): """Filter export queryset via filterset class.""" queryset = super().get_queryset() @@ -194,6 +207,11 @@ def export( """Init task state before exporting.""" if queryset is None: queryset = self.get_queryset() + + # Necessary for correct calculation of the total; this method is also + # called later inside the parent resource class. + queryset = self.filter_export(queryset, **kwargs) + self.initialize_task_state( state=TaskState.EXPORTING.name, queryset=queryset, @@ -228,17 +246,18 @@ def initialize_task_state( if not current_task or current_task.request.called_directly: return - if isinstance(queryset, QuerySet): - total = queryset.count() - else: - total = len(queryset) + self.total_objects_count = ( + queryset.count() + if isinstance(queryset, QuerySet) + else len(queryset) + ) self._update_current_task_state( state=state, - meta=dict( - current=0, - total=total, - ), + meta={ + "current": self.current_object_number, + "total": self.total_objects_count, + }, ) def update_task_state( @@ -247,22 +266,36 @@ def update_task_state( ): """Update state of the current event. - Receives meta of the current task and increase the `current` - field by 1. + Increment the number of the current object by 1. The task state is + updated when that number is a multiple of + `self.status_update_row_count` or equal to the total number of objects. + + For example: once every 1000 objects (if the current object is 1000, + 2000, 3000) or when the current object is the last object, in order to + complete the import/export. + + This is needed to increase the speed of import/export by reducing the + number of task status updates. 
""" if not current_task or current_task.request.called_directly: return - async_result = result.AsyncResult(current_task.request.get("id")) + self.current_object_number += 1 - self._update_current_task_state( - state=state, - meta=dict( - current=async_result.result.get("current", 0) + 1, - total=async_result.result.get("total", 0), - ), + is_reached_update_count = ( + self.current_object_number % self.status_update_row_count == 0 ) + is_last_object = self.current_object_number == self.total_objects_count + + if is_reached_update_count or is_last_object: + self._update_current_task_state( + state=state, + meta={ + "current": self.current_object_number, + "total": self.total_objects_count, + }, + ) def _update_current_task_state(self, state: str, meta: dict[str, int]): """Update state of task where resource is executed."""