Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
import io.grpc.Detachable;
import io.grpc.Drainable;
import io.grpc.HasByteBuffer;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.protobuf.ProtoUtils;
import io.netty.buffer.ByteBuf;
Expand All @@ -41,11 +43,12 @@
import java.util.Collections;
import java.util.List;
import org.apache.arrow.flight.grpc.AddWritableBuffer;
import org.apache.arrow.flight.grpc.GetReadableBuffer;
import org.apache.arrow.flight.impl.Flight.FlightData;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ForeignAllocation;
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
Expand All @@ -55,10 +58,14 @@
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.MetadataVersion;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The in-memory representation of FlightData used to manage a stream of Arrow messages. */
class ArrowMessage implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(ArrowMessage.class);

// If true, deserialize Arrow data by giving Arrow a reference to the underlying gRPC buffer
// instead of copying the data. Defaults to true.
public static final boolean ENABLE_ZERO_COPY_READ;
Expand Down Expand Up @@ -312,8 +319,7 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
case APP_METADATA_TAG:
{
int size = readRawVarint32(stream);
appMetadata = allocator.buffer(size);
GetReadableBuffer.readIntoBuffer(stream, appMetadata, size, ENABLE_ZERO_COPY_READ);
appMetadata = readBuffer(allocator, stream, size);
break;
}
case BODY_TAG:
Expand All @@ -323,8 +329,7 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
body = null;
}
int size = readRawVarint32(stream);
body = allocator.buffer(size);
GetReadableBuffer.readIntoBuffer(stream, body, size, ENABLE_ZERO_COPY_READ);
body = readBuffer(allocator, stream, size);
break;

default:
Expand Down Expand Up @@ -377,6 +382,114 @@ private static int readRawVarint32(int firstByte, InputStream is) throws IOExcep
return CodedInputStream.readRawVarint32(firstByte, is);
}

/**
* Reads data from the stream into an ArrowBuf, without copying data when possible.
*
* <p>First attempts to transfer ownership of the gRPC buffer to Arrow via {@link
* #wrapGrpcBuffer}. This avoids any memory copy when the gRPC transport provides a direct
* ByteBuffer (e.g., Netty).
*
* <p>If not possible (e.g., heap buffer, fragmented data, or unsupported transport), falls back
* to allocating a new buffer and copying data into it.
*
* @param allocator The allocator to use for buffer allocation
* @param stream The input stream to read from
* @param size The number of bytes to read
* @return An ArrowBuf containing the data
* @throws IOException if there is an error reading from the stream
*/
private static ArrowBuf readBuffer(BufferAllocator allocator, InputStream stream, int size)
throws IOException {
if (ENABLE_ZERO_COPY_READ) {
ArrowBuf zeroCopyBuf = wrapGrpcBuffer(stream, allocator, size);
if (zeroCopyBuf != null) {
return zeroCopyBuf;
}
}

// Fall back to allocating and copying
ArrowBuf buf = allocator.buffer(size);
byte[] heapBytes = new byte[size];
ByteStreams.readFully(stream, heapBytes);
buf.writeBytes(heapBytes);
buf.writerIndex(size);
return buf;
}

/**
* Attempts to wrap gRPC's buffer as an ArrowBuf without copying.
*
* <p>This method takes ownership of gRPC's underlying buffer via {@link Detachable#detach()} and
* wraps it as an ArrowBuf using {@link BufferAllocator#wrapForeignAllocation}. The gRPC buffer
* will be released when the ArrowBuf is closed.
*
* @param stream The gRPC-provided InputStream
* @param allocator The allocator to use for wrapping the foreign allocation
* @param size The number of bytes to wrap
* @return An ArrowBuf wrapping gRPC's buffer, or {@code null} if zero-copy is not possible
*/
static ArrowBuf wrapGrpcBuffer(
final InputStream stream, final BufferAllocator allocator, final int size) {

if (!(stream instanceof Detachable) || !(stream instanceof HasByteBuffer)) {
return null;
}

HasByteBuffer hasByteBuffer = (HasByteBuffer) stream;
if (!hasByteBuffer.byteBufferSupported()) {
return null;
}

ByteBuffer peekBuffer = hasByteBuffer.getByteBuffer();
if (peekBuffer == null) {
return null;
}
if (!peekBuffer.isDirect()) {
return null;
}
if (peekBuffer.remaining() < size) {
// Data is fragmented across multiple buffers; zero-copy not possible
return null;
}

// Take ownership
InputStream detachedStream = ((Detachable) stream).detach();

// Get buffer from detached stream
ByteBuffer detachedByteBuffer = ((HasByteBuffer) detachedStream).getByteBuffer();

// Calculate memory address accounting for buffer position
long baseAddress = MemoryUtil.getByteBufferAddress(detachedByteBuffer);
long dataAddress = baseAddress + detachedByteBuffer.position();

// Create ForeignAllocation with proper cleanup
ForeignAllocation foreignAllocation =
new ForeignAllocation(size, dataAddress) {
@Override
protected void release0() {
closeQuietly(detachedStream);
}
};

try {
return allocator.wrapForeignAllocation(foreignAllocation);
} catch (Throwable t) {
// If it fails, clean up the detached stream and propagate
closeQuietly(detachedStream);
throw t;
}
}

private static void closeQuietly(InputStream stream) {
if (stream != null) {
try {
stream.close();
} catch (IOException e) {
LOG.debug("Error closing detached gRPC stream", e);
}
}
}

/**
* Convert the ArrowMessage to an InputStream.
*
Expand Down

This file was deleted.

Loading