Update p-p.json #70
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: Track Repository Stats | |
on: | |
pull_request: | |
types: | |
- closed | |
jobs: | |
track-stats: | |
runs-on: ubuntu-latest | |
permissions: | |
contents: write | |
pull-requests: read | |
steps: | |
- name: Checkout repository | |
uses: actions/checkout@v4 | |
with: | |
fetch-depth: 0 | |
token: ${{ secrets.PAT_TOKEN }} | |
- name: Get changed files | |
id: changed-files | |
run: | | |
echo "Getting changed files..." | |
BASE_SHA=${{ github.event.pull_request.base.sha }} | |
HEAD_SHA=${{ github.event.pull_request.head.sha }} | |
echo "base_sha=$BASE_SHA" >> $GITHUB_ENV | |
echo "head_sha=$HEAD_SHA" >> $GITHUB_ENV | |
CHANGED_FILES=$(git diff --name-only $BASE_SHA $HEAD_SHA) | |
echo "Changed files: $CHANGED_FILES" | |
if ! echo "$CHANGED_FILES" | grep -q "^infos/"; then | |
echo "No changes in infos/ directory. Skipping workflow." | |
exit 0 | |
fi | |
# Save changed files for later use | |
echo "$CHANGED_FILES" > changed_files.txt | |
- name: Setup Python | |
uses: actions/setup-python@v4 | |
with: | |
python-version: "3.x" | |
- name: Install dependencies | |
run: | | |
python -m pip install --upgrade pip | |
pip install requests python-dateutil pytz cryptography | |
- name: Verify plugins | |
run: | | |
# Save the verification script | |
cat > verify_plugin.py <<'EOL' | |
import os | |
import json | |
import base64 | |
import shutil | |
import requests | |
import tempfile | |
import zipfile | |
import sys | |
from typing import Dict, Any | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.asymmetric import padding | |
from cryptography.hazmat.primitives.serialization import load_pem_public_key | |
from cryptography.exceptions import InvalidSignature | |
class Colors: | |
HEADER = '\033[95m' | |
BLUE = '\033[94m' | |
GREEN = '\033[92m' | |
YELLOW = '\033[93m' | |
RED = '\033[91m' | |
RESET = '\033[0m' | |
class PluginVerifier: | |
def __init__(self, official_key: str): | |
self.official_key = official_key | |
self.public_keys = {} | |
def normalize_content(self, content: str) -> str: | |
"""標準化內容的換行符""" | |
return content.replace('\r\n', '\n').replace('\r', '\n') | |
def get_all_files(self, directory: str, base_dir: str = None) -> Dict[str, str]: | |
"""獲取目錄下所有文件的內容""" | |
if base_dir is None: | |
base_dir = directory | |
results = {} | |
for item in os.listdir(directory): | |
if item.startswith('.') or item == 'signature.json': | |
continue | |
full_path = os.path.join(directory, item) | |
if os.path.isdir(full_path): | |
results.update(self.get_all_files(full_path, base_dir)) | |
else: | |
rel_path = os.path.relpath( | |
full_path, base_dir).replace('\\', '/') | |
with open(full_path, 'r', encoding='utf-8') as f: | |
content = self.normalize_content(f.read()) | |
results[rel_path] = content | |
return results | |
def verify(self, plugin_dir: str) -> Dict[str, Any]: | |
"""驗證插件""" | |
try: | |
signature_path = os.path.join(plugin_dir, 'signature.json') | |
if not os.path.exists(signature_path): | |
return {"valid": False, "error": "Missing signature.json"} | |
with open(signature_path, 'r', encoding='utf-8') as f: | |
signature_data = json.load(f) | |
print(f"{Colors.YELLOW}Signature data loaded:{Colors.RESET}") | |
print(json.dumps(signature_data, indent=2)) | |
file_hashes = signature_data.get('fileHashes') | |
signature = signature_data.get('signature') | |
key_id = signature_data.get('keyId') | |
if not file_hashes or not signature: | |
return {"valid": False, "error": "Invalid signature data format"} | |
print(f"\n{Colors.BLUE}Verifying files...{Colors.RESET}") | |
all_files = self.get_all_files(plugin_dir) | |
print(f"Found {len(all_files)} files to verify:") | |
for file_path in sorted(all_files.keys()): | |
print(f" {file_path}") | |
print(f"\n{Colors.BLUE}Checking file hashes...{Colors.RESET}") | |
for file_path, content in all_files.items(): | |
if file_path == 'trem.json': | |
continue | |
if file_path not in file_hashes: | |
return {"valid": False, "error": f"Extra file: {file_path}"} | |
actual_hash = hashes.Hash(hashes.SHA256()) | |
actual_hash.update(content.encode()) | |
actual_hash_hex = actual_hash.finalize().hex() | |
print(f"File: {file_path}") | |
print(f" Expected: {file_hashes[file_path]}") | |
print(f" Actual: {actual_hash_hex}") | |
if actual_hash_hex != file_hashes[file_path]: | |
return {"valid": False, "error": f"Modified file: {file_path}"} | |
print(f"\n{Colors.BLUE}Verifying signature...{Colors.RESET}") | |
print(f"Using key ID: {key_id or 'official'}") | |
message = json.dumps(file_hashes, sort_keys=True, | |
separators=(',', ':')).encode() | |
signature_bytes = base64.b64decode(signature) | |
print(f"Message length: {len(message)} bytes") | |
print(f"Signature length: {len(signature_bytes)} bytes") | |
print(f"Message (first 100 bytes): {message[:100]}...") | |
public_key_obj = load_pem_public_key(self.official_key.encode()) | |
public_key_obj.verify( | |
signature_bytes, | |
message, | |
padding.PKCS1v15(), | |
hashes.SHA256() | |
) | |
return { | |
"valid": True, | |
"error": None, | |
"keyId": key_id or 'official' | |
} | |
except InvalidSignature: | |
# 嘗試不同的序列化方式 | |
print( | |
f"{Colors.YELLOW}First attempt failed, trying alternative serialization...{Colors.RESET}") | |
message = json.dumps(file_hashes, separators=(',', ':')).encode() | |
try: | |
public_key_obj.verify( | |
signature_bytes, | |
message, | |
padding.PKCS1v15(), | |
hashes.SHA256() | |
) | |
return { | |
"valid": True, | |
"error": None, | |
"keyId": key_id or 'official' | |
} | |
except InvalidSignature: | |
print(f"{Colors.RED}Signature verification details:{Colors.RESET}") | |
print("- File hashes match but signature is invalid") | |
print("- This could mean:") | |
print(" 1. The signature was created with a different private key") | |
print(" 2. The file hash data was serialized differently") | |
print(" 3. The signature was generated with different parameters") | |
return { | |
"valid": False, | |
"error": "Invalid signature", | |
"keyId": key_id or 'official' | |
} | |
except Exception as e: | |
print(f"{Colors.RED}Verification error: {str(e)}{Colors.RESET}") | |
return {"valid": False, "error": str(e)} | |
def download_and_verify_plugin(org: str, repo: str, plugin_name: str, verifier: PluginVerifier): | |
"""下載並驗證插件""" | |
print(f"{Colors.BLUE}Downloading latest release from {org}/{repo}...{Colors.RESET}") | |
try: | |
# 獲取最新release | |
api_url = f"https://api.github.com/repos/{org}/{repo}/releases/latest" | |
response = requests.get(api_url) | |
response.raise_for_status() | |
release_data = response.json() | |
# 下載插件 | |
trem_file = f"{plugin_name}.trem" | |
asset_url = None | |
for asset in release_data["assets"]: | |
if asset["name"] == trem_file: | |
asset_url = asset["browser_download_url"] | |
break | |
if not asset_url: | |
raise ValueError(f"Could not find {trem_file} in latest release") | |
# 下載並解壓 | |
response = requests.get(asset_url) | |
response.raise_for_status() | |
temp_dir = tempfile.mkdtemp() | |
trem_path = os.path.join(temp_dir, trem_file) | |
with open(trem_path, "wb") as f: | |
f.write(response.content) | |
print(f"{Colors.GREEN}Downloaded {trem_file}{Colors.RESET}") | |
# 解壓縮 | |
plugin_dir = os.path.join(temp_dir, "plugin") | |
os.makedirs(plugin_dir, exist_ok=True) | |
with zipfile.ZipFile(trem_path, 'r') as zip_ref: | |
zip_ref.extractall(plugin_dir) | |
# 驗證 | |
result = verifier.verify(plugin_dir) | |
if result["valid"]: | |
print(f"{Colors.GREEN}Plugin verification successful!{Colors.RESET}") | |
return True | |
else: | |
print(f"{Colors.RED}Plugin verification failed: {result['error']}{Colors.RESET}") | |
return False | |
# 清理臨時文件 | |
shutil.rmtree(temp_dir) | |
except Exception as e: | |
print(f"{Colors.RED}Error: {str(e)}{Colors.RESET}") | |
return False | |
# 使用示例 | |
if __name__ == "__main__": | |
public_key = '''-----BEGIN PUBLIC KEY----- | |
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzQn1ouv0mfzVKJevJiq+ | |
6rV9mwCEvQpauQ2QNjy4TiwhqzqNiOPwpM3qo+8+3Ld+DUhzZzSzyx894dmJGlWQ | |
wNss9Vs5/gnuvn6PurNXC42wkxY6Dmsnp/M6g08iqGXVcM6ZWmvCZ3BzBvwExxRR | |
09KxHZVhwoMcF5Kp9l/hNZqXRgYMn3GLt+m78Hr+ZUjHiF8K9UH2TPxKRa/4ttPX | |
6nDBZxZUCwFD7Zh6RePg07JDbO5fI/UYrqZYyDPK8w9xdXtke9LbdXmMuuk/x57h | |
foRArUkhPvUk/77mxo4++3EFnTUxYMnQVuMkDaYNRu7w83abUuhsjNlL/es24HSm | |
lwIDAQAB | |
-----END PUBLIC KEY-----''' | |
if len(sys.argv) != 4: | |
print(f"Usage: {sys.argv[0]} <org> <repo> <plugin_name>") | |
sys.exit(1) | |
verifier = PluginVerifier(public_key) | |
success = download_and_verify_plugin(sys.argv[1], sys.argv[2], sys.argv[3], verifier) | |
sys.exit(0 if success else 1) | |
EOL | |
# Read and process each changed JSON file | |
while IFS= read -r file; do | |
if [[ $file == infos/* ]] && [[ $file == *.json ]]; then | |
echo "Processing $file..." | |
# Extract plugin info from JSON | |
PLUGIN_NAME=$(jq -r '.name' "$file") | |
REPO_LINK=$(jq -r '.link' "$file") | |
# Parse GitHub org and repo from link | |
if [[ $REPO_LINK =~ github.com/([^/]+)/([^/]+)$ ]]; then | |
ORG="${BASH_REMATCH[1]}" | |
REPO="${BASH_REMATCH[2]}" | |
echo "Verifying plugin: $PLUGIN_NAME from $ORG/$REPO" | |
python verify_plugin.py "$ORG" "$REPO" "$PLUGIN_NAME" | |
if [ $? -ne 0 ]; then | |
echo "Plugin verification failed for $PLUGIN_NAME" | |
exit 1 | |
fi | |
else | |
echo "Invalid repository link format in $file" | |
exit 1 | |
fi | |
fi | |
done < changed_files.txt | |
- name: Fetch repository stats | |
env: | |
GITHUB_TOKEN: ${{ secrets.PAT_TOKEN }} | |
run: | | |
python - <<EOF | |
import json | |
import os | |
import requests | |
import subprocess | |
from datetime import datetime | |
from pytz import timezone | |
from operator import itemgetter | |
def get_changed_files(): | |
base_sha = "${{ github.event.pull_request.base.sha }}" | |
head_sha = "${{ github.event.pull_request.head.sha }}" | |
cmd = f"git diff --name-only {base_sha} {head_sha}" | |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
return [f for f in result.stdout.strip().split('\n') if f.startswith('infos/') and f.endswith('.json')] | |
def get_repo_stats(owner, repo, plugin_info): | |
headers = { | |
"Authorization": f"token {os.environ.get('GITHUB_TOKEN')}", | |
"Accept": "application/vnd.github.v3+json" | |
} | |
base_url = f"https://api.github.com/repos/{owner}/{repo}" | |
repo_response = requests.get(base_url, headers=headers) | |
if repo_response.status_code != 200: | |
print(f"Failed to get repo info: {repo_response.status_code}") | |
return None | |
repo_data = repo_response.json() | |
releases_response = requests.get(f"{base_url}/releases", headers=headers) | |
releases = releases_response.json() if releases_response.status_code == 200 else [] | |
total_downloads = 0 | |
release_stats = [] | |
for release in releases: | |
release_downloads = sum(asset["download_count"] for asset in release["assets"]) | |
release_stats.append({ | |
"tag_name": release["tag_name"], | |
"name": release["name"], | |
"downloads": release_downloads, | |
"published_at": release["published_at"] | |
}) | |
total_downloads += release_downloads | |
return { | |
"name": plugin_info["name"], | |
"version": plugin_info["version"], | |
"description": plugin_info["description"], | |
"author": plugin_info["author"], | |
"dependencies": plugin_info.get("dependencies", {}), | |
"resources": plugin_info.get("resources", []), | |
"link": plugin_info["link"], | |
"repository": { | |
"full_name": repo_data["full_name"], | |
"releases": { | |
"total_count": len(releases), | |
"total_downloads": total_downloads, | |
"releases": release_stats | |
} | |
} | |
} | |
def process_plugin_files(repository_stats, changed_info_files): | |
now = datetime.now(timezone("Asia/Taipei")).strftime("%Y-%m-%d %H:%M:%S") | |
oldest_index = None | |
oldest_time = None | |
for i, obj in enumerate(repository_stats): | |
current_time = obj.get("updated_at", "") | |
if oldest_time is None or current_time < oldest_time: | |
oldest_time = current_time | |
oldest_index = i | |
for file_path in changed_info_files: | |
try: | |
with open(file_path, "r", encoding="utf-8") as f: | |
plugin_info = json.load(f) | |
if "link" in plugin_info and "github.com" in plugin_info["link"] and "name" in plugin_info: | |
parts = plugin_info["link"].strip("/").split("/") | |
if len(parts) >= 2: | |
owner, repo = parts[-2], parts[-1] | |
print(f"處理倉庫: {owner}/{repo} ({plugin_info['name']})") | |
stats = get_repo_stats(owner, repo, plugin_info) | |
if stats: | |
stats["updated_at"] = now | |
for i, obj in enumerate(repository_stats): | |
if obj["name"] == stats["name"]: | |
repository_stats[i] = stats | |
print(f"更新倉庫數據: {stats['name']}") | |
break | |
else: | |
repository_stats.append(stats) | |
print(f"新增倉庫數據: {stats['name']}") | |
except Exception as e: | |
print(f"處理 {file_path} 時發生錯誤: {str(e)}") | |
continue | |
if oldest_index is not None: | |
oldest_record = repository_stats[oldest_index] | |
if "link" in oldest_record and "github.com" in oldest_record["link"]: | |
parts = oldest_record["link"].strip("/").split("/") | |
if len(parts) >= 2: | |
owner, repo = parts[-2], parts[-1] | |
stats = get_repo_stats(owner, repo, oldest_record) | |
if stats: | |
oldest_record["repository"] = stats["repository"] | |
oldest_record["updated_at"] = now | |
print(f"更新最舊的倉庫狀態: {oldest_record['name']}") | |
return repository_stats | |
try: | |
print("初始化資料...") | |
os.makedirs("data", exist_ok=True) | |
changed_info_files = get_changed_files() | |
if not changed_info_files: | |
print("No plugin info files were changed in this PR") | |
exit(0) | |
stats_file = "data/repository_stats.json" | |
repository_stats = [] | |
if os.path.exists(stats_file): | |
with open(stats_file, "r", encoding="utf-8") as f: | |
repository_stats = json.load(f) | |
repository_stats = process_plugin_files(repository_stats, changed_info_files) | |
repository_stats.sort(key=lambda x: x.get("updated_at", ""), reverse=True) | |
with open(stats_file, "w", encoding="utf-8") as f: | |
json.dump(repository_stats, f, ensure_ascii=False, separators=(',', ':')) | |
print("數據更新完成!") | |
except Exception as e: | |
print(f"發生錯誤: {str(e)}") | |
raise | |
EOF | |
- name: Commit and push changes | |
run: | | |
git config --local user.email "41898282+github-actions[bot]@users.noreply.github.com" | |
git config --local user.name "github-actions[bot]" | |
git add data/repository_stats.json | |
git diff --quiet && git diff --staged --quiet || (git commit -m "Update repository statistics [skip ci]" && git push) |