@@ -462,76 +462,81 @@ impl VhostUserVsockThread {
462462
463463 let queue = vring_mut. get_queue_mut ( ) ;
464464
465- while let Some ( mut avail_desc) = queue
466- . iter ( atomic_mem. memory ( ) )
467- . map_err ( |_| Error :: IterateQueue ) ?
468- . next ( )
469- {
470- used_any = true ;
471- let mem = atomic_mem. clone ( ) . memory ( ) ;
472-
473- let head_idx = avail_desc. head_index ( ) ;
474- let used_len = match VsockPacket :: from_rx_virtq_chain (
475- mem. deref ( ) ,
476- & mut avail_desc,
477- self . tx_buffer_size ,
478- ) {
479- Ok ( mut pkt) => {
480- let recv_result = match rx_queue_type {
481- RxQueueType :: Standard => self . thread_backend . recv_pkt ( & mut pkt) ,
482- RxQueueType :: RawPkts => self . thread_backend . recv_raw_pkt ( & mut pkt) ,
483- } ;
484-
485- if recv_result. is_ok ( ) {
486- PKT_HEADER_SIZE + pkt. len ( ) as usize
487- } else {
488- queue. iter ( mem) . unwrap ( ) . go_to_previous_position ( ) ;
489- break ;
465+ let mut iter_has_elemnt = true ;
466+ while iter_has_elemnt {
467+ let queue_iter = queue
468+ . iter ( atomic_mem. memory ( ) )
469+ . map_err ( |_| Error :: IterateQueue ) ?;
470+
471+ iter_has_elemnt = false ;
472+ for mut avail_desc in queue_iter {
473+ used_any = true ;
474+ iter_has_elemnt = true ;
475+ let mem = atomic_mem. clone ( ) . memory ( ) ;
476+
477+ let head_idx = avail_desc. head_index ( ) ;
478+ let used_len = match VsockPacket :: from_rx_virtq_chain (
479+ mem. deref ( ) ,
480+ & mut avail_desc,
481+ self . tx_buffer_size ,
482+ ) {
483+ Ok ( mut pkt) => {
484+ let recv_result = match rx_queue_type {
485+ RxQueueType :: Standard => self . thread_backend . recv_pkt ( & mut pkt) ,
486+ RxQueueType :: RawPkts => self . thread_backend . recv_raw_pkt ( & mut pkt) ,
487+ } ;
488+
489+ if recv_result. is_ok ( ) {
490+ PKT_HEADER_SIZE + pkt. len ( ) as usize
491+ } else {
492+ queue. iter ( mem) . unwrap ( ) . go_to_previous_position ( ) ;
493+ break ;
494+ }
490495 }
491- }
492- Err ( e) => {
493- warn ! ( "vsock: RX queue error: {:?}" , e) ;
494- 0
495- }
496- } ;
496+ Err ( e) => {
497+ warn ! ( "vsock: RX queue error: {:?}" , e) ;
498+ 0
499+ }
500+ } ;
497501
498- let vring = vring. clone ( ) ;
499- let event_idx = self . event_idx ;
502+ let vring = vring. clone ( ) ;
503+ let event_idx = self . event_idx ;
500504
501- self . pool . spawn_ok ( async move {
502- // TODO: Understand why doing the following in the pool works
503- if event_idx {
504- if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
505- warn ! ( "Could not return used descriptors to ring" ) ;
506- }
507- match vring. needs_notification ( ) {
508- Err ( _) => {
509- warn ! ( "Could not check if queue needs to be notified" ) ;
510- vring. signal_used_queue ( ) . unwrap ( ) ;
505+ self . pool . spawn_ok ( async move {
506+ // TODO: Understand why doing the following in the pool works
507+ if event_idx {
508+ if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
509+ warn ! ( "Could not return used descriptors to ring" ) ;
511510 }
512- Ok ( needs_notification) => {
513- if needs_notification {
511+ match vring. needs_notification ( ) {
512+ Err ( _) => {
513+ warn ! ( "Could not check if queue needs to be notified" ) ;
514514 vring. signal_used_queue ( ) . unwrap ( ) ;
515515 }
516+ Ok ( needs_notification) => {
517+ if needs_notification {
518+ vring. signal_used_queue ( ) . unwrap ( ) ;
519+ }
520+ }
516521 }
522+ } else {
523+ if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
524+ warn ! ( "Could not return used descriptors to ring" ) ;
525+ }
526+ vring. signal_used_queue ( ) . unwrap ( ) ;
517527 }
518- } else {
519- if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
520- warn ! ( "Could not return used descriptors to ring" ) ;
521- }
522- vring. signal_used_queue ( ) . unwrap ( ) ;
523- }
524- } ) ;
528+ } ) ;
525529
526- match rx_queue_type {
527- RxQueueType :: Standard => {
528- if !self . thread_backend . pending_rx ( ) {
529- break ;
530+ match rx_queue_type {
531+ RxQueueType :: Standard => {
532+ if !self . thread_backend . pending_rx ( ) {
533+ break ;
534+ }
530535 }
531- }
532- RxQueueType :: RawPkts => {
533- if ! self . thread_backend . pending_raw_pkts ( ) {
534- break ;
536+ RxQueueType :: RawPkts => {
537+ if ! self . thread_backend . pending_raw_pkts ( ) {
538+ break ;
539+ }
535540 }
536541 }
537542 }
0 commit comments