RTC trigger AI continuous

'''
RTC_AI - RTC_trigger_AI_continuous.py with asynchronous mode.

This example demonstrates how to use the RTC to trigger AI in continuous mode on EthanA.

-------------------------------------------------------------------------------------
Please set the correct serial number, or IP address and port number, BEFORE you run the example code.

For other examples, please check:
    https://github.com/WPC-Systems-Ltd/WPC_Python_driver_release/tree/main/examples
See the README.md file for detailed usage of this example.

Copyright (c) 2022-2024 WPC Systems Ltd. All rights reserved.
'''
 16
 17## Python
 18import asyncio
 19
 20## WPC
 21
 22from wpcsys import pywpc
 23import time
 24
 25async def main():
 26    ## Get Python driver version
 27    print(f'{pywpc.PKG_FULL_NAME} - Version {pywpc.__version__}')
 28
 29    ## Create device handle
 30    dev = pywpc.EthanA()
 31
 32    ## Connect to device
 33    try:
 34        dev.connect("192.168.1.110") ## Depend on your device
 35    except Exception as err:
 36        pywpc.printGenericError(err)
 37        ## Release device handle
 38        dev.close()
 39        return
 40
 41    try:
 42        ## Parameters setting
 43        port = 0 ## Depend on your device
 44        mode = 2 ## 1 : N-samples, 2 : Continuous
 45        trigger_mode = 1 ## 1 : Use RTC to start AI streaming
 46        sampling_rate = 200
 47        read_points = 200
 48        read_delay = 0.5 ## second
 49        mode_alarm = 0
 50        year = 2024
 51        month = 4
 52        day = 2
 53        hour = 15
 54        minute = 8
 55        second = 50
 56
 57        ## Get firmware model & version
 58        driver_info = await dev.Sys_getDriverInfo_async()
 59        print("Model name: " + driver_info[0])
 60        print("Firmware version: " + driver_info[-1])
 61
 62        ## Open AI
 63        err = await dev.AI_open_async(port)
 64        print(f"AI_open_async in port {port}, status: {err}")
 65
 66        ## Set AI mode
 67        err = await dev.AI_setMode_async(port, mode)
 68        print(f"AI_setMode_async {mode} in port {port}, status: {err}")
 69
 70        ## Set AI trigger mode
 71        err = await dev.AI_setTriggerMode_async(port, trigger_mode)
 72        print(f"AI_setTriggerMode {trigger_mode} in port {port}, status: {err}")
 73
 74        ## Set AI sampling rate
 75        err = await dev.AI_setSamplingRate_async(port, sampling_rate)
 76        print(f"AI_setSamplingRate_async {sampling_rate} in port {port}, status: {err}")
 77
 78        ## Set RTC
 79        err = await dev.Sys_setRTC_async(year, month, day, hour, minute, second-10)
 80        print(f"Set RTC to {year}-{month}-{day}, {hour}:{minute}:{second-10}, status: {err}")
 81
 82        ## Open AI streaming
 83        err = await dev.AI_openStreaming_async(port)
 84        print(f"AI_openStreaming_async in port {port}, status: {err}")
 85
 86        ## Start RTC alarm after 10 seconds
 87        err = await dev.Sys_startRTCAlarm_async(mode_alarm, day, hour, minute, second)
 88        print(f"Alarm RTC to {year}-{month}-{day}, {hour}:{minute}:{second}, status: {err}")
 89
 90        stop_flag = 1
 91        for i in range(15):
 92            ## Read data acquisition
 93            ai_2Dlist = await dev.AI_readStreaming_async(port, read_points, read_delay)
 94            print(f"len: {len(ai_2Dlist)}, {await dev.Sys_getRTC_async()}")
 95
 96            if len(ai_2Dlist)> 0 and stop_flag == 1 :
 97                ## Close AI streaming
 98                err = await dev.AI_closeStreaming_async(port)
 99                print(f"AI_closeStreaming_async in port {port}, status: {err}")
100                stop_flag = 0
101            await asyncio.sleep(1) ## delay [s]
102
103        ## Close AI
104        err = await dev.AI_close_async(port)
105        print(f"AI_close_async in port {port}, status: {err}")
106    except Exception as err:
107        pywpc.printGenericError(err)
108
109    ## Disconnect device
110    dev.disconnect()
111
112    ## Release device handle
113    dev.close()
114
115    return
def main_for_spyder(*args):
    """Run ``main`` whether or not an event loop is already running.

    Intended for consoles such as Spyder, where code may execute inside an
    event loop that is already running and ``asyncio.run`` cannot be used.

    Args:
        *args: Positional arguments forwarded unchanged to ``main``.

    Returns:
        When no event loop is running in the current thread: the value
        returned by ``main`` once it has run to completion.
        When an event loop is already running: the ``asyncio.Task`` wrapping
        ``main``. The task has only been scheduled; the caller may ``await``
        it, and should keep the reference until it finishes.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        ## No loop is running in this thread: start one and block until done.
        return asyncio.run(main(*args))

    ## A loop is already running, so schedule the coroutine on it. The task
    ## cannot be finished yet, so calling .result() here would raise
    ## InvalidStateError; hand the task back to the caller instead.
    return asyncio.create_task(main(*args))
if __name__ == "__main__":
    ## Script entry point. From a terminal, run the example on a new event loop.
    asyncio.run(main())
    ## Alternatives for environments that already run an event loop:
    # await main() ## Use Jupyter or IPython(>=7.0)
    # main_for_spyder() ## Use Spyder