Commit 750579a7 by John Doe

speed ups to data loading

parent dc7b714e
require "benchmark"
# frozen_string_literal: true # frozen_string_literal: true
# Loads data for specified elements # Loads data for specified elements
...@@ -58,7 +59,10 @@ class LoadElementData ...@@ -58,7 +59,10 @@ class LoadElementData
req_streams.each do |stream| req_streams.each do |stream|
adapter = DbAdapter.new(stream.db.url) adapter = DbAdapter.new(stream.db.url)
data_service = LoadStreamData.new(adapter) data_service = LoadStreamData.new(adapter)
data_service.run(stream, @start_time, @end_time) time = Benchmark.measure do
data_service.run(stream, @start_time, @end_time,elements.to_a)
end
puts "------ DataService #{time}"
if data_service.success? if data_service.success?
combined_data.concat(data_service.data) combined_data.concat(data_service.data)
else else
......
...@@ -15,6 +15,8 @@ class LoadStreamData ...@@ -15,6 +15,8 @@ class LoadStreamData
# load data at or below the resolution of the # load data at or below the resolution of the
# associated database, sets data and data_type # associated database, sets data and data_type
# specify a subset of elements as an optional array
# if ommitted, all elements are extracted from the stream (expensive!)
# #
# sets data and data_type # sets data and data_type
# data_type: raw # data_type: raw
...@@ -31,14 +33,22 @@ class LoadStreamData ...@@ -31,14 +33,22 @@ class LoadStreamData
# data: # data:
# [{id: element_id, type: decimated, values: [[start,0],[end,0],nil,...]}] # [{id: element_id, type: decimated, values: [[start,0],[end,0],nil,...]}]
# #
def run(db_stream, start_time, end_time) def run(db_stream, start_time, end_time, elements = [])
# if elements are not explicitly passed, get all of them
if(elements.empty?)
elements = db_stream.db_elements.all.to_a
end
elements.sort_by!(&:column)
resolution = db_stream.db.max_points_per_plot resolution = db_stream.db.max_points_per_plot
valid_decim = findValidDecimationLevel(db_stream, start_time) valid_decim = findValidDecimationLevel(db_stream, start_time)
# valid_decim is the highest resolution, find one we can plot # valid_decim is the highest resolution, find one we can plot
plottable_decim = findPlottableDecimationLevel( plottable_decim = findPlottableDecimationLevel(
db_stream, valid_decim, start_time, end_time, resolution db_stream, valid_decim, start_time, end_time, resolution
) )
elements = db_stream.db_elements.order(:column)
if plottable_decim.nil? if plottable_decim.nil?
# check if its nil becuase the nilm isn't available # check if its nil becuase the nilm isn't available
return self unless success? return self unless success?
...@@ -64,11 +74,15 @@ class LoadStreamData ...@@ -64,11 +74,15 @@ class LoadStreamData
@data = __build_raw_data(elements, resp) @data = __build_raw_data(elements, resp)
else else
@data_type = 'decimated' @data_type = 'decimated'
decimateable_elements = elements.where(display_type: %w(continuous discrete)) decimateable_elements =
interval_elements = elements.where(display_type: 'event') elements.select{|e| %w(continuous discrete).include? e.display_type}
interval_elements = elements.select{|e| e.display_type=='event'}
time = Benchmark.measure do
@data = __build_decimated_data(decimateable_elements, resp) + @data = __build_decimated_data(decimateable_elements, resp) +
__build_intervals_from_decimated_data(interval_elements, resp) __build_intervals_from_decimated_data(interval_elements, resp)
end end
puts "---- [LoadStreamData] Build Dataset #{time}"
end
self self
end end
...@@ -177,25 +191,34 @@ class LoadStreamData ...@@ -177,25 +191,34 @@ class LoadStreamData
end end
def __build_decimated_data(elements, resp) def __build_decimated_data(elements, resp)
data = elements.map { |e| { id: e.id, type: 'decimated', values: [] } } # if elements is empty we don't need to do anything
resp.each do |row| return [] if elements.empty?
#prepare the data structure
data = elements.map { |e| { id: e.id, type: 'decimated', values: Array.new(resp.length) } }
#set up constants so we compute them once
mean_offset = 0
min_offset = elements.first.db_stream.db_elements.length
max_offset = elements.first.db_stream.db_elements.length * 2
resp.each_with_index do |row, k|
if row.nil? # add an interval break to all the elements if row.nil? # add an interval break to all the elements
data.each { |d| d[:values].push(nil) } data.each { |d| d[:values][k]=nil }
next next
end end
ts = row[0] ts = row[0]
elements.each_with_index do |elem, i| elements.each_with_index do |elem, i|
# ###TODO: fix offset calcs when elements is a subset #mean = __scale_value(row[1 + elem.column + mean_offset], elem)
mean_offset = 0 #min = __scale_value(row[1 + elem.column + min_offset], elem)
min_offset = elem.db_stream.db_elements.length #max = __scale_value(row[1 + elem.column + max_offset], elem)
max_offset = elem.db_stream.db_elements.length * 2 mean = (row[1 + elem.column + mean_offset] - elem.offset) * elem.scale_factor
mean = __scale_value(row[1 + elem.column + mean_offset], elem) min = (row[1 + elem.column + min_offset] - elem.offset) * elem.scale_factor
min = __scale_value(row[1 + elem.column + min_offset], elem) max = (row[1 + elem.column + max_offset] - elem.offset) * elem.scale_factor
max = __scale_value(row[1 + elem.column + max_offset], elem)
tmp_min = [min, max].min tmp_min = [min, max].min
max = [min, max].max max = [min, max].max
min = tmp_min min = tmp_min
data[i][:values].push([ts, mean, min, max]) data[i][:values][k]=[ts, mean, min, max]
end end
end end
data data
...@@ -208,6 +231,9 @@ class LoadStreamData ...@@ -208,6 +231,9 @@ class LoadStreamData
# for data that cannot be represented as decimations # for data that cannot be represented as decimations
# eg: events, compute intervals from the actual decimated data # eg: events, compute intervals from the actual decimated data
def __build_intervals_from_decimated_data(elements, resp) def __build_intervals_from_decimated_data(elements, resp)
# if elements is empty we don't need to do anything
return [] if elements.empty?
# compute intervals from resp # compute intervals from resp
if resp.empty? if resp.empty?
elements.map do |e| elements.map do |e|
......
...@@ -8,7 +8,7 @@ class MockLoadStreamData ...@@ -8,7 +8,7 @@ class MockLoadStreamData
@data = nil @data = nil
@run_count = 0 @run_count = 0
end end
def run(db_stream, start_time, end_time) def run(db_stream, start_time, end_time, elements=[])
@data = @dataset.select{|d| d[:stream]==db_stream}.first[:data] @data = @dataset.select{|d| d[:stream]==db_stream}.first[:data]
@run_count += 1 @run_count += 1
if(@data == nil) if(@data == nil)
......
...@@ -136,9 +136,18 @@ RSpec.describe 'LoadStreamData' do ...@@ -136,9 +136,18 @@ RSpec.describe 'LoadStreamData' do
expect(@mockAdapter.level_retrieved).to eq(1) expect(@mockAdapter.level_retrieved).to eq(1)
end end
it 'only if count <= nilm resolution over interval' do it 'only if count <= nilm resolution over interval' do
#must have decimated data ready!
#use custom adapter and service objects
data = [[40,0,1,2,3,4,5,6,7,8],nil,[50,0,1,2,3,4,5,6,7,8]]
adapter = MockDataDbAdapter.new(
start_time: @db_stream.start_time,
end_time: @db_stream.end_time,
raw_count: 100, data: data
)
service = LoadStreamData.new(adapter)
db.max_points_per_plot = 90; db.save db.max_points_per_plot = 90; db.save
@service.run(@db_stream, 10, 90) service.run(@db_stream, 10, 90)
expect(@mockAdapter.level_retrieved).to be > 1 expect(adapter.level_retrieved).to be > 1
end end
it 'populates @data structure with raw data' do it 'populates @data structure with raw data' do
@service.run(@db_stream, 10, 90) @service.run(@db_stream, 10, 90)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment