使用java客户端, kafkaproducer, kafkaconsumer进行kafka的连接
注: 0.10 版本之后, 连接kafka只需要brokerip即可, 不需要zookeeper的信息
1, kafka 配置信息
{ "producer": { "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093", "key.serializer": "org.apache.kafka.common.serialization.StringSerializer", "value.serializer": "org.apache.kafka.common.serialization.StringSerializer", "max.request.size": "10485760", "batch.size": "163840", "buffer.memory": "536870912", "max.block.ms": "500", "retries": "3", "acks": "1" }, "cosumer": { "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093", "group.id": "test222", "session.timeout.ms": "30000", "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer" }}
2, kafka utils, 用来读取kafka的配置信息
package com.wenbronk.kafka;import com.alibaba.fastjson.JSON;import com.google.gson.JsonElement;import com.google.gson.JsonObject;import com.google.gson.JsonParser;import org.junit.Test;import java.io.FileNotFoundException;import java.io.FileReader;import java.util.Map;import java.util.Properties;public class KafkaUtils { @Test public void test() throws FileNotFoundException { getConfig("producer");// fastJSON(); } public static JsonObject getConfig(String name) throws FileNotFoundException { JsonParser parser = new JsonParser(); JsonElement parse = parser.parse(new FileReader("src/main/resources/kafka")); JsonObject jsonObject = parse.getAsJsonObject().getAsJsonObject(name); System.out.println(jsonObject); return jsonObject; } public static Properties getProperties(String sourceName) throws FileNotFoundException { JsonObject config = KafkaUtils.getConfig(sourceName); Properties properties = new Properties(); for (Map.Entryentry : config.entrySet()) { properties.put(entry.getKey(), entry.getValue().getAsString()); } return properties; }// public static void fastJSON() throws FileNotFoundException {// Object o = JSON.toJSON(new FileReader("src/main/resources/kafka"));// System.out.println(o);// }}
3, kafka producer
package com.wenbronk.kafka;import com.google.gson.JsonElement;import com.google.gson.JsonObject;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.junit.Test;import javax.swing.text.StyledEditorKit;import java.io.FileNotFoundException;import java.util.*;import java.util.stream.IntStream;/** * 消息提供者 */public class KafkaProducerMain { @Test public void send() throws Exception { HashMapmap = new HashMap<>(); map.put("http_zhixin", "send message to kafka from producer"); for (int i = 0; i < 3; i++ ) { sendMessage(map); }// sendMessage(map); } /** * 消息发送 */ public void sendMessage(Map topicMsg) throws FileNotFoundException { Properties properties = KafkaUtils.getProperties("producer"); KafkaProducer producer = new KafkaProducer (properties); for (Map.Entry entry : topicMsg.entrySet()) { String topic = entry.getKey(); String message = entry.getValue(); ProducerRecord record = new ProducerRecord (topic, message); // 发送// producer.send(record, new CallBackFuntion(topic, message)); producer.send(record, (recordMetadata, e) -> { if (e != null) { System.err.println(topic + ": " + message + "--消息发送失败"); }else { System.err.println(topic + ": " + message + "--消息发送成功"); } }); } producer.flush(); producer.close(); }}
回调函数可以写成匿名内部类 (或上面示例中的 lambda 表达式), 也可以写成单独的外部类, 通过 new 的方式传入
package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer callback that reports whether a single message was sent successfully.
 */
public class CallBackFuntion implements Callback {

    /** Topic the message was sent to; used only for reporting. */
    private final String topic;

    /** Payload of the message; used only for reporting. */
    private final String message;

    /**
     * @param topic   topic the message is being sent to
     * @param message payload of the message
     */
    public CallBackFuntion(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    /**
     * Prints the outcome of the send.
     *
     * @param recordMetadata metadata of the sent record; not used by this callback
     * @param e              the failure, or {@code null} when the send succeeded
     */
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // Failures go to stderr together with the cause so they can be diagnosed.
            System.err.println(topic + ": " + message + "--消息发送失败: " + e);
        } else {
            System.out.println(topic + ": " + message + "--消息发送成功");
        }
    }
}
4, kafka consumer
package com.wenbronk.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import org.junit.Test;import java.io.FileNotFoundException;import java.util.*;public class KafkaConsumerMain { /** * 自动提交offset */ public void commitAuto(Listtopics) throws FileNotFoundException { Properties props = KafkaUtils.getProperties("cosumer"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); while (true) { ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) System.err.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } /** * 手动提交offset * * @throws FileNotFoundException */ public void commitControl(List topics) throws FileNotFoundException { Properties props = KafkaUtils.getProperties("cosumer"); props.put("enable.auto.commit", "false"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); final int minBatchSize = 2; List > buffer = new ArrayList<>(); while (true) { ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); // 阻塞同步提交 consumer.commitSync(); buffer.clear(); } } } /** * 手动设置分区 */ public void setOffSet(List topics) throws FileNotFoundException { Properties props = KafkaUtils.getProperties("cosumer"); props.put("enable.auto.commit", "false"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); while (true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); // 处理每个分区消息后, 提交偏移量 for (TopicPartition partition : records.partitions()) { List > partitionRecords = records.records(partition); 
for (ConsumerRecord record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } /** * 手动设置消息offset */ public void setSeek(List topics) throws FileNotFoundException { Properties props = KafkaUtils.getProperties("cosumer"); props.put("enable.auto.commit", "false"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); consumer.seek(new TopicPartition("http_zhixin", 0), 797670770); ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.err.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); consumer.commitSync(); } } @Test public void test() throws FileNotFoundException { ArrayList topics = new ArrayList<>(); topics.add("http_zhixin");// commitAuto(topics);// commitControl(topics);// setOffSet(topics); setSeek(topics); } /** * doSomethings */ private void insertIntoDb(List > buffer) { buffer.stream().map(x -> x.value()).forEach(System.err::println); }}
kafka 中处于同一消费组 (group.id 相同) 的消费者, 不会重复读取同一条消息; 0.11 版本中加入了事务控制