From 7d7d9a52f7e7c371b0e86f811f00d2cd4eb1b5cc Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Fri, 14 Nov 2025 18:56:07 +0530 Subject: [PATCH 1/5] update kafka annotations --- .../functions/annotation/KafkaOutput.java | 118 ++++++++++++++++ .../functions/annotation/KafkaTrigger.java | 126 +++++++++++++++++- 2 files changed, 241 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 829b097..3366a0b 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -208,6 +208,62 @@ */ String avroSchema() default ""; + /** + * Gets or sets the Avro schema of message key. + * Should be used only if a generic record should be generated. + * default "" + * + * @return the avro schema for message key + */ + String keyAvroSchema() default ""; + + /** + * Specifies the data type of the message key. + * This data type will be used to serialize the key before sending it to the Kafka topic. + * If KeyAvroSchema is set, this value is ignored and the key will be serialized using Avro. + * The default type is System.String. + * default "" + * + * @return the data type of the message key + */ + String keyDataType() default ""; + + /** + * Client certificate in PEM format. + * ssl.certificate.pem in librdkafka + * default "" + * + * @return the ssl certificate PEM + */ + String sslCertificatePEM() default ""; + + /** + * Client Private Key in PEM format. + * ssl.key.pem in librdkafka + * default "" + * + * @return the ssl key PEM + */ + String sslKeyPEM() default ""; + + /** + * CA certificate for verifying the broker's certificate in PEM format + * ssl.ca.pem in librdkafka + * default "" + * + * @return the ssl CA PEM + */ + String sslCaPEM() default ""; + + /** + * Client certificate and key in PEM format. + * Additional Configuration for extension as KeyVault supports uploading certificate only with private key. + * default "" + * + * @return the ssl certificate and key PEM + */ + String sslCertificateandKeyPEM() default ""; + /** * URL for the Avro Schema Registry * default "" @@ -232,4 +288,66 @@ */ String schemaRegistryPassword() default ""; + /** + * OAuth Bearer method. + * Either 'default' or 'oidc' + * sasl.oauthbearer in librdkafka + * default "" + * + * @return the OAuth Bearer method + */ + String oAuthBearerMethod() default ""; + + /** + * OAuth Bearer Client Id + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.id in librdkafka + * default "" + * + * @return the OAuth Bearer client id + */ + String oAuthBearerClientId() default ""; + + /** + * OAuth Bearer Client Secret + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.secret in librdkafka + * default "" + * + * @return the OAuth Bearer client secret + */ + String oAuthBearerClientSecret() default ""; + + /** + * OAuth Bearer scope. + * Client use this to specify the scope of the access request to the broker. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer scope + */ + String oAuthBearerScope() default ""; + + /** + * OAuth Bearer token endpoint url. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.token.endpoint.url in librdkafka + * default "" + * + * @return the OAuth Bearer token endpoint url + */ + String oAuthBearerTokenEndpointUrl() default ""; + + /** + * OAuth Bearer extensions. + * Allow additional information to be provided to the broker. + * Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea" + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer extensions + */ + String oAuthBearerExtensions() default ""; + } diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index 5656d3c..a41db12 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -182,10 +182,130 @@ */ String avroSchema() default ""; - /*** + /** + * Gets or sets the Avro schema of message key. + * Should be used only if a generic record should be generated. + * default "" + * + * @return the avro schema for message key + */ + String keyAvroSchema() default ""; + + /** + * Specifies the data type of the message key that will be deserialized from the Kafka topic. + * If KeyAvroSchema is set, this value is ignored and the key will be generated as a generic record. + * The default type is System.String. + * default "" + * + * @return the data type of the message key + */ + String keyDataType() default ""; + + /** + * Client certificate in PEM format. + * ssl.certificate.pem in librdkafka + * default "" + * + * @return the ssl certificate PEM + */ + String sslCertificatePEM() default ""; + + /** + * Client Private Key in PEM format. + * ssl.key.pem in librdkafka + * default "" + * + * @return the ssl key PEM + */ + String sslKeyPEM() default ""; + + /** + * CA certificate for verifying the broker's certificate in PEM format + * ssl.ca.pem in librdkafka + * default "" + * + * @return the ssl CA PEM + */ + String sslCaPEM() default ""; + + /** + * Client certificate and key in PEM format. + * Additional Configuration for extension as KeyVault supports uploading certificate only with private key. + * default "" + * + * @return the ssl certificate and key PEM + */ + String sslCertificateandKeyPEM() default ""; + + /** + * OAuth Bearer method. + * Either 'default' or 'oidc' + * sasl.oauthbearer in librdkafka + * default "" + * + * @return the OAuth Bearer method + */ + String oAuthBearerMethod() default ""; + + /** + * OAuth Bearer Client Id + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.id in librdkafka + * default "" + * + * @return the OAuth Bearer client id + */ + String oAuthBearerClientId() default ""; + + /** + * OAuth Bearer Client Secret + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.secret in librdkafka + * default "" + * + * @return the OAuth Bearer client secret + */ + String oAuthBearerClientSecret() default ""; + + /** + * OAuth Bearer scope. + * Client use this to specify the scope of the access request to the broker. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer scope + */ + String oAuthBearerScope() default ""; + + /** + * OAuth Bearer token endpoint url. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.token.endpoint.url in librdkafka + * default "" + * + * @return the OAuth Bearer token endpoint url + */ + String oAuthBearerTokenEndpointUrl() default ""; + + /** + * OAuth Bearer extensions. + * Allow additional information to be provided to the broker. + * Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea" + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer extensions + */ + String oAuthBearerExtensions() default ""; + + /** + * Maximum number of unprocessed messages a worker is expected to have at an instance. + * When target-based scaling is not disabled, this is used to divide total unprocessed event count to determine the number of worker instances, which will then be rounded up to a worker instance count that creates a balanced partition distribution. + * Default: 1000 * - * @return - */ + * @return the lag threshold + */ int lagThreshold() default 1000; /** From 7dfd08bd66bb403db0e09f3560d59fb7584a3fc6 Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Fri, 14 Nov 2025 19:25:18 +0530 Subject: [PATCH 2/5] add avro --- .../annotation/KafkaMessageKeyType.java | 37 +++++++++++++++++++ .../functions/annotation/KafkaOutput.java | 6 +-- .../functions/annotation/KafkaTrigger.java | 6 +-- 3 files changed, 43 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java new file mode 100644 index 0000000..7ccb6fc --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ + +package com.microsoft.azure.functions.annotation; + +/** + *

+ * Specifies the data type of the message key that will be serialized/deserialized for Kafka topics. + * This defines the supported key types for Kafka messages. + *

+ * + * @since 1.4.0 + */ +public enum KafkaMessageKeyType { + /** + * Integer key type (32-bit signed integer) + */ + INT, + + /** + * Long key type (64-bit signed integer) + */ + LONG, + + /** + * String key type (default) + */ + STRING, + + /** + * Binary key type (byte array) + */ + BINARY +} diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 3366a0b..1127604 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -221,12 +221,12 @@ * Specifies the data type of the message key. * This data type will be used to serialize the key before sending it to the Kafka topic. * If KeyAvroSchema is set, this value is ignored and the key will be serialized using Avro. - * The default type is System.String. - * default "" + * The default type is STRING. + * Default: STRING * * @return the data type of the message key */ - String keyDataType() default ""; + KafkaMessageKeyType keyDataType() default KafkaMessageKeyType.STRING; /** * Client certificate in PEM format. diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index a41db12..329ac60 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -194,12 +194,12 @@ /** * Specifies the data type of the message key that will be deserialized from the Kafka topic. * If KeyAvroSchema is set, this value is ignored and the key will be generated as a generic record. - * The default type is System.String. - * default "" + * The default type is STRING. + * Default: STRING * * @return the data type of the message key */ - String keyDataType() default ""; + KafkaMessageKeyType keyDataType() default KafkaMessageKeyType.STRING; /** * Client certificate in PEM format. From 48f6aab6d37a646990251920f9e7a0799c700194 Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Mon, 17 Nov 2025 09:14:26 +0530 Subject: [PATCH 3/5] use java additions --- .../annotation/KafkaMessageKeyType.java | 37 ------------------- .../functions/annotation/KafkaOutput.java | 4 +- .../functions/annotation/KafkaTrigger.java | 4 +- 3 files changed, 6 insertions(+), 39 deletions(-) delete mode 100644 src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java deleted file mode 100644 index 7ccb6fc..0000000 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaMessageKeyType.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for - * license information. - */ - -package com.microsoft.azure.functions.annotation; - -/** - *

- * Specifies the data type of the message key that will be serialized/deserialized for Kafka topics. - * This defines the supported key types for Kafka messages. - *

- * - * @since 1.4.0 - */ -public enum KafkaMessageKeyType { - /** - * Integer key type (32-bit signed integer) - */ - INT, - - /** - * Long key type (64-bit signed integer) - */ - LONG, - - /** - * String key type (default) - */ - STRING, - - /** - * Binary key type (byte array) - */ - BINARY -} diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 1127604..fde2bc9 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -7,6 +7,8 @@ import com.microsoft.azure.functions.BrokerAuthenticationMode; import com.microsoft.azure.functions.BrokerProtocol; +import com.microsoft.azure.functions.OAuthBearerMethod; +import com.microsoft.azure.functions.KafkaMessageKeyType; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -296,7 +298,7 @@ * * @return the OAuth Bearer method */ - String oAuthBearerMethod() default ""; + OAuthBearerMethod oAuthBearerMethod() default OAuthBearerMethod.Default; /** * OAuth Bearer Client Id diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index 329ac60..8b332b2 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -7,6 +7,8 @@ import com.microsoft.azure.functions.BrokerAuthenticationMode; import com.microsoft.azure.functions.BrokerProtocol; +import com.microsoft.azure.functions.OAuthBearerMethod; +import com.microsoft.azure.functions.KafkaMessageKeyType; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -245,7 +247,7 @@ * * @return the OAuth Bearer method */ - String oAuthBearerMethod() default ""; + OAuthBearerMethod oAuthBearerMethod() default OAuthBearerMethod.Default; /** * OAuth Bearer Client Id From b389ec9d0bc2275e2bed3349dfd924ea3ad32b08 Mon Sep 17 00:00:00 2001 From: Pranava <68387945+aloiva@users.noreply.github.com> Date: Tue, 6 Jan 2026 12:04:40 +0530 Subject: [PATCH 4/5] Update version from 3.2.2 to 3.2.3 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e6cf749..e879bf5 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.microsoft.azure.functions azure-functions-java-library - 3.2.2 + 3.2.3 jar com.microsoft.maven From 63983a088c4f4639ac40cdc5706defc38c872a1e Mon Sep 17 00:00:00 2001 From: Pranava <68387945+aloiva@users.noreply.github.com> Date: Sun, 18 Jan 2026 19:10:01 +0530 Subject: [PATCH 5/5] update azure-functions-java-core-library version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e879bf5..d636043 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ com.microsoft.azure.functions azure-functions-java-core-library - 1.2.0 + 1.3.0 compile