Commit bebc3a32 by John Doe

optimized data retrieval by omitting stream update, rely on nilmdb or joule to…

optimized data retrieval by omitting stream update, rely on nilmdb or joule to auto compute time bounds
parent b778046a
...@@ -110,13 +110,11 @@ module Nilmdb ...@@ -110,13 +110,11 @@ module Nilmdb
end end
def get_count(path, start_time, end_time) def get_count(path, start_time, end_time)
query = {path: path, count: 1}
query[:start] = start_time unless start_time.nil?
query[:end] = end_time unless end_time.nil?
resp = self.class.get("#{@url}/stream/extract", resp = self.class.get("#{@url}/stream/extract",
query: { query: query)
path: path,
start: start_time,
end: end_time,
count: 1
})
return nil unless resp.success? return nil unless resp.success?
return resp.parsed_response.to_i return resp.parsed_response.to_i
rescue rescue
...@@ -124,13 +122,11 @@ module Nilmdb ...@@ -124,13 +122,11 @@ module Nilmdb
end end
def get_data(path, start_time, end_time) def get_data(path, start_time, end_time)
query = {path: path, markup: 1}
query[:start] = start_time unless start_time.nil?
query[:end] = end_time unless end_time.nil?
resp = self.class.get("#{@url}/stream/extract", resp = self.class.get("#{@url}/stream/extract",
query: { query: query)
path: path,
start: start_time,
end: end_time,
markup: 1
})
return nil unless resp.success? return nil unless resp.success?
return __parse_data(resp.parsed_response) return __parse_data(resp.parsed_response)
rescue rescue
...@@ -138,12 +134,11 @@ module Nilmdb ...@@ -138,12 +134,11 @@ module Nilmdb
end end
def get_intervals(path, start_time, end_time) def get_intervals(path, start_time, end_time)
query = {path: path}
query[:start] = start_time unless start_time.nil?
query[:end] = end_time unless end_time.nil?
resp = self.class.get("#{@url}/stream/intervals", resp = self.class.get("#{@url}/stream/intervals",
query: { query: query)
path: path,
start: start_time,
end: end_time
})
return nil unless resp.success? return nil unless resp.success?
return __parse_intervals(resp.parsed_response) return __parse_intervals(resp.parsed_response)
rescue rescue
......
...@@ -48,12 +48,38 @@ module Nilmdb ...@@ -48,12 +48,38 @@ module Nilmdb
else else
[db_stream.db.max_points_per_plot,resolution].min [db_stream.db.max_points_per_plot,resolution].min
end end
valid_decim = findValidDecimationLevel(db_stream, start_time)
# --------- MODIFIED FOR SPEED & COMPATIBILITY WITH JOULE -------------
# ASSUME ALL DATA IS IN RAW AND DECIMATIONS ARE x4
# This means we don't need to know the start and end bounds of the data
# ------------- omitted ------------------
# valid_decim = findValidDecimationLevel(db_stream, start_time)
# valid_decim is the highest resolution, find one we can plot # valid_decim is the highest resolution, find one we can plot
plottable_decim = findPlottableDecimationLevel( # plottable_decim = findPlottableDecimationLevel(
db_stream, valid_decim, start_time, end_time, resolution # db_stream, valid_decim, start_time, end_time, resolution
) # )
# -------------- end omitted --------------
valid_decim = DbDecimation.new(level: 1)
plottable_decim = DbDecimation.new(level: 1)
count = @db_backend.get_count(db_stream.path, start_time, end_time)
if count.nil?
add_error("cannot get count for [#{db_stream.path}] @ #{@db_backend.url}")
return self
end
decim_level = _compute_decimation_level(count, resolution)
if decim_level > 1
count = @db_backend.get_count(db_stream.path+"~decim-#{decim_level}" , start_time, end_time)
# count will be nil if the decimation level does not exist
if (not count.nil?) and (count <= resolution) and (count > 0)
plottable_decim = DbDecimation.new(level: decim_level)
else
plottable_decim = nil
end
end
# ---------- END MODIFICATION ---------------------------
if plottable_decim.nil? if plottable_decim.nil?
# check if its nil becuase the nilm isn't available # check if its nil becuase the nilm isn't available
...@@ -91,6 +117,14 @@ module Nilmdb ...@@ -91,6 +117,14 @@ module Nilmdb
self self
end end
# Compute decimation level assuming x4 levels
def _compute_decimation_level(count, resolution)
return 1 if count == 0
desired_decimation = (Float(count)/resolution).ceil
4 ** (Math.log(desired_decimation)/Math.log(4.0)).ceil
end
#===Description #===Description
# Given a starting decimation level and time interval # Given a starting decimation level and time interval
# find a decimation level that meets the target resolution # find a decimation level that meets the target resolution
......
...@@ -33,37 +33,43 @@ class LoadElementData ...@@ -33,37 +33,43 @@ class LoadElementData
req_streams << elem.db_stream req_streams << elem.db_stream
end end
end end
#2 compute bounds by updating stream info if start/end are missing
if start_time==nil || end_time==nil
req_streams.map do |stream|
adapter = NodeAdapterFactory.from_nilm(stream.db.nilm)
if adapter.nil?
add_error("cannot contact installation")
return self
end
adapter.refresh_stream(stream)
end
end
#3 compute start and end times if nil # ----------- REMOVED FOR SPEED, NOT NECESSARY ------------------------
streams_with_data = req_streams.select{|stream| stream.total_time > 0} # #2 compute bounds by updating stream info if start/end are missing
if (start_time == nil || end_time == nil) && streams_with_data.empty? # if start_time==nil || end_time==nil
add_error("no time bounds for requested elements, refresh database?") # req_streams.map do |stream|
return self # adapter = NodeAdapterFactory.from_nilm(stream.db.nilm)
end # if adapter.nil?
# add_error("cannot contact installation")
# return self
# end
# adapter.refresh_stream(stream)
# end
# end
#
# #3 compute start and end times if nil
# streams_with_data = req_streams.select{|stream| stream.total_time > 0}
# if (start_time == nil || end_time == nil) && streams_with_data.empty?
# add_error("no time bounds for requested elements, refresh database?")
# return self
# end
# @start_time = start_time
# @end_time = end_time
# if start_time == nil
# @start_time = streams_with_data
# .sort_by{|x| x.start_time}
# .first.start_time
# end
# if end_time == nil
# @end_time = streams_with_data
# .sort_by{|x| -1*x.end_time}
# .first.end_time
# end
# ------------------------- END MODIFICATION ------------------------------
@start_time = start_time @start_time = start_time
@end_time = end_time @end_time = end_time
if start_time == nil if (not @start_time.nil?) and (not @end_time.nil?) and (@start_time > @end_time)
@start_time = streams_with_data
.sort_by{|x| x.start_time}
.first.start_time
end
if end_time == nil
@end_time = streams_with_data
.sort_by{|x| -1*x.end_time}
.first.end_time
end
if @start_time > @end_time
add_error("invalid time bounds") add_error("invalid time bounds")
return self return self
end end
......
...@@ -4,6 +4,21 @@ require 'rails_helper' ...@@ -4,6 +4,21 @@ require 'rails_helper'
RSpec.describe 'LoadStreamData' do RSpec.describe 'LoadStreamData' do
let(:db) { create(:db, max_points_per_plot: 100) } let(:db) { create(:db, max_points_per_plot: 100) }
describe 'rapid decimation algorithm' do
it 'computes decimation level' do
@server = Nilmdb::LoadStreamData.new(nil)
examples = [
{count: 0, resolution: 100, level: 1},
{count: 100, resolution: 500, level: 1},
{count: 1000, resolution: 500, level: 4},
{count: 10*64, resolution: 10, level: 64},
{count: 10*64+1, resolution: 10, level: 64*4}]
examples.each do |example|
level = @server._compute_decimation_level(example[:count],example[:resolution])
expect(level).to eq example[:level]
end
end
end
describe 'with large datasets' do describe 'with large datasets' do
describe 'when the data is decimated' do describe 'when the data is decimated' do
before do before do
...@@ -68,7 +83,7 @@ RSpec.describe 'LoadStreamData' do ...@@ -68,7 +83,7 @@ RSpec.describe 'LoadStreamData' do
@service.data.each_with_index do |data,i| @service.data.each_with_index do |data,i|
elem = @db_stream.db_elements.find_by_column(i) elem = @db_stream.db_elements.find_by_column(i)
expect(data[:id]).to eq elem.id expect(data[:id]).to eq elem.id
if(elem.display_type=="discrete" || elem.display_type=="continuous") if elem.display_type=="discrete" || elem.display_type=="continuous"
d_count += 1 d_count += 1
mean = __scale_value(i,elem) mean = __scale_value(i,elem)
min = __scale_value(i-1,elem) min = __scale_value(i-1,elem)
...@@ -101,6 +116,7 @@ RSpec.describe 'LoadStreamData' do ...@@ -101,6 +116,7 @@ RSpec.describe 'LoadStreamData' do
column: i, offset: i+1, scale_factor:i+2) column: i, offset: i+1, scale_factor:i+2)
end end
@mockAdapter = MockDataDbAdapter.new( @mockAdapter = MockDataDbAdapter.new(
decimations=[1],
start_time: @db_stream.start_time, start_time: @db_stream.start_time,
end_time: @db_stream.end_time, end_time: @db_stream.end_time,
raw_count: 1000, data: @data raw_count: 1000, data: @data
......
...@@ -4,57 +4,86 @@ ...@@ -4,57 +4,86 @@
class MockDataDbAdapter class MockDataDbAdapter
attr_reader :url attr_reader :url
def initialize(start_time:, end_time:, raw_count:, data:) def initialize(decimations=nil, start_time:, end_time:, raw_count:, data:)
@start_time = start_time @start_time = start_time
@end_time = end_time @end_time = end_time
@raw_count = raw_count @raw_count = raw_count
@data = data @data = data
@last_path = nil @last_path = nil
@url = "http://mockbackend/nilmdb" @url = "http://mockbackend/nilmdb"
# set to an array of valid decimation levels, if left blank assume all decimations exist
@decimations = decimations
end end
def get_data(path, start_time, end_time) def get_data(path, start_time, end_time)
#as long as start and end time are within #as long as start and end time are within
#bounds return the 'data' #bounds return the 'data'
@last_path = path @last_path = path
if(end_time<@start_time || if end_time<@start_time ||
start_time>@end_time) start_time>@end_time
return [] return []
end end
@last_path = path @last_path = path
return @data @data
end end
def get_count(path, start_time, end_time) def get_count(path, start_time, end_time)
matches = /-(\d+)$/.match(path)
#raw stream, return raw_count
if matches == nil
#as long as start and end time are within #as long as start and end time are within
#bounds return raw_count/decim level #bounds return raw_count/decim level
if(end_time<@start_time || if end_time<@start_time ||
start_time>@end_time) start_time>@end_time
return 0 return 0
else
return @raw_count
end
end end
matches = /-(\d+)$/.match(path)
#raw stream, return raw_count
return @raw_count if(matches==nil)
#decimated return what would be left #decimated return what would be left
level = matches[1].to_i level = matches[1].to_i
if not @decimations.nil?
# make sure this level is in the decimations array
unless @decimations.include?(level)
return nil
end
#as long as start and end time are within
#bounds return raw_count/decim level
if end_time<@start_time ||
start_time>@end_time
return 0
else
return @raw_count/level
end
# decimations are not specified, assume they exist
else
#as long as start and end time are within
#bounds return raw_count/decim level
if end_time<@start_time ||
start_time>@end_time
return 0
else
return @raw_count/level return @raw_count/level
end end
end
end
def get_intervals(path, start_time, end_time) def get_intervals(path, start_time, end_time)
#as long as start and end time are within #as long as start and end time are within
#bounds return intervals #bounds return intervals
if(end_time<@start_time || if end_time<@start_time ||
start_time>@end_time) start_time>@end_time
return 0 return 0
end end
@last_path = path @last_path = path
return @data @data
end end
def level_retrieved() def level_retrieved
return nil if @last_path==nil return nil if @last_path==nil
matches = /-(\d+)$/.match(@last_path) matches = /-(\d+)$/.match(@last_path)
return 1 if matches == nil return 1 if matches == nil
return matches[1].to_i matches[1].to_i
end end
end end
...@@ -111,14 +111,15 @@ RSpec.describe 'LoadElementData' do ...@@ -111,14 +111,15 @@ RSpec.describe 'LoadElementData' do
#make sure the decimations are messed up by partial update #make sure the decimations are messed up by partial update
ndecims1 = elem1.db_stream.db_decimations.count ndecims1 = elem1.db_stream.db_decimations.count
ndecims2 = elem2.db_stream.db_decimations.count ndecims2 = elem2.db_stream.db_decimations.count
# ------ CHECK REMOVED, SERVICE NO LONGER UPDATES STREAMS -------
#artificially mess up time bounds to check if service updates the streams #artificially mess up time bounds to check if service updates the streams
elem1.db_stream.update(start_time: 0, end_time: 0) # elem1.db_stream.update(start_time: 0, end_time: 0)
elem2.db_stream.update(start_time: 0, end_time: 0) # elem2.db_stream.update(start_time: 0, end_time: 0)
service = LoadElementData.new service = LoadElementData.new
service.run([elem1,elem2], nil, nil) service.run([elem1,elem2], nil, nil)
#bounds taken from test nilm on vagrant instance #bounds taken from test nilm on vagrant instance
expect(service.start_time).to eq(1360017784000000) # expect(service.start_time).to eq(1360017784000000)
expect(service.end_time).to eq(1435438182000001) # expect(service.end_time).to eq(1435438182000001)
#make sure decimations are still here #make sure decimations are still here
expect(elem1.db_stream.reload.db_decimations.count).to eq ndecims1 expect(elem1.db_stream.reload.db_decimations.count).to eq ndecims1
expect(elem2.db_stream.reload.db_decimations.count).to eq ndecims2 expect(elem2.db_stream.reload.db_decimations.count).to eq ndecims2
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment