Connecting to Kafka with the Java Client
Published: 2019-06-24


This post shows how to connect to Kafka from Java using the KafkaProducer and KafkaConsumer client classes.

Note: since version 0.10, clients connect to Kafka with only the broker addresses (bootstrap.servers); ZooKeeper connection details are no longer needed.

1. Kafka configuration

{
    "producer": {
        "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "max.request.size": "10485760",
        "batch.size": "163840",
        "buffer.memory": "536870912",
        "max.block.ms": "500",
        "retries": "3",
        "acks": "1"
    },
    "consumer": {
        "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
        "group.id": "test222",
        "session.timeout.ms": "30000",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
    }
}
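If you would rather not ship a JSON file, the same producer settings can be assembled directly in code. A minimal sketch, with the class and method names ours and the values copied from the config above; note that, per the 0.10+ note earlier, only the broker list is needed to locate the cluster:

import java.util.Properties;

public class ProducerConfigExample {
    // hypothetical helper mirroring the "producer" section of the JSON above
    public static Properties producerProps() {
        Properties props = new Properties();
        // brokers only -- no ZooKeeper address is needed for 0.10+ clients
        props.put("bootstrap.servers", "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.request.size", "10485760");  // max bytes per request (10 MB)
        props.put("batch.size", "163840");          // per-partition batching buffer, in bytes
        props.put("buffer.memory", "536870912");    // total memory for unsent records (512 MB)
        props.put("max.block.ms", "500");           // how long send() may block when the buffer is full
        props.put("retries", "3");                  // retry transient send failures
        props.put("acks", "1");                     // wait for the leader's acknowledgment only
        return props;
    }
}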


2. KafkaUtils, a helper that reads the Kafka configuration

package com.wenbronk.kafka;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import java.util.Properties;

public class KafkaUtils {

    @Test
    public void test() throws FileNotFoundException {
        getConfig("producer");
    }

    /**
     * Read one named section ("producer" or "consumer") from the JSON config file
     */
    public static JsonObject getConfig(String name) throws FileNotFoundException {
        JsonParser parser = new JsonParser();
        JsonElement parse = parser.parse(new FileReader("src/main/resources/kafka"));
        JsonObject jsonObject = parse.getAsJsonObject().getAsJsonObject(name);
        System.out.println(jsonObject);
        return jsonObject;
    }

    /**
     * Convert a config section into a java.util.Properties object
     */
    public static Properties getProperties(String sourceName) throws FileNotFoundException {
        JsonObject config = KafkaUtils.getConfig(sourceName);
        Properties properties = new Properties();
        for (Map.Entry<String, JsonElement> entry : config.entrySet()) {
            properties.put(entry.getKey(), entry.getValue().getAsString());
        }
        return properties;
    }
}
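One caveat: in Gson 2.8.6 and later, the instance method JsonParser.parse() used above is deprecated in favor of the static JsonParser.parseReader(). A minimal sketch of the equivalent call, which could drop into the KafkaUtils class above with the same imports (the method name getConfigNewGson is ours):

    // assumes Gson >= 2.8.6, where the static parseReader() replaces the deprecated instance parse()
    public static JsonObject getConfigNewGson(String name) throws FileNotFoundException {
        JsonObject root = JsonParser.parseReader(new FileReader("src/main/resources/kafka")).getAsJsonObject();
        return root.getAsJsonObject(name);
    }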

3. Kafka producer

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Message producer
 */
public class KafkaProducerMain {

    @Test
    public void send() throws Exception {
        HashMap<String, String> map = new HashMap<>();
        map.put("http_zhixin", "send message to kafka from producer");
        for (int i = 0; i < 3; i++) {
            sendMessage(map);
        }
    }

    /**
     * Send each topic -> message entry, reporting the result in a callback
     */
    public void sendMessage(Map<String, String> topicMsg) throws FileNotFoundException {
        Properties properties = KafkaUtils.getProperties("producer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (Map.Entry<String, String> entry : topicMsg.entrySet()) {
            String topic = entry.getKey();
            String message = entry.getValue();
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            // send asynchronously; an external callback class works too:
            // producer.send(record, new CallBackFuntion(topic, message));
            producer.send(record, (recordMetadata, e) -> {
                if (e != null) {
                    System.err.println(topic + ": " + message + " -- send failed");
                } else {
                    System.err.println(topic + ": " + message + " -- send succeeded");
                }
            });
        }
        producer.flush();
        producer.close();
    }
}
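The lambda above is the asynchronous style. send() also returns a Future<RecordMetadata>, so blocking on get() gives a synchronous send; a minimal sketch (the class name SyncSendExample is ours, reusing KafkaUtils from section 2):

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendExample {
    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer =
                new KafkaProducer<>(KafkaUtils.getProperties("producer"));
        ProducerRecord<String, String> record =
                new ProducerRecord<>("http_zhixin", "synchronous message");
        // send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("written to partition %d at offset %d%n",
                metadata.partition(), metadata.offset());
        producer.close();
    }
}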

The callback can be written as an anonymous inner class (or a lambda, as above), or as a separate class instantiated with new:

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer send callback
 */
public class CallBackFuntion implements Callback {

    private String topic;
    private String message;

    public CallBackFuntion(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            System.out.println(topic + ": " + message + " -- send failed");
        } else {
            System.out.println(topic + ": " + message + " -- send succeeded");
        }
    }
}


4. Kafka consumer

package com.wenbronk.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerMain {

    /**
     * Auto-commit offsets on a fixed interval
     */
    public void commitAuto(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }

    /**
     * Commit offsets manually, once a batch has been processed
     */
    public void commitControl(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        final int minBatchSize = 2;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                // blocking synchronous commit
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    /**
     * Commit offsets per partition
     */
    public void setOffSet(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // after processing each partition's messages, commit that partition's offset
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

    /**
     * Seek to a specific offset before consuming
     */
    public void setSeek(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        // seek() only works on assigned partitions, so poll once to join the group
        // and receive an assignment before seeking
        consumer.poll(0);
        consumer.seek(new TopicPartition("http_zhixin", 0), 797670770);
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.err.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            consumer.commitSync();
        }
    }

    @Test
    public void test() throws FileNotFoundException {
        ArrayList<String> topics = new ArrayList<>();
        topics.add("http_zhixin");
        // commitAuto(topics);
        // commitControl(topics);
        // setOffSet(topics);
        setSeek(topics);
    }

    /**
     * Stand-in for real processing
     */
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        buffer.stream().map(ConsumerRecord::value).forEach(System.err::println);
    }
}
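The polling loops above never exit. One common shutdown pattern, sketched here under our own class name ConsumerShutdownExample, uses consumer.wakeup(), the one KafkaConsumer method that is safe to call from another thread; it makes a blocked poll() throw WakeupException:

package com.wenbronk.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Collections;

public class ConsumerShutdownExample {
    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(KafkaUtils.getProperties("consumer"));
        consumer.subscribe(Collections.singletonList("http_zhixin"));
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup(); // interrupts the blocked poll() below
            try {
                mainThread.join(); // wait for the poll loop to close the consumer cleanly
            } catch (InterruptedException ignored) {
            }
        }));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
            }
        } catch (WakeupException e) {
            // expected: triggered by the shutdown hook
        } finally {
            consumer.close(); // commits offsets (if auto-commit) and leaves the group
        }
    }
}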


Kafka consumers in the same consumer group do not read the same message more than once, since each partition is assigned to exactly one consumer in the group. Transaction support was added in version 0.11.
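For illustration, a minimal sketch of the 0.11 transactional producer API (the class name and transactional.id value are ours; note that transactions force the idempotent producer, which requires acks=all, so we override the acks=1 from the config above):

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TransactionalSendExample {
    public static void main(String[] args) throws Exception {
        Properties props = KafkaUtils.getProperties("producer");
        props.put("transactional.id", "wenbronk-tx-1"); // hypothetical id; must be stable across restarts
        props.put("acks", "all");                       // required once transactional.id is set
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("http_zhixin", "message 1 in a transaction"));
            producer.send(new ProducerRecord<>("http_zhixin", "message 2 in a transaction"));
            // consumers with isolation.level=read_committed see both messages or neither
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction(); // roll back everything sent since beginTransaction()
            throw e;
        } finally {
            producer.close();
        }
    }
}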
