Commit 674f1495 by John Donnal

new modules

parent 9329fb6f
FROM jdonnal/joule:latest
MAINTAINER John Donnal <donnal@usna.edu>
ADD . /joule-modules
ADD ./e2e /etc/joule
CMD /usr/local/bin/jouled
#!/usr/bin/python3
import sys
import subprocess
import os
import signal
FORCE_DUMP = False
def main():
jouled = subprocess.Popen(["jouled", "--config",
"/etc/joule/main.conf"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
print("---------[running e2e test suite]---------")
sys.stdout.flush()
test = subprocess.run(["python3", os.path.join("/etc/joule/test.py")])
jouled.send_signal(signal.SIGINT)
if(test.returncode != 0 or FORCE_DUMP):
print("----dump from jouled----")
stdout, _ = jouled.communicate()
for line in stdout.rstrip().split('\n'):
print("> %s" % line)
return test.returncode
if __name__ == "__main__":
exit(main())
version: '2'
services:
joule:
build: ../.
links:
- nilmdb
entrypoint: python3 /etc/joule/bootstrap-inner.py
nilmdb:
image: jdonnal/nilmdb
logging:
driver: none
stop_signal: SIGKILL
[NilmDB]
url=http://nilmdb
InsertionPeriod=1
[Main]
exec_cmd = python3 /joule-modules/filter.py 2
name = Demo Filter
[Source]
input = /demo/raw
[Destination]
output = /demo/filtered
\ No newline at end of file
[Main]
exec_cmd = python3 /joule-modules/reader.py 0.1
name = Demo Reader
[Source]
[Destination]
output = /demo/raw
\ No newline at end of file
#!/bin/bash
docker-compose up --abort-on-container-exit --build
docker-compose rm -f
[Main]
name = Filtered Data
path = /demo/filtered
datatype = int32
keep = 1w
decimate = yes
[Element1]
name = filtered counter
[Main]
name = Raw Data
path = /demo/raw
datatype = int32
keep = 1w
decimate = yes
[Element1]
name = counter
import time
import numpy as np
import re
from joule.testing.e2eutils import joule
from joule.testing.e2eutils import nilmtool
import sys
def main():
time.sleep(8) # wait for jouled to boot and get data
check_modules()
check_data()
check_logs()
def check_modules():
"""
Test: check module status
Goal:
Demo Filter: running some nonzero memory
Demo Reader: running some nonzero memory
"""
modules = joule.modules()
assert(len(modules) == 2) # normal1,normal2,filter,broken
for module in modules:
title = module[joule.MODULES_TITLE_FIELD]
status = module[joule.MODULES_STATUS_FIELD]
if(title['name'] in ['Demo Filter', 'Demo Reader']):
assert status == joule.MODULES_STATUS_RUNNING, "%s not running"%title
else:
assert(0) # unexpected module in status report
def check_data():
"""
Test: check data inserted into nilmdb
Goal:
/demo/raw is float32_1, has 1 interval with >50 samples
/demo/filtered is float32_2, has 1 interval with >50 samples
both filtered and raw have decimations
"""
for path in ["/demo/raw", "/demo/filtered"]:
# 1.) check streams have one continuous interval
base_intervals = nilmtool.intervals(path)
decim_intervals = nilmtool.intervals(
path + "~decim-16") # check level 2 decimation
assert len(base_intervals) == 1,\
"%s has %d intervals" % (path, len(base_intervals))
assert len(decim_intervals) == 1,\
"%s has %d intervals" % (path+"~decim-16", len(decim_intervals))
# 2.) make sure this interval has data in it
num_samples = nilmtool.data_count(path)
assert(num_samples > 50)
# 3.) make sure decimations have data
assert(nilmtool.is_decimated(path, level=16, min_size=2))
# verify stream layouts
assert nilmtool.layout("/demo/raw") == "int32_1"
assert nilmtool.layout("/demo/filtered") == "int32_1"
# verify the filter module executed correctly
# check the first 50 rows, the filter won't have
# all the source data because the process was stopped
expected_data = nilmtool.data_extract("/demo/raw")
expected_data[:, 1:] += 2
actual_data = nilmtool.data_extract("/demo/filtered")
np.testing.assert_almost_equal(
actual_data[:50, :], expected_data[:50, :])
def check_logs():
"""
Test: logs should contain info and stderr from modules
Goal:
Demo Filter: says "starting" somewhere once
Demo Reader: says "starting" somewhere once
"""
for module_name in ["Demo Filter", "Demo Reader"]:
logs = joule.logs(module_name)
num_starts = len(re.findall(joule.LOG_STARTING_STRING, logs))
assert(num_starts == 1)
if __name__ == "__main__":
main()
print("OK")
#!/usr/bin/python3
from joule import FilterModule, EmptyPipe
from scipy.signal import medfilt
import asyncio
WINDOW = 21
EDGE = (WINDOW-1)//2
class ExampleFilter(FilterModule):
#Implement a WINDOW sized median filter
async def run(self, parsed_args, inputs, outputs):
raw = inputs["raw"]
filtered = outputs["filtered"]
while(1):
#read new data
try:
vals= await raw.read()
except EmptyPipe:
break
#execute median filter in place
vals["data"] = medfilt(vals["data"],WINDOW)
#write out valid samples
await filtered.write(vals[EDGE:-EDGE])
#prepend trailing samples to next read
nsamples = len(vals)-2*EDGE
if(nsamples>0):
raw.consume(nsamples)
#allow other routines to execute
await asyncio.sleep(0.5)
if __name__ == "__main__":
r = ExampleFilter()
r.start()
import asyncio
from aiohttp import web
from joule.client.reader_module import ReaderModule
import aiohttp_jinja2
import jinja2
import os
CSS_DIR = os.path.join(os.path.dirname(__file__), 'assets', 'css')
JS_DIR = os.path.join(os.path.dirname(__file__), 'assets', 'js')
TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), 'assets', 'templates')
class BootstrapInterface(ReaderModule):
async def setup(self, parsed_args, app, output):
loader = jinja2.FileSystemLoader(TEMPLATES_DIR)
aiohttp_jinja2.setup(app, loader=loader)
async def run(self, parsed_args, output):
# data processing...
while True:
await asyncio.sleep(1)
def routes(self):
return [
web.get('/', self.index),
web.get('/data.json', self.data),
web.static('/assets/css', CSS_DIR),
web.static('/assets/js', JS_DIR)
]
@aiohttp_jinja2.template('index.jinja2')
async def index(self, request):
return {'name': "Data Visualizer",
'version': 1.0}
# json end point for AJAX requests
async def data(self, request):
# return summary statistics, etc.
return web.json_response(data=[1,2,8])
if __name__ == "__main__":
r = ComplexInterface()
r.start()
#!/usr/bin/python3
from joule import FilterModule, EmptyPipe
from scipy.signal import medfilt
import asyncio
class ExampleFilter(FilterModule):
"""Apply linear scaling to input"""
async def run(self, parsed_args, inputs, outputs):
# data pipes (specified in configuration file)
raw = inputs["raw"]
scaled = outputs["scaled"]
# linear scaling: y=mx+b
m = 2.0
b = 1.5
while True:
# read new data
vals = await raw.read()
# apply linear scaling y=mx+b
vals["data"] = vals["data"] * m + b
# write output
await scaled.write(vals)
# remove read data from the buffer
raw.consume(len(vals))
# propagate interval boundaries
if raw.end_of_interval:
await scaled.close_interval()
# limit execution to 1Hz chunks
await asyncio.sleep(1)
if __name__ == "__main__":
r = ExampleFilter()
r.start()
import asyncio
from aiohttp import web
from joule.client.reader_module import ReaderModule
class ExampleInterface(ReaderModule):
async def run(self, parsed_args, output):
# data processing...
while True:
await asyncio.sleep(1)
def routes(self):
return [web.get('/', self.index)]
async def index(self, request):
return web.Response(text="hello world!")
if __name__ == "__main__":
r = ExampleVisualizer()
r.start()
#!/usr/bin/python3
from joule import ReaderModule, time_now
from joule import ReaderModule
from joule.utilities import time_now
import asyncio
import numpy as np
......
#!/usr/bin/python3
from joule import ReaderModule, time_now
from joule import ReaderModule
from joule.utilities import time_now
import asyncio
import numpy as np
class HighBandwidthReader(ReaderModule):
#Produce sawtooth waveform at specified rate"
#Produce sawtooth waveform at specified sample rate"
def custom_args(self, parser):
grp = parser.add_argument_group("module",
"module specific arguments")
grp.add_argument("--rate", type=float,
required=True,
help=" output rate in Hz")
help="sample rate in Hz")
async def run(self, parsed_args, output):
start_ts = time_now()
#run 5 times per second
......@@ -26,8 +26,8 @@ class HighBandwidthReader(ReaderModule):
samples_per_period,endpoint=False)
vals=np.linspace(0,33,samples_per_period)
start_ts = end_ts
await output.write(np.hstack((ts[:,None],
vals[:,None])))
chunk = np.hstack((ts[:,None], vals[:,None]))
await output.write(chunk)
await asyncio.sleep(period)
if __name__ == "__main__":
......
[Main]
name = Example Filter
# replace with correct path:
exec_cmd = /PATH/TO/example_modules/example_filter.py
exec_cmd = python /PATH/TO/example_modules/example_filter.py
[Arguments]
[Inputs]
raw = /examples/sawtooth
raw = /examples/random:float32[x,y,z]
[Outputs]
filtered = /examples/medfilt
\ No newline at end of file
scaled = /examples/scaled:float32[x,y,z]
\ No newline at end of file
aiohttp==3.3.2
async-timeout==3.0.0
attrs==18.1.0
beautifulsoup4==4.6.1
certifi==2018.4.16
chardet==3.0.4
click==6.7
dateparser==0.7.0
idna==2.7
idna-ssl==1.1.0
Markdown==2.6.11
multidict==4.4.0a4
numpy==1.15.0
pkg-resources==0.0.0
psutil==5.4.6
python-dateutil==2.7.3
pytz==2018.5
regex==2018.7.11
requests==2.19.1
scipy==1.1.0
six==1.11.0
SQLAlchemy==1.2.10
tabulate==0.8.2
treelib==1.5.3
tzlocal==1.5.1
urllib3==1.23
yarl==1.2.6
Jinja2==2.10
aiohttp-jinja2==1.0.0
\ No newline at end of file
#!/usr/bin/env python
from setuptools import setup
import versioneer
PROJECT = 'Joule'
# Change docs/sphinx/conf.py too!
try:
long_description = open('README.rst', 'rt').read()
except IOError:
long_description = ''
setup(
name=PROJECT,
version = versioneer.get_version(),
cmdclass=versioneer.get_cmdclass(),
description='Process manager for embedded systems',
long_description=long_description,
author='John Donnal',
author_email='donnal@usna.edu',
url='https://git.wattsworth.net/wattsworth/example_modules.git',
download_url='[none]',
classifiers=['Programming Language :: Python',
'Environment :: Console',
],
platforms=['Any'],
scripts=[],
provides=[],
install_requires=['click',
'treelib',
'numpy',
'scipy',
'psutil',
'requests',
'aiohttp',
'markdown',
'BeautifulSoup4',
'dateparser',
'tabulate',
'sqlalchemy',
'aiohttp-jinja2',
'jinja2'],
namespace_packages=[],
packages=['jouleexamples'],
include_package_data=True,
entry_points={
'console_scripts': [
'joule-example-reader = jouleexamples.example_reader:main',
'joule-high-bandwidth-reader = jouleexamples.high_bandwidth_reader:main',
'joule-example-filter = jouleexamples.example_filter:main',
'joule-example-median-filter = jouleexamples.median_filter:main',
'joule-example-composite = jouleexamples.example_composite:main',
'joule-example-interface = jouleexamples.example_interface:main',
'joule-bootstrap-interface = jouleexamples.bootstrap_interface:main'
]
},
zip_safe=False,
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment