RTC trigger AI continuous

  1
  2'''
  3RTC_AI - RTC_trigger_AI_continuous.py with asynchronous mode.
  4
  5This example demonstrates how to use RTC to trigger AI with continuous mode from EthanA2.
  6
  7-------------------------------------------------------------------------------------
  8Please change correct serial number or IP and port number BEFORE you run example code.
  9
 10For other examples please check:
 11    https://github.com/WPC-Systems-Ltd/WPC_Python_driver_release/tree/main/examples
 12See README.md file to get detailed usage of this example.
 13
 14Copyright (c) 2022-2025 WPC Systems Ltd. All rights reserved.
 15'''
 16
 17## WPC
 18from wpcsys import pywpc
 19
 20## Python
 21import asyncio
 22import sys
 23sys.path.insert(0, 'src/')
 24
 25
 26async def main():
 27    ## Get Python driver version
 28    print(f'{pywpc.PKG_FULL_NAME} - Version {pywpc.__version__}')
 29
 30    ## Create device handle
 31    dev = pywpc.EthanA2()
 32
 33    ## Connect to device
 34    try:
 35        dev.connect("192.168.1.110")  ## Depend on your device
 36    except Exception as err:
 37        pywpc.printGenericError(err)
 38        ## Release device handle
 39        dev.close()
 40        return
 41
 42    try:
 43        ## Parameters setting
 44        port = 0  ## Depend on your device
 45        mode = 2  ## 1 : N-samples, 2 : Continuous
 46        trigger_mode = 1   ## 1 : Use RTC to start AI streaming
 47        sampling_rate = 200
 48        read_points = 200
 49        read_delay = 0.5  ## [sec]
 50        mode_alarm = 0
 51        year = 2024
 52        month = 4
 53        day = 2
 54        hour = 15
 55        minute = 8
 56        second = 50
 57
 58        ## Get firmware model & version
 59        driver_info = await dev.Sys_getDriverInfo_async()
 60        print(f"Model name: {driver_info[0]}, Firmware version: {driver_info[-1]} ")
 61
 62        ## Open AI
 63        err = await dev.AI_open_async(port)
 64        print(f"AI_open_async in port {port}, status: {err}")
 65
 66        ## Set AI mode
 67        err = await dev.AI_setMode_async(port, mode)
 68        print(f"AI_setMode_async {mode} in port {port}, status: {err}")
 69
 70        ## Set AI trigger mode
 71        err = await dev.AI_setTriggerMode_async(port, trigger_mode)
 72        print(f"AI_setTriggerMode {trigger_mode} in port {port}, status: {err}")
 73
 74        ## Set AI sampling rate
 75        err = await dev.AI_setSamplingRate_async(port, sampling_rate)
 76        print(f"AI_setSamplingRate_async {sampling_rate} in port {port}, status: {err}")
 77
 78        ## Set RTC
 79        err = await dev.Sys_setRTC_async(year, month, day, hour, minute, second - 10)
 80        print(f"Set RTC to {year}-{month}-{day}, {hour}:{minute}:{second - 10}, status: {err}")
 81
 82        ## Open AI streaming
 83        err = await dev.AI_openStreaming_async(port)
 84        print(f"AI_openStreaming_async in port {port}, status: {err}")
 85
 86        ## Start RTC alarm after 10 seconds
 87        err = await dev.Sys_startRTCAlarm_async(mode_alarm, day, hour, minute, second)
 88        print(f"Alarm RTC to {year}-{month}-{day}, {hour}:{minute}:{second}, status: {err}")
 89
 90        stop_flag = 1
 91        for i in range(15):
 92            ## Read data acquisition
 93            ai_2Dlist = await dev.AI_readStreaming_async(port, read_points, read_delay)
 94            print(f"len: {len(ai_2Dlist)}, {await dev.Sys_getRTC_async()}")
 95
 96            if len(ai_2Dlist) > 0 and stop_flag == 1:
 97                ## Close AI streaming
 98                err = await dev.AI_closeStreaming_async(port)
 99                print(f"AI_closeStreaming_async in port {port}, status: {err}")
100                stop_flag = 0
101            await asyncio.sleep(1)  ## delay [sec]
102
103        ## Close AI
104        err = await dev.AI_close_async(port)
105        print(f"AI_close_async in port {port}, status: {err}")
106    except Exception as err:
107        pywpc.printGenericError(err)
108
109    finally:
110        ## Disconnect device
111        dev.disconnect()
112
113        ## Release device handle
114        dev.close()
115
116
117def main_for_spyder(*args):
118    if asyncio.get_event_loop().is_running():
119        return asyncio.create_task(main(*args)).result()
120    else:
121        return asyncio.run(main(*args))
122
123
124if __name__ == '__main__':
125    asyncio.run(main())  ## Use terminal
126    # await main()  ## Use Jupyter or IPython(>=7.0)
127    # main_for_spyder()  ## Use Spyder