Commit f10ac667 by source_reader

initial commit

parents
*~
*.pyc
[Jouled]
ModuleDirectory = example_configs/module_configs
StreamDirectory = example_configs/stream_configs
\ No newline at end of file
[Main]
exec_cmd = python3 filter.py
name = Demo Filter
[Source]
input = /demo/raw
[Destination]
output = /demo/filtered
\ No newline at end of file
[Main]
exec_cmd = python3 reader.py
name = Demo Reader
[Source]
[Destination]
output = /demo/raw
\ No newline at end of file
[Main]
path = /demo/filtered
datatype = int32
keep = 1w
[Element1]
name = counter
[Main]
path = /demo/raw
datatype = int32
keep = 1w
[Element1]
name = counter
from joule.client import FilterModule
class FilterDemo(FilterModule):
" Example filter: passes input to output "
def __init__(self):
super(FilterDemo, self).__init__("Demo Filter")
self.description = "one line: demo filter"
self.help = "a paragraph: this filter does x,y,z etc..."
def custom_args(self, parser):
parser.add_argument("-o", "--optional", action="store_true",
help="custom arg")
async def run(self, parsed_args, inputs, outputs):
stream_in = inputs["input"]
stream_out = outputs["output"]
if(parsed_args.optional):
print("option set")
while(1):
sarray = await stream_in.read()
await stream_out.write(sarray)
stream_in.consume(len(sarray))
if __name__ == "__main__":
r = FilterDemo()
r.start()
from joule.utils.time import now as time_now
from joule.client import ReaderModule
import asyncio
class ReaderDemo(ReaderModule):
"Example reader: generates an incrementing value every second"
def __init__(self):
super(ReaderDemo, self).__init__("Demo Reader")
self.description = "one line: demo reader"
self.help = "a paragraph: this reader does x,y,z etc..."
def custom_args(self, parser):
parser.add_argument("-o", "--optional", action="store_true",
help="custom arg")
async def run(self, parsed_args, output):
if(parsed_args.optional):
print("option set")
count = 0
while(1):
await output.write([[time_now(), count]])
await asyncio.sleep(1)
count += 1
if __name__ == "__main__":
r = ReaderDemo()
r.start()
from joule.utils.localnumpypipe import LocalNumpyPipe
import asynctest
import asyncio
import numpy as np
import numpy.matlib
import argparse
from joule.client.filters.mean import MeanFilter
class TestMeanFilter(asynctest.TestCase):
def test_computes_moving_average(self):
# data is a repeating series of 0,1,2,..N
# the moving average of this array should be N/2
# timestamps is just an increasing index
WINDOW = 5
WIDTH = 4
REPS_PER_BLOCK = 9
NUM_BLOCKS = 3
my_filter = MeanFilter()
pipe_in = LocalNumpyPipe("input", layout="float32_%d" % WIDTH)
pipe_out = LocalNumpyPipe("output", layout="float32_%d" % WIDTH)
args = argparse.Namespace(window=WINDOW, pipes="unset")
base = np.array([np.arange(x, WINDOW+x) for x in range(WIDTH)]).T
async def writer():
prev_ts = 0
for block in range(NUM_BLOCKS):
data = numpy.matlib.repmat(base, REPS_PER_BLOCK, 1)
ts = np.arange(prev_ts, prev_ts + len(data))
input_block = np.hstack((ts[:, None], data))
pipe_in.write_nowait(input_block)
await asyncio.sleep(0.1)
prev_ts = ts[-1]+1
await asyncio.sleep(0.2)
my_filter.stop()
# run reader in an event loop
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(writer()),
my_filter.run(args, {"input": pipe_in},
{"output": pipe_out})]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
# check the results
result = pipe_out.read_nowait()
# expect the output to be a constant (WINDOW-1)/2
base_output = np.ones((WINDOW * REPS_PER_BLOCK *
NUM_BLOCKS - (WINDOW-1), WIDTH))
expected = base_output*range(int(WINDOW/2), int(WINDOW/2) + WIDTH)
np.testing.assert_array_equal(expected,
result['data'])
from joule.utils.localnumpypipe import LocalNumpyPipe
import asynctest
import asyncio
import numpy as np
import argparse
from joule.client.readers.random import RandomReader
class TestRandomReader(asynctest.TestCase):
def test_generates_random_values(self):
WIDTH = 2
RATE = 100
my_reader = RandomReader(output_rate=100)
pipe = LocalNumpyPipe("output", layout="float32_%d" % WIDTH)
args = argparse.Namespace(width=WIDTH, rate=RATE, pipes="unset")
# run reader in an event loop
loop = asyncio.get_event_loop()
loop.call_later(0.1, my_reader.stop)
loop.run_until_complete(my_reader.run(args, pipe))
loop.close()
# check the results
result = pipe.read_nowait()
diffs = np.diff(result['timestamp'])
self.assertEqual(np.mean(diffs), 1/RATE*1e6)
self.assertEqual(np.shape(result['data'])[1], WIDTH)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment