-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuild.py
188 lines (156 loc) · 6.14 KB
/
build.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import json
import os
import shutil
import sqlite3
import subprocess

from packaging.version import Version
# Filesystem layout used by this script
TEMP_DIR = "temp_repos"
DATA_DIR = "data"
CATALOGS_FILE = os.path.join(DATA_DIR, "catalogs.json")

# Catalog metadata collected during this run; always starts out empty
catalogs = []

# Make sure the output directory is present before anything is written to it
os.makedirs(DATA_DIR, exist_ok=True)
# Function to clone or update a Git repository
def clone_or_pull_repo(repo_url, repo_path):
    """Bring a local checkout of a Git repository up to date.

    If ``repo_path`` is already a directory, ``git pull`` is run inside it;
    otherwise ``repo_url`` is cloned into ``repo_path``.

    Args:
        repo_url: URL handed to ``git clone`` when no checkout exists yet.
        repo_path: Local directory of the checkout.

    Raises:
        subprocess.CalledProcessError: If the git command exits non-zero.
    """
    checkout_exists = os.path.isdir(repo_path)
    if not checkout_exists:
        print(f"Cloning repository: {repo_url}...")
        git_command = ["git", "clone", repo_url, repo_path]
    else:
        print(f"Pulling latest changes for {repo_path}...")
        git_command = ["git", "-C", repo_path, "pull"]

    # check=True turns a failing git invocation into an exception
    subprocess.run(git_command, check=True)
# Function to fetch related data for a solution
def fetch_related_data(cursor, solution_id):
    """Collect the records linked to a single solution.

    Six lookups are run on ``cursor``, each filtered by ``solution_id``:
    authors, arguments, citations, covers, documentation and tags.

    Args:
        cursor: Database cursor used to execute the queries (``?``
            placeholders are used for parameter binding).
        solution_id: Identifier of the solution whose related rows are
            wanted.

    Returns:
        A dict with the keys ``authors``, ``arguments``, ``citations``,
        ``documentation``, ``tags`` and ``covers`` (in that order). Authors,
        documentation and tags are lists of the first selected column;
        arguments, citations and covers are lists of dicts keyed by column
        name. The ``required`` flag of an argument is converted to ``bool``.
    """
    bind = (solution_id,)

    def rows_for(sql):
        # Run one lookup bound to this solution and return all of its rows
        cursor.execute(sql, bind)
        return cursor.fetchall()

    author_rows = rows_for("""
        SELECT author.name
        FROM solution_author
        JOIN author ON solution_author.author_id = author.author_id
        WHERE solution_author.solution_id = ?
    """)

    argument_rows = rows_for("""
        SELECT argument.name, argument.type, argument.description, argument.default_value, argument.required
        FROM solution_argument
        JOIN argument ON solution_argument.argument_id = argument.argument_id
        WHERE solution_argument.solution_id = ?
    """)

    citation_rows = rows_for("""
        SELECT citation.text, citation.doi, citation.url
        FROM solution_citation
        JOIN citation ON solution_citation.citation_id = citation.citation_id
        WHERE solution_citation.solution_id = ?
    """)

    cover_rows = rows_for("""
        SELECT cover.source, cover.description
        FROM cover
        WHERE cover.solution_id = ?
    """)

    documentation_rows = rows_for("""
        SELECT documentation.documentation
        FROM documentation
        WHERE documentation.solution_id = ?
    """)

    tag_rows = rows_for("""
        SELECT tag.name
        FROM solution_tag
        JOIN tag ON solution_tag.tag_id = tag.tag_id
        WHERE solution_tag.solution_id = ?
    """)

    arguments = []
    for name, arg_type, description, default_value, required in argument_rows:
        arguments.append({
            "name": name,
            "type": arg_type,
            "description": description,
            "default_value": default_value,
            "required": bool(required),
        })

    return {
        "authors": [record[0] for record in author_rows],
        "arguments": arguments,
        "citations": [
            {"text": text, "doi": doi, "url": url}
            for text, doi, url in citation_rows
        ],
        "documentation": [record[0] for record in documentation_rows],
        "tags": [record[0] for record in tag_rows],
        "covers": [
            {"source": source, "description": description}
            for source, description in cover_rows
        ],
    }
# Function to process a catalog repository
def process_catalog(repo_url):
    """Export the solutions of one catalog repository to a JSON file.

    The repository is cloned (or pulled) below ``TEMP_DIR``, the catalog name
    is read from ``album_catalog_index.json``, every row of the ``solution``
    table in ``album_catalog_index.db`` is combined with its related data,
    and the result is written to ``<DATA_DIR>/<catalog name>.json`` sorted by
    version, newest first. On success the catalog's name and URL are appended
    to the module-level ``catalogs`` list.

    If the index file, the catalog name, the database file or the
    ``solution`` table is missing, an error is printed and the function
    returns without writing anything.

    Args:
        repo_url: Git URL of the catalog repository.

    Raises:
        subprocess.CalledProcessError: If cloning or pulling fails.
    """
    # Extract repository name from URL; a trailing "/" would otherwise give
    # an empty name and make the checkout path collapse onto TEMP_DIR itself
    repo_name = os.path.splitext(os.path.basename(repo_url.rstrip("/")))[0]
    repo_path = os.path.join(TEMP_DIR, repo_name)

    # Clone or pull the repository
    clone_or_pull_repo(repo_url, repo_path)

    # Read the catalog name from album_catalog_index.json
    index_file = os.path.join(repo_path, "album_catalog_index.json")
    if not os.path.isfile(index_file):
        print(f"Error: album_catalog_index.json not found in {repo_name}")
        return
    # JSON is UTF-8 by specification; do not depend on the locale encoding
    with open(index_file, encoding="utf-8") as f:
        catalog_data = json.load(f)
    catalog_name = catalog_data.get("name")
    if not catalog_name:
        print(f"Error: Catalog name not found in {index_file}")
        return

    # Process the SQLite database
    db_path = os.path.join(repo_path, "album_catalog_index.db")
    if not os.path.isfile(db_path):
        print(f"Error: SQLite database not found in {repo_name}")
        return

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Check if the solution table exists
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='solution';")
        if not cursor.fetchone():
            print(f"Error: 'solution' table not found in {repo_name}")
            return

        # Load all solution rows up front: fetch_related_data reuses the same
        # cursor, so the result set must be materialized before the loop
        cursor.execute("""
            SELECT solution_id, "group", name, title, version, description, doi, license, album_version, album_api_version, changelog, acknowledgement
            FROM solution
        """)
        solutions = []
        for solution in cursor.fetchall():
            solution_id, group, name, title, version, description, doi, license, album_version, album_api_version, changelog, acknowledgement = solution
            related_data = fetch_related_data(cursor, solution_id)
            solutions.append({
                "group": group,
                "name": name,
                "title": title,
                "version": version,
                "description": description,
                "doi": doi,
                "license": license,
                "album_version": album_version,
                "album_api_version": album_api_version,
                "changelog": changelog,
                "acknowledgement": acknowledgement,
                **related_data
            })
    finally:
        # Close the connection on every path, including the early return
        # above and any exception raised while reading
        conn.close()

    # Sort newest first; versions are parsed with packaging's PEP 440 rules
    solutions.sort(key=_version_sort_key, reverse=True)

    solutions_file = os.path.join(DATA_DIR, f"{catalog_name}.json")
    with open(solutions_file, "w", encoding="utf-8") as f:
        json.dump(solutions, f, indent=2)
    print(f"Solutions for {catalog_name} saved in {solutions_file}")

    # Add the catalog name and URL to the catalog list
    catalogs.append({
        "name": catalog_name,
        "url": repo_url
    })


def _version_sort_key(solution):
    """Return a sort key that ranks parseable versions above unparseable ones.

    With ``reverse=True`` the valid versions come first (newest to oldest),
    followed by entries whose version string cannot be parsed or is missing,
    so one malformed version no longer aborts the whole export.
    """
    raw_version = solution["version"]
    try:
        return (1, Version(raw_version))
    except (ValueError, TypeError):
        # packaging's InvalidVersion subclasses ValueError; TypeError covers
        # a NULL/non-string version column. The differing first element
        # guarantees a Version is never compared against a str.
        return (0, str(raw_version))
# Read the catalog URLs from catalogs.txt, one per line. Blank lines (for
# example a trailing newline at the end of the file) are skipped so that an
# empty URL is never handed to git.
with open("catalogs.txt", encoding="utf-8") as f:
    repo_urls = [line.strip() for line in f if line.strip()]

# Process each repository
os.makedirs(TEMP_DIR, exist_ok=True)
for repo_url in repo_urls:
    process_catalog(repo_url)

# Write the catalogs.json file (store name and URL for each catalog)
with open(CATALOGS_FILE, "w", encoding="utf-8") as f:
    json.dump(catalogs, f, indent=2)

# Clean up temporary directory. shutil.rmtree works on every platform,
# whereas spawning "rm -rf" fails where no rm executable exists;
# ignore_errors=True keeps the best-effort behaviour of "rm -rf".
shutil.rmtree(TEMP_DIR, ignore_errors=True)
print("All catalogs processed successfully.")