Commit 100ae0e0 by John Donnal

added support for event streams (not fully unit tested)

parent 2cafc01f
...@@ -105,6 +105,12 @@ module Joule ...@@ -105,6 +105,12 @@ module Joule
end end
# === END ANNOTATIONS === # === END ANNOTATIONS ===
# === BEGIN EVENTS ===
# Fetch events for +stream+ between the given microsecond timestamps
# and wrap them in the standard adapter payload
# ({id:, valid:, events:}) consumed by the ReadEvents service.
def read_events(stream, start_time, end_time)
  payload = @backend.read_events(stream.joule_id, start_time, end_time)
  {id: stream.id, valid: true, events: payload}
end
def node_type def node_type
'joule' 'joule'
end end
......
...@@ -214,6 +214,20 @@ module Joule ...@@ -214,6 +214,20 @@ module Joule
{error: false, msg: 'success'} {error: false, msg: 'success'}
end end
# === EVENT METHODS ===
# Read events for a Joule event stream over [start_time, end_time]
# (microsecond timestamps; either bound may be nil for an open range).
# Returns an array of events with symbolized keys.
# Raises RuntimeError on connection failure or a non-2xx response.
# NOTE(review): an identical definition of this method appears twice in
# this commit's backend hunks — confirm only one copy is kept.
def read_events(stream_id, start_time, end_time)
  query = {'id': stream_id}
  query['start'] = start_time unless start_time.nil?
  query['end'] = end_time unless end_time.nil?
  options = {query: query}
  # Only the HTTP call is guarded: the original bare rescue also caught
  # the "error reading events" RuntimeError below and masked the server's
  # message as "connection error".
  begin
    resp = self.class.get("#{@url}/event/data.json", options)
  rescue StandardError
    raise "connection error"
  end
  raise "error reading events #{resp.body}" unless resp.success?
  resp.parsed_response.map { |event| event.deep_symbolize_keys }
end
# === ANNOTATION METHODS === # === ANNOTATION METHODS ===
def create_annotation(annotation) def create_annotation(annotation)
data = { data = {
...@@ -246,6 +260,21 @@ module Joule ...@@ -246,6 +260,21 @@ module Joule
resp.parsed_response resp.parsed_response
end end
# === EVENT METHODS ===
# Read events for a Joule event stream over [start_time, end_time]
# (microsecond timestamps; either bound may be nil for an open range).
# Returns an array of events with symbolized keys.
# Raises RuntimeError on connection failure or a non-2xx response.
# NOTE(review): an identical definition of this method appears earlier in
# this commit's backend hunks — confirm only one copy is kept.
def read_events(stream_id, start_time, end_time)
  query = {'id': stream_id}
  query['start'] = start_time unless start_time.nil?
  query['end'] = end_time unless end_time.nil?
  options = {query: query}
  # Only the HTTP call is guarded: the original bare rescue also caught
  # the "error reading events" RuntimeError below and masked the server's
  # message as "connection error".
  begin
    resp = self.class.get("#{@url}/event/data.json", options)
  rescue StandardError
    raise "connection error"
  end
  raise "error reading events #{resp.body}" unless resp.success?
  resp.parsed_response.map { |event| event.deep_symbolize_keys }
end
def delete_annotation(annotation_id) def delete_annotation(annotation_id)
query = {'id': annotation_id} query = {'id': annotation_id}
options = {query: query} options = {query: query}
......
...@@ -68,7 +68,7 @@ module Joule ...@@ -68,7 +68,7 @@ module Joule
# remove any subfolders that are no longer on the folder # remove any subfolders that are no longer on the folder
db_folder.subfolders.where.not(joule_id: updated_ids).destroy_all db_folder.subfolders.where.not(joule_id: updated_ids).destroy_all
# update or create streams # update or create data streams
updated_ids=[] updated_ids=[]
schema[:streams].each do |stream_schema| schema[:streams].each do |stream_schema|
stream = db_folder.db_streams.find_by_joule_id(stream_schema[:id]) stream = db_folder.db_streams.find_by_joule_id(stream_schema[:id])
...@@ -94,6 +94,34 @@ module Joule ...@@ -94,6 +94,34 @@ module Joule
end end
# remove any streams that are no longer in the folder # remove any streams that are no longer in the folder
db_folder.db_streams.where.not(joule_id: updated_ids).destroy_all db_folder.db_streams.where.not(joule_id: updated_ids).destroy_all
# update or create event streams
updated_ids=[]
schema[:event_streams] ||= []
schema[:event_streams].each do |stream_schema|
stream = db_folder.event_streams.find_by_joule_id(stream_schema[:id])
stream ||= EventStream.new(db_folder: db_folder, db: db_folder.db)
__update_event_stream(stream, stream_schema, db_folder.path)
size_on_disk+=stream.size_on_disk
unless stream.start_time.nil?
if start_time.nil?
start_time = stream.start_time
else
start_time = [stream.start_time, start_time].min
end
end
unless stream.end_time.nil?
if end_time.nil?
end_time = stream.end_time
else
end_time = [stream.end_time, end_time].max
end
end
updated_ids << stream_schema[:id]
end
# remove any streams that are no longer in the folder
db_folder.event_streams.where.not(joule_id: updated_ids).destroy_all
# save the new disk size # save the new disk size
db_folder.size_on_disk = size_on_disk db_folder.size_on_disk = size_on_disk
db_folder.start_time = start_time db_folder.start_time = start_time
...@@ -128,6 +156,20 @@ module Joule ...@@ -128,6 +156,20 @@ module Joule
end end
end end
# Copy a Joule event-stream schema hash onto a local EventStream record.
# +parent_path+ is the path of the containing folder.
def __update_event_stream(event_stream, schema, parent_path)
  attrs = schema.slice(*EventStream.defined_attributes)
  # add in extra attributes that require conversion
  attrs[:path] = "#{parent_path}/#{schema[:name]}"
  attrs[:joule_id] = schema[:id]
  # Joule reports event data_info with :start/:end/:event_count keys
  # (see this commit's dbinfo/streams fixtures); the original read
  # :start_time/:end_time/:rows, which are absent there and left the
  # bounds nil. Accept both spellings, preferring the *_time forms.
  info = schema[:data_info] || {}
  attrs[:start_time] = info[:start_time].nil? ? info[:start] : info[:start_time]
  attrs[:end_time] = info[:end_time].nil? ? info[:end] : info[:end_time]
  attrs[:total_rows] = info[:rows].nil? ? info[:event_count] : info[:rows]
  attrs[:total_time] = info[:total_time]
  attrs[:size_on_disk] = info[:bytes]
  event_stream.update(attrs)
end
end end
end end
# Exposes Joule event streams to the frontend. Single-stream actions
# (:show, :update) use the before_action chain; :data operates on a list
# of streams passed in params and performs its own authorization, so the
# filters skip it.
class EventsController < ApplicationController
  before_action :authenticate_user!
  before_action :set_stream, except: :data
  before_action :authorize_viewer, except: :data
  before_action :create_adapter, except: :data

  # GET /events/:id — @event_stream is loaded by :set_stream; rendering
  # is handled by the view template.
  def show
  end

  # PATCH /events/:id — update name/description locally and on the
  # remote Joule node via the EditEventStream service.
  def update
    @service = EditEventStream.new(@node_adapter)
    @service.run(@event_stream, stream_params)
    render status: @service.success? ? :ok : :unprocessable_entity
  end

  # GET /events/data — read events from several streams over a time range.
  # params[:streams] is a JSON-encoded array of EventStream ids.
  def data
    req_streams = EventStream.find(JSON.parse(params[:streams]))
    # make sure the user is allowed to view every requested stream
    req_streams.each do |stream|
      unless current_user.views_nilm?(stream.db.nilm)
        head :unauthorized
        return
      end
    end
    # time bounds are optional; nil lets ReadEvents infer them from data
    start_time = (params[:start_time].to_i unless params[:start_time].nil?)
    end_time = (params[:end_time].to_i unless params[:end_time].nil?)
    # requested resolution (leave blank for max possible)
    # NOTE(review): resolution is parsed but never forwarded to the
    # service below — confirm whether ReadEvents should receive it
    resolution = (params[:resolution].to_i unless params[:resolution].nil?)
    # padding: percentage of data to retrieve beyond start|end
    padding = params[:padding].nil? ? 0 : params[:padding].to_f
    # retrieve the data for the requested elements
    @service = ReadEvents.new
    # if start and end are specified, widen the query window by padding
    # but report the unpadded bounds back to the client
    if !start_time.nil? && !end_time.nil?
      actual_start = (start_time - (end_time-start_time)*padding).to_i
      actual_end = (end_time + (end_time-start_time)*padding).to_i
      @service.run(req_streams, actual_start, actual_end)
      @start_time = start_time
      @end_time = end_time
    # otherwise let the service determine the start/end automatically
    else
      @service.run(req_streams, start_time, end_time)
      @start_time = @service.start_time
      @end_time = @service.end_time
    end
    render status: @service.success? ? :ok : :unprocessable_entity
  end

  private

  # only name and description may be mass-assigned by :update
  def stream_params
    params.permit(:name, :description)
  end

  # load the stream and its owning db/nilm for the filters below
  def set_stream
    @event_stream = EventStream.find(params[:id])
    @db = @event_stream.db
    @nilm = @db.nilm
  end

  def authorize_viewer
    head :unauthorized unless current_user.views_nilm?(@nilm)
  end

  # build an adapter for the remote node; abort the request if the
  # installation cannot be contacted
  def create_adapter
    @node_adapter = NodeAdapterFactory.from_nilm(@nilm)
    if @node_adapter.nil?
      @service = StubService.new
      @service.add_error("Cannot contact installation")
      render 'helpers/empty_response', status: :unprocessable_entity
    end
  end
end
...@@ -16,6 +16,9 @@ class DbFolder < ApplicationRecord ...@@ -16,6 +16,9 @@ class DbFolder < ApplicationRecord
has_many :db_streams, has_many :db_streams,
dependent: :destroy dependent: :destroy
has_many :event_streams,
dependent: :destroy
validates_presence_of :name validates_presence_of :name
# validates_with DbFolderValidator # validates_with DbFolderValidator
validates :name, uniqueness: { scope: :parent_id, validates :name, uniqueness: { scope: :parent_id,
......
# Local mirror of an event stream on a Joule node. Belongs to a DbFolder
# (its position in the tree) and a Db (the owning installation); the
# remote counterpart is tracked via joule_id.
class EventStream < ApplicationRecord
  belongs_to :db_folder
  belongs_to :db
  validates :name, presence: true
  validates :name, uniqueness: { scope: :db_folder_id,
                                 message: ' is already used in this folder'}

  # attributes that may be copied directly from a Joule schema hash
  def self.defined_attributes
    [:name, :description]
  end

  # attributes serialized to JSON for the frontend
  def self.json_keys
    [:id, :name, :description, :path, :start_time,
     :end_time, :size_on_disk, :total_rows, :total_time]
  end
end
\ No newline at end of file
# frozen_string_literal: true

# Handles changing EventStream attributes, keeping the local record and
# the remote Joule node in sync.
class EditEventStream
  include ServiceStatus

  def initialize(node_adapter)
    super()
    @node_adapter = node_adapter
  end

  # Apply the permitted +attribs+ to +event_stream+. The record is only
  # persisted after local validation passes AND the remote Joule node
  # accepts the change. Returns self (check success? / errors).
  def run(event_stream, attribs)
    # only accept valid attributes
    attribs.slice!(:name, :description)
    event_stream.assign_attributes(attribs)
    # local validation (eg streams can't share a name in a folder)
    if event_stream.invalid?
      event_stream.errors.full_messages.each { |msg| add_error(msg) }
      return self
    end
    # local model checks out, update the remote Joule
    remote_status = @node_adapter.save_event_stream(event_stream)
    if remote_status[:error]
      # remote rejected the change, don't save the model
      add_error(remote_status[:msg])
      return self
    end
    # everything went well, save the model
    event_stream.save!
    set_notice("Stream updated")
    self
  end
end
# frozen_string_literal: true

# Retrieves events from a set of EventStreams over a time range.
# Exposes the per-stream results (:data) and the effective time bounds,
# which are inferred from the data when the caller leaves them nil.
class ReadEvents
  include ServiceStatus
  attr_reader :data, :start_time, :end_time

  def initialize
    super()
  end

  # Read events from each stream in +event_streams+ between +start_time+
  # and +end_time+ (either may be nil). Returns self.
  def run(event_streams, start_time, end_time)
    @start_time = start_time
    @end_time = end_time
    # reject an inverted range when both bounds are given
    if !@start_time.nil? && !@end_time.nil? && @start_time > @end_time
      add_error("invalid time bounds")
      return self
    end
    # one adapter request per stream; a nil result means the node could
    # not be reached, which is recorded as a warning, not a failure
    @data = event_streams.map do |stream|
      adapter = NodeAdapterFactory.from_nilm(stream.db.nilm)
      result = adapter.read_events(stream, @start_time, @end_time)
      if result.nil?
        add_warning("unable to retrieve events for #{stream.path}")
        {id: stream.id, valid: false, events: nil}
      else
        result
      end
    end
    # fill in whichever bounds the caller left unspecified
    @start_time = _data_start if @start_time.nil?
    @end_time = _data_end if @end_time.nil?
    self
  end

  # earliest start_time across all valid, non-empty result sets (nil if none)
  def _data_start
    @data.select { |entry| entry[:valid] && !entry[:events].empty? }
         .map { |entry| entry[:events].first[:start_time] }
         .min
  end

  # latest end across all valid, non-empty result sets (nil if none);
  # an event without an end_time contributes its start_time instead
  def _data_end
    @data.select { |entry| entry[:valid] && !entry[:events].empty? }
         .map do |entry|
           last = entry[:events].last
           last[:end_time].nil? ? last[:start_time] : last[:end_time]
         end
         .max
  end
end
...@@ -4,6 +4,11 @@ json.subfolders(db_folder.subfolders) do |folder| ...@@ -4,6 +4,11 @@ json.subfolders(db_folder.subfolders) do |folder|
json.extract! folder, *DbFolder.json_keys json.extract! folder, *DbFolder.json_keys
end end
json.event_streams(db_folder.event_streams) do |stream|
json.extract! stream, *EventStream.json_keys
json.nilm_id nilm.id
end
json.streams(db_folder.db_streams.includes(:db_elements)) do |stream| json.streams(db_folder.db_streams.includes(:db_elements)) do |stream|
json.extract! stream, *DbStream.json_keys json.extract! stream, *DbStream.json_keys
json.nilm_id nilm.id json.nilm_id nilm.id
......
# Renders one entry per requested stream: the stream's events plus the
# effective time bounds of the read (@start_time/@end_time are shared
# across all entries, set by EventsController#data).
json.data do
  # pass the collection itself: the original `@service.data.each` handed
  # json.array! an Enumerator instead of the Array
  json.array! @service.data do |event_stream|
    json.id event_stream[:id]
    json.valid event_stream[:valid]
    json.events event_stream[:events]
    json.start_time @start_time
    json.end_time @end_time
  end
end
json.partial! "helpers/messages", service: @service
\ No newline at end of file
...@@ -38,7 +38,8 @@ module ControlPanel ...@@ -38,7 +38,8 @@ module ControlPanel
config.middleware.use ActionDispatch::Cookies config.middleware.use ActionDispatch::Cookies
config.middleware.use ActionDispatch::Session::CookieStore config.middleware.use ActionDispatch::Session::CookieStore
# Add folders under the services and adapters directory # Add folders under the services and adapters directory
%w(annotations data nilm db db_folder db_stream permission user_group user data_view joule_modules).each do |service| %w(annotations data nilm db db_folder db_stream
permission user_group user data_view joule_modules event_stream).each do |service|
config.autoload_paths << Rails.root.join("app/services/#{service}") config.autoload_paths << Rails.root.join("app/services/#{service}")
end end
config.autoload_paths << Rails.root.join("app/adapters/nilmdb") config.autoload_paths << Rails.root.join("app/adapters/nilmdb")
......
...@@ -22,6 +22,11 @@ Rails.application.routes.draw do ...@@ -22,6 +22,11 @@ Rails.application.routes.draw do
end end
end end
resources :events, only: [:index, :show, :update] do
collection do
get 'data'
end
end
# fix for devise invitable from: # fix for devise invitable from:
#http://gabrielhilal.com/2015/11/07/integrating-devise_invitable-into-devise_token_auth/ #http://gabrielhilal.com/2015/11/07/integrating-devise_invitable-into-devise_token_auth/
mount_devise_token_auth_for 'User', at: 'auth', skip: [:invitations] mount_devise_token_auth_for 'User', at: 'auth', skip: [:invitations]
......
# Creates the event_streams table: local mirrors of Joule event streams,
# linked to a folder and an installation (db).
class CreateEventStreams < ActiveRecord::Migration[6.0]
  def change
    create_table :event_streams do |t|
      t.belongs_to :db_folder, index: true
      t.belongs_to :db, index: true
      t.string :path
      # 64-bit integers; timestamps are microseconds since the epoch
      t.integer :start_time, limit: 8
      t.integer :end_time, limit: 8
      t.integer :total_rows, limit: 8
      t.integer :total_time, limit: 8
      t.integer :size_on_disk, limit: 8
      # id of the corresponding stream on the Joule node
      t.integer :joule_id, index: true
      t.string :name
      t.string :description
      t.timestamps
    end
  end
end
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2019_09_18_011239) do ActiveRecord::Schema.define(version: 2021_03_03_014945) do
create_table "data_apps", force: :cascade do |t| create_table "data_apps", force: :cascade do |t|
t.string "name" t.string "name"
...@@ -119,6 +119,25 @@ ActiveRecord::Schema.define(version: 2019_09_18_011239) do ...@@ -119,6 +119,25 @@ ActiveRecord::Schema.define(version: 2019_09_18_011239) do
t.boolean "available" t.boolean "available"
end end
create_table "event_streams", force: :cascade do |t|
t.integer "db_folder_id"
t.integer "db_id"
t.string "path"
t.integer "start_time", limit: 8
t.integer "end_time", limit: 8
t.integer "total_rows", limit: 8
t.integer "total_time", limit: 8
t.integer "size_on_disk", limit: 8
t.integer "joule_id"
t.string "name"
t.string "description"
t.datetime "created_at", precision: 6, null: false
t.datetime "updated_at", precision: 6, null: false
t.index ["db_folder_id"], name: "index_event_streams_on_db_folder_id"
t.index ["db_id"], name: "index_event_streams_on_db_id"
t.index ["joule_id"], name: "index_event_streams_on_joule_id"
end
create_table "interface_auth_tokens", force: :cascade do |t| create_table "interface_auth_tokens", force: :cascade do |t|
t.integer "user_id" t.integer "user_id"
t.integer "data_app_id" t.integer "data_app_id"
......
...@@ -4,6 +4,30 @@ require 'rails_helper' ...@@ -4,6 +4,30 @@ require 'rails_helper'
describe Joule::Adapter do describe Joule::Adapter do
it 'reads events' do
adapter = Joule::Adapter.new("url", "key")
mock_backend = instance_double(Joule::Backend)
adapter.backend = mock_backend
raw = File.read(File.dirname(__FILE__) + "/events.json")
json = JSON.parse(raw)
expect(mock_backend).to receive(:read_events) { json }
nilm = FactoryBot.create(:nilm, name: "test")
stream = FactoryBot.create(:event_stream, db: nilm.db, db_folder: nilm.db.root_folder,
name: 'test_stream')
result = adapter.read_events(stream,1611421200000000, 1611421230000000)
expect(result[:id]).to eq stream.id
expect(result[:valid]).to be true
expect(result[:events].length).to eq 4
expected_event = {
"start_time" => 1611421200000000,
"end_time" => 1611421205000000,
"content" => {"name" => "test event 0"}}
expect(result[:events][0]).to eq expected_event
end
it 'creates annotations' do it 'creates annotations' do
annotation = Annotation.new annotation = Annotation.new
annotation.title = "test" annotation.title = "test"
...@@ -20,7 +44,7 @@ describe Joule::Adapter do ...@@ -20,7 +44,7 @@ describe Joule::Adapter do
mock_backend = instance_double(Joule::Backend) mock_backend = instance_double(Joule::Backend)
adapter.backend = mock_backend adapter.backend = mock_backend
raw = File.read(File.dirname(__FILE__)+"/annotations.json") raw = File.read(File.dirname(__FILE__) + "/annotations.json")
json = JSON.parse(raw) json = JSON.parse(raw)
expect(mock_backend).to receive(:get_annotations) { json } expect(mock_backend).to receive(:get_annotations) { json }
...@@ -29,7 +53,7 @@ describe Joule::Adapter do ...@@ -29,7 +53,7 @@ describe Joule::Adapter do
name: 'test_stream') name: 'test_stream')
annotations = adapter.get_annotations(stream) annotations = adapter.get_annotations(stream)
expect(annotations.length).to eq 6 expect(annotations.length).to eq 6
annotations.each do | annotation | annotations.each do |annotation|
expect(annotation.db_stream).to be stream expect(annotation.db_stream).to be stream
end end
end end
......
...@@ -7,12 +7,14 @@ describe Joule::Backend do ...@@ -7,12 +7,14 @@ describe Joule::Backend do
let (:url) {'http://nuc:8088'} let (:url) {'http://nuc:8088'}
let (:key) {'api_key'} let (:key) {'api_key'}
it 'retrieves database schema', :vcr do it 'retrieves database schema', :vcr do
url = 'https://localhost:3030'
key = 'apkQ3_tvTo-bCzFrIADW2uvlZ6nboISwC6tvsoH64mc'
backend = Joule::Backend.new(url, key) backend = Joule::Backend.new(url, key)
schema = backend.db_schema schema = backend.db_schema
# make sure keys are symbolized # make sure keys are symbolized
expect(schema).to include(:name, :id, :streams, :children) expect(schema).to include(:name, :id, :streams, :children)
# should be a tree structure # should be a tree structure
expect(schema[:children][0]).to include(:name, :id, :streams, :children) expect(schema[:children][0]).to include(:name, :id, :streams, :event_streams, :children)
end end
it 'retrieves module schema', :vcr do it 'retrieves module schema', :vcr do
backend = Joule::Backend.new(url, key) backend = Joule::Backend.new(url, key)
...@@ -45,6 +47,32 @@ describe Joule::Backend do ...@@ -45,6 +47,32 @@ describe Joule::Backend do
expect(resp[:result][:data].count).to be < 200 expect(resp[:result][:data].count).to be < 200
end end
# Event-stream reads against a live Joule node (recorded with VCR)
describe "Events" do
  let(:url) { "https://127.0.0.1:3030"}
  let(:key) { "apkQ3_tvTo-bCzFrIADW2uvlZ6nboISwC6tvsoH64mc"}
  describe "read_events" do
    it 'loads events', :vcr do
      backend = Joule::Backend.new(url, key)
      start_time = 1611421210000000
      end_time = 1611421235000000
      events = backend.read_events(2, start_time, end_time)
      # should have 3 events
      expect(events.length).to eq 3
      # keys are symbolized by the backend
      expected_event = {
        "start_time":1611421210000000,
        "end_time":1611421215000000,
        "content":{
          "name":"test event 1"}}
      expect(events[0]).to eq expected_event
    end
    it 'handles errors', :vcr do
      backend = Joule::Backend.new(url, key)
      # server was stopped for this request
      # NOTE(review): this exercises get_annotations, not read_events,
      # and the matching cassette records /annotations.json — confirm
      # whether this was meant to call backend.read_events instead
      expect{backend.get_annotations(101)}.to raise_error(RuntimeError)
    end
  end
end
describe "Annotations" do describe "Annotations" do
let(:url) { "https://172.34.31.8:8088"} let(:url) { "https://172.34.31.8:8088"}
let(:key) { "cR0JqqTM8bizW73MY1IAHCPJUTwDmOdunhYK9b2VQ98"} let(:key) { "cR0JqqTM8bizW73MY1IAHCPJUTwDmOdunhYK9b2VQ98"}
......
[
{
"start_time":1611421200000000,
"end_time":1611421205000000,
"content":{
"name":"test event 0"
}
},
{
"start_time":1611421210000000,
"end_time":1611421215000000,
"content":{
"name":"test event 1"
}
},
{
"start_time":1611421220000000,
"end_time":1611421225000000,
"content":{
"name":"test event 2"
}
},
{
"start_time":1611421230000000,
"end_time":1611421235000000,
"content":{
"name":"test event 3"
}
}
]
\ No newline at end of file
...@@ -10,6 +10,32 @@ ...@@ -10,6 +10,32 @@
"children": [ "children": [
], ],
"event_streams":[
{
"id":100,
"name":"events_2_1",
"description":"",
"data_info":{
"start":null,
"end":null,
"event_count":0,
"bytes":0,
"total_time":0
}
},
{
"id":101,
"name":"events_2_2",
"description":"",
"data_info":{
"start":1611421200000000,
"end":1611422190000000,
"event_count":100,
"bytes":0,
"total_time":990000000
}
}
],
"streams": [ "streams": [
{ {
"id": 1, "id": 1,
......
...@@ -10,6 +10,8 @@ require 'json' ...@@ -10,6 +10,8 @@ require 'json'
# │ └── stream_1_2: uint8_3 # │ └── stream_1_2: uint8_3
# ├── folder_2 # ├── folder_2
# │ └── stream_2_1: int16_2 # │ └── stream_2_1: int16_2
# │ └── transients (event stream)
# │ └── loads (event stream)
# ├── folder_3 # ├── folder_3
# │ ├── folder_3_1 # │ ├── folder_3_1
# │ │ └── stream_3_1_1: int32_3 # │ │ └── stream_3_1_1: int32_3
...@@ -52,11 +54,18 @@ describe Joule::UpdateDb do ...@@ -52,11 +54,18 @@ describe Joule::UpdateDb do
expect(z.display_type).to eq 'discrete' expect(z.display_type).to eq 'discrete'
expect(z.column).to eq 2 expect(z.column).to eq 2
expect(z.units).to eq "watts" expect(z.units).to eq "watts"
# check for event streams in Folder 2
folder_2 = @db.root_folder.subfolders.where(name: 'folder_2').first
expect(folder_2.event_streams.count).to eq 2
events_2_1 = folder_2.event_streams.where(name: 'events_2_1').first
expect(events_2_1.path).to eq '/folder_2/events_2_1'
expect(events_2_1.joule_id).to eq 100
# quick checks # quick checks
expect(DbElement.count).to eq 14 expect(DbElement.count).to eq 14
expect(DbStream.count).to eq 5 expect(DbStream.count).to eq 5
expect(DbFolder.count).to eq 7 expect(DbFolder.count).to eq 7
expect(EventStream.count).to eq 2
end end
end end
end end
......
---
http_interactions:
- request:
method: get
uri: https://127.0.0.1:3030/annotations.json?stream_id=101
body:
encoding: US-ASCII
string: ''
headers:
X-Api-Key:
- AAX8ItOxNSyrBOOYL7zfuaPOHqTXxzspD6LVJAIa9-I
Accept-Encoding:
- gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Accept:
- "*/*"
User-Agent:
- Ruby
response:
status:
code: 403
message: Forbidden
headers:
Content-Type:
- text/plain; charset=utf-8
Content-Length:
- '14'
Date:
- Wed, 03 Mar 2021 18:06:11 GMT
Server:
- Python/3.8 aiohttp/3.7.3
body:
encoding: UTF-8
string: '403: Forbidden'
recorded_at: Wed, 03 Mar 2021 18:06:11 GMT
recorded_with: VCR 6.0.0
---
http_interactions:
- request:
method: get
uri: https://127.0.0.1:3030/event/data.json?end=1611421235000000&id=2&start=1611421210000000
body:
encoding: US-ASCII
string: ''
headers:
X-Api-Key:
- apkQ3_tvTo-bCzFrIADW2uvlZ6nboISwC6tvsoH64mc
Accept-Encoding:
- gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Accept:
- "*/*"
User-Agent:
- Ruby
response:
status:
code: 200
message: OK
headers:
Content-Type:
- application/json; charset=utf-8
Content-Length:
- '303'
Date:
- Wed, 03 Mar 2021 18:31:31 GMT
Server:
- Python/3.8 aiohttp/3.7.3
body:
encoding: UTF-8
string: '[{"start_time": 1611421210000000, "end_time": 1611421215000000, "content":
{"name": "test event 1"}}, {"start_time": 1611421220000000, "end_time": 1611421225000000,
"content": {"name": "test event 2"}}, {"start_time": 1611421230000000, "end_time":
1611421235000000, "content": {"name": "test event 3"}}]'
recorded_at: Wed, 03 Mar 2021 18:31:31 GMT
recorded_with: VCR 6.0.0
...@@ -2,11 +2,13 @@ ...@@ -2,11 +2,13 @@
http_interactions: http_interactions:
- request: - request:
method: get method: get
uri: http://nuc:8088/streams.json uri: https://localhost:3030/streams.json
body: body:
encoding: US-ASCII encoding: US-ASCII
string: '' string: ''
headers: headers:
X-Api-Key:
- apkQ3_tvTo-bCzFrIADW2uvlZ6nboISwC6tvsoH64mc
Accept-Encoding: Accept-Encoding:
- gzip;q=1.0,deflate;q=0.6,identity;q=0.3 - gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Accept: Accept:
...@@ -21,60 +23,59 @@ http_interactions: ...@@ -21,60 +23,59 @@ http_interactions:
Content-Type: Content-Type:
- application/json; charset=utf-8 - application/json; charset=utf-8
Content-Length: Content-Length:
- '3683' - '3663'
Date: Date:
- Wed, 11 Jul 2018 14:58:33 GMT - Wed, 03 Mar 2021 18:12:28 GMT
Server: Server:
- Python/3.6 aiohttp/3.2.1 - Python/3.8 aiohttp/3.7.3
body: body:
encoding: UTF-8 encoding: UTF-8
string: '{"id": 1, "name": "root", "description": null, "children": [{"id": string: '{"id": 2, "name": "root", "description": null, "locked": true, "children":
2, "name": "folder_2", "description": null, "children": [], "streams": [{"id": [{"id": 3, "name": "basic", "description": null, "locked": false, "children":
1, "name": "stream_2_1", "description": "", "datatype": "INT16", "keep_us": [{"id": 4, "name": "aux", "description": null, "locked": false, "children":
-1, "decimate": true, "elements": [{"id": 12, "index": 0, "name": "top", "units": [], "streams": [{"id": 1, "name": "Encoder", "description": "", "datatype":
null, "plottable": true, "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": "float32", "layout": "float32_3", "keep_us": 604800000000, "is_configured":
1.0, "default_max": null, "default_min": null}, {"id": 13, "index": 1, "name": false, "is_source": false, "is_destination": false, "locked": false, "active":
"bottom", "units": null, "plottable": true, "display_type": "CONTINUOUS", false, "decimate": true, "elements": [{"id": 352, "index": 0, "name": "X",
"offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min": null}]}]}, "units": null, "plottable": true, "display_type": "CONTINUOUS", "offset":
{"id": 3, "name": "folder_1", "description": null, "children": [], "streams":
[{"id": 2, "name": "stream_1_2", "description": "", "datatype": "UINT8", "keep_us":
-1, "decimate": true, "elements": [{"id": 14, "index": 0, "name": "a", "units":
null, "plottable": true, "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor":
1.0, "default_max": null, "default_min": null}, {"id": 15, "index": 1, "name":
"b", "units": null, "plottable": true, "display_type": "CONTINUOUS", "offset":
0.0, "scale_factor": 1.0, "default_max": null, "default_min": null}, {"id":
16, "index": 2, "name": "c", "units": null, "plottable": true, "display_type":
"CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min":
null}]}, {"id": 3, "name": "stream_1_1", "description": "", "datatype": "FLOAT32",
"keep_us": -1, "decimate": true, "elements": [{"id": 17, "index": 0, "name":
"x", "units": null, "plottable": true, "display_type": "CONTINUOUS", "offset":
0.0, "scale_factor": 1.0, "default_max": null, "default_min": null}, {"id": 0.0, "scale_factor": 1.0, "default_max": null, "default_min": null}, {"id":
18, "index": 1, "name": "y", "units": null, "plottable": true, "display_type": 353, "index": 1, "name": "Y", "units": null, "plottable": true, "display_type":
"CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min":
null}, {"id": 19, "index": 2, "name": "z", "units": null, "plottable": true, null}, {"id": 354, "index": 2, "name": "Z", "units": null, "plottable": true,
"display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max":
null, "default_min": null}]}]}, {"id": 4, "name": "folder_3", "description": null, "default_min": null}], "data_info": {"start": null, "end": null, "rows":
null, "children": [{"id": 5, "name": "folder_3_1", "description": null, "children": 0, "bytes": 0, "total_time": 0}}, {"id": 3, "name": "Accel", "description":
[], "streams": [{"id": 4, "name": "stream_3_1_1", "description": "", "datatype": "", "datatype": "float32", "layout": "float32_3", "keep_us": 604800000000,
"INT32", "keep_us": -1, "decimate": true, "elements": [{"id": 20, "index": "is_configured": false, "is_source": false, "is_destination": false, "locked":
0, "name": "a", "units": null, "plottable": true, "display_type": "CONTINUOUS", false, "active": false, "decimate": true, "elements": [{"id": 358, "index":
0, "name": "X", "units": null, "plottable": true, "display_type": "CONTINUOUS",
"offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min": null}, "offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min": null},
{"id": 21, "index": 1, "name": "b", "units": null, "plottable": true, "display_type": {"id": 359, "index": 1, "name": "Y", "units": null, "plottable": true, "display_type":
"CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min":
null}, {"id": 22, "index": 2, "name": "c", "units": null, "plottable": true, null}, {"id": 360, "index": 2, "name": "Z", "units": null, "plottable": true,
"display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max":
null, "default_min": null}]}]}], "streams": [{"id": 5, "name": "stream_3_1", null, "default_min": null}], "data_info": {"start": null, "end": null, "rows":
"description": "", "datatype": "UINT16", "keep_us": -1, "decimate": true, 0, "bytes": 0, "total_time": 0}}], "event_streams": [{"id": 1, "name": "events0",
"elements": [{"id": 23, "index": 0, "name": "a", "units": null, "plottable": "description": "", "data_info": {"start": null, "end": null, "event_count":
0, "bytes": 0, "total_time": 0}}, {"id": 2, "name": "events1", "description":
"", "data_info": {"start": 1611421200000000, "end": 1611422190000000, "event_count":
100, "bytes": 0, "total_time": 990000000}}]}, {"id": 5, "name": "sensors",
"description": null, "locked": false, "children": [], "streams": [{"id": 2,
"name": "Gyro", "description": "", "datatype": "float32", "layout": "float32_3",
"keep_us": 604800000000, "is_configured": false, "is_source": false, "is_destination":
false, "locked": false, "active": false, "decimate": true, "elements": [{"id":
355, "index": 0, "name": "Roll", "units": null, "plottable": true, "display_type":
"CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": null, "default_min":
null}, {"id": 356, "index": 1, "name": "Pitch", "units": null, "plottable":
true, "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max": true, "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": 1.0, "default_max":
null, "default_min": null}, {"id": 24, "index": 1, "name": "b", "units": null, null, "default_min": null}, {"id": 357, "index": 2, "name": "Yaw", "units":
"plottable": true, "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor": null, "plottable": true, "display_type": "CONTINUOUS", "offset": 0.0, "scale_factor":
1.0, "default_max": null, "default_min": null}, {"id": 25, "index": 2, "name": 1.0, "default_max": null, "default_min": null}], "data_info": {"start": null,
"c", "units": null, "plottable": true, "display_type": "CONTINUOUS", "offset": "end": null, "rows": 0, "bytes": 0, "total_time": 0}}], "event_streams": []},
0.0, "scale_factor": 1.0, "default_max": null, "default_min": null}]}]}, {"id": {"id": 6, "name": "event", "description": null, "locked": false, "children":
6, "name": "folder_4", "description": null, "children": [{"id": 7, "name": [], "streams": [], "event_streams": [{"id": 3, "name": "events2", "description":
"folder_4_1", "description": null, "children": [], "streams": []}], "streams": "", "data_info": {"start": null, "end": null, "event_count": 0, "bytes": 0,
[]}], "streams": []}' "total_time": 0}}]}], "streams": [], "event_streams": []}], "streams": [],
http_version: "event_streams": []}'
recorded_at: Wed, 11 Jul 2018 14:58:33 GMT recorded_at: Wed, 03 Mar 2021 18:12:28 GMT
recorded_with: VCR 4.0.0 recorded_with: VCR 6.0.0
require 'rails_helper'

# Placeholder: no controller examples yet (commit notes the feature is
# "not fully unit tested")
RSpec.describe EventsController, type: :controller do
end
FactoryBot.define do
  # Minimal event stream; callers must supply the :db and :db_folder
  # associations explicitly (see the specs in this commit)
  factory :event_stream do
    name { Faker::Lorem.word }
    description { Faker::Lorem.sentence }
  end
end
require 'rails_helper'

# Placeholder model spec generated with the EventStream model
RSpec.describe EventStream, type: :model do
  pending "add some examples to (or delete) #{__FILE__}"
end
# frozen_string_literal: true
require 'rails_helper'

RSpec.describe 'ReadEvents' do
  let(:nilm) {create(:nilm, name: 'test')}
  let(:db) { create(:db, nilm: nilm, max_points_per_plot: 100) }

  describe 'when there are multiple event streams' do
    before do
      db = create(:db, nilm: nilm, url: 'http://test/nilmdb')
      @event_stream1 = create(:event_stream, db_folder: db.root_folder, db: db)
      @event_stream1_data = {id: @event_stream1.id, valid: true, events: ["event1", "event2"]}
      @event_stream2 = create(:event_stream, db_folder: db.root_folder, db: db)
      @event_stream2_data = {id: @event_stream2.id, valid: true, events: ["event3", "event4"]}
      @mock_adapter = MockAdapter.new(nil,
                                      [{event_stream: @event_stream1, data: @event_stream1_data},
                                       {event_stream: @event_stream2, data: @event_stream2_data}])
      allow(NodeAdapterFactory).to receive(:from_nilm).and_return(@mock_adapter)
    end
    it 'makes one request per stream' do
      service = ReadEvents.new
      service.run([@event_stream1, @event_stream2], 0, 100)
      expect(service.success?).to be true
      expect(service.data).to eq [@event_stream1_data, @event_stream2_data]
      expect(@mock_adapter.event_run_count).to eq 2
    end
  end

  describe 'when a nilm does not respond' do
    before do
      db = create(:db, nilm: nilm, url: 'http://test/nilmdb')
      @event_stream1 = create(:event_stream, db_folder: db.root_folder, db: db)
      @event_stream1_data = {id: @event_stream1.id, valid: true, events: ["event1", "event2"]}
      @event_stream2 = create(:event_stream, db_folder: db.root_folder, db: db)
      # a nil data entry makes the mock adapter return nil for this stream
      @mock_adapter = MockAdapter.new(nil,
                                      [{event_stream: @event_stream1, data: @event_stream1_data},
                                       {event_stream: @event_stream2, data: nil}])
      allow(NodeAdapterFactory).to receive(:from_nilm).and_return(@mock_adapter)
    end
    it 'fills in the data that is available' do
      service = ReadEvents.new
      service.run([@event_stream1, @event_stream2], 0, 100)
      expect(service.warnings.length).to eq 1
      expect(service.data).to eq [
        @event_stream1_data,
        {id: @event_stream2.id, valid: false, events: nil}
      ]
      expect(@mock_adapter.event_run_count).to eq 2
    end
  end

  # NOTE: This is really quite a large integration test, it
  # builds the full test nilm and then retrieves events from it.
  # might be overkill but it really tests out the pipeline :)
  describe 'when boundary times are not specified' do
    let (:url) {'https://localhost:3030'}
    let(:key) {'EuOjCqFd4lpin7U-oPApNiQTReO6HaQUxfnkLZglkYQ'}
    let(:user) {create(:user)}
    it 'updates the streams', :vcr do
      @adapter = Joule::Adapter.new(url, key)
      service = CreateNilm.new(@adapter)
      service.run(name: 'test', url: url, owner: user, key: key)
      events1 = EventStream.find_by_path("/Homes/AB Transients")
      events2 = EventStream.find_by_path("/basic/aux/events0")
      service = ReadEvents.new
      service.run([events1, events2], nil, nil)
      # bounds taken from test joule on vagrant instance
      # AB Transients: [1564632656344436 - 1564637216855134]
      # events 0 - no events
      expect(service.start_time).to eq(1564632656344436)
      expect(service.end_time).to eq(1564637216855134)
      # check the events (result order is not guaranteed)
      expect(service.data.length).to eq 2
      if service.data[0][:id] == events1.id
        rx_events1 = service.data[0][:events]
        rx_events2 = service.data[1][:events]
      else
        rx_events1 = service.data[1][:events]
        rx_events2 = service.data[0][:events]
      end
      expect(rx_events2.length).to eq 0
      expect(rx_events1.length).to eq 21
      # the auto-computed bounds come from the first event's start and the
      # last event's end (the original duplicated the start assertion here)
      expect(rx_events1[0][:start_time]).to eq service.start_time
      expect(rx_events1[-1][:end_time]).to eq service.end_time
      expect(rx_events1[-1][:content][:height]).to eq(-1028.9400634765625)
    end
  end
end
class MockAdapter class MockAdapter
attr_reader :run_count attr_reader :run_count, :event_run_count
def initialize(dataset) def initialize(dataset=nil, events = nil)
super() super()
@dataset = dataset @dataset = dataset
@events = events
@run_count = 0 @run_count = 0
@event_run_count = 0
end end
def load_data(db_stream, start_time, end_time, elements=[], resolution=nil) def load_data(db_stream, start_time, end_time, elements=[], resolution=nil)
data = @dataset.select{|d| d[:stream]==db_stream}.first[:data] data = @dataset.select{|d| d[:stream]==db_stream}.first[:data]
...@@ -14,4 +16,9 @@ class MockAdapter ...@@ -14,4 +16,9 @@ class MockAdapter
end end
{data: data, decimation_factor: 1} {data: data, decimation_factor: 1}
end end
def read_events(event_stream,start_time, end_time)
data = @events.select{|d| d[:event_stream]==event_stream}.first[:data]
@event_run_count += 1
data
end
end end
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment