Kafka: get the partition count for a topic
Java Problem Overview
How can I get the number of partitions for a Kafka topic from code? I have researched many links, but none of the suggestions seem to work.
Here are a few, which look like similar discussions:
http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api
http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic
http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic
There are also similar questions on Stack Overflow, but none of them has a working solution.
Java Solutions
Solution 1 - Java
Go to your kafka/bin directory, then run:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
You should see what you need under PartitionCount:
Topic:topic_name PartitionCount:5 ReplicationFactor:1 Configs:
Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
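On newer versions the tool talks to the brokers directly instead of ZooKeeper. kafka-topics supports --bootstrap-server from Kafka 2.2, and --zookeeper was removed in 3.0. On those versions, use:
kafka-topics --describe --bootstrap-server localhost:9092 --topic topic_name
In the Apache distribution, the script is named kafka-topics.sh.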
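You may need the count inside a Java program that runs the command above and captures its output. In that case, one minimal approach is to read the number from the PartitionCount field of the topic header line. The class and method names below are hypothetical. The regex assumes the field looks like `PartitionCount:5` or `PartitionCount: 5`, because the spacing differs between versions, so check it against your own output.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts PartitionCount from `kafka-topics --describe` output. */
final class DescribeOutputParser {
    // Matches both "PartitionCount:5" (older output) and "PartitionCount: 5"
    private static final Pattern PARTITION_COUNT = Pattern.compile("PartitionCount:\\s*(\\d+)");

    private DescribeOutputParser() {}

    /**
     * Returns the partition count from the first topic header found in the output,
     * or -1 if the output contains no PartitionCount field.
     */
    static int partitionCount(String describeOutput) {
        Matcher m = PARTITION_COUNT.matcher(describeOutput);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }
}
```

For the sample output shown above, partitionCount returns 5. The captured stdout could come from, for example, a ProcessBuilder running the same command.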
Solution 2 - Java
In the 0.8.2 Producer API and the 0.9 Consumer API, you can use something like:
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try (Producer<byte[], String> producer = new KafkaProducer<>(configProperties)) {
    // partitionsFor returns one PartitionInfo per partition of the topic
    System.out.println(producer.partitionsFor("test").size());
}
Solution 3 - Java
Here's how I do it with the old SimpleConsumer API (Kafka 0.8.x):
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

/**
 * Retrieves the list of all partition IDs of the given {@code topic}.
 *
 * @param topic name of the topic to look up
 * @param seedBrokers list of known brokers of a Kafka cluster
 * @return list of partition IDs, or an empty list if none are found
 * @throws RuntimeException if none of the seed brokers could be queried
 */
public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
    for (BrokerInfo seed : seedBrokers) {
        SimpleConsumer consumer = null;
        try {
            // soTimeout = 20 s, bufferSize = 128 KiB, clientId = "partitionLookup"
            consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
            TopicMetadataRequest req = new TopicMetadataRequest(Collections.singletonList(topic));
            TopicMetadataResponse resp = consumer.send(req);
            List<Integer> partitions = new ArrayList<>();
            // collect the ID of every partition in the topic's metadata
            for (TopicMetadata item : resp.topicsMetadata()) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    partitions.add(part.partitionId());
                }
            }
            return partitions; // leave on first successful broker (every broker has this info)
        } catch (Exception e) {
            // try all available brokers, so just report the error and go on to the next one
            // (LOG is a logger field of the enclosing class)
            LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
    throw new RuntimeException("Could not get partitions for [" + topic + "] from any seed broker");
}
Note that I only needed the partition IDs, but you can also retrieve other partition metadata, such as leader, isr, replicas, and so on.
BrokerInfo is just a simple POJO with host and port fields.
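The answer does not show BrokerInfo itself. Below is a minimal sketch of such a POJO. It provides the getHost(), getPort() and toString() that getPartitionsForTopic relies on. The author's actual class is not shown, so this shape is an assumption.

```java
import java.util.Objects;

/** Sketch of the host/port POJO used by getPartitionsForTopic (the original class is not shown). */
final class BrokerInfo {
    private final String host;
    private final int port;

    public BrokerInfo(String host, int port) {
        this.host = Objects.requireNonNull(host, "host");
        this.port = port;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    // Used when logging a failing broker, e.g. "[localhost:9092]"
    @Override
    public String toString() {
        return host + ":" + port;
    }
}
```

A seed list could then be built with, for example, Arrays.asList(new BrokerInfo("localhost", 9092)).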
Solution 4 - Java
The shell command below prints the number of partitions. Run it from the Kafka bin directory:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic TopicName | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'
Here is how it works:
- awk prints the second field of each line. That field is the topic name on every partition row.
- uniq -c counts consecutive duplicates.
- The second output line is the number of partition rows.

This relies on the older header format, where "Topic:topic_name" has no space after the colon. Check the output of your version before relying on it.
Replace TopicName with your own topic. You can also validate the count with an if condition:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic TopicName | awk '{print $2}' | uniq -c |awk 'NR==2{if ($1=="16") print "valid partitions"}'
The command above prints "valid partitions" if the count is 16. Change the number to suit your requirement.
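The same check can be done in Java on captured --describe output, without depending on awk column positions. This minimal sketch counts the per-partition rows and compares the result with an expected number. It assumes each partition row contains "Partition: <id>". The header's "PartitionCount:" field does not match that pattern. The class name is hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical helper mirroring the awk pipeline: counts per-partition rows of --describe output. */
final class PartitionRowCounter {
    // Partition rows contain "Partition: <id>"; "PartitionCount:" in the header does not match
    private static final Pattern PARTITION_ROW = Pattern.compile("\\bPartition:\\s*\\d+");

    private PartitionRowCounter() {}

    static int countPartitionRows(String describeOutput) {
        int count = 0;
        for (String line : describeOutput.split("\\R")) {
            if (PARTITION_ROW.matcher(line).find()) {
                count++;
            }
        }
        return count;
    }

    /** Equivalent of the second command: true when the topic has exactly {@code expected} partitions. */
    static boolean hasExpectedPartitions(String describeOutput, int expected) {
        return countPartitionRows(describeOutput) == expected;
    }
}
```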
Solution 5 - Java
In Java code, we can use AdminClient to get the number of partitions of a topic:
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
// try-with-resources closes the client when done
try (AdminClient client = AdminClient.create(props)) {
    DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
    // newer clients deprecate values() in favour of topicNameValues()
    Map<String, KafkaFuture<TopicDescription>> values = result.values();
    KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
    // get() blocks until the broker answers; it throws InterruptedException/ExecutionException
    int partitions = topicDescription.get().partitions().size();
    System.out.println(partitions);
}
Solution 6 - Java
Use partitionsFor from KafkaConsumer:
// create the consumer (props needs bootstrap.servers and key/value deserializers), then collect the partition IDs
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);
    List<Integer> partitionList = new ArrayList<>();
    for (PartitionInfo partition : partitions) {
        partitionList.add(partition.partition());
    }
    Collections.sort(partitionList);
    System.out.println(partitionList.size() + " partitions: " + partitionList);
}
This should work well. Let me know if there's a simpler way to get the partition list for a topic.
Solution 7 - Java
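The following approach works for Kafka 0.10 and does not use any producer or consumer APIs. It uses ZkUtils from Kafka's Scala API together with ZkClient and ZkConnection from the zkclient library.
// zkConnect is the ZooKeeper connection string, e.g. "localhost:2181"
Tuple2<ZkClient, ZkConnection> clientAndConnection = ZkUtils.createZkClientAndConnection(zkConnect, 30000, 30000);
ZkUtils zkUtils = new ZkUtils(clientAndConnection._1(), clientAndConnection._2(), false);
List<String> topicList = Collections.singletonList("bidlogs_kafka10");
System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
        JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());
zkUtils.close();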
Solution 8 - Java
I had the same issue: I needed to get the partitions for a topic.
With the help of the answer here, I was able to get the information from ZooKeeper.
Here is my code in Scala (it could easily be translated into Java):
import org.apache.zookeeper.ZooKeeper

def extractPartitionNumberForTopic(topicName: String, zookeeperQuorum: String): Int = {
  // 10 s session timeout; no watcher is needed for a one-off read
  val zk = new ZooKeeper(zookeeperQuorum, 10000, null)
  try {
    // each child znode of .../partitions is one partition ID
    val zkNodeName = s"/brokers/topics/$topicName/partitions"
    zk.getChildren(zkNodeName, false).size
  } finally {
    zk.close()
  }
}
This approach also gave me access to other information about Kafka topics and brokers.
In ZooKeeper, you can check the number of partitions for a topic by browsing to /brokers/topics/MY_TOPIC_NAME/partitions.
Use zookeeper-client.sh to connect to your ZooKeeper:
[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions
[0, 1, 2]
That shows us that there are 3 partitions for the topic MY_TOPIC_NAME
Solution 9 - Java
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// create the Kafka producer
def getKafkaProducer: KafkaProducer[String, String] = {
  val kafkaProps = new Properties()
  kafkaProps.put("bootstrap.servers", "localhost:9092")
  kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[String, String](kafkaProps)
}

val kafkaProducer = getKafkaProducer
// partitionsFor returns one PartitionInfo per partition, so its size is the partition count
val noOfPartitions = kafkaProducer.partitionsFor("TopicName").size()
println(noOfPartitions) // prints the number of partitions for the given topic
kafkaProducer.close()
Solution 10 - Java
@Sunil-patil's answer stopped short of the count itself. You have to take the size of the returned list:
producer.partitionsFor("test").size()
@vish4071, there is no point criticizing Sunil; you did not mention in the question that you are using ConsumerConnector.
Solution 11 - Java
The number of partitions can be retrieved with zookeeper-shell.
Syntax: ls /brokers/topics/<topic_name>/partitions
For example:
root@zookeeper-01:/opt/kafka_2.11-2.0.0# bin/zookeeper-shell.sh zookeeper-01:2181
Connecting to zookeeper-01:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4]
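If a Java program captures this ls output, for example from a script, the partition IDs can be parsed from the bracketed list. This minimal sketch assumes the exact "[0, 1, 2]" shape shown above. It sorts numerically because ZooKeeper returns children in no guaranteed order. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: parses zookeeper-shell `ls` output such as "[0, 1, 2]" into sorted partition IDs. */
final class ZkLsParser {
    private ZkLsParser() {}

    static List<Integer> parsePartitionIds(String lsOutput) {
        String trimmed = lsOutput.trim();
        if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
            throw new IllegalArgumentException("Not an ls listing: " + lsOutput);
        }
        String body = trimmed.substring(1, trimmed.length() - 1).trim();
        List<Integer> ids = new ArrayList<>();
        if (body.isEmpty()) {
            return ids; // "[]" means no partition znodes
        }
        for (String part : body.split(",")) {
            ids.add(Integer.parseInt(part.trim()));
        }
        // children come back in no particular order, so sort numerically (not lexically)
        Collections.sort(ids);
        return ids;
    }
}
```

The partition count is then simply parsePartitionIds(output).size(), which is 5 for the listing above.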
Solution 12 - Java
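You can get the Kafka partition list from ZooKeeper like this; it reflects the actual partition numbers on the Kafka server side. The /users/test_account/test_kafka_name prefix is this cluster's ZooKeeper chroot.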
[zk: zk.kafka:2181(CONNECTED) 43] ls /users/test_account/test_kafka_name/brokers/topics/test_kafka_topic_name/partitions
[35, 36, 159, 33, 34, 158, 157, 39, 156, 37, 155, 38, 154, 152, 153, 150, 151, 43, 42, 41, 40, 202, 203, 204, 205, 200, 201, 22, 23, 169, 24, 25, 26, 166, 206, 165, 27, 207, 168, 208, 28, 29, 167, 209, 161, 3, 2, 162, 1, 163, 0, 164, 7, 30, 6, 32, 5, 160, 31, 4, 9, 8, 211, 212, 210, 215, 216, 213, 19, 214, 17, 179, 219, 18, 178, 177, 15, 217, 218, 16, 176, 13, 14, 11, 12, 21, 170, 20, 171, 174, 175, 172, 173, 220, 221, 222, 223, 224, 225, 226, 227, 188, 228, 187, 229, 189, 180, 10, 181, 182, 183, 184, 185, 186, 116, 117, 79, 114, 78, 77, 115, 112, 113, 110, 111, 118, 119, 82, 83, 80, 81, 86, 87, 84, 85, 67, 125, 66, 126, 69, 127, 128, 68, 121, 122, 123, 124, 129, 70, 71, 120, 72, 73, 74, 75, 76, 134, 135, 132, 133, 59, 138, 58, 57, 139, 136, 56, 137, 55, 64, 65, 62, 63, 60, 131, 130, 61, 49, 143, 48, 144, 145, 146, 45, 147, 44, 148, 47, 149, 46, 51, 52, 53, 54, 140, 142, 141, 50, 109, 108, 107, 106, 105, 104, 103, 99, 102, 101, 100, 98, 97, 96, 95, 94, 93, 92, 91, 90, 88, 89, 195, 194, 197, 196, 191, 190, 193, 192, 198, 199, 230, 239, 232, 231, 234, 233, 236, 235, 238, 237]
You can then use the partition count in your consumer code:
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.RetryNTimes
import scala.collection.JavaConverters._

// zkHostList (ZooKeeper connect string) and zkPath (chroot prefix) are defined elsewhere
def getNumPartitions(topic: String): Int = {
  val zk = CuratorFrameworkFactory.newClient(zkHostList, new RetryNTimes(5, 1000))
  zk.start()
  try {
    val topicPartitionsPath = zkPath + "/brokers/topics/" + topic + "/partitions"
    if (zk.checkExists().forPath(topicPartitionsPath) != null) {
      // each child znode is named after a partition ID
      zk.getChildren().forPath(topicPartitionsPath).asScala.length
    } else {
      0 // topic (or its partitions node) does not exist
    }
  } catch {
    case e: Exception =>
      e.printStackTrace()
      0
  } finally {
    zk.close()
  }
}
Solution 13 - Java
The recommended way to get the list of partitions is the AdminClient API:
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(properties)) {
    // all().get() waits for the descriptions of every requested topic
    Map<String, TopicDescription> descriptions =
            adminClient.describeTopics(Collections.singletonList("jenison")).all().get();
    System.out.println(descriptions.get("jenison").partitions().size());
}
It needs no producer or consumer, so it can run as a standalone Java method.
Solution 14 - Java
You can explore kafka.utils.ZkUtils, which has many methods for extracting metadata about the cluster. The answers here are good, so I'm just adding this for the sake of diversity. It uses the static ZkUtils object from older Kafka releases:
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = {
  val client = new ZkClient(zookeeperQuorum)
  try {
    // getAllPartitions returns every (topic, partition) pair in the cluster
    ZkUtils.getAllPartitions(client)
      .count(topicPartitionPair => topicPartitionPair.topic == topic)
  } finally {
    client.close()
  }
}
Solution 15 - Java
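If you already have an org.apache.kafka.common.Cluster instance, for example the one passed to a custom Partitioner, you can use:
cluster.availablePartitionsForTopic(topicName).size()
Note that availablePartitionsForTopic only returns partitions that currently have a leader. cluster.partitionsForTopic(topicName).size() counts all partitions.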
Solution 16 - Java
None of the answers provided a quick, easy way to count all the partitions for a given topic regex. In my case, I needed to know how many partitions there were in my cluster, including replicas, for sizing purposes.
Below is the bash command you can run (no extra tools needed):
kafka-topics --describe --bootstrap-server broker --topic ".*" | grep Configs | awk '{printf "%d\n", $4*$6}' | awk '{s+=$1} END {print s}'
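You can adjust the topic regex by replacing .* with whatever you like. Also make sure to change broker to your broker's address.
The $4 and $6 field positions assume the header layout of the version used here: "Topic: name  PartitionCount: n  ReplicationFactor: r", separated by whitespace. If your version prints extra columns, adjust them.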
Details:
- Stream the kafka-topics --describe output for the topics of interest
- Extract only the first line for each topic (the one containing Configs), which holds the partition count and replication factor
- Multiply PartitionCount by ReplicationFactor to get the total partitions for the topic
- Sum all counts and print the total
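The same sum can be computed in Java on captured --describe output. This minimal sketch matches on the PartitionCount and ReplicationFactor field names instead of column positions, so extra columns or different spacing do not shift the result. It assumes both fields appear on each topic's header line. The class name is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: sums PartitionCount * ReplicationFactor over every topic header line. */
final class TotalReplicaCounter {
    private static final Pattern COUNT = Pattern.compile("PartitionCount:\\s*(\\d+)");
    private static final Pattern FACTOR = Pattern.compile("ReplicationFactor:\\s*(\\d+)");

    private TotalReplicaCounter() {}

    static long totalPartitionReplicas(String describeOutput) {
        long total = 0;
        for (String line : describeOutput.split("\\R")) {
            Matcher count = COUNT.matcher(line);
            Matcher factor = FACTOR.matcher(line);
            // only topic header lines carry both fields; partition rows are skipped
            if (count.find() && factor.find()) {
                total += Long.parseLong(count.group(1)) * Long.parseLong(factor.group(1));
            }
        }
        return total;
    }
}
```

For two topics with 5 partitions × 3 replicas and 2 partitions × 1 replica, it returns 17.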
Bonus:
If you have Docker installed, you don't need to download the Kafka binaries:
docker run -it confluentinc/cp-kafka:6.0.0 /bin/bash
Then you can run this to access all the Kafka scripts:
cd /usr/bin
Solution 17 - Java
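For anyone using the Python confluent-kafka package:
from confluent_kafka.admin import AdminClient

topic_name = 'my_topic'
# bootstrap.servers is a comma-separated string, not a list
settings = {'bootstrap.servers': "..."}
kadmin = AdminClient(settings)
# list_topics returns ClusterMetadata; .topics maps topic names to TopicMetadata
topic_metadata = kadmin.list_topics(topic=topic_name).topics
print(len(topic_metadata[topic_name].partitions))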