1
+ import scrcpy_client
2
+ import io
3
+ import sys
4
+ import numpy as np
5
+ import time
6
+ import socket
7
+ import unittest
8
+ import logging
9
+
10
+ SHOWFRAMES = False
11
+
12
+ #~136 frames from the game HyperFlex, with PTS meta enabled
13
+ MOCKFILE = "mocksession_hflex_withmeta"
14
+
15
+ #These were the settings enabled during the saved session
16
+ scrcpy_client .SVR_maxSize = 600
17
+ scrcpy_client .SVR_bitRate = 999999999
18
+ scrcpy_client .SVR_tunnelForward = "true"
19
+ scrcpy_client .SVR_crop = "9999:9999:0:0"
20
+ scrcpy_client .SVR_sendFrameMeta = "true"
21
+
22
+ if SHOWFRAMES :
23
+ try :
24
+ import cv2
25
+ except ImportError :
26
+ SHOWFRAMES = False
27
+
28
+ class MockSocket ():
29
+ '''
30
+ Replay a previously recorded socket session from a file
31
+ '''
32
+ def __init__ (self , * args ):
33
+ #print("Starting Mocked Socket",str(args))
34
+ self .filename = MOCKFILE
35
+ self .fd = None
36
+
37
+ def connect (self , * args ):
38
+ #print("Connecting Mocked Socket",str(args))
39
+ self .fd = open (self .filename ,'rb' )
40
+
41
+ def recv (self , buffersize ,* args ):
42
+ ret = self .fd .read (buffersize )
43
+
44
+ return ret
45
+ def __del__ (self ):
46
+ if self .fd :
47
+ self .fd .close ()
48
+
49
+
50
+ class TestClientMockConnect (unittest .TestCase ):
51
+ def setUp (self ):
52
+
53
+ self .SCRCPY = scrcpy_client .SCRCPY_client ()
54
+ # Replace the socket with our mock filebased "socket"
55
+ scrcpy_client .socket .socket = MockSocket
56
+
57
+ self .assertTrue (self .SCRCPY .connect ())
58
+ self .assertTrue (self .SCRCPY .start_processing ())
59
+
60
+ #Give FFmpeg a moment to catch up
61
+ time .sleep (0.1 )
62
+ def test_resolution_recieved (self ):
63
+ self .assertTrue (self .SCRCPY .WIDTH > 1 )
64
+ self .assertTrue (self .SCRCPY .HEIGHT > 1 )
65
+ def test_ffmpeg_running (self ):
66
+ self .assertIs (self .SCRCPY .ffm .poll (), None )
67
+
68
+ def test_ffmpeg_detected_stream (self ):
69
+ ffinfo = '' .join (self .SCRCPY .FFmpeg_info )
70
+ self .assertTrue ("Stream #0:0 -> #0:0 (h264 -> rawvideo)" in ffinfo )
71
+
72
+ def test_frames_recieved (self ):
73
+
74
+ frames_counted = 0
75
+ while True :
76
+ frm = self .SCRCPY .get_next_frame ()
77
+ if isinstance (frm , (np .ndarray )):
78
+ self .assertEqual (len (frm .shape ),3 )
79
+ self .assertTrue (sum (frm .shape )> 3 )
80
+ frames_counted += 1
81
+ if SHOWFRAMES :
82
+ frm = cv2 .cvtColor (frm , cv2 .COLOR_RGB2BGR )
83
+ cv2 .imshow ("image" , frm )
84
+ cv2 .waitKey (1000 // 60 ) # CAP 60FPS
85
+ else :
86
+ break
87
+ self .assertTrue (frames_counted > 10 )
88
+
89
+ #print("Recieved:",frames_counted)
90
+ def tearDown (self ):
91
+
92
+ self .SCRCPY .kill_ffmpeg ()
93
+ del self .SCRCPY
94
+
95
+ if __name__ == '__main__' :
96
+ unittest .main ()
97
+
0 commit comments