Skip to content

添加--sarif选项,可将执行结果转为sarif报告 #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions cli/query/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import os.path
import re
import shutil
import sqlite3
Expand Down Expand Up @@ -275,6 +276,190 @@ def merge_execute(args, gdl_list, timeout):
f.write(json.dumps(content, indent=4))


def json_to_sarif(sarif_data, json_data):
rules_dict = dict()
for bug in json_data:
# ruleName
if "ruleName" in bug:
rule_id = bug.get("ruleName")
else:
return False
# filePath
if "filePath" in bug:
file_path = bug.get("filePath")
else:
return False
# startLine
if "startLine" in bug:
start_line = bug.get("startLine")
else:
return False
# ruleDescription
if "ruleDescription" in bug:
rule_description = bug.get("ruleDescription")
else:
return False
# bug message
if "message" in bug:
message = bug.get("message")
else:
message = rule_description
level = "error"
if "level" in bug:
level = bug.get("level").lower()
if rule_id not in rules_dict:
rule_index = len(rules_dict)
rules_dict[rule_id] = rule_index
res = {
"id": rule_id,
"name": rule_id,
"shortDescription": {
"text": rule_description
},
"fullDescription": {
"text": rule_description
},
"defaultConfiguration": {
"level": level
}
}
sarif_data["runs"][0]["tool"]["driver"]["rules"].append(res)
else:
rule_index = rules_dict[rule_id]
thread_flow_locations = []
thread_flow_locations.append({
"location": {
"physicalLocation": {
"artifactLocation": {
"uri": file_path
},
"region": {
"startLine": start_line,
"endLine": start_line,
"snippet": {
"text": ""
}
},
"contextRegion": {
"startLine": start_line,
"endLine": start_line,
"snippet": {
"text": ""
}
}
},
"message": {
"text": message
}
}
})
sarif_data["runs"][0]["results"].append({
"ruleId": rule_id,
"ruleIndex": rule_index,
"level": "error" if bug.get("Importance", "").lower() == "high" else "warning",
"message": {
"text": message
},
"locations": [
{
"physicalLocation": {
"artifactLocation": {
"uri": thread_flow_locations[0]["location"]["physicalLocation"]["artifactLocation"][
"uri"]
},
"region": {
"startLine": thread_flow_locations[0]["location"]["physicalLocation"]["region"][
"startLine"],
"endLine": thread_flow_locations[0]["location"]["physicalLocation"]["region"][
"startLine"],
"snippet": {
"text": ""
}
},
"contextRegion": {
"startLine": thread_flow_locations[0]["location"]["physicalLocation"]["region"][
"startLine"],
"endLine": thread_flow_locations[0]["location"]["physicalLocation"]["region"][
"startLine"],
"snippet": {
"text": ""
}
}
},
"message": {
"text": message
},
}
],
"codeFlows": [
{
"threadFlows": [
{
"locations": thread_flow_locations
}
]
}
]
})
return True


def output_to_sarif(args):
# 脚本收集
godel_path_list = list()
for godel_dir in args.godel_dir:
godel_path_list += get_files(godel_dir, ".gdl")
godel_path_list += get_files(godel_dir, ".gs")
sarif_data = {
"version": "2.1.0",
"$schema": "https://schemastore.azurewebsites.net/schemas/json/sarif-2.1.0.json",
"runs": [
{
"tool": {
"driver": {
"name": "Custom Tool",
"informationUri": "https://example.com",
"rules": []
}
},
"results": []
}
]
}
# 获取脚本对应的结果并写入sarif报告中
for godel_query_script in godel_path_list:
output = str(Path(args.output).expanduser().resolve() / (godel_query_script.stem + "." + args.format))
if not os.path.exists(output):
logging.warning("%s does not exist, it seems that there is a problem with the %s, please check the script",
output, str(godel_query_script))
continue
with open(output, "r") as f:
output_json = json.load(f)
# 脚本单输出直接转
if isinstance(output_json, list):
status = json_to_sarif(sarif_data, output_json)
if not status:
logging.warning("The output of %s needs to include filePath, startLine, ruleName,ruleDescription. it "
"can not trans to sarif", godel_query_script)
else:
logging.info("%s trans to sarif success", godel_query_script)
# 脚本多输出分别转过去
else:
trans = True
for key, value in output_json.items():
status = json_to_sarif(sarif_data, value)
if not status:
logging.warning("The output of %s %s needs to include filePath, startLine, "
"ruleName,ruleDescription. it can not trans to sarif", godel_query_script, key)
trans = False
if trans:
logging.info("%s trans to sarif success", godel_query_script)

output = str(Path(args.output).expanduser().resolve() / ("sparrow-cli-report.sarif"))
with open(output, "w") as f:
f.write(json.dumps(sarif_data, indent=4))


def query_run(args):
# conf 检查
if not conf_check(args):
Expand All @@ -295,6 +480,8 @@ def query_run(args):
logging.warning("When merging execution, please make sure that single reservation of functions or classes "
"with the same name will not affect the execution results.")
merge_execute(args, godel_path_list, args.timeout)
if args.sarif:
output_to_sarif(args)
return
status = 1
# 目前先各自执行:
Expand All @@ -317,6 +504,8 @@ def query_run(args):
logging.error("Task %s is %s, result is %s, execution time is %.2fs.",
str(godel_query_script), "fail", "null", time.time() - start_time)
logging.error("%s execute error, please check by log", str(godel_query_script))
if args.sarif:
output_to_sarif(args)
if status == 1:
logging.info("run success")
else:
Expand Down
8 changes: 8 additions & 0 deletions cli/sparrow-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ def parse_args():
subparser_query_run.add_argument("--gdl", '-gdl', required=True, nargs="*", dest="godel_dir",
help='The location of the godel script that needs to execute')
subparser_query_run.add_argument('--verbose', action='store_true', help='Enable verbose mode')
subparser_query_run.add_argument('--sarif', action='store_true', help='Turn on the sarif report option, all '
'outputs will be merged into one sarif, '
'and the output result will be '
'sparrow-cli-report.sarif in the output '
'directory. The original godel script '
'output must contain information such as '
'filePath, startLine, ruleName, '
'ruleDescription, etc.')
subparser_query_run.add_argument('--merge', '-m', action='store_true', help='Combined execution of multiple '
'scripts,Only one function and class '
'with the same name in the script '
Expand Down