+
+
+{% highlight scala %}
+val df = spark
+ .readStream
+ .format("kafka-share")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.share.group.id", "my-share-group")
+ .option("subscribe", "topic1")
+ .load()
+
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+{% endhighlight %}
+
+
+
+{% highlight python %}
+df = spark \
+ .readStream \
+ .format("kafka-share") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("kafka.share.group.id", "my-share-group") \
+ .option("subscribe", "topic1") \
+ .load()
+
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+{% endhighlight %}
+
+
+
+{% highlight java %}
+Dataset<Row> df = spark
+ .readStream()
+ .format("kafka-share")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("kafka.share.group.id", "my-share-group")
+ .option("subscribe", "topic1")
+ .load();
+
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+{% endhighlight %}
+
+
+
+
+## Schema
+
+Each row read from the source has the following schema:
+
+| Column | Type |
+|--------|------|
+| key | binary |
+| value | binary |
+| topic | string |
+| partition | int |
+| offset | long |
+| timestamp | timestamp |
+| timestampType | int |
+| headers (optional) | array |
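+
+As a quick, illustrative sketch of working with these columns, the snippet below decodes the payload and keeps the per-record metadata; it assumes `df` is the streaming DataFrame created in the examples above.
+
+{% highlight scala %}
+// Decode the payload and keep the metadata columns defined by the schema above.
+val records = df.selectExpr(
+  "CAST(key AS STRING) AS key",
+  "CAST(value AS STRING) AS value",
+  "topic",
+  "partition",
+  "offset",
+  "timestamp")
+{% endhighlight %}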
+
+## Configuration Options
+
+| Option | Required | Default | Description |
+|--------|----------|---------|-------------|
+| kafka.bootstrap.servers | yes | none | Kafka broker addresses |
+| kafka.share.group.id | yes | none | Share group identifier |
+| subscribe | one of subscribe or subscribePattern | none | Comma-separated list of topics to subscribe to |
+| subscribePattern | one of subscribe or subscribePattern | none | Topic pattern matching the topics to subscribe to (alternative to subscribe) |
+| kafka.share.acknowledgment.mode | no | implicit | `implicit` or `explicit` |
+| kafka.share.exactly.once.strategy | no | none | `none`, `idempotent`, `two-phase-commit`, or `checkpoint-dedup` |
+| kafka.share.parallelism | no | spark.default.parallelism | Number of concurrent consumers |
+| kafka.share.lock.timeout.ms | no | 30000 | Record acquisition lock timeout in milliseconds |
+| includeHeaders | no | false | Whether to include Kafka record headers in the headers column |
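+
+As a sketch of how several of these options fit together, the reader below subscribes by pattern with explicit acknowledgment enabled; the option names come from the table above, while the concrete values are purely illustrative.
+
+{% highlight scala %}
+// Illustrative configuration only: option names are taken from the table above,
+// the concrete values (pattern, timeout, etc.) are placeholders.
+val stream = spark
+  .readStream
+  .format("kafka-share")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.share.group.id", "my-share-group")
+  .option("subscribePattern", "topic.*")
+  .option("kafka.share.acknowledgment.mode", "explicit")
+  .option("kafka.share.lock.timeout.ms", "60000")
+  .option("includeHeaders", "true")
+  .load()
+{% endhighlight %}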
+
+## Acknowledgment Modes
+
+### Implicit Mode (Default)
+
+Records are automatically acknowledged as ACCEPT when the batch completes successfully. On failure, acquisition locks expire and Kafka redelivers records.
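+
+As a minimal sketch, an implicit-mode query starts like any other streaming query and contains no acknowledgment calls in user code; the console sink and checkpoint path below are illustrative, not part of the source.
+
+{% highlight scala %}
+// Implicit mode: nothing acknowledgment-specific in user code.
+// Records read in a micro-batch are acknowledged when the batch completes;
+// on failure the acquisition locks expire and Kafka redelivers the records.
+val query = df
+  .selectExpr("CAST(value AS STRING) AS value")
+  .writeStream
+  .format("console")
+  .option("checkpointLocation", "/tmp/kafka-share-checkpoint") // illustrative path
+  .start()
+{% endhighlight %}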
+
+### Explicit Mode
+
+Use `foreachBatch` to manually acknowledge records:
+
+