Skip to content

Commit 21f9fb5

Browse files
committed
Change typehints to use Consumer and Producer
Previously typehints in namespaces `clj-kafka-x.consumers.simple` and `clj-kafka-x.producer` used implementation classes `KafkaConsumer` and `KafkaProducer`. This causes a problem when trying to test application code with fake/mock implementations of those classes. If passing anything else to those functions using the typehints one gets a `ClassCastException`. One could use Clojure's `proxy` or similar mechanisms to try to get around this problem but unfortunately the inner implementation of especially `KafkaProducer` makes mocking them via this manner practically impossible (there are some very nasty ways like JavaAssist but those should be unnecessary). Because of this very reason in Kafka Client library there are interfaces `Consumer` and `Producer` which `KafkaConsumer` and `KafkaProducer` implements. This commit replaces all typehint references to use those two interfaces to allow clean fake/mock implementation usage in tests. This is a non-breaking change and doesn't affect execution or performance characteristics of code in any way when using real `KafkaProducer` or `KafkaConsumer`. Typehint safety is also retained since code is referring to very same public methods as before.
1 parent d7187e4 commit 21f9fb5

File tree

2 files changed

+26
-26
lines changed

2 files changed

+26
-26
lines changed

src/clj_kafka_x/consumers/simple.clj

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ clj-kafka-x.consumers.simple
55
(:require [clj-kafka-x.data :refer :all])
66
(:import java.util.List
77
java.util.regex.Pattern
8-
[org.apache.kafka.clients.consumer ConsumerRebalanceListener KafkaConsumer OffsetAndMetadata OffsetCommitCallback]
8+
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetAndMetadata OffsetCommitCallback]
99
[org.apache.kafka.common.serialization ByteArrayDeserializer Deserializer StringDeserializer]
1010
org.apache.kafka.common.TopicPartition
1111
(java.util Map)))
@@ -93,7 +93,7 @@ clj-kafka-x.consumers.simple
9393
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
9494
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)
9595
"
96-
[^KafkaConsumer consumer topics & {:keys [assigned-callback revoked-callback]
96+
[^Consumer consumer topics & {:keys [assigned-callback revoked-callback]
9797
:or {assigned-callback (fn [_])
9898
revoked-callback (fn [_])}}]
9999
;;TODO needs to be cleaned up and refactored
@@ -134,7 +134,7 @@ clj-kafka-x.consumers.simple
134134
;; {:topic \"topic-b\", :partitions #{0 1 2}},
135135
;; {:topic \"topic-c\", :partitions #{}}]
136136
"
137-
[^KafkaConsumer consumer]
137+
[^Consumer consumer]
138138
;;TODO is this clear and readable enough ? refactor?
139139
(let [auto-subs (.subscription consumer)
140140
manual-subs (.assignment consumer)
@@ -153,7 +153,7 @@ clj-kafka-x.consumers.simple
153153
(defn unsubscribe
154154
"Unsubcribes the consumer from any subscribed topics and/or partitions.
155155
It works for subscriptions carried out via subscribe-to-topics or subscribe-to-partitions functions"
156-
[^KafkaConsumer consumer]
156+
[^Consumer consumer]
157157
(.unsubscribe consumer))
158158

159159
(defn seek
@@ -190,9 +190,9 @@ clj-kafka-x.consumers.simple
190190
;; => nil
191191
192192
"
193-
([^KafkaConsumer consumer topic partition offset]
193+
([^Consumer consumer topic partition offset]
194194
(seek consumer (vector {:topic topic :partition partition}) offset))
195-
([^KafkaConsumer consumer tp-seq offset]
195+
([^Consumer consumer tp-seq offset]
196196
(let [tp-class-seq (map map->topic-partition tp-seq)
197197
tp-class-array (into-array TopicPartition tp-class-seq)]
198198
(cond
@@ -229,7 +229,7 @@ clj-kafka-x.consumers.simple
229229
;; :value \"Count Zero says 3 at Fri Mar 11 14:34:32 GMT 2016\"}]
230230
231231
"
232-
[^KafkaConsumer consumer & {:keys [timeout] :or {timeout 1000}}]
232+
[^Consumer consumer & {:keys [timeout] :or {timeout 1000}}]
233233

234234
(let [consumer-records (.poll consumer timeout)]
235235
(to-clojure consumer-records)))
@@ -272,13 +272,13 @@ clj-kafka-x.consumers.simple
272272
(println \"Commits passed for \" offsets))))
273273
;; => nil
274274
"
275-
([^KafkaConsumer consumer] (.commitAsync consumer))
276-
([^KafkaConsumer consumer offset-commit-fn]
275+
([^Consumer consumer] (.commitAsync consumer))
276+
([^Consumer consumer offset-commit-fn]
277277
(let [callback (reify OffsetCommitCallback
278278
(onComplete [_ offsets exception]
279279
(offset-commit-fn (tp-om-map->map offsets) exception)))]
280280
(.commitAsync consumer callback)))
281-
([^KafkaConsumer consumer topic-partition-offsets-metadata offset-commit-fn]
281+
([^Consumer consumer topic-partition-offsets-metadata offset-commit-fn]
282282
(let [callback (reify OffsetCommitCallback
283283
(onComplete [_ offsets exception]
284284
(offset-commit-fn (tp-om-map->map offsets) exception)))
@@ -306,8 +306,8 @@ clj-kafka-x.consumers.simple
306306
(commit-sync consumer tp-om)
307307
;; => nil
308308
"
309-
([^KafkaConsumer consumer] (.commitSync consumer))
310-
([^KafkaConsumer consumer topic-partitions-offsets-metadata]
309+
([^Consumer consumer] (.commitSync consumer))
310+
([^Consumer consumer topic-partitions-offsets-metadata]
311311
(let [tp-om-map (map->tp-om-map topic-partitions-offsets-metadata)]
312312
(.commitSync consumer tp-om-map))))
313313

@@ -323,7 +323,7 @@ clj-kafka-x.consumers.simple
323323
(last-committed-offset consumer {:topic \"topic-a\" :partition 2})
324324
;; => {:offset 10, :metadata \"Metadata set during commit\"}
325325
"
326-
[^KafkaConsumer consumer tp]
326+
[^Consumer consumer tp]
327327
(->> tp
328328
map->topic-partition
329329
(.committed consumer)
@@ -362,7 +362,7 @@ clj-kafka-x.consumers.simple
362362
;; :replicas [{:id 2, :host \"172.17.0.3\", :port 9093}],
363363
;; :in-sync-replicas [{:id 2, :host \"172.17.0.3\", :port 9093}]}]}
364364
"
365-
[^KafkaConsumer consumer]
365+
[^Consumer consumer]
366366
(str-pi-map->map (.listTopics consumer)))
367367

368368
(defn list-all-partitions
@@ -390,7 +390,7 @@ clj-kafka-x.consumers.simple
390390
;; :replicas [{:id 2, :host \"172.17.0.3\", :port 9093}],
391391
;; :in-sync-replicas [{:id 2, :host \"172.17.0.3\", :port 9093}]}]
392392
"
393-
[^KafkaConsumer consumer topic]
393+
[^Consumer consumer topic]
394394
(mapv to-clojure (.partitionsFor consumer topic)))
395395

396396

@@ -404,7 +404,7 @@ clj-kafka-x.consumers.simple
404404
(pause consumer {:topic \"topic-a\" :partition 2}
405405
{:topic \"topic-b\" :partition 0})
406406
"
407-
[^KafkaConsumer consumer tp-seq]
407+
[^Consumer consumer tp-seq]
408408
(->> (map map->topic-partition tp-seq)
409409
(into-array TopicPartition)
410410
(.pause consumer)))
@@ -420,7 +420,7 @@ clj-kafka-x.consumers.simple
420420
(resume consumer {:topic \"topic-a\" :partition 2}
421421
{:topic \"topic-b\" :partition 0})
422422
"
423-
[^KafkaConsumer consumer tp-seq]
423+
[^Consumer consumer tp-seq]
424424
(->> (map map->topic-partition tp-seq)
425425
(into-array TopicPartition)
426426
(.resume consumer)))
@@ -445,5 +445,5 @@ clj-kafka-x.consumers.simple
445445
;; :tags {\"client-id\" \"consumer-3\"},
446446
;; :value 0.0}]
447447
"
448-
[^KafkaConsumer consumer]
448+
[^Consumer consumer]
449449
(metrics->map (.metrics consumer)))

src/clj_kafka_x/producer.clj

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
(:refer-clojure :exclude [send flush])
66
(:require [clj-kafka-x.data :refer :all])
77
(:import [java.util.concurrent Future TimeUnit TimeoutException]
8-
[org.apache.kafka.clients.producer Callback KafkaProducer ProducerRecord RecordMetadata]
8+
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord RecordMetadata]
99
[org.apache.kafka.common Metric MetricName]
1010
(org.apache.kafka.common.serialization Serializer ByteArraySerializer StringSerializer)
1111
(java.util Map)))
@@ -95,18 +95,18 @@
9595
;; => #object[string representation of future object]
9696
;; Metadata-> {:topic topic-unknown, :partition 4, :offset 1} Exception-> nil
9797
"
98-
([^KafkaProducer producer record]
98+
([^Producer producer record]
9999
(let [fut (.send producer record)]
100100
(map-future-val fut to-clojure)))
101-
([^KafkaProducer producer record callback]
101+
([^Producer producer record callback]
102102
(let [fut (.send producer record (reify Callback
103103
(onCompletion [_ metadata exception]
104104
(callback (and metadata (to-clojure metadata)) exception))))]
105105
(map-future-val fut to-clojure))))
106106

107107
(defn flush
108108
"See: http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#flush()"
109-
[^KafkaProducer producer]
109+
[^Producer producer]
110110
(.flush producer))
111111

112112
(defn close
@@ -116,9 +116,9 @@
116116
117117
- http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
118118
- http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close(long,%20java.util.concurrent.TimeUnit)"
119-
([^KafkaProducer producer]
119+
([^Producer producer]
120120
(.close producer))
121-
([^KafkaProducer producer timeout-ms]
121+
([^Producer producer timeout-ms]
122122
(.close producer timeout-ms TimeUnit/MILLISECONDS)))
123123

124124
(defn partitions
@@ -143,7 +143,7 @@
143143
;; :in-sync-replicas [{:id 1, :host \"172.17.0.4\", :port 9092}
144144
;; {:id 2, :host \"172.17.0.3\", :port 9093}]}]
145145
"
146-
[^KafkaProducer producer topic]
146+
[^Producer producer topic]
147147
(mapv to-clojure (.partitionsFor producer topic)))
148148

149149
(defn metrics
@@ -170,6 +170,6 @@
170170
;; :tags {\"client-id\" \"producer-2\", \"node-id\" \"node-3\"},
171171
;; :value 0.23866348448687352}]
172172
"
173-
[^KafkaProducer producer]
173+
[^Producer producer]
174174
(metrics->map (.metrics producer))
175175
)

0 commit comments

Comments
 (0)