Saturday, January 18, 2025

Xử lý Log với Golang, Kafka và ELK Stack

-

1. Tổng quan.

Bài viết này sẽ hướng dẫn bạn thiết lập một hệ thống xử lý log đơn giản sử dụng Golang và ELK Stack (Elasticsearch, Logstash, Kibana). Hệ thống này sẽ thu thập các log từ file /var/log/auth.log và gửi chúng đến Elasticsearch để lưu trữ và phân tích trong Kibana.

Để giúp bạn có một cái nhìn tổng quan về crawler, mình sẽ cung cấp một cấu trúc chi tiết và một số gợi ý nội dung cho từng phần.

2. Tổng quan về các Công nghệ.

  • Golang: Ngôn ngữ lập trình Golang (Go) nổi tiếng với tính đơn giản, tốc độ và khả năng đồng thời. Trong bài này, chúng ta sẽ sử dụng Go để viết một chương trình thu thập log và gửi chúng đến Apache Kafka.
  • Apache Kafka: Hệ thống messaging phân tán (distributed messaging system) được sử dụng để truyền các luồng dữ liệu theo thời gian thực (real-time). Trong bài này, Kafka đóng vai trò trung gian truyền tải các log được thu thập đến Logstash.
  • ELK Stack: Bộ công cụ mã nguồn mở phổ biến để thu thập, lưu trữ, phân tích và visualize dữ liệu log.
    • Elasticsearch: Là nơi lưu và phân tích log (search & analytics engine) được sử dụng để lưu trữ các log đã được xử lý.
    • Logstash: Công cụ xử lý và chuyển đổi dữ liệu theo thời gian thực, giúp định dạng lại các log trước khi đưa vào Elasticsearch.
    • Kibana: Giao diện web để visualize và tương tác với dữ liệu được lưu trữ trong Elasticsearch.

3. So sánh với các giải pháp khác.

Nhiều giải pháp khác có thể được sử dụng để thu thập và xử lý log. Một số lựa chọn phổ biến:

  • Log agents: Các agent như Filebeat có thể tự động thu thập log từ các nguồn khác nhau và gửi chúng đến các hệ thống trung gian.
  • Syslog servers: Syslog là một giao thức chuẩn để gửi log từ các hệ thống khác nhau đến một máy chủ tập trung.

4. Ví dụ về thu thập Log với Golang.

Mình sẽ thiết kế file auth.collector.go như là một chương trình Golang đơn giản dùng để thu thập các log từ file /var/log/auth.log và gửi chúng đến Kafka với sơ đồ như sau:

Để hiểu rõ hơn, chúng ta cùng phân tích từng phần:

  • Producer:
    • Golang: Đây là một ngôn ngữ lập trình được sử dụng để viết ứng dụng tạo ra các log xác thực. Có thể hiểu đây là các chương trình đang chạy trên các server, khi có sự kiện xác thực xảy ra (ví dụ: người dùng đăng nhập, đăng xuất) thì chúng sẽ tạo ra một dòng log và gửi vào hệ thống.
    • Topic golang-auth-logs: Đây là một chủ đề (topic) trong hệ thống Kafka, dùng để phân loại các log xác thực. Tức là tất cả các log liên quan đến xác thực sẽ được gửi vào topic này.
  • Kafka:
    • Đây là một hệ thống truyền thông tin theo dạng chủ đề (topic), được sử dụng để truyền tải các log với tốc độ cao và độ tin cậy cao.
    • Trong sơ đồ này, Kafka đóng vai trò là một trung gian, nhận các log từ các ứng dụng Golang và phân phối chúng đến các consumer (người tiêu dùng).
  • Consumer:
    • Topic golang-auth-logs: Tương tự như producer, consumer cũng lắng nghe trên topic này để nhận các log.
    • ELK Stack: Đây là một bộ công cụ gồm Elasticsearch, Logstash và Kibana, được sử dụng để xử lý và trực quan hóa dữ liệu log.
      • Logstash: Công cụ này sẽ nhận các log từ Kafka, tiến hành xử lý, lọc và chuyển đổi dữ liệu thành một định dạng phù hợp để lưu trữ vào Elasticsearch.
      • Elasticsearch: Đây là một cơ sở dữ liệu NoSQL, được sử dụng để lưu trữ các log đã được xử lý. Elasticsearch cho phép tìm kiếm, phân tích dữ liệu một cách nhanh chóng và hiệu quả.
      • Kibana: Công cụ này cung cấp một giao diện web để trực quan hóa dữ liệu trong Elasticsearch. Bạn có thể tạo các dashboard, biểu đồ, bảng thống kê để theo dõi, phân tích các log xác thực.
  • Crawler:
    • Đây là một chương trình có nhiệm vụ thu thập các log từ các server. Trong trường hợp này, crawler sẽ định kỳ kiểm tra file /var/log/auth.log trên các server và đưa nội dung của file này vào Kafka.
  • Server Farm:
    • Đây là tập hợp các máy chủ, mỗi máy chủ đều chạy các ứng dụng Golang và tạo ra các log xác thực.

Quá trình hoạt động:

  • Các ứng dụng Golang chạy trên các server tạo ra các log xác thực và gửi vào topic golang-auth-logs trên Kafka.
  • Crawler thu thập các log từ file /var/log/auth.log trên các server và gửi vào Kafka.
  • Logstash nhận các log từ Kafka, xử lý và lưu vào Elasticsearch.
  • Kibana sử dụng dữ liệu trong Elasticsearch để tạo các báo cáo, biểu đồ, giúp người dùng dễ dàng theo dõi và phân tích các log xác thực.

Tại sao lại sử dụng hệ thống này?

  • Thu thập dữ liệu tập trung: Tất cả các log xác thực từ các server đều được tập trung vào một nơi, giúp dễ dàng quản lý và phân tích.
  • Xử lý dữ liệu hiệu quả: Kafka và ELK Stack giúp xử lý một lượng lớn dữ liệu log một cách nhanh chóng và hiệu quả.
  • Trực quan hóa dữ liệu: Kibana giúp người dùng dễ dàng hiểu và phân tích dữ liệu, phát hiện các vấn đề tiềm ẩn.
  • Tích hợp với các hệ thống khác: Hệ thống này có thể dễ dàng tích hợp với các hệ thống giám sát, báo cáo khác.

Hệ thống này có thể được sử dụng để:

  • Giám sát hoạt động của hệ thống: Theo dõi các sự kiện đăng nhập, đăng xuất, các lỗi xác thực.
  • Phát hiện các cuộc tấn công: Phân tích các log để tìm kiếm các hoạt động bất thường.
  • Phân tích hành vi người dùng: Hiểu rõ hơn về cách người dùng tương tác với hệ thống.

Quy trình thực hiện bài LAB.

Bước 1: Cài đặt Docker và Docker Compose.

Đảm bảo Docker và Docker Compose đã được cài đặt trên tất cả các server.

Bước 2: Cấu hình Kafka trên Server 192.168.100.253

Tạo thư mục lưu trữ dự án.

mkdir -p /home/crawler

Tạo file docker-compose-kafka.yml trên server 192.168.100.253với nội dung sau:

cat > /home/crawler/docker-compose-kafka.yml << 'OEF'
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.100.253
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
OEF

Chạy Kafka và Zookeeper bằng lệnh:

docker-compose -f /home/crawler/docker-compose-kafka.yml up -d

Bước 3: Cấu hình ELK Stack trên 192.168.100.253.

Tạo file docker-compose-elk.yml trên server 192.168.100.253:

cat > /home/crawler/docker-compose-elk.yml << 'OEF'
version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
  logstash:
    image: docker.elastic.co/logstash/logstash:7.9.3
    ports:
      - "5000:5000"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch
  kibana:
    image: docker.elastic.co/kibana/kibana:7.9.3
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
OEF

Tạo file cấu hình logstash.conf để nhận log từ Kafka và đẩy vào Elasticsearch:

