Skip to content

Commit a9fcfb5

Browse files
LuciferYangdongjoon-hyun
authored andcommitted
[SPARK-54664][CONNECT] Clean up the code related to listenerCache from connect.StreamingQueryManager
### What changes were proposed in this pull request? #41752 introduced a `listenerCache` and related private methods (`cacheListenerById`, `getIdByListener`, and `removeCachedListener`) for `connect.StreamingQueryManager`. However, in #46287, the usage related to `listenerCache` was replaced by `streamingQueryListenerBus`. As a result, `listenerCache` and its associated private methods are no longer in use, and this current pr cleans them up. ### Why are the changes needed? Code cleanup. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass Github Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #53420 from LuciferYang/StreamingQueryManager. Lead-authored-by: yangjie01 <yangjie01@baidu.com> Co-authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent a20ecc9 commit a9fcfb5

File tree

1 file changed

+0
-24
lines changed

1 file changed

+0
-24
lines changed

sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818
package org.apache.spark.sql.connect
1919

2020
import java.util.UUID
21-
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
2221

2322
import scala.jdk.CollectionConverters._
2423

2524
import org.apache.spark.annotation.Evolving
2625
import org.apache.spark.connect.proto.{Command, StreamingQueryManagerCommand, StreamingQueryManagerCommandResult}
2726
import org.apache.spark.internal.Logging
28-
import org.apache.spark.sql.connect.common.InvalidPlanInput
2927
import org.apache.spark.sql.streaming
3028
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamingQueryListener}
3129

@@ -39,15 +37,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession)
3937
extends streaming.StreamingQueryManager
4038
with Logging {
4139

42-
// Mapping from id to StreamingQueryListener. There's another mapping from id to
43-
// StreamingQueryListener on server side. This is used by removeListener() to find the id
44-
// of previously added StreamingQueryListener and pass it to server side to find the
45-
// corresponding listener on server side. We use id to StreamingQueryListener mapping
46-
// here to make sure there's no hash collision as well as handling the case that adds and
47-
// removes the same listener instance multiple times properly.
48-
private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] =
49-
new ConcurrentHashMap()
50-
5140
private[spark] val streamingQueryListenerBus = new StreamingQueryListenerBus(sparkSession)
5241

5342
private[spark] def close(): Unit = {
@@ -128,17 +117,4 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession)
128117

129118
resp.getStreamingQueryManagerCommandResult
130119
}
131-
132-
private def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = {
133-
listenerCache.putIfAbsent(id, listener)
134-
}
135-
136-
private def getIdByListener(listener: StreamingQueryListener): String = {
137-
listenerCache.forEach((k, v) => if (listener.equals(v)) return k)
138-
throw InvalidPlanInput(s"No id with listener $listener is found.")
139-
}
140-
141-
private def removeCachedListener(id: String): StreamingQueryListener = {
142-
listenerCache.remove(id)
143-
}
144120
}

0 commit comments

Comments
 (0)