Kafka搭建(集群版)

Kafka单机版

部署前提

VMware环境 : 两台centos系统

Jdk包:jdk-8u202-linux-x64.tar.gz
Kafka包:kafka_2.12-3.5.0.tgz
Zookeeper包:apache-zookeeper-3.7.2-bin.tar.gz

百度网盘自取: 链接: https://pan.baidu.com/s/11EWuhBoSmH3musd_3Rgodw?pwd=e32t 提取码: e32t

Kafka搭建(集群版)

创建服务器

  • 修改克隆机主机名,以下以kafka-broker1举例说明
#使用root用户登录
#修改主机名
vim /etc/hostname
kafka-broker1 #末尾追加
  • 配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
# 修改主机名称映射

vim /etc/hosts

添加如下内容:
192.168.233.106 kafka-broker1
192.168.233.107 kafka-broker2
  • 重启克隆机kafka-broker1
# 重启
reboot
  • 其他机器也同样配置

以下操作默认在kafka-broker1里操作

创建xsync文件

cd /root
mkdir bin
cd bin
vim xsync
  • 增加如下内容
#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi

#2. 遍历集群所有机器
for host in kafka-broker1 kafka-broker2
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

  • 修改xsync文件权限
# 进入/root/bin目录
cd /root/bin
# 修改权限
chmod 777 xsync

SSH无密登录配置

分发文件时,需要通过脚本切换主机进行指令操作,切换主机时,是需要输入密码的,每一次都输入就显得有点麻烦,所以这里以虚拟机kafka-broker1为例配置SSH免密登录(其他节点执行同样步骤即可),配置完成后,脚本执行时就不需要输入密码了。

生成公钥和私钥

# 公钥和私钥
ssh-keygen -t rsa

#然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
  • 将公钥拷贝到要免密登录的目标机器上,拷贝过程需要输入目标机器密码
# ssh-copy-id 目标机器
ssh-copy-id kafka-broker1
ssh-copy-id kafka-broker2

安装JDK1.8

卸载现有JDK

  • 不同节点都要执行操作

    rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

上传Java压缩包

  • 将jdk-8u202-linux-x64.tar.gz文件上传到虚拟机的/opt/software目录中

解压Java压缩包

  • 进入/opt/software目录

    cd /opt/software/

  • 解压缩文件到指定目录

    tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

  • 进入/opt/module目录

    cd /opt/module

  • 改名

    mv jdk1.8.0_212/ java

配置Java环境变量

  • 新建 /etc/profile.d/my_env.sh文件

    vim /etc/profile.d/my_env.sh

  • 添加内容

    #JAVA_HOME
    export JAVA_HOME=/opt/module/java
    export PATH=$PATH:$JAVA_HOME/bin
    
  • 让环境变量生效

    source /etc/profile.d/my_env.sh

安装测试

java -version

分发软件

  • 分发环境变量文件

    xsync /etc/profile.d/my_env.sh

  • 进入/opt/module路径

    cd /opt/module

  • 调用分发脚本将本机得Java安装包分发到其他两台机器

    xsync java

在每个节点让环境变量生效

安装ZooKeeper

上传ZooKeeper压缩包

将apache-zookeeper-3.7.2-bin.tar.gz文件上传到三台虚拟机的/opt/software目录中

解压ZooKeeper压缩包

  • 进入到/opt/software目录中

    cd /opt/software/

  • 解压缩文件到指定目录

    tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/module/

  • 进入/opt/module目录

    cd /opt/module

  • 文件目录改名

    mv apache-zookeeper-3.7.1-bin/ zookeeper

配置服务器编号

  • 在/opt/module/zookeeper/目录下创建zkData

  • 进入/opt/module/zookeeper目录

    cd /opt/module/zookeeper

  • 创建zkData文件目录

    mkdir zkData

  • 创建myid文件

  • 进入/opt/module/zookeeper/zkData目录

    cd /opt/module/zookeeper/zkData

  • 创建myid文件

    vim myid

  • 在文件中增加内容

    1

修改配置文件

  • 重命名/opt/module/zookeeper/conf目录下的zoo_sample.cfg文件为zoo.cfg文件

  • 进入cd /opt/module/zookeeper/conf文件目录

    cd /opt/module/zookeeper/conf

  • 修改文件名称

    mv zoo_sample.cfg zoo.cfg

  • 修改文件内容

    vim zoo.cfg

  • 修改zoo.cfg文件

以下内容为修改内容
# **以下内容为修改内容**
dataDir=/opt/module/zookeeper/zkData

 
#以下内容为新增内容

####################### cluster ##########################
# server.A=B:C:D
#
# A是一个数字,表示这个是第几号服务器
# B是A服务器的主机名
# C是A服务器与集群中的主服务器(Leader)交换信息的端口
# D是A服务器用于主服务器(Leader)选举的端口
#########################################################

server.1=kafka-broker1:2888:3888
server.2=kafka-broker2:2888:3888

设置防火墙端口

firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent
firewall-cmd --reload
firewall-cmd --zone=public --list-ports

启动ZooKeeper

在每个节点下执行如下操作
  • 进入zookeeper目录

    cd /opt/module/zookeeper

  • 启动ZK服务

    bin/zkServer.sh start

关闭ZooKeeper

在每个节点下执行如下操作
  • 进入zookeeper目录

    cd /opt/module/zookeeper

  • 关闭ZK服务

    bin/zkServer.sh stop

查看ZooKeeper状态

在每个节点下执行如下操作
  • 进入zookeeper目录

    cd /opt/module/zookeeper

  • 查看ZK服务状态

    bin/zkServer.sh status

分发软件

  • 进入/opt/module路径

    cd /opt/module

  • 调用分发脚本将本机得ZooKeeper安装包分发到其他机器

    xsync zookeeper

分别将不同虚拟机/opt/module/zookeeper/zkData目录下myid文件进行修改

vim /opt/module/zookeeper/zkData/myid
# kafka-broker1:1
# kafka-broker2:2

启停脚本

ZooKeeper软件的启动和停止比较简单,但是每一次如果都在不同服务器节点执行相应指令,也会有点麻烦,所以我们这里将指令封装成脚本文件,方便我们的调用。

  • 在虚拟机kafka-broker1的/root/bin目录下创建zk.sh脚本文件

  • 在/root/bin这个目录下存放的脚本,root用户可以在系统任何地方直接执行

  • 进入/root/bin目录

    cd /root/bin

  • 创建zk.sh脚本文件

    vim zk.sh

在脚本中增加内容:
#!/bin/bash
case $1 in
"start"){
        for i in kafka-broker1 kafka-broker2
        do
        echo ---------- zookeeper $i 启动 ------------
                ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
        done
};;
"stop"){
        for i in kafka-broker1 kafka-broker2
        do
        echo ---------- zookeeper $i 停止 ------------
                ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
        done
};;
"status"){
        for i in kafka-broker1 kafka-broker2
        do
        echo ---------- zookeeper $i 状态 ------------
                ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
        done
};;
esac
增加脚本文件权限
  • 给zk.sh文件授权

    chmod 777 zk.sh

脚本调用方式
  • 启动ZK服务

    zk.sh start

  • 查看ZK服务状态

    zk.sh status

  • 停止ZK服务

    zk.sh stop

安装Kafka

上传Kafka压缩包

将kafka_2.12-3.5.0.tgz文件上传到虚拟机的/opt/software目录中

解压Kafka压缩包

  • 进入/opt/software目录

    cd /opt/software

  • 解压缩文件到指定目录

    tar -zxvf kafka_2.12-3.5.0.tgz -C /opt/module/

  • 进入/opt/module目录

    cd /opt/module

  • 修改文件目录名称

    mv kafka_2.12-3.5.0/ kafka

修改配置文件
  • 进入cd /opt/module/kafka/config文件目录

    cd /opt/module/kafka/config

  • 修改配置文件

    vim server.properties

  • 输入以下内容:

    #broker的全局唯一编号,每个服务节点不能重复,只能是数字。
    broker.id=1
    #broker对外暴露的IP和端口 (每个节点单独配置)
    advertised.listeners=PLAINTEXT://**kafka-broker1**:9092
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘IO的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/opt/module/kafka/datas
    #topic在当前broker上的分区个数
    num.partitions=1
    #用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #每个topic创建时的副本数,默认时1个副本
    offsets.topic.replication.factor=1
    #segment文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个segment文件的大小,默认最大1G
    log.segment.bytes=1073741824
    #检查过期数据的时间,默认5分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
    zookeeper.connect=kafka-broker1:2181,kafka-broker2:2181/kafka
    

分发kafka软件

  • 进入 /opt/module目录

    cd /opt/module

  • 执行分发指令

    xsync kafka

按照上面的配置文件内容,在每一个Kafka节点进行配置

vim /opt/module/kafka/config/server.properties

配置环境变量

  • 修改 /etc/profile.d/my_env.sh文件

    vim /etc/profile.d/my_env.sh

  • 添加内容

    #KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
  • 让环境变量生效

    source /etc/profile.d/my_env.sh

  • 分发环境变量,并让环境变量生效

    xsync /etc/profile.d/my_env.sh

  • 每个节点执行刷新操作

    source /etc/profile.d/my_env.sh

启动Kafka

启动前请先启动ZooKeeper服务

  • 进入/opt/module/kafka目录

    cd /opt/module/kafka

  • 执行启动指令

    bin/kafka-server-start.sh -daemon config/server.properties

关闭Kafka

  • 进入/opt/module/kafka目录

    cd /opt/module/kafka

  • 执行关闭指令

    bin/kafka-server-stop.sh

启停脚本

(1) 在虚拟机kafka-broker1的/root/bin目录下创建kfk.sh脚本文件,对kafka服务的启动停止等指令进行封装

  • 进入/root/bin目录

    cd /root/bin

  • 创建kfk.sh脚本文件

    vim kfk.sh

  • 在脚本中增加内容:

    #! /bin/bash
    case $1 in
    "start"){
        for i in kafka-broker1 kafka-broker2
        do
            echo " --------启动 $i Kafka-------"
            ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
        done
    };;
    "stop"){
        for i in kafka-broker1 kafka-broker2
        do
            echo " --------停止 $i Kafka-------"
            ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
        done
    };;
    esac
    
增加脚本文件权限
  • 给文件授权

    chmod 777 kfk.sh

脚本调用方式
  • 启动kafka

    kfk.sh start

  • 停止Kafka

    kfk.sh stop

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

联合脚本

因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样,操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。

  • 在虚拟机kafka-broker1的/root/bin目录下创建xcall脚本文件

  • 进入/root/bin目录

    cd /root/bin

  • 创建xcall文件

    vim xcall

  • 在脚本中增加内容:

    #! /bin/bash
    
    for i in kafka-broker1 kafka-broker2
    do
        echo --------- $i ----------
        ssh $i "$*"
    done
    
增加脚本文件权限
  • 进入/root/bin目录

    cd /root/bin

  • 增加权限

    chmod 777 xcall

创建cluster.sh脚本文件
  • 进入/root/bin目录

    cd /root/bin

  • 创建cluster.sh脚本文件

    vim cluster.sh

  • 在脚本中增加内容:

    #!/bin/bash
    
    case $1 in
    "start"){
            echo ================== 启动 Kafka集群 ==================
    
            #启动 Zookeeper集群
            zk.sh start
    
            #启动 Kafka采集集群
            kfk.sh start
    
            };;
    "stop"){
            echo ================== 停止 Kafka集群 ==================
    
            #停止 Kafka采集集群
            kfk.sh stop
    
                    #循环直至 Kafka 集群进程全部停止
                    kafka_count=$(xcall jps | grep Kafka | wc -l)
                    while [ $kafka_count -gt 0 ]
                    do
                            sleep 1
                            kafka_count=$(xcall | grep Kafka | wc -l)
                echo "当前未停止的 Kafka 进程数为 $kafka_count"
                    done
    
            #停止 Zookeeper集群
            zk.sh stop
    };;
    esac
    
增加脚本文件权限
  • 进入/root/bin目录

    cd /root/bin

  • 增加权限

    chmod 777 cluster.sh

脚本调用方式

  • 集群启动

    cluster.sh start

  • 集群关闭

    cluster.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/772233.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入解读:如何解决微调扩散模型时微调数据集和训练数据集之间的差距过大问题?

Diffusion Models专栏文章汇总:入门与实战 前言:在微调扩散模型的时候经常会遇到微调数据集和训练数据集之间的差距过大,导致训练效果很差。在图像生成任务中并不明显,但是在视频生成任务中这个问题非常突出。这篇博客深入解读如何…

代码随想录算法训练营第69天:图论7[1]

代码随想录算法训练营第69天:图论7 109. 冗余连接II 卡码网题目链接(ACM模式)(opens new window) 题目描述 有向树指满足以下条件的有向图。该树只有一个根节点,所有其他节点都是该根节点的后继。该树除了根节点之外的每一个节…

5分钟读懂GPS-RTK实时动态技术,建议收藏!

由于”智慧工地“理念的兴起和发展,目前越来越多的企业将信息技术手段融合于施工现场安全管理,构建智能化的安全监管模式。基于此,蓝牙LORA融合定位技术、UWB超宽带定位技术、GPS-RTK定位技术等信息技术也越来越频繁出现在大众视野。然而&…

单片机软件架构连载(4)-结构体

枚举、指针、结构体,我愿称为C语言"三板斧"。 用人话来讲,几乎所有c语言高阶编程,都离不开这这3个知识点的应用。 今天站在实际产品常用的角度,给大家讲一下结构体。 1.结构体概念 结构体可以用来构建更复杂的数据结…

Diffusion模型的微调和引导

留意后续更新,欢迎关注微信公众号:组学之心 Diffusion模型的微调和引导 微调(fine-tuning): 从一个已经训练过的模型开始训练,我们就可以从一个学会如何“去噪”的模型开始训练,相对于随机初始…

c++:动态内存变量

