① Kafka partition的数量问题
kafka的每个topic都可以创建多个partition,partition的数量无上限,并不局数会像replica一样受限于broker的数量,因此partition的数量可以随心所欲的设置。那确定partition的数量就需要思考一些权衡因素。
越多的partition可以提供更高的吞吐量
在kafka中,单个partition是kafka并行操作的最小单元。每个partition可以独立接收推送的消息以及被consumer消费,相当于topic的一个子通道,partition和topic的关系就像高速公路的车道和高速公路的关系一样,起始点和终点相同,每个车道都可以独立实现运输,不同的是kafka中不存在车辆变道的说法,入口时选择的车道需要从一而终。而kafka的吞吐量显而易见,在资源足够的情况下,partition越多速度越快。
这里提到的资源充足解释一下,假设我现在一个partition的最大传输速度为p,目前kafka集群共有三个broker,每个broker的资源足够支撑三个partition最大速度传输,那我的集群最大传输速度为3*3*p=9p,假设在不增加资源的情况下将partition增加到18个,每个partition只能以p/2的速度传输数据,因此传输速度上限还是9p,并不能再提升,因此吞吐量的设计需要考虑broker的资源上限。当然,kafka跟其他集群一样,可以横向扩展,再增加三个相同资源的broker,那传输速度即可达到18p。
越多的分区需要打开更多的文件句柄
在kafka的broker中,每个分区都会对照着文件系统的一个目录。
在kafka的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。因此,随着partition的增多,需要的文件句柄数急剧增加,必要时需要调整操作系统允许打开的文件句柄数。
更多的分区会导致端对端的延迟
kafka端对端的延迟为procer端发布消息到consumer端消费消息所需的时间,即consumer接收消息的时间减去proce发布消息的时间。kafka在消息正确接收后才会暴露给消费者,即在保证in-sync副本复制成功之后才会暴露,瓶颈则来自于此。在一个broker上的副本从其他broker的leader上复制数据的时候只会开启一个线程,假设partition数量为n,每个副本同步的时间为1ms,那in-sync操作完成所需的时间即n*1ms,若n为10000,则需要10秒才能返回同步状态,数据才能暴露给消费者,这就导致了较大的端对端的延迟。
越多的partition意味着需要更多的内存
在新版本的kafka中可以支持批量提交和批量消费,而设置了批量提交和批量消费后,每个partition都会需要一定的内存空间。假设为100k,当partition为100时,procer端和consumer端都需要10M的内存;当partition为100000时,procer端和consumer端则都需要10G内存。无限的partition数量很快就会占据大量的内存,造成性能瓶颈。
越多的partition会导致更长时间的恢复期
kafka通过多副本复制技术,实现kafka的高可用性和稳定性。每个partition都会有多个副本存在于多个broker中,其中一个副本为leader,其余的为follower。当kafka集群其中一个broker出现故障时,在这个broker上的leader会需要在其他broker上重新选择一个副本启动为leader,这个过程由kafka controller来完成,主要是从Zookeeper读取和修改受影响partition的一些元数据信息。
通常情况下,当一个broker有计划的停机上,该broker上的partition leader会在broker停机前有次序顷稿的一一移走,假设移走雀腊孝一个需要1ms,10个partition leader则需要10ms,这影响很小,并且在移动其中一个leader的时候,其他九个leader是可用的,因此实际上每个partition leader的不可用时间为1ms。但是在宕机情况下,所有的10个partition
leader同时无法使用,需要依次移走,最长的leader则需要10ms的不可用时间窗口,平均不可用时间窗口为5.5ms,假设有10000个leader在此宕机的broker上,平均的不可用时间窗口则为5.5s。
更极端的情况是,当时的broker是kafka controller所在的节点,那需要等待新的kafka leader节点在投票中产生并启用,之后新启动的kafka leader还需要从zookeeper中读取每一个partition的元数据信息用于初始化数据。在这之前partition leader的迁移一直处于等待状态。
总结
通常情况下,越多的partition会带来越高的吞吐量,但是同时也会给broker节点带来相应的性能损耗和潜在风险,虽然这些影响很小,但不可忽略,因此需要根据自身broker节点的实际情况来设置partition的数量以及replica的数量。
② 什么是kafka
Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低时延的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务水平和最关键指标之一。
基本工作流程如上图所示,其中:
我们看上面的架构图中,procer就是生产者,是数据的入口。注意看图中的红色箭头,Procer在写入数据的时候 永远的找leader ,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行神世伏同步的!procer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证 同一分区 内的数据是有序的!写入示意图如下:
上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,procer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
保证消息不丢失是一个消息队列中间件的基本保证,那procer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为 0 、 1 、 all 。
最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。
Procer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
前面说过了每个topic都可以分为一个或多个partition,游携如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
上面说到log文件就实际是存储message的地方,我们在procer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高返绝kafka的性能!
消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与procer相同的是,消费者在拉取消息的时候也是 找leader 去拉取。
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!如下图:
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议 消费者组的consumer的数量与partition的数量一致 !
kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.
除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于procer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据).
其实对于procer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式
kafka集群中的任何一个broker,都可以向procer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当procer获取到metadata信息之后, procer将会和Topic下所有partition leader保持socket连接;消息由procer直接通过socket发送到broker,中间不会经过任何"路由层".
异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当procer失效时,那些尚未发送的消息将会丢失。
其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级。
kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.
Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。
最少1次:可能会重传数据,有可能出现数据被重复处理的情况;
最多1次:可能会出现数据丢失情况;
恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。
at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".
at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".
"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:
最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。
kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.
选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.
每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.
Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.
Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.
Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
当consumer启动时,所触发的操作:
A) 首先进行"Consumer id Registry";
B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
总结:
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。
一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:
这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。
Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Procer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。
③ windows系统下启动kafka CMD报错:输入行太长,语法错误
在windows系孝和伍统用cmd输命令:.\bin\windows\kafka-server-start.bat .\config\server.properties 启动kafka居然棚亮报错:输入行太长,语法错误,折磨了我一个小时。
在网上查到一个解决方法: kafka目录不要巧或建太深,直接在放在D盘
④ Kafka 客户端开启压缩
需要注意的是,
1. borker / server 默认允许的最大消息大小是 1M,过大的消息会被拒
2. 1M 是包括压缩之后的大小,因此 procer/client 如果开启压缩,扮隐核将大于 1M 的数据厅掘压缩至小于 1M 发送即可
3. 如果修改 broker 端的 message.max.bytes 大小,需要修改消费者、follower fetch 的大小与之匹配,并且允许较大的消息对性能有较大影响携塌
1. 允许发的数据 > 1M
2. 开启压缩
⑤ kafka too many open files的解决方法
在生产环境中,为了方便将kafka做成了一个服务,使用systemctl start kafka,kafka用户来对kafka进行启动,可是在最近的一次升级中启动应用时,kafka出现too many open files的雹明报错并且宕机,
在linux系统下每一个进程都会有其相应的文件打开限制,可以使用cat /proc/<pid>/limits来进行查看。使用命令行启动的应用会共用执行该命令用户的文件打开数,若是将其做成一个服务,如果不经过相应的配置,那么该进程默认的文件打开数为4096,所以解决该问题不仅需要提高kafka用户的文件打开数,而且需要在启动脚本中配置文件打开数,
1)进入kafka用户中,执行ulimit -n可以查看kafka用户的文件打开数
2)若是该文件打开数小,那么需要在/etc/security/limits文件中添加:
kafka soft nofile <文件打开数>
kafka hard nofile <文件打开数>
3)也可以使用ulimit -n <文件打开数>来进肢历行临时配置
1)kafka.service脚本如下
[Unit]
Description=kafka service
Requires=zookeeper.service
After=zookeeper.service
[Service]
ExecStart=/data/apps_data/kafka/bin/kafka-server-start.sh /data/apps_data/kafka/config/server.properties
ExecStop=/home/kafka/bin/kafka-server-stop.sh
Type=simple
User=kafka
Group=kafka
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
[Unit]
Description=kafka service
Requires=zookeeper.service
After=zookeeper.service
[Service]
ExecStart=/data/apps_data/kafka/bin/kafka-server-start.sh /data/apps_data/kafka/config/server.properties
ExecStop=/home/kafka/bin/kafka-server-stop.sh
Type=simple
User=kafka
Group=kafka
LimitNOFILE=<文件打开数>
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
3)更改往后执行systemctl daemon-reload重载脚本源饥告
4)重启kafka生效
⑥ 启动kafka报错,应该怎么解决
Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念察察州,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。Kafka架构与安全首先,我们来了解下有关Kafka的几个基本概念:Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。Procer:向Topic发布消息的进程称为Procer。Consumer:从Topic订阅消息的进程称为Consumer。Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Procer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Procer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance. Broker接收和发送消息是被动的:由Procer主动发送消息,Consumer主动拉取消息。然而,分析Kafka框架,我们会发现以下严重的安全问题:1.网络败蔽中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Procer的消息,能够篡改消息并发送给Consumer。2.网络中的任何一台主机,都可以启动恶意的Procer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。3.Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。4.Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Procer)都能对任意Topic读取(或发送)消息。随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。Kafka安全设计基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。基于Kerberos的身份机制如下图所示:Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。Procer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:1.Procer向KDC认证身份,通过则得到没冲TGT(票证请求票证),否则报错退出2.Procer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Procer返回SessionKey(会话密钥)和ServiceTicket(服务票证)3.Procer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Procer通信的SessionKey,然后使用SessionKey验证Procer的身份,通过则建立连接,否则拒绝连接。ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。构建安全的Kafka服务首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab.认证模式为ipaddress时,Procer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:public class SecureProcer extends Thread {private final kafka.javaapi.procer.Procer<Integer, String> procer;private final String topic;private final Properties props = new Properties();public SecureProcer(String topic) { AuthenticationManager.setAuthMethod(“kerberos”); AuthenticationManager.login(“procer1″, “/etc/procer1.keytab”);props.put(“serializer.class”, “kafka.serializer.StringEncoder”);props.put(“metadata.broker.list”,“172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″);// Use random partitioner. Don’t need the key type. Just set it to Integer. // The message is of type String. procer = new kafka.javaapi.procer.Procer<Integer, String>(new ProcerConfig(props));this.topic = topic;
⑦ kafka 单机/集群压力测试
由于kafka吞吐量特别大,所以先考虑集群服务器的自身瓶颈,因为现在测试的是单机所以只会涉及到磁盘IO以及cpu,但是对于kafka来说对于cpu的使用还是可以忽略不计的,
1.1磁盘IO写入瓶颈
使用以下命令测试磁盘IO的写入瓶颈
sync;time -p bash -c "(dd if=/dev/zero of=test.dd bs=1M count=20000)"
说明: 在当前目录下创建一个test.dd的文件,写入20000个1M的数据
磁盘写入IO的结果
可以看到平均就是187MB/s
1.2 使用iostat命令监测磁盘io情况
使用命令
# iostat -x 1
说明: 扩展查看io性能,每秒刷新一次
注意: 如果没有iostat,请执行 yum install sysstat -y 进行安装 iostat命令
关注wkB/s和%util两个参数
wkB/s:每秒写入设备的数据量(单位:KB)
%util:消耗在I/O请求中的CPU时间百分比乱谨(设备带宽利用率)。如果该值接近100%说明设备出现了瓶颈。
如图现在这台机器的磁盘IO极限值为187MB/s
1.3 单机版测试kafka性能
因为测试的次数比较多,也没有去找kafka中数据存储设置,所以就使用docker部署单机版的kafka (因为测试的数据比较多,也就多次的删除了容器,重新启动镜像)
新建目录:
mkdir /usr/local/kafka_test
dockerfile
run.sh
sources.list
目录结构如下:
生成镜像
docker build -t kafka_test /usr/local/kafka_test
启动kafka
docker run -d -it kafka_test
测试结果庆贺
从表格中可以看出来五个分区就已经是极限了
结果分析
这中间并没有设置条数/每秒,所以就是按照kafka 就会按照量级自动的吞入数据,如果我们需要对于消息的即时性做控制,还需要再重新测试一下,按照业务的延迟找到最合适的数量(单机版,然后再部署集群,测试适合的数量)
集群测试:
部署就不再这里说明了
本次测试的是三台哗差基机器集群
测试结果:
之后还测试了9个分区的topic 因为空间不足所以就没有继续测下去,但是看部分数据还超过了500MB/s还是有上升空间的
1.3 磁盘IO 读取瓶颈
使用一下命令测试磁盘IO的读取瓶颈
hdparm -tT --direct /dev/vda
说明: hdparm命令是显示与设定硬盘的参数, -t参数为评估硬盘的读取效率(不经过磁盘cache), -T参数为评估硬盘的读取效率(经过磁盘cache).
⑧ 修改Kafka的启动内存
1.修改 bin 目录下的 zookeeper-server-start.sh ,将初始堆的大小(-Xms)设置前搏腊小一些
这一慧滑个是Kafka初始化银圆LogManager时候用到的buffer size
⑨ kafka原理分析
作为一款典型的消息中间件产品,kafka系统仍然由procer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简派薯单介绍如下:
当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsets.topic.num.partitions(默认值为50)和offsets.topic.replication.factor(默认值为1)参数配置。我们通过公式 hash(group id) % offsets.topic.num.partitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact操作,只保留最新的一次提交offset。
group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。
跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,2.8版本之前通腔销过zookeeper进行选主,2.8版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)
当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供procer和consumer查询获取。
因为只有partition的leader副本才会处理procer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本伍羡游分配时需要使partition的分布情况是如下这样的:
在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。
举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。
1)没有配置broker.rack的情况
现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:
2)配置了broker.rack的情况
假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:
kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本
kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。
当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个unclean.leader.election配置参数,它的默认值为true。当unclean.leader.election值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当unclean.leader.election值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。
当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。
为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:
1)follower副本处于活跃状态,与zookeeper(2.8之前版本)或kafka raft master之间的心跳正常
2)follower副本最近replica.lag.time.max.ms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。
follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。
kafka的消息内容存储在log.dirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘log.dirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}.index)和一个时间戳索引文件(命名规范为:${baseOffset}.timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。.index文件存储的是消息的offset到该消息在相应.log文件中的偏移,便于快速在.log文件中快速找到指定offset的消息。.index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。.timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。
可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,log.dirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。
kafka提供了两个参数log.segment.bytes和log.segment.ms来控制LogSegment文件的大小。log.segment.bytes默认值是1GB,当LogSegment大小达到log.segment.bytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。log.segment.ms表示最大多长时间会生成一个新的LogSegment,log.segment.ms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。
kafka还提供了log.retention.ms和log.retention.bytes两个参数来控制消息的保留时间。当消息的时间超过了log.retention.ms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了log.retention.bytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在log.retention.bytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过log.retention.ms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略
当我们使用KafkaProcer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProcerRecord对象就可以通过KafkaProcer的send()向kafka发送消息了,而且是线程安全的。KafkaProcer支持通过三种消息发送方式
KafkaProcer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProcer的内部结构如下所示:
从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProcer.send()方法的应用程序线程,因为KafkaProcer.send()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProcer实例时,会创建一个Sender线程,通过该KafkaProcer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProcer.send()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。
消息的发送过程如下:
在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumer.wakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。
跟procer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。
我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:
我们可以通过实现接口org.apache.kafka.clients.consumer.internals.PartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。
partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数auto.offset.reset值确定consumer消费的其实offset。如果auto.offset.reset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumer.seek()方法人工设置消费的起始offset。
kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在.index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetch.min.bytes和max.partition.fetch.bytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetch.max.wait.ms告诉broker即使消息大小没有达到fetch.min.bytes值,在收到请求后最多等待fetch.max.wait.ms时间后,也将当前消息返回给consumer。fetch.min.bytes默认值为1MB,待fetch.max.wait.ms默认值为500ms。
为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了procer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。
为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。
kafka提供了多种offset提交方式
partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。
kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。
触发再平衡的条件包括:
需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡
有两种情况与日常应用开发比较关系比较密切:
consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。
我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。
以下是保证kafka吞吐量大的一些设计考虑:
但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:
所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下procer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。
我们通过procer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是procer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。
kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:
kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。
如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数max.in.flight.requests.per.connection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。
⑩ Kafka(四)集群之kafka
在章节二( http://img.esxun.cn/202305/29/12/yapzpfbxpnq )中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。
这里我准备了三台虚拟机:
192.168.184.134
192.168.184.135
192.168.184.136
每台机器部署一个zk和kafka。
上一章节中zk集群已经神中部署完毕。
在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:
在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:
以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。
在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。游判山
下面这个是日志路径的配置
下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:
日志的保存时间:
以下是zookeeper的配置:
这里我们直接设置后台启动,三个节点都是如此:
这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:
解决这个问题只需要把/tmp/kafka-logs文件删除就好了。
看到日志出现这一句表明启动成功了:
下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:
我们在134节点创建一个topic:
查看topic列表:
在kafkatool中查看:
创建生产者:
创建消费者:
生成者发送冲游消息:
消费者接收消息:
到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。
声明:易商讯尊重创作版权。本文信息搜集、整理自互联网,若有来源标记错误或侵犯您的合法权益,请联系我们。我们将及时纠正并删除相关讯息,非常感谢!