Tutorial multiple loops thread

  1'''
  2Tutorial - multiple_loops_thread.py with asynchronous mode.
  3
  4This example project demonstrates how to use two threads to get the RTC and print a string from USBDAQF1CD.
  5
  6-------------------------------------------------------------------------------------
  7Please set the correct serial number, or IP address and port number, BEFORE you run the example code.
  8
  9For other examples please check:
 10    https://github.com/WPC-Systems-Ltd/WPC_Python_driver_release/tree/main/examples
 11See README.md file to get detailed usage of this example.
 12
 13Copyright (c) 2022-2025 WPC Systems Ltd. All rights reserved.
 14'''
 15## WPC
 16from wpcsys import pywpc
 17
 18## Python
 19import asyncio
 20import threading
 21import time
 22import sys
 23sys.path.insert(0, 'src/')
 24
 25
 26async def getRTC(handle, delay=1):
 27    data = await handle.Sys_getRTC_async()
 28    print("RTC Time:" + str(data))
 29    await asyncio.sleep(delay)   ## delay [sec]
 30
 31
 32async def printString(handle, delay=1):
 33    print("WPC Systems Ltd")
 34    await asyncio.sleep(delay)   ## delay [sec]
 35
 36
 37def RTC_thread(handle, delay):
 38    while True:
 39        asyncio.run(getRTC(handle, delay))
 40        time.sleep(1)  ## delay [sec]
 41
 42
 43def Print_thread(handle, delay):
 44    while True:
 45        asyncio.run(printString(handle, delay))
 46        time.sleep(1)  ## delay [sec]
 47
 48
 49async def main():
 50    ## Get Python driver version
 51    print(f'{pywpc.PKG_FULL_NAME} - Version {pywpc.__version__}')
 52
 53    ## Create device handle
 54    dev = pywpc.USBDAQF1CD()
 55
 56    ## Connect to device
 57    try:
 58        dev.connect("default")  ## Depend on your device
 59    except Exception as err:
 60        pywpc.printGenericError(err)
 61        ## Release device handle
 62        dev.close()
 63        return
 64
 65    ## Perform two sync thread to query data
 66    try:
 67        ## Get firmware model & version
 68        driver_info = await dev.Sys_getDriverInfo_async()
 69        print(f"Model name: {driver_info[0]}, Firmware version: {driver_info[-1]} ")
 70
 71        _threadPrint = threading.Thread(target=Print_thread, args=[dev, 1])
 72        _threadPrint.start()
 73
 74        _threadRTC = threading.Thread(target=RTC_thread, args=[dev, 1])
 75        _threadRTC.start()
 76    except Exception as err:
 77        pywpc.printGenericError(err)
 78
 79    ## This part will execute immediately because the sync thread is running in parallel.
 80
 81    '''
 82    # Disconnect device
 83    dev.disconnect()
 84
 85    ## Release device handle
 86    dev.close()
 87    '''
 88
 89
 90def main_for_spyder(*args):
 91    if asyncio.get_event_loop().is_running():
 92        return asyncio.create_task(main(*args)).result()
 93    else:
 94        return asyncio.run(main(*args))
 95
 96
 97if __name__ == '__main__':
 98    asyncio.run(main())  ## Use terminal
 99    # await main()  ## Use Jupyter or IPython(>=7.0)
100    # main_for_spyder()  ## Use Spyder