Saturday, January 18, 2025

Sử dụng Python để đẩy dữ liệu vào InfluxDB

-

1. Tổng quan.

Việc sử dụng script để đưa dữ liệu vào InfluxDB mang lại nhiều lợi ích:

  • Tự động hóa: Script cho phép bạn tự động hóa quá trình thu thập và ghi dữ liệu, giảm thiểu công việc thủ công và tiết kiệm thời gian.
  • Độ chính xác: Script giảm thiểu lỗi do con người và đảm bảo độ chính xác của dữ liệu.
  • Lập lịch: Bạn có thể lập lịch script để chạy vào thời điểm cố định, giúp thu thập dữ liệu đều đặn và liên tục.
  • Khả năng mở rộng: Script có thể dễ dàng mở rộng để thu thập dữ liệu từ nhiều nguồn khác nhau hoặc ghi dữ liệu vào nhiều bucket khác nhau.
  • Tùy chỉnh: Script có thể được tùy chỉnh để xử lý các tình huống cụ thể, chẳng hạn như xử lý lỗi, thử lại khi ghi không thành công, hoặc biến đổi dữ liệu trước khi ghi.
  • Tiết kiệm tài nguyên: Khi dữ liệu được gửi vào InfluxDB thông qua script, bạn có thể tận dụng các tính năng như batch writing, giúp giảm bớt tải lên hệ thống và tối ưu hóa hiệu suất.

Trong bài viết này mình sẽ chia thành 2 phần chính.

  • Phần đầu tiên cài đặt thư viện influxdb-client và sau đó sử dụng nó để ghi dữ liệu vào một InfluxDB instance. Dữ liệu được ghi dưới dạng chuỗi, tuân theo InfluxDB Line Protocol. Thông tin về InfluxDB instance (như URL, token, organization và bucket) được cung cấp trực tiếp trong mã.
  • Phần thứ hai của mã định nghĩa một hàm icmp_status để kiểm tra tình trạng ICMP của một địa chỉ IP mục tiêu. Hàm này sử dụng lệnh ping và sau đó phân tích kết quả để xác định tình trạng ICMP. Nếu tất cả các gói tin ICMP đều được nhận (tức là không có gói tin nào bị mất), hàm sẽ trả về 0. Ngược lại, nếu có ít nhất một gói tin bị mất, hàm sẽ trả về 1. Kết quả này sẽ đẩy về lưu tại InfluxDB.

2. Cài đặt thư viện influxdb-client-python.

Đẩy dữ liệu vào InfluxDB bằng Python, bạn có thể sử dụng thư viện influxdb-client-python từ InfluxData. Dưới đây là một ví dụ về cách làm điều này:

Đầu tiên, cài đặt thư viện bằng pip:

apt install python3-pip
pip3 install influxdb-client

Sau đó, bạn có thể sử dụng đoạn code sau để ghi dữ liệu vào InfluxDB:

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# You can generate a Token from the "Tokens Tab" in the UI
token = "Your-Generated-Token"
org = "your-org"
bucket = "your-bucket"

client = InfluxDBClient(url="http://localhost:8086", token=token)

write_api = client.write_api(write_options=SYNCHRONOUS)

data = "mem,host=host1 used_percent=23.43234543"
write_api.write(bucket, org, data)

client.close()

Trong đoạn code trên, data là một chuỗi đại diện cho dữ liệu mà bạn muốn ghi vào InfluxDB. Định dạng của chuỗi này phải tuân theo định dạng InfluxDB Line Protocol.

Lưu ý: Thay thế "Your-Generated-Token""your-org" và "your-bucket" bằng token, tổ chức và bucket tương ứng của bạn trong InfluxDB.

Ví dụ.

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# You can generate a Token from the "Tokens Tab" in the UI
token = "eWoIo9hK5ELMqq3jkKlDHcGWAYeoR4ggZwJnyUk6vNozi0CRMg9kITMSFS9f1aQPKdNcwAIhCsOY-PXD5E6seA=="
org = "tig-stack"
bucket = "telegraf-bucket"

client = InfluxDBClient(url="http://10.237.7.80:8086", token=token)

write_api = client.write_api(write_options=SYNCHRONOUS)

data = "mem,host=hoanghd hoanghd_used_percent=23.43234543"
write_api.write(bucket, org, data)

client.close()

Giờ hãy chạy nó, nếu không có lỗi gì thì bạn sẽ không nhận được thông báo ở đầu ra nhé. Hãy vào Explore và sử dụng tính năng Query Builder bạn sẽ nhận được data như hình dưới.

Bạn có thể sửa lại code như sau để thêm tính năng debug và trả về status code sau khi ghi dữ liệu, bạn có thể sử dụng try-except để bắt các ngoại lệ có thể xảy ra. Nếu không có ngoại lệ nào xảy ra, bạn có thể trả về một status code thành công (ví dụ: 200).

Dưới đây là mã Python đã được cập nhật:

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException

# You can generate a Token from the "Tokens Tab" in the UI
token = "eWoIo9hK5ELMqq3jkKlDHcGWAYeoR4ggZwJnyUk6vNozi0CRMg9kITMSFS9f1aQPKdNcwAIhCsOY-PXD5E6seA=="
org = "tig-stack"
bucket = "telegraf-bucket"

client = InfluxDBClient(url="http://10.237.7.80:8086", token=token)

write_api = client.write_api(write_options=SYNCHRONOUS)

data = "mem,host=hoanghd hoanghd_used_percent=23.43234543"

try:
    write_api.write(bucket, org, data)
    print("Data written successfully. Status code: 200")
