Настройка Кафка кластера

Теперь мы готовы к тому чтобы поиграть с системой обмена сообщениями издатель-подписчик  Apache Kafka.

С Кафкой, мы можем создать несколько типов кластеров:

  • Один узел — Один брокер
  • Один узел- несколько брокеров
  • Несколько узлов-несколько брокеров

Кластер Kafka состоит из пяти основных компонентов:

  • Тема(Topic): Тема  это назвение категории или фида в котором сообщения публикуются производителями сообщений. В Кафке, темы разделены и каждый раздел представлен упорядоченным списком последовательных сообщений. Кластер Кафка поддерживает раздельнное журналировние для каждой темы. Каждое сообщение в разделе присваивается уникальный идентификатор.
  • Брокер: кластер Кафки состоит из одного или нескольких серверов, и каждый из них может иметь один или несколько запущенных серверных процесса которые иназываются брокером. Темы создаются в контексте процессов брокера.
  • Zookeeper: Zookeeper служит интерфейсом координации между
    брокерами и потребителями Кафки. На сайте Hadoop Wiki описан следующи образом  (http://wiki.apache.org/hadoop/ZooKeeper/ProjectDescription):

    «Zookeeper кординирует распределенные процессы друг с другом через общее иерархического пространство имен регистров данных (мы называем эти регистры Zузламы (znode)), так как в файловой системе.» (Вольный перевод)

    Основные различия между Zookeeper и стандартных файловых систем является то, что каждый znode может иметь данные, связанные с ним и znode-и ограничены объемом данных, которые они могут содержать. Zookeeper был предназначен для хранения координации таких данных как: информацию о состоянии, конфигурации, информацию о местоположении, и так далее.

  • Производители: производители публикуют данные тем, выбрав соответствующий раздел в рамках темы. Для балансировки нагрузки, распределение сообщений на раздел темы может быть сделано в циклическом режиме или с помощью пользовательских функции.
  • Потребители: Потребители это приложения или процессы, которые подписываются на темы и сам процесс получения опубликованных сообщений.

Итак, давайте начнем с очень простой установки кластера.

Один узел — Один брокер

В предыдущей теме мы установили Кафку на одну машину. Теперь настало время, чтобы создать один узел — одного брокера на основе Кафка кластера, как показано на следующей схеме:

kafkaOnetoOne

Запуск сервера Zookeeper
Кафка по умолчанию предоставляет простой конфигурационный файл Zookeeper, используемый для запуска одного локального экземпляра Zookeeper хотя отдельная установка Zookeeper также может осуществляться при настройке кластера Кафка (Об этом поже). Запустим первый экземпляр локального Zookeeper с помощью следующей команды (все последуюшии команды запускаются от имени суперпользователя (sudo) и в папке где был установлен кафка напрмер /opt/kafka-0.9.0.1-src):

bin/zookeeper-server-start.sh config/zookeeper.properties

После чего в консоле должны увидеть чтото вроде этого

...
[2016-03-03 10:52:59,528] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:os.version=3.13.0-79-generic (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,528] INFO Server environment:user.dir=/opt/kafka-0.9.0.1-src (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,557] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,557] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,557] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-03-03 10:52:59,581] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Заметка
Кафка поставляется с конфигурационным файлом с минимальными свойствами, необходимыми для кластера один узел-один брокер.
Вот список важных параметров, которые можно использовать в файле zookeeper.properties:

# Папка где хранится снимок (snapshot) zookeeper -а.
dataDir=/tmp/zookeeper
# Порт прослушивания
clientPort=2181
# Кличество клиентов (0 по умолчанию не ограниченно)
maxClientCnxns=0
По умолчанию ZooKeeper  прослушивает порт  *:2181/tcp. Для более детальной информации как настроить ZooKeeper   можно посетить сайт  http://zookeeper.apache.org/.

Запуск брокера Кафка
Теперь запустите брокер Кафка в новом окне консоли с помощью следующей команды:

bin/kafka-server-start.sh config/server.properties

Теперь вы должны увидеть такой результат

...
[2016-03-03 11:41:25,061] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-03-03 11:41:25,143] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-03-03 11:41:25,163] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-03-03 11:41:25,179] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-03-03 11:41:25,390] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
[2016-03-03 11:41:25,395] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2016-03-03 11:41:25,396] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-03-03 11:41:25,410] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-03-03 11:41:25,410] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-03-03 11:41:25,430] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 41 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-03-03 11:41:25,480] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-03-03 11:41:25,481] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-03-03 11:41:25,487] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-03-03 11:41:25,505] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-03-03 11:41:25,509] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-03-03 11:41:25,513] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(vahan-virtual-machine,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-03-03 11:41:25,549] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-03-03 11:41:25,549] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser)
[2016-03-03 11:41:25,551] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

Файл server.properties определяет следующие важные свойства, необходимые для Кафка брокера:

# Идентификатор брокера. Это должно быть уникальное число для каждого брокера.
Broker.id = 0
# Порт прослушивания
port=9092
# Каталог, в котором будут храниться файлы журналов
log.dir=/tmp/kafka8-logs
# По умолчанию количество разделов  журналов на топик.
num.partitions = 2
# Строка соединения Zookeeper
zookeeper.connect=localhost:2181

В конце есть список нескольких дополнительных и важных свойств, доступных для брокера Кафки.

Создание темы Кафка
Кафка предоставляет утилиту командной строки для создания темы на сервере Кафки. Давайте создадим тему которую назовем  называется kafkatopic  с одним разделом и только одна копия с помощью этой утилиты:

bin/kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 1 —partitions 1 —topic kafkatopic

И в консоли должны увидеть что-то типа этого

[2016-03-03 12:04:46,896] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [kafkatopic,0] (kafka.server.ReplicaFetcherManager)
[2016-03-03 12:04:46,913] INFO Completed load of log kafkatopic-0 with log end offset 0 (kafka.log.Log)
[2016-03-03 12:04:46,917] INFO Created log for partition [kafkatopic,0] in /tmp/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2016-03-03 12:04:46,918] INFO Partition [kafkatopic,0] on broker 0: No checkpointed highwatermark is found for partition [kafkatopic,0] (kafka.cluster.Partition)

Утилита kafka-topics.sh создаст тему, с задаными параметрами , и покажет сообщение об успешном создании. Он также в параметраз задается информация о сервере Zookeeper, в этом случае: localhost:2181. Чтобы получить список тем, на сервере Кафки, используйте следующую команду в новом окне консоли:

bin/kafka-topics.sh —list —zookeeper localhost:2181

Запуск производителя для отправки сообщений
Кафка предоставляет пользователям с клиентом производителя утилитой командной строки, которая принимает входные сигналы от командной строки и публикует их в виде сообщения на кластере Кафки. По умолчанию, каждая новая линия рассматривается как новое сообщение. Следующая команда используется для запуска в консоли производителя для отправки сообщений:

bin/kafka-console-producer.sh —broker-list localhost:9092 —topic kafkatopic

запустим ее и в косоле должны увидеть

SLF4J: Class path contains multiple SLF4J bindings.                                                                                                                                                                                                                            
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/core/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]                                                                                                                    
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/core/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]                                                                                                                   
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/tools/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]                                                                                                                   
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/api/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]                                                                                                                    
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]                                                                                                                
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/file/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]                                                                                                                   
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/json/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]                                                                                                                   
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.                                                                                                                                                                                               
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]  

При запуске клиента командной строки продюсера, необходимы следующие параметры:

  • broker-list
  • topic

Параметор broker-list определяет брокеров с которымы нужно связаться в данном случае это localhost:9092 , а topic  определяет тему куда надо отсылать сообшения.

Теперь введите следующие сообщения на окне консоли:

  • Впишите Welcome to Kafka и нажмите Ввод (Enter)
  • Впишите Это кластер с одним брокером и нажмите Ввод (Enter)

Можете попробовать и другие тексты.

Свойства по умолчанию для продюсера определены в producer.properties. Важными свойствами являются:

# Список брокеров используется для самонастройки сведеней об остальной части кластера
# формат: host1:port1,host2:port2…
metadata.broker.list=localhost:9092
#Указать кодек сжатия для данных: none , gzip, snappy.
compression.codec=none

В дальнейшем будут посты о том как  как писать производителей для Кафки и более подробное описание свойства продюсера.

Запуск потребителя для получения сообщений
Кафка также обеспечивает потребительский клиент командной строки для получения сообщений. Следующая команда используется для запуска в консоле потребителя, который показывает сообшения на выходе командной строки, как только он присоединяется к теме, созданной в брокере Кафки:

bin/kafka-console-consumer.sh —zookeeper localhost:2181 —topic kafkatopic —from-beginning

Ну и на выходе должны увидеть что такое

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/core/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/core/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/tools/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/api/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/file/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka-0.9.0.1-src/connect/json/build/dependant-libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Welcome to Kafka
Это кластер с одним брокером

Свойства по умолчанию для потребителя определены в /config/consumer.properties. Важными свойством является:
# идентификатор группы (Строка, которая однозначно идентифицирует набор потребителей # в пределах той же группы потребителей)
group.id = test-consumer-group

ну и в дальнейшем после постов о том  как писать производителей будут и посты о том как  как писать потребителей для Кафки и более подробное описание свойства.

Запустив все четыре компонента (Zookeeper, брокер, производитель и потребитель) в различных терминалах, вы будете иметь возможность вводить сообщения в терминале производителя и видеть их появления в терминале потребителя.

Информация об использовании консольных инструментов как для производителей так и потребителей можно просмотреть, выполнив команды без аргументов.

Слейдующий пост будет о том как запускать на одном узле несколько брокеров.

One thought on “Настройка Кафка кластера

Добавить комментарий