Giới thiệu

Apache Kafka, hay gọi đơn giản Kafka là một hệ thống stream phân tán. Điều này có nghĩa là gì?

Một hệ thống stream có những đặc tính như sau:

  • Cơ chế publish/subscribe một stream messages. Cái này tương tự như là message queue.

  • Có khả năng lưu trữ stream record chịu lỗi trong một khoảng thời gian

  • Xử lý stream record theo thứ tự nó diễn ra

Kafka được dùng chính với mục đích:

  • Xây một hệ thống real-time data pipeline để lấy dữ liệu giữa các hệ thống hoặc ứng dụng

  • Xây dựng một hệ thống streaming real-time trao đổi/tương tác với dòng dữ liệu (stream of data)

Một vài khái niệm cơ bản:

  • Kafka chạy như 1 cluster trên 1 hoặc nhiều server, có thể phân bố ra nhiều data center

  • Dữ liệu lưu trữ trên Kafka gọi là record và phân chia theo topic

  • Mỗi record sẽ có key, valuetimestamp

Các APIs chính:

  • Producer API: có nhiệm vụ sinh ra record, hiểu như là nhà xuất bản

  • Consumer API: nhận record từ Producer, hiểu như là subscriber

  • Stream API: hiểu như là bộ xử lý cho stream. Nhận input từ stream, xử lý và output ra stream khác

  • Connector API: kết nối Kafka topics với ứng dụng hoặc cơ sở dữ liệu. Ví dụ: connector kết nối đến MySQL ghi bất kỳ thay đổi nào vào một bảng

Tìm hiểu thêm về Kafka trên trang chủ: https://kafka.apache.org/intro

Implement

Dưới đây demo cài đặt Kafka server trên MacOS với brew, cách tạo topic, chạy thử Producer, Consumer và implement một Consumer đơn giản bằng NodeJS.

Cài đặt Kafka với brew:

brew install kafka

Như hiện tại, lệnh này sẽ cài đặt cả KafkaZooKeeper.

Đường dẫn cài đặt:

  • Kafka: /usr/local/Cellar/kafka/2.0.0

  • Zookeeper: /usr/local/Cellar/zookeeper/3.4.13

Để chạy server:

Sửa file /usr/local/etc/kafka/server.properties, thêm dòng sau vào cuối:

port = 9092
advertised.host.name = localhost

Khởi động ZooKeeper:

zkServer start

Khởi động Kafka:

 /usr/local/Cellar/kafka/2.0.0/bin/kafka-server-start  /usr/local/etc/kafka/server.properties

Tạo thử topic

Với Kafka server đang chạy như trên, mở một terminal khác, chạy lệnh:

/usr/local/Cellar/kafka/2.0.0/bin/kafka-topics  --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic a_test_topic

Nếu không có lỗi gì, output sẽ là:

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "a_test_topic".

Như vậy thì topic đã được tạo thành công

Gửi message từ Producer

Với Kafka server đang khởi động như trên, chạy lệnh sau trên một terminal mới:

/usr/local/Cellar/kafka/2.0.0/bin/kafka-console-producer --broker-list localhost:9092 --topic a_test_topic

Khi này Producer sẽ khởi động và chúng ta có thể gõ bất kỳ message nào dưới dạng text.

Nhận message từ Comsumer

Với Kafka server & Producer đang chạy như trên, mở một terminal mới, chạy lệnh:

/usr/local/Cellar/kafka/2.0.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic a_test_topic --from-beginning

Lệnh này sẽ cho chạy Consumer, mỗi khi có message mới gõ từ terminal của Producer, message này cũng sẽ hiện ra ở terminal của Consumer.

Implement Consumer với NodeJS

Bản implement Consumer này dùng thư viện: https://github.com/SOHU-Co/kafka-node

Cài gói thư viện với lệnh:

npm init -y
npm install --save kafka-node

Tạo một file consumer.js với nội dung:

const kafka = require('kafka-node');

const { Consumer, Offset, Client } = kafka;

const topic = 'a_test_topic';

const client = new Client();

const topics = [{
    topic,
}];

const options = {
    autoCommit: false,
    fetchMaxWaitMs: 1000,
    fetchMaxBytes: 1024 * 1024
};

const consumer = new Consumer(client, topics, options);
const offset = new Offset(client);

consumer.on('message', (message) => {
    console.log(message);
});

consumer.on('error', (err) => {
    console.log('error', err);
});

/*
* If consumer get `offsetOutOfRange` event, fetch data from the smallest(oldest) offset
*/
consumer.on('offsetOutOfRange', (_topic) => {
    const t = _topic;
    t.maxNum = 2;

    offset.fetch([topic], (err, offsets) => {
        if (err) {
            console.error(err);
            return;
        }
        const min = Math.min.apply(null, offsets[t.topic][t.partition]);
        consumer.setOffset(t.topic, t.partition, min);
    });
});

Chạy với lệnh:

node consumer.js

Mỗi khi có message mới từ Producer thì message này cũng hiện ra ở màn hình của NodeJS client. Ví dụ:

{ topic: 'a_test_topic',
  value: 'Hello world',
  offset: 1,
  partition: 0,
  highWaterOffset: 10,
  key: null }

Một vài tài liệu có thể tham khảo khi tìm hiểu về Kafka:

  • Sách tiếng Nhật:
[商品価格に関しましては、リンクが作成された時点と現時点で情報が変更されている場合がございます。]

Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) [ 株式会社NTTデータ ]
価格:3888円(税込、送料無料) (2018/11/7時点)