Kafka broker配置介绍

Stella981
• 阅读 713

这部分内容对了解系统和提高软件性能都有很大的帮助,kafka官网上也给出了比较详细的配置详单,但是我们还是直接从代码来看broker到底有哪些配置需要我们去了解的,配置都有英文注释,所以每一部分是干什么的就不翻译了,都能看懂:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server

import java.util.Properties
import kafka.utils.{Utils, ZKConfig}
import kafka.message.Message

/**
 * Configuration settings for the kafka server
 */
class KafkaConfig(props: Properties) extends ZKConfig(props) {
  /* the port to listen and accept connections on */
  val port: Int = Utils.getInt(props, "port", 6667)

  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
  val hostName: String = Utils.getString(props, "hostname", null)

  /* the broker id for this server */
  val brokerId: Int = Utils.getInt(props, "brokerid")
  
  /* the SO_SNDBUFF buffer of the socket sever sockets */
  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
  
  /* the SO_RCVBUFF buffer of the socket sever sockets */
  val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
  
  /* the maximum number of bytes in a socket request */
  val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))

  /* the maximum size of message that the server can receive */
  val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))

  /* the number of worker threads that the server uses for handling all client requests*/
  val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
  
  /* the interval in which to measure performance statistics */
  val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
  
  /* the default number of log partitions per topic */
  val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
  
  /* the directory in which the log data is kept */
  val logDir = Utils.getString(props, "log.dir")
  
  /* the maximum size of a single log file */
  val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))

  /* the maximum size of a single log file for some specific topic */
  val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))

  /* the maximum time before a new log segment is rolled out */
  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours before rolling out a new log segment for some specific topic */
  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))

  /* the number of hours to keep a log file before deleting it */
  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours to keep a log file before deleting it for some specific topic*/
  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
  
  /* the maximum size of the log before deleting it */
  val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)

  /* the maximum size of the log for some specific topic before deleting it */
  val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))

  /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
  val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
  
  /* enable zookeeper registration in the server */
  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)

  /* the number of messages accumulated on a log partition before messages are flushed to disk */
  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))

  /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
  val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))

  /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
  val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms",  3000)

  /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
  val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)

   /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
  val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))

  /* the maximum length of topic name*/
  val maxTopicNameLength = Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))
}

上面这段代码来自kafka.server包下的KafkaConfig类,之前我们就说过,broker就是kafka中的server,所以讲配置放在这个包中也不奇怪。这里我们顺着代码往下读,也顺便看看scala的语法。和java一样也要import相关的包,kafka将同一包内的两个类写在大括号中:

import kafka.utils.{Utils, ZKConfig}

然后我们看类的写法:

class KafkaConfig(props: Properties) extends ZKConfig(props)

我们看到在加载kafkaConfig的时候会加载一个properties对象,同时也会加载有关zookeeper的properties,这个时候我们可以回忆一下,之前我们启动kafka broker的命令:

  1. 启动zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties  & (用&是为了能退出命令行)

  2. 启动kafka server:  bin/kafka-server-start.sh ../config/server.properties  &

所以你能明白,初始化kafka broker的时候程序一定是去加载位于config文件夹下的properties,这个和java都一样没有区别。当然properties我们也可以通过程序来给出,这个我们后面再说,继续看我们的代码。既然找到了对应的properties文件,我们就结合代码和properties一起来看。

Kafka broker的properties中,将配置分为以下六类:

l  Server Basics:关于brokerid,hostname等配置

l  Socket Server Settings:关于传输的配置,端口、buffer的区间等。

l  Log Basics:配置log的位置和partition的数量。

l  Log Flush Policy:这部分是kafka配置中最重要的部分,决定了数据flush到disk的策略。

l  Log Retention Policy:这部分主要配置日志处理时的策略。

l  Zookeeper:配置zookeeper的相关信息。

在文件properties中的配置均出现在kafkaConfig这个类中,我们再看看kafkaConfig中的代码:

/* the broker id for this server */
  val brokerId: Int = Utils.getInt(props, "brokerid")
  
  /* the SO_SNDBUFF buffer of the socket sever sockets */
  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)

凡是参数中有三个的,最后一个是default,而参数只有两个的则要求你一定要配置,否则的话则报错。当然在这么多参数中肯定是有一些经验参数的,至于这些参数怎么配置我确实没有一个特别的推荐,需要在不断的测试中才能磨合出来。

当然你也可以将配置写在程序里,然后通过程序去启动broker,这样kafka的配置就可以像下面一样写:

Properties props = new Properties();
props.setProperty("port","9093");
props.setProperty("log.dir","/home/kafka/data1");

我倒是觉得配置还是直接写在配置文件中比较好,如果需要修改也不会影响正在运行的服务,写在内存中,总是会有些不方便的地方。所以还是建议大家都写配置好了,后面讲到的producer和consumer都一样。

这里再提两个参数一个是brokerid,每个broker的id必须要区分;第二个参数是hostname,这个是broker和producer、consumer联系的关键,这里记住一定要改成你的地址和端口,否则永远连得都是localhost。

--------------------------------------------------------

下一篇将写producer和consumer的配置了,涉及到这部分就要开始编程了,写着写着又往源码里看进去了,下篇会先讲如何搭建开发环境,然后再写两个简单那的例子去熟悉配置。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
KAFKA官方教程笔记
 因为kafka配置较多,所以单独写一篇博客来记录。       1,broker配置   主要的配置项有三个broker.id集群内唯一log.dir数据目录zookeeper.connectzookeeper连接地址Topiclevelconfigurationsanddefaultsa
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(