数据采集
Python的DAQ数据采集
1.DAQ的driver下载:https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z0000019Pf1SAE&l=zh-CN
2.从 GitHub 下载 nidaqmx-python,并阅读使用说明 README.rst
按照操作配置,使用python 相应命令实现数据连续采集
3.python程序
#!/usr/bin/env python3
# from pyqtgraph.Qt import QtCore, QtGui
import sys
from pyqtgraph.Qt import QtGui, QtCore
import pyqtgraph as pg
import pprint
import nidaqmx
from nidaqmx.constants import AcquisitionType, TaskMode
from nidaqmx.errors import DaqError
import numpy as np
from scipy.io import savemat
from scipy import signal
from time import sleep
import datetime
# Pretty-printer kept available for debugging output.
pp = pprint.PrettyPrinter(indent=4)

# Position of the DAQ device in the list reported by the NI-DAQmx driver.
# Adjust this to match the device number on the local machine.
DEVICE_INDEX = 1

system = nidaqmx.system.System.local()
try:
    dev = system.devices.device_names[DEVICE_INDEX]
except Exception as err:
    # Top-level boundary of the script: report the underlying problem together
    # with the troubleshooting hint, then exit with a failure status.
    # `Exception` (not a bare `except`) lets Ctrl-C and SystemExit propagate.
    print(err)
    print("check Nidaqmx driver or python version3.x or DO YOU connect the DAQ?")
    sys.exit(1)
else:
    print(dev)

# --- Plot window -------------------------------------------------------------
app = QtGui.QApplication([])  # Qt application object required by pyqtgraph
win = pg.GraphicsLayoutWidget(show=True, title="Basic plotting examples")
win.resize(1000, 600)  # window size in pixels (width x height)
win.setWindowTitle('pyqtgraph example: Plotting')

# Enable antialiasing for prettier plots.
pg.setConfigOptions(antialias=True)
p1 = win.addPlot(title="mmg updating plot")

# --- Filter ------------------------------------------------------------------
# 8th-order Butterworth band-pass; the cut-offs are normalised to the Nyquist
# frequency.
# NOTE(review): SciPy's documentation recommends output='sos' (with
# signal.sosfiltfilt) for high-order filters because the (b, a) form can be
# numerically ill-conditioned -- consider switching both this line and the
# filtering call together.
b, a = signal.butter(8, [0.01, 0.2], 'bandpass')

# Filtered samples per channel, filled in by the acquisition callback.
filtered_data = {'ch1': [], 'ch2': [], 'ch3': [], 'ch4': []}
# Set up the acquisition task, stream samples into per-channel lists from a
# driver callback, and redraw the raw / filtered plots from a Qt timer until
# the user presses Enter.
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan(dev + "/ai0:3")
    # Continuous acquisition at 200 samples per second.
    task.timing.cfg_samp_clk_timing(
        rate=200, sample_mode=AcquisitionType.CONTINUOUS)

    channel_keys = ('ch1', 'ch2', 'ch3', 'ch4')
    channel_names = ("channel1", "channel2", "channel3", "channel4")
    channel_pens = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (127, 127, 0))

    # Raw samples per channel. A mutable dict is used so the callback can
    # append to it without needing `nonlocal` (hence the historical name).
    non_local_var = {key: [] for key in channel_keys}
    w_start = 0   # index of the first sample shown in the plots
    window = 30   # number of samples the plot start advances per redraw

    def callback(task_handle, every_n_samples_event_type,
                 number_of_samples, callback_data):
        """Read the newly acquired samples and append raw and filtered data.

        Registered below to be invoked each time 200 samples have been
        acquired into the buffer. Returns 0, as the callback did originally.
        """
        # One row per channel, one column per sample.
        samples = np.array(
            task.read(number_of_samples_per_channel=number_of_samples))
        for row, key in enumerate(channel_keys):
            non_local_var[key].extend(samples[row, :])

        data_ = signal.filtfilt(b, a, samples)
        for row, key in enumerate(channel_keys):
            filtered_data[key].extend(data_[row, :])
        return 0

    task.register_every_n_samples_acquired_into_buffer_event(200, callback)
    task.start()

    # Upper plot: raw data, one coloured curve per channel.
    raw_curves = {
        key: p1.plot(non_local_var[key], pen=pen, name=name)
        for key, pen, name in zip(channel_keys, channel_pens, channel_names)
    }
    p1.setXRange(0, 500)  # visible x range
    start_time = pg.ptime.time()

    # Lower plot: filtered data, same colours as above.
    win.nextRow()
    p2 = win.addPlot()
    filtered_curves = {
        key: p2.plot(filtered_data[key], pen=pen, name=name)
        for key, pen, name in zip(channel_keys, channel_pens, channel_names)
    }
    p2.setXRange(0, 500)

    def update():
        """Redraw every curve from `w_start` onwards, then advance `w_start`."""
        global w_start
        for key in channel_keys:
            # Slice to the end of the list so the newest sample is included
            # (the previous `[w_start:-1]` silently dropped it).
            raw_curves[key].setData(non_local_var[key][w_start:])
            filtered_curves[key].setData(filtered_data[key][w_start:])
        w_start = w_start + window

    timer = QtCore.QTimer()
    timer.timeout.connect(update)
    sleep(0.5)  # short delay before the first redraw
    print(pg.ptime.time() - start_time)
    timer.start(1000)  # call update() every 1000 ms

    # Block until the user presses Enter.
    input('Running task. Press Enter to stop and see number of '
          'accumulated samples.\n')
    # Only stop the task here: the enclosing `with` statement closes it on
    # exit, and closing it twice makes nidaqmx emit an "already closed"
    # resource warning.
    task.stop()
    print(len(non_local_var['ch1']))
# Save the recorded data as MATLAB .mat files, prefixed with today's date.
import os

today = datetime.date.today()
date = str(today)

dir_name = input('input a dir where you want to save the file:\n')
try:
    os.chdir(dir_name)
except OSError as e:
    # Covers a missing path, a path that is not a directory, and permission
    # errors. The files are then written to the current working directory.
    print(e)

filename = input('input a saving name:\n')

# Raw data is optional and goes to the plain file name; filtered data is
# always saved and carries the "_filtered" suffix. (The two names were
# previously swapped, so the "_filtered" file held the raw samples.)
if input("save original data? y/n\n") == 'y':
    savemat('{0}_{1}.mat'.format(date, filename), non_local_var)
savemat('{0}_{1}_filtered.mat'.format(date, filename), filtered_data)
4.完成数据采集
通过按下回车键(input 命令)结束采集并执行 task.close
5.数据的实时显示
上面显示原始数据
下面显示滤波后的数据
注意:
1.下载好必须的python库
2.设置好update频率(太快无法显示)
3.通过定时器实现了update
4.可以通过thread线程优化
该程序用于 Windows,同时也适用于树莓派和 Ubuntu
更多推荐
Python的DAQ数据采集
发布评论