
Commit fdb8dfe

antbaez9 authored and cornbread5 committed
working

1 parent 2d96118 · commit fdb8dfe

12 files changed (+138 -3 lines)

.DS_Store

6 KB
Binary file not shown.

Dockerfile

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
FROM --platform=linux/x86_64 ubuntu:24.04

ENV DEBIAN_FRONTEND=noninteractive
ENV TZ=UTC

RUN apt-get update && \
    apt-get install -y \
        wget \
        xz-utils \
        bzip2 \
        git \
        python3-pip \
        python3 \
    && apt-get install -y software-properties-common \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .

RUN pip install -r requirements.txt --break-system-packages

COPY filtered/ ./filtered/
# Run one single-concurrency Celery worker. python3 is the interpreter the pip
# step above installed packages for (Ubuntu 24.04 ships Python 3.12 and does
# not provide a python3.11 package).
CMD ["python3", "-m", "celery", "-A", "filtered.worker", "worker", "--loglevel=info", "--concurrency=1"]
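
Two details here are easy to trip over. The pip step needs --break-system-packages because Ubuntu 24.04's system Python is marked externally managed (PEP 668), so pip otherwise refuses system-wide installs. And the rented nodes pull this image by the tag hard-coded in filtered/distributed.py, so it must be built and pushed before renting. A minimal build-and-push sketch, assuming a local Docker CLI and push access to the antbaez registry account; this helper is illustrative and not part of the commit:

# Hypothetical helper (not part of this commit): build and push the worker image
# under the tag that filtered/distributed.py passes to rent_nodes.
import subprocess

IMAGE = "antbaez/filter-worker:latest"

subprocess.run(
    ["docker", "build", "--platform", "linux/x86_64", "-t", IMAGE, "."],
    check=True,  # raise if the build fails
)
subprocess.run(["docker", "push", IMAGE], check=True)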

filtered/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
from .distributed import *
from .filter import *
from .split import *
from .worker import *
5 binary files (236 Bytes, 2.01 KB, 8 KB, 1.06 KB, 826 Bytes)
Binary files not shown.

filtered/distributed.py

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
from distributaur.distributaur import create_from_config
from .filter import read_json_in_batches
from .worker import run_job
from tqdm import tqdm
import time


if __name__ == "__main__":

    input_filename = "datasets/cap3d_captions.json"
    batch_size = 1000

    distributaur = create_from_config()

    # Rental parameters: price ceiling per node, node cap, and the worker
    # image/module that rented nodes should run.
    max_price = 0.1
    max_nodes = 50
    docker_image = "antbaez/filter-worker:latest"
    module_name = "filtered.worker"

    redis_client = distributaur.get_redis_connection()

    rented_nodes = distributaur.rent_nodes(max_price, max_nodes, docker_image, module_name)
    print("Total nodes rented:", len(rented_nodes))

    distributaur.register_function(run_job)

    # Block until the operator confirms the workers have come up.
    while True:
        user_input = input("press r when workers are ready: ")
        if user_input == "r":
            break

    total_batches = 0

    print("Sending tasks")
    tasks = []

    json_batches = list(read_json_in_batches(input_filename, batch_size))
    print(f"number of batches: {len(json_batches)}")

    # Queue one Celery task per batch of captions.
    for batch in json_batches:
        total_batches += 1
        task = distributaur.execute_function("run_job", {
            "batch_index": total_batches,
            "batch": batch,
        })
        tasks.append(task)

    # Poll task states every two seconds; the ETA is measured from the moment
    # the first task completes.
    first_task_done = False
    print("Tasks sent. Starting monitoring")
    with tqdm(total=len(tasks), unit="task") as pbar:
        while not all(task.ready() for task in tasks):
            current_tasks = sum(task.ready() for task in tasks)
            pbar.update(current_tasks - pbar.n)
            if current_tasks > 0:
                if not first_task_done:
                    first_task_done = True
                    first_task_start_time = time.time()

                elapsed_time = time.time() - first_task_start_time
                time_per_task = elapsed_time / current_tasks
                time_left = time_per_task * (len(tasks) - current_tasks)

                pbar.set_postfix(
                    elapsed=f"{elapsed_time:.2f}s", time_left=f"{time_left:.2f}s"
                )
            time.sleep(2)
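
This orchestrator depends on read_json_in_batches from filtered/filter.py, whose definition this diff does not show. A plausible sketch, under the assumption that the Cap3D captions file is a single JSON object mapping object IDs to caption strings; the real implementation may differ:

# Hypothetical sketch of read_json_in_batches (the actual function lives in
# filtered/filter.py and is not shown in this diff). Assumes the input file is
# one JSON object of id -> caption pairs.
import json
from itertools import islice

def read_json_in_batches(input_filename, batch_size):
    with open(input_filename) as f:
        data = json.load(f)
    items = iter(data.items())
    while True:
        batch = dict(islice(items, batch_size))
        if not batch:
            break
        yield batch  # at most batch_size captions per yielded dict

Yielding plain dicts keeps each Celery task payload JSON-serializable, which matches the batch argument that run_job receives.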

filtered/filter.py

Lines changed: 3 additions & 3 deletions
@@ -18,7 +18,7 @@ def detect_objects(self, caption):
         if not objects:
             objects = self._extract_noun_phrases(caption)

-        print("These are the objects:", objects)
+        # print("These are the objects:", objects)
         return objects

     def _extract_noun_phrases(self, text):
@@ -147,12 +147,12 @@ def test_caption_filtering():

         if total_filtered_count >= write_batch_size or current_batch == total_batches:
             write_filtered_json(output_filename, filtered_data, first_batch=first_batch, last_batch=(current_batch == total_batches))
-            print(f"Wrote batch {current_batch}/{total_batches} with {total_filtered_count} filtered captions")
+            # print(f"Wrote batch {current_batch}/{total_batches} with {total_filtered_count} filtered captions")
             filtered_data = {}
             total_filtered_count = 0
             first_batch = False

-    print("Filtering and writing completed.")
+    # print("Filtering and writing completed.")

     # Optionally, you can keep the test function call if you want to run tests
     # test_caption_filtering()

filtered/worker.py

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
import sys
from .filter import filter_captions, write_filtered_json


def run_job(batch_index, batch):
    # Filter one batch of captions, write the result locally, then upload it.
    # `distributaur` is a module-level global, bound below when the worker starts.
    output_filename = f"batch_{batch_index}"

    filtered_batch = filter_captions(batch)
    write_filtered_json(output_filename, filtered_batch)

    distributaur.upload_file(output_filename)

    return "Task complete"


# This guard fires both under `python -m filtered.worker` and when the Celery
# worker imports the module (it is launched with `-A filtered.worker`, so
# "celery" appears in sys.argv).
if __name__ == "__main__" or any("celery" in arg for arg in sys.argv):
    from distributaur.distributaur import create_from_config

    distributaur = create_from_config()
    distributaur.register_function(run_job)

    celery = distributaur.app
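
Because distributaur is only bound when that module-level guard fires, run_job can be smoke-tested locally by stubbing the global out. A minimal sketch, assuming filter_captions accepts a dict of id -> caption pairs; the sample caption is made up, and the test writes a batch_0 file in the working directory:

# Hypothetical local smoke test for run_job, bypassing Celery and Distributaur.
import filtered.worker as worker

class FakeDistributaur:
    def upload_file(self, path):
        # Stand-in for the real upload; just report what would be sent.
        print(f"(would upload {path})")

worker.distributaur = FakeDistributaur()
print(worker.run_job(batch_index=0, batch={"obj_001": "a red wooden chair"}))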

requirements.txt

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
requests
fsspec
celery
redis
huggingface_hub
python-dotenv
omegaconf
tqdm
gliner
distributaur
