Difference between revisions of "Kafka"

From UVOO Tech Wiki
Jump to navigation Jump to search
Line 36: Line 36:
 
topic_list.append(NewTopic("example_topic", 1, 1))
 
topic_list.append(NewTopic("example_topic", 1, 1))
 
admin_client.create_topics(topic_list)
 
admin_client.create_topics(topic_list)
 +
```
 +
 +
 +
```
 +
his is a known broker issue.
 +
Topic deletions are asynchronous and depending on the cluster load and state it may take a long time for a topic to be completely removed from all brokers in the cluster.
 +
It is therefore strongly recommended not to delete and re-create topics in this manner, but instead create new unique topics, or maintain (commit) the last read offset for the topic partitions to make sure no old messages are picked up by a consumer.
 
```
 
```

Revision as of 01:48, 3 May 2021

Refs

Using http to interface

Python Kafka

https://python.plainenglish.io/how-to-programmatically-create-topics-in-kafka-using-python-d8a22590ecde

Using kafka-python
from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092", 
    client_id='test'
)

topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
Using confluent_kafka
from confluent_kafka.admin import AdminClient, NewTopic


admin_client = AdminClient({
    "bootstrap.servers": "localhost:9092"
})

topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
admin_client.create_topics(topic_list)
his is a known broker issue.
Topic deletions are asynchronous and depending on the cluster load and state it may take a long time for a topic to be completely removed from all brokers in the cluster.
It is therefore strongly recommended not to delete and re-create topics in this manner, but instead create new unique topics, or maintain (commit) the last read offset for the topic partitions to make sure no old messages are picked up by a consumer.