Modifying Kafka topic configuration online (no downtime, no restart), e.g. log retention time
Use case:
Sometimes, when the producers of a few topics suddenly generate a large volume of messages, disk space becomes tight. Besides adding disks, another remedy is to edit the broker configuration file and shorten the log retention time, but both of these approaches require stopping and restarting Kafka, which is clearly unacceptable on a production cluster. Instead, you can modify the configuration of an individual topic online, overriding the defaults, to temporarily relieve the disk-space pressure.
Characteristics
The change is made online, with no restart or downtime, but it only applies to a single topic, and the retention time can only be specified in milliseconds (retention.ms).
After the change, the new configuration is checked and applied across the cluster within log.retention.check.interval.ms, which defaults to 300 seconds (5 minutes).
The kafka-configs command
Compared with kafka-topics, kafka-configs is more complete for configuration changes: the user can specify the entity the config applies to: topic, client, user or broker.
Run kafka-configs.sh --help to see the usage:
[tomcat@kafka-test ~]$ kafka-configs.sh --help
This tool helps to manipulate and describe entity config for a topic, client, user or broker
Option                                 Description
------                                 -----------
--add-config <String>                  Key Value pairs of configs to add. Square brackets can be used to
                                       group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'.
                                       The following is a list of valid configurations:
                                       For entity-type 'topics':
                                         cleanup.policy, compression.type, delete.retention.ms,
                                         file.delete.delay.ms, flush.messages, flush.ms,
                                         follower.replication.throttled.replicas, index.interval.bytes,
                                         leader.replication.throttled.replicas, max.compaction.lag.ms,
                                         max.message.bytes, message.downconversion.enable,
                                         message.format.version, message.timestamp.difference.max.ms,
                                         message.timestamp.type, min.cleanable.dirty.ratio,
                                         min.compaction.lag.ms, min.insync.replicas, preallocate,
                                         retention.bytes, retention.ms, segment.bytes,
                                         segment.index.bytes, segment.jitter.ms, segment.ms,
                                         unclean.leader.election.enable
                                       For entity-type 'brokers':
                                         advertised.listeners, background.threads, compression.type,
                                         follower.replication.throttled.rate,
                                         leader.replication.throttled.rate,
                                         listener.security.protocol.map, listeners,
                                         log.cleaner.backoff.ms, log.cleaner.dedupe.buffer.size,
                                         log.cleaner.delete.retention.ms,
                                         log.cleaner.io.buffer.load.factor, log.cleaner.io.buffer.size,
                                         log.cleaner.io.max.bytes.per.second,
                                         log.cleaner.max.compaction.lag.ms,
                                         log.cleaner.min.cleanable.ratio,
                                         log.cleaner.min.compaction.lag.ms, log.cleaner.threads,
                                         log.cleanup.policy, log.flush.interval.messages,
                                         log.flush.interval.ms, log.index.interval.bytes,
                                         log.index.size.max.bytes, log.message.downconversion.enable,
                                         log.message.timestamp.difference.max.ms,
                                         log.message.timestamp.type, log.preallocate,
                                         log.retention.bytes, log.retention.ms, log.roll.jitter.ms,
                                         log.roll.ms, log.segment.bytes, log.segment.delete.delay.ms,
                                         max.connections, max.connections.per.ip,
                                         max.connections.per.ip.overrides, message.max.bytes,
                                         metric.reporters, min.insync.replicas, num.io.threads,
                                         num.network.threads, num.recovery.threads.per.data.dir,
                                         num.replica.fetchers, principal.builder.class,
                                         replica.alter.log.dirs.io.max.bytes.per.second,
                                         sasl.enabled.mechanisms, sasl.jaas.config,
                                         sasl.kerberos.kinit.cmd,
                                         sasl.kerberos.min.time.before.relogin,
                                         sasl.kerberos.principal.to.local.rules,
                                         sasl.kerberos.service.name,
                                         sasl.kerberos.ticket.renew.jitter,
                                         sasl.kerberos.ticket.renew.window.factor,
                                         sasl.login.refresh.buffer.seconds,
                                         sasl.login.refresh.min.period.seconds,
                                         sasl.login.refresh.window.factor,
                                         sasl.login.refresh.window.jitter,
                                         sasl.mechanism.inter.broker.protocol, ssl.cipher.suites,
                                         ssl.client.auth, ssl.enabled.protocols,
                                         ssl.endpoint.identification.algorithm, ssl.key.password,
                                         ssl.keymanager.algorithm, ssl.keystore.location,
                                         ssl.keystore.password, ssl.keystore.type, ssl.protocol,
                                         ssl.provider, ssl.secure.random.implementation,
                                         ssl.trustmanager.algorithm, ssl.truststore.location,
                                         ssl.truststore.password, ssl.truststore.type,
                                         unclean.leader.election.enable
                                       For entity-type 'users':
                                         request_percentage, producer_byte_rate, SCRAM-SHA-256,
                                         SCRAM-SHA-512, consumer_byte_rate
                                       For entity-type 'clients':
                                         request_percentage, producer_byte_rate, consumer_byte_rate
                                       Entity types 'users' and 'clients' may be specified together to
                                       update config for clients of a specific user.
--alter                                Alter the configuration for the entity.
--bootstrap-server <String: server to  The Kafka server to connect to. This is required for describing
  connect to>                          and altering broker configs.
--command-config <String: command      Property file containing configs to be passed to Admin Client.
  config property file>                This is used only with --bootstrap-server option for describing
                                       and altering broker configs.
--delete-config <String>               config keys to remove 'k1,k2'
--describe                             List configs for the given entity.
--entity-default                       Default entity name for clients/users/brokers (applies to
                                       corresponding entity type in command line)
--entity-name <String>                 Name of entity (topic name/client id/user principal name/broker id)
--entity-type <String>                 Type of entity (topics/clients/users/brokers)
--force                                Suppress console prompts
--help                                 Print usage information.
--version                              Display Kafka version.
--zookeeper <String: urls>             REQUIRED: The connection string for the zookeeper connection in
                                       the form host:port. Multiple URLS can be given to allow fail-over.
[tomcat@kafka-test ~]$
Modifying topic parameters:
Per-topic parameters can be set at creation time with --config, overriding the defaults:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
After a topic has been created, kafka-configs.sh can also change individual settings. The following modifies a setting on my-topic:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000
Check whether the change took effect:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
An override can also be removed, resetting that setting to its default:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
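Long invocations like these are easy to mistype on a production cluster. One cautious pattern is to assemble the command in a small script, print it for review, and only then run it by hand. A minimal sketch; the topic name, ZooKeeper address, and retention value are illustrative placeholders, not taken from this article:

```shell
# Build (but do not yet execute) a kafka-configs.sh override command.
# TOPIC and ZK are placeholders; adjust them to your environment.
TOPIC=my-topic
ZK=localhost:2181
RETENTION_MS=$((12 * 60 * 60 * 1000))   # 12 hours in milliseconds

CMD="bin/kafka-configs.sh --zookeeper $ZK --entity-type topics --entity-name $TOPIC --alter --add-config retention.ms=$RETENTION_MS"

# Print the command for review; run it manually once it looks right.
echo "$CMD"
```

Echoing before executing is a cheap dry-run: it catches a wrong topic name or a mis-scaled retention value before anything touches the cluster.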
Modifying broker parameters:
To change the number of log cleaner threads on broker 0:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
List the current dynamic configuration parameters of broker 0:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
Delete the override on the broker with id 0, reverting the parameter to its default:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
Update the parameter on all brokers in the cluster at once (a cluster-wide config, keeping the parameter consistent across brokers):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
List the current cluster-wide dynamic parameters:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
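For the disk-pressure scenario from the introduction, a cluster-wide override can be applied temporarily and then removed once space recovers. The sketch below only prints the matched set/remove pair of commands for review rather than executing them; the broker address and the 6-hour figure are assumptions for illustration:

```shell
# Print a matched pair of cluster-wide override commands: one to shrink
# log.retention.ms temporarily, and one to remove the override later.
# BROKER is a placeholder address; pick a retention that fits your disks.
BROKER=localhost:9092
TEMP_RETENTION_MS=$((6 * 60 * 60 * 1000))   # 6 hours in milliseconds

echo "bin/kafka-configs.sh --bootstrap-server $BROKER --entity-type brokers --entity-default --alter --add-config log.retention.ms=$TEMP_RETENTION_MS"
echo "bin/kafka-configs.sh --bootstrap-server $BROKER --entity-type brokers --entity-default --alter --delete-config log.retention.ms"
```

Generating the delete command together with the add command makes it harder to forget to lift the temporary override afterwards.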
The kafka-topics.sh command
Older Kafka versions make these changes with kafka-topics.sh; in newer versions this approach may be deprecated in favor of kafka-configs.sh.
# 1. View the current topic configuration
./kafka-topics.sh --describe --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster
# 2. Adjust the topic configuration
./kafka-topics.sh --topic my_test_topic --zookeeper test.myzk.com:2181/kafkacluster --alter --config retention.ms=43200000 # duration in milliseconds, 43200000 ms = 12 h
# 3. Check that the change took effect
Run the same command as step 1 and inspect the first line of the output, which looks like:
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,delete.retention.ms=86400000,retention.ms=43200000,cleanup.policy=delete,compression.type=producer
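Converting hours to the millisecond values that retention.ms expects is easy to get wrong by a factor of 60 or 1000; the shell can do the arithmetic. A quick check that matches the 12-hour value used above:

```shell
# 12 hours expressed in milliseconds, as used for retention.ms above.
HOURS=12
echo $((HOURS * 60 * 60 * 1000))   # prints 43200000
```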
Other parameters that can be adjusted:
segment.bytes=104857600 # size of a single log segment file; the broker default is 1 GB (the 104857600 shown in the example output is 100 MB)
delete.retention.ms=86400000 # for compacted logs, how long delete tombstones are retained, which is also the window within which a consumer must read the log to see them; unlike retention.ms, which applies under the delete policy, this applies to compacted logs
retention.ms=86400000 # with the "delete" retention policy, how long a log is kept before it is deleted
cleanup.policy=delete # the default policy; delete discards old segments, compact performs log compaction
compression.type=producer # compression type; accepts the standard codecs gzip, snappy, lz4, plus uncompressed (no compression) and producer, which keeps whatever codec the producer used
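Byte-valued settings such as segment.bytes and retention.bytes are also easier to audit when computed rather than typed out. A quick check that the 104857600 in the example output above is 100 MB, alongside the 1 GB broker default:

```shell
# Verify the byte values discussed above.
echo $((100 * 1024 * 1024))    # prints 104857600, i.e. 100 MB (segment.bytes in the example)
echo $((1024 * 1024 * 1024))   # prints 1073741824, i.e. 1 GB (the broker-level segment size default)
```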
./zookeeper-shell.sh test.myzk.com:2181/kafkacluster # inspect the Kafka cluster metadata stored in ZooKeeper