Kafka Setup
Windows Kafka installation and configuration
- Download ZooKeeper
- Extract it and enter the ZooKeeper directory
- Rename "zoo_sample.cfg" to "zoo.cfg"
- Open "zoo.cfg", then find and edit dataDir=D:\Kafka\zookeeper-3.4.9\tmp (zoo.cfg is parsed as a Java properties file, so write Windows paths with doubled backslashes \\ or forward slashes)
- Add a system variable: ZOOKEEPER_HOME=D:\Kafka\zookeeper-3.4.9
- Edit the Path system variable and append: %ZOOKEEPER_HOME%\bin
- Optionally change the default ZooKeeper port in zoo.cfg (clientPort, default 2181); a minimal sample config follows this list
- Open a new cmd window and run "zkserver" to start ZooKeeper
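A minimal zoo.cfg sketch for the layout above (tickTime/initLimit/syncLimit/clientPort are the shipped sample defaults; only dataDir is changed):
# basic ZooKeeper time unit in milliseconds
tickTime=2000
# ticks a follower may take to connect and sync
initLimit=10
syncLimit=5
# forward slashes sidestep properties-file escaping on Windows
dataDir=D:/Kafka/zookeeper-3.4.9/tmp
# port clients (Kafka) connect to
clientPort=2181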
- Download Kafka
- Download a binary release, e.g. Binary downloads: Scala 2.12 - kafka_2.12-3.0.0.tgz (asc, sha512)
- Extract it, enter the config directory, and open the file server.properties
- Find and edit log.dirs=D:\Kafka\kafka_2.12-0.11.0.0\kafka-logs
- Find and edit zookeeper.connect=localhost:2181
- Newer versions of Kafka do not need ZooKeeper; clients then connect straight to the broker (bootstrap.servers=localhost:9092)
- Enter the Kafka install directory D:\Kafka\kafka_2.12-0.11.0.0 and run in cmd:
.\bin\windows\kafka-server-start.bat .\config\server.properties (or on Linux: bin/kafka-server-start.sh config/server.properties)
Test
Create a topic. From the Kafka install directory D:\Kafka\kafka_2.12-0.11.0.0, run in cmd:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(Newer versions no longer take --zookeeper here: use the Kafka broker address --bootstrap-server localhost:9092 in place of --zookeeper localhost:2181, as shown below.)
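For those newer versions the same test topic is created against the broker directly (mirroring the Linux commands used later in this guide):
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092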
List topics:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
Start a producer. From the Kafka install directory D:\Kafka\kafka_2.12-0.11.0.0:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
Start a consumer. From the Kafka install directory D:\Kafka\kafka_2.12-0.11.0.0:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Messages typed into the producer window show up in the consumer window almost immediately.
Handling the operation logs
- Find log4j.properties under config
- Change the log file paths (a sketch follows)
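A sketch of that edit, assuming the stock log4j.properties layout in which appender file paths are built from ${kafka.logs.dir} (the target directory here is just an example):
# send the broker's main server log to a custom directory
log4j.appender.kafkaAppender.File=D:/Kafka/logs/server.log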
Kafka Server Setup (Linux)
Configuration (config/server.properties):
# broker.id must be unique within a Kafka cluster
broker.id=0
# IP and service port the broker listens on
listeners=PLAINTEXT://192.168.31.7:9092
# directory for message storage files
log.dir=/usr/local/data/kafka-logs
# ZooKeeper connection address
zookeeper.connect=192.168.31.7:2181
Extract the release and enter the directory:
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0
**Start ZooKeeper:**
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
**Start Kafka:**
bin/kafka-server-start.sh -daemon config/server.properties &
The jps command shows whether ZooKeeper and Kafka started successfully: if both processes are listed (and their ports are listening), they are running. Sample output below.
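A healthy run looks roughly like this (PIDs will differ; QuorumPeerMain is the ZooKeeper process, Kafka the broker):
jps
20353 QuorumPeerMain
21012 Kafka
21345 Jps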
Create a topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
List topics:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --bootstrap-server 192.168.31.7:9092 --list
Describe a topic:
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic test --bootstrap-server 192.168.31.7:9092
Run a producer and send messages:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
Run a consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Topics can be auto-created when a message is published to a nonexistent topic; configure this in server.properties:
# auto-create topics
auto.create.topics.enable=true
# default number of partitions for a new topic
num.partitions=8
# default replication factor for a new topic
default.replication.factor=3
Note:
default.replication.factor must not exceed the number of Kafka nodes (putting two replicas of the same partition on one node would defeat the purpose of replication).
Every node needs this configuration; then restart the brokers.
https://www.orchome.com/10428
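With those settings in place, simply producing to a topic that does not yet exist creates it with the defaults above; for example (the topic name auto-created-demo is arbitrary):
bin/kafka-console-producer.sh --bootstrap-server 192.168.31.7:9092 --topic auto-created-demo
bin/kafka-topics.sh --describe --topic auto-created-demo --bootstrap-server 192.168.31.7:9092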
Related commands
Stop Kafka:
bin/kafka-server-stop.sh
Stop ZooKeeper:
bin/zookeeper-server-stop.sh
Producer:
bin/kafka-console-producer.sh --bootstrap-server 192.168.31.7:9092 --topic my-replicated-topic
Consumer:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.7:9092 --topic test --from-beginning
Delete a topic:
bin/kafka-topics.sh --delete --bootstrap-server 192.168.31.7:9092 --topic topicname
To also delete the data a topic stored on disk, find log.dirs in config/server.properties, delete the directory it points to, then restart.
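A minimal sketch of that cleanup, assuming log.dirs points at /usr/local/data/kafka-logs as configured above:
# stop the broker, remove the data directory, restart
bin/kafka-server-stop.sh
rm -rf /usr/local/data/kafka-logs
bin/kafka-server-start.sh -daemon config/server.properties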
Simple Docker setup
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.7:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.7:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e TZ="Asia/Shanghai" wurstmeister/kafka
docker exec -it kafka /bin/sh
cd /opt/kafka_2.13-2.8.1
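From inside the container the broker can be smoke-tested with the usual topic commands (the topic name docker-test is arbitrary; the address matches the advertised listener above):
bin/kafka-topics.sh --create --bootstrap-server 192.168.31.7:9092 --replication-factor 1 --partitions 1 --topic docker-test
bin/kafka-topics.sh --list --bootstrap-server 192.168.31.7:9092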
Configuring SASL/PLAIN
Create the JAAS files
cd config
vim kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-pass"
user_admin="admin-pass"
user_zwq="zwq-pass";
};
vim kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zwq"
password="zwq-pass";
};
vim kafka_zoo_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-pass"
user_admin="admin-pass";
};
Modifying the Kafka startup commands
cd bin
vim zookeeper-server-start.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.client.username=admin"
export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/work/install_bag/kafka_2.13-3.1.0/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.client.username=admin"
if ....
vim kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/work/install_bag/kafka_2.13-3.1.0/config/kafka_server_jaas.conf"
if ...
vim kafka-run-class.sh
Point KAFKA_SASL_OPTS at the same server JAAS file:
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf'
vim kafka-console-producer.sh
vim kafka-console-consumer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/work/install_bag/kafka_2.13-3.1.0/config/kafka_client_jaas.conf"
if ...
Main configuration files
Kafka main configuration:
cd config
vim server.properties
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.168.31.7:9092
# legacy settings, deprecated in favor of advertised.listeners
advertised.host.name=192.168.31.7
advertised.port=9092
ZooKeeper main configuration:
vim zookeeper.properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
Producer and consumer configuration (add to both files):
vim producer.properties
vim consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
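With the JAAS exports above in place, the console clients can exercise the SASL listener; --producer.config / --consumer.config make them pick up these settings:
bin/kafka-console-producer.sh --bootstrap-server 192.168.31.7:9092 --topic test --producer.config config/producer.properties
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.7:9092 --topic test --from-beginning --consumer.config config/consumer.properties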
Cluster
Run three brokers on one machine:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
Edit server-1.properties (a server-2 sketch follows this list):
# broker.id must be unique within the cluster
broker.id=1
listeners=PLAINTEXT://192.168.31.7:9093
log.dir=/usr/local/data/kafka-logs-1
# the ZooKeeper address must be identical on all brokers
zookeeper.connect=192.168.31.7:2181
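Following the same pattern, server-2.properties would presumably be (values inferred from the numbering, not spelled out in the original notes):
broker.id=2
listeners=PLAINTEXT://192.168.31.7:9094
log.dir=/usr/local/data/kafka-logs-2
zookeeper.connect=192.168.31.7:2181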
Start them:
bin/kafka-server-start.sh -daemon config/server-1.properties &
bin/kafka-server-start.sh -daemon config/server-2.properties &
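To exercise the cluster, create a topic replicated across the three brokers (the partition count here is arbitrary; my-replicated-topic matches the name used in the producer command earlier):
bin/kafka-topics.sh --create --bootstrap-server 192.168.31.7:9092 --replication-factor 3 --partitions 2 --topic my-replicated-topic
bin/kafka-topics.sh --describe --topic my-replicated-topic --bootstrap-server 192.168.31.7:9092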
Spring Boot configuration test
yml:
spring:
  kafka:
    bootstrap-servers: 192.168.31.7:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          # PlainLoginModule matches the PLAIN mechanism configured on the broker
          config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="zwq" password="zwq-pass";'
pom:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
testController:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Send a test message to the "test" topic
    @GetMapping("kafka")
    public void sendMessage1() {
        kafkaTemplate.send("test", "hello,kafka");
    }

    @GetMapping("/test/{id}")
    public String test(@PathVariable("id") String id) {
        return id;
    }
}
Consumer:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Listen on the "test" topic and print each consumed record
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> record) {
        System.out.println("Consumed: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }
}
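Assuming the application runs on Spring Boot's default port 8080 (an assumption, not stated in the notes), hitting the endpoint should make the listener print the consumed record:
curl http://localhost:8080/kafka
# expected listener output (partition number may vary): Consumed: test-0-hello,kafka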