<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Anand Iyer</title>
    <description>The latest articles on DEV Community by Anand Iyer (@aiyer100).</description>
    <link>https://dev.to/aiyer100</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F986090%2Fa83fbb59-2f5b-4f92-b420-fc94fae0046a.png</url>
      <title>DEV Community: Anand Iyer</title>
      <link>https://dev.to/aiyer100</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/aiyer100"/>
    <language>en</language>
    <item>
      <title>Need help with invalid token error while setting up OAuth2 SASL authentication on Kafka</title>
      <dc:creator>Anand Iyer</dc:creator>
      <pubDate>Sat, 14 Dec 2024 17:09:08 +0000</pubDate>
      <link>https://dev.to/aiyer100/need-help-with-invalid-token-error-while-setting-up-oauth2-sasl-authentication-on-kafka-g14</link>
      <guid>https://dev.to/aiyer100/need-help-with-invalid-token-error-while-setting-up-oauth2-sasl-authentication-on-kafka-g14</guid>
      <description>&lt;p&gt;Hi All,&lt;/p&gt;

&lt;p&gt;I have the following setup:&lt;br&gt;
Kafka broker (3.9.0)&lt;br&gt;
Kafka producer (for now, the console producer that ships with Kafka)&lt;/p&gt;

&lt;p&gt;This setup works fine over plain TCP and TLS, and I have also tried SASL authentication with PLAIN and SHA256.&lt;br&gt;
Now I am trying to set up OAuth2 SASL authentication on the same setup, and the Kafka broker returns an &lt;strong&gt;invalid_token&lt;/strong&gt; error during SASL authentication.&lt;/p&gt;

&lt;p&gt;This is what my configuration looks like (only the properties relevant to SASL OAuth are included):&lt;/p&gt;

&lt;p&gt;server.properties:&lt;/p&gt;

&lt;p&gt;sasl.enabled.mechanisms=OAUTHBEARER&lt;/p&gt;

&lt;p&gt;# JWKS URL from the openid-configuration URL for the OAuth2 host&lt;/p&gt;

&lt;p&gt;sasl.oauthbearer.&lt;strong&gt;jwks&lt;/strong&gt;.endpoint.url=https://:443/admin/v1/SigningCert/jwk&lt;br&gt;
listener.name.sasl_plaintext.sasl.enabled.mechanisms=OAUTHBEARER&lt;/p&gt;

&lt;p&gt;# verified that these are in sync with the values for aud and iss from the access token&lt;/p&gt;

&lt;p&gt;sasl.oauthbearer.expected.audience=""&lt;br&gt;
sasl.oauthbearer.expected.issuer=""&lt;br&gt;
listener.name.sasl_plaintext.oauthbearer.principal.builder.class=org.apache.kafka.common.security.authenticator.&lt;strong&gt;DefaultKafkaPrincipalBuilder&lt;/strong&gt;&lt;br&gt;
sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.&lt;strong&gt;OAuthBearerValidatorCallbackHandler&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Server side jaas config file:&lt;/p&gt;

&lt;p&gt;KafkaServer {&lt;br&gt;
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;&lt;br&gt;
};&lt;/p&gt;

&lt;p&gt;producer.properties:&lt;/p&gt;

&lt;p&gt;security.protocol=SASL_PLAINTEXT&lt;br&gt;
sasl.mechanism=OAUTHBEARER&lt;/p&gt;

&lt;p&gt;sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.&lt;strong&gt;OAuthBearerLoginModule&lt;/strong&gt; required \&lt;br&gt;
clientId="" \&lt;br&gt;
clientSecret="" \&lt;br&gt;
scope="";&lt;br&gt;
sasl.oauthbearer.&lt;strong&gt;token.endpoint.url&lt;/strong&gt;=https://:443/oauth2/v1/token&lt;br&gt;
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.&lt;strong&gt;OAuthBearerLoginCallbackHandler&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Note: I am not interested in any custom functionality. I just want to use OAuth2 authentication for my Kafka client: given the provided OAuth2 credentials (client ID and client secret), I should be able to log in and use Kafka as usual.&lt;/p&gt;

&lt;p&gt;This is what happens with the above configuration:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;I can see from the logs that the Kafka producer is able to log in to the OAuth2 server and get the access token. Logs like these on the producer console tell me that the client can authenticate with the OAuth2 server:&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;DEBUG getClaim - scope: all (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)&lt;br&gt;
[2024-12-13 13:08:04,852] DEBUG getClaim - exp: 1734098884 (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)&lt;br&gt;
[2024-12-13 13:08:04,853] DEBUG getClaim - sub (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)&lt;br&gt;
[2024-12-13 13:08:04,853] DEBUG getClaim - iat: 1734095284 (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)&lt;br&gt;
[2024-12-13 13:08:04,863] &lt;strong&gt;DEBUG Login succeeded&lt;/strong&gt;; invoke commit() to commit it; current committed token count=0 (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)&lt;br&gt;
[2024-12-13 13:08:04,864] TRACE Committing my token; current committed token count = 0 (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)&lt;br&gt;
[2024-12-13 13:08:04,865] DEBUG Done committing my token; committed token count is now 1 (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)&lt;br&gt;
[2024-12-13 13:08:04,866] &lt;strong&gt;INFO Successfully logged in&lt;/strong&gt;. &lt;/p&gt;

&lt;ol start="2"&gt;
&lt;li&gt;However, after that, when the producer tries to do SASL authentication with the kafka broker, it fails with the error: &lt;strong&gt;{"status":"invalid_token"}&lt;/strong&gt;.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;I see the following logs on the producer console:&lt;br&gt;
 DEBUG [Producer clientId=console-producer] Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)&lt;br&gt;
[2024-12-13 13:08:08,076] DEBUG Setting SASL/OAUTHBEARER client state to RECEIVE_SERVER_FIRST_MESSAGE (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient)&lt;br&gt;
[2024-12-13 13:08:08,084] DEBUG [Producer clientId=console-producer] Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)&lt;br&gt;
[2024-12-13 13:08:08,085] TRACE [Producer clientId=console-producer] Found least loaded connecting node :9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient)&lt;br&gt;
[2024-12-13 13:08:08,086] TRACE For telemetry state SUBSCRIPTION_NEEDED, returning the value 0 ms;  (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)&lt;br&gt;
[2024-12-13 13:08:08,086] TRACE [Producer clientId=console-producer] Found least loaded connecting node :9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient)&lt;br&gt;
[2024-12-13 13:08:08,091] DEBUG Sending %%x01 response to server after receiving an error: {"status":"invalid_token"} (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient)&lt;/p&gt;

&lt;p&gt;In the server log file, I see the following lines:&lt;/p&gt;

&lt;p&gt;DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)&lt;br&gt;
[2024-12-13 12:28:42,091] DEBUG Handling Kafka request API_VERSIONS during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)&lt;br&gt;
[2024-12-13 12:28:42,093] DEBUG Set SASL server state to HANDSHAKE_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)&lt;br&gt;
[2024-12-13 12:28:42,101] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)&lt;br&gt;
[2024-12-13 12:28:42,138] DEBUG Using SASL mechanism 'OAUTHBEARER' provided by client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)&lt;br&gt;
[2024-12-13 12:28:42,142] DEBUG Set SASL server state to AUTHENTICATE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)&lt;br&gt;
[2024-12-13 12:28:42,168] DEBUG {"status":"invalid_token"} (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer)&lt;br&gt;
[2024-12-13 12:28:42,182] DEBUG Received %x01 response from client after it received our error (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer)&lt;br&gt;
[2024-12-13 12:28:42,191] DEBUG Set SASL server state to FAILED during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)&lt;/p&gt;

&lt;p&gt;I searched online for clues and verified the following:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The error is raised in the following file: &lt;a href="https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java" rel="noopener noreferrer"&gt;https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java&lt;/a&gt;
in the function handleValidatorCallback(OAuthBearerValidatorCallback callback)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;For some reason, the validation here fails and the token is considered invalid:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    OAuthBearerToken token;

    try {
        // Validate the raw access token sent by the client
        token = accessTokenValidator.validate(callback.tokenValue());
        callback.token(token);
    } catch (ValidateException e) {
        // The exception message is logged at WARN level on the broker
        log.warn(e.getMessage(), e);
        // The client only receives this generic status
        callback.error("invalid_token", null, null);
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ol start="2"&gt;
&lt;li&gt;&lt;p&gt;I verified that the JWKS URL configured in the server properties is accessible by fetching it with both curl and Postman. (However, I do not see anything in the server.log file to indicate that the Kafka broker contacted the JWKS URL.)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;I even verified that the sub value in the access token is right. I see the log in producer console: DEBUG getClaim - sub &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The iat and exp values in the access token are also appropriate. I even synced the clocks on the Kafka broker, the producer and the OAuth2 server (all now use UTC).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;I verified that the kid in the access token matches the kid in the JWKS JSON.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
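
&lt;p&gt;The claim checks listed above can be reproduced offline with a minimal sketch along these lines. It assumes the access token is a standard three-part compact JWT. The class name JwtClaimsDump is made up for illustration, and the sketch only decodes the header and payload; it does not verify the signature against the JWKS.&lt;/p&gt;

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not part of Kafka): prints the JSON header and payload
// of a compact JWT so that kid, iss, aud, sub, iat and exp can be inspected.
public class JwtClaimsDump {

    // Decodes one base64url-encoded JWT segment to its JSON text.
    static String decodeSegment(String segment) {
        byte[] raw = Base64.getUrlDecoder().decode(segment);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // First segment: the header, which carries kid and alg.
    static String header(String jwt) {
        return decodeSegment(jwt.split("\\.")[0]);
    }

    // Second segment: the payload, which carries the claims.
    static String payload(String jwt) {
        return decodeSegment(jwt.split("\\.")[1]);
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java JwtClaimsDump ACCESS_TOKEN");
            return;
        }
        System.out.println("header:  " + header(args[0]));
        System.out.println("payload: " + payload(args[0]));
    }
}
```

&lt;p&gt;Run it with the access token as the only argument and compare the printed iss and aud values with sasl.oauthbearer.expected.issuer and sasl.oauthbearer.expected.audience on the broker.&lt;/p&gt;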

&lt;p&gt;Any suggestions on what the issue could be? I have already set the log level to DEBUG on both the producer and the server.&lt;/p&gt;

&lt;p&gt;Thanks in advance!&lt;br&gt;
Iyer&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Is TLS 1.3 supported for Kafka 3.0.0 on Java 8?</title>
      <dc:creator>Anand Iyer</dc:creator>
      <pubDate>Tue, 04 Jul 2023 03:01:48 +0000</pubDate>
      <link>https://dev.to/aiyer100/is-tls-13-supported-for-kafka-300-on-java-8-3nkm</link>
      <guid>https://dev.to/aiyer100/is-tls-13-supported-for-kafka-300-on-java-8-3nkm</guid>
      <description>&lt;p&gt;I have a setup running kafka 3.0.0. We use JDK 1.8 for our environment. We want to support TLS 1.3.Is the possible?&lt;/p&gt;

&lt;p&gt;I looked up the Apache Kafka documentation and added the following to the broker's server.properties file:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;ssl.enabled.protocols=TLSv1.3&lt;br&gt;
ssl.protocol=TLSv1.3&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;However, when I start the Kafka broker, I see the following error:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)&lt;br&gt;
org.apache.kafka.common.KafkaException: java.security.NoSuchAlgorithmException:&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;From what I found on Google, it sounds like Kafka supports TLS 1.3 only on Java 11 and above, and not on older Java versions.&lt;/p&gt;
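
&lt;p&gt;To see what the JVM itself offers, independent of Kafka, a small sketch like the one below can be run with the same java binary that starts the broker. The class name TlsProtocolCheck is made up for illustration, and I am assuming that the broker's NoSuchAlgorithmException comes from the same kind of SSLContext lookup.&lt;/p&gt;

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import javax.net.ssl.SSLContext;

// Hypothetical helper (not part of Kafka): reports which TLS protocol
// versions the running JVM can provide.
public class TlsProtocolCheck {

    // True if this JVM can create an SSLContext for the given protocol name.
    static boolean isProtocolAvailable(String protocol) {
        try {
            SSLContext.getInstance(protocol);
            return true;
        } catch (NoSuchAlgorithmException e) {
            return false;
        }
    }

    // Protocol names supported by the JVM's default SSLContext.
    static String[] supportedProtocols() throws NoSuchAlgorithmException {
        return SSLContext.getDefault().getSupportedSSLParameters().getProtocols();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("supported    = " + Arrays.toString(supportedProtocols()));
        System.out.println("TLSv1.3      = " + isProtocolAvailable("TLSv1.3"));
    }
}
```

&lt;p&gt;If TLSv1.3 is reported as unavailable here, the limitation is in the JDK build rather than in the Kafka configuration.&lt;/p&gt;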

&lt;p&gt;I wanted to check here whether this is true. Is there an alternative way of enabling TLS 1.3 on my setup? Note that I am on Kafka 3.0.0 with Java 8.&lt;/p&gt;

&lt;p&gt;TIA &lt;br&gt;
Anand&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Kafka Partitions not getting split among multiple consumers</title>
      <dc:creator>Anand Iyer</dc:creator>
      <pubDate>Wed, 07 Dec 2022 14:04:56 +0000</pubDate>
      <link>https://dev.to/aiyer100/kafka-partitions-not-getting-split-among-multiple-consumers-48oo</link>
      <guid>https://dev.to/aiyer100/kafka-partitions-not-getting-split-among-multiple-consumers-48oo</guid>
      <description>&lt;p&gt;I have a Kafka setup running on Linux. I have set the num of partitions in server.properties to 5. I also have set the num of partitions for the topic I am using (let us call it topic1) to 5.&lt;/p&gt;

&lt;p&gt;Now, I have 5 consumers (implemented in Java) subscribing to the above topic. I am expecting that all 5 partitions should split the traffic equally among all the 5 consumers.&lt;/p&gt;

&lt;p&gt;I tried the following:&lt;/p&gt;

&lt;p&gt;I ensured that the number of partitions is set to 5 both at the topic level and in server.properties.&lt;br&gt;
I ran the following command to confirm that this topic has 5 partitions:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1&lt;br&gt;
Topic: topic1       TopicId: 4kX9oP3ARA2uHQ1_nVGY-Q PartitionCount: 5       ReplicationFactor: 1    Configs:&lt;br&gt;
Topic: topic1       Partition: 0    Leader: 0       Replicas: 0     Isr: 0&lt;br&gt;
Topic: topic1       Partition: 1    Leader: none    Replicas: 1     Isr: 1&lt;br&gt;
Topic: topic1       Partition: 2    Leader: none    Replicas: 2     Isr: 2&lt;br&gt;
Topic: topic1       Partition: 3    Leader: none    Replicas: 3     Isr: 3&lt;br&gt;
Topic: topic1       Partition: 4    Leader: none    Replicas: 4     Isr: 4&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;I also ran a similar configuration, with 5 partitions and 5 consumers, on a Windows machine. There, I could see that all 5 consumers receive an equal share of the data.&lt;br&gt;
On the Linux setup described above, however, all the data always goes to just 1 consumer.&lt;br&gt;
Any inputs on how I can debug this issue? The kafka-topics describe output clearly shows that this topic has 5 partitions, yet those 5 partitions are not being split among the 5 available consumers.&lt;/p&gt;

&lt;p&gt;Is there a way to check either via command line or from the Kafka logs, how many partitions are actually used during the test run? If Kafka-topics describe says 5 partitions, does it actually mean that 5 partitions are also getting used when we run a test?&lt;/p&gt;

&lt;p&gt;Note that I am running a load test on this machine - with more than 100 users firing data.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>java</category>
    </item>
  </channel>
</rss>
