Skip to content

Commit ce3fe69

Browse files
babenkoqyryq
authored andcommitted
Fix PipeInputToOutput when both input and output streams are zero-copy
commit_hash:59a168a6cba58ffbcb59d277c152c522bf0dbc79
1 parent cb01c24 commit ce3fe69

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

yt/yt/core/concurrency/async_stream.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,15 +1087,21 @@ namespace {
10871087

10881088
void DoPipeInputToOutput(
10891089
const auto& input,
1090-
const auto& output)
1090+
const auto& output,
1091+
bool copyBlocks)
10911092
{
10921093
while (true) {
10931094
auto asyncBlock = input->Read();
10941095
auto block = WaitFor(asyncBlock)
10951096
.ValueOrThrow();
1096-
if (!block || block.Empty()) {
1097+
if (block.Empty()) {
10971098
break;
10981099
}
1100+
if (copyBlocks) {
1101+
struct TPipeInputToOutputTag
1102+
{ };
1103+
block = TSharedRef::MakeCopy<TPipeInputToOutputTag>(block);
1104+
}
10991105
WaitFor(output->Write(block))
11001106
.ThrowOnError();
11011107
}
@@ -1107,14 +1113,26 @@ void PipeInputToOutput(
11071113
const IAsyncZeroCopyInputStreamPtr& input,
11081114
const IAsyncOutputStreamPtr& output)
11091115
{
1110-
DoPipeInputToOutput(input, output);
1116+
DoPipeInputToOutput(
1117+
input,
1118+
output,
1119+
/*copyBlocks*/ false);
11111120
}
11121121

11131122
void PipeInputToOutput(
11141123
const IAsyncZeroCopyInputStreamPtr& input,
11151124
const IAsyncZeroCopyOutputStreamPtr& output)
11161125
{
1117-
DoPipeInputToOutput(input, output);
1126+
// NB: When both input and output strams are zero-copy
1127+
// blocks need to be copied:
1128+
// 1) input stream only guarantees their content remains intact
1129+
// until the next Read() call is made;
1130+
// 2) output stream relies on the passed content to be immutable
1131+
// the whole time.
1132+
DoPipeInputToOutput(
1133+
input,
1134+
output,
1135+
/*copyBlocks*/ true);
11181136
}
11191137

11201138
void DrainInput(const IAsyncZeroCopyInputStreamPtr& input)

0 commit comments

Comments
 (0)