13
13
import os
14
14
import os .path
15
15
import re
16
- import subprocess
17
16
import sys
18
17
19
18
import pytest # type: ignore # no pytest in typeshed
20
- from typing import Dict , List , Tuple , Optional
19
+
20
+ from typing import List
21
21
22
22
from mypy .test .config import test_temp_dir
23
23
from mypy .test .data import DataDrivenTestCase , DataSuite
24
- from mypy .test .helpers import assert_string_arrays_equal
24
+ from mypy .test .helpers import assert_string_arrays_equal , run_command
25
25
from mypy .util import try_find_python2_interpreter
26
26
from mypy import api
27
27
@@ -79,7 +79,7 @@ def test_python_evaluation(testcase: DataDrivenTestCase) -> None:
79
79
output .append (line .rstrip ("\r \n " ))
80
80
if returncode == 0 :
81
81
# Execute the program.
82
- returncode , interp_out = run ([interpreter , program ])
82
+ returncode , interp_out = run_command ([interpreter , program ])
83
83
output .extend (interp_out )
84
84
# Remove temp file.
85
85
os .remove (program_path )
@@ -88,35 +88,7 @@ def test_python_evaluation(testcase: DataDrivenTestCase) -> None:
88
88
testcase .file , testcase .line ))
89
89
90
90
91
- def split_lines (* streams : bytes ) -> List [str ]:
92
- """Returns a single list of string lines from the byte streams in args."""
93
- return [
94
- s .rstrip ('\n \r ' )
95
- for stream in streams
96
- for s in str (stream , 'utf8' ).splitlines ()
97
- ]
98
-
99
-
100
91
def adapt_output (testcase : DataDrivenTestCase ) -> List [str ]:
101
92
"""Translates the generic _program.py into the actual filename."""
102
93
program = '_' + testcase .name + '.py'
103
94
return [program_re .sub (program , line ) for line in testcase .output ]
104
-
105
-
106
- def run (
107
- cmdline : List [str ], * , env : Optional [Dict [str , str ]] = None , timeout : int = 300
108
- ) -> Tuple [int , List [str ]]:
109
- """A poor man's subprocess.run() for 3.3 and 3.4 compatibility."""
110
- process = subprocess .Popen (
111
- cmdline ,
112
- env = env ,
113
- stdout = subprocess .PIPE ,
114
- stderr = subprocess .PIPE ,
115
- cwd = test_temp_dir ,
116
- )
117
- try :
118
- out , err = process .communicate (timeout = timeout )
119
- except subprocess .TimeoutExpired :
120
- out = err = b''
121
- process .kill ()
122
- return process .returncode , split_lines (out , err )
0 commit comments