广告位联系
返回顶部
分享到

云服务器(Linux)安装部署Kafka的详细过程

服务器其他 来源:互联网 作者:秩名 发布时间:2022-11-25 15:21:02 人浏览
摘要

云服务器(Linux)安装部署Kafka 前期准备 kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。 下载安装包 官网地址: 较新的版本已自带Zookeeper,无需额外下载

云服务器(Linux)安装部署Kafka

前期准备

kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。

下载安装包

官网地址:

较新的版本已自带Zookeeper,无需额外下载。这里使用3.2.0做演示。

注意要下载Binary downloads标签下的tgz包,Source download标签下的包为源码。无法直接运行,需要编译。

上载安装包到云服务器

使用ssh连接工具将kafka_2.12-3.2.0.tgz这个包上传到云服务器上的一个目录。

打开命令行,进入到放有压缩包的目录,执行

1

tar -zxvf kafka_2.12-3.2.0.tgz

配置kafka

然后使用cd命令进入到/kafka_2.12-3.2.0/config/下,使用

1

vi server.properties

编辑配置文件。

删除listeners和advertised前方的#号,改成如下配置:

1

2

3

4

5

listeners=PLAINTEXT://云服务器内网ip:9092(本地访问用本地ip)

# 如果要提供外网访问则必须配置此项

advertised.listeners=PLAINTEXT://云服务器公网ip:9092(若要远程访问需配置此项为云服务器的公网ip)

# zookeeper连接地址,集群配置格式为ip:port,ip:port,ip:port

zookeeper.connect=云服务器公网ip:2181

开放云服务器端口

在云服务器控制台内进入安全组页面,添加两条新的入站规则,tcp/9092和tcp/2181

开放linux防火墙端口

先查看使用的防火墙类型iptables/firewalld

iptables操作命令

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

1.打开/关闭/重启防火墙

 

开启防火墙(重启后永久生效):chkconfig iptables on

 

关闭防火墙(重启后永久生效):chkconfig iptables off

 

开启防火墙(即时生效,重启后失效):service iptables start

 

关闭防火墙(即时生效,重启后失效):service iptables stop

 

重启防火墙:service iptables restartd

 

2.查看打开的端口

 

/etc/init.d/iptables status

3.开启端口

 

iptables -A INPUT -p tcp --dport 8080 -j ACCEPT

4.保存并重启防火墙

/etc/rc.d/init.d/iptables save

/etc/init.d/iptables restart

Centos7默认安装了firewalld,如果没有安装的话,可以使用 yum install firewalld firewalld-config进行安装。

操作指令如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

1.启动防火墙

 

systemctl start firewalld

2.禁用防火墙

 

systemctl stop firewalld

3.设置开机启动

 

systemctl enable firewalld

4.停止并禁用开机启动

 

sytemctl disable firewalld

5.重启防火墙

 

firewall-cmd --reload

 

6.查看状态

 

systemctl status firewalld或者 firewall-cmd --state

7.在指定区域打开端口(记得重启防火墙)

 

firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

打开tcp/9092和tcp/2181这两个端口后,重启防火墙,并查看开放的端口确实生效。

启动kafka服务

cd命令进入kafka_2.12-3.2.0目录下,执行

1

bin/zookeeper-server-start.sh config/zookeeper.properties

启动zookeeper,不加-daemon方便排除启动错误,新建一个shell窗口,进入该目录再执行

1

bin/kafka-server-start.sh config/server.properties

启动kafka,若打印日志未报错,若未出现error日志,说明启动成功。

测试单机连通性

1

2

3

4

5

6

7

8

9

查询kafka下所有的topic

bin/kafka-topics.sh --list --zookeeper ip:port

因为kafka使用zookeeper作为配置中心,一些topic信息需要查询该kafka对应的zookeeper

创建topic

bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test

开启生产者

bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test

开启消费者

bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test

Springboot连接kafak

在pom.xml文件中引入kafka依赖

1

2

3

4

5

6

7

8

9

10

<dependency>

            <groupId>org.springframework.kafka</groupId>

            <artifactId>spring-kafka</artifactId>

            <version>2.9.0</version>

        </dependency>

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-clients</artifactId>

            <version>3.2.0</version>

        </dependency>

在application.yml配置文件中配置kafka

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

server:

  port: 8080

 

spring:

  kafka:

    bootstrap-servers: 云服务器外网ip地址:9092

    producer: # 生产者

      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送

      batch-size: 16384

      buffer-memory: 33554432

      acks: 1

      # 指定消息key和消息体的编解码方式

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:

      group-id: default-group

      enable-auto-commit: false

      auto-offset-reset: earliest

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:

      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交

      # RECORD

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

      # BATCH

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

      # TIME

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

      # COUNT

      # TIME | COUNT 有一个条件满足时提交

      # COUNT_TIME

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交

      # MANUAL

      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种

      # MANUAL_IMMEDIATE

      ack-mode: manual_immediate