cat > /home/crawler/logstash.conf << 'OEF'
input {
  kafka {
    bootstrap_servers => "192.168.100.253:9092"
    topics => ["golang-auth-logs"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "golang-auth-logs-%{+YYYY.MM.dd}"
  }
}
OEF

Chạy ELK Stack bằng lệnh:

docker-compose -f /home/crawler/docker-compose-elk.yml up -d

Bằng cách thực hiện các bước trên, bạn đã thiết lập một môi trường để thu thập log từ /var/log/auth.log trên server 192.168.100.253 và 10.237.7.72 và đẩy log về Kafka tại 192.168.100.253 và sau đó xử lý và lưu trữ log trong bộ ELK cũng tại 192.168.100.253 sử dụng Docker Compose.

Bước 4: Cài đặt Golang.

Cài đặt Golang theo cú pháp.

sudo apt update
sudo apt install golang-go

Khởi tạo module Go.

Nếu dự án của bạn chưa có file go.mod, bạn cần khởi tạo module Go. Hãy di chuyển vào thư mục dự án của bạn.

cd /home/crawler

Bước này tạo một file go.mod trong thư mục dự án của bạn, file này theo dõi các phần phụ thuộc của dự án theo cú pháp go mod init. Ví dụ.

go mod init auth.collector

Ví dụ.

shell> go mod init auth.collector
go: creating new go.mod: module auth.collector

Thêm Gói Kafka.

Sau khi khởi tạo các module Go, hãy thêm github.com/segmentio/kafka-go làm phần phụ thuộc bằng cách chạy:

go get github.com/segmentio/kafka-go

Ví dụ.

shell> go get github.com/segmentio/kafka-go
go: added github.com/klauspost/compress v1.15.9
go: added github.com/pierrec/lz4/v4 v4.1.15
go: added github.com/segmentio/kafka-go v0.4.47

Lệnh này sẽ tải xuống gói Kafka và bất kỳ phần phụ thuộc nào của nó, cập nhật go.mod của bạn và tạo file go.sum chứa các giá trị hashes của các module đã tải xuống.

Bước 5: Tạo file dự án vi auth.collector.go.

cat > /home/crawler/auth.collector.go << 'OEF'
package main

// apt install golang-go
import (
	"bufio"
	"context"
	"fmt"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Địa chỉ của Kafka broker và topic
	brokerAddress := "192.168.100.253:9092"
	topic := "golang-auth-logs"

	// Mở file log
	file, err := os.Open("/var/log/auth.log")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Tạo một Kafka writer
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{brokerAddress},
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	})
	defer writer.Close()

	// Đọc file và gửi mỗi dòng log đến Kafka
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Text()
		err := writer.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte(fmt.Sprintf("golang-auth-logs-%d", time.Now().Unix())),
				Value: []byte(line),
			},
		)
		if err != nil {
			fmt.Println("Failed to write messages:", err)
			continue
		}
		fmt.Println("Sent log:", line)
	}

	if err := scanner.Err(); err != nil {
		fmt.Println("Error reading log file:", err)
	}
}
OEF

Phân quyền cho file /var/log/auth.log.

chmod -R 775 /var/log/auth.log

Xác minh cài đặt Golang, các module liên quan và file dự án đã tạo thành công.

Đảm bảo rằng file go.mod hiện bao gồm câu lệnh yêu cầu cho github.com/segmentio/kafka-go và file go.sum tồn tại.

Đây là kết quá cuối cùng khi bạn khởi tạo xong các file liên quan đến dự án.

./
├── auth.collector.go
├── docker-compose-elk.yml
├── docker-compose-filebeat.yml
├── docker-compose-kafka.yml
├── go.mod
├── go.sum
└── logstash.conf

0 directories, 8 files

Bước 5: Bây giờ, hãy thử chạy lại dự án của bạn với lệnh go run <tên file dự án>.

go run auth.collector.go

Đầu ra nếu thành công sẽ như dưới.

