Spring Boot 中使用kafka AdminClient管理Kafka

2021-06-07 12:05

阅读:560

标签:form   port   tin   oid   one   bst   clist   describe   kafka   

2021-03-27

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;

import com.ibm.dsw.quote.preentitlement.base.AbstractBaseTest;

public class AdminClientTest extends AbstractBaseTest {

    @Autowired
    private KafkaAdmin admin;

    @Test
    public void listTopic() throws InterruptedException, ExecutionException {
        AdminClient client = AdminClient.create(admin.getConfig());

        // create topic
        NewTopic newTopic = new NewTopic("test1", 3, (short) 1);
        Collection newTopicList = new ArrayList();
        newTopicList.add(newTopic);
        CreateTopicsResult createTopicResult = client.createTopics(newTopicList);
        createTopicResult.all().get();
        
        // list topic
        ListTopicsResult result = client.listTopics();
        Collection topic = result.listings().get();
        topic.forEach(each -> System.out.println(each.name()));

        
        //get topic configuration
        DescribeConfigsResult ret = client.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "TopicName")));
        Map configs = ret.all().get();
        for (Map.Entry entry : configs.entrySet()) {
             ConfigResource key = entry.getKey();
             Config value = entry.getValue();
             System.out.println(String.format("Resource type: %s, resource name: %s", key.type(), key.name()));
             Collection configEntries = value.entries();
             for (ConfigEntry each : configEntries) {
                  System.out.println(each.name() + " = " + each.value());
             }
         }

        //get cluster information
        DescribeClusterResult ret2= client.describeCluster(new DescribeClusterOptions());
        String clusterId = ret2.clusterId().get();
        System.out.println("----------------clusterId------------"+clusterId);
        Collection nodes = ret2.nodes().get();
        for (Node node: nodes) {
            System.out.println(node.host());
        }
        
           
        client.close();

    }

}

 

Spring Boot 中使用kafka AdminClient管理Kafka

标签:form   port   tin   oid   one   bst   clist   describe   kafka   

原文地址:https://www.cnblogs.com/Ivyduan/p/14586400.html


评论


亲,登录后才可以留言!