Commit 51900b4b by John Donnal

added buffer for different size reads and recovery from failed streaming sessions

parent 14dcf3b8
Showing with 24 additions and 7 deletions
...@@ -109,6 +109,8 @@ class U3Reader(ReaderModule): ...@@ -109,6 +109,8 @@ class U3Reader(ReaderModule):
# set up timestamps # set up timestamps
data_ts = int(time.time()*1e6) data_ts = int(time.time()*1e6)
# initialize the buffer
self.buffer = [[] for x in range(len(channels))]
try: try:
device = self._setup_u3(channels, parsed_args.rate) device = self._setup_u3(channels, parsed_args.rate)
device.streamStart() device.streamStart()
...@@ -131,23 +133,38 @@ class U3Reader(ReaderModule): ...@@ -131,23 +133,38 @@ class U3Reader(ReaderModule):
device.close() device.close()
except LabJackPython.LabJackException as e: except LabJackPython.LabJackException as e:
print("LabJack Error: "+str(e)) print("LabJack Error: "+str(e))
except:
print("".join(i for i in traceback.format_exc()))
def build_block(self,r,data_ts,data_ts_inc,channels): def build_block(self,r,data_ts,data_ts_inc,channels):
data = [r['AIN%d'%c] for c in channels] # raw data may different sample counts per channel
rows = len(data[0]) raw_data = [r['AIN%d'%c] for c in channels]
block = np.empty((rows,len(channels)+1)) # add in any buffered data
top_ts = data_ts + rows * data_ts_inc data = [self.buffer[i]+raw_data[i] for i in range(len(channels))]
# put extra data in the buffer
shortest_row = min([len(d) for d in data])
longest_row = max([len(d) for d in data])
self.buffer = [d[shortest_row:longest_row] for d in data]
sys.stderr.write(repr(self.buffer)+"\n")
# work with uniform data size
data = [d[:shortest_row] for d in data]
nrows = shortest_row
block = np.empty((nrows,len(channels)+1))
top_ts = data_ts + nrows * data_ts_inc
ts = np.array(np.linspace(data_ts, top_ts, ts = np.array(np.linspace(data_ts, top_ts,
rows, endpoint=False), dtype=np.uint64) nrows, endpoint=False), dtype=np.uint64)
block = np.vstack((ts,*data)).T block = np.vstack((ts,*data)).T
return (top_ts, block) return (top_ts, block)
def _setup_u3(self, channels, rate): def _setup_u3(self, channels, rate):
device = u3.U3() device = u3.U3()
try:
device.streamStop()
sys.stderr.write("warning: stopped a previous stream\n")
except:
pass
device.configU3() device.configU3()
# set all IO channels to analog signals # set all IO channels to analog signals
device.configIO(FIOAnalog=0xFF, EIOAnalog=0xFF) device.configIO(FIOAnalog=0xFF, EIOAnalog=0xFF)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment