Using Python to Process collectd Network Data
The collectd daemon is a pretty useful tool for system administrators to track metrics on their servers. Actually, it’s useful for anyone who wants a daemon that will collect arbitrary statistics and shovel them somewhere on a regular basis. I used to use it a lot more before I moved most of my workloads to Kubernetes and began using Prometheus for monitoring. But I still use collectd, combined with Graphite, to build long-term historical graphs of my infrastructure usage and performance, more to build projections for future needs than to monitor for immediate problems. (Prometheus does not work very well as a historical tool.)
When using collectd you configure some read plugins like CPU or Memory or PostgreSQL. Then you configure at least one write plugin to forward what is read by the read plugins to some permanent storage. I would venture to guess that most people configure the Graphite plugin but there are others like AMQP or CSV or even Prometheus.
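For example, a minimal pairing of a read plugin with the Graphite write plugin might look something like this (the hostname here is just a placeholder):
LoadPlugin cpu
LoadPlugin write_graphite
<Plugin "write_graphite">
    <Node "example">
        Host "graphite.example.com"
        Port "2003"
        Protocol "tcp"
    </Node>
</Plugin>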
All that said, there is one write plugin that I think goes unnoticed: the Network plugin. This plugin is usually advertised as being used to forward data from one collectd instance to another as it functions as both a read and write plugin. But you can also use it to forward traffic to a program that you’ve written to process in your own way, in real time.
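As an aside, when it is used for collectd-to-collectd forwarding, the receiving instance enables the same plugin with a Listen line instead of a Server line. A sketch of that receiving side might look like this (the bind address is just an example):
LoadPlugin network
<Plugin "network">
    Listen "0.0.0.0" "25826"
</Plugin>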
First, let’s configure collectd to enable this plugin. We’re going to have it forward every single metric to localhost on port 25826. The network plugin uses UDP and, according to the binary protocol documentation, port 25826 is the default port for this protocol. So add this to your collectd.conf file and restart collectd.
LoadPlugin network
<Plugin "network">
    Server "127.0.0.1" "25826"
</Plugin>
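If you want to verify that collectd is actually sending something before writing any code, a packet capture on the loopback interface should show a burst of UDP traffic once per interval, e.g. tcpdump -i lo -n udp port 25826.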
The next step is to write our decoder, which we will do in Python. This code is based on code created by Adrian Perez, updated by Rami Sayar and Grégory Starck, and licensed under the GPLv2. I updated it with some cosmetic changes and also to guarantee that plugin names and type names always appear, and appear with consistent formatting. Here’s the code for the decoder.
# Copyright © 2009 Adrian Perez <aperez@igalia.com>
#
# Distributed under terms of the GPLv2 license.
#
# Updated by Rami Sayar for Collectd 5.1. Added DERIVE handling.
# Updated by Grégory Starck with few enhancements.
# Updated by Paul Lockaby with only cosmetic changes.

import logging
import struct
from copy import deepcopy
from typing import Iterator

logger = logging.getLogger(__name__)

# message types
TYPE_HOST = 0x0000
TYPE_TIME = 0x0001
TYPE_PLUGIN = 0x0002
TYPE_PLUGIN_INSTANCE = 0x0003
TYPE_TYPE = 0x0004
TYPE_TYPE_INSTANCE = 0x0005
TYPE_VALUES = 0x0006
TYPE_INTERVAL = 0x0007
TYPE_TIMEHR = 0x0008
TYPE_INTERVALHR = 0x0009

# DS types
DS_TYPE_COUNTER = 0
DS_TYPE_GAUGE = 1
DS_TYPE_DERIVE = 2
DS_TYPE_ABSOLUTE = 3

DECODE_HEADER = struct.Struct("!2H")
DECODE_NUMBER = struct.Struct("!Q")
DECODE_SIGNED_NUMBER = struct.Struct("!q")  # DERIVE are signed long longs
DECODE_SHORT = struct.Struct("!H")
DECODE_DOUBLE = struct.Struct("<d")

DS_TYPE_DECODER = {
    DS_TYPE_COUNTER: DECODE_NUMBER,
    DS_TYPE_ABSOLUTE: DECODE_NUMBER,
    DS_TYPE_DERIVE: DECODE_SIGNED_NUMBER,
    DS_TYPE_GAUGE: DECODE_DOUBLE,
}

VALUES_HEADER_SIZE = DECODE_HEADER.size + DECODE_SHORT.size
SINGLE_VALUE_SIZE = 1 + 8  # 1 byte for type, 8 bytes for value


def cdtime_to_time(cdt):
    """
    :param cdt: A collectd Time or Interval HighResolution encoded value.
    :return: A float, representing a time EPOCH value, with up to nanosecond resolution.
    """
    # fairly copied from http://git.verplant.org/?p=collectd.git;a=blob;f=src/daemon/utils_time.h
    # the top 34 bits are whole seconds and the bottom 30 bits are a
    # fraction of a second in units of 2^-30 seconds
    sec = cdt >> 30
    nsec = ((cdt & 0b111111111111111111111111111111) / 1.073741824) / 10**9
    return sec + nsec


def decode_network_values(_part_type, part_length, buffer):
    """Decodes a list of DS values in collectd network format."""
    values_count = DECODE_SHORT.unpack_from(buffer, DECODE_HEADER.size)[0]
    values_total_size = VALUES_HEADER_SIZE + values_count * SINGLE_VALUE_SIZE
    if values_total_size != part_length:
        raise DecoderValueError("values total size != part len ({} vs {})".format(values_total_size, part_length))

    # the part is laid out as: header, count, then one type byte per value,
    # then one 8 byte value per value
    results = []
    off = VALUES_HEADER_SIZE + values_count
    for ds_type in buffer[VALUES_HEADER_SIZE:off]:
        if ds_type in DS_TYPE_DECODER:
            decoder = DS_TYPE_DECODER[ds_type]
            results.append((ds_type, decoder.unpack_from(buffer, off)[0]))
        else:
            logger.warning("ds type {} not supported".format(ds_type))
        off += 8
    return results


def decode_network_number(_part_type, _part_length, buffer):
    """Decodes a number (64-bit unsigned) in collectd network format."""
    return DECODE_NUMBER.unpack_from(buffer, DECODE_HEADER.size)[0]


def decode_network_string(_part_type, part_length, buffer):
    """Decodes a string (\0 terminated) in collectd network format.

    :return: The string in utf8 format.
    """
    return buffer[DECODE_HEADER.size:part_length - 1].decode("utf-8")


DECODERS = {
    TYPE_VALUES: decode_network_values,
    TYPE_TIME: decode_network_number,
    TYPE_INTERVAL: decode_network_number,
    TYPE_HOST: decode_network_string,
    TYPE_PLUGIN: decode_network_string,
    TYPE_PLUGIN_INSTANCE: decode_network_string,
    TYPE_TYPE: decode_network_string,
    TYPE_TYPE_INSTANCE: decode_network_string,
    TYPE_TIMEHR: decode_network_number,
    TYPE_INTERVALHR: decode_network_number,
}


class DecoderException(Exception):
    pass


class DecoderValueError(DecoderException, ValueError):
    pass


class DecoderDecodeError(DecoderValueError):
    pass


class DecoderUnsupportedMessageType(DecoderValueError):
    pass


class DecoderBufferOverflow(DecoderValueError):
    pass


def decode(buffer) -> Iterator[dict]:
    offset = 0
    buffer_length = len(buffer)

    result = {
        "timestamp": None,
        "interval": None,
        "host_name": None,
        "plugin_name": None,
        "plugin_instance": None,
        "type_name": None,
        "type_instance": None,
        "value": None,
    }

    while offset < buffer_length:
        try:
            part_type, part_length = DECODE_HEADER.unpack_from(buffer, offset)
        except struct.error as err:
            raise DecoderDecodeError(err)

        if not part_length:
            raise DecoderValueError("invalid part with size=0: buflen={} offset={} part_type={}".format(buffer_length, offset, part_type))

        rest = buffer_length - offset
        if part_length > rest:
            raise DecoderBufferOverflow("encoded part size greater than left amount of data in buffer: buffer_length={} offset={} part_length={}".format(buffer_length, offset, part_length))

        try:
            decoder = DECODERS[part_type]
        except KeyError:
            raise DecoderUnsupportedMessageType("part type {} not recognized (offset={})".format(part_type, offset))

        try:
            decoded = decoder(part_type, part_length, buffer[offset:])
        except struct.error as e:
            raise DecoderDecodeError(e)

        if part_type == TYPE_TIME:
            result["timestamp"] = decoded
        elif part_type == TYPE_TIMEHR:
            result["timestamp"] = cdtime_to_time(decoded)
        elif part_type == TYPE_INTERVAL:
            result["interval"] = decoded
        elif part_type == TYPE_INTERVALHR:
            result["interval"] = cdtime_to_time(decoded)
        elif part_type == TYPE_HOST:
            result["host_name"] = decoded
        elif part_type == TYPE_PLUGIN:
            result["plugin_name"] = decoded
        elif part_type == TYPE_PLUGIN_INSTANCE:
            result["plugin_instance"] = decoded
        elif part_type == TYPE_TYPE:
            result["type_name"] = decoded
        elif part_type == TYPE_TYPE_INSTANCE:
            result["type_instance"] = decoded
        elif part_type == TYPE_VALUES:
            if len(decoded) == 1:
                result["value"] = decoded[0][1]
            else:
                result["value"] = [x[1] for x in decoded]

            # fix values so that plugin and type names always appear
            if result["plugin_name"] is None:
                result["plugin_name"] = ""
            if result["plugin_instance"] is None:
                result["plugin_instance"] = ""
            if result["type_name"] is None:
                result["type_name"] = ""
            if result["type_instance"] is None:
                result["type_instance"] = ""

            yield deepcopy(result)

        # when we get to the "values" field then we are at the end of the
        # message. other kinds of types are ignored as permitted by the
        # collectd packet format.
        offset += part_length
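If you want to sanity check the decoder without running collectd at all, you can hand it a packet that you build yourself. Here is a sketch of that; it assumes the code above is saved as decoder.py, and the host and metric names are made up.
# build a synthetic collectd packet by hand and feed it through decode()
import struct

from decoder import (
    decode, DS_TYPE_GAUGE,
    TYPE_HOST, TYPE_TIMEHR, TYPE_PLUGIN, TYPE_TYPE, TYPE_TYPE_INSTANCE, TYPE_VALUES,
)


def string_part(part_type: int, value: str) -> bytes:
    # a string part is a 4 byte header followed by a null terminated string
    payload = value.encode("utf-8") + b"\x00"
    return struct.pack("!2H", part_type, 4 + len(payload)) + payload


def number_part(part_type: int, value: int) -> bytes:
    # a number part is a 4 byte header followed by a 64-bit unsigned integer
    return struct.pack("!2HQ", part_type, 12, value)


def gauge_part(value: float) -> bytes:
    # a values part is a 4 byte header, a 2 byte count, then one type byte and
    # one 8 byte value per entry. gauges are little-endian doubles.
    payload = struct.pack("!H", 1) + bytes([DS_TYPE_GAUGE]) + struct.pack("<d", value)
    return struct.pack("!2H", TYPE_VALUES, 4 + len(payload)) + payload


packet = (
    string_part(TYPE_HOST, "myhost.example.com") +
    number_part(TYPE_TIMEHR, 1641153893 << 30) +  # whole seconds shifted into cdtime format
    string_part(TYPE_PLUGIN, "cpu") +
    string_part(TYPE_TYPE, "percent") +
    string_part(TYPE_TYPE_INSTANCE, "user") +
    gauge_part(4.2)
)

for metric in decode(packet):
    print(metric)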
The next part is to write a listener that gets the data from the network plugin and decodes it into some Python objects that you can actually use.
import argparse
import select
import socket
import sys
import traceback

from decoder import decode, DecoderException


def main(host: str, port: int) -> None:
    print("listening on {}:{}".format(host, port))
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind((host, port))
    s.setblocking(False)

    while True:
        try:
            ready = select.select([s], [], [], 1)[0]
            for r in ready:
                data, addr = r.recvfrom(9000)
                print("received packet from {}".format(addr[0]))

                total = 0
                try:
                    for datum in decode(data):
                        total = total + 1
                        print(datum)
                except DecoderException as e:
                    print("could not process data from {}: {}".format(addr[0], e))

                print("received {} metrics from {}".format(total, addr[0]))
        except Exception:
            traceback.print_exc()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog="listener")
    parser.add_argument("--ip", default="127.0.0.1", help="IP address to listen on")
    parser.add_argument("--port", default=25826, type=int, help="port number to listen on")
    args = parser.parse_args()
    main(args.ip, args.port)
    sys.exit(0)
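If you save the decoder as decoder.py (the name the listener imports) and this listener as, say, listener.py in the same directory, you can start it with python3 listener.py and then restart collectd to watch the metrics arrive. The --ip and --port flags let you listen somewhere other than the defaults.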
The above is just some basic template code that you can use to get started. It will print all of the metrics it receives as it receives them, in a format that looks like this:
{'host_name': 'myhost.example.com',
'interval': 10.0,
'plugin_instance': '',
'plugin_name': 'cpu',
'timestamp': 1641153893.9170213,
'type_instance': 'user',
'type_name': 'percent',
'value': 4.0439340988517225}
{'host_name': 'myhost.example.com',
'interval': 10.0,
'plugin_instance': '',
'plugin_name': 'cpu',
'timestamp': 1641153893.917231,
'type_instance': 'system',
'type_name': 'percent',
'value': 2.9455816275586613}
{'host_name': 'myhost.example.com',
'interval': 10.0,
'plugin_instance': '',
'plugin_name': 'cpu',
'timestamp': 1641153893.9172502,
'type_instance': 'wait',
'type_name': 'percent',
'value': 0.0}
{'host_name': 'myhost.example.com',
'interval': 10.0,
'plugin_instance': '',
'plugin_name': 'cpu',
'timestamp': 1641153893.9172595,
'type_instance': 'nice',
'type_name': 'percent',
'value': 0.0}
{'host_name': 'myhost.example.com',
'interval': 10.0,
'plugin_instance': '',
'plugin_name': 'cpu',
'timestamp': 1641153893.9172673,
'type_instance': 'interrupt',
'type_name': 'percent',
'value': 0.0}
{'host_name': 'myhost.example.com',
'interval': 10.0,
'plugin_instance': '',
'plugin_name': 'cpu',
'timestamp': 1641153893.9172761,
'type_instance': 'softirq',
'type_name': 'percent',
'value': 0.9985022466300548}
Different plugins have different ways of consolidating their metrics, so you’ll have to experiment a little bit to see what you get. Note that even on plugins that read one thing, like CPU, the timestamp is different for each of the readings, so don’t try to use the timestamp to group events together. Also note that events can come in any order at any time. Finally, note that if you have two plugins generating the same values for plugin_name, plugin_instance, type_name, and type_instance then you will have to fix your configuration because it’s going to be hard to figure out which metric is which.
Now you can do whatever you want with this data. It can be a pretty handy tool and message format if you need streamed data and want to do arbitrary things with it.
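As one illustration, here is a minimal sketch that reshapes each decoded metric into a Graphite-style plaintext line. The dotted-path layout is my own invention for illustration, not anything collectd prescribes, and it again assumes the decoder is saved as decoder.py.
import select
import socket

from decoder import decode, DecoderException

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("127.0.0.1", 25826))
s.setblocking(False)

while True:
    for r in select.select([s], [], [], 1)[0]:
        data, addr = r.recvfrom(9000)
        try:
            for metric in decode(data):
                # join the non-empty name fields into a dotted path
                path = ".".join(x for x in (
                    metric["host_name"],
                    metric["plugin_name"], metric["plugin_instance"],
                    metric["type_name"], metric["type_instance"],
                ) if x)
                print("{} {} {}".format(path, metric["value"], metric["timestamp"]))
        except DecoderException as e:
            print("could not process data from {}: {}".format(addr[0], e))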