Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions crossflow/clients.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
"""
Clients.py: thin wrapper over dask client
clients.py: thin wrapper over dask client
"""

from collections.abc import Iterable
import glob
import pickle
import sys

from dask.distributed import Client as DaskClient

try:
from collections import Iterable
except ImportError:
from collections.abc import Iterable

from dask.distributed import Future

from . import config
Expand All @@ -26,7 +21,7 @@ class Client(DaskClient):
"""

def __init__(self, *args, **kwargs):
self.filehandler = FileHandler(config.stage_point)
self.filehandler = FileHandler(config.STAGE_POINT)
super().__init__(*args, **kwargs)

def upload(self, some_object):
Expand Down Expand Up @@ -59,17 +54,17 @@ def _futurize(self, item):
Upload an item, if it's not already a Future

"""
if isinstance(item, Future):
if isinstance(item, Future): # pylint: disable=no-else-return
return item
else:
if isinstance(item, list):
if isinstance(item, list): # pylint: disable=no-else-return
for i, j in enumerate(item):
if not isinstance(j, Future):
if self._rough_size(j) > 10000:
item[i] = self.upload(j)
return item
else:
if self._rough_size(item) > 10000:
if self._rough_size(item) > 10000: # pylint: disable=no-else-return
return self.upload(item)
else:
return item
Expand Down Expand Up @@ -97,6 +92,9 @@ def _unpack(self, task, future):
return tuple(outputs)

def _filehandlify(self, args):
# pylint: disable=too-many-nested-blocks
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
"""
work through an argument list, converting paths to filehandles
where possible.
Expand Down Expand Up @@ -192,7 +190,9 @@ def submit(self, func, *args, **kwargs):
else:
newargs = self._futurize(newargs)

if isinstance(func, (SubprocessTask, FunctionTask)):
if isinstance( # pylint: disable=no-else-return
func, (SubprocessTask, FunctionTask)
):
kwargs["pure"] = False
future = super().submit(func.run, *newargs, **kwargs)
return self._unpack(func, future)
Expand Down Expand Up @@ -222,13 +222,8 @@ def map(self, func, *iterables, **kwargs):
maxlen = 0
for iterable in iterables:
if isinstance(iterable, (list, tuple)):
n_items = len(iterable)
if n_items > maxlen:
maxlen = n_items
for iterable in iterables:
if isinstance(iterable, (list, tuple)):
n_items = len(iterable)
if n_items != maxlen:
maxlen = max(maxlen, len(iterable))
if len(iterable) != maxlen:
raise ValueError("Error: not all iterables are same length")
its.append(iterable)
else:
Expand Down
6 changes: 5 additions & 1 deletion crossflow/config.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
stage_point = None
"""
config.py: Allow configurable options.
"""

STAGE_POINT = None
68 changes: 49 additions & 19 deletions crossflow/filehandling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,7 @@
filehanding.py: this module provides classes for passing files between
processes on distributed computing platforms that may not share a common
file system.
"""

import os
import os.path as op
import tempfile
import uuid
import zlib

import fsspec

from . import config

"""
This module defines classes to handle files in distributed environments
where filesyatems may not be shared.

Expand All @@ -34,26 +22,52 @@
...
"""

import os
import os.path as op
import tempfile
import uuid
import zlib

import fsspec

from . import config


def set_stage_point(stage_point):
config.stage_point = stage_point
"""
A method to set the stage_point variable.
"""

config.STAGE_POINT = stage_point


class FileHandler:
"""
Handle file operations
"""

class FileHandler(object):
def __init__(self, stage_point=None):
if stage_point is None:
self.stage_point = config.stage_point
self.stage_point = config.STAGE_POINT
else:
self.stage_point = stage_point

def load(self, path):
"""
Method to load file.
"""

return FileHandle(path, self.stage_point, must_exist=True)

def create(self, path):
"""
Method to load file.
"""

return FileHandle(path, self.stage_point, must_exist=False)


class FileHandle(object):
class FileHandle:
"""
A portable container for a file.
"""
Expand All @@ -79,7 +93,7 @@ def __init__(self, path, stage_point, must_exist=True):
with source as s:
with self.store as d:
d.write(s.read())
source.close()
source.close() # pylint: disable=no-member
self.store.close()
self.store.mode = "rb"
else:
Expand Down Expand Up @@ -116,7 +130,7 @@ def save(self, path):
with source as s:
with dest as d:
d.write(s.read())
dest.close()
dest.close() # pylint: disable=no-member
return path

def __fspath__(self):
Expand All @@ -126,7 +140,7 @@ def __fspath__(self):
"""
if self.local_path is None:
self.local_path = os.path.join(tempfile.gettempdir(), self.uid)
if not op.exists(self.local_path):
if not op.exists(self.local_path): # pylint: disable=no-else-return
return self.save(self.local_path)
else:
return self.local_path
Expand All @@ -141,6 +155,10 @@ def __del__(self):
pass

def read_binary(self):
"""
A method for reading binary file formats
"""

source = self.store
if source is None:
return "".encode("utf-8")
Expand All @@ -153,9 +171,17 @@ def read_binary(self):
return data

def read_text(self):
"""
A wrapper for reading binary formatted text.
"""

return self.read_binary().decode()

def write_binary(self, data):
"""
A method for writing binary file formats
"""

compressed_data = zlib.compress(data)
if self.staging_path is None:
self.store = compressed_data
Expand All @@ -166,4 +192,8 @@ def write_binary(self, data):
self.store.mode = "rb"

def write_text(self, text):
"""
A wrapper for writing binary formatted text.
"""

self.write_binary(text.encode("utf-8"))
Loading