From 4281b6ce709ceb88a6b0ef5a67c6d87fa68f0a7c Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 11:56:35 -0800 Subject: [PATCH 01/18] feat: Add KafkaBinding skeleton with constructor and formatter registration Scaffold the KafkaBinding class with initialize, self.default singleton, DEFAULT_KEY_MAPPER/DEFAULT_REVERSE_KEY_MAPPER constants, and formatter registration methods mirroring HttpBinding. Add require to cloud_events.rb and constructor unit tests. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events.rb | 1 + lib/cloud_events/kafka_binding.rb | 152 ++++++++++++++++++++++++++++++ test/test_kafka_binding.rb | 53 +++++++++++ 3 files changed, 206 insertions(+) create mode 100644 lib/cloud_events/kafka_binding.rb create mode 100644 test/test_kafka_binding.rb diff --git a/lib/cloud_events.rb b/lib/cloud_events.rb index 929a43b..31b3862 100644 --- a/lib/cloud_events.rb +++ b/lib/cloud_events.rb @@ -5,6 +5,7 @@ require "cloud_events/event" require "cloud_events/format" require "cloud_events/http_binding" +require "cloud_events/kafka_binding" require "cloud_events/json_format" require "cloud_events/text_format" diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb new file mode 100644 index 0000000..9a7542c --- /dev/null +++ b/lib/cloud_events/kafka_binding.rb @@ -0,0 +1,152 @@ +# frozen_string_literal: true + +module CloudEvents + ## + # Kafka protocol binding for CloudEvents. + # + # This class implements the Kafka protocol binding, including decoding of + # events from Kafka message hashes, and encoding of events to Kafka message + # hashes. It supports binary (header-based) and structured (body-based) + # content modes that can delegate to formatters such as JSON. + # + # Supports CloudEvents 1.0 only. + # See https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/kafka-protocol-binding.md. 
+ # + # Kafka messages are represented as plain Ruby Hashes with the keys: + # - `key:` [String, nil] — the Kafka record key + # - `value:` [String, nil] — the Kafka record value (body) + # - `headers:` [Hash] — String => String header pairs + # + class KafkaBinding + ## + # The name of the JSON decoder/encoder. + # @return [String] + # + JSON_FORMAT = "json" + + ## + # The default key mapper for encoding. + # Returns the `partitionkey` extension attribute from the event. + # @return [Proc] + # + DEFAULT_KEY_MAPPER = ->(event) { event["partitionkey"] } + + ## + # The default reverse key mapper for decoding. + # Sets the `partitionkey` extension attribute from the Kafka record key. + # Returns an empty hash if the key is nil. + # @return [Proc] + # + DEFAULT_REVERSE_KEY_MAPPER = ->(key) { key.nil? ? {} : { "partitionkey" => key } } + + ## + # Returns a default Kafka binding, including support for JSON format. + # + # @return [KafkaBinding] + # + def self.default + @default ||= begin + kafka_binding = new + kafka_binding.register_formatter(JsonFormat.new, encoder_name: JSON_FORMAT) + kafka_binding.default_encoder_name = JSON_FORMAT + kafka_binding + end + end + + ## + # Create an empty Kafka binding. + # + # @param key_mapper [Proc,nil] A callable `(event) -> String|nil` used + # to derive the Kafka record key when encoding. Defaults to + # {DEFAULT_KEY_MAPPER}. Set to `nil` to always produce a `nil` key. + # @param reverse_key_mapper [Proc,nil] A callable `(key) -> Hash` used + # to derive attributes to merge into the event when decoding. + # Defaults to {DEFAULT_REVERSE_KEY_MAPPER}. Set to `nil` to skip + # key-to-attribute mapping. + # + def initialize(key_mapper: DEFAULT_KEY_MAPPER, reverse_key_mapper: DEFAULT_REVERSE_KEY_MAPPER) + @key_mapper = key_mapper + @reverse_key_mapper = reverse_key_mapper + @event_decoders = Format::Multi.new do |result| + result&.key?(:event) ? 
result : nil + end + @event_encoders = {} + @data_decoders = Format::Multi.new do |result| + result&.key?(:data) && result.key?(:content_type) ? result : nil + end + @data_encoders = Format::Multi.new do |result| + result&.key?(:content) && result.key?(:content_type) ? result : nil + end + text_format = TextFormat.new + @data_decoders.formats.replace([text_format, HttpBinding::DefaultDataFormat]) + @data_encoders.formats.replace([text_format, HttpBinding::DefaultDataFormat]) + @default_encoder_name = nil + end + + ## + # Register a formatter for all operations it supports, based on which + # methods are implemented by the formatter object. See + # {CloudEvents::Format} for a list of possible methods. + # + # @param formatter [Object] The formatter. + # @param encoder_name [String] The encoder name under which this + # formatter will register its encode operations. Optional. If not + # specified, any event encoder will _not_ be registered. + # @return [self] + # + def register_formatter(formatter, encoder_name: nil) + encoder_name = encoder_name.to_s.strip.downcase if encoder_name + decode_event = formatter.respond_to?(:decode_event) + encode_event = encoder_name if formatter.respond_to?(:encode_event) + decode_data = formatter.respond_to?(:decode_data) + encode_data = formatter.respond_to?(:encode_data) + register_formatter_methods(formatter, + decode_event: decode_event, + encode_event: encode_event, + decode_data: decode_data, + encode_data: encode_data) + self + end + + ## + # Registers the given formatter for the given operations. Some arguments + # are activated by passing `true`, whereas those that rely on a format + # name are activated by passing in a name string. + # + # @param formatter [Object] The formatter. + # @param decode_event [boolean] If true, register the formatter's + # {CloudEvents::Format#decode_event} method. 
+ # @param encode_event [String] If set to a string, use the formatter's + # {CloudEvents::Format#encode_event} method when that name is + # requested. + # @param decode_data [boolean] If true, register the formatter's + # {CloudEvents::Format#decode_data} method. + # @param encode_data [boolean] If true, register the formatter's + # {CloudEvents::Format#encode_data} method. + # @return [self] + # + def register_formatter_methods(formatter, + decode_event: false, + encode_event: nil, + decode_data: false, + encode_data: false) + @event_decoders.formats.unshift(formatter) if decode_event + if encode_event + encoders = @event_encoders[encode_event] ||= Format::Multi.new do |result| + result&.key?(:content) && result.key?(:content_type) ? result : nil + end + encoders.formats.unshift(formatter) + end + @data_decoders.formats.unshift(formatter) if decode_data + @data_encoders.formats.unshift(formatter) if encode_data + self + end + + ## + # The name of the encoder to use if none is specified. 
+ # + # @return [String,nil] + # + attr_accessor :default_encoder_name + end +end diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb new file mode 100644 index 0000000..0066e53 --- /dev/null +++ b/test/test_kafka_binding.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +require_relative "helper" + +require "date" +require "json" +require "uri" + +describe CloudEvents::KafkaBinding do + let(:kafka_binding) { CloudEvents::KafkaBinding.default } + let(:minimal_kafka_binding) { CloudEvents::KafkaBinding.new } + let(:my_id) { "my_id" } + let(:my_source_string) { "/my_source" } + let(:my_source) { URI.parse(my_source_string) } + let(:my_type) { "my_type" } + let(:spec_version) { "1.0" } + let(:my_simple_data) { "12345" } + let(:my_json_object) { { "a" => "รค", "b" => "๐Ÿ˜€" } } + let(:my_json_escaped_data) { '{"a":"รค","b":"๐Ÿ˜€"}' } + let(:my_content_type_string) { "text/plain; charset=us-ascii" } + let(:my_content_type) { CloudEvents::ContentType.new(my_content_type_string) } + let(:my_json_content_type_string) { "application/json" } + let(:my_json_content_type) { CloudEvents::ContentType.new(my_json_content_type_string) } + let(:my_schema_string) { "/my_schema" } + let(:my_schema) { URI.parse(my_schema_string) } + let(:my_subject) { "my_subject" } + let(:my_time_string) { "2020-01-12T20:52:05-08:00" } + let(:my_time) { DateTime.rfc3339(my_time_string) } + let(:my_trace_context) { "1234567890;9876543210" } + + describe "constructor" do + it "creates a new instance" do + binding_obj = CloudEvents::KafkaBinding.new + assert_instance_of CloudEvents::KafkaBinding, binding_obj + end + + it "returns a default instance with JSON format registered" do + assert_instance_of CloudEvents::KafkaBinding, kafka_binding + end + + it "returns the same default singleton" do + assert_same CloudEvents::KafkaBinding.default, CloudEvents::KafkaBinding.default + end + + it "defines DEFAULT_KEY_MAPPER constant" do + assert_respond_to 
CloudEvents::KafkaBinding::DEFAULT_KEY_MAPPER, :call + end + + it "defines DEFAULT_REVERSE_KEY_MAPPER constant" do + assert_respond_to CloudEvents::KafkaBinding::DEFAULT_REVERSE_KEY_MAPPER, :call + end + end +end From 59f2c08bd50cc265248c49c06a4169f390d00f74 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 12:10:31 -0800 Subject: [PATCH 02/18] feat: Add KafkaBinding#probable_event? method Quick header-only check to determine if a Kafka message is likely a CloudEvent (checks for ce_specversion header or application/cloudevents content-type). Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 15 ++++++++++++ test/test_kafka_binding.rb | 38 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 9a7542c..b63a3fa 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -148,5 +148,20 @@ def register_formatter_methods(formatter, # @return [String,nil] # attr_accessor :default_encoder_name + + ## + # Determine whether a Kafka message is likely a CloudEvent, by + # inspecting headers only (does not parse the value). + # + # @param message [Hash] The Kafka message hash. + # @return [boolean] + # + def probable_event?(message) + headers = message[:headers] || {} + return true if headers.key?("ce_specversion") + content_type = headers["content-type"] + return false unless content_type + content_type.start_with?("application/cloudevents") + end end end diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index 0066e53..097024b 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -50,4 +50,42 @@ assert_respond_to CloudEvents::KafkaBinding::DEFAULT_REVERSE_KEY_MAPPER, :call end end + + describe "probable_event?" 
do + it "detects a probable binary event" do + message = { + key: nil, + value: "hello", + headers: { "ce_specversion" => "1.0" }, + } + assert kafka_binding.probable_event?(message) + end + + it "detects a probable structured event" do + message = { + key: nil, + value: "{}", + headers: { "content-type" => "application/cloudevents+json" }, + } + assert kafka_binding.probable_event?(message) + end + + it "returns false for a non-CE message" do + message = { + key: nil, + value: "hello", + headers: { "content-type" => "application/json" }, + } + refute kafka_binding.probable_event?(message) + end + + it "returns false for a message with no relevant headers" do + message = { + key: nil, + value: "hello", + headers: {}, + } + refute kafka_binding.probable_event?(message) + end + end end From 445cc569d95a87f3fac2c30ba30e69458053ebd1 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 12:17:00 -0800 Subject: [PATCH 03/18] feat: Add KafkaBinding binary mode decode_event Decode CloudEvents from Kafka messages in binary content mode. Supports text/JSON data decoding, extension attributes, tombstones (nil value), configurable reverse_key_mapper for partition key mapping, and proper error handling for bad specversion and non-CE messages. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 80 ++++++++++++++ test/test_kafka_binding.rb | 178 ++++++++++++++++++++++++++++++ 2 files changed, 258 insertions(+) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index b63a3fa..a7ce7e0 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -163,5 +163,85 @@ def probable_event?(message) return false unless content_type content_type.start_with?("application/cloudevents") end + + ## + # Decode an event from a Kafka message hash. + # + # @param message [Hash] A hash with `:key`, `:value`, and `:headers` keys. 
+ # @param allow_opaque [boolean] If true, returns {Event::Opaque} for + # unrecognized structured formats. Default is false. + # @param reverse_key_mapper [Proc,nil,:NOT_SET] A callable + # `(key) -> Hash`, or `nil` to skip key mapping. Defaults to the + # instance's reverse_key_mapper. + # @param format_args [keywords] Extra args forwarded to formatters. + # @return [CloudEvents::Event] The decoded event. + # @raise [NotCloudEventError] if the message is not a CloudEvent. + # @raise [SpecVersionError] if the specversion is not supported. + # @raise [UnsupportedFormatError] if a structured format is not recognized. + # @raise [FormatSyntaxError] if the structured content is malformed. + # + def decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **format_args) + reverse_key_mapper = @reverse_key_mapper if reverse_key_mapper == :NOT_SET + headers = message[:headers] || {} + content_type_string = headers["content-type"] + content_type = ContentType.new(content_type_string) if content_type_string + + event = if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents" + decode_structured_content(message, content_type, allow_opaque, **format_args) + elsif headers.key?("ce_specversion") + decode_binary_content(message, content_type, **format_args) + end + unless event + ct_desc = content_type_string ? 
content_type_string.inspect : "not present" + raise NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present" + end + apply_reverse_key_mapper(event, message[:key], reverse_key_mapper) + end + + private + + # @private + OMIT_ATTR_NAMES = ["specversion", "spec_version", "data", "datacontenttype", "data_content_type"].freeze + + def decode_binary_content(message, content_type, **format_args) + headers = message[:headers] || {} + spec_version = headers["ce_specversion"] + unless spec_version =~ /^1(\.|$)/ + raise SpecVersionError, "Unrecognized specversion: #{spec_version}" + end + attributes = { "spec_version" => spec_version } + headers.each do |key, value| + next unless key.start_with?("ce_") + attr_name = key[3..].downcase + attributes[attr_name] = value unless OMIT_ATTR_NAMES.include?(attr_name) + end + value = message[:value] + if value.nil? + attributes["data_content_type"] = content_type if content_type + else + attributes["data_encoded"] = value + result = @data_decoders.decode_data(spec_version: spec_version, + content: value, + content_type: content_type, + **format_args) + if result + attributes["data"] = result[:data] + content_type = result[:content_type] + end + attributes["data_content_type"] = content_type if content_type + end + Event.create(spec_version: spec_version, set_attributes: attributes) + end + + def decode_structured_content(_message, _content_type, _allow_opaque, **_format_args) + nil + end + + def apply_reverse_key_mapper(event, key, reverse_key_mapper) + return event unless reverse_key_mapper + mapped_attrs = reverse_key_mapper.call(key) + return event if mapped_attrs.nil? || mapped_attrs.empty? 
+ event.with(**mapped_attrs.transform_keys(&:to_sym)) + end end end diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index 097024b..0e7daf6 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -88,4 +88,182 @@ refute kafka_binding.probable_event?(message) end end + + describe "decode_event binary mode" do + it "decodes a binary message with text content type" do + message = { + key: nil, + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + "ce_dataschema" => my_schema_string, + "ce_subject" => my_subject, + "ce_time" => my_time_string, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, event.id + assert_equal my_source, event.source + assert_equal my_type, event.type + assert_equal spec_version, event.spec_version + assert_equal my_simple_data, event.data + assert_equal my_content_type_string, event.data_content_type.to_s + assert_equal my_schema, event.data_schema + assert_equal my_subject, event.subject + assert_equal my_time, event.time + end + + it "decodes a binary message with JSON content type" do + message = { + key: nil, + value: my_json_escaped_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_json_content_type_string, + "ce_dataschema" => my_schema_string, + "ce_subject" => my_subject, + "ce_time" => my_time_string, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_json_object, event.data + assert_equal my_json_escaped_data, event.data_encoded + end + + it "decodes a minimal binary message" do + message = { + key: nil, + value: nil, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + }, + } 
+ event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, event.id + assert_equal my_source, event.source + assert_equal my_type, event.type + assert_nil event.data + assert_nil event.data_content_type + end + + it "decodes a binary message with extension attributes" do + message = { + key: nil, + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + "ce_tracecontext" => my_trace_context, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_trace_context, event["tracecontext"] + end + + it "decodes a tombstone message (nil value)" do + message = { + key: nil, + value: nil, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + refute event.data? + assert_nil event.data + end + + it "maps key to partitionkey with default reverse_key_mapper" do + message = { + key: "my-partition-key", + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + }, + } + event = kafka_binding.decode_event(message) + assert_equal "my-partition-key", event["partitionkey"] + end + + it "uses custom reverse_key_mapper per-call" do + message = { + key: "custom-key", + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + }, + } + mapper = ->(key) { key.nil? ? 
{} : { "mykey" => key } } + event = kafka_binding.decode_event(message, reverse_key_mapper: mapper) + assert_equal "custom-key", event["mykey"] + assert_nil event["partitionkey"] + end + + it "skips key mapping when reverse_key_mapper is nil" do + message = { + key: "my-partition-key", + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_nil event["partitionkey"] + end + + it "raises SpecVersionError for bad specversion" do + message = { + key: nil, + value: "hello", + headers: { + "ce_specversion" => "0.1", + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + }, + } + assert_raises CloudEvents::SpecVersionError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end + + it "raises NotCloudEventError for non-CE message" do + message = { + key: nil, + value: "hello", + headers: { "content-type" => "application/json" }, + } + assert_raises CloudEvents::NotCloudEventError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end + end end From b7af6a37810cd022c1a809afbbc9fa98e9b43ddd Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 12:22:50 -0800 Subject: [PATCH 04/18] feat: Add KafkaBinding structured mode decode_event Decode CloudEvents from Kafka messages in structured content mode. Supports JSON format, charset variants, opaque fallback for unrecognized formats, and reverse_key_mapper application to structured events. 
Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 11 +++- test/test_kafka_binding.rb | 86 +++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 2 deletions(-) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index a7ce7e0..7cebd8b 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -233,8 +233,15 @@ def decode_binary_content(message, content_type, **format_args) Event.create(spec_version: spec_version, set_attributes: attributes) end - def decode_structured_content(_message, _content_type, _allow_opaque, **_format_args) - nil + def decode_structured_content(message, content_type, allow_opaque, **format_args) + content = message[:value].to_s + result = @event_decoders.decode_event(content: content, + content_type: content_type, + data_decoder: @data_decoders, + **format_args) + return result[:event] if result + return Event::Opaque.new(content, content_type) if allow_opaque + raise UnsupportedFormatError, "Unknown cloudevents content type: #{content_type}" end def apply_reverse_key_mapper(event, key, reverse_key_mapper) diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index 0e7daf6..fc42282 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -266,4 +266,90 @@ end end end + + describe "decode_event structured mode" do + let(:my_json_struct) do + { + "data" => my_simple_data, + "datacontenttype" => my_content_type_string, + "dataschema" => my_schema_string, + "id" => my_id, + "source" => my_source_string, + "specversion" => spec_version, + "subject" => my_subject, + "time" => my_time_string, + "type" => my_type, + } + end + let(:my_json_struct_encoded) { JSON.dump(my_json_struct) } + + it "decodes a JSON-structured message" do + message = { + key: nil, + value: my_json_struct_encoded, + headers: { "content-type" => "application/cloudevents+json" }, + } + event = 
kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, event.id + assert_equal my_source, event.source + assert_equal my_type, event.type + assert_equal spec_version, event.spec_version + assert_equal my_simple_data, event.data + end + + it "decodes a JSON-structured message with charset" do + message = { + key: nil, + value: my_json_struct_encoded, + headers: { "content-type" => "application/cloudevents+json; charset=utf-8" }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, event.id + assert_equal my_type, event.type + end + + it "returns opaque for unrecognized structured format when allow_opaque is true" do + message = { + key: nil, + value: "some content", + headers: { "content-type" => "application/cloudevents+foo" }, + } + event = minimal_kafka_binding.decode_event(message, allow_opaque: true, reverse_key_mapper: nil) + assert_kind_of CloudEvents::Event::Opaque, event + assert_equal "some content", event.content + end + + it "raises UnsupportedFormatError for unknown structured format" do + message = { + key: nil, + value: "some content", + headers: { "content-type" => "application/cloudevents+foo" }, + } + assert_raises CloudEvents::UnsupportedFormatError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end + + it "raises FormatSyntaxError for malformed JSON" do + message = { + key: nil, + value: "!!!", + headers: { "content-type" => "application/cloudevents+json" }, + } + error = assert_raises(CloudEvents::FormatSyntaxError) do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + assert_kind_of JSON::ParserError, error.cause + end + + it "applies reverse_key_mapper to structured decoded events" do + message = { + key: "my-partition-key", + value: my_json_struct_encoded, + headers: { "content-type" => "application/cloudevents+json" }, + } + event = kafka_binding.decode_event(message) + assert_equal "my-partition-key", event["partitionkey"] 
+ end + end end From 3c6ec3ae3781018457698c20bb387be6e5e34259 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 12:36:25 -0800 Subject: [PATCH 05/18] feat: Add KafkaBinding binary mode encode_event Encode CloudEvents to Kafka messages in binary content mode. Supports text/JSON data encoding, extension attributes, tombstones (nil value for no data), and configurable key_mapper for partition key derivation. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 61 ++++++++++++++++++ test/test_kafka_binding.rb | 103 ++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 7cebd8b..db33095 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -198,6 +198,31 @@ def decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **f apply_reverse_key_mapper(event, message[:key], reverse_key_mapper) end + ## + # Encode an event into a Kafka message hash. + # + # @param event [CloudEvents::Event,CloudEvents::Event::Opaque] The event. + # @param structured_format [boolean,String] If false (default), encodes + # in binary content mode. If true or a format name string, encodes + # in structured content mode. + # @param key_mapper [Proc,nil,:NOT_SET] A callable + # `(event) -> String|nil`, or `nil` to always produce a `nil` key. + # Defaults to the instance's key_mapper. + # @param format_args [keywords] Extra args forwarded to formatters. + # @return [Hash] A hash with `:key`, `:value`, and `:headers` keys. 
+ # + def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format_args) + key_mapper = @key_mapper if key_mapper == :NOT_SET + if event.is_a?(Event::Opaque) + return encode_opaque_event(event) + end + if structured_format + encode_structured_event(event, structured_format, key_mapper, **format_args) + else + encode_binary_event(event, key_mapper, **format_args) + end + end + private # @private @@ -250,5 +275,41 @@ def apply_reverse_key_mapper(event, key, reverse_key_mapper) return event if mapped_attrs.nil? || mapped_attrs.empty? event.with(**mapped_attrs.transform_keys(&:to_sym)) end + + def encode_binary_event(event, key_mapper, **format_args) + key = key_mapper&.call(event) + headers = {} + event.to_h.each do |attr_key, value| + next if ["data", "data_encoded", "datacontenttype"].include?(attr_key) + headers["ce_#{attr_key}"] = value.to_s + end + body, content_type = extract_event_data(event, format_args) + headers["content-type"] = content_type.to_s if content_type + { key: key, value: body, headers: headers } + end + + def encode_structured_event(_event, _structured_format, _key_mapper, **_format_args) + raise ::ArgumentError, "Structured encoding not yet implemented" + end + + def encode_opaque_event(event) + { key: nil, value: event.content, headers: { "content-type" => event.content_type.to_s } } + end + + def extract_event_data(event, format_args) + body = event.data_encoded + if body + [body, event.data_content_type] + elsif event.data? 
+ result = @data_encoders.encode_data(spec_version: event.spec_version, + data: event.data, + content_type: event.data_content_type, + **format_args) + raise UnsupportedFormatError, "Could not encode data content-type" unless result + [result[:content], result[:content_type]] + else + [nil, nil] + end + end end end diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index fc42282..a1a8d7b 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -352,4 +352,107 @@ assert_equal "my-partition-key", event["partitionkey"] end end + + describe "encode_event binary mode" do + let(:my_simple_event) do + CloudEvents::Event::V1.new(data_encoded: my_simple_data, + data: my_simple_data, + datacontenttype: my_content_type_string, + dataschema: my_schema_string, + id: my_id, + source: my_source_string, + specversion: spec_version, + subject: my_subject, + time: my_time_string, + type: my_type) + end + let(:my_json_event) do + CloudEvents::Event::V1.new(data_encoded: my_json_escaped_data, + data: my_json_object, + datacontenttype: my_json_content_type_string, + dataschema: my_schema_string, + id: my_id, + source: my_source_string, + specversion: spec_version, + subject: my_subject, + time: my_time_string, + type: my_type) + end + let(:my_minimal_event) do + CloudEvents::Event::V1.new(id: my_id, + source: my_source_string, + specversion: spec_version, + type: my_type) + end + let(:my_extensions_event) do + CloudEvents::Event::V1.new(data_encoded: my_simple_data, + data: my_simple_data, + datacontenttype: my_content_type_string, + id: my_id, + source: my_source_string, + specversion: spec_version, + type: my_type, + tracecontext: my_trace_context) + end + + it "encodes an event with text content type to binary mode" do + message = kafka_binding.encode_event(my_simple_event, key_mapper: nil) + assert_equal my_simple_data, message[:value] + assert_equal my_content_type_string, message[:headers]["content-type"] + assert_equal spec_version, 
message[:headers]["ce_specversion"] + assert_equal my_id, message[:headers]["ce_id"] + assert_equal my_source_string, message[:headers]["ce_source"] + assert_equal my_type, message[:headers]["ce_type"] + assert_equal my_schema_string, message[:headers]["ce_dataschema"] + assert_equal my_subject, message[:headers]["ce_subject"] + assert_equal my_time_string, message[:headers]["ce_time"] + assert_nil message[:key] + end + + it "encodes an event with JSON content type to binary mode" do + message = kafka_binding.encode_event(my_json_event, key_mapper: nil) + assert_equal my_json_escaped_data, message[:value] + assert_equal my_json_content_type_string, message[:headers]["content-type"] + end + + it "encodes a minimal event" do + message = kafka_binding.encode_event(my_minimal_event, key_mapper: nil) + assert_nil message[:value] + assert_nil message[:headers]["content-type"] + assert_equal spec_version, message[:headers]["ce_specversion"] + assert_equal my_id, message[:headers]["ce_id"] + end + + it "encodes an event with extension attributes" do + message = kafka_binding.encode_event(my_extensions_event, key_mapper: nil) + assert_equal my_trace_context, message[:headers]["ce_tracecontext"] + end + + it "encodes an event with no data as tombstone (nil value)" do + message = kafka_binding.encode_event(my_minimal_event, key_mapper: nil) + assert_nil message[:value] + end + + it "uses default key_mapper to set key from partitionkey" do + event = my_simple_event.with(partitionkey: "my-partition-key") + message = kafka_binding.encode_event(event) + assert_equal "my-partition-key", message[:key] + end + + it "produces nil key when event has no partitionkey" do + message = kafka_binding.encode_event(my_simple_event) + assert_nil message[:key] + end + + it "uses custom key_mapper per-call" do + message = kafka_binding.encode_event(my_simple_event, key_mapper: ->(e) { e.id }) + assert_equal my_id, message[:key] + end + + it "produces nil key when key_mapper is nil" do + event = 
my_simple_event.with(partitionkey: "my-partition-key") + message = kafka_binding.encode_event(event, key_mapper: nil) + assert_nil message[:key] + end + end end From b79248aa85b428c560cb41ad352a55b5dbc20e9f Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 12:54:28 -0800 Subject: [PATCH 06/18] feat: Add KafkaBinding structured mode encode_event Encode CloudEvents to Kafka messages in structured content mode using named format encoders. Supports opaque event passthrough and key_mapper in structured mode. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 11 +++++-- test/test_kafka_binding.rb | 48 +++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index db33095..5bc877c 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -288,8 +288,15 @@ def encode_binary_event(event, key_mapper, **format_args) { key: key, value: body, headers: headers } end - def encode_structured_event(_event, _structured_format, _key_mapper, **_format_args) - raise ::ArgumentError, "Structured encoding not yet implemented" + def encode_structured_event(event, structured_format, key_mapper, **format_args) + key = key_mapper&.call(event) + structured_format = default_encoder_name if structured_format == true + raise ::ArgumentError, "Format name not specified, and no default is set" unless structured_format + result = @event_encoders[structured_format]&.encode_event(event: event, + data_encoder: @data_encoders, + **format_args) + raise ::ArgumentError, "Unknown format name: #{structured_format.inspect}" unless result + { key: key, value: result[:content], headers: { "content-type" => result[:content_type].to_s } } end def encode_opaque_event(event) diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index a1a8d7b..cd5b199 100644 --- a/test/test_kafka_binding.rb +++ 
b/test/test_kafka_binding.rb @@ -455,4 +455,52 @@ assert_nil message[:key] end end + + describe "encode_event structured mode" do + let(:my_simple_event) do + CloudEvents::Event::V1.new(data_encoded: my_simple_data, + data: my_simple_data, + datacontenttype: my_content_type_string, + dataschema: my_schema_string, + id: my_id, + source: my_source_string, + specversion: spec_version, + subject: my_subject, + time: my_time_string, + type: my_type) + end + + it "encodes an event to JSON structured format" do + message = kafka_binding.encode_event(my_simple_event, structured_format: true, + key_mapper: nil, sort: true) + assert_equal "application/cloudevents+json; charset=utf-8", message[:headers]["content-type"] + parsed = JSON.parse(message[:value]) + assert_equal my_id, parsed["id"] + assert_equal my_type, parsed["type"] + assert_equal my_source_string, parsed["source"] + assert_nil message[:key] + end + + it "encodes an opaque event" do + opaque = CloudEvents::Event::Opaque.new("some content", + CloudEvents::ContentType.new("application/cloudevents+json")) + message = kafka_binding.encode_event(opaque) + assert_equal "some content", message[:value] + assert_equal "application/cloudevents+json", message[:headers]["content-type"] + assert_nil message[:key] + end + + it "applies key_mapper in structured mode" do + event = my_simple_event.with(partitionkey: "my-partition-key") + message = kafka_binding.encode_event(event, structured_format: true) + assert_equal "my-partition-key", message[:key] + end + + it "raises ArgumentError when format name not specified and no default" do + binding_obj = CloudEvents::KafkaBinding.new + assert_raises ::ArgumentError do + binding_obj.encode_event(my_simple_event, structured_format: true, key_mapper: nil) + end + end + end end From 676c6397e90104e3387e5b80f27ffef4b62d13e0 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 12:55:54 -0800 Subject: [PATCH 07/18] test: Add KafkaBinding round-trip tests Verify 
encode-then-decode round trips for binary mode, structured mode, and with partitionkey extension attribute. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- test/test_kafka_binding.rb | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index cd5b199..65fba8b 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -503,4 +503,39 @@ end end end + + describe "round-trip" do + let(:my_event) do + CloudEvents::Event::V1.new(data_encoded: my_json_escaped_data, + data: my_json_object, + datacontenttype: my_json_content_type_string, + dataschema: my_schema_string, + id: my_id, + source: my_source_string, + specversion: spec_version, + subject: my_subject, + time: my_time_string, + type: my_type) + end + + it "round-trips through binary mode" do + message = kafka_binding.encode_event(my_event, key_mapper: nil) + decoded = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_event, decoded + end + + it "round-trips through structured mode" do + message = kafka_binding.encode_event(my_event, structured_format: true, key_mapper: nil, sort: true) + decoded = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_event, decoded + end + + it "round-trips with partitionkey extension" do + event = my_event.with(partitionkey: "my-partition-key") + message = kafka_binding.encode_event(event) + decoded = kafka_binding.decode_event(message) + assert_equal "my-partition-key", decoded["partitionkey"] + assert_equal my_id, decoded.id + end + end end From 1cffd41982f62b803241690a69410c8f170592e4 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 12:57:23 -0800 Subject: [PATCH 08/18] tests: Implement Cucumber step definitions for Kafka protocol binding Replace pending Kafka steps with real implementations that decode Kafka messages using KafkaBinding.default. 
All 7 conformance scenarios (HTTP and Kafka) now pass. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- features/step_definitions/steps.rb | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/features/step_definitions/steps.rb b/features/step_definitions/steps.rb index 931b462..e635b1d 100644 --- a/features/step_definitions/steps.rb +++ b/features/step_definitions/steps.rb @@ -50,17 +50,21 @@ end Given "Kafka Protocol Binding is supported" do - pending "Kafka Protocol Binding is not yet implemented" + @kafka_binding = CloudEvents::KafkaBinding.default end -Given "a Kafka message with payload:" do |_str| - pending +Given "a Kafka message with payload:" do |str| + @kafka_value = str end -Given "Kafka headers:" do |_table| - pending +Given "Kafka headers:" do |table| + @kafka_headers = {} + table.hashes.each do |hash| + @kafka_headers[hash["key"].strip] = hash["value"] + end end When "parsed as Kafka message" do - pending + message = { key: nil, value: @kafka_value, headers: @kafka_headers } + @event = @kafka_binding.decode_event(message, reverse_key_mapper: nil) end From 995ae120c6dd721f05ddd01e6c4d8e1dfe934c1d Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 13:03:14 -0800 Subject: [PATCH 09/18] fix: Resolve rubocop violations in KafkaBinding Use parenthesized raise calls, move constant above private modifier, extract populate_data_attributes to reduce method length, and fix SymbolProc offense in tests. 
Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 47 ++++++++++++++++--------------- test/test_kafka_binding.rb | 2 +- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 5bc877c..5a2d79d 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -193,7 +193,7 @@ def decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **f end unless event ct_desc = content_type_string ? content_type_string.inspect : "not present" - raise NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present" + raise(NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present") end apply_reverse_key_mapper(event, message[:key], reverse_key_mapper) end @@ -223,17 +223,15 @@ def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format end end - private - # @private OMIT_ATTR_NAMES = ["specversion", "spec_version", "data", "datacontenttype", "data_content_type"].freeze + private + def decode_binary_content(message, content_type, **format_args) headers = message[:headers] || {} spec_version = headers["ce_specversion"] - unless spec_version =~ /^1(\.|$)/ - raise SpecVersionError, "Unrecognized specversion: #{spec_version}" - end + raise(SpecVersionError, "Unrecognized specversion: #{spec_version}") unless spec_version =~ /^1(\.|$)/ attributes = { "spec_version" => spec_version } headers.each do |key, value| next unless key.start_with?("ce_") @@ -241,23 +239,26 @@ def decode_binary_content(message, content_type, **format_args) attributes[attr_name] = value unless OMIT_ATTR_NAMES.include?(attr_name) end value = message[:value] - if value.nil? 
- attributes["data_content_type"] = content_type if content_type - else - attributes["data_encoded"] = value - result = @data_decoders.decode_data(spec_version: spec_version, - content: value, - content_type: content_type, - **format_args) - if result - attributes["data"] = result[:data] - content_type = result[:content_type] - end - attributes["data_content_type"] = content_type if content_type + unless value.nil? + content_type = populate_data_attributes(attributes, value, content_type, spec_version, format_args) end + attributes["data_content_type"] = content_type if content_type Event.create(spec_version: spec_version, set_attributes: attributes) end + def populate_data_attributes(attributes, value, content_type, spec_version, format_args) + attributes["data_encoded"] = value + result = @data_decoders.decode_data(spec_version: spec_version, + content: value, + content_type: content_type, + **format_args) + if result + attributes["data"] = result[:data] + content_type = result[:content_type] + end + content_type + end + def decode_structured_content(message, content_type, allow_opaque, **format_args) content = message[:value].to_s result = @event_decoders.decode_event(content: content, @@ -266,7 +267,7 @@ def decode_structured_content(message, content_type, allow_opaque, **format_args **format_args) return result[:event] if result return Event::Opaque.new(content, content_type) if allow_opaque - raise UnsupportedFormatError, "Unknown cloudevents content type: #{content_type}" + raise(UnsupportedFormatError, "Unknown cloudevents content type: #{content_type}") end def apply_reverse_key_mapper(event, key, reverse_key_mapper) @@ -291,11 +292,11 @@ def encode_binary_event(event, key_mapper, **format_args) def encode_structured_event(event, structured_format, key_mapper, **format_args) key = key_mapper&.call(event) structured_format = default_encoder_name if structured_format == true - raise ::ArgumentError, "Format name not specified, and no default is set" unless 
structured_format + raise(::ArgumentError, "Format name not specified, and no default is set") unless structured_format result = @event_encoders[structured_format]&.encode_event(event: event, data_encoder: @data_encoders, **format_args) - raise ::ArgumentError, "Unknown format name: #{structured_format.inspect}" unless result + raise(::ArgumentError, "Unknown format name: #{structured_format.inspect}") unless result { key: key, value: result[:content], headers: { "content-type" => result[:content_type].to_s } } end @@ -312,7 +313,7 @@ def extract_event_data(event, format_args) data: event.data, content_type: event.data_content_type, **format_args) - raise UnsupportedFormatError, "Could not encode data content-type" unless result + raise(UnsupportedFormatError, "Could not encode data content-type") unless result [result[:content], result[:content_type]] else [nil, nil] diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index 65fba8b..7798bf7 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -445,7 +445,7 @@ end it "uses custom key_mapper per-call" do - message = kafka_binding.encode_event(my_simple_event, key_mapper: ->(e) { e.id }) + message = kafka_binding.encode_event(my_simple_event, key_mapper: :id.to_proc) assert_equal my_id, message[:key] end From 866a6bb38c3f757cc45b26251f3bd1eb6830757e Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 13:27:56 -0800 Subject: [PATCH 10/18] chore: Add KafkaBinding to CLAUDE.md architecture section Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- CLAUDE.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 89c7013..2f3184f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -59,6 +59,19 @@ Handles encoding/decoding CloudEvents to/from HTTP (Rack env hashes). Supports: `HttpBinding.default` returns a singleton with `JsonFormat` and `TextFormat` pre-registered. Custom formatters are registered via `register_formatter`. 
+### Kafka Binding (`KafkaBinding`) + +Handles encoding/decoding CloudEvents to/from Kafka messages. CloudEvents 1.0 only (no V0.3). Supports: +- **Binary content mode** — event attributes as `ce_*` headers (plain UTF-8, no percent-encoding), data in value +- **Structured content mode** — entire event serialized in value (e.g., JSON) +- **No batch mode** (per the Kafka spec) +- **Tombstone support** — `nil` value represents an event with no data +- **Key mapping** — configurable callables to map between Kafka record keys and event attributes (default: `partitionkey` extension) + +Kafka messages are represented as plain `{ key:, value:, headers: }` Hashes, decoupled from any specific Kafka client library. + +`KafkaBinding.default` returns a singleton with `JsonFormat` and `TextFormat` pre-registered. Key mappers are configurable at construction and overridable per-call via `key_mapper:` / `reverse_key_mapper:` keyword arguments. + ### Format Layer - **`JsonFormat`** — Encodes/decodes `application/cloudevents+json` and batch format. Also handles JSON data encoding/decoding for binary mode. 
From b285d825a47988cf7c0869d5c70d709b98720548 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 13:59:54 -0800 Subject: [PATCH 11/18] tests: Simplify round-trip partitionkey assertion to use event equality Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- test/test_kafka_binding.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index 7798bf7..6189479 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -534,8 +534,7 @@ event = my_event.with(partitionkey: "my-partition-key") message = kafka_binding.encode_event(event) decoded = kafka_binding.decode_event(message) - assert_equal "my-partition-key", decoded["partitionkey"] - assert_equal my_id, decoded.id + assert_equal event, decoded end end end From 7848226322408e9944b1ef4bb52b38df564b5d8a Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 14:00:03 -0800 Subject: [PATCH 12/18] chore: Add comments to KafkaBinding private methods Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 5a2d79d..05d80d3 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -228,6 +228,8 @@ def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format private + # Decode a single event from binary content mode. Reads ce_* headers as + # attributes and the message value as event data. 
def decode_binary_content(message, content_type, **format_args) headers = message[:headers] || {} spec_version = headers["ce_specversion"] @@ -246,6 +248,9 @@ def decode_binary_content(message, content_type, **format_args) Event.create(spec_version: spec_version, set_attributes: attributes) end + # Populate data-related attributes (data_encoded, data, data_content_type) + # by running the value through registered data decoders. Returns the + # (possibly updated) content_type. def populate_data_attributes(attributes, value, content_type, spec_version, format_args) attributes["data_encoded"] = value result = @data_decoders.decode_data(spec_version: spec_version, @@ -259,6 +264,8 @@ def populate_data_attributes(attributes, value, content_type, spec_version, form content_type end + # Decode a single event from structured content mode. Delegates to + # registered event decoders, falling back to Event::Opaque if allowed. def decode_structured_content(message, content_type, allow_opaque, **format_args) content = message[:value].to_s result = @event_decoders.decode_event(content: content, @@ -270,6 +277,9 @@ def decode_structured_content(message, content_type, allow_opaque, **format_args raise(UnsupportedFormatError, "Unknown cloudevents content type: #{content_type}") end + # Apply the reverse_key_mapper to merge Kafka record key attributes into + # the decoded event. Returns the event unchanged if the mapper is nil or + # returns an empty hash. def apply_reverse_key_mapper(event, key, reverse_key_mapper) return event unless reverse_key_mapper mapped_attrs = reverse_key_mapper.call(key) @@ -277,6 +287,8 @@ def apply_reverse_key_mapper(event, key, reverse_key_mapper) event.with(**mapped_attrs.transform_keys(&:to_sym)) end + # Encode an event in binary content mode. Writes attributes as ce_* + # headers and event data as the message value. 
def encode_binary_event(event, key_mapper, **format_args) key = key_mapper&.call(event) headers = {} @@ -289,6 +301,8 @@ def encode_binary_event(event, key_mapper, **format_args) { key: key, value: body, headers: headers } end + # Encode an event in structured content mode using a named format encoder. + # The entire event is serialized into the message value. def encode_structured_event(event, structured_format, key_mapper, **format_args) key = key_mapper&.call(event) structured_format = default_encoder_name if structured_format == true @@ -300,10 +314,15 @@ def encode_structured_event(event, structured_format, key_mapper, **format_args) { key: key, value: result[:content], headers: { "content-type" => result[:content_type].to_s } } end + # Encode an opaque event by passing through its content and content_type + # directly, with a nil key. def encode_opaque_event(event) { key: nil, value: event.content, headers: { "content-type" => event.content_type.to_s } } end + # Extract the event data and content type for binary mode encoding. + # Uses data_encoded if present, otherwise delegates to data encoders. + # Returns [body, content_type], where body is nil for tombstones. def extract_event_data(event, format_args) body = event.data_encoded if body From 678485dfad7a189f18c9dcc9b80d2b6dbdbf1703 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 14:55:20 -0800 Subject: [PATCH 13/18] fix: Use ContentType parser in KafkaBinding#probable_event? Replace raw string comparison with ContentType parsing so that mixed-case content-type values (e.g. Application/CloudEvents+JSON) are correctly detected as CloudEvents. 
Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 7 ++++--- test/test_kafka_binding.rb | 9 +++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 05d80d3..7084d45 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -159,9 +159,10 @@ def register_formatter_methods(formatter, def probable_event?(message) headers = message[:headers] || {} return true if headers.key?("ce_specversion") - content_type = headers["content-type"] - return false unless content_type - content_type.start_with?("application/cloudevents") + content_type_string = headers["content-type"] + return false unless content_type_string + content_type = ContentType.new(content_type_string) + content_type.media_type == "application" && content_type.subtype_base == "cloudevents" end ## diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index 6189479..e653c14 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -79,6 +79,15 @@ refute kafka_binding.probable_event?(message) end + it "detects a probable structured event with mixed-case content type" do + message = { + key: nil, + value: "{}", + headers: { "content-type" => "Application/CloudEvents+JSON" }, + } + assert kafka_binding.probable_event?(message) + end + it "returns false for a message with no relevant headers" do message = { key: nil, From e0dfc2251216ef1da2860498131f669991a17ece Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 15:10:40 -0800 Subject: [PATCH 14/18] feat: Raise BatchNotSupportedError for batch content in KafkaBinding Add BatchNotSupportedError to the error hierarchy and raise it from KafkaBinding#decode_event when given an application/cloudevents-batch content type, since the Kafka protocol binding does not support batch content mode per the spec. 
Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/errors.rb | 8 ++++++++ lib/cloud_events/kafka_binding.rb | 26 +++++++++++++++++--------- test/test_kafka_binding.rb | 11 +++++++++++ 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/lib/cloud_events/errors.rb b/lib/cloud_events/errors.rb index c679d9b..f557ba6 100644 --- a/lib/cloud_events/errors.rb +++ b/lib/cloud_events/errors.rb @@ -51,6 +51,14 @@ class SpecVersionError < CloudEventsError class AttributeError < CloudEventsError end + ## + # An error raised when a protocol binding that does not support batch + # content mode receives a batch. For example, the Kafka protocol binding + # does not support batches per the CloudEvents spec. + # + class BatchNotSupportedError < CloudEventsError + end + ## # Alias of UnsupportedFormatError, for backward compatibility. # diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 7084d45..0a62a01 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -187,15 +187,7 @@ def decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **f content_type_string = headers["content-type"] content_type = ContentType.new(content_type_string) if content_type_string - event = if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents" - decode_structured_content(message, content_type, allow_opaque, **format_args) - elsif headers.key?("ce_specversion") - decode_binary_content(message, content_type, **format_args) - end - unless event - ct_desc = content_type_string ? 
content_type_string.inspect : "not present" - raise(NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present") - end + event = decode_content(message, headers, content_type, content_type_string, allow_opaque, **format_args) apply_reverse_key_mapper(event, message[:key], reverse_key_mapper) end @@ -229,6 +221,22 @@ def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format private + # Detect content mode, reject batches, and dispatch to the appropriate + # binary or structured decoder. Raises if the message is not a CloudEvent. + def decode_content(message, headers, content_type, content_type_string, allow_opaque, **format_args) + if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents-batch" + raise(BatchNotSupportedError, "Kafka protocol binding does not support batch content mode") + end + if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents" + return decode_structured_content(message, content_type, allow_opaque, **format_args) + end + if headers.key?("ce_specversion") + return decode_binary_content(message, content_type, **format_args) + end + ct_desc = content_type_string ? content_type_string.inspect : "not present" + raise(NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present") + end + # Decode a single event from binary content mode. Reads ce_* headers as # attributes and the message value as event data. 
def decode_binary_content(message, content_type, **format_args) diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index e653c14..bcaeb36 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -274,6 +274,17 @@ kafka_binding.decode_event(message, reverse_key_mapper: nil) end end + + it "raises BatchNotSupportedError for batch content type" do + message = { + key: nil, + value: "[{}]", + headers: { "content-type" => "application/cloudevents-batch+json" }, + } + assert_raises CloudEvents::BatchNotSupportedError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end end describe "decode_event structured mode" do From 2e05ba171430bb3ea2665f74b126ca54fde07195 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 15:40:21 -0800 Subject: [PATCH 15/18] fix: Reject CloudEvents 0.3 in KafkaBinding structured mode decode The binary decode path already raised SpecVersionError for non-1.x spec versions, but structured mode would accept and return a V0 event. Now decode_structured_content checks the decoded event's spec_version and raises SpecVersionError if it does not start with "1". 
Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 8 +++++++- test/test_kafka_binding.rb | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 0a62a01..0ce3de6 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -281,7 +281,13 @@ def decode_structured_content(message, content_type, allow_opaque, **format_args content_type: content_type, data_decoder: @data_decoders, **format_args) - return result[:event] if result + if result + event = result[:event] + if event && !event.spec_version.start_with?("1") + raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") + end + return event + end return Event::Opaque.new(content, content_type) if allow_opaque raise(UnsupportedFormatError, "Unknown cloudevents content type: #{content_type}") end diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index bcaeb36..e731f14 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -350,6 +350,25 @@ end end + it "raises SpecVersionError for structured event with specversion 0.3" do + v03_hash = { + "data" => "hello", + "id" => my_id, + "source" => my_source_string, + "specversion" => "0.3", + "type" => my_type, + } + v03_struct = JSON.dump(v03_hash) + message = { + key: nil, + value: v03_struct, + headers: { "content-type" => "application/cloudevents+json" }, + } + assert_raises CloudEvents::SpecVersionError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end + it "raises FormatSyntaxError for malformed JSON" do message = { key: nil, From ebee42f732366c295f78a85cc1ccd7c639055fd3 Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 15:44:27 -0800 Subject: [PATCH 16/18] fix: Reject non-V1 events in KafkaBinding#encode_event Raise SpecVersionError when encode_event receives an event with a specversion 
that does not start with "1", ensuring V0.3 events cannot be encoded via the Kafka binding. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 3 +++ test/test_kafka_binding.rb | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index 0ce3de6..a25ba23 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -209,6 +209,9 @@ def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format if event.is_a?(Event::Opaque) return encode_opaque_event(event) end + unless event.spec_version.start_with?("1") + raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") + end if structured_format encode_structured_event(event, structured_format, key_mapper, **format_args) else diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb index e731f14..e55048b 100644 --- a/test/test_kafka_binding.rb +++ b/test/test_kafka_binding.rb @@ -493,6 +493,16 @@ message = kafka_binding.encode_event(event, key_mapper: nil) assert_nil message[:key] end + + it "raises SpecVersionError for a V0 event" do + v0_event = CloudEvents::Event::V0.new(id: my_id, + source: my_source_string, + specversion: "0.3", + type: my_type) + assert_raises CloudEvents::SpecVersionError do + kafka_binding.encode_event(v0_event, key_mapper: nil) + end + end end describe "encode_event structured mode" do From 59a422107f064c8515457109d59b83bcd24b8cae Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 15:51:53 -0800 Subject: [PATCH 17/18] fix: Update spec_version checks Signed-off-by: Daniel Azuma --- lib/cloud_events/kafka_binding.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index a25ba23..bf1f240 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -209,7 +209,7 @@ def 
encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format if event.is_a?(Event::Opaque) return encode_opaque_event(event) end - unless event.spec_version.start_with?("1") + unless event.respond_to?(:spec_version) && event.spec_version.start_with?("1.") raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") end if structured_format @@ -286,7 +286,7 @@ def decode_structured_content(message, content_type, allow_opaque, **format_args **format_args) if result event = result[:event] - if event && !event.spec_version.start_with?("1") + if event && !(event.respond_to?(:spec_version) && event.spec_version.start_with?("1.")) raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") end return event From 1f629d67988da9cf2bd42cc791988a0d9c192f3d Mon Sep 17 00:00:00 2001 From: Daniel Azuma Date: Mon, 9 Feb 2026 16:08:25 -0800 Subject: [PATCH 18/18] fix: Fix logic in specversion checks Signed-off-by: Daniel Azuma --- CLAUDE.md | 1 + lib/cloud_events/kafka_binding.rb | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 2f3184f..67bf258 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -90,6 +90,7 @@ All errors inherit from `CloudEventsError`: `NotCloudEventError`, `UnsupportedFo ## Contributing +- Use red-green test-driven development when making changes, unless instructed otherwise. - Conventional Commits format required (`fix:`, `feat:`, `docs:`, etc.) 
- Commits must be signed off (`git commit --signoff`) - Run `toys ci` before submitting PRs diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb index bf1f240..3974a2a 100644 --- a/lib/cloud_events/kafka_binding.rb +++ b/lib/cloud_events/kafka_binding.rb @@ -209,7 +209,10 @@ def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format if event.is_a?(Event::Opaque) return encode_opaque_event(event) end - unless event.respond_to?(:spec_version) && event.spec_version.start_with?("1.") + unless event.respond_to?(:spec_version) + raise(SpecVersionError, "Unable to determine specversion") + end + unless event.spec_version.start_with?("1.") raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") end if structured_format @@ -286,7 +289,8 @@ def decode_structured_content(message, content_type, allow_opaque, **format_args **format_args) if result event = result[:event] - if event && !(event.respond_to?(:spec_version) && event.spec_version.start_with?("1.")) + raise(SpecVersionError, "Unable to determine specversion") unless event.respond_to?(:spec_version) + unless event.spec_version.start_with?("1.") raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") end return event