@@ -123,6 +123,7 @@ typedef struct ndag_format_data {
123123 char * localiface ;
124124 uint16_t nextthreadid ;
125125 recvstream_t * receivers ;
126+ int receiver_cnt ;
126127
127128 pthread_t controlthread ;
128129 libtrace_message_queue_t controlqueue ;
@@ -261,6 +262,7 @@ static int join_multicast_group(char *groupaddr, char *localiface,
261262 goto sockcreateover ;
262263 }
263264
265+ memset (& greq , 0 , sizeof (greq ));
264266 greq .gr_interface = interface ;
265267 memcpy (& (greq .gr_group ), group -> ai_addr , group -> ai_addrlen );
266268
@@ -311,6 +313,7 @@ static int ndag_init_input(libtrace_t *libtrace) {
311313 FORMAT_DATA -> localiface = NULL ;
312314 FORMAT_DATA -> nextthreadid = 0 ;
313315 FORMAT_DATA -> receivers = NULL ;
316+ FORMAT_DATA -> receiver_cnt = 0 ;
314317 FORMAT_DATA -> consterfframing = -1 ;
315318
316319 scan = strchr (libtrace -> uridata , ',' );
@@ -506,6 +509,8 @@ static void *ndag_controller_run(void *tdata) {
506509 close (sock );
507510 }
508511
512+ freeaddrinfo (receiveaddr );
513+
509514 /* Control channel has fallen over, should probably encourage libtrace
510515 * to halt the receiver threads as well.
511516 */
@@ -543,6 +548,7 @@ static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
543548 libtrace_message_queue_init (& (FORMAT_DATA -> receivers [i ].mqueue ),
544549 sizeof (ndag_internal_message_t ));
545550 }
551+ FORMAT_DATA -> receiver_cnt = maxthreads ;
546552
547553 /* Start the controller thread */
548554 /* TODO consider affinity of this thread? */
@@ -610,9 +616,11 @@ static int ndag_pause_input(libtrace_t *libtrace) {
610616 int i ;
611617
612618 /* Close the existing receiver sockets */
613- for (i = 0 ; i < libtrace -> perpkt_thread_count ; i ++ ) {
619+ for (i = 0 ; i < FORMAT_DATA -> receiver_cnt ; i ++ ) {
614620 halt_ndag_receiver (& (FORMAT_DATA -> receivers [i ]));
615621 }
622+ ndag_paused = 1 ;
623+ pthread_join (FORMAT_DATA -> controlthread , NULL );
616624 return 0 ;
617625}
618626
@@ -1564,7 +1572,7 @@ static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
15641572 stat -> missing = 0 ;
15651573
15661574 /* TODO Is this thread safe? */
1567- for (i = 0 ; i < libtrace -> perpkt_thread_count ; i ++ ) {
1575+ for (i = 0 ; i < FORMAT_DATA -> receiver_cnt ; i ++ ) {
15681576 stat -> dropped += FORMAT_DATA -> receivers [i ].dropped_upstream ;
15691577 stat -> received += FORMAT_DATA -> receivers [i ].received_packets ;
15701578 stat -> missing += FORMAT_DATA -> receivers [i ].missing_records ;
@@ -1599,6 +1607,9 @@ static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
15991607 return 0 ;
16001608 }
16011609
1610+ if (t -> perpkt_num >= FORMAT_DATA -> receiver_cnt ) {
1611+ return 0 ;
1612+ }
16021613 recvr = & (FORMAT_DATA -> receivers [t -> perpkt_num ]);
16031614 t -> format_data = recvr ;
16041615
0 commit comments