Skip to content

Conversation

kprzybyla
Copy link

@kprzybyla kprzybyla commented May 6, 2025

Changes

Fixes #1080
Fixes #1015

I recently discovered that aiokafka does not implement KIP-368, which is essential for the OAUTHBEARER authorization, for example, when using AWS MSK. This was not so obvious to find out for me as someone who does not know Kafka internals and just wants to use a certain authorization method, because according to the KIP-368, the server will just break the connection to any client for which the token has expired. So, from the client side, this looks like the Kafka server is terminating the connection for no reason. This was especially mindboggling because the Amazon MSK library "hardcodes" the session timeout to be 15 minutes, while the token is actually granted with a 1-hour expiration time, and this did not help to make the 1-hour error interval noticeable.

Also, Amazon MSK library describes in their "Get Started" how to use it with the kafka-python library, but this library does not implement KIP-368 either (see dpkp/kafka-python#2205), which leads me to believe that not that many people is aware how problematic this is when you are trying to built something reliable. This also happens to be the case for the confluent-kafka-python (see confluentinc/confluent-kafka-python#1485), which is also used in the Amazon MSK example.

Anyway, after I figured out what was happening, I created a quick and dirty patch for aiokafka with a workaround that closed connections that were about to expire, and this got rid of all connection drops from the Kafka server that I was experiencing previously. This "patched" version is now used in our production in the company I worked for, and it works flawlessly, but due to certain implementation shortcuts, it generates unharmful but still annoying errors. So, after it was proven that this solved the issue we were battling for a long time, I decided to contribute a polished solution so that nobody else has to go through the same frustration I went through to get this working reliably.

The OAUTHBEARER SASL mechanism was also painful to configure in the Kafka server since it is implemented differently than other SASL mechanisms. Setting up OAUTHBEARER with other SASL mechanisms is impossible, at least I did not find a way to make it work. Because of this, a separate Docker container runs specifically for OAUTHBEARER tests.

Also, for anyone reading this who will use AWS MSK, please remember to set a static value for AWS_ROLE_SESSION_NAME (different for each client), otherwise, the re-authentication will fail since by default, the session name will contain the current timestamp at the end, and the session name must stay the same for a given client.

Java reference implementation of the same feature: apache/kafka#5582

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder

kprzybyla added 5 commits May 5, 2025 17:06
This is to avoid certain limitations of the sh shell. The start-kafka.sh uses
`exec` and passes all constructed options by gluing them into a single string.
This is fine as long as those arguments do not contain any space characters.
Unfortunately, the JAAS config passed via arguments will always contain spaces,
so to properly pass arguments, we have to use bash arrays.
The JAAS SASL OAUTHBEARER configuration is not compatible with other
JAAS configurations. I have no idea why Kafka developers did such a thing,
but it requires passing JAAS config via `sasl.jaas.config` property
for the given listener. If we put this into the standard JAAS config,
Kafka would complain when setting up the OAUTHBEARER mechanism.
self_ref: weakref.ReferenceType["AIOKafkaConnection"],
sasl_reauthentication_time: int,
) -> None:
self = self_ref()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The weak reference pattern is defeated by acquiring self before the sleep. Consider moving await asyncio.sleep() before getting a strong ref to self. Or explain why we should keep the strong ref while sleeping

@legion49f
Copy link

@Mixser my team is running into this issue and we are waiting for this PR to be merged. Would it be possible to make the changes you recommended and going ahead with a merge and a release? it's been longer than two months since the last time anyone commented on this thread.

thanks!

@kprzybyla
Copy link
Author

I can make recommended changes if any maintainer of aiokafka is willing to review this. The comment by @Mixser is valid and I can fix this even today but I don't think that @Mixser will be able to push it further as I don't believe they are a maintainer of this project. We probably need someone like @ods here.

@ods
Copy link
Collaborator

ods commented Jul 22, 2025

Hi @kprzybyla, thank you for your work. I could help with bringing changes to the release, but unfortunately, I don't feel confident enough to do a thorough review.

@kprzybyla
Copy link
Author

@ods, Ok, would you happen to have any suggestions on whom I should approach to do the review?

@legion49f
Copy link

Just fyi. We forked and built this branch and have it deployed in a prod environment. This is working just fine aside from some nuisance misleading log messages.

@VladislavSokolovKpler
Copy link

Hello,

FYI we have this issue too, and we prefer to wait this PR being merged.
By any chance, are there any updates?

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

5 participants