Commit 7c742517 by source_reader

finished tests, added README

parent f10ac667
Example Modules
===============
.. note::
For more information read the Joule Documentation at
http://docs.wattsworth.net/joule/writing_modules.html
This repository contains reference implementations of a custom
FilterModule and ReaderModule. Unittests for both modules are
included in the respective *_test.py files. Run the tests using
nose2:
.. code-block:: bash
$> pip3 install nose2
# ...output...
$> cd example_modules
$> nose2
..
------------------------------
Ran 2 tests in 1.105s
OK
You may additionally want to run end-to-end tests using the jouled
process. The configuration directory contains an main.conf file and
corresponding stream and module configurations to run a local joule
instance. To automate this testing you can run the joule process in
Docker. See the e2e testing section in Joule for an example.
.. code-block:: bash
$> cd example_modules
$> jouled -d e2e/main.conf &
$> joule -d e2e/main.conf modules
$> fg # Ctrl-C to exit joule
......@@ -2,7 +2,7 @@ from joule.client import FilterModule
class FilterDemo(FilterModule):
" Example filter: passes input to output "
" Example filter: applies a dc offset "
def __init__(self):
super(FilterDemo, self).__init__("Demo Filter")
......@@ -10,21 +10,19 @@ class FilterDemo(FilterModule):
self.help = "a paragraph: this filter does x,y,z etc..."
def custom_args(self, parser):
parser.add_argument("-o", "--optional", action="store_true",
help="custom arg")
parser.add_argument("offset", type=float, default=0,
help="apply an offset")
async def run(self, parsed_args, inputs, outputs):
stream_in = inputs["input"]
stream_out = outputs["output"]
if(parsed_args.optional):
print("option set")
while(1):
sarray = await stream_in.read()
sarray["data"] += parsed_args.offset
await stream_out.write(sarray)
stream_in.consume(len(sarray))
if __name__ == "__main__":
r = FilterDemo()
r.start()
from joule.utils.time import now as time_now
from joule.client import ReaderModule
import asyncio
import numpy as np
class ReaderDemo(ReaderModule):
"Example reader: generates an incrementing value every second"
"Example reader: generates incrementing values at user specified rate"
def __init__(self):
super(ReaderDemo, self).__init__("Demo Reader")
......@@ -12,16 +12,13 @@ class ReaderDemo(ReaderModule):
self.help = "a paragraph: this reader does x,y,z etc..."
def custom_args(self, parser):
parser.add_argument("-o", "--optional", action="store_true",
help="custom arg")
parser.add_argument("rate", type=float, help="period in seconds")
async def run(self, parsed_args, output):
if(parsed_args.optional):
print("option set")
count = 0
while(1):
await output.write([[time_now(), count]])
await asyncio.sleep(1)
await output.write(np.array([[time_now(), count]]))
await asyncio.sleep(parsed_args.rate)
count += 1
......
......@@ -2,53 +2,43 @@ from joule.utils.localnumpypipe import LocalNumpyPipe
import asynctest
import asyncio
import numpy as np
import numpy.matlib
import argparse
from joule.client.filters.mean import MeanFilter
from filter import FilterDemo
class TestMeanFilter(asynctest.TestCase):
class TestFilter(asynctest.TestCase):
def test_computes_moving_average(self):
# data is a repeating series of 0,1,2,..N
# the moving average of this array should be N/2
# timestamps is just an increasing index
WINDOW = 5
WIDTH = 4
REPS_PER_BLOCK = 9
NUM_BLOCKS = 3
my_filter = MeanFilter()
pipe_in = LocalNumpyPipe("input", layout="float32_%d" % WIDTH)
pipe_out = LocalNumpyPipe("output", layout="float32_%d" % WIDTH)
args = argparse.Namespace(window=WINDOW, pipes="unset")
base = np.array([np.arange(x, WINDOW+x) for x in range(WIDTH)]).T
async def writer():
prev_ts = 0
for block in range(NUM_BLOCKS):
data = numpy.matlib.repmat(base, REPS_PER_BLOCK, 1)
ts = np.arange(prev_ts, prev_ts + len(data))
input_block = np.hstack((ts[:, None], data))
pipe_in.write_nowait(input_block)
await asyncio.sleep(0.1)
prev_ts = ts[-1]+1
await asyncio.sleep(0.2)
my_filter.stop()
def test_filter(self):
" with offset=2, output should be 2+input "
# build test objects
my_filter = FilterDemo()
pipe_in = LocalNumpyPipe("input", layout="float32_1")
pipe_out = LocalNumpyPipe("output", layout="float32_1")
args = argparse.Namespace(offset=2)
# create the input data 0,1,2,...,9
# fake timestamps are ok, just use an increasing sequence
test_input = np.hstack((np.arange(10)[:, None], # timestamp 0-9
np.arange(10)[:, None])) # data, also 0-9
pipe_in.write_nowait(test_input)
# run reader in an event loop
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(writer()),
my_filter.run(args, {"input": pipe_in},
{"output": pipe_out})]
loop.run_until_complete(asyncio.gather(*tasks))
my_task = asyncio.ensure_future(
my_filter.run(args,
{"input": pipe_in},
{"output": pipe_out}))
loop.call_later(0.1, my_task.cancel)
try:
loop.run_until_complete(my_task)
except asyncio.CancelledError:
pass
loop.close()
# check the results
result = pipe_out.read_nowait()
# expect the output to be a constant (WINDOW-1)/2
base_output = np.ones((WINDOW * REPS_PER_BLOCK *
NUM_BLOCKS - (WINDOW-1), WIDTH))
expected = base_output*range(int(WINDOW/2), int(WINDOW/2) + WIDTH)
np.testing.assert_array_equal(expected,
result['data'])
# data should be 2,3,4,...,11
np.testing.assert_array_equal(result['data'],
test_input[:, 1]+2)
# timestamps should be the same as the input
np.testing.assert_array_almost_equal(result['timestamp'],
test_input[:, 0])
......@@ -3,26 +3,32 @@ import asynctest
import asyncio
import numpy as np
import argparse
from joule.client.readers.random import RandomReader
from reader import ReaderDemo
class TestRandomReader(asynctest.TestCase):
class TestReader(asynctest.TestCase):
def test_generates_random_values(self):
WIDTH = 2
RATE = 100
my_reader = RandomReader(output_rate=100)
pipe = LocalNumpyPipe("output", layout="float32_%d" % WIDTH)
args = argparse.Namespace(width=WIDTH, rate=RATE, pipes="unset")
def test_reader(self):
" with a rate=0.1, reader should generate 10 values in 1 second "
# build test objects
my_reader = ReaderDemo()
pipe = LocalNumpyPipe("output", layout="float32_1")
args = argparse.Namespace(rate=0.1, pipes="unset")
# run reader in an event loop
loop = asyncio.get_event_loop()
loop.call_later(0.1, my_reader.stop)
loop.run_until_complete(my_reader.run(args, pipe))
my_task = asyncio.ensure_future(my_reader.run(args, pipe))
loop.call_later(1, my_task.cancel)
try:
loop.run_until_complete(my_task)
except asyncio.CancelledError:
pass
loop.close()
# check the results
result = pipe.read_nowait()
diffs = np.diff(result['timestamp'])
self.assertEqual(np.mean(diffs), 1/RATE*1e6)
self.assertEqual(np.shape(result['data'])[1], WIDTH)
# data should be 0,1,2,...,9
np.testing.assert_array_equal(result['data'],
np.arange(10))
# timestamps should be about 0.1s apart
np.testing.assert_array_almost_equal(np.diff(result['timestamp'])/1e6,
np.ones(9)*0.1, decimal=2)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment