diff options
author | Jaromil <jaromil@dyne.org> | 2012-04-17 16:39:13 (GMT) |
---|---|---|
committer | Jaromil <jaromil@dyne.org> | 2012-04-17 16:39:13 (GMT) |
commit | 992a8b37a34e2a43439b4b5df5e81cc65e31844e (patch) | |
tree | 2a55c49f4667ab7c1c956a9de429e769671a4186 | |
parent | 30b84b7a60a25aa218c378efcb47732d4d0926ac (diff) |
code cleanup and network debug (WIP)
-rw-r--r-- | src/hdsync_cli.cpp | 693 |
1 files changed, 387 insertions, 306 deletions
diff --git a/src/hdsync_cli.cpp b/src/hdsync_cli.cpp index 88109a8..915ced3 100644 --- a/src/hdsync_cli.cpp +++ b/src/hdsync_cli.cpp @@ -27,7 +27,7 @@ #include <sys/stat.h> #include <sys/types.h> #include <sys/socket.h> -#include <netdb.h> +//#include <netdb.h> #include <errno.h> @@ -39,6 +39,7 @@ #include <avremote.h> #include <parsers.h> #include <zmq.h> +#include <zmq_utils.h> // our exit codes are shell style: 1 is error, 0 is success #define ERR 1 @@ -72,13 +73,396 @@ char IPv4[24]; // 0mq void *zcontext; void *sock_in = NULL; -void *sock_out = NULL; +void *sock_shout = NULL; char pgmaddr[64]; zmq_msg_t request; // we use only getopt, no _long static const char *short_options = "-hvts:p:"; +void cmdline(int argc, char **argv); + + +int get_address(); + + +int shout(const char *message, int flags) { + int res; + // zmq_connect(sock_out, pgmaddr); + zmq_msg_init_size (&request, strlen(message)); + memcpy (zmq_msg_data (&request), message, strlen(message)); + printf ("Shouting \"%s\"\n", message); + res = zmq_send (sock_shout, &request, flags); + zmq_msg_close (&request); + // zmq_close(sock_out); + return(res); +} + +// extract a string from a message received: +// PREFIX;MESSAGE; +// message should be a char of 512 bytes +int expect(const char *prefix, char *message, int flags) { + zmq_msg_t reply; + char res[512]; + char *p, *pp; + int pfxlen = strlen(prefix); + + printf("Expecting prefix %s\n",prefix); + + // listen for answers + zmq_msg_init (&reply); + zmq_recv (sock_in, &reply, flags); + // printf ("Received response\n"); + snprintf(res,511,"%s",zmq_msg_data(&reply)); + zmq_msg_close (&reply); + + // parse the response and if ACK insert it in uniq list + if( strncmp(res,prefix,pfxlen) == 0) { + // quick and secure string parsing + p = res; while(*p != ';') p++; p++; + pp = p; while(*pp != ';') pp++; *pp='\0'; + + // *p has the ip string + snprintf(message,511,"%s",p); + printf("Expected prefix %s found: %s\n",prefix,message); + return(0); + } + + return(1); +} + +int handshake() { + int request_nbr; + char message[256]; + int listindex = 0; + int listmax = 0; + char res[512]; + int c, i; + + if(chanID==1) { // first channel is the one to offer + + //////// + // OFFER + ////////////////////////////// + printf ("Offering on %s from address %s\n",pgmaddr, IPv4); + + listmax = chanTOT; // number of desired peers + listmax--; // we count outrselves in + printf("looking to connect %u peers\n",listmax); + // allocate space for listener array + listeners = (char**) calloc(listmax,sizeof(char*)); + for(i=0; i<listmax; i++) listeners[i] = (char*)calloc(16, sizeof(char)); + + // offer subscribes to all broadcasts by default + // somehow listening to ACK only doesn't works + snprintf(message,255,"OFFER;%s;",IPv4); + while(1) { + + shout(message, 0); + + if( expect("ACK",res, 0) == 0) { + printf("Got ACK from %s\n",res); + // parse the response and if ACK insert it in uniq list + for(c=0; c<listindex; c++) + if(strncmp(listeners[c], res, 16) == 0) + break; // found a duplicate + if(c==listindex) { // there was no duplicate + snprintf(listeners[c], 16, "%s", res); + printf("New listener: %s\n",listeners[c]); + listindex++; + } + + if(listindex==listmax) { // goal reached + printf("Sending ready signals to listeners\n"); + /////////////////////////////////////////////// + shout("READY", 0); + break; // break offer loop + } // if listmax reached + + } // if ACK response + + zmq_sleep(1); + + } // offer while loop + + } else { // all other channels listen + + ///////// + // LISTEN + + printf ("Listening on %s from address %s\n",pgmaddr, IPv4); + + snprintf(message,255,"ACK;%s;",IPv4); + + while (1) { + // Wait for offer + while( expect("OFFER", res, 0) !=0); // blocking + // reply with ACK + shout(message, 0); + + while( expect("READY", res, 0) !=0); // blocking + printf ("Received ready signal\n"); + break; // quit loop on ready signal + + } + + } + + return(1); +} + +int main(int argc, char **argv) { + upnp_t *upnp; + int found; + + ////////// + ///// INIT + + cmdline(argc, argv); + + if(!filename[0]) { + fprintf(stderr,"not enough args specified on commandline, see help."); + exit(ERR); + } + + fprintf(stderr,"HDSync starting for channel %u of %u\n",chanID, chanTOT); + fprintf(stderr,"will sync and play video: %s\n", filename); + + upnp = create_upnp(); + + // no server specified, force localhost + if(!server[0]) sprintf(server,"localhost"); + + // commandline or detection found explicit addresses + snprintf(upnp->hostname, MAX_HOSTNAME_SIZE-1,"%s",server); + upnp->port = port; + + if(!dryrun) { + if ( connect_upnp (upnp) < 0 ) { + fprintf(stderr,"can't connect to %s:%u: operation aborted.\n", server, port); + free_upnp(upnp); + exit(ERR); + } else { + fprintf(stderr,"UPNP connected to %s:%u\n",server, port); + close(upnp->sockfd); + upnp->sockfd = 0; + } + } + + if( ! get_address() ) { + fprintf(stderr,"Error getting own address.\n"); + exit(ERR); + } + + // initialize 0mq + zcontext = zmq_init (1); + + // create broadcast socket + sock_shout = zmq_socket (zcontext, ZMQ_PUB); + zmq_connect(sock_shout, pgmaddr); + + // create input socket + sock_in = zmq_socket (zcontext, ZMQ_SUB); + zmq_bind (sock_in, pgmaddr); + zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"",0); + + + /// END OF INIT + //////////////// + + + //////////////////// + if(! handshake()) { + // no error return so far, just running endless untill contact is established + fprintf(stderr,"error establishing contact with all desired channels, aborting.\n"); + exit(ERR); + } + + printf("Handshake successful\n"); + + ////////////////////// + if(dryrun) { // DRYRUN + printf("Dry run\n"); + + // testing sync with no real playback + zmq_sleep(1); + + if(chanID==1) { // offer should send sync + + printf("Prepare to offer sync\n"); + + zmq_sleep(3); + + shout("SYNC;GOGO;", 0); + + } else { // listener should prepare to receive sync + + printf("Prepare to receive sync\n"); + + // TODO + zmq_sleep(6); + } + + ////////////////////// + + } else { // RUN FOR REAL + + ////////////////////// + + // load, play and pause in sequence + // break and reopen connection in between + + + if(chanID==1) { // offer should send sync + zmq_sleep(1); + // create output socket + // sock_out = zmq_socket (zcontext, ZMQ_PUB); + // zmq_connect(sock_out, pgmaddr); + // prepare signal + zmq_msg_init_size (&request, 4); + memcpy (zmq_msg_data (&request), "SYNC", 4); + + // stop player + connect_upnp(upnp); + render_upnp(upnp,"Stop",""); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + + + zmq_sleep(1); + + + // prepare player in position + render_uri_meta(upnp,filename); + render_upnp(upnp,"SetAVTransportURI", upnp->meta); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + + zmq_sleep(1); + + // play + connect_upnp(upnp); + render_upnp(upnp,"Play","<Speed>1</Speed>"); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + // pause + connect_upnp(upnp); + render_upnp(upnp,"Pause",""); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + + zmq_sleep(1); + + // prepare upnp play message + connect_upnp(upnp); + render_upnp(upnp,"Play","<Speed>1</Speed>"); + + // send sync signal + // zmq_send (sock_out, &request, 0); + + // start player + send_upnp(upnp); + recv_upnp(upnp, 1000); + + } else { // this is a listener + + // create input socket + sock_in = zmq_socket (zcontext, ZMQ_SUB); + zmq_bind (sock_in, pgmaddr); + + + // stop player + connect_upnp(upnp); + render_upnp(upnp,"Stop",""); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + + + zmq_sleep(1); + + + // prepare player in position + render_uri_meta(upnp,filename); + render_upnp(upnp,"SetAVTransportURI", upnp->meta); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + + zmq_sleep(1); + + // play + connect_upnp(upnp); + render_upnp(upnp,"Play","<Speed>1</Speed>"); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + // pause + connect_upnp(upnp); + render_upnp(upnp,"Pause",""); + send_upnp(upnp); + recv_upnp(upnp, 1000); + close(upnp->sockfd); + upnp->sockfd = 0; + + zmq_sleep(1); + + // prepare upnp play message + connect_upnp(upnp); + render_upnp(upnp,"Play","<Speed>1</Speed>"); + + // Wait for next request from client + zmq_msg_init (&request); + zmq_recv (sock_in, &request, 0); + if(strncmp((char*)zmq_msg_data(&request),"SYNC",4)==0) { + // start player + send_upnp(upnp); + recv_upnp(upnp, 1000); + } + + } // if(offer|listen) + + zmq_msg_close (&request); + free_upnp(upnp); + + } // if(test|real) + + fprintf(stderr,"Software shutdown\n"); + + // if(sock_out) { + // zmq_close (sock_out); + // sock_out = NULL; + // } + // if(sock_in) { + // zmq_close (sock_in); + // sock_in = NULL; + // } + + // somehow offer blocks here + zmq_term (zcontext); + + if(listeners) { + for(int c; c<chanTOT; c++) + free(listeners[c]); + free(listeners); + } + + fprintf(stderr,"Done.\n"); + exit(0); +} + + void cmdline(int argc, char **argv) { filename[0] = 0; server[0] = 0; @@ -156,14 +540,7 @@ void cmdline(int argc, char **argv) { } while(res != -1); } -int handshake() { - int request_nbr; - zmq_msg_t reply; - char message[256]; - int listindex = 0; - int listmax = 0; - char res[128]; - +int get_address() { // discover own IP address struct ifaddrs * ifAddrStruct=NULL; struct ifaddrs * ifa=NULL; @@ -194,301 +571,5 @@ int handshake() { printf("Error: ethernet interface %s is not configured\n",ETHDEV); return(0); } - - - // create output socket - sock_out = zmq_socket (zcontext, ZMQ_PUB); - zmq_connect(sock_out, pgmaddr); - - // create input socket - sock_in = zmq_socket (zcontext, ZMQ_SUB); - zmq_bind (sock_in, pgmaddr); - - if(chanID==1) { // first channel is the one to offer - int i; - - //////// - // OFFER - ////////////////////////////// - printf ("Offering on %s from address %s\n",pgmaddr, IPv4); - - listmax = chanTOT; // number of desired peers - listmax--; // we count outrselves in - printf("looking to connect %u peers\n",listmax); - // allocate space for listener array - listeners = (char**) calloc(listmax,sizeof(char*)); - for(i=0; i<listmax; i++) listeners[i] = (char*)calloc(16, sizeof(char)); - - // offer subscribes to all broadcasts by default - // somehow listening to ACK only doesn't works - zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"",0); - snprintf(message,255,"OFFER;%s;",IPv4); - while(1) { - - // broadcast offer - zmq_msg_init_size (&request, strlen(message)); - memcpy (zmq_msg_data (&request), message, strlen(message)); - // printf ("Sending offer...\n", request_nbr); - zmq_send (sock_out, &request, 0); - zmq_msg_close (&request); - - sleep(1); - - // listen for answers - zmq_msg_init (&reply); - zmq_recv (sock_in, &reply, 0); - // printf ("Received response\n"); - snprintf(res,127,"%s",zmq_msg_data(&reply)); - zmq_msg_close (&reply); - // parse the response and if ACK insert it in uniq list - if( strncmp(res,"ACK",3) == 0) { - int c; - char *p, *pp; - // quick and secure string parsing - p = res; while(*p != ';') p++; p++; - pp = p; while(*pp != ';') pp++; *pp='\0'; - - // *p has the ip string - for(c=0; c<listindex; c++) - if(strncmp(listeners[c], p, 16) == 0) - break; // found a duplicate - if(c==listindex) { // there was no duplicate - snprintf(listeners[c], 16, "%s", p); - printf("New listener: %s\n",p); - listindex++; - } - if(listindex==listmax) { // goal reached - printf("Sending ready signals to listeners:\n"); - /////////////////////////////////////////////// - for(c=0;c<listmax;c++) { - char tmpaddr[256]; - snprintf(tmpaddr,255, "tcp://%s:%s",listeners[c],PGMPORT); - printf (" %s\n", tmpaddr); - zmq_connect(sock_out, tmpaddr); - zmq_msg_init_size (&request, 5); - memcpy (zmq_msg_data (&request), "READY", 5); - zmq_send (sock_out, &request, 0); - zmq_msg_close (&request); - } - break; // break offer loop - } // if listmax reached - - } // if ACK response - - } // offer while loop - - } else { // all other channels listen - - ///////// - // LISTEN - - - printf ("Listening on %s from address %s\n",pgmaddr, IPv4); - - // listen subscribes only to offers and ready - zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"OFFER",5); - zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"READY",5); - - - snprintf(message,255,"ACK;%s;",IPv4); - - while (1) { - // Wait for next request from client - zmq_msg_init (&request); - zmq_recv (sock_in, &request, 0); - - snprintf(res,127,"%s",zmq_msg_data(&request)); - zmq_msg_close (&request); - if(strncmp(res,"READY",5) == 0) { - printf ("Received ready signal\n"); - break; // quit loop on ready signal - } - sleep (1); - - // Send reply back to client - zmq_msg_init_size (&reply, strlen(message)); - memcpy (zmq_msg_data (&reply), message, strlen(message)); - zmq_send (sock_out, &reply, 0); - zmq_msg_close (&reply); - - } - - } - - if(sock_out) { - zmq_close (sock_out); - sock_out = NULL; - } - if(sock_in) { - zmq_close (sock_in); - sock_in = NULL; - } - return(1); } - -int main(int argc, char **argv) { - upnp_t *upnp; - int found; - - cmdline(argc, argv); - - if(!filename[0]) { - fprintf(stderr,"not enough args specified on commandline, see help."); - exit(ERR); - } - - fprintf(stderr,"HDSync starting for channel %u of %u\n",chanID, chanTOT); - fprintf(stderr,"will sync and play video: %s\n", filename); - - upnp = create_upnp(); - - // no server specified, force localhost - if(!server[0]) sprintf(server,"localhost"); - - // commandline or detection found explicit addresses - snprintf(upnp->hostname, MAX_HOSTNAME_SIZE-1,"%s",server); - upnp->port = port; - - if(!dryrun) { - if ( connect_upnp (upnp) < 0 ) - { - fprintf(stderr,"can't connect to %s:%u: operation aborted.\n", server, port); - free_upnp(upnp); - exit(ERR); - } - } - - // initialize 0mq - zcontext = zmq_init (1); - - if(! handshake()) { - // might never get here since while(1) tries endlessly - fprintf(stderr,"error establishing contact with all desired channels, aborting.\n"); - exit(ERR); - } - - ////////////////////// - if(dryrun) { // DRYRUN - // testing sync with no real playback - sleep(1); - - if(chanID==1) { // offer should send sync - sleep(3); - // create output socket - sock_out = zmq_socket (zcontext, ZMQ_PUB); - zmq_connect(sock_out, pgmaddr); - // broadcast offer - zmq_msg_init_size (&request, 4); - memcpy (zmq_msg_data (&request), "SYNC", 4); - // printf ("Sending offer...\n", request_nbr); - zmq_send (sock_out, &request, 0); - fprintf(stderr,"SYNC\n"); - zmq_msg_close (&request); - - } else { // listener should prepare to receive sync - - // create input socket - sock_in = zmq_socket (zcontext, ZMQ_SUB); - zmq_bind (sock_in, pgmaddr); - zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"",0); - // Wait for next request from client - zmq_msg_init (&request); - zmq_recv (sock_in, &request, 0); - if(strncmp((char*)zmq_msg_data(&request),"SYNC",4)==0) { - fprintf(stderr,"SYNC\n"); - } - zmq_msg_close (&request); - - } - - ////////////////////// - } else { // RUN FOR REAL - - - // Prepare - - // load, play and pause in sequence - // break and reopen connection in between - - // was connected already - - // load - render_uri_meta(upnp,filename); - render_upnp(upnp,"SetAVTransportURI", upnp->meta); - - // must re-connect socket between commands - send_upnp(upnp); - recv_upnp(upnp, 1000); - close(upnp->sockfd); - upnp->sockfd = 0; - - connect_upnp(upnp); - render_upnp(upnp,"Play","<Speed>1</Speed>"); - - send_upnp(upnp); - recv_upnp(upnp, 1000); - close(upnp->sockfd); - upnp->sockfd = 0; - - connect_upnp(upnp); - render_upnp(upnp,"Pause",""); - send_upnp(upnp); - recv_upnp(upnp, 1000); - - // START SYNC - - // was connected already - render_upnp(upnp,"Play","<Speed>1</Speed>"); - send_upnp(upnp); - recv_upnp(upnp, 1000); - - /* - case 'g': // dump a parsable full state of the device - render_upnp(upnp,"GetTransportInfo",""); - parser = GetTransportInfo; - - break; - - case 'm': // set the playmode: - // "NORMAL", "REPEAT_ONE", "REPEAT_ALL", "RANDOM" - { - char tmp[256]; - snprintf(tmp,255,"<NewPlayMode>%s</NewPlayMode>",filename); - render_upnp(upnp,"SetPlayMode",tmp); - } - break; - - case 'j': // jump aka seek - // <SeekMode> and <SeekTarget> - { - char tmp[512]; - snprintf(tmp,511,"<Unit>REL_TIME</Unit><Target>%s</Target>",filename); - render_upnp(upnp,"Seek",tmp); - } - break; - */ - - free_upnp(upnp); - } - - if(sock_out) { - zmq_close (sock_out); - sock_out = NULL; - } - if(sock_in) { - zmq_close (sock_in); - sock_in = NULL; - } - - zmq_term (zcontext); - - if(listeners) { - for(int c; c<chanTOT; c++) - free(listeners[c]); - free(listeners); - } - - - exit(0); -} |