Commit 17cd3d80 by source_reader

updated examples to match new execution pattern

parent a9b5300a
#!/usr/bin/python3
import argparse
from joule import CompositeModule, LocalNumpyPipe
from high_bandwidth_reader import HighBandwidthReader
from example_filter import ExampleFilter
class ExampleComposite(CompositeModule):
""" Merge reader and filter into a single module:
[reader -> filter]->
"""
async def setup(self, parsed_args,
inputs, outputs):
#1.) create nested modules
my_reader = HighBandwidthReader()
my_filter = ExampleFilter()
#2.) create local pipes for interior streams
pipe = LocalNumpyPipe(name="raw", layout="float32_1")
#3.) convert modules into tasks
# output is an interior stream (write-end)
parsed_args = argparse.Namespace(rate=100)
task1 = my_reader.run(parsed_args, pipe)
# raw is an interior stream (read-end)
# filtered is an exterior stream
parsed_args = argparse.Namespace()
task2 = my_filter.run(parsed_args,
{"raw": pipe},
{"filtered": outputs["filtered"]})
#4.) tasks are executed in the main event loop
return [task1, task2]
if __name__ == "__main__":
r = ExampleComposite()
r.start()
#!/usr/bin/python3
from joule import FilterModule, EmptyPipe
from scipy.signal import medfilt
import asyncio
WINDOW = 21
EDGE = (WINDOW-1)//2
class ExampleFilter(FilterModule):
#Implement a WINDOW sized median filter
async def run(self, parsed_args, inputs, outputs):
raw = inputs["raw"]
filtered = outputs["filtered"]
while(1):
#read new data
try:
vals= await raw.read()
except EmptyPipe:
break
#execute median filter in place
vals["data"] = medfilt(vals["data"],WINDOW)
#write out valid samples
await filtered.write(vals[EDGE:-EDGE])
#prepend trailing samples to next read
nsamples = len(vals)-2*EDGE
if(nsamples>0):
raw.consume(nsamples)
#allow other routines to execute
await asyncio.sleep(0.5)
if __name__ == "__main__":
r = ExampleFilter()
r.start()
#!/usr/bin/python3
from joule import ReaderModule, time_now
import asyncio
import numpy as np
class ExampleReader(ReaderModule):
"Example reader: generates random values"
async def run(self, parsed_args, output):
while(1):
value = np.random.rand() # data from sensor
await output.write(np.array([[time_now(), value]]))
await asyncio.sleep(1)
if __name__ == "__main__":
r = ExampleReader()
r.start()
from joule.client import FilterModule
class FilterDemo(FilterModule):
" Example filter: applies a dc offset "
def __init__(self):
super(FilterDemo, self).__init__("Demo Filter")
self.description = "one line: demo filter"
self.help = "a paragraph: this filter does x,y,z etc..."
def custom_args(self, parser):
parser.add_argument("offset", type=int, default=0,
help="apply an offset")
async def run(self, parsed_args, inputs, outputs):
stream_in = inputs["input"]
stream_out = outputs["output"]
while(1):
sarray = await stream_in.read()
sarray["data"] += parsed_args.offset
await stream_out.write(sarray)
stream_in.consume(len(sarray))
if __name__ == "__main__":
r = FilterDemo()
r.start()
#!/usr/bin/python3
from joule import ReaderModule, time_now
from joule import ReaderModule
import asyncio
import numpy as np
class HighBandwidthReader(ReaderModule):
#Produce sawtooth waveform at specified rate"
def custom_args(self, parser):
grp = parser.add_argument_group("module",
"module specific arguments")
grp.add_argument("--rate", type=float,
required=True,
help=" output rate in Hz")
async def run(self, parsed_args, output):
start_ts = time_now()
#run 5 times per second
period=1
samples_per_period=np.round(parsed_args.rate*period)
while(1):
end_ts = start_ts+period*1e6
ts = np.linspace(start_ts,end_ts,
samples_per_period,endpoint=False)
vals=np.linspace(0,33,samples_per_period)
start_ts = end_ts
await output.write(np.hstack((ts[:,None],
vals[:,None])))
await asyncio.sleep(period)
if __name__ == "__main__":
r = HighBandwidthReader()
r.start()
[Main]
name = Example Composite
# replace with correct path:
exec_cmd = /PATH/TO/example_modules/example_composite.py
[Arguments]
[Inputs]
[Outputs]
filtered = /examples/medfilt
\ No newline at end of file
[Main]
name = Example Filter
# replace with correct path:
exec_cmd = /PATH/TO/example_modules/example_filter.py
[Arguments]
[Inputs]
raw = /examples/sawtooth
[Outputs]
filtered = /examples/medfilt
\ No newline at end of file
[Main]
name = Example Reader
# replace with correct path:
exec_cmd = /PATH/TO/example_modules/example_reader.py
[Arguments]
[Inputs]
[Outputs]
output=/examples/random
\ No newline at end of file
[Main]
name = Example Reader
# replace with correct path:
exec_cmd = /PATH/TO/example_modules/high_bandwidth_reader.py
[Arguments]
rate = 10
[Inputs]
[Outputs]
output=/examples/sawtooth
\ No newline at end of file
[Main]
name = Offset Filter
# replace with correct path:
exec_cmd = /PATH/TO/example_modules/offset_filter.py
[Arguments]
offset = 10
[Inputs]
input = /examples/sawtooth
[Outputs]
output = /examples/offset
\ No newline at end of file
#!/usr/bin/python3
from joule import FilterModule, EmptyPipe
class OffsetFilter(FilterModule):
"Add offset to data "
def custom_args(self, parser):
grp = parser.add_argument_group("module","module specific arguments")
grp.add_argument("--offset",
type=int,
required=True,
help="apply an offset")
async def run(self, parsed_args, inputs, outputs):
stream_in = inputs["input"]
stream_out = outputs["output"]
while(True):
try:
sarray = await stream_in.read()
sarray["data"] += parsed_args.offset
await stream_out.write(sarray)
stream_in.consume(len(sarray))
except EmptyPipe:
break
if __name__ == "__main__":
r = OffsetFilter()
r.start()
from joule.utils.time import now as time_now
from joule.client import ReaderModule
import asyncio
import numpy as np
class ReaderDemo(ReaderModule):
"Example reader: generates incrementing values at user specified rate"
def __init__(self):
super(ReaderDemo, self).__init__("Demo Reader")
self.description = "one line: demo reader"
self.help = "a paragraph: this reader does x,y,z etc..."
def custom_args(self, parser):
parser.add_argument("rate", type=float, help="period in seconds")
async def run(self, parsed_args, output):
count = 0
while(1):
await output.write(np.array([[time_now(), count]]))
await asyncio.sleep(parsed_args.rate)
count += 1
if __name__ == "__main__":
r = ReaderDemo()
r.start()
[Main]
name=Median Filtered
description=filtered data
keep=1w
path=/examples/medfilt
datatype=float32
decimate=yes
[Element1]
name = data
[Main]
name=Offset
description=filtered data
keep=1w
path=/examples/offset
datatype=float32
decimate=yes
[Element1]
name = data
[Main]
name=Random
description=random data
keep=1w
path=/examples/random
datatype=float32
decimate=yes
[Element1]
name = data
[Main]
name=Sawtooth
description=high bw data
keep=1w
path=/examples/sawtooth
datatype=float32
decimate=yes
[Element1]
name = data
type = discrete
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment