1
+ # coding: utf-8
2
+ """API for setup/usage of Canoe COM Client interface.
3
+ """
4
+ # --------------------------------------------------------------------------
5
+ # Standard library imports
6
+ import os
7
+ import sys
8
+ import subprocess
9
+ # import win32com.client
10
+ import time
11
+ import threading
12
+ from win32com .client import *
13
+ from win32com .client .connect import *
14
+
15
+
16
+
17
+ # Vector Canoe Class
18
+ class CANoe :
19
+ def __init__ (self ):
20
+ self .application = None
21
+ # check if there is any instance of CANoe process
22
+ # output = subprocess.check_output('tasklist', shell=True)
23
+ # if CANoe process is still available, kill the process
24
+ # if "CANoe32.exe" in str(output):
25
+ # os.system("taskkill /im CANoe32.exe /f 2>nul >nul")
26
+
27
+ # re-dispatch object for CANoe Application
28
+ self .application = win32com .client .DispatchEx ("CANoe.Application" )
29
+ self .ver = self .application .Version
30
+ print ('Loaded CANoe version ' ,
31
+ self .ver .major , '.' ,
32
+ self .ver .minor , '.' ,
33
+ self .ver .Build , '...' )#, sep,''
34
+
35
+ self .Measurement = self .application .Measurement .Running
36
+ print (self .Measurement )
37
+
38
+
39
+ def open_simulation (self , cfgname ):
40
+ # open CANoe simulation
41
+ if (self .application != None ):
42
+ # check for valid file and it is *.cfg file
43
+ if os .path .isfile (cfgname ) and (os .path .splitext (cfgname )[1 ] == ".cfg" ):
44
+ self .application .Open (cfgname )
45
+ else :
46
+ raise RuntimeError ("Can't find CANoe cfg file" )
47
+ else :
48
+ raise RuntimeError ("CANoe Application is missing,unable to open simulation" )
49
+
50
+ def close_simulation (self ):
51
+ # close CANoe simulation
52
+ if (self .application != None ):
53
+ self .stop_Measurement ()
54
+ self .application .Quit ()
55
+
56
+ # make sure the CANoe is close properly, otherwise enforce taskkill
57
+ output = subprocess .check_output ('tasklist' , shell = True )
58
+
59
+ if "CANoe32.exe" in str (output ):
60
+ os .system ("taskkill /im CANoe32.exe /f 2>nul >nul" )
61
+
62
+ self .application = None
63
+
64
+ def start_Measurement (self ):
65
+ retry = 0
66
+ retry_counter = 5
67
+ # try to establish measurement within 20s timeout
68
+ while not self .application .Measurement .Running and (retry < retry_counter ):
69
+ self .application .Measurement .Start ()
70
+ time .sleep (1 )
71
+ retry += 1
72
+ if (retry == retry_counter ):
73
+ raise RuntimeWarning ("CANoe start measuremet failed, Please Check Connection!" )
74
+
75
+ def stop_Measurement (self ):
76
+ if self .application .Measurement .Running :
77
+ self .application .Measurement .Stop ()
78
+ else :
79
+ pass
80
+
81
+ def get_EnvVar (self , var ):
82
+
83
+ if (self .application != None ):
84
+ result = self .application .Environment .GetVariable (var )
85
+ return result .Value
86
+ else :
87
+ raise RuntimeError ("CANoe is not open,unable to GetVariable" )
88
+
89
+ def set_EnvVar (self , var , value ):
90
+ result = None
91
+
92
+ if (self .application != None ):
93
+ # set the environment varible
94
+ result = self .application .Environment .GetVariable (var )
95
+ result .Value = value
96
+
97
+ checker = self .get_EnvVar (var )
98
+ # check the environment varible is set properly?
99
+ while (checker != value ):
100
+ checker = self .get_EnvVar (var )
101
+ else :
102
+ raise RuntimeError ("CANoe is not open,unable to SetVariable" )
103
+
104
+ def get_SigVal (self , channel_num , msg_name , sig_name , bus_type = "CAN" ):
105
+ """
106
+ @summary Get the value of a raw CAN signal on the CAN simulation bus
107
+ @param channel_num - Integer value to indicate from which channel we will read the signal, usually start from 1,
108
+ Check with CANoe can channel setup.
109
+ @param msg_name - String value that indicate the message name to which the signal belong. Check DBC setup.
110
+ @param sig_name - String value of the signal to be read
111
+ @param bus_type - String value of the bus type - e.g. "CAN", "LIN" and etc.
112
+ @return The CAN signal value in floating point value.
113
+ Even if the signal is of integer type, we will still return by
114
+ floating point value.
115
+ @exception None
116
+ """
117
+ if (self .application != None ):
118
+ result = self .application .GetBus (bus_type ).GetSignal (channel_num , msg_name , sig_name )
119
+ return result .Value
120
+ else :
121
+ raise RuntimeError ("CANoe is not open,unable to GetVariable" )
122
+
123
+ def get_SysVar (self , ns_name , sysvar_name ):
124
+
125
+ if (self .application != None ):
126
+ systemCAN = self .application .System .Namespaces
127
+ sys_namespace = systemCAN (ns_name )
128
+ sys_value = sys_namespace .Variables (sysvar_name )
129
+ return sys_value .Value
130
+ else :
131
+ raise RuntimeError ("CANoe is not open,unable to GetVariable" )
132
+
133
+ def set_SysVar (self , ns_name , sysvar_name , var ):
134
+
135
+ if (self .application != None ):
136
+ systemCAN = self .application .System .Namespaces
137
+ sys_namespace = systemCAN (ns_name )
138
+ sys_value = sys_namespace .Variables (sysvar_name )
139
+ sys_value .Value = var
140
+ # print(sys_value)
141
+ # result = sys_value(sys_name)
142
+ #
143
+ # result = var
144
+ else :
145
+ raise RuntimeError ("CANoe is not open,unable to GetVariable" )
146
+
147
+ def get_all_SysVar (self , ns_name ):
148
+
149
+ if (self .application != None ):
150
+ sysvars = []
151
+ systemCAN = self .application .System .Namespaces
152
+ sys_namespace = systemCAN (ns_name )
153
+ sys_value = sys_namespace .Variables
154
+ for sys in sys_value :
155
+ sysvars .append (sys .Name )
156
+ sysvars .append (sys .Value )
157
+ return sysvars
158
+ else :
159
+ raise RuntimeError ("CANoe is not open,unable to GetVariable" )
160
+
161
+ #On Event Thread
162
+ class Event_Job (threading .Thread ):
163
+
164
+ def __init__ (self , name ,var ,event ):#*args, **kwargs
165
+ super (Event_Job , self ).__init__ ()#*args, **kwargs
166
+ self .__flag = threading .Event () # 用于暂停线程的标识
167
+ self .__flag .set () # 设置为True
168
+ self .__running = threading .Event () # 用于停止线程的标识
169
+ self .__running .set () # 将running设置为True
170
+
171
+ self .name = name
172
+ self .var = var
173
+ self .event = event
174
+
175
+ def run (self ):
176
+ pythoncom .CoInitialize ()
177
+ self .app = DispatchEx ('CANoe.Application' )
178
+ self .systemCAN = self .app .System .Namespaces
179
+ self .sys_namespace = self .systemCAN (self .name )
180
+ self .sys_value = self .sys_namespace .Variables
181
+ self .result = self .sys_value (self .var )
182
+ WithEvents (self .result , self .event )
183
+
184
+ while self .__running .isSet ():
185
+ self .__flag .wait () # 为True时立即返回, 为False时阻塞直到内部的标识位为True后返回
186
+ # self.func(self.txt)
187
+ # print time.time()
188
+ # pythoncom.CoInitialize()
189
+ pythoncom .PumpWaitingMessages ()
190
+ time .sleep (0.01 )
191
+ # pythoncom.CoUninitialize()
192
+
193
+ def pause (self ):
194
+ self .__flag .clear () # 设置为False, 让线程阻塞
195
+
196
+ def resume (self ):
197
+ self .__flag .set () # 设置为True, 让线程停止阻塞
198
+
199
+ def stop (self ):
200
+ self .__flag .set () # 将线程从暂停状态恢复, 如何已经暂停的话
201
+ self .__running .clear () # 设置为False
202
+ pythoncom .CoUninitialize ()
203
+
204
+ class MFL_volplus_Events (object ):
205
+ def __init__ (self ):
206
+ pass
207
+
208
+ """Handler for CANoe var events"""
209
+ def OnChange (self ,value ):
210
+ # self.Changed = True
211
+ print ("< MFL_volplus_Events var change>" )
212
+ # print(self.Name)
213
+ print (value )
214
+
215
+ class MFL_volminus_Events (object ):
216
+ def __init__ (self ):
217
+ pass
218
+
219
+ """Handler for CANoe var events"""
220
+ def OnChange (self ,value ):
221
+ # self.Changed = True
222
+ print ("< MFL_volminus_Events var change>" )
223
+ # print(self.Name)
224
+ print (value )
225
+
226
+
227
+ # #Regserver COM API
228
+ # os.chdir("C:\Program Files (x86)\Vector CANoe 9.0\Exec32")
229
+ # os.popen("canoe32 -regserver").readlines()
230
+ # #or
231
+ # os.chdir("C:\Program Files\Vector CANoe 10.0\Exec64")
232
+ # os.popen("canoe64 -regserver").readlines()
233
+
234
+ # app = CANoe()
235
+ # time.sleep(5111)
236
+ # app.start_Measurement()
237
+ # time.sleep(5)
238
+ # app.stop_Measurement()
239
+ # time.sleep(1111)
240
+ # varnames = app.get_all_SysVar("mfl")
241
+ #
242
+ # time.sleep(1)
243
+ # vol_plus = Event_Job("mfl","vol_plus",MFL_volplus_Events)
244
+ # vol_plus.start()
245
+ #
246
+ # vol_minus = Event_Job("mfl","vol_minus",MFL_volminus_Events)
247
+ # vol_minus.start()
248
+ #
249
+ # time.sleep(31)
250
+ # app.set_SysVar("mfl","vol_plus",1)
251
+ #
252
+ # print(app.get_SysVar("mfl","vol_plus"))
253
+ #
254
+ # vol_plus.stop()
255
+ # vol_minus.stop()
0 commit comments