shell> go run auth.collector.go 
Sent log: Jul 14 21:17:01 netbox-server CRON[200998]: pam_unix(cron:session): session closed for user root
Sent log: Jul 14 21:33:43 netbox-server sshd[201058]: Accepted publickey for root from 192.168.100.100 port 53146 ssh2: RSA SHA256:G5JQ7SbNjxvdSMgYO8czoIBDPfuF+tzqpIxjCwoPubA
Sent log: Jul 14 21:33:43 netbox-server sshd[201058]: pam_unix(sshd:session): session opened for user root(uid=0) by (uid=0)
Sent log: Jul 14 21:33:43 netbox-server systemd-logind[824]: New session 292 of user root.
Sent log: Jul 14 21:34:08 netbox-server sshd[201058]: Received disconnect from 192.168.100.100 port 53146:11: disconnected by user
Sent log: Jul 14 21:34:08 netbox-server sshd[201058]: Disconnected from user root 192.168.100.100 port 53146
Sent log: Jul 14 21:34:08 netbox-server sshd[201058]: pam_unix(sshd:session): session closed for user root
Sent log: Jul 14 21:34:08 netbox-server systemd-logind[824]: Session 292 logged out. Waiting for processes to exit.
Sent log: Jul 14 21:34:08 netbox-server systemd-logind[824]: Removed session 292.
Sent log: Jul 14 22:17:01 netbox-server CRON[201291]: pam_unix(cron:session): session opened for user root(uid=0) by (uid=0)
Sent log: Jul 14 22:17:01 netbox-server CRON[201291]: pam_unix(cron:session): session closed for user root
Sent log: Jul 14 23:17:01 netbox-server CRON[201501]: pam_unix(cron:session): session opened for user root(uid=0) by (uid=0)
Sent log: Jul 14 23:17:01 netbox-server CRON[201501]: pam_unix(cron:session): session closed for user root
Sent log: Jul 14 23:51:27 netbox-server sshd[201624]: Accepted publickey for root from 192.168.100.101 port 57704 ssh2: RSA SHA256:G5JQ7SbNjxvdSMgYO8czoIBDPfuF+tzqpIxjCwoPubA
Sent log: Jul 14 23:51:27 netbox-server sshd[201624]: pam_unix(sshd:session): session opened for user root(uid=0) by (uid=0)
Sent log: Jul 14 23:51:27 netbox-server systemd-logind[824]: New session 295 of user root.
Sent log: Jul 15 00:03:00 netbox-server sshd[201624]: Received disconnect from 192.168.100.101 port 57704:11: disconnected by user
Sent log: Jul 15 00:03:00 netbox-server sshd[201624]: Disconnected from user root 192.168.100.101 port 57704
Sent log: Jul 15 00:03:00 netbox-server sshd[201624]: pam_unix(sshd:session): session closed for user root
Sent log: Jul 15 00:03:00 netbox-server systemd-logind[824]: Session 295 logged out. Waiting for processes to exit.
Sent log: Jul 15 00:17:01 netbox-server CRON[202308]: pam_unix(cron:session): session opened for user root(uid=0) by (uid=0)

Nếu khi bạn chạy dự án mà gặp lỗi "Failed to write messages: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker", nó thường xuất hiện khi bạn cố gắng gửi tin nhắn đến một topic hoặc partition không tồn tại trên broker Kafka của bạn.

shell> go run auth.collector.go
Failed to write messages: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
Failed to write messages: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
Failed to write messages: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
Failed to write messages: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
Failed to write messages: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker

Mặc định, Kafka có thể được cấu hình để tự động tạo topic khi một producer hoặc consumer cố gắng giao tiếp với một topic chưa tồn tại. Tuy nhiên, tính năng này có thể được bật hoặc tắt thông qua cấu hình auto.create.topics.enable trên broker.

  • Nếu auto.create.topics.enable được thiết lập thành true, Kafka sẽ tự động tạo topic mới khi nhận được yêu cầu từ producer hoặc consumer cho một topic chưa tồn tại. Các tham số mặc định cho topic mới, như số lượng partitions và replication factor, sẽ được lấy từ cấu hình mặc định của broker.
  • Nếu auto.create.topics.enable được thiết lập thành false, Kafka sẽ không tự động tạo topic mới và sẽ trả về lỗi khi có yêu cầu giao tiếp với một topic không tồn tại.

Tìm file server.properties trên hệ thống của bạn, bạn có thể sử dụng lệnh find từ thư mục gốc:

shell> find / -type f -name "server.properties"
/opt/kafka_2.13-2.8.1/config/kraft/server.properties
/opt/kafka_2.13-2.8.1/config/server.properties

Trong Kafka, có thể có nhiều file cấu hình cho các mục đích khác nhau

  • Nếu bạn đang sử dụng Zookeeper để quản lý cluster Kafka của mình, bạn nên sử dụng file /opt/kafka_2.13-2.8.1/config/server.properties.
  • Nếu bạn muốn sử dụng chế độ KRaft (không sử dụng Zookeeper) và đang thử nghiệm với tính năng mới này, bạn sẽ sử dụng file /opt/kafka_2.13-2.8.1/config/kraft/server.properties.

Để kiểm tra hoặc thay đổi cài đặt này, bạn cần truy cập vào file cấu hình của Kafka broker (thường là server.properties) và tìm kiếm hoặc thêm dòng sau:

auto.create.topics.enable=true

Sau khi thay đổi cấu hình, bạn cần khởi động lại Kafka broker để áp dụng thay đổi.

Việc tự động tạo topic có thể tiện lợi trong môi trường phát triển, nhưng trong môi trường prod, việc kiểm soát chặt chẽ các topic thông qua việc tạo topic một cách có chủ ý thường được ưu tiên để tránh tạo ra các topic không mong muốn hoặc lạm dụng tài nguyên. Vậy để tạo topic theo cú pháp kafka-topics --create --topic your-topic-name --bootstrap-server your-broker-address:port --replication-factor 1 --partitions 1.

Dưới đây là ví dụ quy trình tự tạo topic.

shell> docker ps
CONTAINER ID   IMAGE                                                 COMMAND                  CREATED         STATUS         PORTS                                                                   NAMES
d024dd1aefb8   docker.elastic.co/logstash/logstash:7.9.3             "/usr/local/bin/dock…"   6 minutes ago   Up 6 minutes   5044/tcp, 0.0.0.0:5000->5000/tcp, :::5000->5000/tcp, 9600/tcp           crawler-logstash-1
21051b14308c   docker.elastic.co/kibana/kibana:7.9.3                 "/usr/local/bin/dumb…"   6 minutes ago   Up 6 minutes   0.0.0.0:5601->5601/tcp, :::5601->5601/tcp                               crawler-kibana-1
09f21c9ffa42   docker.elastic.co/elasticsearch/elasticsearch:7.9.3   "/tini -- /usr/local…"   6 minutes ago   Up 6 minutes   0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 9300/tcp                     crawler-elasticsearch-1
8f043853b21d   wurstmeister/zookeeper                                "/bin/sh -c '/usr/sb…"   7 minutes ago   Up 7 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp   crawler-zookeeper-1
6ce72471e149   wurstmeister/kafka                                    "start-kafka.sh"         7 minutes ago   Up 7 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                               crawler-kafka-1

Hãy vào container Kafka.

docker exec -it crawler-kafka-1 bash

Tiếp theo vào thư mục quản lý các file script chạy cho Kafka.

cd /opt/kafka/bin

Và chạy lệnh tạo topic nhé.

shell> ./kafka-topics.sh --create --topic golang-auth-logs --bootstrap-server 192.168.100.253:9092 --replication-factor 1 --partitions 1
Created topic golang-auth-logs.

Bạn có thể sử dụng nhanh lệnh dưới để tạo topic cũng được.

shell> docker exec crawler-kafka-1 bash -c 'cd /opt/kafka/bin && ./kafka-topics.sh --create --topic golang-auth-logs --bootstrap-server 192.168.100.253:9092 --replication-factor 1 --partitions 1'
Created topic golang-auth-logs.

Bạn có thể list các topic để xác minh topic mới đã tạo thành công.

shell> kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
golang-auth-logs

Hoặc.

shell> docker exec crawler-kafka-1 bash -c 'kafka-topics.sh --list --bootstrap-server localhost:9092'
__consumer_offsets
golang-auth-logs

Sau khi xử lý xong phần tạo topic thì chạy lại dự án của bạn.

shell> go run auth.collector.go
Sent log: Jul 14 00:17:01 netbox-server CRON[195687]: pam_unix(cron:session): session opened for user root(uid=0) by (uid=0)
Sent log: Jul 14 00:17:01 netbox-server CRON[195687]: pam_unix(cron:session): session closed for user root
Sent log: Jul 14 00:24:01 netbox-server CRON[195714]: pam_unix(cron:session): session opened for user root(uid=0) by (uid=0)
Sent log: Jul 14 00:24:01 netbox-server CRON[195714]: pam_unix(cron:session): session closed for user root
Sent log: Jul 14 01:17:01 netbox-server CRON[196521]: pam_unix(cron:session): session opened for user root(uid=0) by (uid=0)

Bước 6: Xác mình log đã được đẩy vào ELK thành công trên Kibana.

Giờ bạn truy cập vào Kibana bằng url http://192.168.100.253:5601, vào Stack Management

Tạo Create index pattern.

Kết quả trên Index Management.

Kết quả trên Discover.

5. Clean Lab.

Để xóa cả container, volume liên quan và network trong Docker, bạn cần sử dụng các lệnh khác nhau như dưới.

docker-compose -f /home/crawler/docker-compose-kafka.yml down
docker-compose -f /home/crawler/docker-compose-elk.yml down
docker-compose -f /home/crawler/docker-compose-kafka.yml down
docker volume prune
rm -rf /home/crawler

6. Tổng kết.

Bài viết này đã giới thiệu cách sử dụng Golang và ELK Stack để xây dựng một hệ thống xử lý log đơn giản. Ưu điểm của giải pháp này là sự linh hoạt và khả năng tùy chỉnh cao. Bạn có thể mở rộng hệ thống này bằng cách thêm các tính năng xử lý log phức tạp hơn và tích hợp với các nguồn log khác.

LEAVE A REPLY

Please enter your comment!
Please enter your name here

4,956FansLike
256FollowersFollow
223SubscribersSubscribe
spot_img

Related Stories