Source code for ipfix.message

# coding: utf8
#
# python-ipfix (c) 2013 Brian Trammell.
#
# Many thanks to the mPlane consortium (http://www.ict-mplane.eu) for
# its material support of this effort.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Provides the MessageBuffer class for encoding and decoding IPFIX Messages.

This interface allows direct control over Messages; for reading or writing
records automatically from/to streams, see :mod:`ipfix.reader` and
:mod:`ipfix.writer`, respectively.

To create a message buffer:

>>> from __future__ import unicode_literals
>>> import ipfix.message
>>> msg = ipfix.message.MessageBuffer()
>>> msg
<MessageBuffer domain 0 length 0>

To prepare the buffer to write records:

>>> msg.begin_export(8304)
>>> msg
<MessageBuffer domain 8304 length 16 (writing)>

Note that the buffer grows to contain the message header.

To write records to the buffer, first you'll need a template:

>>> import ipfix.ie
>>> ipfix.ie.use_iana_default()
>>> import ipfix.template
>>> tmpl = ipfix.template.from_ielist(256,
...        ipfix.ie.spec_list(("flowStartMilliseconds",
...                            "sourceIPv4Address",
...                            "destinationIPv4Address",
...                            "packetDeltaCount")))
>>> tmpl
<Template ID 256 count 4 scope 0>

To add the template to the message:

>>> msg.add_template(tmpl)
>>> msg
<MessageBuffer domain 8304 length 40 (writing set 2)>

Note that :meth:`MessageBuffer.add_template` by default also exports the
template to the message as it is added, and that the current set ID is now 2
(template set).

Now, a set must be created to add records to the message; the set ID must match
the ID of the template. MessageBuffer automatically uses the template matching
the set ID for record encoding.

>>> msg.export_ensure_set(256)
>>> msg
<MessageBuffer domain 8304 length 44 (writing set 256)>

Records can be added to the set either as dictionaries keyed by IE name:

>>> from datetime import datetime
>>> from ipaddress import ip_address
>>> rec = { "flowStartMilliseconds" : datetime.strptime("2013-06-21 14:00:00",
...                                       "%Y-%m-%d %H:%M:%S"),
...         "sourceIPv4Address" : ip_address("10.1.2.3"),
...         "destinationIPv4Address" : ip_address("10.5.6.7"),
...         "packetDeltaCount" : 27 }
>>> msg.export_namedict(rec)
>>> msg
<MessageBuffer domain 8304 length 68 (writing set 256)>

or as tuples in template order:

>>> rec = (datetime.strptime("2013-06-21 14:00:02", "%Y-%m-%d %H:%M:%S"),
...        ip_address("10.8.9.11"), ip_address("10.12.13.14"), 33)
>>> msg.export_tuple(rec)
>>> msg
<MessageBuffer domain 8304 length 92 (writing set 256)>

Variable-length information elements will be encoded using the native length
of the passed value:

>>> ipfix.ie.for_spec("myNewInformationElement(35566/1)<string>")  #doctest: +IGNORE_UNICODE
InformationElement('myNewInformationElement', 35566, 1, ipfix.types.for_name('string'), 65535)
>>> tmpl = ipfix.template.from_ielist(257,
...        ipfix.ie.spec_list(("flowStartMilliseconds",
...                            "myNewInformationElement")))
>>> msg.add_template(tmpl)
>>> msg.export_ensure_set(257)
>>> msg
<MessageBuffer domain 8304 length 116 (writing set 257)>
>>> rec = { "flowStartMilliseconds" : datetime.strptime("2013-06-21 14:00:04",
...                                   "%Y-%m-%d %H:%M:%S"),
...         "myNewInformationElement" : "Grüezi, Y'all" }
>>> msg.export_namedict(rec)
>>> msg
<MessageBuffer domain 8304 length 139 (writing set 257)>

Attempts to write past the end of the message (set via the mtu parameter,
default 65535) result in :exc:`EndOfMessage` being raised.

