diff --git a/CLAUDE.md b/CLAUDE.md index 89c7013..67bf258 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -59,6 +59,19 @@ Handles encoding/decoding CloudEvents to/from HTTP (Rack env hashes). Supports: `HttpBinding.default` returns a singleton with `JsonFormat` and `TextFormat` pre-registered. Custom formatters are registered via `register_formatter`. +### Kafka Binding (`KafkaBinding`) + +Handles encoding/decoding CloudEvents to/from Kafka messages. CloudEvents 1.0 only (no V0.3). Supports: +- **Binary content mode** — event attributes as `ce_*` headers (plain UTF-8, no percent-encoding), data in value +- **Structured content mode** — entire event serialized in value (e.g., JSON) +- **No batch mode** (per the CloudEvents Kafka protocol binding spec) +- **Tombstone support** — `nil` value represents an event with no data +- **Key mapping** — configurable callables to map between Kafka record keys and event attributes (default: `partitionkey` extension) + +Kafka messages are represented as plain `{ key:, value:, headers: }` Hashes, decoupled from any specific Kafka client library. + +`KafkaBinding.default` returns a singleton with `JsonFormat` and `TextFormat` pre-registered. Key mappers are configurable at construction and overridable per-call via `key_mapper:` / `reverse_key_mapper:` keyword arguments. + ### Format Layer - **`JsonFormat`** — Encodes/decodes `application/cloudevents+json` and batch format. Also handles JSON data encoding/decoding for binary mode. @@ -77,6 +90,7 @@ All errors inherit from `CloudEventsError`: `NotCloudEventError`, `UnsupportedFo ## Contributing +- Use red-green test-driven development when making changes, unless instructed otherwise. - Conventional Commits format required (`fix:`, `feat:`, `docs:`, etc.) 
- Commits must be signed off (`git commit --signoff`) - Run `toys ci` before submitting PRs diff --git a/features/step_definitions/steps.rb b/features/step_definitions/steps.rb index 931b462..e635b1d 100644 --- a/features/step_definitions/steps.rb +++ b/features/step_definitions/steps.rb @@ -50,17 +50,21 @@ end Given "Kafka Protocol Binding is supported" do - pending "Kafka Protocol Binding is not yet implemented" + @kafka_binding = CloudEvents::KafkaBinding.default end -Given "a Kafka message with payload:" do |_str| - pending +Given "a Kafka message with payload:" do |str| + @kafka_value = str end -Given "Kafka headers:" do |_table| - pending +Given "Kafka headers:" do |table| + @kafka_headers = {} + table.hashes.each do |hash| + @kafka_headers[hash["key"].strip] = hash["value"] + end end When "parsed as Kafka message" do - pending + message = { key: nil, value: @kafka_value, headers: @kafka_headers } + @event = @kafka_binding.decode_event(message, reverse_key_mapper: nil) end diff --git a/lib/cloud_events.rb b/lib/cloud_events.rb index 929a43b..31b3862 100644 --- a/lib/cloud_events.rb +++ b/lib/cloud_events.rb @@ -5,6 +5,7 @@ require "cloud_events/event" require "cloud_events/format" require "cloud_events/http_binding" +require "cloud_events/kafka_binding" require "cloud_events/json_format" require "cloud_events/text_format" diff --git a/lib/cloud_events/errors.rb b/lib/cloud_events/errors.rb index c679d9b..f557ba6 100644 --- a/lib/cloud_events/errors.rb +++ b/lib/cloud_events/errors.rb @@ -51,6 +51,14 @@ class SpecVersionError < CloudEventsError class AttributeError < CloudEventsError end + ## + # An error raised when a protocol binding that does not support batch + # content mode receives a batch. For example, the Kafka protocol binding + # does not support batches per the CloudEvents spec. + # + class BatchNotSupportedError < CloudEventsError + end + ## # Alias of UnsupportedFormatError, for backward compatibility. 
# diff --git a/lib/cloud_events/kafka_binding.rb b/lib/cloud_events/kafka_binding.rb new file mode 100644 index 0000000..3974a2a --- /dev/null +++ b/lib/cloud_events/kafka_binding.rb @@ -0,0 +1,364 @@ +# frozen_string_literal: true + +module CloudEvents + ## + # Kafka protocol binding for CloudEvents. + # + # This class implements the Kafka protocol binding, including decoding of + # events from Kafka message hashes, and encoding of events to Kafka message + # hashes. It supports binary (header-based) and structured (body-based) + # content modes that can delegate to formatters such as JSON. + # + # Supports CloudEvents 1.0 only. + # See https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/kafka-protocol-binding.md. + # + # Kafka messages are represented as plain Ruby Hashes with the keys: + # - `key:` [String, nil] — the Kafka record key + # - `value:` [String, nil] — the Kafka record value (body) + # - `headers:` [Hash] — String => String header pairs + # + class KafkaBinding + ## + # The name of the JSON decoder/encoder. + # @return [String] + # + JSON_FORMAT = "json" + + ## + # The default key mapper for encoding. + # Returns the `partitionkey` extension attribute from the event. + # @return [Proc] + # + DEFAULT_KEY_MAPPER = ->(event) { event["partitionkey"] } + + ## + # The default reverse key mapper for decoding. + # Sets the `partitionkey` extension attribute from the Kafka record key. + # Returns an empty hash if the key is nil. + # @return [Proc] + # + DEFAULT_REVERSE_KEY_MAPPER = ->(key) { key.nil? ? {} : { "partitionkey" => key } } + + ## + # Returns a default Kafka binding, including support for JSON format. + # + # @return [KafkaBinding] + # + def self.default + @default ||= begin + kafka_binding = new + kafka_binding.register_formatter(JsonFormat.new, encoder_name: JSON_FORMAT) + kafka_binding.default_encoder_name = JSON_FORMAT + kafka_binding + end + end + + ## + # Create an empty Kafka binding. 
+ # + # @param key_mapper [Proc,nil] A callable `(event) -> String|nil` used + # to derive the Kafka record key when encoding. Defaults to + # {DEFAULT_KEY_MAPPER}. Set to `nil` to always produce a `nil` key. + # @param reverse_key_mapper [Proc,nil] A callable `(key) -> Hash` used + # to derive attributes to merge into the event when decoding. + # Defaults to {DEFAULT_REVERSE_KEY_MAPPER}. Set to `nil` to skip + # key-to-attribute mapping. + # + def initialize(key_mapper: DEFAULT_KEY_MAPPER, reverse_key_mapper: DEFAULT_REVERSE_KEY_MAPPER) + @key_mapper = key_mapper + @reverse_key_mapper = reverse_key_mapper + @event_decoders = Format::Multi.new do |result| + result&.key?(:event) ? result : nil + end + @event_encoders = {} + @data_decoders = Format::Multi.new do |result| + result&.key?(:data) && result.key?(:content_type) ? result : nil + end + @data_encoders = Format::Multi.new do |result| + result&.key?(:content) && result.key?(:content_type) ? result : nil + end + text_format = TextFormat.new + @data_decoders.formats.replace([text_format, HttpBinding::DefaultDataFormat]) + @data_encoders.formats.replace([text_format, HttpBinding::DefaultDataFormat]) + @default_encoder_name = nil + end + + ## + # Register a formatter for all operations it supports, based on which + # methods are implemented by the formatter object. See + # {CloudEvents::Format} for a list of possible methods. + # + # @param formatter [Object] The formatter. + # @param encoder_name [String] The encoder name under which this + # formatter will register its encode operations. Optional. If not + # specified, any event encoder will _not_ be registered. 
+ # @return [self] + # + def register_formatter(formatter, encoder_name: nil) + encoder_name = encoder_name.to_s.strip.downcase if encoder_name + decode_event = formatter.respond_to?(:decode_event) + encode_event = encoder_name if formatter.respond_to?(:encode_event) + decode_data = formatter.respond_to?(:decode_data) + encode_data = formatter.respond_to?(:encode_data) + register_formatter_methods(formatter, + decode_event: decode_event, + encode_event: encode_event, + decode_data: decode_data, + encode_data: encode_data) + self + end + + ## + # Registers the given formatter for the given operations. Some arguments + # are activated by passing `true`, whereas those that rely on a format + # name are activated by passing in a name string. + # + # @param formatter [Object] The formatter. + # @param decode_event [boolean] If true, register the formatter's + # {CloudEvents::Format#decode_event} method. + # @param encode_event [String] If set to a string, use the formatter's + # {CloudEvents::Format#encode_event} method when that name is + # requested. + # @param decode_data [boolean] If true, register the formatter's + # {CloudEvents::Format#decode_data} method. + # @param encode_data [boolean] If true, register the formatter's + # {CloudEvents::Format#encode_data} method. + # @return [self] + # + def register_formatter_methods(formatter, + decode_event: false, + encode_event: nil, + decode_data: false, + encode_data: false) + @event_decoders.formats.unshift(formatter) if decode_event + if encode_event + encoders = @event_encoders[encode_event] ||= Format::Multi.new do |result| + result&.key?(:content) && result.key?(:content_type) ? result : nil + end + encoders.formats.unshift(formatter) + end + @data_decoders.formats.unshift(formatter) if decode_data + @data_encoders.formats.unshift(formatter) if encode_data + self + end + + ## + # The name of the encoder to use if none is specified. 
+ # + # @return [String,nil] + # + attr_accessor :default_encoder_name + + ## + # Determine whether a Kafka message is likely a CloudEvent, by + # inspecting headers only (does not parse the value). + # + # @param message [Hash] The Kafka message hash. + # @return [boolean] + # + def probable_event?(message) + headers = message[:headers] || {} + return true if headers.key?("ce_specversion") + content_type_string = headers["content-type"] + return false unless content_type_string + content_type = ContentType.new(content_type_string) + content_type.media_type == "application" && content_type.subtype_base == "cloudevents" + end + + ## + # Decode an event from a Kafka message hash. + # + # @param message [Hash] A hash with `:key`, `:value`, and `:headers` keys. + # @param allow_opaque [boolean] If true, returns {Event::Opaque} for + # unrecognized structured formats. Default is false. + # @param reverse_key_mapper [Proc,nil,:NOT_SET] A callable + # `(key) -> Hash`, or `nil` to skip key mapping. Defaults to the + # instance's reverse_key_mapper. + # @param format_args [keywords] Extra args forwarded to formatters. + # @return [CloudEvents::Event] The decoded event. + # @raise [NotCloudEventError] if the message is not a CloudEvent. + # @raise [SpecVersionError] if the specversion is not supported. + # @raise [UnsupportedFormatError] if a structured format is not recognized. + # @raise [FormatSyntaxError] if the structured content is malformed. 
+ # + def decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **format_args) + reverse_key_mapper = @reverse_key_mapper if reverse_key_mapper == :NOT_SET + headers = message[:headers] || {} + content_type_string = headers["content-type"] + content_type = ContentType.new(content_type_string) if content_type_string + + event = decode_content(message, headers, content_type, content_type_string, allow_opaque, **format_args) + apply_reverse_key_mapper(event, message[:key], reverse_key_mapper) + end + + ## + # Encode an event into a Kafka message hash. + # + # @param event [CloudEvents::Event,CloudEvents::Event::Opaque] The event. + # @param structured_format [boolean,String] If false (default), encodes + # in binary content mode. If true or a format name string, encodes + # in structured content mode. + # @param key_mapper [Proc,nil,:NOT_SET] A callable + # `(event) -> String|nil`, or `nil` to always produce a `nil` key. + # Defaults to the instance's key_mapper. + # @param format_args [keywords] Extra args forwarded to formatters. + # @return [Hash] A hash with `:key`, `:value`, and `:headers` keys. 
+ # + def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format_args) + key_mapper = @key_mapper if key_mapper == :NOT_SET + if event.is_a?(Event::Opaque) + return encode_opaque_event(event) + end + unless event.respond_to?(:spec_version) + raise(SpecVersionError, "Unable to determine specversion") + end + unless event.spec_version.start_with?("1.") + raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") + end + if structured_format + encode_structured_event(event, structured_format, key_mapper, **format_args) + else + encode_binary_event(event, key_mapper, **format_args) + end + end + + # @private + OMIT_ATTR_NAMES = ["specversion", "spec_version", "data", "datacontenttype", "data_content_type"].freeze + + private + + # Detect content mode, reject batches, and dispatch to the appropriate + # binary or structured decoder. Raises if the message is not a CloudEvent. + def decode_content(message, headers, content_type, content_type_string, allow_opaque, **format_args) + if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents-batch" + raise(BatchNotSupportedError, "Kafka protocol binding does not support batch content mode") + end + if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents" + return decode_structured_content(message, content_type, allow_opaque, **format_args) + end + if headers.key?("ce_specversion") + return decode_binary_content(message, content_type, **format_args) + end + ct_desc = content_type_string ? content_type_string.inspect : "not present" + raise(NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present") + end + + # Decode a single event from binary content mode. Reads ce_* headers as + # attributes and the message value as event data. 
+ def decode_binary_content(message, content_type, **format_args) + headers = message[:headers] || {} + spec_version = headers["ce_specversion"] + raise(SpecVersionError, "Unrecognized specversion: #{spec_version}") unless spec_version =~ /^1(\.|$)/ + attributes = { "spec_version" => spec_version } + headers.each do |key, value| + next unless key.start_with?("ce_") + attr_name = key[3..].downcase + attributes[attr_name] = value unless OMIT_ATTR_NAMES.include?(attr_name) + end + value = message[:value] + unless value.nil? + content_type = populate_data_attributes(attributes, value, content_type, spec_version, format_args) + end + attributes["data_content_type"] = content_type if content_type + Event.create(spec_version: spec_version, set_attributes: attributes) + end + + # Populate data-related attributes (data_encoded, data, data_content_type) + # by running the value through registered data decoders. Returns the + # (possibly updated) content_type. + def populate_data_attributes(attributes, value, content_type, spec_version, format_args) + attributes["data_encoded"] = value + result = @data_decoders.decode_data(spec_version: spec_version, + content: value, + content_type: content_type, + **format_args) + if result + attributes["data"] = result[:data] + content_type = result[:content_type] + end + content_type + end + + # Decode a single event from structured content mode. Delegates to + # registered event decoders, falling back to Event::Opaque if allowed. 
+ def decode_structured_content(message, content_type, allow_opaque, **format_args) + content = message[:value].to_s + result = @event_decoders.decode_event(content: content, + content_type: content_type, + data_decoder: @data_decoders, + **format_args) + if result + event = result[:event] + raise(SpecVersionError, "Unable to determine specversion") unless event.respond_to?(:spec_version) + unless event.spec_version.start_with?("1.") + raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") + end + return event + end + return Event::Opaque.new(content, content_type) if allow_opaque + raise(UnsupportedFormatError, "Unknown cloudevents content type: #{content_type}") + end + + # Apply the reverse_key_mapper to merge Kafka record key attributes into + # the decoded event. Returns the event unchanged if the mapper is nil or + # returns an empty hash. + def apply_reverse_key_mapper(event, key, reverse_key_mapper) + return event unless reverse_key_mapper + mapped_attrs = reverse_key_mapper.call(key) + return event if mapped_attrs.nil? || mapped_attrs.empty? + event.with(**mapped_attrs.transform_keys(&:to_sym)) + end + + # Encode an event in binary content mode. Writes attributes as ce_* + # headers and event data as the message value. + def encode_binary_event(event, key_mapper, **format_args) + key = key_mapper&.call(event) + headers = {} + event.to_h.each do |attr_key, value| + next if ["data", "data_encoded", "datacontenttype"].include?(attr_key) + headers["ce_#{attr_key}"] = value.to_s + end + body, content_type = extract_event_data(event, format_args) + headers["content-type"] = content_type.to_s if content_type + { key: key, value: body, headers: headers } + end + + # Encode an event in structured content mode using a named format encoder. + # The entire event is serialized into the message value. 
+ def encode_structured_event(event, structured_format, key_mapper, **format_args) + key = key_mapper&.call(event) + structured_format = default_encoder_name if structured_format == true + raise(::ArgumentError, "Format name not specified, and no default is set") unless structured_format + result = @event_encoders[structured_format]&.encode_event(event: event, + data_encoder: @data_encoders, + **format_args) + raise(::ArgumentError, "Unknown format name: #{structured_format.inspect}") unless result + { key: key, value: result[:content], headers: { "content-type" => result[:content_type].to_s } } + end + + # Encode an opaque event by passing through its content and content_type + # directly, with a nil key. + def encode_opaque_event(event) + { key: nil, value: event.content, headers: { "content-type" => event.content_type.to_s } } + end + + # Extract the event data and content type for binary mode encoding. + # Uses data_encoded if present, otherwise delegates to data encoders. + # Returns [body, content_type], where body is nil for tombstones. + def extract_event_data(event, format_args) + body = event.data_encoded + if body + [body, event.data_content_type] + elsif event.data? 
+ result = @data_encoders.encode_data(spec_version: event.spec_version, + data: event.data, + content_type: event.data_content_type, + **format_args) + raise(UnsupportedFormatError, "Could not encode data content-type") unless result + [result[:content], result[:content_type]] + else + [nil, nil] + end + end + end +end diff --git a/test/test_kafka_binding.rb b/test/test_kafka_binding.rb new file mode 100644 index 0000000..e55048b --- /dev/null +++ b/test/test_kafka_binding.rb @@ -0,0 +1,589 @@ +# frozen_string_literal: true + +require_relative "helper" + +require "date" +require "json" +require "uri" + +describe CloudEvents::KafkaBinding do + let(:kafka_binding) { CloudEvents::KafkaBinding.default } + let(:minimal_kafka_binding) { CloudEvents::KafkaBinding.new } + let(:my_id) { "my_id" } + let(:my_source_string) { "/my_source" } + let(:my_source) { URI.parse(my_source_string) } + let(:my_type) { "my_type" } + let(:spec_version) { "1.0" } + let(:my_simple_data) { "12345" } + let(:my_json_object) { { "a" => "ä", "b" => "😀" } } + let(:my_json_escaped_data) { '{"a":"ä","b":"😀"}' } + let(:my_content_type_string) { "text/plain; charset=us-ascii" } + let(:my_content_type) { CloudEvents::ContentType.new(my_content_type_string) } + let(:my_json_content_type_string) { "application/json" } + let(:my_json_content_type) { CloudEvents::ContentType.new(my_json_content_type_string) } + let(:my_schema_string) { "/my_schema" } + let(:my_schema) { URI.parse(my_schema_string) } + let(:my_subject) { "my_subject" } + let(:my_time_string) { "2020-01-12T20:52:05-08:00" } + let(:my_time) { DateTime.rfc3339(my_time_string) } + let(:my_trace_context) { "1234567890;9876543210" } + + describe "constructor" do + it "creates a new instance" do + binding_obj = CloudEvents::KafkaBinding.new + assert_instance_of CloudEvents::KafkaBinding, binding_obj + end + + it "returns a default instance with JSON format registered" do + assert_instance_of CloudEvents::KafkaBinding, kafka_binding + end + + it 
"returns the same default singleton" do + assert_same CloudEvents::KafkaBinding.default, CloudEvents::KafkaBinding.default + end + + it "defines DEFAULT_KEY_MAPPER constant" do + assert_respond_to CloudEvents::KafkaBinding::DEFAULT_KEY_MAPPER, :call + end + + it "defines DEFAULT_REVERSE_KEY_MAPPER constant" do + assert_respond_to CloudEvents::KafkaBinding::DEFAULT_REVERSE_KEY_MAPPER, :call + end + end + + describe "probable_event?" do + it "detects a probable binary event" do + message = { + key: nil, + value: "hello", + headers: { "ce_specversion" => "1.0" }, + } + assert kafka_binding.probable_event?(message) + end + + it "detects a probable structured event" do + message = { + key: nil, + value: "{}", + headers: { "content-type" => "application/cloudevents+json" }, + } + assert kafka_binding.probable_event?(message) + end + + it "returns false for a non-CE message" do + message = { + key: nil, + value: "hello", + headers: { "content-type" => "application/json" }, + } + refute kafka_binding.probable_event?(message) + end + + it "detects a probable structured event with mixed-case content type" do + message = { + key: nil, + value: "{}", + headers: { "content-type" => "Application/CloudEvents+JSON" }, + } + assert kafka_binding.probable_event?(message) + end + + it "returns false for a message with no relevant headers" do + message = { + key: nil, + value: "hello", + headers: {}, + } + refute kafka_binding.probable_event?(message) + end + end + + describe "decode_event binary mode" do + it "decodes a binary message with text content type" do + message = { + key: nil, + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + "ce_dataschema" => my_schema_string, + "ce_subject" => my_subject, + "ce_time" => my_time_string, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, 
event.id + assert_equal my_source, event.source + assert_equal my_type, event.type + assert_equal spec_version, event.spec_version + assert_equal my_simple_data, event.data + assert_equal my_content_type_string, event.data_content_type.to_s + assert_equal my_schema, event.data_schema + assert_equal my_subject, event.subject + assert_equal my_time, event.time + end + + it "decodes a binary message with JSON content type" do + message = { + key: nil, + value: my_json_escaped_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_json_content_type_string, + "ce_dataschema" => my_schema_string, + "ce_subject" => my_subject, + "ce_time" => my_time_string, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_json_object, event.data + assert_equal my_json_escaped_data, event.data_encoded + end + + it "decodes a minimal binary message" do + message = { + key: nil, + value: nil, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, event.id + assert_equal my_source, event.source + assert_equal my_type, event.type + assert_nil event.data + assert_nil event.data_content_type + end + + it "decodes a binary message with extension attributes" do + message = { + key: nil, + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + "ce_tracecontext" => my_trace_context, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_trace_context, event["tracecontext"] + end + + it "decodes a tombstone message (nil value)" do + message = { + key: nil, + value: nil, + headers: { + 
"ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + refute event.data? + assert_nil event.data + end + + it "maps key to partitionkey with default reverse_key_mapper" do + message = { + key: "my-partition-key", + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + }, + } + event = kafka_binding.decode_event(message) + assert_equal "my-partition-key", event["partitionkey"] + end + + it "uses custom reverse_key_mapper per-call" do + message = { + key: "custom-key", + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + }, + } + mapper = ->(key) { key.nil? ? {} : { "mykey" => key } } + event = kafka_binding.decode_event(message, reverse_key_mapper: mapper) + assert_equal "custom-key", event["mykey"] + assert_nil event["partitionkey"] + end + + it "skips key mapping when reverse_key_mapper is nil" do + message = { + key: "my-partition-key", + value: my_simple_data, + headers: { + "ce_specversion" => spec_version, + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + "content-type" => my_content_type_string, + }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_nil event["partitionkey"] + end + + it "raises SpecVersionError for bad specversion" do + message = { + key: nil, + value: "hello", + headers: { + "ce_specversion" => "0.1", + "ce_id" => my_id, + "ce_source" => my_source_string, + "ce_type" => my_type, + }, + } + assert_raises CloudEvents::SpecVersionError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end + + it "raises NotCloudEventError for non-CE 
message" do + message = { + key: nil, + value: "hello", + headers: { "content-type" => "application/json" }, + } + assert_raises CloudEvents::NotCloudEventError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end + + it "raises BatchNotSupportedError for batch content type" do + message = { + key: nil, + value: "[{}]", + headers: { "content-type" => "application/cloudevents-batch+json" }, + } + assert_raises CloudEvents::BatchNotSupportedError do + kafka_binding.decode_event(message, reverse_key_mapper: nil) + end + end + end + + describe "decode_event structured mode" do + let(:my_json_struct) do + { + "data" => my_simple_data, + "datacontenttype" => my_content_type_string, + "dataschema" => my_schema_string, + "id" => my_id, + "source" => my_source_string, + "specversion" => spec_version, + "subject" => my_subject, + "time" => my_time_string, + "type" => my_type, + } + end + let(:my_json_struct_encoded) { JSON.dump(my_json_struct) } + + it "decodes a JSON-structured message" do + message = { + key: nil, + value: my_json_struct_encoded, + headers: { "content-type" => "application/cloudevents+json" }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, event.id + assert_equal my_source, event.source + assert_equal my_type, event.type + assert_equal spec_version, event.spec_version + assert_equal my_simple_data, event.data + end + + it "decodes a JSON-structured message with charset" do + message = { + key: nil, + value: my_json_struct_encoded, + headers: { "content-type" => "application/cloudevents+json; charset=utf-8" }, + } + event = kafka_binding.decode_event(message, reverse_key_mapper: nil) + assert_equal my_id, event.id + assert_equal my_type, event.type + end + + it "returns opaque for unrecognized structured format when allow_opaque is true" do + message = { + key: nil, + value: "some content", + headers: { "content-type" => "application/cloudevents+foo" }, + } + event = 
minimal_kafka_binding.decode_event(message, allow_opaque: true, reverse_key_mapper: nil)
+      assert_kind_of CloudEvents::Event::Opaque, event
+      assert_equal "some content", event.content
+    end
+
+    it "raises UnsupportedFormatError for unknown structured format" do
+      message = {
+        key: nil,
+        value: "some content",
+        headers: { "content-type" => "application/cloudevents+foo" },
+      }
+      assert_raises CloudEvents::UnsupportedFormatError do
+        kafka_binding.decode_event(message, reverse_key_mapper: nil)
+      end
+    end
+
+    it "raises SpecVersionError for structured event with specversion 0.3" do
+      v03_hash = {
+        "data" => "hello",
+        "id" => my_id,
+        "source" => my_source_string,
+        "specversion" => "0.3",
+        "type" => my_type,
+      }
+      v03_struct = JSON.dump(v03_hash)
+      message = {
+        key: nil,
+        value: v03_struct,
+        headers: { "content-type" => "application/cloudevents+json" },
+      }
+      assert_raises CloudEvents::SpecVersionError do
+        kafka_binding.decode_event(message, reverse_key_mapper: nil)
+      end
+    end
+
+    it "raises FormatSyntaxError for malformed JSON" do
+      message = {
+        key: nil,
+        value: "!!!",
+        headers: { "content-type" => "application/cloudevents+json" },
+      }
+      error = assert_raises(CloudEvents::FormatSyntaxError) do
+        kafka_binding.decode_event(message, reverse_key_mapper: nil)
+      end
+      assert_kind_of JSON::ParserError, error.cause
+    end
+
+    it "applies reverse_key_mapper to structured decoded events" do
+      message = {
+        key: "my-partition-key",
+        value: my_json_struct_encoded,
+        headers: { "content-type" => "application/cloudevents+json" },
+      }
+      event = kafka_binding.decode_event(message)
+      assert_equal "my-partition-key", event["partitionkey"]
+    end
+  end
+
+  describe "encode_event binary mode" do
+    let(:my_simple_event) do
+      CloudEvents::Event::V1.new(data_encoded: my_simple_data,
+                                 data: my_simple_data,
+                                 datacontenttype: my_content_type_string,
+                                 dataschema: my_schema_string,
+                                 id: my_id,
+                                 source: my_source_string,
+                                 specversion: spec_version,
+                                 subject: my_subject,
+                                 time: my_time_string,
+                                 type: my_type)
+    end
+
+    let(:my_json_event) do
+      CloudEvents::Event::V1.new(data_encoded: my_json_escaped_data,
+                                 data: my_json_object,
+                                 datacontenttype: my_json_content_type_string,
+                                 dataschema: my_schema_string,
+                                 id: my_id,
+                                 source: my_source_string,
+                                 specversion: spec_version,
+                                 subject: my_subject,
+                                 time: my_time_string,
+                                 type: my_type)
+    end
+
+    let(:my_minimal_event) do
+      CloudEvents::Event::V1.new(id: my_id,
+                                 source: my_source_string,
+                                 specversion: spec_version,
+                                 type: my_type)
+    end
+
+    let(:my_extensions_event) do
+      CloudEvents::Event::V1.new(data_encoded: my_simple_data,
+                                 data: my_simple_data,
+                                 datacontenttype: my_content_type_string,
+                                 id: my_id,
+                                 source: my_source_string,
+                                 specversion: spec_version,
+                                 type: my_type,
+                                 tracecontext: my_trace_context)
+    end
+
+    it "encodes an event with text content type to binary mode" do
+      message = kafka_binding.encode_event(my_simple_event, key_mapper: nil)
+      assert_equal my_simple_data, message[:value]
+      assert_equal my_content_type_string, message[:headers]["content-type"]
+      assert_equal spec_version, message[:headers]["ce_specversion"]
+      assert_equal my_id, message[:headers]["ce_id"]
+      assert_equal my_source_string, message[:headers]["ce_source"]
+      assert_equal my_type, message[:headers]["ce_type"]
+      assert_equal my_schema_string, message[:headers]["ce_dataschema"]
+      assert_equal my_subject, message[:headers]["ce_subject"]
+      assert_equal my_time_string, message[:headers]["ce_time"]
+      assert_nil message[:key]
+    end
+
+    it "encodes an event with JSON content type to binary mode" do
+      message = kafka_binding.encode_event(my_json_event, key_mapper: nil)
+      assert_equal my_json_escaped_data, message[:value]
+      assert_equal my_json_content_type_string, message[:headers]["content-type"]
+    end
+
+    it "encodes a minimal event" do
+      message = kafka_binding.encode_event(my_minimal_event, key_mapper: nil)
+      assert_nil message[:value]
+      assert_nil message[:headers]["content-type"]
+      assert_equal spec_version, message[:headers]["ce_specversion"]
+      assert_equal my_id, message[:headers]["ce_id"]
+    end
+
+    it "encodes an event with extension attributes" do
+      message = kafka_binding.encode_event(my_extensions_event, key_mapper: nil)
+      assert_equal my_trace_context, message[:headers]["ce_tracecontext"]
+    end
+
+    it "encodes an event with no data as tombstone (nil value)" do
+      message = kafka_binding.encode_event(my_minimal_event, key_mapper: nil)
+      assert_nil message[:value]
+    end
+
+    it "uses default key_mapper to set key from partitionkey" do
+      event = my_simple_event.with(partitionkey: "my-partition-key")
+      message = kafka_binding.encode_event(event)
+      assert_equal "my-partition-key", message[:key]
+    end
+
+    it "produces nil key when event has no partitionkey" do
+      message = kafka_binding.encode_event(my_simple_event)
+      assert_nil message[:key]
+    end
+
+    it "uses custom key_mapper per-call" do
+      message = kafka_binding.encode_event(my_simple_event, key_mapper: :id.to_proc)
+      assert_equal my_id, message[:key]
+    end
+
+    it "produces nil key when key_mapper is nil" do
+      event = my_simple_event.with(partitionkey: "my-partition-key")
+      message = kafka_binding.encode_event(event, key_mapper: nil)
+      assert_nil message[:key]
+    end
+
+    it "raises SpecVersionError for a V0 event" do
+      v0_event = CloudEvents::Event::V0.new(id: my_id,
+                                            source: my_source_string,
+                                            specversion: "0.3",
+                                            type: my_type)
+      assert_raises CloudEvents::SpecVersionError do
+        kafka_binding.encode_event(v0_event, key_mapper: nil)
+      end
+    end
+  end
+
+  describe "encode_event structured mode" do
+    let(:my_simple_event) do
+      CloudEvents::Event::V1.new(data_encoded: my_simple_data,
+                                 data: my_simple_data,
+                                 datacontenttype: my_content_type_string,
+                                 dataschema: my_schema_string,
+                                 id: my_id,
+                                 source: my_source_string,
+                                 specversion: spec_version,
+                                 subject: my_subject,
+                                 time: my_time_string,
+                                 type: my_type)
+    end
+
+    it "encodes an event to JSON structured format" do
+      message = kafka_binding.encode_event(my_simple_event, structured_format: true,
+                                           key_mapper: nil, sort: true)
+      assert_equal "application/cloudevents+json; charset=utf-8", message[:headers]["content-type"]
+      parsed = JSON.parse(message[:value])
+      assert_equal my_id, parsed["id"]
+      assert_equal my_type, parsed["type"]
+      assert_equal my_source_string, parsed["source"]
+      assert_nil message[:key]
+    end
+
+    it "encodes an opaque event" do
+      opaque = CloudEvents::Event::Opaque.new("some content",
+                                              CloudEvents::ContentType.new("application/cloudevents+json"))
+      message = kafka_binding.encode_event(opaque)
+      assert_equal "some content", message[:value]
+      assert_equal "application/cloudevents+json", message[:headers]["content-type"]
+      assert_nil message[:key]
+    end
+
+    it "applies key_mapper in structured mode" do
+      event = my_simple_event.with(partitionkey: "my-partition-key")
+      message = kafka_binding.encode_event(event, structured_format: true)
+      assert_equal "my-partition-key", message[:key]
+    end
+
+    it "raises ArgumentError when format name not specified and no default" do
+      binding_obj = CloudEvents::KafkaBinding.new
+      assert_raises ::ArgumentError do
+        binding_obj.encode_event(my_simple_event, structured_format: true, key_mapper: nil)
+      end
+    end
+  end
+
+  describe "round-trip" do
+    let(:my_event) do
+      CloudEvents::Event::V1.new(data_encoded: my_json_escaped_data,
+                                 data: my_json_object,
+                                 datacontenttype: my_json_content_type_string,
+                                 dataschema: my_schema_string,
+                                 id: my_id,
+                                 source: my_source_string,
+                                 specversion: spec_version,
+                                 subject: my_subject,
+                                 time: my_time_string,
+                                 type: my_type)
+    end
+
+    it "round-trips through binary mode" do
+      message = kafka_binding.encode_event(my_event, key_mapper: nil)
+      decoded = kafka_binding.decode_event(message, reverse_key_mapper: nil)
+      assert_equal my_event, decoded
+    end
+
+    it "round-trips through structured mode" do
+      message = kafka_binding.encode_event(my_event, structured_format: true, key_mapper: nil, sort: true)
+      decoded = kafka_binding.decode_event(message, reverse_key_mapper: nil)
+      assert_equal my_event, decoded
+    end
+
+    it "round-trips with partitionkey extension" do
+      event = my_event.with(partitionkey: "my-partition-key")
+      message = kafka_binding.encode_event(event)
+      decoded = kafka_binding.decode_event(message)
+      assert_equal event, decoded
+    end
+  end
+end