|
| 1 | +import unittest |
| 2 | +import contextlib |
| 3 | +import sys |
| 4 | +from test import support |
| 5 | +from test.support import import_helper |
| 6 | + |
| 7 | +try: |
| 8 | + import _testcapi |
| 9 | +except ImportError: |
| 10 | + _testcapi = None |
| 11 | + |
| 12 | +NULL = None |
| 13 | + |
| 14 | +class CAPITest(unittest.TestCase): |
| 15 | + # TODO: Test the following functions: |
| 16 | + # |
| 17 | + # PySys_Audit() |
| 18 | + # PySys_AuditTuple() |
| 19 | + |
| 20 | + maxDiff = None |
| 21 | + |
| 22 | + @support.cpython_only |
| 23 | + @unittest.skipIf(_testcapi is None, 'need _testcapi module') |
| 24 | + def test_sys_getobject(self): |
| 25 | + # Test PySys_GetObject() |
| 26 | + getobject = _testcapi.sys_getobject |
| 27 | + |
| 28 | + self.assertIs(getobject(b'stdout'), sys.stdout) |
| 29 | + with support.swap_attr(sys, '\U0001f40d', 42): |
| 30 | + self.assertEqual(getobject('\U0001f40d'.encode()), 42) |
| 31 | + |
| 32 | + self.assertIs(getobject(b'nonexisting'), AttributeError) |
| 33 | + self.assertIs(getobject(b'\xff'), AttributeError) |
| 34 | + # CRASHES getobject(NULL) |
| 35 | + |
| 36 | + @support.cpython_only |
| 37 | + @unittest.skipIf(_testcapi is None, 'need _testcapi module') |
| 38 | + def test_sys_setobject(self): |
| 39 | + # Test PySys_SetObject() |
| 40 | + setobject = _testcapi.sys_setobject |
| 41 | + |
| 42 | + value = ['value'] |
| 43 | + value2 = ['value2'] |
| 44 | + try: |
| 45 | + self.assertEqual(setobject(b'newattr', value), 0) |
| 46 | + self.assertIs(sys.newattr, value) |
| 47 | + self.assertEqual(setobject(b'newattr', value2), 0) |
| 48 | + self.assertIs(sys.newattr, value2) |
| 49 | + self.assertEqual(setobject(b'newattr', NULL), 0) |
| 50 | + self.assertFalse(hasattr(sys, 'newattr')) |
| 51 | + self.assertEqual(setobject(b'newattr', NULL), 0) |
| 52 | + finally: |
| 53 | + with contextlib.suppress(AttributeError): |
| 54 | + del sys.newattr |
| 55 | + try: |
| 56 | + self.assertEqual(setobject('\U0001f40d'.encode(), value), 0) |
| 57 | + self.assertIs(getattr(sys, '\U0001f40d'), value) |
| 58 | + self.assertEqual(setobject('\U0001f40d'.encode(), NULL), 0) |
| 59 | + self.assertFalse(hasattr(sys, '\U0001f40d')) |
| 60 | + finally: |
| 61 | + with contextlib.suppress(AttributeError): |
| 62 | + delattr(sys, '\U0001f40d') |
| 63 | + |
| 64 | + with self.assertRaises(UnicodeDecodeError): |
| 65 | + setobject(b'\xff', value) |
| 66 | + # CRASHES setobject(NULL, value) |
| 67 | + |
| 68 | + @support.cpython_only |
| 69 | + @unittest.skipIf(_testcapi is None, 'need _testcapi module') |
| 70 | + def test_sys_getxoptions(self): |
| 71 | + # Test PySys_GetXOptions() |
| 72 | + getxoptions = _testcapi.sys_getxoptions |
| 73 | + |
| 74 | + self.assertIs(getxoptions(), sys._xoptions) |
| 75 | + |
| 76 | + xoptions = sys._xoptions |
| 77 | + try: |
| 78 | + sys._xoptions = 'non-dict' |
| 79 | + self.assertEqual(getxoptions(), {}) |
| 80 | + self.assertIs(getxoptions(), sys._xoptions) |
| 81 | + |
| 82 | + del sys._xoptions |
| 83 | + self.assertEqual(getxoptions(), {}) |
| 84 | + self.assertIs(getxoptions(), sys._xoptions) |
| 85 | + finally: |
| 86 | + sys._xoptions = xoptions |
| 87 | + self.assertIs(getxoptions(), sys._xoptions) |
| 88 | + |
| 89 | + def _test_sys_formatstream(self, funname, streamname): |
| 90 | + import_helper.import_module('ctypes') |
| 91 | + from ctypes import pythonapi, c_char_p, py_object |
| 92 | + func = getattr(pythonapi, funname) |
| 93 | + func.argtypes = (c_char_p,) |
| 94 | + |
| 95 | + # Supports plain C types. |
| 96 | + with support.captured_output(streamname) as stream: |
| 97 | + func(b'Hello, %s!', c_char_p(b'world')) |
| 98 | + self.assertEqual(stream.getvalue(), 'Hello, world!') |
| 99 | + |
| 100 | + # Supports Python objects. |
| 101 | + with support.captured_output(streamname) as stream: |
| 102 | + func(b'Hello, %R!', py_object('world')) |
| 103 | + self.assertEqual(stream.getvalue(), "Hello, 'world'!") |
| 104 | + |
| 105 | + # The total length is not limited. |
| 106 | + with support.captured_output(streamname) as stream: |
| 107 | + func(b'Hello, %s!', c_char_p(b'world'*200)) |
| 108 | + self.assertEqual(stream.getvalue(), 'Hello, ' + 'world'*200 + '!') |
| 109 | + |
| 110 | + def test_sys_formatstdout(self): |
| 111 | + # Test PySys_FormatStdout() |
| 112 | + self._test_sys_formatstream('PySys_FormatStdout', 'stdout') |
| 113 | + |
| 114 | + def test_sys_formatstderr(self): |
| 115 | + # Test PySys_FormatStderr() |
| 116 | + self._test_sys_formatstream('PySys_FormatStderr', 'stderr') |
| 117 | + |
| 118 | + def _test_sys_writestream(self, funname, streamname): |
| 119 | + import_helper.import_module('ctypes') |
| 120 | + from ctypes import pythonapi, c_char_p |
| 121 | + func = getattr(pythonapi, funname) |
| 122 | + func.argtypes = (c_char_p,) |
| 123 | + |
| 124 | + # Supports plain C types. |
| 125 | + with support.captured_output(streamname) as stream: |
| 126 | + func(b'Hello, %s!', c_char_p(b'world')) |
| 127 | + self.assertEqual(stream.getvalue(), 'Hello, world!') |
| 128 | + |
| 129 | + # There is a limit on the total length. |
| 130 | + with support.captured_output(streamname) as stream: |
| 131 | + func(b'Hello, %s!', c_char_p(b'world'*100)) |
| 132 | + self.assertEqual(stream.getvalue(), 'Hello, ' + 'world'*100 + '!') |
| 133 | + with support.captured_output(streamname) as stream: |
| 134 | + func(b'Hello, %s!', c_char_p(b'world'*200)) |
| 135 | + out = stream.getvalue() |
| 136 | + self.assertEqual(out[:20], 'Hello, worldworldwor') |
| 137 | + self.assertEqual(out[-13:], '... truncated') |
| 138 | + self.assertGreater(len(out), 1000) |
| 139 | + |
| 140 | + def test_sys_writestdout(self): |
| 141 | + # Test PySys_WriteStdout() |
| 142 | + self._test_sys_writestream('PySys_WriteStdout', 'stdout') |
| 143 | + |
| 144 | + def test_sys_writestderr(self): |
| 145 | + # Test PySys_WriteStderr() |
| 146 | + self._test_sys_writestream('PySys_WriteStderr', 'stderr') |
| 147 | + |
| 148 | + |
| 149 | +if __name__ == "__main__": |
| 150 | + unittest.main() |
0 commit comments