Zookeeper & Kafka Install : A single node and a single broker cluster – 2016

May 7, 2016
Starting Zookeeper

In the previous chapter, we ran the ZooKeeper package available in Ubuntu’s default repositories as a daemon (zookeeperd).

Let’s stop the ZooKeeper daemon if it’s running:

$ sudo service zookeeper stop

To launch a single local Zookeeper, we’ll use the default configuration that Kafka provides:

$ ls ~/kafka/config
consumer.properties  producer.properties  test-log4j.properties   zookeeper.properties
log4j.properties     server.properties    tools-log4j.properties

Let’s start the local Zookeeper instance:

$ ~/kafka/bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties
[2015-11-07 05:03:12,885] INFO Reading configuration from: /home/ubuntu/kafka/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
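Once the server is running, we can check that it answers on its client port. The following is a minimal sketch using ZooKeeper’s "ruok" four-letter-word command (it assumes nc is installed; a healthy server replies "imok"):

```shell
# Probe the local ZooKeeper with the "ruok" four-letter-word command.
# A healthy server replies "imok"; if nothing is listening we report that.
if command -v nc >/dev/null 2>&1 \
   && echo ruok | nc -w 2 localhost 2181 2>/dev/null | grep -q imok; then
  echo "ZooKeeper is up on localhost:2181"
else
  echo "no ZooKeeper answering on localhost:2181"
fi
```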

 

zookeeper.properties

Kafka ships with property files that define the minimal configuration required for a single-node, single-broker cluster. For example, ~/kafka/config/zookeeper.properties has the following lines:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

By default, ZooKeeper listens on *:2181/tcp.
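That port comes straight from the clientPort line above, and a line of awk can pull it out of the file. The sketch below re-creates the three-line config in /tmp so it runs anywhere; on a real install, point it at ~/kafka/config/zookeeper.properties instead:

```shell
# Re-create the zookeeper.properties snippet from above as a sample file
# (on a real install this lives at ~/kafka/config/zookeeper.properties)
cat > /tmp/zookeeper.properties <<'EOF'
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
EOF

# Pull out the port ZooKeeper's client listener will bind
port=$(awk -F= '$1 == "clientPort" {print $2}' /tmp/zookeeper.properties)
echo "ZooKeeper client port: $port"   # -> ZooKeeper client port: 2181
```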

 

ZooKeeper Overview

The following overview is adapted from the Apache ZooKeeper documentation.

ZooKeeper allows distributed processes to coordinate with each other through a shared hierarchical name space of data registers (we call these registers znodes), much like a file system.

Unlike normal file systems, ZooKeeper provides its clients with high throughput, low latency, highly available, strictly ordered access to the znodes.

The performance aspects of ZooKeeper allow it to be used in large distributed systems.

The reliability aspects prevent it from becoming the single point of failure in big systems.

Its strict ordering allows sophisticated synchronization primitives to be implemented at the client.

The name space provided by ZooKeeper is much like that of a standard file system. A name is a sequence of path elements separated by a slash (/).

Every znode in ZooKeeper’s name space is identified by a path. And every znode has a parent whose path is a prefix of the znode with one less element; the exception to this rule is root (/) which has no parent. Also, exactly like standard file systems, a znode cannot be deleted if it has any children.
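The parent rule can be made concrete with a few lines of shell; znode_parent below is an illustrative helper written for this article, not part of ZooKeeper:

```shell
# A znode's parent path is the znode's path with its last element removed;
# the root "/" is the one znode with no parent.
znode_parent() {
  path="$1"
  if [ "$path" = "/" ]; then
    echo "(none)"
    return
  fi
  parent="${path%/*}"
  echo "${parent:-/}"   # the parent of a top-level znode is the root
}

znode_parent /brokers/ids/0   # -> /brokers/ids
znode_parent /brokers         # -> /
znode_parent /                # -> (none)
```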

The main differences between ZooKeeper and standard file systems are that every znode can have data associated with it (every file can also be a directory and vice-versa) and znodes are limited to the amount of data that they can have.

ZooKeeper was designed to store coordination data: status information, configuration, location information, etc. This kind of meta-information is usually measured in kilobytes, if not bytes.

ZooKeeper has a built-in sanity check of 1M, to prevent it from being used as a large data store, but in general it is used to store much smaller pieces of data.

[Figure: The replicated ZooKeeper service (ZookeeperService.png) — clients connected to an ensemble of servers, each holding a copy of the data tree]

The service itself is replicated over a set of machines that comprise the service. These machines maintain an in-memory image of the data tree along with transaction logs and snapshots in a persistent store. Because the data is kept in memory, ZooKeeper is able to achieve very high throughput and low latency. The downside to an in-memory database is that the size of the database that ZooKeeper can manage is limited by memory. This limitation is a further reason to keep the amount of data stored in znodes small.

The servers that make up the ZooKeeper service must all know about each other. As long as a majority of the servers are available, the ZooKeeper service will be available. Clients must also know the list of servers. The clients create a handle to the ZooKeeper service using this list of servers.
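The majority rule is worth making explicit: an ensemble of n servers needs floor(n/2) + 1 of them to be up, so an ensemble of 2f+1 servers tolerates f failures. A quick sketch:

```shell
# A majority of an n-server ensemble is floor(n/2) + 1, so an ensemble
# of 2f+1 servers stays available through f server failures.
quorum() { echo $(( $1 / 2 + 1 )); }

for n in 1 3 5; do
  echo "ensemble of $n: quorum $(quorum "$n"), tolerates $(( n - $(quorum "$n") )) failure(s)"
done
```

This is also why production ensembles use an odd number of servers: going from 3 to 4 servers raises the quorum from 2 to 3 without tolerating any additional failures.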

Clients only connect to a single ZooKeeper server. The client maintains a TCP connection through which it sends requests, gets responses, gets watch events, and sends heart beats. If the TCP connection to the server breaks, the client will connect to a different server. When a client first connects to the ZooKeeper service, the first ZooKeeper server will setup a session for the client. If the client needs to connect to another server, this session will get reestablished with the new server.

Read requests sent by a ZooKeeper client are processed locally at the ZooKeeper server to which the client is connected. If the read request registers a watch on a znode, that watch is also tracked locally at the ZooKeeper server. Write requests are forwarded to other ZooKeeper servers and go through consensus before a response is generated. Sync requests are also forwarded to another server, but do not actually go through consensus. Thus, the throughput of read requests scales with the number of servers, while the throughput of write requests decreases with the number of servers.

Order is very important to ZooKeeper. (They tend to be a bit obsessive compulsive.) All updates are totally ordered. ZooKeeper actually stamps each update with a number that reflects this order. We call this number the zxid (ZooKeeper Transaction Id). Each update will have a unique zxid. Reads (and watches) are ordered with respect to updates. Read responses will be stamped with the last zxid processed by the server that services the read.

 

Kafka properties

Let’s check the Kafka broker properties (~/kafka/config/server.properties):

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
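Since zookeeper.connect is a plain comma-separated list, it is easy to split into individual servers; a small sketch using the example string from the comment above:

```shell
# Split a zookeeper.connect string into individual host:port pairs
# (a chroot suffix, if present, would apply to all of them)
connect="127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
echo "$connect" | tr ',' '\n' | while IFS=: read -r host port; do
  echo "zk server $host on port $port"
done
```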

 

Testing Installation

Let’s publish and consume a message (“Hello, Kafka”) to check our Kafka server’s behavior.

To publish messages, we need to create a Kafka producer from the command line using the bin/kafka-console-producer.sh script. It requires the Kafka server’s hostname and port, along with a topic name as its arguments.

Publish the string “Hello, Kafka” to a topic called “MyTopic” as follows:

$ echo "Hello, Kafka" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyTopic > /dev/null

Because the topic doesn’t exist, Kafka will create it automatically.
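Relying on auto-creation is fine for a quick test, but the topic can also be created explicitly with kafka-topics.sh. The sketch below guards the call so it is a no-op when Kafka is not installed at the ~/kafka path assumed throughout this article:

```shell
# Create the topic explicitly instead of relying on auto-creation.
# KAFKA_HOME defaults to the ~/kafka layout used in this article.
KAFKA_HOME=${KAFKA_HOME:-$HOME/kafka}
if [ -x "$KAFKA_HOME/bin/kafka-topics.sh" ]; then
  "$KAFKA_HOME/bin/kafka-topics.sh" --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic MyTopic
else
  echo "kafka-topics.sh not found under $KAFKA_HOME; skipping"
fi
```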

To consume messages, we create a Kafka consumer using the bin/kafka-console-consumer.sh script. It requires the ZooKeeper server’s hostname and port, along with a topic name, as its arguments.

The following command consumes messages from the topic we published to.

Note the use of the --from-beginning flag, which is present because we want to consume a message that was published before the consumer was started.

$ ~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic --from-beginning
Hello, Kafka

We can see “Hello, Kafka” in the output, which confirms that there are no configuration issues.

The script will continue to run, waiting for more messages to be published to the topic.

If we want to, we can open a new terminal and start a producer to publish a few more messages. We should be able to see them all in the consumer’s output instantly.

To get a list of topics, we can use the --list option:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
MyTopic

Press CTRL+C to stop the consumer script.

 

Producer/Consumer properties

Here are notable properties of producer (producer.properties):

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

The most important properties for the consumer (consumer.properties) are:

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181

#consumer group id
group.id=test-consumer-group
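The console consumer can be pointed at this file with its --consumer.config option, so the group.id above is actually applied. A guarded sketch (it assumes the ~/kafka layout used in this article and is a no-op when Kafka is absent; like before, the consumer runs until stopped with CTRL+C):

```shell
# Run the console consumer with the properties file above, so it joins
# the "test-consumer-group" consumer group. Guarded so this is a no-op
# when Kafka is not installed at the assumed ~/kafka path.
KAFKA_HOME=${KAFKA_HOME:-$HOME/kafka}
if [ -x "$KAFKA_HOME/bin/kafka-console-consumer.sh" ]; then
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --zookeeper localhost:2181 \
    --topic MyTopic --from-beginning \
    --consumer.config "$KAFKA_HOME/config/consumer.properties"
else
  echo "kafka-console-consumer.sh not found under $KAFKA_HOME; skipping"
fi
```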