Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve slow import/export duration #82

Merged
merged 4 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
=======
History
=======
UNRELEASED
------------------
* Fix issue with slow export duration (https://github.com/saritasa-nest/django-import-export-extensions/issues/79):

* Add setting ``STATUS_UPDATE_ROW_COUNT`` (default: 100), which defines how many rows are imported/exported between task status updates;
* Add ability to specify ``status_update_row_count`` for each resource;

1.1.0 (2024-12-06)
------------------
Expand Down
8 changes: 8 additions & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ Mapping file extensions to mime types to import files.
By default, it uses the `mimetypes.types_map <https://docs.python.org/3/library/mimetypes.html#mimetypes.types_map>`_
from Python's mimetypes module.

``STATUS_UPDATE_ROW_COUNT``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Defines how many rows are imported/exported between updates of the task
status. Updating the status less often speeds up import/export. The default
value is 100. This parameter can be overridden for an individual resource by
adding ``status_update_row_count`` to its ``Meta``.

Settings from django-import-export
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Additionally, the package supports settings from the original django-import-export package.
Expand Down
7 changes: 7 additions & 0 deletions import_export_extensions/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# Default configuration
# Maximum num of rows to be imported
DEFAULT_MAX_DATASET_ROWS = 100000
# After how many imported/exported rows celery task status will be updated
DEFAULT_STATUS_UPDATE_ROW_COUNT = 100


class CeleryImportExport(AppConfig):
Expand All @@ -24,3 +26,8 @@ def ready(self):
DEFAULT_MAX_DATASET_ROWS,
)
settings.MIME_TYPES_MAP = mimetypes.types_map.copy()
settings.STATUS_UPDATE_ROW_COUNT = getattr(
settings,
"STATUS_UPDATE_ROW_COUNT",
DEFAULT_STATUS_UPDATE_ROW_COUNT,
)
73 changes: 53 additions & 20 deletions import_export_extensions/resources.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import collections
import enum
import functools
import typing
from enum import Enum

from django.conf import settings
from django.db.models import QuerySet
from django.utils import timezone
from django.utils.functional import classproperty
from django.utils.translation import gettext_lazy as _

import tablib
from celery import current_task, result
from celery import current_task
from django_filters import rest_framework as filters
from django_filters.utils import translate_validation
from import_export import fields, resources
Expand All @@ -17,7 +19,7 @@
from .results import Error, Result, RowResult


class TaskState(Enum):
class TaskState(enum.Enum):
"""Class with possible task state values."""

IMPORTING = _("Importing")
Expand All @@ -41,8 +43,19 @@ def __init__(
"""Remember init kwargs."""
self._filter_kwargs = filter_kwargs
self.resource_init_kwargs: dict[str, typing.Any] = kwargs
self.total_objects_count = 0
self.current_object_number = 0
super().__init__()

@functools.cached_property
def status_update_row_count(self):
    """Return how many rows are processed between celery task status updates.

    The value is read from ``status_update_row_count`` on the resource's
    ``Meta`` (``self._meta``). When the resource does not define it, or
    defines it as ``None``, ``settings.STATUS_UPDATE_ROW_COUNT`` is used.

    The result is never lower than 1. The value is used as a modulo divisor
    when deciding whether to report progress, so 0 would raise
    ``ZeroDivisionError`` in the middle of an import/export. Values below 1
    are therefore treated as "update the status after every row".

    """
    row_count = getattr(self._meta, "status_update_row_count", None)
    if row_count is None:
        # Fall back to the project-wide setting only when the resource has
        # no override of its own.
        row_count = settings.STATUS_UPDATE_ROW_COUNT
    return max(1, row_count)

def get_queryset(self):
"""Filter export queryset via filterset class."""
queryset = super().get_queryset()
Expand Down Expand Up @@ -194,6 +207,11 @@ def export(
"""Init task state before exporting."""
if queryset is None:
queryset = self.get_queryset()

# Necessary for correct calculation of the total, this method is called
# later inside parent resource class
queryset = self.filter_export(queryset, **kwargs)

self.initialize_task_state(
state=TaskState.EXPORTING.name,
queryset=queryset,
Expand Down Expand Up @@ -228,17 +246,18 @@ def initialize_task_state(
if not current_task or current_task.request.called_directly:
return

if isinstance(queryset, QuerySet):
total = queryset.count()
else:
total = len(queryset)
self.total_objects_count = (
queryset.count()
if isinstance(queryset, QuerySet)
else len(queryset)
)

self._update_current_task_state(
state=state,
meta=dict(
current=0,
total=total,
),
meta={
"current": self.current_object_number,
"total": self.total_objects_count,
},
)

def update_task_state(
Expand All @@ -247,22 +266,36 @@ def update_task_state(
):
"""Update state of the current event.

Receives meta of the current task and increase the `current`
field by 1.
Increases the number of the current object by 1. The task state is updated
when the current object number is a multiple of
`self.status_update_row_count` or equal to the total number of objects.

For example: once every 1000 objects (if the current object is 1000,
2000, 3000) or when current object is the last object, in order to
complete the import/export.

This is needed to increase the speed of import/export by reducing the
number of task status updates.

"""
if not current_task or current_task.request.called_directly:
return

async_result = result.AsyncResult(current_task.request.get("id"))
self.current_object_number += 1

self._update_current_task_state(
state=state,
meta=dict(
current=async_result.result.get("current", 0) + 1,
total=async_result.result.get("total", 0),
),
is_reached_update_count = (
self.current_object_number % self.status_update_row_count == 0
)
is_last_object = self.current_object_number == self.total_objects_count

if is_reached_update_count or is_last_object:
self._update_current_task_state(
state=state,
meta={
"current": self.current_object_number,
"total": self.total_objects_count,
},
)

def _update_current_task_state(self, state: str, meta: dict[str, int]):
"""Update state of task where resource is executed."""
Expand Down
Loading