Python的DAQ数据采集

编程入门 行业动态 更新时间:2024-10-26 12:31:26

Python的DAQ数据采集

Python的DAQ数据采集

1. 下载并安装 DAQ 的驱动(NI-DAQmx)。原文的下载链接已损坏,仅残留参数:=kA00Z0000019Pf1SAE&l=zh-CN

2. 从 GitHub 下载 nidaqmx 的 Python 库,并阅读其使用说明 README.rst

按照操作配置,使用python 相应命令实现数据连续采集

3.python程序

#!/usr/bin/env python3
# from pyqtgraph.Qt import QtCore, QtGui
import sys
from pyqtgraph.Qt import QtGui, QtCore
import pyqtgraph as pg
import pprint
import nidaqmx
from nidaqmx.constants import AcquisitionType, TaskMode
from nidaqmx.errors import DaqError
import numpy as np
from scipy.io import savemat
from scipy import signal
from time import sleep
import datetime
# Continuous 4-channel NI-DAQmx acquisition with live plotting.
#
# Flow: pick the DAQ device, open a pyqtgraph window with two plots
# (raw on top, band-pass filtered below), stream samples through an
# every-N-samples callback, refresh the curves from a Qt timer, and
# finally save the collected data to .mat files once Enter is pressed.

pp = pprint.PrettyPrinter(indent=4)  # print setup (not used further below)

# Acquisition settings.
CHANNEL_KEYS = ('ch1', 'ch2', 'ch3', 'ch4')  # one key per input ai0..ai3
CHANNEL_PENS = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (127, 127, 0))
SAMPLE_RATE_HZ = 200      # sample clock rate passed to cfg_samp_clk_timing
SAMPLES_PER_EVENT = 200   # samples per channel that trigger one callback
DEVICE_INDEX = 1          # index into system.devices; set per your device number

system = nidaqmx.system.System.local()
try:
    dev = system.devices.device_names[DEVICE_INDEX]
    print(dev)
except Exception as err:
    # Top-level boundary: report the cause and quit. Unlike a bare
    # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
    print("check Nidaqmx driver or python version3.x or DO YOU connect the DAQ?")
    print(err)
    sys.exit(1)

# Plot window setup.
# QtGui.QApplication.setGraphicsSystem('raster')
app = QtGui.QApplication([])  # GUI application object
win = pg.GraphicsLayoutWidget(show=True, title="Basic plotting examples")
win.resize(1000, 600)  # window size in pixels
win.setWindowTitle('pyqtgraph example: Plotting')
# Enable antialiasing for prettier plots.
pg.setConfigOptions(antialias=True)
p1 = win.addPlot(title="mmg updating plot")

# 8th-order Butterworth band-pass; the cut-offs are normalized frequencies
# as expected by scipy.signal.butter.
b, a = signal.butter(8, [0.01, 0.2], 'bandpass')
filtered_data = {key: [] for key in CHANNEL_KEYS}

# Set up the task, then collect the data into per-channel dictionaries.
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan(dev + "/ai0:3")
    # Sample continuously at SAMPLE_RATE_HZ.
    task.timing.cfg_samp_clk_timing(
        rate=SAMPLE_RATE_HZ, sample_mode=AcquisitionType.CONTINUOUS)

    # Raw samples per channel; a mutable dict so the callback can append to
    # it without needing the ``nonlocal`` keyword.
    non_local_var = {key: [] for key in CHANNEL_KEYS}
    w_start = 0   # index of the first sample drawn by update()
    window = 30   # number of samples w_start advances on every update()

    def callback(task_handle, every_n_samples_event_type,
                 number_of_samples, callback_data):
        """Read the newly acquired samples and store raw and filtered copies.

        The arguments are supplied by the every-N-samples event registered
        below. Returns 0, as in the original callback.
        """
        # One row per channel, ``number_of_samples`` columns.
        samples = np.array(
            task.read(number_of_samples_per_channel=number_of_samples))
        # Zero-phase filtering along the last axis (the samples of a channel).
        data_ = signal.filtfilt(b, a, samples)
        for key, raw_row, filtered_row in zip(CHANNEL_KEYS, samples, data_):
            non_local_var[key].extend(raw_row)
            filtered_data[key].extend(filtered_row)
        return 0

    task.register_every_n_samples_acquired_into_buffer_event(
        SAMPLES_PER_EVENT, callback)
    task.start()

    # Upper plot: raw data, one colored curve per channel.
    raw_curves = [
        p1.plot(non_local_var[key], pen=pen, name="channel%d" % number)
        for number, (key, pen) in enumerate(
            zip(CHANNEL_KEYS, CHANNEL_PENS), start=1)
    ]
    p1.setXRange(0, 500)  # visible x range
    start_time = pg.ptime.time()

    # Lower plot: filtered data with the same colors.
    win.nextRow()
    p2 = win.addPlot()
    filtered_curves = [
        p2.plot(filtered_data[key], pen=pen, name="channel%d" % number)
        for number, (key, pen) in enumerate(
            zip(CHANNEL_KEYS, CHANNEL_PENS), start=1)
    ]
    p2.setXRange(0, 500)

    def update():
        """Redraw every curve from ``w_start`` onwards, then advance ``w_start``."""
        global w_start
        for key, raw_curve, filtered_curve in zip(
                CHANNEL_KEYS, raw_curves, filtered_curves):
            # Slice to the end of the list so the newest sample is drawn too
            # (a ``[w_start:-1]`` slice would drop it).
            raw_curve.setData(non_local_var[key][w_start:])
            filtered_curve.setData(filtered_data[key][w_start:])
        w_start = w_start + window

    timer = QtCore.QTimer()
    timer.timeout.connect(update)
    sleep(0.5)  # short delay before starting the refresh timer
    print(pg.ptime.time() - start_time)
    timer.start(1000)  # call update() every 1000 ms

    # Block until the user presses Enter, then stop acquiring.
    input('Running task. Press Enter to stop and see number of '
          'accumulated samples.\n')
    # Only stop here: leaving the ``with`` block closes the task, so an
    # explicit close() would close it a second time.
    task.stop()
    print(len(non_local_var['ch1']))

# Save the collected data as MATLAB .mat files, prefixed with today's date.
today = datetime.date.today()
date = str(today)

import os  # local import: only needed for changing the output directory

try:
    dir_name = input('input a dir where you want to save the file:\n')
    os.chdir(dir_name)
except OSError as e:
    # Best effort: report the problem and save into the current directory.
    print(e)

filename = input('input a saving name:\n')
if input("save original data? y/n\n") == 'y':
    # Raw (unfiltered) data goes into the plain file name.
    savemat('{0}_{1}.mat'.format(date, filename), non_local_var)
# Filtered data is always saved, under the "_filtered" suffix.
savemat('{0}_{1}_filtered.mat'.format(date, filename), filtered_data)

4.完成数据采集

通过键盘输入(按回车键)结束采集,并关闭数据采集任务(task.close)。

5.数据的实时显示

上面显示原始数据

下面显示滤波后的数据

注意:

1.下载好必须的python库

2.设置好update频率(太快无法显示)

3.通过定时器实现了update

4.可以通过thread线程优化

该程序用于 Windows,同时也适用于树莓派和 Ubuntu。

更多推荐

Python的DAQ数据采集

本文发布于:2024-02-26 02:05:15,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1701002.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:数据采集   Python   DAQ

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!