Kafka Setup


Installing and configuring Kafka on Windows

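  • Download ZooKeeper
  • Extract the archive and enter the ZooKeeper directory
  • Rename "zoo_sample.cfg" to "zoo.cfg"
  • Open "zoo.cfg" and set dataDir=D:\\Kafka\\zookeeper-3.4.9\\tmp (the file is read as Java properties, where a single \ is an escape character, so write each separator as \\ or /)
  • Add a system variable: ZOOKEEPER_HOME=D:\Kafka\zookeeper-3.4.9
  • Edit the Path system variable and append %ZOOKEEPER_HOME%\bin
  • If needed, change the ZooKeeper port in zoo.cfg (the default is 2181)
  • Open a new cmd window and run "zkServer" to start ZooKeeper
  • Download Kafka
  • Pick the binary release under "Binary downloads": Scala 2.12 - kafka_2.12-3.0.0.tgz (asc, sha512)
  • Extract it, go into the config directory, and open server.properties (the paths below come from a kafka_2.12-0.11.0.0 install; substitute your own version)
  • Set log.dirs=D:\\Kafka\\kafka_2.12-0.11.0.0\\kafka-logs (list each directory only once; the same escaping rule applies)
  • Set zookeeper.connect=localhost:2181
  • In newer Kafka versions the command-line tools no longer connect to ZooKeeper; they connect to the broker instead (--bootstrap-server localhost:9092)
  • Open cmd in the Kafka installation directory D:\Kafka\kafka_2.12-0.11.0.0 and run
    .\bin\windows\kafka-server-start.bat .\config\server.properties
    or, in a Unix-style shell: bin/kafka-server-start.sh config/server.properties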

Testing

Create a topic. Open cmd in the Kafka installation directory D:\Kafka\kafka_2.12-0.11.0.0 and run:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
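(Newer versions do not use ZooKeeper here; the --zookeeper option was removed from kafka-topics in Kafka 3.0.)
(Use the broker address, --bootstrap-server localhost:9092, in place of --zookeeper localhost:2181.)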

List the topics:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

Start a producer from the Kafka installation directory D:\Kafka\kafka_2.12-0.11.0.0:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

Start a consumer from the Kafka installation directory D:\Kafka\kafka_2.12-0.11.0.0:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

Type messages into the producer window; the consumer window receives them as they are sent.

Handling the operation logs

  • Find log4j.properties under config
  • Change the log output path in it

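Setting up the Kafka service

Extract the archive and enter the directory:
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0

Configuration

In config/server.properties:
broker.id must be unique within the Kafka cluster
broker.id=0
IP address and service port
listeners=PLAINTEXT://192.168.31.7:9092
Directory where messages are stored
log.dir=/usr/local/data/kafka-logs
ZooKeeper connection address
zookeeper.connect=192.168.31.7:2181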

**Start ZooKeeper:**
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

**Start Kafka:**
bin/kafka-server-start.sh -daemon config/server.properties

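Run jps to check whether ZooKeeper and Kafka started successfully.
If their ports (2181 and 9092 in this setup) are listening, the services are up.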
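
The jps check can be scripted. The following is a minimal sketch, assuming ZooKeeper was started through Kafka's bundled script (so it appears in jps as QuorumPeerMain) and the broker appears as Kafka. The function name check_kafka_processes is made up for this example; it reads the jps listing from stdin.

```shell
# Check a jps listing for the two JVM main classes expected after startup.
# Reads the listing from stdin so it can be tried without a running JVM.
check_kafka_processes() {
    local listing missing=0 name
    listing=$(cat)
    for name in QuorumPeerMain Kafka; do
        # jps prints "<pid> <MainClass>"; compare the class name in column 2.
        if printf '%s\n' "$listing" |
            awk -v n="$name" '$2 == n { found = 1 } END { exit !found }'; then
            echo "$name: running"
        else
            echo "$name: NOT running"
            missing=1
        fi
    done
    return "$missing"
}

# Usage (assumes jps from the JDK is on PATH):
#   jps | check_kafka_processes
```

The function returns a non-zero status when either process is missing, so it can gate later steps in a script.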
Create a topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
List topics:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --bootstrap-server 192.168.31.7:9092 --list
Describe a topic:
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic test --bootstrap-server 192.168.31.7:9092
Run the producer and send messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Run the consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

 

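To have a topic created automatically when a message is published to a topic that does not exist yet, set the following in server.properties:
# Create topics automatically
auto.create.topics.enable=true
# Default number of partitions per topic
num.partitions=8
# Default replication factor
default.replication.factor=3
Note:
default.replication.factor must not exceed the number of Kafka brokers (two replicas of the same partition on one broker would add no redundancy).
Set this on every broker, then restart.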
https://www.orchome.com/10428
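
The replication-factor rule can be checked before restarting. This is a rough sketch, not part of Kafka: get_prop and check_replication_factor are hypothetical helper names, the broker count is passed in by hand, and the parser assumes plain key=value lines with no spaces around "=".

```shell
# Print the value of a key from a .properties file (the last match wins).
get_prop() {
    local file=$1 key=$2
    awk -F= -v k="$key" \
        '$1 == k { v = substr($0, length(k) + 2) } END { print v }' "$file"
}

# Fail if default.replication.factor is larger than the number of brokers.
check_replication_factor() {
    local file=$1 brokers=$2 rf
    rf=$(get_prop "$file" default.replication.factor)
    rf=${rf:-1}   # Kafka falls back to 1 when the key is not set
    if [ "$rf" -gt "$brokers" ]; then
        echo "default.replication.factor=$rf exceeds broker count $brokers" >&2
        return 1
    fi
    echo "default.replication.factor=$rf is valid for $brokers broker(s)"
}

# Usage:
#   check_replication_factor config/server.properties 3
```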

Related commands

Stop Kafka:
bin/kafka-server-stop.sh

Stop ZooKeeper:
bin/zookeeper-server-stop.sh

Producer:
bin/kafka-console-producer.sh --broker-list 192.168.31.7:9092 --topic my-replicated-topic

Consumer:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.7:9092 --topic test --from-beginning

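Delete a topic:
bin/kafka-topics.sh --delete --bootstrap-server 192.168.31.7:9092 --topic topicname

To delete the data stored in topics, find the log.dirs setting in config/server.properties, delete the directory it points to, and restart Kafka. This removes the data of every topic stored in that directory.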

Quick setup with Docker

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.7:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.7:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e TZ="Asia/Shanghai" wurstmeister/kafka
docker exec -it kafka /bin/sh
cd /opt/kafka_2.13-2.8.1
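
The containers may need a few seconds before they accept connections, so a script that runs commands right after docker run can fail. Below is a small sketch of a wait loop; port_open and wait_for_port are made-up names, and the check relies on bash's /dev/tcp redirection, which is an assumption about the shell in use.

```shell
# Return 0 if a TCP connection to host:port succeeds (bash /dev/tcp feature).
port_open() {
    local host=$1 port=$2
    (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null
}

# Poll once per second until the port accepts connections or attempts run out.
wait_for_port() {
    local host=$1 port=$2 attempts=${3:-30} i=0
    while [ "$i" -lt "$attempts" ]; do
        port_open "$host" "$port" && return 0
        sleep 1
        i=$((i + 1))
    done
    return 1
}

# Usage, with the address and ports from the docker run commands above:
#   wait_for_port 192.168.31.7 2181 && wait_for_port 192.168.31.7 9092
```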

Configuring SASL/PLAIN

Configure the JAAS files

cd config
vim kafka_server_jaas.conf

KafkaServer { 
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-pass"
    user_admin="admin-pass"
    user_zwq="zwq-pass";
};

vim kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="zwq"
    password="zwq-pass";
};

vim kafka_zoo_jaas.conf

Server{
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="admin-pass"
	user_admin="admin-pass";
};

Modifying the Kafka startup scripts

cd bin
vim zookeeper-server-start.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.client.username=admin"
export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/work/install_bag/kafka_2.13-3.1.0/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.client.username=admin"
if ....

vim kafka-server-start.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/work/install_bag/kafka_2.13-3.1.0/config/kafka_server_jaas.conf"
if ...

vim kafka-run-class.sh

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf'
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/work/install_bag/kafka_2.13-3.1.0/config/kafka_server_jaas.conf'

vim kafka-console-producer.sh
vim kafka-console-consumer.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/work/install_bag/kafka_2.13-3.1.0/config/kafka_client_jaas.conf"
if ...
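
Because kafka-run-class.sh reads KAFKA_OPTS from the environment, the same flag can also be supplied when launching a script instead of editing it. The following is a sketch of that alternative, not the approach used in these notes; jaas_opts is a hypothetical helper that only verifies the file is readable and prints the JVM option.

```shell
# Print the JVM option for a JAAS file, failing early if the file is unreadable.
jaas_opts() {
    local conf=$1
    if [ ! -r "$conf" ]; then
        echo "JAAS file not readable: $conf" >&2
        return 1
    fi
    printf '%s' "-Djava.security.auth.login.config=$conf"
}

# Usage (path taken from the examples above; adjust to your installation):
#   opts=$(jaas_opts /usr/local/kafka/config/kafka_server_jaas.conf) &&
#       KAFKA_OPTS=$opts bin/kafka-server-start.sh -daemon config/server.properties
```

Keeping the scripts unmodified makes upgrades simpler, at the cost of having to set the variable on every launch.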

Main configuration files

Kafka broker configuration:
cd config
vim server.properties

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.168.31.7:9092
advertised.host.name=192.168.31.7
advertised.port=9092

ZooKeeper configuration:
vim zookeeper.properties

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

vim producer.properties
vim consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
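
Kafka clients also accept the JAAS entry inline through the sasl.jaas.config property, so the client settings can live in one file that is passed to the command-line tools. This is a sketch of that alternative under the user name and password defined earlier; write_client_properties and the /tmp/client.properties path are assumptions made for the example.

```shell
# Write a client properties file for SASL/PLAIN with an inline JAAS entry.
write_client_properties() {
    local out=$1 user=$2 pass=$3
    cat > "$out" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$user" password="$pass";
EOF
}

# Usage:
#   write_client_properties /tmp/client.properties zwq zwq-pass
#   bin/kafka-topics.sh --bootstrap-server 192.168.31.7:9092 \
#       --command-config /tmp/client.properties --list
#   bin/kafka-console-producer.sh --bootstrap-server 192.168.31.7:9092 \
#       --topic test --producer.config /tmp/client.properties
#   bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.7:9092 \
#       --topic test --from-beginning --consumer.config /tmp/client.properties
```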

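Cluster

Start three brokers on one machine
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
Edit each copy (shown here for server-1.properties; server-2.properties needs its own id, port, and log directory):
broker.id must be unique within the cluster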
broker.id=1
listeners=PLAINTEXT://192.168.31.7:9093
log.dir=/usr/local/data/kafka-logs-1
The ZooKeeper setting must be the same on every broker
zookeeper.connect=192.168.31.7:2181

Start

bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
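
The copy-and-edit step for the extra brokers can be automated. Here is a minimal sketch, assuming the base file already contains uncommented broker.id, listeners, and log.dir (or log.dirs) lines, and following the pattern above of port 9092 plus the broker id and a kafka-logs-<id> directory. make_broker_config is a made-up name.

```shell
# Derive a per-broker config from a base file by rewriting three keys.
make_broker_config() {
    local base=$1 id=$2 host=$3 out=$4
    local port=$((9092 + id))
    sed -e "s|^broker\.id=.*|broker.id=$id|" \
        -e "s|^listeners=.*|listeners=PLAINTEXT://$host:$port|" \
        -e "s|^\(log\.dirs\{0,1\}\)=.*|\1=/usr/local/data/kafka-logs-$id|" \
        "$base" > "$out"
}

# Usage:
#   for id in 1 2; do
#       make_broker_config config/server.properties "$id" 192.168.31.7 \
#           "config/server-$id.properties"
#   done
```

Lines that are not matched, such as zookeeper.connect, are copied unchanged, which keeps the ZooKeeper setting identical across brokers.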

Spring Boot configuration and test

yml:

spring:
  kafka:
    bootstrap-servers: 192.168.31.7:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="zwq" password="zwq-pass";'

pom:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

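Test controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // Send a message to the "test" topic
    @GetMapping("/kafka")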
    public void sendMessage1() {
        kafkaTemplate.send("test", "hello,kafka");
    }

    @GetMapping("/test/{id}")
    public String test(@PathVariable("id") String id) {
        return id;
    }
}

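Consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    // Listen for messages on the "test" topic
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        System.out.println("Consumed: "+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}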
