Commit 70291f5

Fast API, many improvements and bug fixes
[Public API] added bulk methods
[agent] fast upload/download
[jupyter] tutorials and cookbooks use new Public API
1 parent ec49a5b commit 70291f5

162 files changed: +2084 −2205 lines changed

(Large commits have some content hidden by default; only a subset of the changed files is shown below.)

agent/Dockerfile (+2)

@@ -27,6 +27,8 @@ RUN pip install --no-cache-dir \
     py3exiv2==0.4.0
 
 RUN pip install requests-toolbelt
+RUN pip install packaging
+
 
 ############### copy code ###############
 ARG MODULE_PATH

agent/VERSION (+1 −1)

@@ -1 +1 @@
-agent:4.2.0
+agent:4.3.0

agent/src/worker/agent.py (+38 −4)

@@ -6,7 +6,7 @@
 import threading
 from concurrent.futures import ThreadPoolExecutor, wait
 import subprocess
-
+import os
 import supervisely_lib as sly
 
 from worker import constants
@@ -36,8 +36,13 @@ def __init__(self):
         self.thread_list = []
         self.daemons_list = []
 
+        self._remove_old_agent()
+        self._validate_duplicated_agents()
+
         sly.fs.clean_dir(constants.AGENT_TMP_DIR())
-        self._stop_missed_containers()
+        self._stop_missed_containers(constants.TASKS_DOCKER_LABEL())
+        # for compatibility with old plugins
+        self._stop_missed_containers(constants.TASKS_DOCKER_LABEL_LEGACY())
 
         self.docker_api = docker.from_env(version='auto')
         self._docker_login()
@@ -48,6 +53,33 @@ def __init__(self):
         self.agent_connect_initially()
         self.logger.info('Agent connected to server.')
 
+    def _remove_old_agent(self):
+        container_id = os.getenv('REMOVE_OLD_AGENT', None)
+        if container_id is None:
+            return
+
+        dc = docker.from_env()
+        olg_agent = dc.containers.get(container_id)
+        olg_agent.remove(force=True)
+
+        agent_same_token = []
+        for cont in dc.containers.list():
+            if constants.TOKEN() in cont.name:
+                agent_same_token.append(cont)
+
+        if len(agent_same_token) > 1:
+            raise RuntimeError("Several agents with the same token are running. Please, kill them or contact support.")
+        agent_same_token[0].rename('supervisely-agent-{}'.format(constants.TOKEN()))
+
+    def _validate_duplicated_agents(self):
+        dc = docker.from_env()
+        agent_same_token = []
+        for cont in dc.containers.list():
+            if constants.TOKEN() in cont.name:
+                agent_same_token.append(cont)
+        if len(agent_same_token) > 1:
+            raise RuntimeError("Agent with the same token already exists.")
+
     def agent_connect_initially(self):
         try:
             hw_info = get_hw_info()
@@ -63,6 +95,7 @@ def agent_connect_initially(self):
         self.agent_info = {
             'hardware_info': hw_info,
             'agent_image': json.loads(docker_img_info)["Config"]["Image"],
+            'agent_version': json.loads(docker_img_info)["Config"]["Labels"]["VERSION"],
             'agent_image_digest': get_self_docker_image_digest()
         }
 
@@ -121,6 +154,7 @@ def start_task(self, task):
                 self.logger.warning('TASK_ID_ALREADY_STARTED', extra={'task_id': task['task_id']})
             else:
                 task_id = task['task_id']
+                task["agent_version"] = self.agent_info["agent_version"]
                 self.task_pool[task_id] = create_task(task, self.docker_api)
                 self.task_pool[task_id].start()
         finally:
@@ -161,9 +195,9 @@ def _remove_containers(label_filter):
             cont.remove(force=True)
         return stop_list
 
-    def _stop_missed_containers(self):
+    def _stop_missed_containers(self, ecosystem_token):
        self.logger.info('Searching for missed containers...')
-        label_filter = {'label': 'ecosystem_token={}'.format(constants.TASKS_DOCKER_LABEL())}
+        label_filter = {'label': 'ecosystem_token={}'.format(ecosystem_token)}
 
        stopped_list = Agent._remove_containers(label_filter=label_filter)
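The startup guards above rely only on the Docker SDK's container listing and naming. A rough standalone sketch of the duplicate-token check, assuming a locally running Docker daemon (the token value is made up):

    import docker

    token = 'abcd1234'  # hypothetical agent token
    dc = docker.from_env()
    # an agent container's name contains its token, so duplicates are detectable by name
    same_token = [cont for cont in dc.containers.list() if token in cont.name]
    if len(same_token) > 1:
        raise RuntimeError('Agent with the same token already exists.')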

agent/src/worker/constants.py (+5)

@@ -3,6 +3,7 @@
 import os
 from urllib.parse import urlparse
 import supervisely_lib as sly
+import hashlib
 
 
 def HOST_DIR():
@@ -23,6 +24,10 @@ def TOKEN():
 
 
 def TASKS_DOCKER_LABEL():
+    return 'supervisely_{}'.format(hashlib.sha256(TOKEN().encode('utf-8')).hexdigest())
+
+
+def TASKS_DOCKER_LABEL_LEGACY():
     return 'supervisely_{}'.format(TOKEN())
 
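The label helper now hashes the token before exposing it, so inspecting a task container's labels no longer reveals the raw agent token. A quick sketch of what the two variants produce (the token is made up):

    import hashlib

    token = 'abcd1234'  # hypothetical agent token
    new_label = 'supervisely_{}'.format(hashlib.sha256(token.encode('utf-8')).hexdigest())
    legacy_label = 'supervisely_{}'.format(token)
    print(new_label)     # supervisely_<64 hex chars> -- token is not recoverable
    print(legacy_label)  # supervisely_abcd1234 -- leaks the token, kept only for old plugins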

agent/src/worker/data_manager.py (+48 −49)

@@ -5,7 +5,16 @@
 from worker.agent_storage import AgentStorage
 from worker.fs_storages import EmptyStorage
 from worker import constants
-from collections import defaultdict
+
+
+def _maybe_append_image_extension(name, ext):
+    name_split = os.path.splitext(name)
+    if name_split[1] == '':
+        normalized_ext = ('.' + ext).replace('..', '.')
+        sly.image.validate_ext(normalized_ext)
+        return name + normalized_ext
+    else:
+        return name
 
 
 class DataManager(object):
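The new module-level helper only appends an extension when the incoming name has none; a quick illustration of the intended behavior (file names are made up):

    _maybe_append_image_extension('scene_001', 'png')      # -> 'scene_001.png'
    _maybe_append_image_extension('scene_001', '.png')     # -> 'scene_001.png' ('..' collapses to '.')
    _maybe_append_image_extension('scene_001.png', 'png')  # -> 'scene_001.png' (already has an extension)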
@@ -34,10 +43,10 @@ def download_nn(self, name, parent_dir):
             self.logger.info('NN has been copied from local storage.')
             return
 
-        model_in_mb = int(float(model_info.size) / 1024 / 1024)
-        progress = sly.Progress('Download NN: {!r}'.format(name), model_in_mb)
+        model_in_mb = int(float(model_info.size) / 1024 / 1024 + 1)
+        progress = sly.Progress('Download NN: {!r}'.format(name), model_in_mb, self.logger)
 
-        self.public_api.model.download_to_dir(self.workspace_id, name, parent_dir, progress.iter_done_report)
+        self.public_api.model.download_to_dir(self.workspace_id, name, parent_dir, progress.iters_done_report)
         self.logger.info('NN has been downloaded from server.')
 
         if self.has_nn_storage():
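Two small fixes here: the progress callback name (iters_done_report) and the size estimate. The old int(size / 1024 / 1024) truncated, so the progress total could undercount the actual download; the +1 presumably rounds the total up to avoid that. A quick arithmetic check (the byte count is made up):

    size_bytes = 5_000_000
    int(float(size_bytes) / 1024 / 1024)      # 4 -- truncated total, the download would overshoot it
    int(float(size_bytes) / 1024 / 1024 + 1)  # 5 -- rounded up, progress stays within bounds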
@@ -87,34 +96,37 @@ def download_dataset(self, dataset, dataset_id):
                                            'images_to_download': len(images_to_download)})
         if len(images_to_download) + len(images_in_cache) != len(images):
             raise RuntimeError("Error with images cache during download. Please contact support.")
-        for batch_cache in sly.batched(list(zip(images_in_cache, images_cache_paths)), constants.BATCH_SIZE_GET_IMAGES_INFO()):
-            img_cache_ids = [img_info.id for img_info, _ in batch_cache]
+
+        if len(images_in_cache) > 0:
+            img_cache_ids = [img_info.id for img_info in images_in_cache]
             ann_info_list = self.public_api.annotation.download_batch(dataset_id, img_cache_ids, progress_anns.iters_done_report)
-            img_name_to_ann = {ann.image_name: ann.annotation for ann in ann_info_list}
-            for img_info, img_cache_path in batch_cache:
-                dataset.add_item_file(img_info.name, img_cache_path, img_name_to_ann[img_info.name])
+            img_name_to_ann = {ann.image_id: ann.annotation for ann in ann_info_list}
+            for img_info, img_cache_path in zip(images_in_cache, images_cache_paths):
+                item_name = _maybe_append_image_extension(img_info.name, img_info.ext)
+                dataset.add_item_file(item_name, img_cache_path, img_name_to_ann[img_info.id])
                 progress_imgs.iter_done_report()
 
         # download images from server
-        for batch_download in sly.batched(images_to_download, constants.BATCH_SIZE_GET_IMAGES_INFO()):
+        if len(images_to_download) > 0:
             #prepare lists for api methods
             img_ids = []
             img_paths = []
-            for img_info in batch_download:
+            for img_info in images_to_download:
                 img_ids.append(img_info.id)
                 # TODO download to a temp file and use dataset api to add the image to the dataset.
-                img_paths.append(dataset.deprecated_make_img_path(img_info.name, img_info.ext))
+                img_paths.append(
+                    os.path.join(dataset.img_dir, _maybe_append_image_extension(img_info.name, img_info.ext)))
 
             # download annotations
             ann_info_list = self.public_api.annotation.download_batch(dataset_id, img_ids, progress_anns.iters_done_report)
-            img_name_to_ann = {ann.image_name: ann.annotation for ann in ann_info_list}
-            self.public_api.image.download_batch(dataset_id, img_ids, img_paths, progress_imgs.iters_done_report)
-            for img_info, img_path in zip(batch_download, img_paths):
-                dataset.add_item_file(img_info.name, img_path, img_name_to_ann[img_info.name])
+            img_name_to_ann = {ann.image_id: ann.annotation for ann in ann_info_list}
+            self.public_api.image.download_paths(dataset_id, img_ids, img_paths, progress_imgs.iters_done_report)
+            for img_info, img_path in zip(images_to_download, img_paths):
+                dataset.add_item_file(img_info.name, img_path, img_name_to_ann[img_info.id])
 
             if self.has_images_storage():
                 progress_cache = sly.Progress('Dataset {!r}: cache images'.format(dataset.name), len(img_paths), self.logger)
-                img_hashes = [img_info.hash for img_info in batch_download]
+                img_hashes = [img_info.hash for img_info in images_to_download]
                 self.storage.images.write_objects(img_paths, img_hashes, progress_cache.iter_done_report)
 
     # @TODO: remove legacy stuff
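download_dataset now pulls annotations and image files in single bulk calls per dataset instead of fixed-size batches, and keys annotations by image_id rather than by name. A hedged sketch of the calling pattern used above, where api stands for an initialized Public API client (IDs and paths are illustrative; the progress callbacks are omitted and assumed optional):

    img_ids = [101, 102]  # hypothetical image IDs in the dataset
    img_paths = ['/tmp/ds/img/scene_001.png', '/tmp/ds/img/scene_002.png']

    ann_infos = api.annotation.download_batch(dataset_id, img_ids)
    id_to_ann = {ann.image_id: ann.annotation for ann in ann_infos}
    api.image.download_paths(dataset_id, img_ids, img_paths)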
@@ -146,44 +158,31 @@ def upload_project(self, parent_dir, project_name, new_title, legacy=False, add_
         self.logger.info('PROJECT_CREATED', extra={'event_type': sly.EventType.PROJECT_CREATED, 'project_id': project_id})
 
     def upload_dataset(self, dataset, dataset_id):
-        progress = None
+        progress_cache = None
         items_count = len(dataset)
-        hash_to_img_paths = defaultdict(list)
-        hash_to_ann_paths = defaultdict(list)
-        hash_to_item_names = defaultdict(list)
+
+        item_names = []
+        img_paths = []
+        ann_paths = []
         for item_name in dataset:
+            item_names.append(item_name)
             item_paths = dataset.get_item_paths(item_name)
-            img_hash = sly.fs.get_file_hash(item_paths.img_path)
-            hash_to_img_paths[img_hash].append(item_paths.img_path)
-            hash_to_ann_paths[img_hash].append(item_paths.ann_path)
-            hash_to_item_names[img_hash].append(item_name)
+            img_paths.append(item_paths.img_path)
+            ann_paths.append(item_paths.ann_path)
+
             if self.has_images_storage():
-                if progress is None:
-                    progress = sly.Progress('Dataset {!r}: cache images'.format(dataset.name), items_count, self.logger)
+                if progress_cache is None:
+                    progress_cache = sly.Progress('Dataset {!r}: cache images'.format(dataset.name), items_count, self.logger)
+
+                img_hash = sly.fs.get_file_hash(item_paths.img_path)
                 self.storage.images.write_object(item_paths.img_path, img_hash)
-                progress.iter_done_report()
+                progress_cache.iter_done_report()
+
+        progress = sly.Progress('Dataset {!r}: upload images'.format(dataset.name), items_count, self.logger)
+        image_infos = self.public_api.image.upload_paths(dataset_id, item_names, img_paths, progress.iters_done_report)
 
-        progress_img = sly.Progress('Dataset {!r}: upload images'.format(dataset.name), items_count, self.logger)
-        progress_ann = sly.Progress('Dataset {!r}: upload annotations'.format(dataset.name), items_count, self.logger)
-
-        def add_images_annotations(hashes, pb_img_cb, pb_ann_cb):
-            names = [name for hash in hashes for name in hash_to_item_names[hash]]
-            unrolled_hashes = [hash for hash in hashes for _ in range(len(hash_to_item_names[hash]))]
-            ann_paths = [path for hash in hashes for path in hash_to_ann_paths[hash]]
-            remote_infos = self.public_api.image.add_batch(dataset_id, names, unrolled_hashes, pb_img_cb)
-            self.public_api.annotation.upload_batch_paths(dataset_id, [info.id for info in remote_infos], ann_paths, pb_ann_cb)
-
-        # add already uploaded images + attach annotations
-        remote_hashes = self.public_api.image.check_existing_hashes(list(hash_to_img_paths.keys()))
-        if len(remote_hashes) > 0:
-            add_images_annotations(remote_hashes, progress_img.iters_done_report, progress_ann.iters_done_report)
-
-        # upload new images + add annotations
-        new_hashes = list(set(hash_to_img_paths.keys()) - set(remote_hashes))
-        img_paths = [path for hash in new_hashes for path in hash_to_img_paths[hash]]
-        self.public_api.image.upload_batch_paths(img_paths, progress_img.iters_done_report)
-        if len(new_hashes) > 0:
-            add_images_annotations(new_hashes, None, progress_ann.iters_done_report)
+        progress = sly.Progress('Dataset {!r}: upload annotations'.format(dataset.name), items_count, self.logger)
+        self.public_api.annotation.upload_paths([info.id for info in image_infos], ann_paths, progress.iters_done_report)
 
     def upload_archive(self, task_id, dir_to_archive, archive_name):
         self.logger.info("PACK_TO_ARCHIVE ...")

agent/src/worker/task_custom.py (+2 −4)

@@ -29,16 +29,14 @@ def init_additional(self):
 
     def download_step(self):
         for model_info in self.info['models']:
-            self.data_mgr.download_nn(model_info['title'], model_info['id'], model_info['hash'], self.dir_model)
+            self.data_mgr.download_nn(model_info['title'], self.dir_model)
 
         self.logger.info("DOWNLOAD_DATA")
         json.dump(self.info['config'], open(self.config_path1, 'w'))  # Deprecated 'task_settings.json'
         json.dump(self.info['config'], open(self.config_path2, 'w'))  # New style task_config.json
 
         for pr_info in self.info['projects']:
-            project = sly.api_proto.Project(id=pr_info['id'], title=pr_info['title'])
-            datasets = [sly.api_proto.Dataset(id=ds['id'], title=ds['title']) for ds in pr_info['datasets']]
-            self.data_mgr.download_project(self.dir_data, project, datasets)
+            self.data_mgr.download_project(self.dir_data, pr_info['title'])
 
         self.report_step_done(TaskStep.DOWNLOAD)

agent/src/worker/task_dockerized.py (+34 −6)

@@ -4,6 +4,7 @@
 from threading import Lock
 import json
 from docker.errors import DockerException, ImageNotFound as DockerImageNotFound
+from packaging import version
 
 import supervisely_lib as sly
 
@@ -41,8 +42,8 @@ def __init__(self, *args, **kwargs):
 
         self._container = None
         self._container_lock = Lock()  # to drop container from different threads
-        self.docker_image_name = self.info['docker_image']
-        if ':' not in self.docker_image_name:
+        self.docker_image_name = self.info.get('docker_image', None)
+        if self.docker_image_name is not None and ':' not in self.docker_image_name:
             self.docker_image_name += ':latest'
         self.docker_pulled = False  # in task
 
@@ -118,6 +119,34 @@ def _docker_pull(self):
             raise DockerException('Unable to pull image: not enough free disk space or something wrong with DockerHub.'
                                   ' Please, run the task again or email support.')
         self.logger.info('Docker image has been pulled', extra={'pulled': {'tags': pulled_img.tags, 'id': pulled_img.id}})
+        self._validate_version(self.info["agent_version"], pulled_img.labels.get("VERSION", None))
+
+    def _validate_version(self, agent_image, plugin_image):
+        self.logger.info('Check if agent and plugin versions are compatible')
+
+        def get_version(docker_image):
+            if docker_image is None:
+                return None
+            image_parts = docker_image.split(":")
+            if len(image_parts) != 2:
+                return None
+            return image_parts[1]
+
+        agent_version = get_version(agent_image.strip())
+        plugin_version = get_version(plugin_image.strip())
+
+        if agent_version is None or plugin_version is None:
+            self.logger.info('Unknown version')
+
+        av = version.parse(agent_version)
+        pv = version.parse(plugin_version)
+
+        if type(av) is version.LegacyVersion or type(pv) is version.LegacyVersion:
+            self.logger.info('Invalid semantic version, can not compare')
+            return
+
+        if av.release[0] < pv.release[0]:
+            self.logger.critical('Agent version is lower than plugin version. Please, update agent.')
 
     def _docker_image_exists(self):
         try:
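The compatibility check compares only the major component of the two versions, using the packaging library that the Dockerfile change above adds to the agent image. A standalone sketch of the comparison (version strings are made up):

    from packaging import version

    av = version.parse('4.3.0')  # agent version, parsed from its image tag
    pv = version.parse('5.0.1')  # plugin version, from the image's VERSION label
    if av.release[0] < pv.release[0]:
        print('agent major version is behind the plugin -- update the agent')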
@@ -138,17 +167,16 @@ def spawn_container(self, add_envs=None):
             add_envs = {}
         self._container_lock.acquire()
         try:
-            #@TODO: DEBUG_COPY_IMAGES only for compatibility with old plugins
             self._container = self._docker_api.containers.run(
                 self.docker_image_name,
                 runtime=self.docker_runtime,
                 entrypoint=["sh", "-c", "python -u {}".format(self.entrypoint)],
                 detach=True,
-                name='sly_task_{}_{}'.format(constants.TOKEN(), self.info['task_id']),
+                name='sly_task_{}_{}'.format(self.info['task_id'], constants.TASKS_DOCKER_LABEL()),
                 remove=False,
                 volumes={self.dir_task_host: {'bind': '/sly_task_data',
                                               'mode': 'rw'}},
-                environment={'LOG_LEVEL': 'DEBUG', 'LANG': 'C.UTF-8', 'DEBUG_COPY_IMAGES': 1, **add_envs},
+                environment={'LOG_LEVEL': 'DEBUG', 'LANG': 'C.UTF-8', **add_envs},
                 labels={'ecosystem': 'supervisely',
                         'ecosystem_token': constants.TASKS_DOCKER_LABEL(),
                         'task_id': str(self.info['task_id'])},
@@ -158,7 +186,7 @@ def spawn_container(self, add_envs=None):
             )
             self._container.reload()
             self.logger.debug('After spawning. Container status: {}'.format(str(self._container.status)))
-            self.logger.info('Docker container spawned', extra={'container_id': self._container.id, 'container_name': self._container.name})
+            self.logger.info('Docker container is spawned', extra={'container_id': self._container.id, 'container_name': self._container.name})
         finally:
             self._container_lock.release()

agent/src/worker/task_factory.py (+3 −6)

@@ -13,11 +13,9 @@
 from worker.task_upload_images import TaskUploadImages
 from worker.task_import_local import TaskImportLocal
 from worker.task_custom import TaskCustom
-#from worker.task_pipeline import TaskPipeline
+from worker.task_update import TaskUpdate
 
 
-import importlib
-
 _task_class_mapping = {
     'export': TaskDTL,
     'import': TaskImport,
@@ -30,7 +28,7 @@
     'upload_images': TaskUploadImages,
     'import_agent': TaskImportLocal,
     'custom': TaskCustom,
-    #'pipeline': TaskPipeline
+    'update_agent': TaskUpdate
 }
 
 
@@ -40,7 +38,6 @@ def create_task(task_msg, docker_api):
         sly.logger.critical('unknown task type', extra={'task_msg': task_msg})
         raise RuntimeError('unknown task type')
     task_obj = task_cls(task_msg)
-    #@TODO: check condition for pipelines and custom tasks
-    if issubclass(task_cls, TaskDockerized) or (task_msg['task_type'] == 'pipeline'):
+    if issubclass(task_cls, TaskDockerized) or (task_msg['task_type'] == 'update_agent'):
         task_obj.docker_api = docker_api
     return task_obj
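Task construction stays table-driven: the task_type field selects a class from _task_class_mapping, and dockerized tasks, plus the new update_agent type, get the shared Docker client attached. A hedged usage sketch (the message fields are illustrative; a real message carries more keys):

    task_msg = {'task_type': 'update_agent', 'task_id': 42}  # hypothetical task message
    task = create_task(task_msg, docker_api)  # a TaskUpdate with docker_api attached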
