File tree Expand file tree Collapse file tree 3 files changed +17
-24
lines changed
query/service/src/pipelines/processors/transforms/new_hash_join Expand file tree Collapse file tree 3 files changed +17
-24
lines changed Original file line number Diff line number Diff line change @@ -16,7 +16,6 @@ use std::error::Error;
1616use std:: fmt:: Debug ;
1717use std:: fmt:: Display ;
1818use std:: fmt:: Formatter ;
19- use std:: sync:: PoisonError ;
2019
2120use geozero:: error:: GeozeroError ;
2221
@@ -437,9 +436,3 @@ impl From<redis::RedisError> for ErrorCode {
437436 ErrorCode :: DictionarySourceError ( format ! ( "Dictionary Redis Error, cause: {}" , error) )
438437 }
439438}
440-
441- impl < T > From < PoisonError < T > > for ErrorCode {
442- fn from ( error : PoisonError < T > ) -> Self {
443- ErrorCode :: Internal ( format ! ( "{error}" ) )
444- }
445- }
Original file line number Diff line number Diff line change @@ -141,26 +141,26 @@ impl BasicHashJoin {
141141 return ;
142142 }
143143
144- if matches ! (
145- self . state. hash_table. deref( ) ,
146- HashJoinHashTable :: NestedLoop ( _)
147- ) {
148- return ;
149- }
150-
151144 let locked = self . state . mutex . lock ( ) ;
152145 let _locked = locked. unwrap_or_else ( PoisonError :: into_inner) ;
153146
154- if self . state . chunks . is_empty ( ) || !self . state . columns . is_empty ( ) {
147+ debug_assert ! ( !matches!(
148+ self . state. hash_table. deref( ) ,
149+ HashJoinHashTable :: NestedLoop ( _)
150+ ) ) ;
151+
152+ if !self . state . columns . is_empty ( ) {
155153 return ;
156154 }
157-
158155 if let Some ( block) = self . state . chunks . first ( ) {
159- for offset in 0 ..self . desc . build_projection . len ( ) {
160- let column_type = self . state . column_types . as_mut ( ) ;
161- column_type. push ( block. get_by_offset ( offset) . data_type ( ) ) ;
162- }
163- }
156+ let column_type = self . state . column_types . as_mut ( ) ;
157+ column_type. extend (
158+ ( 0 ..self . desc . build_projection . len ( ) )
159+ . map ( |offset| block. get_by_offset ( offset) . data_type ( ) ) ,
160+ ) ;
161+ } else {
162+ return ;
163+ } ;
164164
165165 let mut columns = Vec :: with_capacity ( self . desc . build_projection . len ( ) ) ;
166166 for offset in 0 ..self . desc . build_projection . len ( ) {
Original file line number Diff line number Diff line change @@ -120,7 +120,7 @@ impl Processor for TransformHashJoin {
120120 }
121121 }
122122
123- fn process < ' a > ( & ' a mut self ) -> Result < ( ) > {
123+ fn process ( & mut self ) -> Result < ( ) > {
124124 match & mut self . stage {
125125 Stage :: Finished => Ok ( ( ) ) ,
126126 Stage :: Build ( state) => {
@@ -147,7 +147,7 @@ impl Processor for TransformHashJoin {
147147 let stream = self . join . probe_block ( probe_data) ?;
148148 // This is safe because both join and stream are properties of the struct.
149149 state. stream = Some ( unsafe {
150- std:: mem:: transmute :: < Box < dyn JoinStream + ' a > , Box < dyn JoinStream > > ( stream)
150+ std:: mem:: transmute :: < Box < dyn JoinStream + ' _ > , Box < dyn JoinStream > > ( stream)
151151 } ) ;
152152 }
153153
@@ -166,7 +166,7 @@ impl Processor for TransformHashJoin {
166166 state. initialize = true ;
167167 // This is safe because both join and stream are properties of the struct.
168168 state. stream = Some ( unsafe {
169- std:: mem:: transmute :: < Box < dyn JoinStream + ' a > , Box < dyn JoinStream > > (
169+ std:: mem:: transmute :: < Box < dyn JoinStream + ' _ > , Box < dyn JoinStream > > (
170170 final_stream,
171171 )
172172 } ) ;
You can’t perform that action at this time.
0 commit comments