1. Tổng quan.
Hệ thống msg-driven (Message-Driven System) là một hệ thống được thiết kế để xử lý, quản lý và định tuyến tin nhắn giữa các dịch vụ và ứng dụng. Trong hệ thống msg-driven, các thành phần tương tác với nhau thông qua việc trao đổi tin nhắn, thay vì gọi trực tiếp đến nhau. Điều này giúp tăng cường khả năng mở rộng, độ tin cậy và linh hoạt của hệ thống.
RabbitMQ là một message broker phổ biến, nghĩa là nó chấp nhận và chuyển tiếp tin nhắn. Nó là một phần quan trọng của hệ thống msg-driven. RabbitMQ cho phép các ứng dụng, dịch vụ và hệ thống giao tiếp với nhau thông qua việc trao đổi tin nhắn.
RabbitMQ hỗ trợ nhiều giao thức messaging, bao gồm AMQP (Advanced Message Queuing Protocol). Nó cung cấp tính năng như hàng đợi tin nhắn, routing (định tuyến), reliability (độ tin cậy), và load balancing (cân bằng tải).
2. RabbitMQ thường sử dụng ở hệ thống như thế nào?
RabbitMQ thường được sử dụng trong các hệ thống phân tán và microservices, nơi mà các thành phần cần giao tiếp với nhau một cách linh hoạt và đáng tin cậy.
- Decoupling (Tách rời): RabbitMQ cho phép tách rời các thành phần của hệ thống, giúp chúng giao tiếp mà không cần biết chi tiết về nhau. Điều này giúp tăng cường khả năng mở rộng và bảo dưỡng hệ thống.
- Load Balancing (Cân bằng tải): RabbitMQ có thể phân phối tin nhắn đến nhiều consumers, giúp cân bằng tải và tăng hiệu suất hệ thống.
- Resiliency (Khả năng phục hồi): RabbitMQ giữ tin nhắn cho đến khi chúng được xử lý thành công, giúp hệ thống phục hồi từ lỗi mà không mất dữ liệu.
- Asynchronous Processing (Xử lý không đồng bộ): RabbitMQ cho phép hệ thống xử lý các tác vụ một cách không đồng bộ, giúp cải thiện hiệu suất và thời gian phản hồi.
- Event-Driven Architecture (Kiến trúc dựa trên sự kiện): RabbitMQ thường được sử dụng trong kiến trúc dựa trên sự kiện, nơi mà các thành phần của hệ thống phản ứng với các sự kiện thay vì gọi trực tiếp đến nhau.
Ví dụ như OpenStack là một dự án mã nguồn mở cung cấp một nền tảng để xây dựng và quản lý các dịch vụ đám mây. OpenStack bao gồm nhiều thành phần khác nhau, mỗi thành phần đảm nhận một chức năng cụ thể trong việc cung cấp dịch vụ đám mây.
RabbitMQ trong OpenStack đóng vai trò là message broker, giúp các thành phần trong OpenStack giao tiếp với nhau.
Sau đây là một số lý do tại sao OpenStack sử dụng RabbitMQ:
- Decoupling (Tách rời): RabbitMQ giúp tách rời các thành phần của OpenStack, cho phép chúng giao tiếp mà không cần biết chi tiết về nhau. Điều này giúp tăng cường khả năng mở rộng và bảo dưỡng hệ thống.
- Reliability (Độ tin cậy): RabbitMQ giữ tin nhắn cho đến khi chúng được xử lý thành công, giúp OpenStack phục hồi từ lỗi mà không mất dữ liệu.
- Scalability (Khả năng mở rộng): RabbitMQ hỗ trợ khả năng mở rộng, cho phép OpenStack xử lý lượng lớn các yêu cầu và tác vụ.
- Asynchronous Communication (Giao tiếp không đồng bộ): RabbitMQ cho phép các thành phần của OpenStack giao tiếp một cách không đồng bộ, giúp cải thiện hiệu suất và thời gian phản hồi.
- Multi-tenancy (Đa người dùng): RabbitMQ hỗ trợ đa người dùng, cho phép nhiều người dùng và dịch vụ sử dụng cùng một hệ thống RabbitMQ mà không ảnh hưởng đến nhau.
3. Thực hành thiết kế một hệ thống msg-driven.
Để thiết kế một hệ thống msg-driven bạn sẽ cần một số thành phần chính sau:
- Message Broker (RabbitMQ): Đây là trung tâm của hệ thống msg-driven. Nó sẽ nhận, lưu trữ và chuyển tiếp tin nhắn giữa các thành phần khác trong hệ thống.
- Producer: Đây là thành phần sẽ gửi tin nhắn đến RabbitMQ. Trong trường hợp của bạn, producer có thể là một dịch vụ hoặc ứng dụng mà bạn phát triển để thu thập số liệu từ ứng dụng nào đó và gửi chúng dưới dạng tin nhắn đến RabbitMQ.
- Consumer: Đây là thành phần sẽ nhận tin nhắn từ RabbitMQ. Trong trường hợp của bạn, consumer có thể là portal mà bạn cung cấp cho khách hàng. Nó sẽ nhận tin nhắn từ RabbitMQ, xử lý chúng và cập nhật số liệu cho khách hàng.
Dưới đây là một các bước đơn giản về cách sử dụng RabbitMQ để gửi và nhận một tin nhắn “Hello World” sử dụng Python và thư viện pika
.
Bước 1: Cài đặt và cấu hình RabbitMQ
Cài đặt RabbitMQ trên máy chủ của bạn. Sau khi cài đặt, bạn cần cấu hình RabbitMQ để tạo ra một exchange và một hàng đợi. Bạn cũng cần cấu hình routing để tin nhắn từ producer được chuyển đến hàng đợi phù hợp.
# Install RabbitMQ
sudo apt install rabbitmq-server -y
# Add a user for HoangHD
sudo rabbitmqctl add_user hoanghd RABBIT_PASS
# Set permissions for the HoangHD user
sudo rabbitmqctl set_permissions hoanghd ".*" ".*" ".*"
# Restart and enable the RabbitMQ service
sudo systemctl restart rabbitmq-server.service
sudo systemctl enable rabbitmq-server.service
# Check the status of the RabbitMQ service
sudo systemctl status rabbitmq-server.service | grep 'active'
Bước 2: Cài đặt thư viện pika
cho Python.
pika
là một thư viện Python dành cho RabbitMQ. Nó cho phép các ứng dụng Python tạo kết nối và tương tác với RabbitMQ, một message broker phổ biến.
Thư viện pika
hỗ trợ nhiều mô hình tương tác với RabbitMQ, bao gồm blocking connections (kết nối chặn), asynchronous connections (kết nối không đồng bộ) và adapters for select, poll, epoll, and kqueue.
Với pika
, bạn có thể gửi và nhận tin nhắn thông qua RabbitMQ, tạo và quản lý hàng đợi, exchanges, và bindings, và thực hiện nhiều tác vụ khác liên quan đến RabbitMQ.
pip install pika
Bước 3: Phát triển Producer.
Phát triển một ứng dụng hoặc dịch vụ để thu thập số liệu từ OpenStack Swift. Ứng dụng này cần có khả năng gửi tin nhắn đến RabbitMQ. Bạn có thể sử dụng thư viện pika
(đối với Python) hoặc thư viện tương tự để tương tác với RabbitMQ.
import pika
# Tạo thông tin xác thực
credentials = pika.PlainCredentials('hoanghd', 'RABBIT_PASS')
# Tạo kết nối đến RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
# Tạo một hàng đợi
channel.queue_declare(queue='hello')
# Gửi một tin nhắn
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Bước 4: Phát triển Consumer
Phát triển portal của bạn để nhận tin nhắn từ RabbitMQ. Tương tự như producer, bạn có thể sử dụng thư viện pika
hoặc thư viện tương tự để tương tác với RabbitMQ.
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# Tạo thông tin xác thực
credentials = pika.PlainCredentials('hoanghd', 'RABBIT_PASS')
# Tạo kết nối đến RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Trong đoạn mã trên, bạn cần thay thế 'RABBIT_PASS'
bằng mật khẩu thực sự của bạn.
Bước 4: Chạy thử.
Các bước sau sẽ ví dụ cho việc sử dụng RabbitMQ để gửi và nhận tin nhắn.
Ví dụ về việc Producer chạy và Consumer sẽ nhận tin nhắn ngay lập tức.
Đầu tiên mình sẽ chạy 3 lần Producer trước để nó gửi tin nhắn “Hello World!” đến hàng đợi RabbitMQ. Bạn đã chạy lệnh này ba lần, vì vậy đã có ba tin nhắn được gửi.
shell> python3 producer.py
[x] Sent 'Hello World!'
shell> python3 producer.py
[x] Sent 'Hello World!'
shell> python3 producer.py
[x] Sent 'Hello World!'
Tiếp theo mình sẽ chạy Consumer, nó sẽ lắng nghe và nhận tin nhắn từ hàng đợi RabbitMQ. Bạn nhận được ba tin nhắn “Hello World!” từ hàng đợi, tương ứng với ba tin nhắn bạn đã gửi từ producer.py
.
shell> python3 consumer.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!'
[x] Received b'Hello World!'
[x] Received b'Hello World!'
Liệt kê tất cả các hàng đợi trong RabbitMQ và số lượng tin nhắn trong mỗi hàng đợi. Output hello 0
cho thấy có một hàng đợi tên là “hello” và không còn tin nhắn nào trong hàng đợi này, vì tất cả đã được consumer.py
nhận.
shell> rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
Ví dụ về việc Producer chạy và Consumer tạm thời chưa nhận tin nhắn và sau đó mới nhận tin nhắn.
Gửi tin nhắn “Hello World!” đến hàng đợi RabbitMQ, chúng ta đã chạy lệnh này năm lần, vì vậy đã có năm tin nhắn được gửi.
shell> python3 producer.py
[x] Sent 'Hello World!'
shell> python3 producer.py
[x] Sent 'Hello World!'
shell> python3 producer.py
[x] Sent 'Hello World!'
shell> python3 producer.py
[x] Sent 'Hello World!'
shell> python3 producer.py
[x] Sent 'Hello World!'
Liệt kê tất cả các hàng đợi trong RabbitMQ và số lượng tin nhắn trong mỗi hàng đợi. Output hello 5
cho thấy có một hàng đợi tên là “hello” và có năm tin nhắn trong hàng đợi này, tương ứng với năm tin nhắn bạn đã gửi từ producer.py
.
shell> rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 5
Lắng nghe và nhận tin nhắn từ hàng đợi RabbitMQ. Bạn nhận được năm tin nhắn “Hello World!” từ hàng đợi, tương ứng với năm tin nhắn bạn đã gửi từ producer.py
ở trên.
shell> python3 consumer.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!'
[x] Received b'Hello World!'
[x] Received b'Hello World!'
[x] Received b'Hello World!'
[x] Received b'Hello World!'
Chạy lại lệnh rabbitmqctl list_queues
này một lần nữa sau khi consumer.py
nhận tất cả các tin nhắn. Output hello 0
cho thấy không còn tin nhắn nào trong hàng đợi “hello”, vì tất cả đã được consumer.py
nhận.
shell> rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
Thử nghiệm hàng đợi với chương trình gửi tin nhắn đến Telegram.
Ví dụ này chúng ta sẽ sử dụng RabbitMQ để lưu trữ các tin nhắn cần gửi khi mạng bị rớt. Khi mạng hoạt động trở lại, chúng ta sẽ tiếp tục gửi các tin nhắn từ hàng đợi. Để làm điều này, chúng ta cần cài đặt thư viện pika
để tương tác với RabbitMQ từ Python.
Dưới đây là mã Python bình thường mình sử dụng, nếu chỉ sử dụng mã này thì bạn đã có thể gửi tin nhắn đến Telegram được rồi nhưng trong trường hợp rớt mạng, tin nhắn không được gửi và nó sẽ timeout và mất tin nhắn đó.
import requests
from datetime import datetime
TELEGRAM_CHAT_ID = '-863816906'
TELEGRAM_BOT_ID = '5536432897:AAEsq6I7lfiZdO6mRYDX7lWgU5-bamih-MI'
message = 'Hello World!!! - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def enviroment(bot_id):
URL = 'https://api.telegram.org/bot{}/'.format(bot_id)
return URL
def send_message(message,chat_id,bot_id):
URL = enviroment(bot_id)
url = URL + 'sendMessage?parse_mode=Markdown&chat_id={}&text={}'.format(chat_id,message)
requests.get(url)
send_message(message, TELEGRAM_CHAT_ID, TELEGRAM_BOT_ID)
Và đoạn mã dưới đây chúng ta sẽ sử dụng RabbitMQ để lưu trữ các tin nhắn cần gửi khi mạng bị rớt. Khi mạng hoạt động trở lại, chúng ta sẽ tiếp tục gửi các tin nhắn từ hàng đợi
import requests
import pika
import json
from datetime import datetime
TELEGRAM_CHAT_ID = '-863816906'
TELEGRAM_BOT_ID = '5536432897:AAEsq6I7lfiZdO6mRYDX7lWgU5-bamih-MI'
RABBITMQ_HOST = 'localhost'
RABBITMQ_QUEUE = 'telegram_messages'
RABBITMQ_USERNAME = 'hoanghd'
RABBITMQ_PASSWORD = 'RABBIT_PASS'
message = 'Hello World!!! - {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def enviroment(bot_id):
URL = 'https://api.telegram.org/bot{}/'.format(bot_id)
return URL
def send_message(message, chat_id, bot_id):
URL = enviroment(bot_id)
url = URL + 'sendMessage?parse_mode=Markdown&chat_id={}&text={}'.format(chat_id, message)
try:
requests.get(url)
except requests.exceptions.RequestException:
# Network error, add message to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)))
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE)
channel.basic_publish(exchange='', routing_key=RABBITMQ_QUEUE, body=json.dumps({'message': message, 'chat_id': chat_id, 'bot_id': bot_id}))
connection.close()
def process_messages():
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)))
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE)
def callback(ch, method, properties, body):
message_data = json.loads(body)
try:
send_message(message_data['message'], message_data['chat_id'], message_data['bot_id'])
ch.basic_ack(delivery_tag=method.delivery_tag) # Acknowledge message success
except requests.exceptions.RequestException:
pass # Network error, message will be requeued
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=RABBITMQ_QUEUE, on_message_callback=callback)
channel.start_consuming()
# Send a message
send_message(message, TELEGRAM_CHAT_ID, TELEGRAM_BOT_ID)
# Process messages from the queue
process_messages()
Mình đã cố ý gửi 3 tin nhắn Telegram trong điều kiện không có mạng. Bạn sẽ thấy chương trình tạm thời dừng lại và sẽ không có thông báo gì vì nó đang chờ có mạng để xử lý tin nhắn.
shell> python3 send-telegram.py
< dòng trống >
Sử dụng lệnh watch rabbitmqctl list_queues
để xem thông tin hàng đợi theo thời gian thực.
Every 2.0s: rabbitmqctl list_queues controller: Fri Mar 15 14:35:22 2024
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
telegram_messages 3
Và bây giờ mình sẽ cấp mạng cho máy chủ và chạy lại python3 send-telegram.py
3 tin nhắn đang chờ sẽ được gửi đi và thêm 1 tin nhắn mới do mình vừa chạy lại file send-telegram.py
nó sẽ sinh ra 1 tin nhắn nữa. Như vậy chúng ta có tổng là ba 4 tin nhắn.
Sau khi xử lý thành công, bạn sẽ thấy hàng đợi từ 3 trở về 0.
Every 2.0s: rabbitmqctl list_queues controller: Fri Mar 15 14:37:08 2024
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
telegram_messages 0
Kết quả mình nhận được 4 tin nhắn trên Telegram.
4. Kết luận.
Dựa trên các ví dụ bạn đã cung cấp, có thể rút ra một số kết luận sau về cách hoạt động của RabbitMQ và các script Python của bạn:
- Producer: Script
producer.py
hoạt động như một producer, gửi tin nhắn “Hello World!” đến hàng đợi RabbitMQ. Số lượng tin nhắn gửi đi tương ứng với số lần bạn chạy script. - Message Queue: RabbitMQ lưu trữ các tin nhắn trong hàng đợi cho đến khi chúng được xử lý. Số lượng tin nhắn trong hàng đợi tương ứng với số lượng tin nhắn đã được gửi nhưng chưa được nhận.
- Consumer: Script
consumer.py
hoạt động như một consumer, nhận tin nhắn từ hàng đợi RabbitMQ. Số lượng tin nhắn nhận được tương ứng với số lượng tin nhắn trong hàng đợi. - Message Consumption: Khi một tin nhắn được consumer nhận, nó sẽ bị xóa khỏi hàng đợi. Điều này giải thích tại sao số lượng tin nhắn trong hàng đợi là 0 sau khi tất cả các tin nhắn đã được nhận.
Những kết luận này cho thấy cách RabbitMQ hoạt động như một hệ thống message-driven, cho phép các ứng dụng giao tiếp với nhau thông qua việc trao đổi tin nhắn.