Skip to content

Commit 22a857f

Browse files
antbaez9cornbread5
authored andcommitted
filtering with distributask
1 parent fdb8dfe commit 22a857f

File tree

6 files changed

+55
-40
lines changed

6 files changed

+55
-40
lines changed

.DS_Store

-6 KB
Binary file not shown.
366 Bytes
Binary file not shown.
39 Bytes
Binary file not shown.

filtered/distributed.py

+43-32
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,37 @@
1-
2-
from distributaur.distributaur import create_from_config
1+
from distributask.distributask import create_from_config
32
from .filter import read_json_in_batches
43
from .worker import run_job
54
from tqdm import tqdm
65
import time
76

8-
97
if __name__ == "__main__":
108

11-
input_filename = 'datasets/cap3d_captions.json'
12-
batch_size = 1000
9+
input_filename = "datasets/cap3d_captions.json"
10+
batch_size = 10000
1311

14-
distributaur = create_from_config()
12+
distributask = create_from_config()
1513

16-
max_price = 0.1
17-
max_nodes = 50
14+
max_price = 0.25
15+
max_nodes = 25
1816
docker_image = "antbaez/filter-worker:latest"
1917
module_name = "filtered.worker"
2018

21-
redis_client = distributaur.get_redis_connection()
19+
redis_client = distributask.get_redis_connection()
2220

23-
rented_nodes = distributaur.rent_nodes(max_price, max_nodes, docker_image, module_name)
21+
rented_nodes = distributask.rent_nodes(
22+
max_price, max_nodes, docker_image, module_name
23+
)
2424
print("Total nodes rented: ", len(rented_nodes))
2525

26-
distributaur.register_function(run_job)
26+
distributask.register_function(run_job)
2727

2828
while True:
2929
user_input = input("press r when workers are ready: ")
3030
if user_input == "r":
3131
break
3232

33-
3433
total_batches = 0
35-
34+
3635
print("Sending tasks")
3736
tasks = []
3837

@@ -43,32 +42,44 @@
4342
for i in range(num_batches):
4443

4544
batch = json_batches[i]
46-
4745
total_batches += 1
48-
task = distributaur.execute_function("run_job", {
49-
"batch_index" : total_batches,
50-
"batch" : batch
51-
})
46+
47+
print(total_batches)
48+
task = distributask.execute_function(
49+
"run_job", {"batch_index": total_batches, "batch": batch}
50+
)
5251

5352
tasks.append(task)
5453

5554
first_task_done = False
5655
print("Tasks sent. Starting monitoring")
56+
57+
inactivity_log = {node["instance_id"]: 0 for node in rented_nodes}
58+
59+
start_time = time.time()
5760
with tqdm(total=len(tasks), unit="task") as pbar:
5861
while not all(task.ready() for task in tasks):
62+
5963
current_tasks = sum([task.ready() for task in tasks])
6064
pbar.update(current_tasks - pbar.n)
61-
if current_tasks > 0:
62-
if not first_task_done:
63-
first_task_done = True
64-
first_task_start_time = time.time()
65-
66-
end_time = time.time()
67-
elapsed_time = end_time - first_task_start_time
68-
time_per_tasks = elapsed_time / current_tasks
69-
time_left = time_per_tasks * (len(tasks) - current_tasks)
70-
71-
pbar.set_postfix(
72-
elapsed=f"{elapsed_time:.2f}s", time_left=f"{time_left:.2f}"
73-
)
74-
time.sleep(2)
65+
66+
time.sleep(1)
67+
68+
current_time = time.time()
69+
if current_time - start_time > 60:
70+
start_time = time.time()
71+
72+
for node in rented_nodes:
73+
log_response = distributask.get_node_log(node)
74+
if log_response.status_code == 200:
75+
try:
76+
last_msg = log_response.text.splitlines()[-1]
77+
if ("Task complete" in last_msg and inactivity_log[node["instance_id"]] == 0):
78+
inactivity_log[node["instance_id"]] = 1
79+
elif ("Task complete" in last_msg and inactivity_log[node["instance_id"]] == 1):
80+
distributask.terminate_nodes([node])
81+
print("node terminated")
82+
else:
83+
inactivity_log[node["instance_id"]] == 0
84+
except:
85+
pass

filtered/worker.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
11
import sys
22
from .filter import filter_captions, write_filtered_json
33

4-
54
def run_job(batch_index, batch):
65

7-
output_filename = f"batch_{batch_index}"
6+
if len(str(batch_index)) == 1:
7+
batch_num = f"0{batch_index}"
8+
else:
9+
batch_num = f"{batch_index}"
10+
11+
output_filename = f"batch_{batch_num}"
812

913
filtered_batch = filter_captions(batch)
1014
write_filtered_json(output_filename, filtered_batch)
1115

12-
distributaur.upload_file(output_filename)
16+
distributask.upload_file(output_filename)
1317

1418
return "Task complete"
1519

1620

1721
if __name__ == "__main__" or any("celery" in arg for arg in sys.argv):
18-
from distributaur.distributaur import create_from_config
22+
from distributask.distributask import create_from_config
1923

20-
distributaur = create_from_config()
21-
distributaur.register_function(run_job)
24+
distributask = create_from_config()
25+
distributask.register_function(run_job)
2226

23-
celery = distributaur.app
27+
celery = distributask.app

requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ python-dotenv
77
omegaconf
88
tqdm
99
gliner
10-
distributaur
10+
distributask

0 commit comments

Comments
 (0)