Docker

kafka安装和使用场景

kafka场景

摘要: 

1. kafka具备吞吐量大无限扩容的特点,相比起同类,它更适合需要无限扩容, 吞吐量要大(并发量很大) 的场景,比如日志大数据等。 

2. 本来也是个消息系统,所以可以做: 解耦,异步处理,流量削峰,消息队列 

场景: 

1)消息系统。Kafka作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模 消息处理提供了一种很好的解决方案。 

2)应用监控。利用Kafka采集应用程序和服务器健康相关的指标,如CPU占用率、IO、内存、连接数、TPSQPS 等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用 KafkaELKElasticSearchLogstashKibana)整合构建应用服务监控系统。 

3)网站用户行为追踪。为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作 轨迹、内容等信息发送到Kafka集群上,通过HadoopSparkStrom等进行数据分析处理,生成相应的统计报告, 为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。 

4)流处理。需要将已收集的流数据提供给其他流式计算框架进行处理,用Kafka收集流数据是一个不错的选择, 而且当前版本的Kafka提供了Kafka Streams支持对流数据的处理。

5)持久性日志。Kafka可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份, Kafka为故障节点数据恢复提供了一种重新同步的机制。同时,Kafka很方便与HDFSFlume进行整合,这样就方便 Kafka采集的数据持久化到其他外部系统。 更多的场景主要是用来做日志分析系统,除了日志,网站的一些浏览数据应该也适用。(只要原始数据不需要直接存 DB的都可以) 使用kafka的核心理由: 分布式,高吞吐量,速度快(kafka是直接通过磁盘存储,线性读写,速度快)

2 kafka集群介绍 

微信截图_20220219232515.pngProducerProducer即生产者,消息的产生 者,是消息的入口。 kafka cluster BrokerBrokerkafka实例,每个服务器上有一个或多个 kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图 中的broker-0broker-1…… Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic在每个broker上都可以创建多个topic PartitionTopic的分区,每个topic可以有多个分区,分区的作用 是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹! Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候 会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于 Broker的数量,followerleader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自 己)。 Message:每一条发送的消息主体。 Consumer:消费者,即消息的消费方,是消息的出口。 Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费 者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高 kafka的吞吐量! Zookeeperkafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

工作流程分析 

上面介绍了kafka的基础架构及基本概念,不知道大家看完有没有对kafka有个大致印象,如果对还比较懵也没关 系!我们接下来再结合上面的结构图分析kafka的工作流程,最后再回来整个梳理一遍我相信你会更有收获! 2.2.1 发送数据 我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据 的时候永远的找leader,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下 图:

微信截图_20220219233149.png

kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是: 

1、 方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据 量。

2、 提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。 熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量 分发到不同的服务器,

那在kafka中,如果某个topic有多个partitionproducer又怎么知道该将数据发往哪个 partition呢?kafka中有几个原则: 

1 partition在写入的时候可以指定需要写入的partition,如果有指定,则 写入对应的partition

2、 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个 partition 

3、 如果既没指定partition,又没有设置key,则会轮询选出一个partition保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢 失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参 数来确定是否确认kafka接收到数据,这个参数可设置的值为01all 0代表producer往集群发送数据不需要 等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。 1代表producer往集群发送数据只要leader 应答就可以发送下一条,只确保leader发送成功。 all代表producer往集群发送数据需要所有的follower都完成 leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。 最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量 根据默认配置都是1

 保存数据 Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的 认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数 据(效率比随机写入高)。 Partition 结构 前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition 就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多 segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而indextimeindex文件为索引文件,用于检索消息。

Message结构 上面说到log文件就实际是存储message的地方,我们在producerkafka写入的也是一条一条的 message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offffset、压缩类型……等!我们重点需要知道的是下面三个: 1 offffsetoffffset是一个占8byte的有序id号,它可以唯一确定每条消息 parition内的位置! 2、 消息大小:消息大小占用4byte,用于描述消息的大小。 3、 消息体:消息体存 放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。 存储策略 无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢? 1、 基于时 间,默认配置是168小时(7天)。 2、 基于大小,默认配置是1073741824 需要注意的是,kafka读取特 定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

消费数据 消息存储在log文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是leader去拉取。 多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费 者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据

安装

需要装的docker容器有6个,3zookeeper容器和3kafka容器,这里我们装在一台机子上。实际场景中是6台机 子。节约的话也至少是3台(每台上1zookeeper1kafka)。

一 zookeeper 集群安装 

这里用zookeeper干啥 我们这里用zookeeper来做元数据/配置信息管理,具体包括:存储消费偏移量,topic话题信息,partition信息) 些部分组成。 当然zookeeper的功能肯定不止这么多,但我们这里只用到它这个功能!

Docker 配置

docker-compose-zookeeper-cluster.yml

version: '3.5'

networks:
  docker_net:
    external: true


services:
  zoo1:
    image: zookeeper
    restart: unless-stopped
    hostname: zoo10
    container_name: zoo11
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - ./zookeeper/zoo1/data:/data
      - ./zookeeper/zoo1/datalog:/datalog
    networks:
      - docker_net

  zoo2:
    image: zookeeper
    restart: unless-stopped
    hostname: zoo20
    container_name: zoo22
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - ./zookeeper/zoo2/data:/data
      - ./zookeeper/zoo2/datalog:/datalog
    networks:
      - docker_net

  zoo3:
    image: zookeeper
    restart: unless-stopped
    hostname: zoo30
    container_name: zoo33
    ports:
      - 2184:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
    volumes:
      - ./zookeeper/zoo3/data:/data
      - ./zookeeper/zoo3/datalog:/datalog
    networks:
      - docker_net

启动zookeeper集群

docker-compose -f docker-compose-zookeeper-cluster.yml up -d

查看集群

ZAB 算法中,存在 LeaderFollowerObserver 三种角色,举例查看zoo1的

docker exec -it zoo1 /bin/sh
#查看角色
zkServer.sh status

kafka安装

docker-compose-kafka-cluster.yml

version: '3.5'

networks:
  docker_net:
    external: true

services:

  kafka1:
    image: wurstmeister/kafka
    restart: unless-stopped
    container_name: kafka1
    ports:
      - "9093:9092"
    external_links:
      - zoo1
      - zoo2
      - zoo3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: 192.168.232.204                   ## 修改:宿主机IP
      KAFKA_ADVERTISED_PORT: 9093                                 ## 修改:宿主机映射port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.232.204:9093    ## 绑定发布订阅的端口。修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
    volumes:
      - "./kafka/kafka1/docker.sock:/var/run/docker.sock"
      - "./kafka/kafka1/data/:/kafka"
    networks:
      - docker_net


  kafka2:
    image: wurstmeister/kafka
    restart: unless-stopped
    container_name: kafka2
    ports:
      - "9094:9092"
    external_links:
      - zoo1
      - zoo2
      - zoo3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_HOST_NAME: 192.168.232.204                 ## 修改:宿主机IP
      KAFKA_ADVERTISED_PORT: 9094                               ## 修改:宿主机映射port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.232.204:9094   ## 修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
    volumes:
      - "./kafka/kafka2/docker.sock:/var/run/docker.sock"
      - "./kafka/kafka2/data/:/kafka"
    networks:
      - docker_net

  kafka3:
    image: wurstmeister/kafka
    restart: unless-stopped
    container_name: kafka3
    ports:
      - "9095:9092"
    external_links:
      - zoo1
      - zoo2
      - zoo3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_HOST_NAME: 192.168.232.204                ## 修改:宿主机IP
      KAFKA_ADVERTISED_PORT: 9095                              ## 修改:宿主机映射port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.232.204:9095   ## 修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
    volumes:
      - "./kafka/kafka3/docker.sock:/var/run/docker.sock"
      - "./kafka/kafka3/data/:/kafka"
    networks:
      - docker_net

  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    restart: unless-stopped
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9000:9000"
    links:            # 连接本compose文件创建的container
      - kafka1
      - kafka2
      - kafka3
    external_links:   # 连接本compose文件以外的container
      - zoo1
      - zoo2
      - zoo3
    environment:
      ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181                 ## 修改:宿主机IP
      TZ: CST-8
    networks:
      - docker_net

执行以下命令启动

docker-compose -f docker-compose-kafka-cluster.yml up -d

可以看到 kafka 集群已经启动成功。 这样,我们就成功安装了由3kafka3zookeeper组成的kafka集群。 并且还安装了kafka集群可视化管理工具: kafka-manag

访问192.168.232.204:9000,按图示添加相关配置

image.png

image.png

现在,我们进入集群,创建主题:

image.png

image.png

主题创建成功。 这样,我们就可以使用这个集群投入生产了!


 命令行生产与消费

1.命令创建主题如果在可视化工具里创建了就不用创建了

[root@204 ~]# docker exec -it kafka1 bash # 进入容器 
bash-4.4# cd /opt/kafka/ # 进入安装目录 
bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 查看主 题列表 __consumer_offsets 
bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 -- replication-factor 2 --partitions 3 --topic test # 新建主题 Created topic test.

2.生产消息

bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test

3.消费消息

bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning

php整合kafka

#新建目录 [root@201 kafka]# mkdir kafka 
#进入目录 [root@201 kafka]# cd kafka 
#下载组件包: [root@201 kafka1]# composer require nmred/kafka-php

生产端代码:

[root@201 kafka]# vim producer.php
<?php 
require './vendor/autoload.php'; 
date_default_timezone_set('PRC'); 
$config = \Kafka\ProducerConfig::getInstance(); 
$config->setMetadataRefreshIntervalMs(10000); 
$config->setMetadataBrokerList('192.168.232.204:9093'); 
$config->setBrokerVersion('1.0.0'); 
$config->setRequiredAck(1); 
$config->setIsAsyn(false); 
$config->setProduceInterval(500); 
$producer = new \Kafka\Producer(); 
for($i = 0; $i < 2; $i++) 
{ 
  $result = $producer->send([ [ 'topic' => 'test', 'value' => 'test1....message.', 'key' => '', ], ]); 
  var_dump($result);
 }

消费端:

[root@201 kafka]# vim consumer.php 
require './vendor/autoload.php'; 
date_default_timezone_set('PRC'); 
$config = \Kafka\ConsumerConfig::getInstance(); 
$config->setMetadataRefreshIntervalMs(10000); 
$config->setMetadataBrokerList('192.168.232.204:9093'); 
$config->setGroupId('test'); 
$config->setBrokerVersion('1.0.0'); 
$config->setTopics(array('test')); 
//$config->setOffsetReset('earliest'); 
$consumer = new \Kafka\Consumer(); 
#开启消费 
$consumer->start(function ($topic, $part, $message) { var_dump($message); //打印出获取的消息 });


(0)
分享:

本文由:xiasohu168.com 作者:xiaoshu发表,转载请注明来源!

相关阅读