生产者

1

2

3

4

5

6

7

8

9

10

11

12

13

@RestController

public class KafkaController {

    private final static String TOPIC_NAME = "test-topic";

 

    @Autowired

    private KafkaTemplate<String, String> kafkaTemplate;

 

    @RequestMapping("/send")

    public String send(@RequestParam("msg") String msg) {

        kafkaTemplate.send(TOPIC_NAME, "key", msg);

        return String.format("消息 %s 发送成功!", msg);

    }

}

消费者

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

@Component

public class DemoConsumer {

    /**

     * @param record record

     * @KafkaListener(groupId = "testGroup", topicPartitions = {

     * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),

     * @TopicPartition(topic = "topic2", partitions = "0",

     * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))

     * },concurrency = "6")

     * //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数

     */

    @KafkaListener(topics = "test-topic", groupId = "testGroup1")

    public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {

        String value = record.value();

        System.out.println("testGroup1 message: " + value);

        System.out.println("testGroup1 record: " + record);

        //手动提交offset,一般是提交一个banch,幂等性防止重复消息

        // === 每条消费完确认性能不好!

        ack.acknowledge();

    }

 

    //配置多个消费组

    @KafkaListener(topics = "test--topic", groupId = "testGroup2")

    public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {

        String value = record.value();

        System.out.println("testGroup2 message: " + value);

        System.out.println("testGroup2 record: " + record);

        //手动提交offset

        ack.acknowledge();

    }

}

使用swagger测试发送消息

控制台打印消息


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。

您可能感兴趣的文章 :

原文链接 : https://www.cnblogs.com/ndchao/archive/2022/11/14/chaos1.html
相关文章
  • Ansible Galaxy命令的使用实践示例介绍
    Ansible Galaxy 是 Ansible 官方 Roles 资源库(galaxy.ansible.com),在 Galaxy 平台上所有人可以分享 ansible 功能模块 为什么要用 ansible-galaxy 作为一个入
  • 云服务器(Linux)安装部署Kafka的详细过程

    云服务器(Linux)安装部署Kafka的详细过程
    云服务器(Linux)安装部署Kafka 前期准备 kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。 下载安装包 官网地
  • 网站https访问是443端口还是433端口

    网站https访问是443端口还是433端口
    https默认端口号是443 https是以安全为目标的http通道,简单讲是http的安全,即http下加入SSL层,https的安全基础是SSL,因此加密权的详细内容就
  • 关于HTTPS端口443的技术介绍(什么是443端口)
    443端口是用来保证客户和服务器之间的通信安全。 本文将重点介绍HTTPS 443端口,它是如何工作的,它保护什么,以及为什么我们需要它。
  • ElasticSearch事件查询语言EQL操作
    EQL的全名是Event Query Language (EQL)。事件查询语言(EQL)是一种用于基于事件的时间序列数据(例如日志,指标和跟踪)的查询语言。在Elast
  • aarch64服务器部署mysql的流程介绍

    aarch64服务器部署mysql的流程介绍
    aarch64服务器-部署mysql aarch64服务器-部署nacos 1、创建工作目录 1 mkdir -p /apps/mysql/{mydir,datadir,conf,source} 2、编写docker-compose.yaml 1 2 3 4 5 6 7 8 9 1
  • 解决Navicat连接服务器不成功的问题(Access denied

    解决Navicat连接服务器不成功的问题(Access denied
    出现的原因一般是服务器的root用户没有开启访问权限,一般来说值允许本地的访问。 解决方法: 一:第一种方法 1、首先打开xshell连接服务
  • Elasticsearch6.2服务器升配后的bug(避坑指南)

    Elasticsearch6.2服务器升配后的bug(避坑指南)
    本篇文章记录最近一次生产服务器硬件升级之后引起集群不稳定的现象,希望可以帮到有其它人避免采坑。 一、问题描述 升级后出现的异常
  • 使用Ubuntu搭建DNS服务器

    使用Ubuntu搭建DNS服务器
    一、重点说明/etc/bind/named.conf.options配置文件 在进行bind9服务器配置时,/etc/bind/named.conf.options是十分关键的配置文件,它决定着DNS服务器是否
  • Flink 侧流输出源码示例介绍

    Flink 侧流输出源码示例介绍
    Flink 的 side output 为我们提供了侧流(分流)输出的功能,根据条件可以把一条流分为多个不同的流,之后做不同的处理逻辑,下面就来看下
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计