典型的C面向对象编程 元素 (1)头文件hpp中类的定义 (2)源文件cpp中类的实现(构造函数、析构函数、方法) (3)主程序 案例 (1)用C来编程“人一天的生活” (2)“人”的属性:name、age、male (3)“人”的方法:eat、work(coding/shop…

【免费可视化工具】助力风电行业智能化管理

在绿色能源日益成为全球共识的今天,风电作为清洁能源的重要组成部分,正以前所未有的速度发展。然而,随着风电场规模的扩大和数量的增加,如何高效、直观地管理和监控风电资源成为了一个亟待解决的问题。 而山海鲸可视化这款免费可…

汉光联创HGLM2200N黑白激光多功能一体机加粉及常见问题处理

基本参数: 机器型号:HGLM2200N 产品名称:A4黑白激光多功能一体机 基础功能:打印、扫描、复印 打印速度:22页/分钟 纸张输入容量:150-249页 单面支持纸张尺寸:A4、A5、A6 产品尺寸&#x…

功能详解-电商接口丨电商API

随着电商平台各类机制日益成熟,电商接口逐渐被大家所熟知,淘宝、天猫、京东、拼多多、抖店、快手、小红书这些都是主流的电商平台,为了提升电商管理系统的效率,开发者可以通过电商接口将多个电商平台的数据和功能集成。 电商接…

firewalld高级配置

一、1、关于iptables的知识 IP数据包过滤系统是一种功能强大的工具,可用于添加、编辑和除去规则,这些规则是做数据包过滤决定时,防火墙所遵循和组成的规则。这些规则存储在专用的数据包过滤表中,而这些表集成在Linux内核中。在数据…

二手物品交易小程序的设计

管理员账户功能包括:系统首页,个人中心,用户管理,管理员管理,商品信息管理,论坛管理,收货地址管理,基础数据管理 微信端账号功能包括:系统首页,商品信息&…

ESIX挂载usb移动硬盘或者优盘并拷贝原数据存储数据

1、esxi支持和不支持分区格式 ESXi 6.*系列默认不支持NTFS、exFAT等常见文件系统,fat32支持但没意义仅小文件可以用,不过可以通过一些额外步骤和第三方工具来访问NTFS格式的存储设备,但生产环境不推荐这样做,需要安装第三方包。 e…

2024 年第十四届 APMCM 亚太地区大学生数学建模竞赛B题超详细解题思路+数据预处理问题一代码分享

B题 洪水灾害的数据分析与预测 亚太中文赛事本次报名队伍约3000队,竞赛规模体量大致相当于2024年认证杯,1/3个妈杯,1/10个国赛。赛题难度大致相当于0.6个国赛,0.8个妈杯。该比例仅供大家参考。 本次竞赛赛题难度A:B:C3:1:4&…

中霖教育:税务师考试报名现有职称怎么写?

【中霖教育怎么样】】【中霖教育好吗】 报考税务师考试的考生在报名期间需要登录税务师职业资格考试网上报名系统填写报名信息。 税务师报名现有职称按照实际情况填写会计助理、会计或者会计主管,没有工作的考生在填写工作信息的时候填写待业即可。 在报名阶段&a…

Linux_进程池

目录 1、进程池基本逻辑 2、实现进程池框架 3、文件描述符的继承 4、分配任务给进程池 5、让进程池执行任务 6、回收子进程 7、进程池总结 结语 前言: 在Linux下,进程池表示把多个子进程用数据结构的方式进行统一管理,在任何时候…

MATLAB和Python发那科ABB库卡史陶比尔工业机器人模拟示教框架

🎯要点 🎯模拟工业机器人 | 🎯可视化机器人DH 参数,机器人三维视图 | 🎯绘制观察运动时关节坐标位置、速度和加速度 | 🎯绘制每个关节处的扭矩和力 | 🎯图形界面示教机器人 | 🎯工业…

通过9大步骤,帮助企业在数字化转型中搭建数据分析的报表体系!

引言:在数字化转型中,企业搭建数据分析的报表体系是一个系统性的过程,需要综合考虑业务需求、数据来源、技术平台等多个方面。此外从报表生命周期的角度来说,从产生、使用以及最后消亡退出体系,都需要通盘考虑&#xf…

[数据集][目标检测]轮椅检测数据集VOC+YOLO格式13826张1类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):13826 标注数量(xml文件个数):13826 标注数量(txt文件个数):13826 标…

remix测试文件测试智能合约

remix内其实也是可以通过编写测试文件来测试智能合约的,需要使用插件自动生成框架以及测试结果。本文介绍一个简单的HelloWorld合约来讲解 安装插件多重检测: (solidity unit testing) 编译部署HelloWorld合约 // SPDX-License-…

在线图片转文字的软件,分享3种强大的软件!

在信息爆炸的时代,图片作为信息的重要载体之一,其内容往往蕴含着巨大的价值。然而,面对海量的图片信息,如何高效、准确地将其转化为文字,成为了许多人的迫切需求。今天,就为大家盘点几款功能强大的在线图片…