Commit 43c8e19c by zacharyc

Basic resume functionality working


git-svn-id: https://bucket.mit.edu/svn/nilm/acquisition/ethstream@7250 ddd99763-3ecb-0310-9145-efcb8ce7c51f
parent 4aab6061
Showing with 92 additions and 53 deletions
...@@ -354,36 +354,48 @@ int nerdDoStream(const char *address, int *channel_list, int channel_count, int ...@@ -354,36 +354,48 @@ int nerdDoStream(const char *address, int *channel_list, int channel_count, int
int fd_data; int fd_data;
static int first_call = 1; static int first_call = 1;
char command[14]; char command[14];
static unsigned short currentcount = 0;
if(first_call) {
if (nerd_generate_command(command, channel_list, channel_count, precision, period) < 0) { if (nerd_generate_command(command, channel_list, channel_count, precision, period) < 0) {
info("Failed to create configuration command\n"); info("Failed to create configuration command\n");
goto out; goto out;
} }
if (nerd_send_command(address,"STOP") < 0) { if (nerd_send_command(address,"STOP") < 0) {
info("Failed to send STOP command\n"); if (first_call)
goto out; retval = -ENOTCONN;
} info("Failed to send STOP command\n");
goto out;
}
if (nerd_send_command(address,command) < 0) { if (nerd_send_command(address,command) < 0) {
if (first_call) info("Failed to send GET command\n");
retval = -ENOTCONN; goto out;
info("Failed to send GET command\n"); }
goto out; }
//We have sent the configuration commands. If we retry later, don't resend them. We would like
//to resume the interrupted transmission
first_call = 0;
if(currentcount != 0) {
char cmdbuf[10];
sprintf(cmdbuf,"SETC%05hd",currentcount);
if (nerd_send_command(address,cmdbuf) < 0) {
info("Failed to send SETC command\n");
goto out;
}
} }
first_call = 0;
/* Open connection. If this fails, and this is the /* Open connection */
first attempt, return a different error code so we give up. */
fd_data = nerd_open(address, NERDJACK_DATA_PORT); fd_data = nerd_open(address, NERDJACK_DATA_PORT);
if (fd_data < 0) { if (fd_data < 0) {
info("Connect failed: %s:%d\n", address, NERDJACK_DATA_PORT); info("Connect failed: %s:%d\n", address, NERDJACK_DATA_PORT);
goto out; goto out;
} }
if (nerd_data_stream(fd_data, channel_count, channel_list, precision, convert, lines, showmem) < 0) { if (nerd_data_stream(fd_data, channel_count, channel_list, precision, convert, lines, showmem, &currentcount) < 0) {
info("Failed to open data stream\n"); info("Failed to open data stream\n");
goto out1; goto out1;
} }
......
...@@ -161,57 +161,34 @@ int nerd_send_command(const char * address, char * command) ...@@ -161,57 +161,34 @@ int nerd_send_command(const char * address, char * command)
return 0; return 0;
} }
int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precision, int convert, int lines, int showmem) static int nerd_init_channels(deststruct * destination, int numChannels, int *channel_list) {
{
unsigned char buf[NERDJACK_PACKET_SIZE];
//int numGroups = NERDJACK_NUM_SAMPLES / numChannels;
int index = 0;
//int totalread = 0;
//int ret = 0;
int alignment = 0;
signed short datapoint = 0;
unsigned short dataline[NERDJACK_CHANNELS];
long double voltline[NERDJACK_CHANNELS];
deststruct destination[NERDJACK_CHANNELS];
int tempdestlist[NERDJACK_CHANNELS];
unsigned short currentcount = 0;
unsigned long memused = 0;
unsigned short packetsready = 0;
unsigned short adcused = 0;
unsigned short tempshort = 0;
int charsread = 0;
int charsleft = 0;
int additionalread = 0;
int linesleft = lines;
int numgroups = 0;
long double volts;
int channels_left = numChannels; int channels_left = numChannels;
int channelprocessing = 0; int channelprocessing = 0;
int currentalign = 0; //Index into sampled channels int currentalign = 0; //Index into sampled channels
int i; int i;
int numDuplicates = 0; int numDuplicates = 0;
int tempdestlist[NERDJACK_CHANNELS];
//Loop through channel_list until all channels recognized //Loop through channel_list until all channels recognized
//start with channelprocessing = 0 and increment through channels. //start with channelprocessing = 0 and increment through channels.
//If a channel is found in the list set it up appropriately. //If a channel is found in the list set it up appropriately.
//The complication arises because we want to allow a channel to
//display more than once or even out of order
//We need to distill a duplicate-free list of channels in order to
//sample as well as a mapping of how to print those channels
//to screen.
do { do {
//numduplicates = 0;
destination[currentalign].numCopies = 0; destination[currentalign].numCopies = 0;
for(i = 0; i < numChannels; i++) { for(i = 0; i < numChannels; i++) {
if(channelprocessing == channel_list[i]) { if(channelprocessing == channel_list[i]) {
//destination[currentalign] = i;
tempdestlist[destination[currentalign].numCopies] = i; tempdestlist[destination[currentalign].numCopies] = i;
if(destination[currentalign].numCopies > 0) { if(destination[currentalign].numCopies > 0) {
numDuplicates++; numDuplicates++;
} }
destination[currentalign].numCopies++; destination[currentalign].numCopies++;
//currentalign++;
channels_left--; channels_left--;
//break;
} }
} }
...@@ -222,8 +199,47 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis ...@@ -222,8 +199,47 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis
} }
channelprocessing++; channelprocessing++;
} while(channels_left > 0); } while(channels_left > 0);
return numDuplicates;
}
int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precision, int convert, int lines, int showmem, unsigned short * currentcount)
{
//Variables that should persist across retries
static unsigned char buf[NERDJACK_PACKET_SIZE];
//static int charsleft = NERDJACK_PACKET_SIZE;
static int linesleft = 0;
int index = 0;
int alignment = 0;
signed short datapoint = 0;
unsigned short dataline[NERDJACK_CHANNELS];
long double voltline[NERDJACK_CHANNELS];
int i;
deststruct destination[NERDJACK_CHANNELS];
unsigned long memused = 0;
unsigned short packetsready = 0;
unsigned short adcused = 0;
unsigned short tempshort = 0;
int charsread = 0;
int numgroups = 0;
long double volts;
//Check to see if we're trying to resume
//Don't blow away linesleft in that case
if(lines != 0 && linesleft == 0) {
linesleft = lines;
}
int numDuplicates = nerd_init_channels(destination,numChannels, channel_list);
//Now destination structure array is set as well as numDuplicates.
int numChannelsSampled = numChannels - numDuplicates; int numChannelsSampled = numChannels - numDuplicates;
int numGroups = NERDJACK_NUM_SAMPLES / numChannelsSampled; int numGroups = NERDJACK_NUM_SAMPLES / numChannelsSampled;
...@@ -234,6 +250,14 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis ...@@ -234,6 +250,14 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis
//We want a complete packet, so take the chars so far and keep waiting //We want a complete packet, so take the chars so far and keep waiting
if(charsread != NERDJACK_PACKET_SIZE) { if(charsread != NERDJACK_PACKET_SIZE) {
//charsleft = NERDJACK_PACKET_SIZE - charsread;
//There was a problem getting data. Probably a closed
//connection. Stash the data we did get, save state, and hope
//to get it later
info("Packet was too short\n");
return -2;
/*
charsleft = NERDJACK_PACKET_SIZE - charsread; charsleft = NERDJACK_PACKET_SIZE - charsread;
while(charsleft != 0){ while(charsleft != 0){
additionalread = recv_all_timeout(data_fd,buf+charsread,charsleft,0, additionalread = recv_all_timeout(data_fd,buf+charsread,charsleft,0,
...@@ -241,7 +265,10 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis ...@@ -241,7 +265,10 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis
charsread = charsread + additionalread; charsread = charsread + additionalread;
charsleft = NERDJACK_PACKET_SIZE - charsread; charsleft = NERDJACK_PACKET_SIZE - charsread;
} }
*/
} }
//charsleft = NERDJACK_PACKET_SIZE;
//First check the header info //First check the header info
if(buf[0] != 0xF0 || buf[1] != 0xAA) { if(buf[0] != 0xF0 || buf[1] != 0xAA) {
...@@ -251,13 +278,13 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis ...@@ -251,13 +278,13 @@ int nerd_data_stream(int data_fd, int numChannels, int *channel_list, int precis
//Check counter info to make sure not out of order //Check counter info to make sure not out of order
tempshort = (buf[2] << 8) | buf[3]; tempshort = (buf[2] << 8) | buf[3];
if(tempshort != currentcount ){ if(tempshort != *currentcount ){
info("Count wrong. Expected %hd but got %hd\n", currentcount, tempshort); info("Count wrong. Expected %hd but got %hd\n", *currentcount, tempshort);
return -1; return -1;
} }
//Increment number of packets received //Increment number of packets received
currentcount++; *currentcount = *currentcount + 1;
//Process the rest of the header and update the index value to be pointing after it //Process the rest of the header and update the index value to be pointing after it
index = 12; index = 12;
......
...@@ -36,7 +36,7 @@ int nerd_generate_command(char * command, int * channel_list, int channel_count, ...@@ -36,7 +36,7 @@ int nerd_generate_command(char * command, int * channel_list, int channel_count,
int nerd_send_command(const char * address, char * command); int nerd_send_command(const char * address, char * command);
/* Stream data out of the NerdJack */ /* Stream data out of the NerdJack */
int nerd_data_stream(int data_fd, int numChannels, int * channel_list, int precision, int convert, int lines, int showmem); int nerd_data_stream(int data_fd, int numChannels, int * channel_list, int precision, int convert, int lines, int showmem, unsigned short * currentcount);
/* Detect the IP Address of the NerdJack and return in ipAddress */ /* Detect the IP Address of the NerdJack and return in ipAddress */
int nerdjack_detect(char * ipAddress); int nerdjack_detect(char * ipAddress);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment