
Using Python to Process collectd Network Data


The collectd daemon is a pretty useful tool for system administrators to track metrics on their servers. Actually, it’s useful for anyone who wants a daemon that will collect arbitrary statistics and shovel them somewhere on a regular basis. I used it a lot more before I moved most of my workloads to Kubernetes and began using Prometheus for monitoring. But I still use collectd, combined with Graphite, to build long-term historical graphs of my infrastructure usage and performance so that I can project future needs rather than watch for immediate problems. (Prometheus does not work very well as a historical tool.)

When using collectd you configure some read plugins, like CPU or Memory or PostgreSQL. Then you configure at least one write plugin to forward what the read plugins collect to some permanent storage. I would venture to guess that most people configure the Graphite plugin, but there are others like AMQP, CSV, or even Prometheus.
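For context, a minimal configuration might pair a couple of read plugins with the Graphite write plugin. This is just a sketch: the Graphite host, port, and node name here are illustrative assumptions, not values you must use.

```
LoadPlugin cpu
LoadPlugin memory
LoadPlugin write_graphite

<Plugin "write_graphite">
  <Node "example">
    Host "graphite.example.com"
    Port "2003"
    Protocol "tcp"
  </Node>
</Plugin>
```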

All that said, there is one write plugin that I think goes unnoticed: the Network plugin. This plugin is usually advertised as a way to forward data from one collectd instance to another, since it functions as both a read and a write plugin. But you can also use it to forward traffic to a program that you’ve written, so you can process the data in your own way, in real time.

First, let’s configure collectd to enable this plugin. We’re going to have it forward every single metric to localhost on port 25826. The network plugin uses UDP and, according to the binary protocol documentation, 25826 is the default port for this protocol. So add this to your collectd.conf file and restart collectd.

LoadPlugin network
<Plugin "network">
  Server "127.0.0.1" "25826"
</Plugin>

The next step is to write our decoder, which we will do in Python. This code is based on code created by Adrian Perez, updated by Rami Sayar and Grégory Starck, and licensed under the GPLv2. I updated it with some cosmetic changes and also to guarantee that plugin names and type names always appear with consistent formatting. Here’s the code for the decoder; save it as decoder.py so the listener can import it.

# Copyright © 2009 Adrian Perez <aperez@igalia.com>
#
# Distributed under terms of the GPLv2 license.
#
# Updated by Rami Sayar for Collectd 5.1. Added DERIVE handling.
# Updated by Grégory Starck with few enhancements.
# Updated by Paul Lockaby with only cosmetic changes.
import logging
import struct
from copy import deepcopy
from typing import Iterator

logger = logging.getLogger(__name__)

# message types
TYPE_HOST = 0x0000
TYPE_TIME = 0x0001
TYPE_PLUGIN = 0x0002
TYPE_PLUGIN_INSTANCE = 0x0003
TYPE_TYPE = 0x0004
TYPE_TYPE_INSTANCE = 0x0005
TYPE_VALUES = 0x0006
TYPE_INTERVAL = 0x0007
TYPE_TIMEHR = 0x0008
TYPE_INTERVALHR = 0x0009

# DS types
DS_TYPE_COUNTER = 0
DS_TYPE_GAUGE = 1
DS_TYPE_DERIVE = 2
DS_TYPE_ABSOLUTE = 3

DECODE_HEADER = struct.Struct("!2H")
DECODE_NUMBER = struct.Struct("!Q")
DECODE_SIGNED_NUMBER = struct.Struct("!q")  # DERIVE values are signed long longs
DECODE_SHORT = struct.Struct("!H")
DECODE_DOUBLE = struct.Struct("<d")  # GAUGE values are little-endian doubles

DS_TYPE_DECODER = {
    DS_TYPE_COUNTER: DECODE_NUMBER,
    DS_TYPE_ABSOLUTE: DECODE_NUMBER,
    DS_TYPE_DERIVE: DECODE_SIGNED_NUMBER,
    DS_TYPE_GAUGE: DECODE_DOUBLE,
}

VALUES_HEADER_SIZE = DECODE_HEADER.size + DECODE_SHORT.size
SINGLE_VALUE_SIZE = 1 + 8  # 1 byte for type, 8 bytes for value


def cdtime_to_time(cdt):
    """
    :param cdt: A collectd high-resolution Time or Interval encoded value.
    :return: A float epoch timestamp with up to nanosecond precision.
    """
    # fairly copied from http://git.verplant.org/?p=collectd.git;a=blob;f=src/daemon/utils_time.h
    sec = cdt >> 30
    nsec = ((cdt & 0b111111111111111111111111111111) / 1.073741824) / 10**9
    return sec + nsec


def decode_network_values(_part_type, part_length, buffer):
    """Decodes a list of DS values in collectd network format."""
    values_count = DECODE_SHORT.unpack_from(buffer, DECODE_HEADER.size)[0]
    values_total_size = VALUES_HEADER_SIZE + values_count * SINGLE_VALUE_SIZE
    if values_total_size != part_length:
        raise DecoderValueError("values total size != part len ({} vs {})".format(values_total_size, part_length))

    results = []
    off = VALUES_HEADER_SIZE + values_count
    for ds_type in buffer[VALUES_HEADER_SIZE:off]:
        if ds_type in DS_TYPE_DECODER:
            decoder = DS_TYPE_DECODER[ds_type]
            results.append((ds_type, decoder.unpack_from(buffer, off)[0]))
        else:
            logger.warning("ds type {} not supported".format(ds_type))
        off += 8
    return results


def decode_network_number(_part_type, _part_length, buffer):
    """Decodes a number (64-bit unsigned) in collectd network format."""
    return DECODE_NUMBER.unpack_from(buffer, DECODE_HEADER.size)[0]


def decode_network_string(_part_type, part_length, buffer):
    """Decodes a NUL-terminated string in collectd network format.

    :return: The string decoded as UTF-8.
    """
    return buffer[DECODE_HEADER.size:part_length - 1].decode("utf-8")


DECODERS = {
    TYPE_VALUES: decode_network_values,
    TYPE_TIME: decode_network_number,
    TYPE_INTERVAL: decode_network_number,
    TYPE_HOST: decode_network_string,
    TYPE_PLUGIN: decode_network_string,
    TYPE_PLUGIN_INSTANCE: decode_network_string,
    TYPE_TYPE: decode_network_string,
    TYPE_TYPE_INSTANCE: decode_network_string,
    TYPE_TIMEHR: decode_network_number,
    TYPE_INTERVALHR: decode_network_number,
}


class DecoderException(Exception):
    pass


class DecoderValueError(DecoderException, ValueError):
    pass


class DecoderDecodeError(DecoderValueError):
    pass


class DecoderUnsupportedMessageType(DecoderValueError):
    pass


class DecoderBufferOverflow(DecoderValueError):
    pass


def decode(buffer) -> Iterator[dict]:
    offset = 0
    buffer_length = len(buffer)

    result = {
        "timestamp": None,
        "interval": None,
        "host_name": None,
        "plugin_name": None,
        "plugin_instance": None,
        "type_name": None,
        "type_instance": None,
        "value": None,
    }

    while offset < buffer_length:
        try:
            part_type, part_length = DECODE_HEADER.unpack_from(buffer, offset)
        except struct.error as err:
            raise DecoderDecodeError(err)

        if not part_length:
            raise DecoderValueError("invalid part with size=0: buflen={} offset={} part_type={}".format(buffer_length, offset, part_type))

        rest = buffer_length - offset
        if part_length > rest:
            raise DecoderBufferOverflow("encoded part size greater than left amount of data in buffer: buffer_length={} offset={} part_length={}".format(buffer_length, offset, part_length))

        try:
            decoder = DECODERS[part_type]
        except KeyError:
            raise DecoderUnsupportedMessageType("part type {} not recognized (offset={})".format(part_type, offset))

        try:
            decoded = decoder(part_type, part_length, buffer[offset:])
        except struct.error as e:
            raise DecoderDecodeError(e)

        if part_type == TYPE_TIME:
            result["timestamp"] = decoded
        elif part_type == TYPE_TIMEHR:
            result["timestamp"] = cdtime_to_time(decoded)
        elif part_type == TYPE_INTERVAL:
            result["interval"] = decoded
        elif part_type == TYPE_INTERVALHR:
            result["interval"] = cdtime_to_time(decoded)
        elif part_type == TYPE_HOST:
            result["host_name"] = decoded
        elif part_type == TYPE_PLUGIN:
            result["plugin_name"] = decoded
        elif part_type == TYPE_PLUGIN_INSTANCE:
            result["plugin_instance"] = decoded
        elif part_type == TYPE_TYPE:
            result["type_name"] = decoded
        elif part_type == TYPE_TYPE_INSTANCE:
            result["type_instance"] = decoded
        elif part_type == TYPE_VALUES:
            if len(decoded) == 1:
                result["value"] = decoded[0][1]
            else:
                result["value"] = [x[1] for x in decoded]

            # normalize names so they always appear with consistent formatting
            if result["plugin_name"] is None:
                result["plugin_name"] = ""
            if result["plugin_instance"] is None:
                result["plugin_instance"] = ""
            if result["type_name"] is None:
                result["type_name"] = ""
            if result["type_instance"] is None:
                result["type_instance"] = ""

            # when we get to the "values" part then we are at the end of the
            # message. other kinds of types are ignored as permitted by the
            # collectd packet format.
            yield deepcopy(result)

        offset += part_length
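Every part in a collectd packet shares the same layout that the decoder's main loop relies on: a two-byte big-endian type, a two-byte big-endian length that includes the four-byte header itself, then the payload. As a quick sanity check of that layout (a standalone sketch, separate from the decoder above), you can encode and re-decode a string part by hand:

```python
import struct

TYPE_HOST = 0x0000

def encode_string_part(part_type, text):
    # payload is the NUL-terminated string; the length field counts the
    # 4-byte part header plus the payload
    payload = text.encode("utf-8") + b"\x00"
    return struct.pack("!2H", part_type, 4 + len(payload)) + payload

part = encode_string_part(TYPE_HOST, "myhost.example.com")
part_type, part_length = struct.unpack_from("!2H", part)
print(part_type, part_length)            # 0 23
print(part[4:part_length - 1].decode())  # myhost.example.com
```

The slice `part[4:part_length - 1]` mirrors what decode_network_string does: skip the header and drop the trailing NUL.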

So that just decodes what we pass to it. The next part is to write a listener, saved alongside the decoder as listener.py, that gets the data from the network plugin and decodes it into Python objects that you can actually use.

import argparse
import select
import socket
import sys
import traceback

from decoder import decode, DecoderException


def main(host: str, port: int) -> None:
    print("listening on {}:{}".format(host, port))

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind((host, port))
    s.setblocking(False)

    while True:
        try:
            ready = select.select([s], [], [], 1)[0]
            for r in ready:
                data, addr = r.recvfrom(9000)
                print("received packet from {}".format(addr[0]))

                total = 0
                try:
                    for datum in decode(data):
                        total = total + 1
                        print(datum)
                except DecoderException as e:
                    print("could not process data from {}: {}".format(addr[0], e))
                print("received {} metrics from {}".format(total, addr[0]))
        except Exception:
            traceback.print_exc()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog="listener")
    parser.add_argument("--ip", default="127.0.0.1", help="IP address to listen on")
    parser.add_argument("--port", default=25826, type=int, help="port number to listen on")
    args = parser.parse_args()
    main(args.ip, args.port)
    sys.exit(0)

The above is just some basic template code that you can use to get started. It will print each metric as it is received, in a format that looks like this:

{'host_name': 'myhost.example.com',
 'interval': 10.0,
 'plugin_instance': '',
 'plugin_name': 'cpu',
 'timestamp': 1641153893.9170213,
 'type_instance': 'user',
 'type_name': 'percent',
 'value': 4.0439340988517225}
{'host_name': 'myhost.example.com',
 'interval': 10.0,
 'plugin_instance': '',
 'plugin_name': 'cpu',
 'timestamp': 1641153893.917231,
 'type_instance': 'system',
 'type_name': 'percent',
 'value': 2.9455816275586613}
{'host_name': 'myhost.example.com',
 'interval': 10.0,
 'plugin_instance': '',
 'plugin_name': 'cpu',
 'timestamp': 1641153893.9172502,
 'type_instance': 'wait',
 'type_name': 'percent',
 'value': 0.0}
{'host_name': 'myhost.example.com',
 'interval': 10.0,
 'plugin_instance': '',
 'plugin_name': 'cpu',
 'timestamp': 1641153893.9172595,
 'type_instance': 'nice',
 'type_name': 'percent',
 'value': 0.0}
{'host_name': 'myhost.example.com',
 'interval': 10.0,
 'plugin_instance': '',
 'plugin_name': 'cpu',
 'timestamp': 1641153893.9172673,
 'type_instance': 'interrupt',
 'type_name': 'percent',
 'value': 0.0}
{'host_name': 'myhost.example.com',
 'interval': 10.0,
 'plugin_instance': '',
 'plugin_name': 'cpu',
 'timestamp': 1641153893.9172761,
 'type_instance': 'softirq',
 'type_name': 'percent',
 'value': 0.9985022466300548}
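The fractional timestamps above come from collectd's high-resolution time format: whole seconds live in the upper bits and the lower 30 bits count fractions of a second in units of 2^-30 seconds. Here is a standalone sketch of the conversion, the same math as cdtime_to_time in the decoder:

```python
def cdtime_to_seconds(cdt):
    # upper bits are whole seconds; the low 30 bits count 2**-30 second ticks
    sec = cdt >> 30
    frac = (cdt & ((1 << 30) - 1)) / float(1 << 30)
    return sec + frac

# encode 1641153893.5 by hand: half a second is 2**29 ticks of 2**-30 seconds
cdt = (1641153893 << 30) | (1 << 29)
print(cdtime_to_seconds(cdt))  # 1641153893.5
```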

Different plugins have different ways of consolidating their metrics, so you’ll have to experiment a little to see what you get. Note that even in plugins that read one thing, like CPU, the timestamp is different for each reading, so don’t try to use the timestamp to group events together. Also note that events can arrive in any order at any time. Finally, note that if you have two plugins generating the same values for plugin_name, plugin_instance, type_name, and type_instance then you will have to fix your configuration, because it’s going to be hard to figure out which is which.
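For example, if you wanted to feed these metrics into Graphite yourself, you could join those four fields plus the host name into a dotted path. This sketch follows the usual collectd-to-Graphite naming convention (instance fields joined to their names with hyphens and skipped when empty); the exact scheme is your choice, and the helper name here is mine, not part of the decoder:

```python
def metric_path(datum):
    # dots in the host name become underscores so they don't create
    # extra levels in the Graphite hierarchy
    parts = [datum["host_name"].replace(".", "_"), datum["plugin_name"]]
    if datum["plugin_instance"]:
        parts[-1] += "-" + datum["plugin_instance"]
    parts.append(datum["type_name"])
    if datum["type_instance"]:
        parts[-1] += "-" + datum["type_instance"]
    return ".".join(parts)

datum = {"host_name": "myhost.example.com", "plugin_name": "cpu",
         "plugin_instance": "", "type_name": "percent", "type_instance": "user"}
print(metric_path(datum))  # myhost_example_com.cpu.percent-user
```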

Now you can do whatever you want with this data. It can be a pretty handy tool and message format if you need streamed data and want to do arbitrary things with it.