Messages can be written to a stream using :meth:`MessageBuffer.write_message`,
or dumped to a byte array for transmission using :meth:`MessageBuffer.to_bytes`.
The message must be reset before starting to write again.

>>> b = msg.to_bytes()
>>> msg.begin_export()
>>> msg
<MessageBuffer domain 8304 length 16 (writing)>

Reading happens more or less in reverse. To begin, a message is read from a
byte array using :meth:`MessageBuffer.from_bytes`, or from a stream using
:meth:`MessageBuffer.read_message`.

>>> msg.from_bytes(b)
>>> msg
<MessageBuffer domain 8304 length 139 (deframed 4 sets)>

Both of these methods scan the message in advance to find the sets within
the message. The records within these sets can then be accessed by iterating
over the message. As with export, the records can be accessed as a dictionary
mapping IE names to values or as tuples. The dictionary interface is
designed for general IPFIX processing applications, such as collectors
accepting many types of data, or diagnostic tools for debugging IPFIX export:

>>> def print_msg(msg):
...     for rec in msg.namedict_iterator():
...         key = 'myNewInformationElement'
...         if key in rec:
...             unicode_recs = [(key, rec[key])]
...             recs = [item for item in sorted(rec.items()) if item[0] != key]
...         else:
...             unicode_recs = []
...             recs = sorted(rec.items())
...         print(recs)
...         for key, value in unicode_recs:
...             print("{}: {}".format(key, value))
...
>>> print_msg(msg)  #doctest: +IGNORE_UNICODE
[('destinationIPv4Address', IPv4Address('10.5.6.7')), ('flowStartMilliseconds', datetime.datetime(2013, 6, 21, 14, 0)), ('packetDeltaCount', 27), ('sourceIPv4Address', IPv4Address('10.1.2.3'))]
[('destinationIPv4Address', IPv4Address('10.12.13.14')), ('flowStartMilliseconds', datetime.datetime(2013, 6, 21, 14, 0, 2)), ('packetDeltaCount', 33), ('sourceIPv4Address', IPv4Address('10.8.9.11'))]
[('flowStartMilliseconds', datetime.datetime(2013, 6, 21, 14, 0, 4))]
myNewInformationElement: Grüezi, Y'all

The tuple interface for reading messages is designed for applications with a
specific internal data model. It can be much faster than the dictionary
interface, as it skips decoding of IEs not requested by the caller, and can
skip entire sets not containing all the requested IEs. Requested IEs are
specified as an :class:`ipfix.ie.InformationElementList` instance, from
:func:`ie.spec_list()`:

>>> ielist = ipfix.ie.spec_list(["flowStartMilliseconds", "packetDeltaCount"])
>>> for rec in msg.tuple_iterator(ielist):
...     print(rec)
...
(datetime.datetime(2013, 6, 21, 14, 0), 27)
(datetime.datetime(2013, 6, 21, 14, 0, 2), 33)

Notice that the variable-length record written to the message is not returned
by this iterator, since that record doesn't include a packetDeltaCount IE.
The record is, however, still there:

>>> ielist = ipfix.ie.spec_list(["myNewInformationElement"])
>>> for rec in msg.tuple_iterator(ielist):
...     print(rec[0])
...
Grüezi, Y'all

