Commit ad5796e8 by John Doe

added stream data loader service to retrieve and format data from nilms

parent 9d33dd1d
......@@ -129,7 +129,7 @@ GEM
guard (~> 2.0)
rubocop (~> 0.20)
hashdiff (0.3.2)
hashie (3.5.3)
hashie (3.5.5)
httparty (0.14.0)
multi_xml (>= 0.5.2)
i18n (0.8.0)
......@@ -169,9 +169,9 @@ GEM
nenv (~> 0.1)
shellany (~> 0.0)
oj (2.18.1)
omniauth (1.3.2)
hashie (>= 1.2, < 4)
rack (>= 1.0, < 3)
omniauth (1.6.1)
hashie (>= 3.4.6, < 3.6.0)
rack (>= 1.6.2, < 3)
orm_adapter (0.5.0)
parser (2.3.3.1)
ast (~> 2.2)
......
......@@ -4,6 +4,7 @@
class DbAdapter
include HTTParty
default_timeout 10
attr_reader :url
def initialize(url)
@url = url
......@@ -82,6 +83,53 @@ class DbAdapter
__build_stream_metadata(db_stream))
end
# Ask the server at @url how many rows +path+ holds between two timestamps.
#
# Sends GET <url>/stream/extract with path/start/end and count=1.
#
# * +path+       - stream path (a "~decim-N" suffix selects a decimation)
# * +start_time+ - lower bound, passed through as the +start+ query value
# * +end_time+   - upper bound, passed through as the +end+ query value
#
# returns: the response body converted with +to_i+, or nil when the
# response is not successful or any StandardError is raised.
def get_count(path, start_time, end_time)
  query = { path: path, start: start_time, end: end_time, count: 1 }
  resp = self.class.get("#{@url}/stream/extract", query: query)
  resp.success? ? resp.parsed_response.to_i : nil
rescue
  # best effort: connection/parse problems are reported as "no answer"
  nil
end
# Fetch the rows of +path+ between two timestamps as a parsed array.
#
# Sends GET <url>/stream/extract with path/start/end and markup=1, then
# hands the response body to __parse_data.
#
# * +path+       - stream path (a "~decim-N" suffix selects a decimation)
# * +start_time+ - lower bound, passed through as the +start+ query value
# * +end_time+   - upper bound, passed through as the +end+ query value
#
# returns: whatever __parse_data produces for the body, or nil when the
# response is not successful or any StandardError is raised.
def get_data(path, start_time, end_time)
  query = { path: path, start: start_time, end: end_time, markup: 1 }
  resp = self.class.get("#{@url}/stream/extract", query: query)
  resp.success? ? __parse_data(resp.parsed_response) : nil
rescue
  # best effort: connection/parse problems are reported as "no answer"
  nil
end
# Fetch the intervals of +path+ between two timestamps as plottable
# line segments.
#
# Sends GET <url>/stream/intervals with path/start/end, then hands the
# response body to __parse_intervals.
#
# * +path+       - stream path (a "~decim-N" suffix selects a decimation)
# * +start_time+ - lower bound, passed through as the +start+ query value
# * +end_time+   - upper bound, passed through as the +end+ query value
#
# returns: whatever __parse_intervals produces for the body, or nil when
# the response is not successful or any StandardError is raised.
def get_intervals(path, start_time, end_time)
  query = { path: path, start: start_time, end: end_time }
  resp = self.class.get("#{@url}/stream/intervals", query: query)
  resp.success? ? __parse_intervals(resp.parsed_response) : nil
rescue
  # best effort: connection/parse problems are reported as "no answer"
  nil
end
def _set_path_metadata(path, data)
params = { path: path,
data: data }.to_json
......@@ -149,4 +197,47 @@ class DbAdapter
end
metadata.symbolize_keys
end
# Convert a text extract (one row per line, whitespace separated columns)
# into an array of integer rows, inserting nil between intervals.
#
# Lines whose first word is "#" are markup, not data. After a
# "# interval-end ..." line the next data row is preceded by a nil entry
# (a plotting break); no nil is added when no data follows, so the result
# never ends with a break. All other "#" lines are ignored.
#
# The previous implementation compared the interval-end timestamp with an
# interval-start value held in a block-local variable. Block locals do not
# survive between iterations, so that value was always nil on the
# interval-end line and the comparison always succeeded. The behaviour is
# kept; only the no-op comparison is gone.
#
# NOTE(review): columns are converted with to_i, so non-integer values are
# truncated -- confirm that is acceptable for float streams.
#
# * +resp+ - response body as a String
#
# returns: Array of rows (Array of Integer) and nil break markers
def __parse_data(resp)
  data = []
  add_break = false
  resp.split("\n").each do |row|
    next if row.empty? # e.g. the empty string left by a blank line
    words = row.split(' ')
    if words[0] == '#'
      # markup line: only an interval end schedules a break
      add_break = true if words[1] == 'interval-end'
      next
    end
    # normal data row, preceded by a break when an interval just ended
    data.push(nil) if add_break
    add_break = false
    data.push(words.map(&:to_i))
  end
  data
end
# Turn an intervals response into horizontal line segments at y = 0.
#
# The body is expected to hold one JSON pair per line, lines separated by
# "\r\n"; the lines are joined with commas and wrapped in brackets so the
# whole body parses as a single JSON array.
#
# * +resp+ - response body as a String
#
# returns: for every interval the three entries [start, 0], [end, 0], nil
# (nil separates one segment from the next)
def __parse_intervals(resp)
  json_text = '[' + resp.chomp.gsub(/\r\n/, ',') + ']'
  JSON.parse(json_text).flat_map do |bounds|
    [[bounds[0], 0], [bounds[1], 0], nil]
  end
end
end
# frozen_string_literal: true
# Loads stream data over specified interval
class LoadStreamData
include ServiceStatus
attr_reader :data, :data_type
# Create a loader that fetches through +db_adapter+.
#
# * +db_adapter+ - object answering get_count, get_data, get_intervals
#   and url (these are the calls this class makes on it)
def initialize(db_adapter)
  super()
  @data_type = 'unset' # interval, raw, decimated
  @data = []
  @db_adapter = db_adapter
end
#===Description
# Load data for +db_stream+ over the requested time range at a resolution
# the associated database allows, and store it in +data+ / +data_type+.
#
# data_type: raw
# data:
#   [{id: element_id, values: [[ts,y],[ts,y],nil,[ts,y]]},...]
# data_type: interval
# data:
#   [{id: element_id, values: [[start,0],[end,0],nil,...]}]
# data_type: decimated
# data:
#   [{id: element_id, values: [[ts,y,ymin,ymax],[ts,y,ymin,ymax],nil,...]}]
#
# When no plottable decimation level is found the intervals of the valid
# level are loaded instead of the data itself. If the adapter cannot
# deliver data or intervals (it answers nil), an error is recorded with
# add_error and +data+ / +data_type+ are left untouched.
#===Parameters
# * +db_stream+  - DbStream object
# * +start_time+ - unix timestamp in us
# * +end_time+   - unix timestamp in us
#
# returns: +self+ on every path, so the status can be queried on the result
#
def run(db_stream, start_time, end_time)
  resolution = db_stream.db.max_points_per_plot
  valid_decim = findValidDecimationLevel(db_stream, start_time)
  # valid_decim is the highest resolution, find one we can plot
  plottable_decim = findPlottableDecimationLevel(
    db_stream, valid_decim, start_time, end_time, resolution
  )
  if plottable_decim.nil?
    # data is not sufficiently decimated, get intervals from
    # the valid decimation level (highest resolution)
    path = __build_path(db_stream, valid_decim.level)
    resp = @db_adapter.get_intervals(path, start_time, end_time)
    if resp.nil?
      # same contract as the data request below: nil means the fetch failed
      add_error("cannot get intervals for [#{path}] @ #{@db_adapter.url}")
      return self
    end
    @data_type = 'interval'
    @data = __build_interval_data(db_stream, resp)
    return self
  end
  # request is plottable, see if we can get the raw (level 1) data
  path = __build_path(db_stream, plottable_decim.level)
  resp = @db_adapter.get_data(path, start_time, end_time)
  if resp.nil?
    add_error("cannot get data for [#{path}] @ #{@db_adapter.url}")
    return self
  end
  if plottable_decim.level == 1
    @data_type = 'raw'
    @data = __build_raw_data(db_stream, resp)
  else
    @data_type = 'decimated'
    @data = __build_decimated_data(db_stream, resp)
  end
  self
end
#===Description
# Given a starting decimation level and time interval
# find a decimation level that meets the target resolution
# (db_stream.db.max_points_per_plot).
#
# The amount of raw data over the interval is estimated as the row count
# of the starting level multiplied by that level; a level is plottable
# when the estimate divided by the level (integer division) does not
# exceed the limit.
#===Parameters
# * +db_stream+   - DbStream object
# * +valid_decim+ - DecimationLevel to start from (highest usable resolution)
# * +start_time+  - unix timestamp in us
# * +end_time+    - unix timestamp in us
# * +_resolution+ - unused; the limit is read from db_stream.db
#
# returns: +decimation_level+ - DecimationLevel object, the lowest level
# at or above +valid_decim+ that is plottable
# *NOTE:* returns nil if the data is too high resolution to request
#         (no stored level is sufficiently decimated), and also if the
#         count request fails (an error is recorded in that case)
#
def findPlottableDecimationLevel(
  db_stream, valid_decim, start_time, end_time, _resolution
)
  path = db_stream.path
  path += "~decim-#{valid_decim.level}" if valid_decim.level > 1
  # figure out how much data this stream has over the interval
  count = @db_adapter.get_count(path, start_time, end_time)
  if count.nil?
    add_error("cannot get count for [#{path}] @ #{@db_adapter.url}")
    return nil
  end
  # find out how much raw data exists over the specified interval
  raw_count = count * valid_decim.level
  # now we can find the right decimation level for plotting
  max_count = db_stream.db.max_points_per_plot
  # even the raw estimate fits, so the starting level can be used as is
  return valid_decim if raw_count <= max_count
  # otherwise look for a higher decimation level; levels are visited in
  # ascending order so the first match is the best resolution. The result
  # is nil when every stored level still has too much data.
  db_stream.db_decimations
           .where('level >= ?', valid_decim.level)
           .order(:level)
           .detect { |decim| raw_count / decim.level <= max_count }
end
#===Description
# Find the decimation level with the highest resolution data possible
# for a request starting at +start_time+. This means find the highest
# resolution stream that has a start_time at or before the specified
# start_time. If no level reaches back that far, the level missing the
# least data (smallest gap between its start and +start_time+) is used;
# the raw stream wins ties because a stored level must have a strictly
# smaller gap to replace it.
#
# A stream without a start_time is treated as having no raw data to
# compare against, so the first stored level with data becomes the
# initial candidate.
#===Parameters
# * +db_stream+  - DbStream object
# * +start_time+ - unix timestamp in us
#
# returns: +decimation_level+ - DecimationLevel object (an unsaved
# level 1 object stands for the raw stream)
#
def findValidDecimationLevel(db_stream, start_time)
  # assume raw stream is a valid level (best resolution)
  valid_decim = DbDecimation.new(level: 1)
  stream_start = db_stream.start_time
  # check if raw stream has the data
  return valid_decim if !stream_start.nil? && stream_start <= start_time
  # keep track of the level thats missing the least data, this will be used
  # if no level can be found with all the data. nil means "no gap known
  # yet" (the stream has no start_time), so any level with data beats it.
  min_gap = stream_start.nil? ? nil : stream_start - start_time
  db_stream.db_decimations.order(:level).each do |decim|
    # skip empty decimation levels
    next if decim.start_time.nil? || decim.end_time.nil?
    # the first (lowest) level with data over the interval is the best answer
    return decim if decim.start_time <= start_time
    # this level doesn't contain all the requested data, see how much its missing
    gap = decim.start_time - start_time
    if min_gap.nil? || gap < min_gap
      min_gap = gap
      valid_decim = decim
    end
  end
  valid_decim
end
# Path used to request +db_stream+ at decimation +level+: the stream's own
# path for level 1, otherwise the path with a "~decim-<level>" suffix.
def __build_path(db_stream, level)
  level == 1 ? db_stream.path : "#{db_stream.path}~decim-#{level}"
end
# Split raw rows into one series per element of +db_stream+.
#
# Elements are taken in column order; the element at position i reads
# row[1 + i] (row[0] is the timestamp) and the value is passed through
# __scale_value. A nil row (interval break) becomes a nil entry in every
# series.
#
# * +db_stream+ - DbStream object
# * +resp+      - rows as returned by the adapter's get_data
#
# returns: [{id: element_id, values: [[ts,y],[ts,y],nil,...]},...]
def __build_raw_data(db_stream, resp)
  ordered = db_stream.db_elements.order(:column)
  ordered.each_with_index.map do |elem, idx|
    series = resp.map do |row|
      row.nil? ? nil : [row[0], __scale_value(row[1 + idx], elem)]
    end
    { id: elem.id, values: series }
  end
end
# Split decimated rows into one series per element of +db_stream+.
#
# With n elements a row is read as: timestamp, n means, n minimums,
# n maximums. The element at position i (column order) therefore reads
# row[1 + i], row[1 + i + n] and row[1 + i + 2n]; each value is passed
# through __scale_value. The scaled minimum and maximum are swapped when
# they come out in the wrong order. A nil row (interval break) becomes a
# nil entry in every series.
#
# * +db_stream+ - DbStream object
# * +resp+      - rows as returned by the adapter's get_data
#
# returns: [{id: element_id, values: [[ts,y,ymin,ymax],nil,...]},...]
def __build_decimated_data(db_stream, resp)
  ordered = db_stream.db_elements.order(:column)
  width = ordered.length
  ordered.each_with_index.map do |elem, idx|
    series = resp.map do |row|
      next nil if row.nil?
      mean = __scale_value(row[1 + idx], elem)
      bound_a = __scale_value(row[1 + idx + width], elem)
      bound_b = __scale_value(row[1 + idx + width * 2], elem)
      [row[0], mean, [bound_a, bound_b].min, [bound_a, bound_b].max]
    end
    { id: elem.id, values: series }
  end
end
# Attach the same interval segments to every element of +db_stream+
# (elements in column order). +resp+ is not copied: each entry's :values
# refers to the one object passed in.
#
# returns: [{id: element_id, values: resp},...]
def __build_interval_data(db_stream, resp)
  db_stream.db_elements.order(:column).each_with_object([]) do |elem, out|
    out << { id: elem.id, values: resp }
  end
end
# Apply an element's calibration to one sample:
# (value as float - element.offset) * element.scale_factor
def __scale_value(value,element)
  shifted = value.to_f - element.offset
  shifted * element.scale_factor
end
end
......@@ -13,7 +13,7 @@ module ControlPanel
# -- all .rb files in that directory are automatically loaded.
config.api_only = true
# Add folders under the services and adapters directory
%w(nilm db db_folder db_stream permission user_group).each do |service|
%w(data nilm db db_folder db_stream permission user_group).each do |service|
config.autoload_paths << Rails.root.join("app/services/#{service}")
end
config.autoload_paths << Rails.root.join("app/adapters")
......
......@@ -51,4 +51,59 @@ describe DbAdapter do
end
end
# DbAdapter#get_count: row counts for a path over a time range.
# NOTE(review): examples are tagged :vcr -- presumably HTTP traffic is
# replayed from recorded cassettes; confirm before changing any request.
describe 'get_count' do
it 'returns number of elements in path over interval', :vcr do
adapter = DbAdapter.new(url)
start_time = 1361546159000000
end_time = 1361577615684742
path = '/tutorial/pump-events'
# same range requested for the raw stream and for its level 4 decimation
raw_count = adapter.get_count(path,start_time, end_time)
lvl4_count = adapter.get_count(path+"~decim-4",start_time, end_time)
expect(raw_count>0).to be true
# integer division: the level 4 stream should hold a quarter of the rows
expect(raw_count/4).to eq(lvl4_count)
end
it 'returns nil on server failure', :vcr do
adapter = DbAdapter.new(url)
start_time = 1361546159000000
end_time = 1361577615684742
# a path that is not expected to exist on the server
path = '/path/does/not/exist'
count = adapter.get_count(path,start_time, end_time)
expect(count).to be nil
end
end
# DbAdapter#get_data: parsed rows for a path over a time range.
# NOTE(review): examples are tagged :vcr -- presumably HTTP traffic is
# replayed from recorded cassettes; confirm before changing any request.
describe 'get_data' do
it 'returns array of data over interval', :vcr do
adapter = DbAdapter.new(url)
start_time = 1361546159000000
end_time = 1361577615684742
path = '/tutorial/pump-events'
# same range requested for the raw stream and for its level 4 decimation
raw_data = adapter.get_data(path,start_time, end_time)
lvl4_data = adapter.get_data(path+"~decim-4",start_time, end_time)
expect(raw_data.length>0).to be true
# integer division: the level 4 stream should hold a quarter of the rows
expect(raw_data.length/4).to eq(lvl4_data.length)
end
it 'adds nil to indicate interval breaks', :vcr do
adapter = DbAdapter.new(url)
# earlier start than the example above, same end
start_time = 1361466001000000
end_time = 1361577615684742
path = '/tutorial/pump-events'
data = adapter.get_data(path,start_time, end_time)
expect(data.length>0).to be true
# every nil entry is a break between two intervals
num_intervals = data.select{|elem| elem==nil}.length
expect(num_intervals).to eq 1
end
end
# DbAdapter#get_intervals: interval line segments for a path.
# NOTE(review): the example is tagged :vcr -- presumably HTTP traffic is
# replayed from a recorded cassette; confirm before changing the request.
describe 'get_intervals' do
it 'returns array of interval line segments', :vcr do
adapter = DbAdapter.new(url)
start_time = 1360017784000000
end_time = 1361579612066315
path = '/tutorial/pump-events'
intervals = adapter.get_intervals(path,start_time, end_time)
expect(intervals.length).to eq(60) # 20 intervals x 3 entries each (start point, end point, nil break)
end
end
end
# frozen_string_literal: true
# Mock class to test clients
#
# Stands in for the database adapter without any network access. It is
# configured with the time range that "has data", the number of raw rows
# in that range and the canned payload to hand back. A request overlaps
# the data unless it ends before +start_time+ or starts after +end_time+.
class MockDataDbAdapter
  attr_reader :url

  # * +start_time+ / +end_time+ - bounds of the range that holds data
  # * +raw_count+ - row count reported for the raw (undecimated) stream
  # * +data+      - payload returned by get_data and get_intervals
  def initialize(start_time:, end_time:, raw_count:, data:)
    @start_time = start_time
    @end_time = end_time
    @raw_count = raw_count
    @data = data
    @last_path = nil
    @url = "http://mockadapter/nilmdb"
  end

  # Returns the canned data when the request overlaps the data range,
  # otherwise an empty array. The path is recorded in both cases so that
  # level_retrieved also reports requests that found nothing.
  def get_data(path, start_time, end_time)
    @last_path = path
    return [] if __out_of_bounds?(start_time, end_time)
    @data
  end

  # Returns 0 outside the data range; inside it returns raw_count for a
  # raw path and raw_count / level for a path ending in "-<level>".
  def get_count(path, start_time, end_time)
    return 0 if __out_of_bounds?(start_time, end_time)
    level = __level_in(path)
    # raw stream, return raw_count; decimated return what would be left
    level.nil? ? @raw_count : @raw_count / level
  end

  # Returns the canned data when the request overlaps the data range,
  # otherwise an empty array (the same "nothing here" shape as get_data;
  # a bare 0 is not a list of segments). The path is only recorded for
  # requests that overlap.
  def get_intervals(path, start_time, end_time)
    return [] if __out_of_bounds?(start_time, end_time)
    @last_path = path
    @data
  end

  # Decimation level of the most recently recorded path: nil when nothing
  # was requested yet, 1 for a raw path, otherwise the numeric suffix.
  def level_retrieved
    return nil if @last_path.nil?
    __level_in(@last_path) || 1
  end

  private

  # true when the requested range lies entirely before or after the data
  def __out_of_bounds?(start_time, end_time)
    end_time < @start_time || start_time > @end_time
  end

  # numeric "-<digits>" suffix of +path+ as Integer, nil when absent
  def __level_in(path)
    matches = /-(\d+)$/.match(path)
    matches.nil? ? nil : matches[1].to_i
  end
