AI test panel

AI_integrated_gui.PNG

  1##  Example_analog_input/main.py
  2##  This is an example of AI streaming with a WPC DAQ device in synchronous mode.
  3##  Copyright (c) 2022-2024 WPC Systems Ltd. All rights reserved.
  4##  All rights reserved.
  5
  6## Python
  7import os
  8import sys
  9
 10## Third party
 11import numpy as np
 12import time
 13import threading
 14import matplotlib.animation as animation
 15from PyQt5 import QtWidgets, QtGui
 16from PyQt5.QtWidgets import QWidget, QMessageBox
 17from UI_design.Ui_example_GUI_AI import Ui_MainWindow
 18
 19## WPC
 20from wpcsys import pywpc
 21import time
 22
 23class MatplotlibWidget(QWidget):
 24    def __init__(self, parent=None):
 25        super(MatplotlibWidget, self).__init__(parent)
 26
 27class MainWindow(QtWidgets.QMainWindow):
 28    def __init__(self):
 29        super(MainWindow, self).__init__()
 30
 31        ## UI initialize
 32        self.ui = Ui_MainWindow()
 33        self.ui.setupUi(self)
 34
 35        ## Material path
 36        file_path = os.path.dirname(__file__)
 37        self.trademark_path = file_path + "\Material\\trademark.jpg"
 38        self.blue_led_path = file_path + "\Material\LED_blue.png"
 39        self.red_led_path = file_path + "\Material\LED_red.png"
 40        self.green_led_path = file_path + "\Material\LED_green.png"
 41        self.gray_led_path = file_path + "\Material\LED_gray.png"
 42
 43        ## Set trademark & LED path
 44        self.ui.lb_trademark.setPixmap(QtGui.QPixmap(self.trademark_path))
 45        self.ui.lb_led.setPixmap(QtGui.QPixmap(self.gray_led_path))
 46
 47        ## Get Python driver version
 48        print(f'{pywpc.PKG_FULL_NAME} - Version {pywpc.__version__}')
 49
 50        ## Connection flag
 51        self.connect_flag = 0
 52
 53        ## Handle declaration
 54        self.dev = None
 55
 56        ## Plot parameter
 57        self.plot_y_min = -10
 58        self.plot_y_max = 10
 59
 60        ## List parameter
 61        self.channel_list = []
 62
 63        for j in range(8):
 64            self.channel_list.append([])
 65        self.plot_total_times = 0
 66
 67        ## Define callback events
 68        self.ui.btn_connect.clicked.connect(self.connectEvent)
 69        self.ui.btn_disconnect.clicked.connect(self.disconnectEvent)
 70        self.ui.btn_AIStart.clicked.connect(self.startAIEvent)
 71        self.ui.btn_AIStop.clicked.connect(self.stopAIEvent)
 72        self.ui.lineEdit_samplingRate.returnPressed.connect(self.setSamplingRateEvent)
 73        self.ui.lineEdit_numSamples.returnPressed.connect(self.setNumofSampleEvent)
 74        self.ui.comboBox_aiMode.currentIndexChanged.connect(self.chooseAIModeEvent)
 75        self.ui.lineEdit_yscaleMax.returnPressed.connect(self.setYscaleMaxEvent)
 76        self.ui.lineEdit_yscaleMin.returnPressed.connect(self.setYscaleMinEvent)
 77
 78        ## Define thread
 79        self.killed = False
 80        self.ai_lock = threading.Lock() ## Define thread lock
 81        self.ai_lock.acquire() ## Acquire thread lock
 82        self.ai_stream_thread = threading.Thread(target=self.AIStreamThread) ## Create thread
 83        self.ai_stream_thread.start() ## Start thread
 84
 85        ## Plotting
 86        self.plotInitial()
 87        self.plotAnimation()
 88
 89    def closeEvent(self, event):
 90        if self.dev is not None:
 91            ## Disconnect device
 92            self.disconnectEvent()
 93            ## Release device handle
 94            self.dev.close()
 95        else:
 96            self.killed = True
 97
 98    def selectHandle(self):
 99        handle_idx = int(self.ui.comboBox_handle.currentIndex())
100        if handle_idx == 0:
101            self.dev = pywpc.STEM()
102        elif handle_idx == 1:
103            self.dev = pywpc.EthanA()
104        elif handle_idx == 2:
105            self.dev = pywpc.USBDAQF1AD()
106        elif handle_idx == 3:
107            self.dev = pywpc.USBDAQF1AOD()
108        elif handle_idx == 4:
109            self.dev = pywpc.WifiDAQE3A()
110
111    def updateParam(self):
112        ## Get IP or serial_number from GUI
113        self.ip = self.ui.lineEdit_IP.text()
114
115        ## Get port from GUI
116        self.port = int(self.ui.comboBox_port.currentIndex())
117
118        ## Get AI mode from UI
119        self.mode = self.ui.comboBox_aiMode.currentIndex()
120
121        ## Get Sampling rate from UI
122        self.ai_sampling_rate = float(self.ui.lineEdit_samplingRate.text())
123
124        ## Get NumSamples from UI
125        self.ai_n_samples = int(self.ui.lineEdit_numSamples.text())
126
127    def connectEvent(self):
128        if self.connect_flag == 1:
129            return
130
131        ## Select handle
132        self.selectHandle()
133
134        ## Update Param
135        self.updateParam()
136
137        try:
138            ## Connect to device
139            self.dev.connect(self.ip)
140        except pywpc.Error as err:
141            return
142
143        ## Open AI port
144        self.dev.AI_open(self.port)
145
146        ## Change LED status
147        self.ui.lb_led.setPixmap(QtGui.QPixmap(self.blue_led_path))
148
149        ## Change connection flag
150        self.connect_flag = 1
151
152    def disconnectEvent(self):
153        if self.connect_flag == 0:
154            return
155
156        ## close AI port
157        self.dev.AI_close(self.port)
158
159        ## Disconnect device
160        self.dev.disconnect()
161
162        ## Change LED status
163        self.ui.lb_led.setPixmap(QtGui.QPixmap(self.green_led_path))
164
165        self.killed = True
166
167        ## Change connection flag
168        self.connect_flag = 0
169
170    def AIStreamThread(self):
171        while not self.killed:
172            self.ai_lock.acquire()
173            self.ai_lock.release()
174            if self.dev != None:
175                ## data acquisition
176                data = self.dev.AI_readStreaming(self.port, 200, 1)
177                if len(data) > 0:
178                    self.setDisplayPlotNums(data, self.ai_n_samples)
179            time.sleep(0.05)
180
181    def startAIEvent(self):
182        ## Check connection status
183        if self.checkConnectionStatus() == False:
184            return
185
186        ## Update Param
187        self.updateParam()
188
189        ## On Demand
190        if self.mode == 0:
191            data = self.dev.AI_readOnDemand(self.port)
192            self.setDisplayPlotNums([data], self.ai_n_samples)
193        ## N-Samples
194        elif self.mode == 1:
195            self.dev.AI_start(self.port)
196            data = self.dev.AI_readStreaming(self.port, 200, 1)
197            if len(data) > 0:
198                self.setDisplayPlotNums(data, self.ai_n_samples)
199        ## Continuous
200        elif self.mode == 2:
201            self.dev.AI_start(self.port)
202            self.ai_lock.acquire(False)
203            self.ai_lock.release()
204
205    def stopAIEvent(self):
206        ## Check connection status
207        if self.checkConnectionStatus() == False:
208            return
209
210        ## Update Param
211        self.updateParam()
212
213        ## Continuous
214        if self.mode == 2:
215            ## Send AI stop
216            self.dev.AI_stop(self.port)
217            self.ai_lock.acquire() ## Acquire thread lock
218
219    def chooseAIModeEvent(self, *args):
220        ## Check connection status
221        if self.checkConnectionStatus() == False:
222            return
223
224        ## Update Param
225        self.updateParam()
226
227        ## Send AI mode
228        self.dev.AI_setMode(self.port, self.mode)
229
230    def setSamplingRateEvent(self):
231        ## Check connection status
232        if self.checkConnectionStatus() == False:
233            return
234
235        ## Update Param
236        self.updateParam()
237
238        ## Send set sampling rate
239        self.dev.AI_setSamplingRate(self.port, self.ai_sampling_rate)
240
241    def setNumofSampleEvent(self):
242        ## Check connection status
243        if self.checkConnectionStatus() == False:
244            return
245
246        ## Update Param
247        self.updateParam()
248
249        ## Send set number of samples
250        self.dev.AI_setNumSamples(self.port, self.ai_n_samples)
251
252    def plotInitial(self):
253        self.checkboxstatus = [1 for x in range(8)]
254        self.matplotlibwidget = MatplotlibWidget()
255        for i in range(8):
256            self.ui.MplWidget.canvas.axes.plot([0], [0], alpha=self.checkboxstatus[i])
257
258    def plotAnimation(self):
259        ## Rearrage x-axis data according to data amount
260        self.ani = animation.FuncAnimation(self.ui.MplWidget, self.plotChart, self.plotGetAxisData, interval=100, repeat=True)
261        self.ui.MplWidget.canvas.draw()
262
263    def plotGetAxisData(self):
264        # Get ch0~ch7 checkbox status from MainUI window
265        list_ch = [self.ui.cb_ch0, self.ui.cb_ch1, self.ui.cb_ch2, self.ui.cb_ch3, self.ui.cb_ch4, self.ui.cb_ch5,self.ui.cb_ch6, self.ui.cb_ch7]
266        for i in range(8):
267            self.checkboxstatus[i] = int(list_ch[i].isChecked())
268
269        ## Get xmin, xmax and x list
270        m = len(self.channel_list[0])
271        x_max = self.plot_total_times
272        x_min = max(x_max - m, 0)
273        x_list = list(range(x_min, x_max))
274
275        ## Get xticks
276        if m > 5:
277            dx = m // 6
278            ticks = np.arange(x_min, x_max, dx)
279        else:
280            ticks = np.arange(x_min, x_max)
281        yield x_list, ticks, x_min, x_max, self.checkboxstatus
282
283    def plotChart(self, update):
284        ## Define update value
285        x_list, ticks, x_min, x_max, self.checkboxstatus = update
286
287        ## Clear all axes info
288        self.ui.MplWidget.canvas.axes.clear()
289
290        ## Plot 8 channels data
291        try:
292            for i in range(8):
293                self.ui.MplWidget.canvas.axes.plot(x_list, self.channel_list[i], alpha=self.checkboxstatus[i],
294                                                marker='o', markersize=2)
295        except:
296            print("err_xlist " + str(len(x_list)))
297            print("err_ylist " + str(len(self.channel_list[i])))
298
299        ## Set x,y limit
300        self.ui.MplWidget.canvas.axes.set_ylim(float(self.plot_y_min) * 1.05, float(self.plot_y_max) * 1.05)
301        # self.ui.MplWidget.canvas.axes.set_xlim(x_min, x_max)
302
303        ## Set xtickslabel
304        self.ui.MplWidget.canvas.axes.set_xticks(ticks)
305        new_ticks = self.plotConvertXtick(ticks)
306        self.ui.MplWidget.canvas.axes.set_xticklabels(new_ticks)
307
308        ## Set label
309        self.ui.MplWidget.canvas.axes.set_xlabel("Time (s)", fontsize=12)
310        self.ui.MplWidget.canvas.axes.set_ylabel("Voltage (V)", fontsize=12)
311
312        ## Set legend
313        self.ui.MplWidget.canvas.axes.legend(('ch0', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7'),
314                                            loc='center left', shadow=False, fontsize=10, bbox_to_anchor=(1, 0.75),
315                                            facecolor='#f0f0f0')
316        ## Set grid
317        self.ui.MplWidget.canvas.axes.grid(color='#bac3d1', linestyle='-', linewidth=0.8)  # grid
318
319    def plotConvertXtick(self, xtick):
320        return ["{:.2f}".format(x / self.ai_sampling_rate) for i, x in enumerate(xtick)]
321
322    def setDisplayPlotNums(self, data, nums):
323        data = np.array(data)
324        self.plot_total_times += len(data)
325        ## for plotting
326        for k in range(8):
327            self.channel_list[k].extend(data[:, k])
328            self.channel_list[k] = self.channel_list[k][-(nums):]
329
330    def setYscaleMaxEvent(self):
331        ## Get yscaleMax from MainUI window
332        self.plot_y_max = self.ui.lineEdit_yscaleMax.text()
333
334    def setYscaleMinEvent(self):
335        ## Get yscaleMin from MainUI window
336        self.plot_y_min = self.ui.lineEdit_yscaleMin.text()
337
338    def checkConnectionStatus(self):
339        if self.connect_flag == 0:
340            QMessageBox.information(self, "Error Messages", "Please connect server first.", QMessageBox.Ok)
341            return False
342        else:
343            return True
344
if __name__ == "__main__":
    ## Create the Qt application, show the main window and run the event loop
    app = QtWidgets.QApplication([])
    WPC_main_ui = MainWindow()
    WPC_main_ui.show()
    ## Hand the event loop's return code back to the shell
    exit_code = app.exec_()
    sys.exit(exit_code)