"""

from __future__ import unicode_literals
from . import template, types, compat
from .template import IpfixEncodeError, IpfixDecodeError
from .compat import timezone, ifilter, reduce

import io
import operator
import struct
from datetime import datetime
from warnings import warn

_sethdr_st = struct.Struct("!HH")
_msghdr_st = struct.Struct("!HHLLL")

class EndOfMessage(Exception):
    """
    Exception raised when a write operation on a Message
    fails because there is not enough space in the message.

    """
    def __init__(self, *args):
        # Name the class explicitly: super(self.__class__, self) resolves to
        # the *runtime* class and therefore recurses forever as soon as
        # EndOfMessage is subclassed. Forward the arguments unpacked so that
        # e.args holds them directly instead of a nested tuple.
        super(EndOfMessage, self).__init__(*args)
def accept_all_templates(tmpl):
    """
    Template acceptance function that accepts every template.

    Used as the default ``tmplaccept_fn`` of
    :meth:`MessageBuffer.record_iterator`.

    :param tmpl: the template under consideration (not inspected)
    :returns: always True

    """
    return True
class MessageBuffer(object):
    """
    Implements a buffer for reading or writing IPFIX messages.

    A single instance keeps template and sequence-number state across
    messages, keyed by observation domain ID, and can be switched between
    reading (:meth:`read_message`, :meth:`from_bytes`) and writing
    (:meth:`begin_export`).

    """
    def __init__(self, buf_sz=65536):
        """
        Create a new MessageBuffer instance.

        :param buf_sz: size in bytes of the underlying message buffer

        """
        self.mbuf = compat.get_buffer(bytearray(buf_sz))

        # Header fields of the current message
        self.length = 0
        self.sequence = None
        self.export_epoch = None
        self.odid = 0
        self.streamid = 0

        # State kept across messages
        self.templates = {}          # (odid, tid) -> template
        self.accepted_tids = set()   # (odid, tid) accepted by the iterator
        self.sequences = {}          # (odid, streamid) -> record count

        # Sets found in the last message read: (offset, setid, setlen)
        self.setlist = []

        # Export state
        self.auto_export_time = True
        self.cursetoff = 0
        self.cursetid = None
        self.curtmpl = None

        self.last_tuple_iterator_ielist = None

        # Maximum message length enforced while exporting
        self.mtu = 65535

        # Optional callbacks; each is skipped when left as None
        self.template_record_hook = None
        self.unknown_data_set_hook = None
        self.ignored_data_set_hook = None
        self.message_header_hook = None

    def __repr__(self):
        if self.cursetid:
            addinf = " (writing set " + str(self.cursetid) + ")"
        elif self.setlist:
            addinf = " (deframed " + str(len(self.setlist)) + " sets)"
        elif self.length:
            addinf = " (writing)"
        else:
            addinf = ""

        return "<MessageBuffer domain {0} length {1}{2}>".format(
            self.odid, self.length, addinf)

    def get_export_time(self):
        """
        Return the export time of this message. When reading, returns the
        export time as read from the message header. When writing, this is
        the argument of the last call to :meth:`set_export_time`, or, if
        :attr:`auto_export_time` is True, the time of the last message
        export.

        :returns: export time of the last message read/written.

        """
        return types._decode_sec(self.export_epoch)

    def set_export_time(self, dt=None):
        """
        Set the export time for the next message written with
        :meth:`write_message` or :meth:`to_bytes`. Disables automatic export
        time updates. By default, sets the export time to the current time.

        :param dt: export time to set, as a datetime

        """
        if not dt:
            dt = datetime.utcnow()
        self.export_epoch = types._encode_sec(dt)
        self.auto_export_time = False

    def _increment_sequence(self):
        """Count one more record for the current (domain, stream) pair."""
        key = (self.odid, self.streamid)
        self.sequences[key] = self.sequences.get(key, 0) + 1

    def _scan_setlist(self):
        """
        Locate the sets in the message currently held in the buffer and
        store them in ``self.setlist`` as (offset, setid, setlen) tuples.

        :raises: IpfixDecodeError if a set header is truncated, or a set
                 length is shorter than a set header or runs past the end
                 of the message.

        """
        # We've read a message. Discard all export state.
        self.cursetoff = 0
        self.cursetid = None
        self.curtmpl = None

        # Clear the setlist; it is only replaced once the scan succeeds, so
        # a malformed message leaves no partial set list behind.
        self.setlist = []
        found = []

        # Start from the beginning of the body
        offset = _msghdr_st.size
        while offset < self.length:
            if offset + _sethdr_st.size > self.length:
                raise IpfixDecodeError("Truncated set header in message")
            (setid, setlen) = _sethdr_st.unpack_from(self.mbuf, offset)
            # The set length includes the set header. Anything shorter is
            # corrupt, and a length of zero would never advance the offset.
            if setlen < _sethdr_st.size:
                raise IpfixDecodeError("Illegal set length " + str(setlen))
            if offset + setlen > self.length:
                raise IpfixDecodeError("Set too long for message")
            found.append((offset, setid, setlen))
            offset += setlen

        self.setlist = found

    def read_message(self, stream):
        """
        Read a IPFIX message from a stream.

        This populates message header fields and the internal setlist.
        Call for each new message before iterating over records when reading
        from a stream. The buffer and header fields are only updated once
        the complete message has been read and its header validated.

        :param stream: stream to read from
        :raises: IpfixDecodeError, EOFError if the stream is exhausted

        """
        hdrlen = _msghdr_st.size

        # deframe and parse message header
        msghdr = stream.read(hdrlen)
        if len(msghdr) == 0:
            raise EOFError()
        elif len(msghdr) < hdrlen:
            raise IpfixDecodeError("Short read in message header (" +
                                   str(len(msghdr)) + ")")

        (version, length, export_epoch, sequence, odid) = \
            _msghdr_st.unpack(msghdr)

        # verify version and length
        if version != 10:
            raise IpfixDecodeError("Illegal or unsupported version " +
                                   str(version))

        if length < 20:
            raise IpfixDecodeError("Illegal message length " + str(length))

        # read the rest of the message
        msgbody = stream.read(length - hdrlen)
        if len(msgbody) < length - hdrlen:
            raise IpfixDecodeError("Short read in message body (got " +
                                   str(len(msgbody)) + ", expected " +
                                   str(length - hdrlen) + ")")

        # Everything checked out: commit the message to the buffer.
        self.mbuf[0:hdrlen] = msghdr
        self.mbuf[hdrlen:length] = msgbody
        self.length = length
        self.export_epoch = export_epoch
        self.sequence = sequence
        self.odid = odid

        # call the message header hook
        if self.message_header_hook:
            self.message_header_hook(self)

        # populate setlist
        self._scan_setlist()

    def from_bytes(self, str_):
        """
        Read an IPFIX message from a byte array.

        This populates message header fields and the internal setlist.
        Call for each new message before iterating over records when reading
        from a byte array.

        :param str_: a byte array containing a complete IPFIX message.
        :raises: IpfixDecodeError

        """
        return self.read_message(io.BytesIO(str_))

    def record_iterator(self,
                        decode_fn=template.Template.decode_namedict_from,
                        tmplaccept_fn=accept_all_templates,
                        recinf=None):
        """
        Low-level interface to record iteration.

        Iterate over records in an IPFIX message previously read with
        :meth:`read_message()` or :meth:`from_bytes()`. Automatically handles
        templates in set order. By default, iterates over each record in the
        stream as a dictionary mapping IE name to value
        (i.e., the same as :meth:`namedict_iterator`)

        :param decode_fn: Function used to decode a record;
                          must be an (unbound) "decode" instance method of
                          the :class:`ipfix.template.Template` class.
        :param tmplaccept_fn: Function returning True if the given template
                              is of interest to the caller, False if not.
                              Default accepts all templates. Sets described
                              by templates for which this function returns
                              False will be skipped.
        :param recinf: Record information opaquely passed to decode function
        :returns: an iterator over records decoded by decode_fn.

        """
        for (offset, setid, setlen) in self.setlist:
            setstart = offset
            setend = offset + setlen
            offset += _sethdr_st.size  # skip set header in decode
            if setid == template.TEMPLATE_SET_ID or \
               setid == template.OPTIONS_SET_ID:
                while offset < setend:
                    (tmpl, offset) = template.decode_template_from(
                        self.mbuf, offset, setid)
                    # FIXME handle withdrawal
                    self.templates[(self.odid, tmpl.tid)] = tmpl
                    if tmplaccept_fn(tmpl):
                        self.accepted_tids.add((self.odid, tmpl.tid))
                    else:
                        self.accepted_tids.discard((self.odid, tmpl.tid))
                    if self.template_record_hook:
                        self.template_record_hook(self, tmpl)

            elif setid < 256:
                warn("skipping illegal set id " + str(setid))
            else:
                # NOTE(review): this handler also catches a KeyError raised
                # by decode_fn or by a hook, not only a failed template
                # lookup; kept as-is to preserve existing behaviour.
                try:
                    tmpl = self.templates[(self.odid, setid)]
                    if (self.odid, setid) in self.accepted_tids:
                        while offset + tmpl.minlength <= setend:
                            (rec, offset) = decode_fn(tmpl, self.mbuf, offset,
                                                      recinf=recinf)
                            yield rec
                            self._increment_sequence()
                    elif self.ignored_data_set_hook:
                        # not in accepted tids - ignored data set
                        self.ignored_data_set_hook(
                            self, tmpl, self.mbuf[setstart:setend])
                except KeyError:
                    if self.unknown_data_set_hook:
                        # KeyError on template lookup - unknown data set
                        self.unknown_data_set_hook(
                            self, self.mbuf[setstart:setend])

    def namedict_iterator(self):
        """
        Iterate over all records in the Message, as dicts mapping IE names
        to values.

        :returns: a name dictionary iterator

        """
        return self.record_iterator(
            decode_fn=template.Template.decode_namedict_from)

    def _recache_accepted_tids(self, tmplaccept_fn):
        """
        Re-evaluate ``tmplaccept_fn`` against every template known in the
        current domain and update ``self.accepted_tids`` to match.
        """
        for tid in self.active_template_ids():
            if tmplaccept_fn(self.templates[(self.odid, tid)]):
                self.accepted_tids.add((self.odid, tid))
            else:
                self.accepted_tids.discard((self.odid, tid))

    def tuple_iterator(self, ielist):
        """
        Iterate over all records in the Message containing all the IEs in
        the given ielist. Records are returned as tuples in ielist order.

        :param ielist: an instance of
                       :class:`ipfix.ie.InformationElementList` listing IEs
                       to return as a tuple
        :returns: a tuple iterator for tuples as in ielist order

        """
        def tmplaccept_fn(tmpl):
            # all() short-circuits on the first missing IE and, unlike
            # reduce() without an initial value, accepts an empty ielist.
            return all(ie in tmpl.ies for ie in ielist)

        # Templates already known were filtered for the previous ielist;
        # refilter them when the caller asks for a different one.
        if ((not self.last_tuple_iterator_ielist) or
                (ielist is not self.last_tuple_iterator_ielist)):
            self._recache_accepted_tids(tmplaccept_fn)
        self.last_tuple_iterator_ielist = ielist

        return self.record_iterator(
            decode_fn=template.Template.decode_tuple_from,
            tmplaccept_fn=tmplaccept_fn,
            recinf=ielist)

    def to_bytes(self):
        """
        Convert this MessageBuffer to a byte array, suitable for writing
        to a binary file, socket, or datagram. Finalizes the message by
        rewriting the message header with current length, and export time.

        :returns: message as a byte array

        """
        # Close final set
        self._export_close_set()

        # Update export time if necessary
        if self.auto_export_time:
            dt = datetime.utcnow().replace(tzinfo=timezone.utc)
            self.export_epoch = int(compat.datetime_to_timestamp(dt))

        # Update message header in buffer
        _msghdr_st.pack_into(self.mbuf, 0, 10, self.length,
                             self.export_epoch, self.sequence, self.odid)

        return self.mbuf[0:self.length].tobytes()

    def write_message(self, stream):
        """
        Convenience method to write a message to a stream; see
        :meth:`to_bytes`.

        :param stream: stream to write the finalized message to

        """
        stream.write(self.to_bytes())

    def add_template(self, tmpl, export=True):
        """
        Add a template to this MessageBuffer. Adding a template makes it
        available for use for exporting records; see
        :meth:`export_new_set`.

        :param tmpl: the template to add
        :param export: If True, export this template to the MessageBuffer
                       after adding it.
        :raises: EndOfMessage

        """
        self.templates[(self.odid, tmpl.tid)] = tmpl
        if export:
            self.export_template(tmpl.tid)
        if self.template_record_hook:
            self.template_record_hook(self, tmpl)

    def delete_template(self, tid, export=True):
        """
        Delete a template by ID from this MessageBuffer.

        :param tid: ID of the template to delete
        :param export: if True, export a Template Withdrawal for this
                       Template after deleting it
        :raises: EndOfMessage, KeyError if no such template exists in the
                 current domain

        """
        # Look up the set ID first; it is needed for the withdrawal and is
        # no longer available once the template is gone.
        setid = self.templates[(self.odid, tid)].native_setid()
        del self.templates[(self.odid, tid)]
        if export:
            self._export_template_withdrawal(setid, tid)

    def active_template_ids(self):
        """
        Get an iterator over all active template IDs in the current domain.
        Provided to allow callers to export some or all active Templates
        across multiple Messages.

        :returns: a template ID iterator

        """
        for (odid, tid) in self.templates:
            if odid == self.odid:
                yield tid

    def template_for_id(self, tid):
        """
        Retrieve a Template for a given ID in the current domain.

        :param tid: template ID to get
        :returns: the template
        :raises: KeyError

        """
        return self.templates[(self.odid, tid)]

    def begin_export(self, odid=None):
        """
        Start exporting a new message. Clears any previous message content,
        but keeps template information intact. Sets the message sequence
        number.

        :param odid: Observation domain ID to use for export. By default,
                     uses the observation domain ID of the previous
                     message. Note that templates are scoped to observation
                     domain, so templates will need to be added after
                     switching to a new observation domain ID.
        :raises: IpfixEncodeError

        """
        # We're exporting. Clear setlist from any previously read message.
        self.setlist = []

        # Set new domain if requested. Compare against None so that
        # observation domain 0 can be selected explicitly.
        if odid is not None:
            self.odid = odid

        # Set sequence number. This has to follow the domain switch: the
        # counters are kept per (domain, stream), and the header must carry
        # the count of the domain this message is exported for.
        self.sequence = self.sequences.setdefault(
            (self.odid, self.streamid), 0)

        # reset message and zero header
        self.length = _msghdr_st.size
        self.cursetoff = self.length
        self.mbuf[0:_msghdr_st.size] = bytearray(_msghdr_st.size)

        if self.mtu <= self.length:
            raise IpfixEncodeError("MTU too small: " + str(self.mtu))

        # no current set
        self.cursetid = None

    def export_new_set(self, setid):
        """
        Start exporting a new Set with the given set ID. Creates a new set
        even if the current Set has the given set ID; client code should in
        most cases use :meth:`export_ensure_set` instead.

        :param setid: Set ID of the new Set; corresponds to the Template ID
                      of the Template that will be used to encode records
                      into the Set. The required Template must have already
                      been added to the MessageBuffer, see
                      :meth:`add_template`.
        :raises: IpfixEncodeError, EndOfMessage

        """
        # close current set if any
        self._export_close_set()

        if setid >= 256:
            # make sure we have a template for the set
            tmpl = self.templates.get((self.odid, setid))
            if tmpl is None:
                raise IpfixEncodeError("can't start set without template id "
                                       + str(setid))

            # make sure we have room to export at least one record
            if self.length + _sethdr_st.size + tmpl.minlength > self.mtu:
                raise EndOfMessage()
        else:
            # special Set ID. no template
            tmpl = None

        # set up new set; the length field is a placeholder that
        # _export_close_set() fills in when the set is closed
        self.cursetoff = self.length
        self.cursetid = setid
        self.curtmpl = tmpl
        _sethdr_st.pack_into(self.mbuf, self.length, setid, 0)
        self.length += _sethdr_st.size

    def export_ensure_set(self, setid):
        """
        Ensure that the current set for export has the given Set ID.
        Starts a new set if not using :meth:`export_new_set`

        :param setid: Set ID of the new Set; corresponds to the Template ID
                      of the Template that will be used to encode records
                      into the Set. The required Template must have already
                      been added to the MessageBuffer, see
                      :meth:`add_template`.
        :raises: IpfixEncodeError, EndOfMessage

        """
        if self.cursetid != setid:
            self.export_new_set(setid)

    def export_needs_flush(self):
        """
        True if content has been written to this MessageBuffer since the
        last call to :meth:`begin_export`

        :returns: True if the buffer holds more than a message header plus
                  one set header, False otherwise

        """
        # FIXME we don't prewrite a set, do we?
        return self.length > _msghdr_st.size + _sethdr_st.size

    def _export_close_set(self):
        """Write the final length into the header of the open set, if any."""
        if self.cursetid:
            _sethdr_st.pack_into(self.mbuf, self.cursetoff,
                                 self.cursetid, self.length - self.cursetoff)
            self.cursetid = None

    def export_template(self, tid):
        """
        Export a template to this Message given its template ID.

        :param tid: ID of template to export; must have been added to this
                    message previously with :meth:`add_template`.
        :raises: EndOfMessage, KeyError

        """
        tmpl = self.templates[(self.odid, tid)]

        self.export_ensure_set(tmpl.native_setid())

        if self.length + tmpl.enclength > self.mtu:
            raise EndOfMessage

        self.length = tmpl.encode_template_to(self.mbuf, self.length,
                                              tmpl.native_setid())

    def _export_template_withdrawal(self, setid, tid):
        """
        Export a withdrawal for template ``tid`` into a set with ID
        ``setid``.

        :raises: EndOfMessage
        """
        self.export_ensure_set(setid)

        if self.length + template.withdrawal_length(setid) > self.mtu:
            raise EndOfMessage

        self.length = template.encode_withdrawal_to(self.mbuf, self.length,
                                                    setid, tid)

    def export_record(self, rec,
                      encode_fn=template.Template.encode_namedict_to,
                      recinf=None):
        """
        Low-level interface to record export.

        Export a record to a MessageBuffer, using the template associated
        with the Set ID given to the most recent :meth:`export_new_set` or
        :meth:`export_ensure_set` call, and the given encode function. By
        default, the record is assumed to be a dictionary mapping IE names
        to values (i.e., the same as :meth:`export_namedict`).

        :param rec: the record to export, in the form expected by encode_fn
        :param encode_fn: Function used to encode a record; must be an
                          (unbound) "encode" instance method of the
                          :class:`ipfix.template.Template` class.
        :param recinf: Record information opaquely passed to encode function
        :raises: EndOfMessage

        """
        # Remember where the record starts so a failed write can be undone.
        savelength = self.length

        try:
            self.length = encode_fn(self.curtmpl, self.mbuf, self.length,
                                    rec, recinf)
        except struct.error:  # out of bounds on the underlying mbuf
            self.length = savelength
            raise EndOfMessage()

        # check for mtu overrun
        if self.length > self.mtu:
            self.length = savelength
            raise EndOfMessage()

        self._increment_sequence()

    def export_namedict(self, rec):
        """
        Export a record to the message, using the template for the current
        Set ID. The record is a dictionary mapping IE names to values. The
        dictionary must contain a value for each IE in the template. Keys in
        the dictionary not in the template will be ignored.

        :param rec: the record to export, as a dictionary
        :raises: EndOfMessage

        """
        self.export_record(rec, template.Template.encode_namedict_to)

    def export_tuple(self, rec):
        """
        Export a record to the message, using the template for the current
        Set ID. The record is a tuple of values in template order.

        :param rec: the record to export, as a tuple in template order.
        :raises: EndOfMessage

        """
        self.export_record(rec, template.Template.encode_tuple_to)