@@ -453,76 +453,81 @@ impl VhostUserVsockThread {
453453
454454 let queue = vring_mut. get_queue_mut ( ) ;
455455
456- while let Some ( mut avail_desc) = queue
457- . iter ( atomic_mem. memory ( ) )
458- . map_err ( |_| Error :: IterateQueue ) ?
459- . next ( )
460- {
461- used_any = true ;
462- let mem = atomic_mem. clone ( ) . memory ( ) ;
463-
464- let head_idx = avail_desc. head_index ( ) ;
465- let used_len = match VsockPacket :: from_rx_virtq_chain (
466- mem. deref ( ) ,
467- & mut avail_desc,
468- self . tx_buffer_size ,
469- ) {
470- Ok ( mut pkt) => {
471- let recv_result = match rx_queue_type {
472- RxQueueType :: Standard => self . thread_backend . recv_pkt ( & mut pkt) ,
473- RxQueueType :: RawPkts => self . thread_backend . recv_raw_pkt ( & mut pkt) ,
474- } ;
475-
476- if recv_result. is_ok ( ) {
477- PKT_HEADER_SIZE + pkt. len ( ) as usize
478- } else {
479- queue. iter ( mem) . unwrap ( ) . go_to_previous_position ( ) ;
480- break ;
456+ let mut iter_has_elemnt = true ;
457+ while iter_has_elemnt {
458+ let queue_iter = queue
459+ . iter ( atomic_mem. memory ( ) )
460+ . map_err ( |_| Error :: IterateQueue ) ?;
461+
462+ iter_has_elemnt = false ;
463+ for mut avail_desc in queue_iter {
464+ used_any = true ;
465+ iter_has_elemnt = true ;
466+ let mem = atomic_mem. clone ( ) . memory ( ) ;
467+
468+ let head_idx = avail_desc. head_index ( ) ;
469+ let used_len = match VsockPacket :: from_rx_virtq_chain (
470+ mem. deref ( ) ,
471+ & mut avail_desc,
472+ self . tx_buffer_size ,
473+ ) {
474+ Ok ( mut pkt) => {
475+ let recv_result = match rx_queue_type {
476+ RxQueueType :: Standard => self . thread_backend . recv_pkt ( & mut pkt) ,
477+ RxQueueType :: RawPkts => self . thread_backend . recv_raw_pkt ( & mut pkt) ,
478+ } ;
479+
480+ if recv_result. is_ok ( ) {
481+ PKT_HEADER_SIZE + pkt. len ( ) as usize
482+ } else {
483+ queue. iter ( mem) . unwrap ( ) . go_to_previous_position ( ) ;
484+ break ;
485+ }
481486 }
482- }
483- Err ( e) => {
484- warn ! ( "vsock: RX queue error: {:?}" , e) ;
485- 0
486- }
487- } ;
487+ Err ( e) => {
488+ warn ! ( "vsock: RX queue error: {:?}" , e) ;
489+ 0
490+ }
491+ } ;
488492
489- let vring = vring. clone ( ) ;
490- let event_idx = self . event_idx ;
493+ let vring = vring. clone ( ) ;
494+ let event_idx = self . event_idx ;
491495
492- self . pool . spawn_ok ( async move {
493- // TODO: Understand why doing the following in the pool works
494- if event_idx {
495- if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
496- warn ! ( "Could not return used descriptors to ring" ) ;
497- }
498- match vring. needs_notification ( ) {
499- Err ( _) => {
500- warn ! ( "Could not check if queue needs to be notified" ) ;
501- vring. signal_used_queue ( ) . unwrap ( ) ;
496+ self . pool . spawn_ok ( async move {
497+ // TODO: Understand why doing the following in the pool works
498+ if event_idx {
499+ if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
500+ warn ! ( "Could not return used descriptors to ring" ) ;
502501 }
503- Ok ( needs_notification) => {
504- if needs_notification {
502+ match vring. needs_notification ( ) {
503+ Err ( _) => {
504+ warn ! ( "Could not check if queue needs to be notified" ) ;
505505 vring. signal_used_queue ( ) . unwrap ( ) ;
506506 }
507+ Ok ( needs_notification) => {
508+ if needs_notification {
509+ vring. signal_used_queue ( ) . unwrap ( ) ;
510+ }
511+ }
507512 }
513+ } else {
514+ if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
515+ warn ! ( "Could not return used descriptors to ring" ) ;
516+ }
517+ vring. signal_used_queue ( ) . unwrap ( ) ;
508518 }
509- } else {
510- if vring. add_used ( head_idx, used_len as u32 ) . is_err ( ) {
511- warn ! ( "Could not return used descriptors to ring" ) ;
512- }
513- vring. signal_used_queue ( ) . unwrap ( ) ;
514- }
515- } ) ;
519+ } ) ;
516520
517- match rx_queue_type {
518- RxQueueType :: Standard => {
519- if !self . thread_backend . pending_rx ( ) {
520- break ;
521+ match rx_queue_type {
522+ RxQueueType :: Standard => {
523+ if !self . thread_backend . pending_rx ( ) {
524+ break ;
525+ }
521526 }
522- }
523- RxQueueType :: RawPkts => {
524- if ! self . thread_backend . pending_raw_pkts ( ) {
525- break ;
527+ RxQueueType :: RawPkts => {
528+ if ! self . thread_backend . pending_raw_pkts ( ) {
529+ break ;
530+ }
526531 }
527532 }
528533 }
0 commit comments