Author: StJohn Piano
Published: 2020-08-12
Datafeed Article 164




GOAL



Install Kafka 2.5.0 and its dependencies on Ubuntu 16.04. Configure Kafka to run as a systemd service.

Sub-goals: Run a Hello World test for Kafka. Make a list of useful Kafka commands. Export all messages from a topic to a file.




CONTENTS



- Goal
- Contents
- Notes
- Installation Recipe
- Hello World Test
- Kafka Commands
- Export All Messages From A Topic










NOTES



Security: To perform this recipe, you should either be a) logged in over a network via SSH or b) using the machine via a directly-connected keyboard and monitor. You must be logged in as a user with sudo permissions.

In this recipe, the service user is called "franz", but you can use a different name.

Kafka relies on Zookeeper and the Java Virtual Machine (JVM). The Kafka binaries archive will include Zookeeper.

Kafka 2.5.0 ships with Zookeeper 3.5.7.

By default, Zookeeper runs on port 2181 and Kafka runs on port 9092.
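
A quick way to check this later, once both services are running (if netstat isn't available, ss -tlnp works the same way here):

sudo netstat -tlnp | grep -E ':2181|:9092'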

Excerpt from:
kafka.apache.org/documentation/#java

From a security perspective, we recommend you use the latest released version of JDK 1.8 as older freely available versions have disclosed security vulnerabilities.


Security: Consider configuring the firewall (using e.g.
ufw
) in order to be sure that no unauthorised connection can be made from another machine.
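
A minimal ufw sketch (an illustration only; it assumes you connect over SSH on the default port, so adapt it to your own situation before enabling):

# Allow SSH, then enable the firewall. Incoming connections are denied by default.
sudo ufw allow OpenSSH

sudo ufw enable

# Check the rules. Ports 2181 and 9092 will now be unreachable from other machines.
sudo ufw status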

Virtual machine specs:
- Hostname: kafkaTest4
- OS: Ubuntu 16.04.6 (LTS) x64
- RAM: 2 GB
- 1 CPU
- Disk: 50 GB SSD




















INSTALLATION RECIPE





### Create new service user.


When setting up a service, it's usually best to create a new user that will manage the service and its resources.

It's important to limit the permissions of this service user account, so that it doesn't have more control over the server than necessary.

I'll give the user sudo permissions while setting up the service and installing various components, and then remove them.


Useful commands:

Get list of users on the server.
cat /etc/passwd | awk -F: '{ print $1}'


Check current user's sudo permissions.
sudo -l


Check another user's sudo permissions.
sudo -l -U franz




1. Create user.

sudo useradd franz --create-home


No password is created (password access will be blocked). A home directory for the user will be created:
/home/franz



2. Give the user sudo permissions.

Run
sudo visudo
.

In the visudo interface, add the following line at the end of the file:
franz ALL=(ALL) NOPASSWD: ALL



3. Log in as the user. Only root or a sudo user can do this. To log out, run
logout
.

sudo su -l franz



4. [For later use] After installing and setting up the service, remove sudo permissions from the user.

Log out from the user by running
logout
.

Run
sudo visudo
.

In the visudo interface, delete the following line:
franz ALL=(ALL) NOPASSWD: ALL





### Install the JVM.


Check if it's already installed.
java -version


Install it if not.

sudo apt-get update

sudo apt-get -y install default-jre

java -version



Example:
franz@kafkaTest4:~$ java -version

openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-8u265-b01-0ubuntu2~16.04-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)





### Set up work directories.


# Change directory to franz user's home directory.
cd /home/franz

# Create downloads directory.
mkdir downloads

# Create kafka directory.
mkdir kafka





### Get the list of GPG keys for the Apache developers.


cd ~/downloads

wget https://downloads.apache.org/kafka/KEYS

# Check that GPG is installed.
gpg --version

# Import the keys.
gpg --import KEYS





### Download and verify Kafka binaries archive for Kafka 2.5.0.


Browse to:
downloads.apache.org/kafka/2.5.0

In the list, you should see these items:
- kafka_2.12-2.5.0.tgz
- kafka_2.12-2.5.0.tgz.asc
- kafka_2.12-2.5.0.tgz.md5
- kafka_2.12-2.5.0.tgz.sha1
- kafka_2.12-2.5.0.tgz.sha512

"2.12" is the Scala version. "2.5.0" is the Kafka version.

The .asc file is the GPG signature. The other files contain hash values.

Some details from the documentation:
kafka.apache.org/downloads

2.5.0 is the latest release. The current stable version is 2.5.0.
We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.12 is recommended).

Kafka 2.5.0 includes a number of significant new features.
[...]
- Upgrade Zookeeper to 3.5.7
- Deprecate support for Scala 2.11



cd ~/downloads

wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz





### Verify MD5 hash.


md5sum kafka_2.12-2.5.0.tgz | cut -d' ' -f1 > md5_calculated.txt


Example:
franz@kafkaTest4:~/downloads$ cat md5_calculated.txt

29ba62ed67483c8b060d7bb758923d5b


wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz.md5


Example:
franz@kafkaTest4:~/downloads$ cat kafka_2.12-2.5.0.tgz.md5

kafka_2.12-2.5.0.tgz: 29 BA 62 ED 67 48 3C 8B 06 0D 7B B7 58 92 3D 5B


cat kafka_2.12-2.5.0.tgz.md5 | sed 's/kafka_2.12-2.5.0.tgz: //' | sed 's/ //g' | tr '[:upper:]' '[:lower:]' > md5_supplied.txt

diff md5_calculated.txt md5_supplied.txt





### Verify SHA512 hash.


sha512sum kafka_2.12-2.5.0.tgz


Example:
franz@kafkaTest4:~/downloads$ sha512sum kafka_2.12-2.5.0.tgz

447a7057bcd9faca98b6f4807bd6019ef73eee90efdc1e7b10005f669e2537a8a190cb8b9c9f4c20db1d95b13d0f0487e9cc560d0759532058439ce7f722c7cd kafka_2.12-2.5.0.tgz


sha512sum kafka_2.12-2.5.0.tgz | cut -d' ' -f1 > sha512_calculated.txt

wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz.sha512


Example:
franz@kafkaTest4:~/downloads$ cat kafka_2.12-2.5.0.tgz.sha512

kafka_2.12-2.5.0.tgz: 447A7057 BCD9FACA 98B6F480 7BD6019E F73EEE90 EFDC1E7B 10005F66 9E2537A8 A190CB8B 9C9F4C20 DB1D95B1 3D0F0487 E9CC560D 07595320 58439CE7 F722C7CD


cat kafka_2.12-2.5.0.tgz.sha512 | sed 's/kafka_2.12-2.5.0.tgz: //' | tr '\n' ' ' | sed 's/ //g' | tr '[:upper:]' '[:lower:]' > sha512_supplied.txt

# Add a newline at the end of the file.
echo "" >> sha512_supplied.txt


Example:
franz@kafkaTest4:~/downloads$ cat sha512_supplied.txt

447a7057bcd9faca98b6f4807bd6019ef73eee90efdc1e7b10005f669e2537a8a190cb8b9c9f4c20db1d95b13d0f0487e9cc560d0759532058439ce7f722c7cd


diff sha512_calculated.txt sha512_supplied.txt





### Verify GPG signature.


wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz.asc

gpg --verify kafka_2.12-2.5.0.tgz.asc kafka_2.12-2.5.0.tgz


Example:
franz@kafkaTest4:~/downloads$ gpg --verify kafka_2.12-2.5.0.tgz.asc kafka_2.12-2.5.0.tgz

gpg: Signature made Wed 08 Apr 2020 01:18:52 AM UTC using RSA key ID 989E9B3F
gpg: Good signature from "David Arthur (CODE SIGNING KEY) <davidarthur@apache.org>"
gpg: WARNING: This key is not certified with a trusted signature!
gpg: There is no indication that the signature belongs to the owner.
Primary key fingerprint: 63F2 B2B8 D10C CC7A 0CE8 8A72 E9F8 7164 989E 9B3F


The important phrase is "gpg: Good signature from".




### Extract the Kafka binaries archive.


# Change directory to kafka directory.
cd ~/kafka

# Extract the archive.
tar -xvzf ~/downloads/kafka_2.12-2.5.0.tgz --strip-components 1


Note: The
--strip-components 1
option removes the first directory component from each path in the archive. This means that the archive's contents will be extracted directly into the new kafka directory.
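
To confirm the extraction, list the kafka directory. You should see entries such as bin, config, and libs.

ls -1 ~/kafka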




### Configure the Kafka server.


Two configuration issues:
- By default, Kafka will not allow us to delete a topic.
- By default, Kafka will store its messages and data in the location
/tmp/kafka-logs
.

Edit the configuration file in order to modify these settings.

Open the file
~/kafka/config/server.properties
.

Change this line:
log.dirs=/tmp/kafka-logs

to:
log.dirs=/var/log/kafka_data


Add this line at the bottom of the "Internal Topic Settings" section:
delete.topic.enable = true
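
If you prefer to make these changes non-interactively, a sketch like the following should work. (It appends the new setting to the end of the file rather than to the "Internal Topic Settings" section; Kafka reads the whole file, so the position makes no functional difference.)

# Point log.dirs at the new data directory.
sed -i 's|^log.dirs=.*|log.dirs=/var/log/kafka_data|' ~/kafka/config/server.properties

# Allow topic deletion.
echo "delete.topic.enable = true" >> ~/kafka/config/server.properties

# Confirm both settings.
grep -E 'log.dirs|delete.topic' ~/kafka/config/server.properties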


Create the log directory tree.

cd /var/log

sudo mkdir kafka_data

sudo chown franz:franz kafka_data





### Configure the Zookeeper server.


Zookeeper is a service that Kafka uses to manage its cluster state and configurations. It's included in the Kafka binaries archive.

cd ~/kafka


Open the file
~/kafka/config/zookeeper.properties
.

Change this line:
dataDir=/tmp/zookeeper

to:
dataDir=/var/log/zookeeper_data



Create the log directory.

cd /var/log

sudo mkdir zookeeper_data

sudo chown franz:franz zookeeper_data





### Create the unit file for Zookeeper.


This will allow us to perform common service actions such as starting, stopping, and restarting Zookeeper in a manner consistent with other Linux services.

File location:
/etc/systemd/system/zookeeper.service


Open with
sudo
, e.g.:
sudo vim /etc/systemd/system/zookeeper.service


File content:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=franz
ExecStart=/home/franz/kafka/bin/zookeeper-server-start.sh /home/franz/kafka/config/zookeeper.properties
ExecStop=/home/franz/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target


The [Unit] section specifies that Zookeeper requires networking and the filesystem to be ready before it can start.

The [Service] section specifies that systemd should use the
zookeeper-server-start.sh
and
zookeeper-server-stop.sh
shell files for starting and stopping the service. It also specifies that Zookeeper should be restarted automatically if it exits abnormally.
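
Optionally, ask systemd to check the unit file for problems (systemd-analyze is included with the systemd version on Ubuntu 16.04):

# If no problems are found, this should print nothing.
systemd-analyze verify /etc/systemd/system/zookeeper.service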




### Create the unit file for Kafka.


This will allow us to perform common service actions such as starting, stopping, and restarting Kafka in a manner consistent with other Linux services.

File location:
/etc/systemd/system/kafka.service


Open with
sudo
, e.g.:
sudo vim /etc/systemd/system/kafka.service


File content:
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=franz
ExecStart=/bin/sh -c '/home/franz/kafka/bin/kafka-server-start.sh /home/franz/kafka/config/server.properties > /var/log/kafka/kafka.log 2>&1'
ExecStop=/home/franz/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target


The [Unit] section specifies that this unit file depends on zookeeper.service. This will ensure that zookeeper gets started automatically when the kafka service starts.

The [Service] section specifies that systemd should use the
kafka-server-start.sh
and
kafka-server-stop.sh
shell files for starting and stopping the service. It also specifies that Kafka should be restarted automatically if it exits abnormally.

Note: I've specified the location of the main Kafka log to be
/var/log/kafka/kafka.log
.

Create the log directory:

cd /var/log

sudo mkdir kafka

sudo chown franz:franz kafka
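
Note: These unit files were created while systemd was already running, so it's safest to tell systemd to reload its configuration before the first start:

sudo systemctl daemon-reload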





### Start the Kafka service.


sudo systemctl start kafka


Check the journal logs for the kafka unit:
sudo journalctl -u kafka


Example:
franz@kafkaTest4:/var/log$ sudo journalctl -u kafka

-- Logs begin at Mon 2020-08-10 08:17:24 UTC, end at Mon 2020-08-10 08:46:32 UTC. --
Aug 10 08:46:09 kafkaTest4 systemd[1]: Started kafka.service.


Check the status of each service:

sudo service kafka status | grep Active

sudo service zookeeper status | grep Active


Example:
franz@kafkaTest4:/var/log$ sudo service kafka status | grep Active

Active: active (running) since Mon 2020-08-10 08:46:09 UTC; 52s ago

franz@kafkaTest4:/var/log$ sudo service zookeeper status | grep Active

Active: active (running) since Mon 2020-08-10 08:46:09 UTC; 52s ago





### Kafka application logs directory problem.


Note: Don't run any of the commands in this section.


By default, the Kafka application logs (of various kinds) will be written to the directory
$base_dir/logs
.

$base_dir
is, in this case,
/home/franz/kafka
.

The shell script
kafka-server-start.sh
calls another shell script:
kafka-run-class.sh


Excerpt from
bin/kafka-run-class.sh
:

# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
 LOG_DIR="$base_dir/logs"
fi


I have experimented with using this line in
kafka-server-start.sh
, prior to the line in which
kafka-run-class.sh
is called:
export LOG_DIR="/var/log/kafka/logs"


Note: You need to create the directory tree
/var/log/kafka/logs
and change its ownership recursively to the
franz
user.

Result: Partial success. Most logs were written to the new logs directory. However, I have found that some logs were still written to the directory
/home/franz/kafka/logs
. Based on timestamps, I think that when the kafka service started, it would write some logs to
$base_dir/logs
, then switch over to the new logs directory.

A situation where the application logs are fractured between multiple locations is undesirable, especially if this fracturing does not occur only once, but on every stop-and-start of the service.

Rather than try to solve this right now, I've settled for a temporary solution: Creating a symlink from
/var/log/kafka/logs
to
/home/franz/kafka/logs
.
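
Note: An approach that might be worth testing in future is to set the variable via systemd instead of editing the shell script, by adding a line like this to the [Service] section of the kafka unit file:
Environment=LOG_DIR=/var/log/kafka/logs
I haven't tested whether this avoids the fracturing problem.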




### Create a symlink from the main kafka log directory to the default kafka instance's log directory.


Because the Kafka service has been started, there is now a
logs
directory in the directory
/home/franz/kafka
.


Example:

franz@kafkaTest4:/var/log$ ls -1 ~/kafka/

bin
config
libs
LICENSE
logs
NOTICE
site-docs

franz@kafkaTest4:/var/log$ ls -1 ~/kafka/logs

controller.log
kafka-authorizer.log
kafka-request.log
kafkaServer-gc.log.0.current
log-cleaner.log
server.log
state-change.log
zookeeper-gc.log.0.current



We can now symlink to this directory from our chosen main kafka log directory.

cd /var/log/kafka

ln -s /home/franz/kafka/logs logs



Example:

franz@kafkaTest4:/var/log/kafka$ ls -1

kafka.log
logs

franz@kafkaTest4:/var/log/kafka$ file logs

logs: symbolic link to /home/franz/kafka/logs

franz@kafkaTest4:/var/log/kafka$ cd logs


franz@kafkaTest4:/var/log/kafka/logs$ ls -1

controller.log
kafka-authorizer.log
kafka-request.log
kafkaServer-gc.log.0.current
log-cleaner.log
server.log
state-change.log
zookeeper-gc.log.0.current





### Set Kafka to start on boot.

sudo systemctl enable kafka


Example:
franz@kafkaTest4:~/kafka$ sudo systemctl enable kafka

Created symlink from /etc/systemd/system/multi-user.target.wants/kafka.service to /etc/systemd/system/kafka.service.





### Service commands


We've finished the installation process.

The following systemd service commands should now work. They can be used by any user with sudo permissions.

Service commands:

sudo service kafka status
sudo service kafka start
sudo service kafka stop
sudo service kafka restart


sudo service zookeeper status
sudo service zookeeper start
sudo service zookeeper stop
sudo service zookeeper restart


Note: When both services are running, stopping Zookeeper will cause Kafka to stop. When both services are inactive, starting Kafka will start Zookeeper.
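
For example (a sketch; the exact status wording may vary):

# Stop Zookeeper; Kafka stops too.
sudo service zookeeper stop

sudo service kafka status | grep Active

# Expected output: "Active: inactive (dead) ..."

# Start Kafka; Zookeeper starts too.
sudo service kafka start

sudo service zookeeper status | grep Active

# Expected output: "Active: active (running) ..."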




















HELLO WORLD TEST



Summary: Publish and consume a "Hello World" message to make sure the Kafka server is behaving correctly.


Publishing messages in Kafka requires:
- A producer, which can publish messages / data to topics.
- A consumer, which reads messages / data from topics.


Change directory to the kafka directory.
cd /home/franz/kafka


Create a topic named TestTopic with a single partition and one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic


Example:
franz@kafkaTest4:~/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic

Created topic TestTopic.


Create a Kafka producer from the command line using the
kafka-console-producer.sh
script. It expects the Kafka server's hostname, port, and a topic name as arguments.

Publish the string "Hello, World" to the TestTopic topic.
echo "Hello, World" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null


Create a Kafka consumer using the
kafka-console-consumer.sh
script. It expects the Kafka server's hostname and port and a topic name as arguments.

The following command consumes messages from TestTopic. Note the use of the
--from-beginning
flag, which allows the consumption of messages that were published before the consumer was started.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning


Example:
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning

Hello, World


Press Ctrl-C to stop the consumer script.

Example:
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning

Hello, World
^CProcessed a total of 1 messages


We have successfully a) published a message to a topic and b) read a message from a topic.




















KAFKA COMMANDS



You don't have to log in as the
franz
user. These commands can be run by another user with sudo permissions.

Before running any of these commands, change directory to the kafka installation directory.

cd /home/franz/kafka





### Get Kafka version.


bin/kafka-topics.sh --version


Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --version

2.5.0 (Commit:66563e712b0b9f84)





### Get Zookeeper version.


I don't know of a command that shows the Zookeeper version, but you can look through the libs directory to see the version number.

franz@kafkaTest4:/home/franz/kafka$ ls libs | grep zookeeper

zookeeper-3.5.7.jar
zookeeper-jute-3.5.7.jar
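
Alternatively, if netcat is installed, Zookeeper's "srvr" four-letter command reports the version. (In Zookeeper 3.5, srvr should be on the default four-letter-word whitelist.)

# The first line of the response contains the Zookeeper version.
echo srvr | nc localhost 2181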





### List topics.


bin/kafka-topics.sh --zookeeper localhost:2181 --list


Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

TestTopic
__consumer_offsets





### Create a topic.


Create a topic named TestTopic2 with a single partition and one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic2


Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic2

Created topic TestTopic2.





### Delete a topic.


bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic TestTopic2


Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic TestTopic2

Topic TestTopic2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.





### Create a Kafka producer.


Create a Kafka producer from the command line using the
kafka-console-producer.sh
script. It expects the Kafka server's hostname, port, and a topic name as arguments.

Publish the string "Hello, World" to the TestTopic topic.
echo "Hello, World" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null





### Create a Kafka consumer.


Create a Kafka consumer using the
kafka-console-consumer.sh
script. It expects the Kafka server's hostname and port and a topic name as arguments.

The following command consumes messages from TestTopic. Note the use of the
--from-beginning
flag, which allows the consumption of messages that were published before the consumer was started.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning


Example:
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning

Hello, World


Press Ctrl-C to stop the consumer script.




### List consumer groups.


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list


Example:
# Set a consumer running in another terminal.
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning

Hello, World

# Then list consumer groups.
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

console-consumer-64265





### Describe consumer group.


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <consumerGroupName>


Example:
franz@kafkaTest4:~/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-64265

GROUP                   TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                              HOST        CLIENT-ID
console-consumer-64265  TestTopic  0          -               2               -    consumer-console-consumer-64265-1-808d22d9-6462-4d59-9e44-3715786641e8  /127.0.0.1  consumer-console-consumer-64265-1





### Delete a consumer group.


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group <consumerGroupName>


Note: Deleting the consumer group won't work if there is at least one remaining active consumer in that group.

Example:

franz@kafkaTest4:~/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-64265


Error: Deletion of some consumer groups failed:
* Group 'console-consumer-64265' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.


# Stop the consumer
console-consumer-64265
in the other terminal by pressing Ctrl-C.


# List consumer groups.
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

console-consumer-64265


# Delete the consumer group.
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-64265

Deletion of requested consumer groups ('console-consumer-64265') was successful.


# List consumer groups.
franz@kafkaTest4:~/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list





### Get the latest offset in a topic.


bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -1


Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -1

TestTopic:0:1


The latest offset is 1. Here, "0" is the partition ID.

Send a new message on the topic to see the offset increase.

Publish the string "Hello, Foo" to the TestTopic topic.

franz@kafkaTest4:/home/franz/kafka$ echo "Hello, Foo" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null


Get the new latest offset (which should be "2").

franz@kafkaTest4:/home/franz/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -1

TestTopic:0:2





### Get the earliest offset still in a topic.

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -2


Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -2

TestTopic:0:0


The earliest offset in the topic TestTopic is 0.
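
Note: The difference between the latest and earliest offsets gives the number of messages currently held in the partition (assuming no log compaction). Here: 2 - 0 = 2 messages.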




















EXPORT ALL MESSAGES FROM A TOPIC



We're going to export all the messages stored in Kafka for a specific topic to a file.


I've logged out from the user
franz
and am now the user
stjohn
.


Change to a work directory.

cd ~

mkdir export


Copy the standalone properties file from Kafka config directory.
cp /home/franz/kafka/config/connect-standalone.properties export/exportStandalone.properties


In the file
export/exportStandalone.properties
, comment out these lines by adding a hash sign '#' in front of them:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter


Add these lines below them:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter


This means that our export connector will be able to accept any message value, instead of only JSON values.
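
After these changes, the relevant lines in exportStandalone.properties should read:

#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter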

Create a file named exportFile.properties.

File location:
/home/stjohn/export/exportFile.properties


File content:
name=fileSink
connector.class=FileStreamSink
tasks.max=1
file=export/export.txt
topics=TestTopic


Export the topic "TestTopic" to the designated export file.
Note: This runs continuously, picks up where it left off, and appends to the export file. The export file path is interpreted relative to the directory from which the command is run.
/home/franz/kafka/bin/connect-standalone.sh export/exportStandalone.properties export/exportFile.properties


There will be lots of output in the terminal.


Example:

# In a terminal, run the export command.


# In a second terminal:
stjohn@kafkaTest4:~/export$ cat export.txt

Hello, World
Hello, Foo


# In a third terminal, publish the string "Hello, Foo2" to the TestTopic topic.
stjohn@kafkaTest4:~$ echo "Hello, Foo2" | /home/franz/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null



# In the second terminal:
stjohn@kafkaTest4:~/export$ cat export.txt

Hello, World
Hello, Foo
Hello, Foo2


# In the original terminal, press Ctrl-C to stop the export connector.



Excellent. We've successfully exported all messages in a Kafka topic to a file.















That's the end of this project.















[start of notes]



The installation recipe is derived from a previous recipe:
www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-18-04
- Authors: Hathy A and bsder
- Updated: May 10, 2019

Other information sources:
- kafka.apache.org/quickstart
- ronnieroller.com/kafka/cheat-sheet


[end of notes]