except ApiException as e:
    print(f"Failed to write data. Status code: {e.status}, Error message: {e.reason}")

finally:
    client.close()

Trong đoạn code trên, nếu việc ghi dữ liệu thành công, chương trình sẽ in ra “Data written successfully. Status code: 200”. Nếu có lỗi xảy ra, nó sẽ in ra thông báo lỗi và status code tương ứng.

shell> python3 run-script.py 
Data written successfully. Status code: 200

3. Một số lỗi có thể xảy ra khi chạy code.

Name ‘InfluxDBClient’ is not defined.

Lỗi này cho biết rằng Python không thể tìm thấy tên ‘InfluxDBClient’. Điều này có thể xảy ra nếu bạn chưa import thư viện influxdb_client hoặc import không đúng cách.

/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (2.2.1) or chardet (3.0.4) doesn't match a supported version!
  warnings.warn("urllib3 ({}) or chardet ({}) doesn't match a supported "
Traceback (most recent call last):
  File "demo.py", line 5, in <module>
    client = InfluxDBClient(url="http://10.237.7.80:8086", token=token)
NameError: name 'InfluxDBClient' is not defined

Bạn hãy cài đặt lại thư viện influxdb-client bằng pip không sử dụng pip3:

pip install influxdb-client

Insufficient permissions for write 403 Forbidden.

Lỗi 403 Forbidden thường xuất hiện khi token bạn sử dụng không có quyền ghi vào bucket mà bạn đang cố gắng ghi dữ liệu vào.

/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (2.2.1) or chardet (3.0.4) doesn't match a supported version!
  warnings.warn("urllib3 ({}) or chardet ({}) doesn't match a supported "
Traceback (most recent call last):
  File "demo.py", line 14, in <module>
    write_api.write(bucket, org, data)
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/write_api.py", line 381, in write
    results = list(map(write_payload, payloads.items()))
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/write_api.py", line 379, in write_payload
    return self._post_write(_async_req, bucket, org, final_string, payload[0])
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/write_api.py", line 516, in _post_write
    return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/write_service.py", line 60, in post_write
    (data) = self.post_write_with_http_info(org, bucket, body, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/write_service.py", line 90, in post_write_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/_sync/api_client.py", line 343, in call_api
    return self.__call_api(resource_path, method,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/_sync/api_client.py", line 173, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/_sync/api_client.py", line 388, in request
    return self.rest_client.POST(url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/_sync/rest.py", line 311, in POST
    return self.request("POST", url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/_sync/rest.py", line 261, in request
    raise ApiException(http_resp=r)
influxdb_client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json; charset=utf-8', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Version': '2.1.1', 'X-Platform-Error-Code': 'forbidden', 'Date': 'Thu, 18 Apr 2024 16:02:46 GMT', 'Content-Length': '67'})
HTTP response body: {"code":"forbidden","message":"insufficient permissions for write"}
  • Đây là một số cách để khắc phục:
    • Kiểm tra lại token: Đảm bảo rằng token bạn đang sử dụng là đúng và vẫn còn hiệu lực.
    • Kiểm tra quyền của token: Truy cập vào giao diện người dùng của InfluxDB, tìm đến phần “Tokens” và kiểm tra xem token của bạn có quyền ghi vào bucket mà bạn đang cố gắng ghi dữ liệu vào không.
    • Tạo token mới: Nếu cần, bạn có thể tạo một token mới với quyền ghi vào bucket cần thiết.

4. Ứng dụng thực tế.

Để đưa kết quả của hàm icmp_status vào InfluxDB, bạn cần tạo một chuỗi dữ liệu theo định dạng InfluxDB Line Protocol, sau đó sử dụng API ghi của InfluxDB để ghi dữ liệu vào một bucket.

Dưới đây là cách mình thực hiện điều này:

import subprocess
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

def icmp_status(target_ipaddr, staging=True):
    temp = 1
    count = 3
    while temp <= count:
        if staging == False:
            response = subprocess.check_output("ping -c 2 -W 0.5 " + target_ipaddr + ' | grep -Eo "[0-9]+% packet loss" | grep -Eo "^[0-9]"', shell=True).decode("utf-8").strip('\n')
        else:
            response = 1
        if int(response) == 1:
            if temp == count:
                response = 0
        else:
            response = 1
            break
        temp += 1
    return response

# You can generate a Token from the "Tokens Tab" in the UI
token = "eWoIo9hK5ELMqq3jkKlDHcGWAYeoR4ggZwJnyUk6vNozi0CRMg9kITMSFS9f1aQPKdNcwAIhCsOY-PXD5E6seA=="
org = "tig-stack"
bucket = "telegraf-bucket"

client = InfluxDBClient(url="http://10.237.7.80:8086", token=token)

write_api = client.write_api(write_options=SYNCHRONOUS)

ip_addresses = ['8.8.8.8', '1.1.1.1', '192.168.199.1']

for ip in ip_addresses:
    icmp_result = icmp_status(ip, staging=True)
    data = f"icmp,host={ip} status={icmp_result}"
    write_api.write(bucket, org, data)

client.close()

Trong đoạn code trên, chúng tôi lặp qua danh sách các địa chỉ IP, gọi hàm icmp_status cho mỗi địa chỉ IP, sau đó tạo một chuỗi dữ liệu theo định dạng InfluxDB Line Protocol với kết quả và ghi chuỗi dữ liệu này vào InfluxDB.

Và dưới đây là kết quả.

LEAVE A REPLY

Please enter your comment!
Please enter your name here

4,956FansLike
256FollowersFollow
223SubscribersSubscribe
spot_img

Related Stories