Commit cce6d0c9 by source_reader

added API demo jupyter notebook and fixes to make it run smoothly

parent 60eca0f4
......@@ -12,7 +12,7 @@ EOF
apt-get update
apt remove --purge libreoffice-* -y > /dev/null
apt-get remove --purge libreoffice-* thunderbird* -y > /dev/null
apt-get upgrade -y
dpkg -i puppet-release-focal.deb
......@@ -6,14 +6,14 @@ class common {
# note: chromium for Ubuntu is a snap and cannot be installed in a container
'nmap','wget', 'sqlite3', 'gparted', 'net-tools',
package { $pkgs:
ensure => present
$pip_pkgs = ['scipy', 'sklearn', 'pandas']
$pip_pkgs = ['scipy', 'sklearn', 'pandas', 'matplotlib']
package { $pip_pkgs:
ensure => present,
provider => pip3
......@@ -7,6 +7,9 @@ http://localhost:8888).
Create or update the Jupyter password with the following command:
$> sudo jupyter-set-password
To enable access to Joule, run the following command on the Jupyter terminal:
$> sudo -E joule admin authorize
--- (Optional) Access through the Wattsworth Interface ---
To access Jupyter as a Data App add it to the proxy
configuration in [/etc/joule/main.conf]:
......@@ -14,6 +17,7 @@ configuration in [/etc/joule/main.conf]:
For example:
......@@ -6,3 +6,5 @@ joule ALL=(ALL) NOPASSWD:/usr/bin/journalctl -u joule*
joule ALL=(ALL) NOPASSWD:/bin/systemctl status joule
joule ALL=(ALL) NOPASSWD:/bin/systemctl restart joule
joule ALL=(ALL) NOPASSWD:/bin/journalctl -u joule*
joule ALL=(ALL) NOPASSWD:SETENV:/usr/local/bin/joule admin authorize
"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"# Joule API Demonstration Notebook\n",
"This notebook shows how to use the Joule Application Programming Interface (API).\n",
"Before running this notebook, you must authorize API access for the Joule user. To do this run the following command from the terminal:\n",
" $> sudo -E joule admin authorize\n",
" ```"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# run this cell first to import the packages\n",
"import joule\n",
"# convenience imports to make code more compact\n",
"from joule.api import Stream, Element, Annotation\n",
"from joule.errors import EmptyPipeError\n",
"import numpy as np\n",
"from matplotlib import pyplot as plt"
"cell_type": "markdown",
"metadata": {},
"source": [
"To use the API you must have access to the Joule node. To view accessible nodes run the following command:\n",
" \n",
" $> joule node list\n"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# get_node() returns the default node, add a name parameter to request a specific one\n",
"node = joule.api.get_node()\n",
"# all node methods are async so you must use the await keyword\n",
"info = await\n",
"print(\"Node [%s] running joule %s\" % (, info.version))\n",
"print(\"%d GiB used\" % (info.size_db / 2**30))\n",
"print(\"%d GiB available\" % (info.size_free / (2**30)))\n"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create streams and write data"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create a two element stream of 5Hz sine, cosine waveforms\n",
"freq = 5.0\n",
"t = np.arange(0,1,0.001) # 1ms sample rate\n",
"sine = np.sin(freq*2*np.pi*t)\n",
"cosine = np.cos(freq*2*np.pi*t)\n",
"tangent = np.tan(freq*2*np.pi*t)\n",
"plt.plot(t, sine, 'r', t, cosine, 'g')\n",
"plt.xlabel('Time (sec)')\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create a stream on the Joule Node that can store this data\n",
"stream = Stream(name=\"waves\", elements=[Element(name=\"sine\"), Element(name=\"cosine\")])\n",
"stream = await node.stream_create(stream,\"/Demos\") # now stream is a registered model and can be used with API calls\n",
"# refresh the node in the Data Explorer and you should see the new stream\n",
"# *NOTE* if you run this code more than once you will receive an error that the stream already exists"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# we need to put the data in an M,3 numpy array:\n",
"# [ ts sine cosine\n",
"# ts sine cosine\n",
"# ... ]\n",
"# There are many ways to do this, the following is rather concise\n",
"# *NOTE* make sure the timestamps are in units of microseconds\n",
"data = np.vstack((t*1e6, sine, cosine)).T\n",
"# add data to the stream by using an input pipe\n",
"pipe = await node.data_write(stream)\n",
"await pipe.write(data) # timestamps should be in us\n",
"await pipe.close() # make sure to close the pipe after writing\n",
"# refresh the node in the Data Explorer and you should see the new stream\n",
"# *NOTE* if you run this code more than once you will receive an error that the data already exists"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Manipulate streams and data"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# get information about a stream\n",
"print(\"Stream Info:\\t\", await node.stream_info(stream))\n",
"# get the data intervals (regions of the stream with data)\n",
"print(\"Intervals:\\t\", await node.data_intervals(stream))\n",
"# change the display type of an element to discrete\n",
"await node.stream_update(stream) # refresh the node to see this change\n",
"# remove data from a stream \n",
"# ***DANGEROUS: OMITTING START and END will remove ALL DATA***\n",
"await node.data_delete(stream,start=0.2*1e6, end=0.4*1e6)\n",
"print(\"--removed data--\")\n",
"print(\"Intervals:\\t\", await node.data_intervals(stream))\n",
"# ...many more methods are available, see API docs"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data Annotations"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Annotate a range of data in Lumen then run this cell\n",
"# retrieve a list of annotations (include start,end parameters to limit query to a time range)\n",
"annotations = await node.annotation_get(stream)\n",
"if len(annotations) == 0:\n",
" print(\"ERROR: Create an annotation in Lumen then run this cell\")\n",
"elif annotations[0].end is None:\n",
" print(\"ERROR: Annotate a range in Lumen, not an event\")\n",
" annotation = annotations[0]\n",
" \n",
" # read the data associated with the annotation\n",
" pipe = await node.data_read(stream,start=annotation.start, end=annotation.end)\n",
" data = await pipe.read_all() # this automatically closes the pipe\n",
" \n",
" # plot the data\n",
" plt.plot(data['timestamp']/1e6, data['data'])\n",
" plt.title(annotation.title)\n",
" plt.xlabel('Time (sec)')\n",
"# Annotations can also be created with the API\n",
" annotation = Annotation(title='API Annotation', start=0.8*1e6)\n",
" await node.annotation_create(annotation, stream)\n",
"# refresh the annotations in the Plot Tab of Lumen to see this new annotation\n",
"# *NOTE* If you run this cell multiple times it will create multiple annotations"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Explore Streams and Read Data"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Nodes can be explored through the API\n",
"root = await node.folder_root()\n",
"def print_folder(folder, indent=0):\n",
" for child in folder.children:\n",
" print(\" \"*indent +\n",
" print_folder(child, indent+1)\n",
" for stream in folder.streams:\n",
" print(\" \"*indent + \"[%s: %s]\" % (, stream.layout))\n",
" \n",
"# print the folder directory structure\n",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# streams can be accessed by API object (as shown in previous cells) or by path\n",
"info = await node.stream_info(\"/Demos/waves\")\n",
"print(\"The demo stream has %d rows of data\" % info.rows)\n",
"# To read data, first create a pipe to the stream\n",
"# If you want to treat the data like a simple array you can use the read_all method, but if\n",
"# there is too much data this may fail. In general you should treat a pipe as an infinite\n",
"# data source and read it by chunk. This requires more code, but it scales to very large \n",
"# datasets and is the only way to work with realtime data sources\n",
"print(\"-- reading data --\")\n",
"pipe = await node.data_read(\"/Demos/waves\")\n",
" while True:\n",
" data = await\n",
" plt.plot(data['timestamp']/1e6,data['data'])\n",
" pipe.consume(len(data))\n",
" print(\"%d rows of data\" % len(data))\n",
" # for large data sources the chunk may or may not be an interval boundary\n",
" # you can explicitly check whether this is the end of an interval:\n",
" if pipe.end_of_interval:\n",
" print(\" data boundary\")\n",
"except EmptyPipeError:\n",
" pass\n",
" await pipe.close()\n",
"plt.xlabel('Time (sec)')\n",
"plt.title('Data showing interval break')\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"### Reset the Node to original state\n",
"**Run this cell to undo all changes created by this notebook**"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"await node.folder_delete(\"/Demos\")"
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.2"
"nbformat": 4,
"nbformat_minor": 4
......@@ -7,7 +7,11 @@ class jupyter::config{
owner => joule,
group => joule,
source => 'puppet:///modules/jupyter/API_demo.ipynb',
owner => joule,
group => joule,
file{ '/etc/systemd/system/jupyter.service':
source => 'puppet:///modules/jupyter/jupyter.service',
owner => root,
......@@ -80,7 +80,7 @@ echo "7] Enabling first_boot service"
systemctl enable first_boot.service
echo "8] Clearing command history"
rm -f /etc/wpa_supplicant/wpa_supplicant.conf
history -c
echo "ALL DONE! Remember to remove the puppet directory!!"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment