Commit 2f731e42 by John Donnal

added support for event filters

parent fe8d739c
...@@ -106,8 +106,8 @@ module Joule ...@@ -106,8 +106,8 @@ module Joule
# === END ANNOTATIONS === # === END ANNOTATIONS ===
# === BEGIN EVENTS === # === BEGIN EVENTS ===
def read_events(stream, max_events, start_time, end_time) def read_events(stream, max_events, start_time, end_time, filter)
result = @backend.read_events(stream.joule_id, max_events, start_time, end_time) result = @backend.read_events(stream.joule_id, max_events, start_time, end_time, filter)
{id: stream.id, valid: true, count: result[:count], events: result[:events]} {id: stream.id, valid: true, count: result[:count], events: result[:events]}
end end
......
...@@ -215,10 +215,13 @@ module Joule ...@@ -215,10 +215,13 @@ module Joule
end end
# === EVENT METHODS === # === EVENT METHODS ===
def read_events(stream_id, start_time, end_time) def read_events(stream_id, max_events, start_time, end_time, filter)
query = {'id': stream_id} query = {'id': stream_id}
query['start'] = start_time unless start_time.nil? query['start'] = start_time unless start_time.nil?
query['end'] = end_time unless end_time.nil? query['end'] = end_time unless end_time.nil?
query['limit'] = max_events
query['filter'] = filter.to_json unless filter.nil?
options = {query: query} options = {query: query}
begin begin
resp = self.class.get("#{@url}/event/data.json", options) resp = self.class.get("#{@url}/event/data.json", options)
...@@ -226,7 +229,14 @@ module Joule ...@@ -226,7 +229,14 @@ module Joule
rescue rescue
raise "connection error" raise "connection error"
end end
resp.parsed_response.map{|event| event.deep_symbolize_keys} if resp.parsed_response.is_a?(Hash)
resp.parsed_response.deep_symbolize_keys!
resp
else # backwards compatibility
events = resp.parsed_response.map{|event| event.deep_symbolize_keys}
count = if events.nil? then 0 else events.length end
{count: count, events: events}
end
end end
# === ANNOTATION METHODS === # === ANNOTATION METHODS ===
def create_annotation(annotation) def create_annotation(annotation)
...@@ -260,29 +270,6 @@ module Joule ...@@ -260,29 +270,6 @@ module Joule
resp.parsed_response resp.parsed_response
end end
# === EVENT METHODS ===
def read_events(stream_id, max_events, start_time, end_time)
query = {'id': stream_id}
query['start'] = start_time unless start_time.nil?
query['end'] = end_time unless end_time.nil?
query['limit'] = max_events
options = {query: query}
begin
resp = self.class.get("#{@url}/event/data.json", options)
raise "error reading events #{resp.body}" unless resp.success?
rescue
raise "connection error"
end
if resp.parsed_response.is_a?(Hash)
resp.parsed_response.deep_symbolize_keys!
resp
else # backwards compatibility
events = resp.parsed_response.map{|event| event.deep_symbolize_keys}
count = if events.nil? then 0 else events.length end
{count: count, events: events}
end
end
def delete_annotation(annotation_id) def delete_annotation(annotation_id)
query = {'id': annotation_id} query = {'id': annotation_id}
options = {query: query} options = {query: query}
......
...@@ -15,10 +15,18 @@ class EventsController < ApplicationController ...@@ -15,10 +15,18 @@ class EventsController < ApplicationController
end end
def data def data
req_streams = EventStream.find(JSON.parse(params[:streams])) # streams parameters is a JSON array
# [{id, filter},{id, filter},...]
stream_param = JSON.parse(params[:streams])
req_streams = stream_param.map do |param|
{ stream: EventStream.find(param["id"]),
filter: param["filter"],
tag: param["tag"]
}
end
# make sure the user is allowed to view these elements # make sure the user is allowed to view these elements
req_streams.each do |stream| req_streams.each do |req_stream|
unless current_user.views_nilm?(stream.db.nilm) unless current_user.views_nilm?(req_stream[:stream].db.nilm)
head :unauthorized head :unauthorized
return return
end end
......
...@@ -10,7 +10,8 @@ class ReadEvents ...@@ -10,7 +10,8 @@ class ReadEvents
end end
def run(event_streams, start_time, end_time) def run(requested_streams, start_time, end_time)
# requested_streams is an array [{stream: EventStream, filter: array},...]
@start_time = start_time @start_time = start_time
@end_time = end_time @end_time = end_time
if (not @start_time.nil?) and (not @end_time.nil?) and (@start_time > @end_time) if (not @start_time.nil?) and (not @end_time.nil?) and (@start_time > @end_time)
...@@ -19,13 +20,21 @@ class ReadEvents ...@@ -19,13 +20,21 @@ class ReadEvents
end end
# pull data from streams # pull data from streams
@data = [] @data = []
event_streams.each do |stream| requested_streams.each do |requested_stream|
stream = requested_stream[:stream]
filter = requested_stream[:filter]
tag = requested_stream[:tag]
adapter = NodeAdapterFactory.from_nilm(stream.db.nilm) adapter = NodeAdapterFactory.from_nilm(stream.db.nilm)
result = adapter.read_events(stream, stream.db.max_events_per_plot, @start_time, @end_time) result = adapter.read_events(stream,
stream.db.max_events_per_plot,
@start_time, @end_time,
filter)
if not result.nil? if not result.nil?
result[:tag] = tag
@data.append(result) @data.append(result)
else else
@data.append({id: stream.id, valid: false, count: 0, events: nil}) @data.append({id: stream.id, valid: false, count: 0, events: nil,
tag: tag})
add_warning("unable to retrieve events for #{stream.path}") add_warning("unable to retrieve events for #{stream.path}")
end end
end end
......
...@@ -4,6 +4,7 @@ json.data do ...@@ -4,6 +4,7 @@ json.data do
json.valid event_stream[:valid] json.valid event_stream[:valid]
json.count event_stream[:count] json.count event_stream[:count]
json.events event_stream[:events] json.events event_stream[:events]
json.tag event_stream[:tag]
json.start_time @start_time json.start_time @start_time
json.end_time @end_time json.end_time @end_time
end end
......
...@@ -17,7 +17,7 @@ describe Joule::Adapter do ...@@ -17,7 +17,7 @@ describe Joule::Adapter do
nilm = FactoryBot.create(:nilm, name: "test") nilm = FactoryBot.create(:nilm, name: "test")
stream = FactoryBot.create(:event_stream, db: nilm.db, db_folder: nilm.db.root_folder, stream = FactoryBot.create(:event_stream, db: nilm.db, db_folder: nilm.db.root_folder,
name: 'test_stream') name: 'test_stream')
result = adapter.read_events(stream, 200, 1611421200000000, 1611421230000000) result = adapter.read_events(stream, 200, 1611421200000000, 1611421230000000, [])
expect(result[:id]).to eq stream.id expect(result[:id]).to eq stream.id
expect(result[:valid]).to be true expect(result[:valid]).to be true
expect(result[:events].length).to eq 4 expect(result[:events].length).to eq 4
......
...@@ -55,7 +55,7 @@ describe Joule::Backend do ...@@ -55,7 +55,7 @@ describe Joule::Backend do
backend = Joule::Backend.new(url, key) backend = Joule::Backend.new(url, key)
start_time = 1611421210000000 start_time = 1611421210000000
end_time = 1611421235000000 end_time = 1611421235000000
result = backend.read_events(2, 200, start_time, end_time) result = backend.read_events(2, 200, start_time, end_time, [])
# should have 3 events # should have 3 events
expect(result[:count]).to eq 3 expect(result[:count]).to eq 3
expect(result[:events].length).to eq 3 expect(result[:events].length).to eq 3
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
http_interactions: http_interactions:
- request: - request:
method: get method: get
uri: https://127.0.0.1:3030/event/data.json?end=1611421235000000&id=2&limit=200&start=1611421210000000 uri: https://127.0.0.1:3030/event/data.json?end=1611421235000000&filter=%5B%5D&id=2&limit=200&start=1611421210000000
body: body:
encoding: US-ASCII encoding: US-ASCII
string: '' string: ''
......
...@@ -289,7 +289,7 @@ http_interactions: ...@@ -289,7 +289,7 @@ http_interactions:
recorded_at: Mon, 08 Mar 2021 21:26:43 GMT recorded_at: Mon, 08 Mar 2021 21:26:43 GMT
- request: - request:
method: get method: get
uri: https://localhost:3030/event/data.json?id=4&limit=200 uri: https://localhost:3030/event/data.json?filter=%5B%5D&id=4&limit=200
body: body:
encoding: US-ASCII encoding: US-ASCII
string: '' string: ''
...@@ -348,7 +348,7 @@ http_interactions: ...@@ -348,7 +348,7 @@ http_interactions:
recorded_at: Mon, 08 Mar 2021 21:26:52 GMT recorded_at: Mon, 08 Mar 2021 21:26:52 GMT
- request: - request:
method: get method: get
uri: https://localhost:3030/event/data.json?id=1&limit=200 uri: https://localhost:3030/event/data.json?filter=%5B%5D&id=1&limit=200
body: body:
encoding: US-ASCII encoding: US-ASCII
string: '' string: ''
......
...@@ -22,7 +22,10 @@ RSpec.describe 'ReadEvents' do ...@@ -22,7 +22,10 @@ RSpec.describe 'ReadEvents' do
end end
it 'makes one request per stream' do it 'makes one request per stream' do
service = ReadEvents.new service = ReadEvents.new
service.run([@event_stream1, @event_stream2],0,100) service.run([{
stream: @event_stream1, filter: []},
stream: @event_stream2, filter:[]],
0,100)
expect(service.success?).to be true expect(service.success?).to be true
expect(service.data).to eq [@event_stream1_data, @event_stream2_data] expect(service.data).to eq [@event_stream1_data, @event_stream2_data]
expect(@mock_adapter.event_run_count).to eq 2 expect(@mock_adapter.event_run_count).to eq 2
...@@ -44,11 +47,12 @@ RSpec.describe 'ReadEvents' do ...@@ -44,11 +47,12 @@ RSpec.describe 'ReadEvents' do
end end
it 'fills in the data that is available' do it 'fills in the data that is available' do
service = ReadEvents.new service = ReadEvents.new
service.run([@event_stream1, @event_stream2],0,100) service.run([{stream: @event_stream1, filter: []},
stream: @event_stream2, filter:[]],0,100)
expect(service.warnings.length).to eq 1 expect(service.warnings.length).to eq 1
expect(service.data).to eq [ expect(service.data).to eq [
@event_stream1_data, @event_stream1_data,
{id: @event_stream2.id, valid: false, events: nil, count: 0} {id: @event_stream2.id, valid: false, tag: nil, events: nil, count: 0}
] ]
expect(@mock_adapter.event_run_count).to eq 2 expect(@mock_adapter.event_run_count).to eq 2
end end
...@@ -71,7 +75,7 @@ RSpec.describe 'ReadEvents' do ...@@ -71,7 +75,7 @@ RSpec.describe 'ReadEvents' do
events1 = EventStream.find_by_path("/Homes/AB Transients") events1 = EventStream.find_by_path("/Homes/AB Transients")
events2 = EventStream.find_by_path("/basic/aux/events0") events2 = EventStream.find_by_path("/basic/aux/events0")
service = ReadEvents.new service = ReadEvents.new
service.run([events1, events2], nil, nil) service.run([{stream:events1, filter:[]}, {stream:events2, filter:[]}], nil, nil)
#bounds taken from test joule on vagrant instance #bounds taken from test joule on vagrant instance
# AB Transients: [1564632656344436 - 1564637216855134] # AB Transients: [1564632656344436 - 1564637216855134]
# events 0 - no events # events 0 - no events
......
...@@ -16,7 +16,7 @@ class MockAdapter ...@@ -16,7 +16,7 @@ class MockAdapter
end end
{data: data, decimation_factor: 1} {data: data, decimation_factor: 1}
end end
def read_events(event_stream,max_events, start_time, end_time) def read_events(event_stream,max_events, start_time, end_time, filter)
data = @events.select{|d| d[:event_stream]==event_stream}.first[:data] data = @events.select{|d| d[:event_stream]==event_stream}.first[:data]
@event_run_count += 1 @event_run_count += 1
data data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment