Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -877,15 +877,21 @@ private class BufferedRowsReader(

private var index: Int = -1
private var rowsRead: Long = 0
private var closed: Boolean = false

private def checkNotClosed(op: String): Unit =
if (closed) throw new IllegalStateException(s"$op called on a closed BufferedRowsReader")

override def next(): Boolean = {
checkNotClosed("next()")
index += 1
val hasNext = index < partition.rows.length
if (hasNext) rowsRead += 1
hasNext
}

override def get(): InternalRow = {
checkNotClosed("get()")
val originalRow = partition.rows(index)
val values = new Array[Any](nonMetadataColumns.length)
nonMetadataColumns.zipWithIndex.foreach { case (col, idx) =>
Expand All @@ -895,7 +901,13 @@ private class BufferedRowsReader(
addMetadata(new GenericInternalRow(values))
}

override def close(): Unit = {}
// Intentionally strict: double-close throws rather than being idempotent (as Closeable permits).
// This is test code whose purpose is to catch reader lifecycle bugs early; a silent no-op on
// double-close would mask the very errors we want to detect.
override def close(): Unit = {
checkNotClosed("close()")
closed = true
}

private def extractFieldValue(
field: StructField,
Expand Down Expand Up @@ -1041,6 +1053,7 @@ private class BufferedRowsReader(
}

override def currentMetricsValues(): Array[CustomTaskMetric] = {
checkNotClosed("currentMetricsValues()")
val metric = new CustomTaskMetric {
override def name(): String = "rows_read"
override def value(): Long = rowsRead
Expand Down