-
Notifications
You must be signed in to change notification settings - Fork 225
/
Copy pathcucumber_json.py
188 lines (154 loc) · 6.44 KB
/
cucumber_json.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""Cucumber json output formatter."""
import codecs
import json
import math
import os
import sys
import time
import six
from .feature import force_unicode
if six.PY3:
long = int
def add_options(parser):
"""Add pytest-bdd options."""
group = parser.getgroup("bdd", "Cucumber JSON")
group.addoption(
"--cucumberjson",
"--cucumber-json",
action="store",
dest="cucumber_json_path",
metavar="path",
default=None,
help="create cucumber json style report file at given path.",
)
group._addoption(
"--cucumberjson-expanded",
"--cucumber-json-expanded",
action="store_true",
dest="expand",
default=False,
help="expand scenario outlines into scenarios and fill in the step names",
)
def configure(config):
cucumber_json_path = config.option.cucumber_json_path
# prevent opening json log on slave nodes (xdist)
if cucumber_json_path and not hasattr(config, "slaveinput"):
config._bddcucumberjson = LogBDDCucumberJSON(cucumber_json_path, expand=config.option.expand)
config.pluginmanager.register(config._bddcucumberjson)
def unconfigure(config):
xml = getattr(config, "_bddcucumberjson", None)
if xml is not None:
del config._bddcucumberjson
config.pluginmanager.unregister(xml)
class LogBDDCucumberJSON(object):
"""Logging plugin for cucumber like json output."""
def __init__(self, logfile, expand=False):
logfile = os.path.expanduser(os.path.expandvars(logfile))
self.logfile = os.path.normpath(os.path.abspath(logfile))
self.features = {}
self.expand = expand
def append(self, obj):
self.features[-1].append(obj)
def _get_result(self, step, report, error_message=False):
"""Get scenario test run result.
:param step: `Step` step we get result for
:param report: pytest `Report` object
:return: `dict` in form {"status": "<passed|failed|skipped>", ["error_message": "<error_message>"]}
"""
result = {}
if report.passed or not step["failed"]: # ignore setup/teardown
result = {"status": "passed"}
elif report.failed and step["failed"]:
result = {"status": "failed", "error_message": force_unicode(report.longrepr) if error_message else ""}
elif report.skipped:
result = {"status": "skipped"}
result["duration"] = long(math.floor((10 ** 9) * step["duration"])) # nanosec
return result
def _serialize_tags(self, item):
"""Serialize item's tags.
:param item: json-serialized `Scenario` or `Feature`.
:return: `list` of `dict` in the form of:
[
{
"name": "<tag>",
"line": 2,
}
]
"""
return [{"name": tag, "line": item["line_number"] - 1} for tag in item["tags"]]
def _format_name(self, name, keys, values):
for param, value in zip(keys, values):
name = name.replace("<{}>".format(param), value)
return name
def _format_step_name(self, report, step):
examples = report.scenario["examples"]
if len(examples) == 0:
return step["name"]
# we take the keys from the first "examples", but in each table, the keys should
# be the same anyway since all the variables need to be filled in.
keys, values = examples[0]["rows"]
row_index = examples[0]["row_index"]
return self._format_name(step["name"], keys, values[row_index])
def pytest_runtest_logreport(self, report):
try:
scenario = report.scenario
except AttributeError:
# skip reporting for non-bdd tests
return
if not scenario["steps"] or report.when != "call":
# skip if there isn't a result or scenario has no steps
return
def stepmap(step):
error_message = False
if step["failed"] and not scenario.setdefault("failed", False):
scenario["failed"] = True
error_message = True
if self.expand:
# XXX The format is already 'expanded' (scenario oultines -> scenarios),
# but the step names were not filled in with parameters. To be backwards
# compatible, do not fill in the step names unless explicitly asked for.
step_name = self._format_step_name(report, step)
else:
step_name = step["name"]
return {
"keyword": step["keyword"],
"name": step_name,
"line": step["line_number"],
"embeddings": step["embeddings"],
"match": {"location": ""},
"result": self._get_result(step, report, error_message),
}
if scenario["feature"]["filename"] not in self.features:
self.features[scenario["feature"]["filename"]] = {
"keyword": "Feature",
"uri": scenario["feature"]["rel_filename"],
"name": scenario["feature"]["name"] or scenario["feature"]["rel_filename"],
"id": scenario["feature"]["rel_filename"].lower().replace(" ", "-"),
"line": scenario["feature"]["line_number"],
"description": scenario["feature"]["description"],
"tags": self._serialize_tags(scenario["feature"]),
"elements": [],
}
self.features[scenario["feature"]["filename"]]["elements"].append(
{
"keyword": "Scenario",
"id": report.item["name"],
"name": scenario["name"],
"line": scenario["line_number"],
"description": "",
"tags": self._serialize_tags(scenario),
"type": "scenario",
"steps": [stepmap(step) for step in scenario["steps"]],
}
)
def pytest_sessionstart(self):
self.suite_start_time = time.time()
def pytest_sessionfinish(self):
if sys.version_info[0] < 3:
logfile_open = codecs.open
else:
logfile_open = open
with logfile_open(self.logfile, "w", encoding="utf-8") as logfile:
logfile.write(json.dumps(list(self.features.values())))
def pytest_terminal_summary(self, terminalreporter):
terminalreporter.write_sep("-", "generated json file: %s" % (self.logfile))