Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol. #1008

Open
mjunaidca opened this issue May 18, 2024 · 1 comment

Comments

@mjunaidca
Copy link

Describe the bug
When trying to create a topic using AIOKafkaAdminClient, the following error is encountered:

IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol.

Expected behaviour
The topic should be created successfully without throwing an IncompatibleBrokerVersion error.

Environment (please complete the following information):

  • aiokafka version is 0.10.0 and I am using apache/kafka:3.7.0 docker hub image to run kafka broker
  • Kafka Broker version (kafka-topics.sh --version):
    3.7.0
  • Other information (Confluent Cloud version, etc.):
    apache/kafka:3.7.0

Here's the compose.yaml file:
https://github.com/mjunaidca/kafka-playground/blob/main/python-kafka/compose.yml

Reproducible example

import logging
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
from aiokafka.errors import KafkaError, TopicAlreadyExistsError

# Kafka settings (replace with actual values)
KAFKA_BOOTSTRAP_SERVER = 'broker:19092'

# Logging configuration
logging.basicConfig(level=logging.INFO)

async def startup_topic_event():
    admin_client = AIOKafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVER)
    try:
        topic1 = NewTopic(
            name="PET",
            num_partitions=3,
            replication_factor=1,
        )
        # Create topic if it doesn't exist
        await admin_client.create_topics(
            new_topics=[topic1], validate_only=False
        )
        logging.info(f"Topic 'PET' created successfully.")
    except TopicAlreadyExistsError:
        logging.info(f"Topic 'PET' already exists.")
    except KafkaError as e:
        logging.error(f"Failed to create topic 'PET': {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
    finally:
        await admin_client.close()

# Note: This function is not used in the current implementation due to the compatibility issue.
# The good thing is that the producer/consumer auto-creates the topic.

Additional context
The auto-creation of topics by producers/consumers works as expected. However, explicit creation using the admin client fails due to the broker version compatibility issue. This was tested with the Kafka broker version specified in the environment section.

@ods
Copy link
Collaborator

ods commented May 22, 2024

AIOKafkaAdminClient must be bootstrapped before use, the preferred way is

async with AIOKafkaAdminClient(...) as client:
    ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants