Apache Kafka & Memphis: Building a Real-Time Data Processing Pipeline
In this tutorial, we get hands-on building a real-time data processing pipeline using Apache Kafka and Memphis.
Introduction
Real-time data sets businesses apart from their competitors when it is captured, processed, and used well, because it lets them base decisions on what is happening now rather than on stale reports.
Apache Kafka and Memphis are two tools for building and operating real-time data processing pipelines. Both are designed to process real-time data at scale.
In this article, we will explore how to combine Apache Kafka and Memphis into a real-time data processing pipeline that is simple, scalable, and efficient, and that can handle large volumes of data and scale horizontally as needed.
Why Apache Kafka?
Apache Kafka is an open-source distributed messaging system that allows the streaming of data between applications and services.
Kafka has a distributed architecture that allows it to handle large volumes of data and scale horizontally by adding more brokers to the cluster. Kafka uses a publish-subscribe model, where producers publish data to topics, and consumers subscribe to those topics to consume the data.
Kafka is widely used because it replicates data across multiple brokers, which makes it fault-tolerant and highly available.
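To make the publish-subscribe model described above more concrete, here is a minimal in-memory sketch in plain Java. It is not Kafka's API: the ToyTopic class and its publish and poll methods are hypothetical names invented for this illustration, and the sketch leaves out partitions, brokers, and replication. It only shows the core idea that a topic is an append-only log and that each consumer keeps its own read position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a topic; illustrative only, not Kafka's API.
final class ToyTopic {
    // Append-only message log shared by all consumers.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for each consumer.
    private final Map<String, Integer> offsets = new HashMap<>();

    // A producer appends a message to the end of the log.
    void publish(String message) {
        log.add(message);
    }

    // A consumer receives every message it has not seen yet,
    // and its offset then advances to the end of the log.
    List<String> poll(String consumerId) {
        int from = offsets.getOrDefault(consumerId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(consumerId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ToyTopic sensors = new ToyTopic();
        sensors.publish("reading-1");
        sensors.publish("reading-2");
        System.out.println(sensors.poll("dashboard")); // the first two readings
        sensors.publish("reading-3");
        System.out.println(sensors.poll("dashboard")); // only the new reading
        System.out.println(sensors.poll("archiver"));  // all three: its offset is independent
    }
}
```

Because offsets are tracked per consumer, a slow or newly added consumer can read the full history without affecting the others, which is the property the real system provides at much larger scale.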
Why Memphis?
Memphis is a real-time data processing platform that provides a simple and efficient way to process and analyze streaming data. It is designed to handle large volumes of data and can be scaled to support more complex data processing pipelines.
With Memphis, you can monitor and manage your data processing pipelines using the management tools and APIs it provides.
Memphis uses a declarative query language called Memphis Query Language (MQL) to define real-time data processing pipelines. MQL queries can be used to filter, transform, and aggregate data in real time.
By combining Apache Kafka and Memphis, you can build a real-time data processing pipeline that handles large volumes of data and scales horizontally as needed.
Here is how to do it.
Installing Apache Kafka
Before we install Apache Kafka locally, we need to make sure that we have Java installed on our system.
Run the following command to check if Java is installed:
# Checking the version of Java
java --version
If Java is not installed, use the following command to install Java:
# Installing Java
sudo apt install default-jdk
Then, download Apache Kafka from the official download page. This tutorial uses version 3.2.3; if the link below no longer resolves, older releases are kept in the Apache archive at https://archive.apache.org/dist/kafka/.
# Downloading Apache Kafka 3.2.3
wget https://downloads.apache.org/kafka/3.2.3/kafka_2.12-3.2.3.tgz
After downloading the file, extract it using the tar command below and move the extracted directory to /opt/kafka.
# Extract the downloaded archive
tar -xzf kafka_2.12-3.2.3.tgz
# Moving the extracted directory to /opt/kafka
sudo mv kafka_2.12-3.2.3 /opt/kafka
Now, we'll create the systemd unit files for the ZooKeeper and Kafka services, which let us start and stop the services.
I will use vim as my editor (you can use your preferred editor) to create the unit files and paste in the following contents.
sudo vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
After creating the systemd file for the zookeeper service, next, we will create the systemd file for Kafka and paste the following contents.
sudo vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Note: Make sure that you provide the right path to Java installed in your system.
To apply the changes that we have made, we will reload the systemd daemon using the following command:
sudo systemctl daemon-reload
Now, we are good to go!
Using the following commands, you can enable the ZooKeeper and Kafka services and then start them.
# Enabling the zookeeper service
sudo systemctl enable zookeeper
# Starting the Zookeeper
sudo systemctl start zookeeper
# Enabling the kafka service
sudo systemctl enable kafka
# Starting kafka
sudo systemctl start kafka
Creating Kafka Topic
In Kafka, producers and consumers send and receive messages via topics.
We'll create a Kafka topic for the data we want to process. You can create the topic either through a Kafka UI tool or with the command-line tools.
With both ZooKeeper and Kafka running, we will now create a topic named sensors from the command line using this command:
sudo /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sensors
This will create a Kafka topic called "sensors" using the default Kafka configuration and a single broker running on localhost:9092.
Setting up Kafka Producer
We have created a topic. Now we need a producer that sends messages to it.
You can create a Kafka producer using one of Kafka's client libraries, such as the Java client library, which makes producing messages fast and easy.
Here's an example of how to set up a Kafka producer using the Java client library:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
import java.util.Random;

public class SensorDataProducer {
    public static void main(String[] args) throws Exception {
        // Connection and serialization settings for the producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        String topic = "sensors";
        Random rand = new Random();
        // Publish 1000 randomly generated sensor readings as JSON strings
        for (int i = 0; i < 1000; i++) {
            String sensorId = "sensor" + (rand.nextInt(10) + 1);
            String sensorType = i % 2 == 0 ? "temperature" : "humidity";
            String sensorData = "{\"sensor_id\": \"" + sensorId + "\", \"sensor_type\": \"" + sensorType + "\", \"temperature\": " + rand.nextInt(50) + ", \"humidity\": " + rand.nextInt(100) + "}";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, sensorData);
            producer.send(record);
        }
        // close() flushes any buffered records before shutting down
        producer.close();
    }
}
The above code sets up a Kafka producer that publishes data to the "sensors" topic. The producer generates 1000 random sensor data records with temperature and humidity values and publishes them to the Kafka topic.
Creating a Memphis stream
We have successfully set up a Kafka producer that publishes data to the "sensors" topic. Now we need to set up a consumer, which is the Memphis stream that will subscribe to our topic.
Memphis provides stations, which play the same role as topics in Kafka and queues in RabbitMQ. These stations provide a powerful yet easy-to-use messaging queue for apps.
Installing Memphis
There are two ways to install and access Memphis: via Kubernetes or via Docker.
Memphis provides a native GUI, hosted inside the broker, built to act as a management layer for all Memphis aspects, including cluster config, resources, data observability, notifications, processing, and more. Instead of endless amounts of producers, consumers, orchestrations, manual scaling, and scattered monitoring, one just needs to create a station.
Run the following commands to install and access Memphis:
- Via Kubernetes
# To install using Helm for kubernetes
helm repo add memphis https://k8s.memphis.dev/charts/ --force-update && \
helm install my-memphis memphis/memphis --create-namespace --namespace memphis
# To access Memphis using UI/CLI/SDK from localhost, run the below commands:
kubectl port-forward service/memphis-cluster 6666:6666 9000:9000 7770:7770 --namespace memphis > /dev/null &
# For interacting with the broker via HTTP:
kubectl port-forward service/memphis-http-proxy 4444:4444 --namespace memphis > /dev/null &
Dashboard/CLI: http://localhost:9000
Broker: localhost:6666 (Client Connections)
HTTP proxy: localhost:4444 (Data + Mgmt)
- Via Docker
# Installing Memphis using docker-compose
curl -s https://memphisdev.github.io/memphis-docker/docker-compose.yml -o docker-compose.yml && \
docker compose -f docker-compose.yml -p memphis up
# To access
Dashboard/CLI: http://localhost:9000
Broker: localhost:6666
After the installation, you can access the UI through localhost:9000 and then create an account.
Now, we will need to create a "station" in Memphis and then connect it to Kafka.
First, make sure that the Memphis cluster is up and running. Then click the "Create New Station" button in the Memphis web UI and enter a name and description for the station when prompted.
Memphis comes in two editions: Memphis Cloud and self-hosted Memphis. Since we are using the self-hosted version, we need to add the input source using code.
We will write a script that creates a new Memphis client, connects to your stream, and sends data.
import com.memphis.memphis_sdk.*;

public class MemphisClientExample {
    public static void main(String[] args) {
        // NOTE: the SDK class and method names are kept as in the original listing;
        // check them against the Memphis Java SDK version you install.
        // create a new Memphis client, pointing at the broker port (6666) rather than the dashboard port (9000)
        MemphiaClient client = new MemphiaClient("localhost", 6666);
        // connect to your Memphis stream
        Stream stream = client.connect("kafka-data");
        // send data to your stream
        String data = "{\"sensor_id\": \"sensor1\", \"temperature\": 23.4, \"humidity\": 50.0}";
        stream.send(data);
        // disconnect from your stream and close the client
        stream.disconnect();
        client.close();
    }
}
Note: Remember to replace "localhost" and "6666" with the hostname and broker port of your Memphis instance, respectively, and "kafka-data" with the name of your Memphis stream.
The script above connects to the station and sends it a sample record. Sending each message read from the "sensors" Kafka topic in the same way makes the Kafka data available for further processing within Memphis.
Up to now, we have created a station in Memphis and connected it to Kafka.
Processing the Data using Memphis Query
Memphis is not just for setting up data streams; you can also process the data using a Memphis query.
Memphis provides a way to write queries in MQL, a declarative language that allows you to define real-time data processing pipelines.
Using the data that we have consumed from the Kafka topic, here is an example of an MQL query that calculates the average temperature from the sensor data:
SELECT avg(data.temperature) as avg_temperature
FROM sensor_data
WHERE data.sensor_type = 'temperature'
GROUP BY data.sensor_id
This query keeps only the records from temperature sensors, calculates the average temperature for each sensor, and outputs the result to a new stream.
This is just an example; with Memphis, you can perform more complex processing and aggregations on your real-time data.
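To check what the query above is expected to return, it can help to express the same filter, group, and average steps in plain Java. The sketch below is not MQL and does not use any Memphis API. The AverageTemperature class, its nested Reading type, and the averageBySensor method are hypothetical names chosen for this illustration, and the readings are made-up sample values.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java equivalent of the filter/group/average logic; illustrative only.
final class AverageTemperature {
    // Hypothetical value type mirroring the fields the query reads.
    static final class Reading {
        private final String sensorId;
        private final String sensorType;
        private final double temperature;

        Reading(String sensorId, String sensorType, double temperature) {
            this.sensorId = sensorId;
            this.sensorType = sensorType;
            this.temperature = temperature;
        }

        String sensorId() { return sensorId; }
        String sensorType() { return sensorType; }
        double temperature() { return temperature; }
    }

    // WHERE sensor_type = 'temperature', GROUP BY sensor_id, avg(temperature).
    static Map<String, Double> averageBySensor(List<Reading> readings) {
        return readings.stream()
            .filter(r -> "temperature".equals(r.sensorType()))
            .collect(Collectors.groupingBy(
                Reading::sensorId,
                TreeMap::new, // sorted by sensor ID for stable output
                Collectors.averagingDouble(Reading::temperature)));
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
            new Reading("sensor1", "temperature", 20.0),
            new Reading("sensor1", "temperature", 24.0),
            new Reading("sensor2", "temperature", 30.0),
            new Reading("sensor2", "humidity", 99.0)); // dropped by the filter
        System.out.println(averageBySensor(readings)); // prints {sensor1=22.0, sensor2=30.0}
    }
}
```

Note that the result contains one average per sensor ID, so downstream code should expect a set of rows rather than a single number.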
Publishing the processed data to Kafka
We have created a new dataset by processing the data consumed from the Kafka topic.
We will publish this processed data to a new Kafka topic using a Kafka producer, so that it can be consumed by other downstream applications or services, e.g. an analytics dashboard.
Below is the code of the Kafka producer that publishes the average temperature data to a new Kafka topic.
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProcessedDataProducer {
    public static void main(String[] args) throws Exception {
        // Connection and serialization settings for the producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        Producer<String, String> producer = new KafkaProducer<>(props);
        String topic = "processed_sensor_data";
        // Sample query result; in a real pipeline this comes from the processing step
        String queryResult = "{\"avg_temperature\": 24.5}";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, queryResult);
        producer.send(record);
        // close() flushes the buffered record before shutting down
        producer.close();
    }
}
With this code, we set up a new Kafka producer that publishes the average temperature data to the "processed_sensor_data" topic.
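The producer above publishes a single hard-coded result. As a rough sketch of how per-sensor averages could be turned into message values instead, the plain-Java helper below formats one JSON string per sensor. ProcessedPayloads and toJsonBySensor are hypothetical names, and both the sensor_id field in the output and the rounding to one decimal place are assumptions made for this illustration, not part of the original pipeline.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Builds JSON message values from per-sensor averages; illustrative only.
final class ProcessedPayloads {
    // Maps each sensor ID to a JSON value carrying its average temperature.
    static Map<String, String> toJsonBySensor(Map<String, Double> averages) {
        Map<String, String> payloads = new LinkedHashMap<>();
        for (Map.Entry<String, Double> entry : averages.entrySet()) {
            // Locale.ROOT keeps the decimal separator a dot on every machine.
            String json = String.format(Locale.ROOT,
                "{\"sensor_id\": \"%s\", \"avg_temperature\": %.1f}",
                entry.getKey(), entry.getValue());
            payloads.put(entry.getKey(), json);
        }
        return payloads;
    }

    public static void main(String[] args) {
        Map<String, Double> averages = new LinkedHashMap<>();
        averages.put("sensor1", 24.5);
        averages.put("sensor2", 30.0);
        // Each entry could become one record: the sensor ID as key, the JSON as value.
        toJsonBySensor(averages).forEach((sensorId, json) ->
            System.out.println(sensorId + " -> " + json));
    }
}
```

Each key and value pair produced here could then be passed to the three-argument ProducerRecord constructor (topic, key, value), so that results for the same sensor share a key.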
At this point, we have built a complete real-time data processing pipeline that collects data, processes it, and publishes the results. This is a basic pipeline; with this knowledge, you can scale it or build a more sophisticated one.
Conclusion
Apache Kafka and Memphis are powerful tools for building real-time data processing pipelines. By combining Kafka's high-throughput, distributed data streaming with Memphis' real-time data processing and analytics capabilities, you can build scalable, fault-tolerant data pipelines that process and analyze data in real time. In this tutorial, we have used examples to show how you can leverage Apache Kafka and Memphis to build a real-time data processing pipeline.
Together, these technologies can be used to build a wide range of real-time applications, such as real-time analytics, monitoring systems, and IoT applications. Whether you're building a simple data processing pipeline or a complex data streaming application, Apache Kafka and Memphis are powerful tools that can help you achieve your goals.