-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathdb_connector.py
91 lines (69 loc) · 2.36 KB
/
db_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from pony.orm import *
from datetime import datetime
import env
import os
DB = Database()
class Event(DB.Entity):
model_name = Required(str)
instance_id = Required(str)
activity_id = Required(str)
timestamp = Required(datetime, precision=6)
pending = Required(StrArray)
activity_variables = Required(Json)
class RunningInstance(DB.Entity):
running = Required(bool)
instance_id = Required(str, unique=True)
def setup_db(provider=None, recreate=False):
if not provider:
provider == env.DB["provider"]
if not os.path.isdir("database"):
os.mkdir("database")
if provider == "postgres":
DB.bind(**env.DB)
else:
DB.bind(provider="sqlite", filename="database/database.sqlite", create_db=True)
DB.generate_mapping()
if recreate:
DB.drop_all_tables(with_all_data=True)
DB.create_tables()
@db_session
def add_event(
model_name, instance_id, activity_id, timestamp, pending, activity_variables
):
Event(
model_name=model_name,
instance_id=instance_id,
activity_id=activity_id,
timestamp=timestamp,
pending=pending,
activity_variables=activity_variables,
)
@db_session
def add_running_instance(instance_id):
RunningInstance(instance_id=instance_id, running=True)
@db_session
def finish_running_instance(instance):
finished_instance = RunningInstance.get(instance_id=instance)
finished_instance.running = False
@db_session
def get_running_instances_log():
log = []
running_instances = RunningInstance.select(lambda ri: ri.running == True)[:]
for instance in running_instances:
instance_dict = {}
instance_dict[instance.instance_id] = {}
events = Event.select(lambda e: e.instance_id == instance.instance_id).order_by(
Event.timestamp
)[:]
events_list = []
for event in events:
model_path = event.model_name
event_dict = {}
event_dict["activity_id"] = event.activity_id
event_dict["pending"] = event.pending
event_dict["activity_variables"] = event.activity_variables
events_list.append(event_dict)
instance_dict[instance.instance_id]["model_path"] = model_path
instance_dict[instance.instance_id]["events"] = events_list
log.append(instance_dict)
return log