end
......@@ -2,5 +2,6 @@
FactoryGirl.define do
factory :db_decimation do
end
end
......@@ -16,12 +16,20 @@ FactoryGirl.define do
transient do
elements_count 4
decimations_count 0
end
after(:create) do |stream, evaluator|
create_list(:db_element,
evaluator.elements_count,
db_stream: stream)
evaluator.decimations_count.times do |x|
create(:db_decimation,
db_stream: stream,
start_time: stream.start_time,
end_time: stream.end_time,
level: 4**(x+1))
end
end
end
end
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe 'LoadStreamData' do
let(:db) { create(:db, max_points_per_plot: 100) }
describe 'with large datasets' do
# The stream holds far more raw rows (1600) than may be plotted (100), and
# decimation levels 4, 16 and 64 exist, so a decimated level must be chosen.
describe 'when the data is decimated' do
before do
# decimated rows for 3 elements: ts, 3 means, 3 minimums, 3 maximums
@data = [[98,0,1,2,-1,0,1,1,2,3],
nil,
[99,0,1,2,-1,0,1,1,2,3]]
@db_stream = create(:db_stream, elements_count: 0,
db: db, decimations_count: 3, # levels 4, 16, 64
start_time: 0, end_time: 100)
# create the db elements by hand so column, offset and scale are known
3.times do |i|
@db_stream.db_elements << create(:db_element,
column: i, offset: i+1, scale_factor:i+2)
end
@mockAdapter = MockDataDbAdapter.new(
start_time: @db_stream.start_time,
end_time: @db_stream.end_time,
raw_count: 1600, data: @data
)
@service = LoadStreamData.new(@mockAdapter)
end
it 'sets @type to [decimated]' do
@service.run(@db_stream, 10, 90)
expect(@service.success?).to be true
expect(@service.data_type).to eq('decimated')
end
it 'finds appropriate level based on nilm resolution' do
# expect level 16 decimation to meet plotting requirements (1600/16 = 100 <= 100)
@service.run(@db_stream, 10, 90)
expect(@mockAdapter.level_retrieved).to eq(16)
# with higher resolution setting, level 4 should meet requirements (1600/4 = 400 <= 425)
db.max_points_per_plot = 425; db.save
@service.run(@db_stream, 10, 90)
expect(@mockAdapter.level_retrieved).to eq(4)
# with lower resolution setting, level 64 should meet requirements (1600/64 = 25 <= 26)
db.max_points_per_plot = 26; db.save
@service.run(@db_stream, 10, 90)
expect(@mockAdapter.level_retrieved).to eq(64)
end
it 'populates @data structure with decimated data' do
@service.run(@db_stream, 10, 90)
@service.data.each_with_index do |data,i|
elem = @db_stream.db_elements.find_by_column(i)
expect(data[:id]).to eq elem.id
# in the fixture rows element i has mean i, minimum i-1 and maximum i+1
mean = __scale_value(i,elem)
min = __scale_value(i-1,elem)
max = __scale_value(i+1,elem)
expect(data[:values]).to eq([[98,mean,min,max],
nil,
[99,mean,min,max]])
end
end
end
# Only decimation level 4 exists and even that is too dense
# (1000/4 = 250 > 100), so the service falls back to interval segments.
describe 'when the data is not decimated' do
before do
# interval segments as the adapter's get_intervals would return them
@data = [[98,0],[99,0],
nil,
[110,0],[115,0]]
@db_stream = create(:db_stream, elements_count: 0,
db: db, decimations_count: 1, # level 4 only
start_time: 0, end_time: 100)
# create the db elements by hand so column, offset and scale are known
3.times do |i|
@db_stream.db_elements << create(:db_element,
column: i, offset: i+1, scale_factor:i+2)
end
@mockAdapter = MockDataDbAdapter.new(
start_time: @db_stream.start_time,
end_time: @db_stream.end_time,
raw_count: 1000, data: @data
)
@service = LoadStreamData.new(@mockAdapter)
end
it 'sets @type to [interval] if all decimations have too much data' do
@service.run(@db_stream, 10, 90)
expect(@service.success?).to be true
expect(@service.data_type).to eq('interval')
# intervals are requested from the raw (level 1) path
expect(@mockAdapter.level_retrieved).to be 1
end
it 'sets @data to intervals' do
@service.run(@db_stream, 10, 90)
@service.data.each_with_index do |data,i|
elem = @db_stream.db_elements.find_by_column(i)
expect(data[:id]).to eq elem.id
# every element carries the same, unscaled interval segments
expect(data[:values]).to eq(@data)
end
end
end
end
# The raw row count (100) fits the plot limit (100), so raw data is used.
describe 'with small datasets' do
before do
# raw rows for 3 elements: ts followed by one value per element
@data = [[98,0,1,2],nil,[99,0,1,2]]
@db_stream = create(:db_stream, elements_count: 0,
db: db, decimations_count: 3, # levels 4, 16, 64
start_time: 0, end_time: 100)
# create the db elements by hand so column, offset and scale are known
3.times do |i|
@db_stream.db_elements << create(:db_element,
column: i, offset: i+1, scale_factor:i+2)
end
@mockAdapter = MockDataDbAdapter.new(
start_time: @db_stream.start_time,
end_time: @db_stream.end_time,
raw_count: 100, data: @data
)
@service = LoadStreamData.new(@mockAdapter)
end
it 'sets @type to [raw]' do
@service.run(@db_stream, 10, 90)
expect(@service.success?).to be true
expect(@service.data_type).to eq('raw')
expect(@mockAdapter.level_retrieved).to eq(1)
end
it 'only if count <= nilm resolution over interval' do
# 100 raw rows no longer fit a limit of 90, so a decimated level is used
db.max_points_per_plot = 90; db.save
@service.run(@db_stream, 10, 90)
expect(@mockAdapter.level_retrieved).to be > 1
end
it 'populates @data structure with raw data' do
@service.run(@db_stream, 10, 90)
@service.data.each_with_index do |data,i|
elem = @db_stream.db_elements.find_by_column(i)
expect(data[:id]).to eq elem.id
# in the fixture rows element i has the raw value i
expect(data[:values]).to eq([[98,(i-elem.offset)*elem.scale_factor],
nil,
[99,(i-elem.offset)*elem.scale_factor]])
end
end
end
# The stream (and the mock adapter) only has data over 100..200 while the
# examples request 0..50, so every adapter call is out of range.
describe 'when data is not present' do
before do
# payload the mock would return for an in-range request
@data = [1, 2, 3]
@db_stream = create(:db_stream, elements_count: 0,
db: db, decimations_count: 4, # levels 4, 16, 64, 256
start_time: 100, end_time: 200)
# create the db elements by hand so column, offset and scale are known
3.times do |i|
@db_stream.db_elements << create(:db_element,
column: i, offset: i+1, scale_factor:i+2)
end
@mockAdapter = MockDataDbAdapter.new(
start_time: @db_stream.start_time,
end_time: @db_stream.end_time,
raw_count: 400, data: @data
)
@service = LoadStreamData.new(@mockAdapter)
end
it 'still succeeds' do
# requested interval is before actual data
@service.run(@db_stream, 0, 50)
expect(@service.success?).to be true
expect(@service.data_type).to eq('raw')
expect(@mockAdapter.level_retrieved).to eq 1
# TODO: the case where all data is after the request is not exercised here
end
it 'sets data values to empty arrays' do
@service.run(@db_stream, 0, 50)
@service.data.each_with_index do |data,i|
elem = @db_stream.db_elements.find_by_column(i)
expect(data[:id]).to eq elem.id
expect(data[:values]).to eq([])
end
end
end
end
# Spec helper computing the expected calibrated value:
# (value as float - element.offset) * element.scale_factor
def __scale_value(value,element)
  shifted = value.to_f - element.offset
  shifted * element.scale_factor
end
......@@ -24,7 +24,7 @@ require 'webmock/rspec'
RSpec.configure do |config|
config.filter_run_excluding :broken => true
#config.filter_run_excluding :broken => true
# rspec-expectations config goes here. You can use an alternate
# assertion/expectation library such as wrong or the stdlib/minitest
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment