Skip to content

Commit 70f66c5

Browse files
committed
first functional flow
start visualization abstraction in container
1 parent 257f4b0 commit 70f66c5

13 files changed

+247
-0
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
__pycache__
2+
venv
3+
/cache

README.md

+22
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,24 @@
11
# pipeline-visualization
22
how to render a pipeline flow with graph and time chart
3+
4+
# usage
5+
To run the full pipeline:
6+
7+
python runner.py
8+
9+
To test a container (e.g. Graphviz rendering):
10+
11+
create `cache/test/example.dot` with content such as
12+
```dot
13+
digraph G {
14+
A -> B;
15+
B -> C;
16+
C -> A;
17+
}
18+
```
19+
Note: the `cache` directory is mounted at `/data` inside the container.
20+
21+
Then generate the graph with
22+
23+
run graphviz test/example.dot
24+

activate.cmd

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
venv\Scripts\activate

containers/docker-compose.yaml

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
services:
2+
graphviz:
3+
build:
4+
context: ./dockerfiles
5+
dockerfile: graphviz.Dockerfile
6+
volumes:
7+
- ../cache:/data
+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Use an official Ubuntu base image
2+
FROM ubuntu:latest
3+
4+
# Install Graphviz
5+
RUN apt-get update && apt-get install -y graphviz
6+
7+
# Set the working directory
8+
WORKDIR /data
9+
10+
# Default command uses the `dot` tool to process a graph file
11+
ENTRYPOINT ["dot"]
12+
CMD ["-Tpng", "-o", "output.png"] # Default command if no argument is provided

containers/run.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import sys
2+
import subprocess
3+
import os
4+
5+
def graphviz(filename):
    """Render a Graphviz .dot file to SVG via the graphviz compose service.

    `filename` is a path relative to the cache/ directory; the compose
    service mounts ../cache at /data, so the container resolves it there.
    The SVG is written next to the input file.
    """
    # Bug fix: the command string previously contained no interpolation,
    # so `filename` was silently ignored. Build an argument list instead
    # of splitting a string so paths containing spaces survive.
    output = f"{os.path.splitext(filename)[0]}.svg"
    command = [
        "docker", "compose", "run", "--rm", "graphviz",
        "-Tsvg", filename, "-o", output,
    ]
    subprocess.run(command)
    return
9+
10+
def run(command):
    """Dispatch a container command, running from the compose directory.

    docker-compose.yaml lives next to this script, so we chdir there
    first; every `docker compose` invocation then finds it.
    """
    # `or "."` guards the case where the script is invoked from its own
    # directory, where dirname(__file__) is "" and os.chdir("") raises.
    docker_compose_dir = os.path.dirname(__file__) or "."
    os.chdir(docker_compose_dir)

    if command == "graphviz":
        # Bug fix: previously an unguarded sys.argv[2] raised IndexError
        # when the filename argument was missing.
        if len(sys.argv) < 3:
            print("usage: run.py graphviz <dotfile relative to cache/>")
            return
        filename = sys.argv[2]
        graphviz(filename)
19+
20+
if __name__ == '__main__':
    # Default to the "build" command when none is given on the CLI.
    command = sys.argv[1] if len(sys.argv) > 1 else "build"
    run(command)

manifest.yaml

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Stage > Job
2+
pipeline:
3+
fetch:
4+
fetch-data-1: pipeline.py#fetch
5+
process:
6+
calculate-functions: pipeline.py#calculate
7+
compute-statistics: pipeline.py#compute
8+
build:
9+
generate-website: pipeline.py#build

pipeline.py

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import time
2+
import runner as run
3+
4+
def fetch():
    """Fetch stage: produce the initial data artifact for the pipeline."""
    print("- fetching file")
    time.sleep(0.1)  # simulate fetch latency
    payload = {"hello": "world"}
    run.set_artifact(payload, "fetch/content.json")
    return
12+
13+
def calculate():
    """Process stage: load the fetched artifact and display it."""
    print("- hello from calc2")
    fetched = run.get_artifact("content")
    print(fetched)
    return
18+
19+
def compute():
    """Process stage: placeholder statistics job."""
    # Message kept verbatim (including the "hellor" typo) — it is runtime output.
    print("- hellor from comp3")
    return
22+
23+
def build():
    """Build stage: placeholder website-generation job."""
    print("- hi build4")
    return

run.cmd

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
@echo off
2+
python containers\run.py %*

runner.py

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import utils as utl
2+
import importlib
3+
from os.path import dirname,splitext,basename,join
4+
import state
5+
from datetime import datetime
6+
7+
class ArtifactError(Exception):
    """Raised when artifact registration or lookup fails."""
10+
11+
def log_job(stage_name, job_name, start):
    """Append timing information for a finished job to the global run log."""
    stop = datetime.now()
    elapsed = stop - start
    state.pipe.append({
        "stage": stage_name,
        "job": job_name,
        "start": str(start),
        "stop": str(stop),
        "duration": str(elapsed),
        "duration_text": utl.duration_text(elapsed),
    })
    return
22+
23+
def set_artifact(data, filepath, type="generic"):
    """Register an artifact keyed by the basename of `filepath` and persist it.

    JSON artifacts (".json" extension) are written under cache/; other
    types are only registered, not saved.

    Raises ArtifactError when the derived ID is already registered.
    """
    # `id` renamed locally to avoid shadowing the builtin; the `type`
    # parameter name is kept for caller compatibility.
    artifact_id, ext = splitext(basename(filepath))
    if artifact_id in state.artifacts:
        raise ArtifactError(f"Artifact with ID '{artifact_id}' already exists.")
    state.artifacts[artifact_id] = {
        "path": dirname(filepath),
        "ext": ext,
        "type": type,
        "filepath": filepath,
    }
    if ext == ".json":
        utl.save_json(data, join("cache", filepath))
    return
38+
39+
def get_artifact(id):
    """Return the content of the registered artifact `id`.

    Only JSON artifacts are loadable for now; other types return None.
    Raises ArtifactError for an unknown ID.
    """
    if id not in state.artifacts:
        raise ArtifactError(f"Artifact with ID '{id}' does not exist")
    artifact = state.artifacts[id]
    if artifact["ext"] != ".json":
        return None
    return utl.load_json(join("cache", artifact["filepath"]))
46+
47+
def run_stage(stage_name, jobs):
    """Execute every job of a stage sequentially, timing each one.

    `jobs` maps job name -> "module.py#function" reference strings.
    """
    print(f"Running stage: {stage_name}")
    state.stage = stage_name
    for job_name, job in jobs.items():
        module_name, function_name = job.split('#')
        state.job = job_name
        state.step = function_name
        # Bug fix: str.replace('.py', '') stripped EVERY ".py" occurrence
        # in the module name; only a trailing extension should be removed.
        if module_name.endswith('.py'):
            module_name = module_name[:-3]
        module = importlib.import_module(module_name)
        func = getattr(module, function_name)
        print(f" Executing job: {job_name}")
        start = datetime.now()
        func()
        log_job(stage_name, job_name, start)
60+
61+
def run_pipeline(pipeline):
    """Run every stage of the manifest, then persist run metadata to cache/."""
    state.value = 1
    for stage_name, stage_jobs in pipeline.items():
        run_stage(stage_name, stage_jobs)
    # Persist collected metadata for the visualization step.
    utl.save_json(state.artifacts, "cache/artifacts.json")
    utl.save_json(state.pipe, "cache/pipeline.json")
67+
68+
69+
if __name__ == '__main__':
    pipeline_manifest = utl.load_yaml("manifest.yaml")
    run_pipeline(pipeline_manifest["pipeline"])

state.py

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
# Shared mutable pipeline state (module-level singleton used by runner.py).

# Chronological log of executed jobs (dicts appended by runner.log_job).
pipe = []
# Registry of produced artifacts, keyed by artifact ID.
artifacts = {}

# Names of the stage / job / step currently executing.
stage = ""
job = ""
step = ""

test-manifest.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
- group: fetch
2+
step: fetch-data-1
3+
- group: process
4+
step: calculate-functions
5+
- group: process
6+
step: compute-statistics
7+
- group: build
8+
step: generate-website

utils.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import json
2+
import yaml
3+
from os import makedirs
4+
from os.path import dirname
5+
from datetime import timedelta
6+
7+
def load_yaml(fileName):
    """Parse `fileName` as YAML; on parse error, print it and return None."""
    with open(fileName, "r") as stream:
        try:
            content = yaml.safe_load(stream)
        except yaml.YAMLError as err:
            print(err)
            return None
        return content
14+
15+
def load_json(fileName):
    """Load and return the JSON content of `fileName` (UTF-8)."""
    # Bug fix: the previous version opened the file without ever closing
    # the handle; a context manager guarantees release.
    with open(fileName, encoding='utf-8') as jfile:
        return json.load(jfile)
17+
18+
def save_json(data, fileName):
    """Serialize `data` as indented JSON to `fileName`, creating parent dirs."""
    path = dirname(fileName)
    # Bug fix: makedirs("") raises FileNotFoundError, so a dir-less
    # fileName (current directory) previously crashed.
    if path:
        makedirs(path, exist_ok=True)
    # Context manager + explicit encoding replace the unclosed,
    # platform-default-encoded handle of the original.
    with open(fileName, "w", encoding="utf-8") as jfile:
        jfile.write(json.dumps(data, indent=4))
    return
25+
26+
def duration_text(duration: timedelta) -> str:
    """Return a human-readable duration like '1 h 2 mn 3 s 4 ms'.

    Negative durations are rendered by magnitude. Zero-valued components
    are omitted; a sub-millisecond duration yields '0 ms'.
    """
    # Work with the magnitude so negative deltas render sensibly.
    if duration < timedelta(0):
        duration = -duration

    days = duration.days
    seconds = duration.seconds
    milliseconds = duration.microseconds // 1000

    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    seconds = seconds % 60

    parts = []
    if days > 0:
        parts.append(f"{days} d")
    if hours > 0:
        parts.append(f"{hours} h")
    if minutes > 0:
        parts.append(f"{minutes} mn")
    if seconds > 0:
        parts.append(f"{seconds} s")
    if milliseconds > 0:
        parts.append(f"{milliseconds} ms")

    # Edge-case fix: the original returned "" for zero / sub-millisecond
    # durations, which rendered as a blank label downstream.
    return " ".join(parts) if parts else "0 ms"

0 commit comments

Comments
 (0)