@@ -734,6 +734,7 @@ impl Layout {
734734
735735 pub async fn insert < ' a > (
736736 & ' a self ,
737+ logger : & Logger ,
737738 conn : & mut AsyncPgConnection ,
738739 group : & ' a RowGroup ,
739740 stopwatch : & StopwatchMetrics ,
@@ -767,13 +768,39 @@ impl Layout {
767768 for chunk in group. write_chunks ( chunk_size) {
768769 // Empty chunks would lead to invalid SQL
769770 if !chunk. is_empty ( ) {
770- InsertQuery :: new ( table, & chunk) ?
771- . execute ( conn)
772- . await
773- . map_err ( |e| {
771+ if let Err ( e) = InsertQuery :: new ( table, & chunk) ?. execute ( conn) . await {
772+ // We occasionally get these errors but it's entirely
773+ // unclear what causes them. We work around that by
774+ // switching to row-by-row inserts until we can figure
775+ // out what the underlying cause is
776+ let err_msg = e. to_string ( ) ;
777+ if !err_msg. contains ( "value too large to transmit" ) {
774778 let ( block, msg) = chunk_details ( & chunk) ;
775- StoreError :: write_failure ( e, table. object . as_str ( ) , block, msg)
776- } ) ?;
779+ return Err ( StoreError :: write_failure (
780+ e,
781+ table. object . as_str ( ) ,
782+ block,
783+ msg,
784+ ) ) ;
785+ }
786+ let ( block, msg) = chunk_details ( & chunk) ;
787+ warn ! ( logger, "Insert of entire chunk failed. Trying row by row insert." ;
788+ "table" => table. object. as_str( ) ,
789+ "block" => block,
790+ "error" => err_msg,
791+ "details" => msg
792+ ) ;
793+ for single_chunk in chunk. as_vec ( ) {
794+ InsertQuery :: new ( table, & single_chunk) ?
795+ . execute ( conn)
796+ . await
797+ . map_err ( |e| {
798+ let ( block, msg) = chunk_details ( & single_chunk) ;
799+ let msg = format ! ( "{}: offending row {:?}" , msg, single_chunk) ;
800+ StoreError :: write_failure ( e, table. object . as_str ( ) , block, msg)
801+ } ) ?;
802+ }
803+ }
777804 }
778805 }
779806 Ok ( ( ) )
0 commit comments