Source code for ipfix.template

#
# python-ipfix (c) 2013 Brian Trammell.
#
# Many thanks to the mPlane consortium (http://www.ict-mplane.eu) for
# its material support of this effort.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Representation of IPFIX templates.
Provides template-based packing and unpacking of data in IPFIX messages.

For reading, templates are handled internally. For writing, use
:func:`from_ielist` to create a template.

See :mod:`ipfix.message` for examples.

"""
from . import ie, types, compat
from .compat import izip, xrange, lru_cache

import struct


# Builtin exceptions
class IpfixEncodeError(Exception):
    """Raised on internal encoding errors, or if message MTU is too small"""
    def __init__(self, *args):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this exception is subclassed, because
        # self.__class__ is then the subclass on every re-entry.
        # Forward the arguments unpacked so that e.args and str(e) carry the
        # message itself rather than a tuple wrapped in a tuple.
        super(IpfixEncodeError, self).__init__(*args)
class IpfixDecodeError(Exception):
    """Raised when decoding a malformed IPFIX message"""
    def __init__(self, *args):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this exception is subclassed, because
        # self.__class__ is then the subclass on every re-entry.
        # Forward the arguments unpacked so that e.args and str(e) carry the
        # message itself rather than a tuple wrapped in a tuple.
        super(IpfixDecodeError, self).__init__(*args)
# constants for v9 V9_TEMPLATE_SET_ID = 0 V9_OPTIONS_SET_ID = 1 # constants TEMPLATE_SET_ID = 2 OPTIONS_SET_ID = 3 # template encoding/decoding structs _tmplhdr_st = struct.Struct("!HH") _otmplhdr_st = struct.Struct("!HHH") _iespec_st = struct.Struct("!HH") _iepen_st = struct.Struct("!L")
class TemplatePackingPlan(object):
    """
    Plan to pack/unpack a specific set of indices for a template.
    Used internally by Templates for efficient encoding and decoding.

    The plan covers only the leading fixed-length portion of the template
    (the first ``tmpl.fixlen_count()`` IEs). Selected IEs contribute their
    struct element and value codec; unselected ones contribute a skip element.

    """
    def __init__(self, tmpl, indices):
        self.tmpl = tmpl
        self.indices = indices
        # positions into indices, ordered by the template index they refer to
        self.ranks = sorted(xrange(len(indices)),
                            key=lambda pos: indices[pos])
        self.valenc = []
        self.valdec = []

        # only the IEs before the first varlen IE go into the struct
        fixlen = tmpl.fixlen_count()
        pieces = ["!"]
        for position, elem in enumerate(tmpl.ies[:fixlen]):
            ietype = elem.type
            if position in indices:
                pieces.append(ietype.stel)
                self.valenc.append(ietype.valenc)
                self.valdec.append(ietype.valdec)
            else:
                pieces.append(ietype.skipel)

        self.st = struct.Struct("".join(pieces))

    def __repr__(self):
        index_text = " ".join(str(i) for i in self.indices)
        return ("<TemplatePackingPlan " + repr(self.tmpl) +
                " pack " + str(self.st.format) +
                " indices " + index_text + ">")
class Template(object):
    """
    An IPFIX Template.

    A template is an ordered list of IPFIX Information Elements with an ID.

    """
    def __init__(self, tid = 0, iterable = None):
        """
        Create a template with the given ID, optionally filled from iterable.

        :param tid: Template ID, must be between 256 and 65535.
        :param iterable: optional IEs to append; wrapped in an
                         InformationElementList if it is not one already.
        :raises ValueError: if tid is outside 256..65535

        """
        if tid < 256 or tid > 65535:
            raise ValueError("bad template ID "+str(tid))

        self.tid = tid
        self.minlength = 0       # running sum maintained by append()
        self.enclength = 0       # encoded size of the IE specifiers
        self.scopecount = 0      # nonzero marks an options template
        self.varlenslice = None  # index of the first varlen IE, if any
        self.packplan = None     # default plan, built by finalize()
        self.ies = []

        if iterable:
            if not isinstance(iterable, ie.InformationElementList):
                iterable = ie.InformationElementList(iterable)
            for elem in iterable:
                self.append(elem)

    def __repr__(self):
        return "<Template ID "+str(self.tid)+" count "+ \
               str(len(self.ies))+" scope "+str(self.scopecount)+">"

    def identical_to(self, other):
        """
        Determine if two templates are identical to each other.

        Two templates are considered identical if they contain the same IEs
        in the same order, and the same scope count. Template ID is not
        considered as part of the test for template identity.

        """
        # FIXME this needs to check IE lengths as well
        return (self.ies == other.ies) and (self.scopecount == other.scopecount)

    def append(self, ie):
        """Append an IE to this Template"""
        self.ies.append(ie)

        if ie.length == types.VARLEN:
            # a varlen IE adds 1 to the minimum record length
            # (presumably its shortest length prefix -- TODO confirm)
            self.minlength += 1
            # remember where the fixed-length prefix ends
            if self.varlenslice is None:
                self.varlenslice = len(self.ies) - 1
        else:
            self.minlength += ie.length

        # size of this IE's specifier in an encoded template record
        self.enclength += _iespec_st.size
        if (ie.pen):
            self.enclength += _iepen_st.size

    def count(self):
        """Count IEs in this template"""
        return len(self.ies)

    def fixlen_count(self):
        """
        Count of fixed-length IEs in this template before the first
        variable-length IE; this is the size of the portion of the template
        which can be encoded/decoded efficiently.

        """
        if self.varlenslice is not None:
            return self.varlenslice
        else:
            return self.count()

    def finalize(self):
        """Compile a default packing plan. Called after append()ing all IEs."""
        self.packplan = TemplatePackingPlan(self, xrange(self.count()))

    # NOTE: the cache is keyed on (self, ielist), so ielist must be hashable
    # and cached entries keep their Template alive until evicted.
    @lru_cache(maxsize = 32)
    def packplan_for_ielist(self, ielist):
        """
        Given a list of IEs, devise and cache a packing plan.
        Used by the tuple interfaces.

        """
        return TemplatePackingPlan(self, [self.ies.index(ie) for ie in ielist])

    def decode_from(self, buf, offset, packplan = None):
        """
        Decodes a record into a tuple containing values in template order

        :param buf: buffer to read from
        :param offset: offset of the start of the record in buf
        :param packplan: plan selecting which IEs to decode; defaults to the
                         plan compiled by finalize()
        :return: (list of decoded values, offset just past the record)

        """
        # use default packplan unless someone hacked us not to
        if packplan is None:
            packplan = self.packplan

        # decode fixed values
        vals = [f(v) for f, v in izip(packplan.valdec,
                                      packplan.st.unpack_from(buf, offset))]
        offset += packplan.st.size

        # short circuit on no varlen. Must compare against None: a
        # varlenslice of 0 means the very first IE is varlen, in which case
        # the whole record still has to be walked below.
        if self.varlenslice is None:
            return (vals, offset)

        # direct iteration over remaining IEs
        for i, elem in izip(xrange(self.varlenslice, self.count()),
                            self.ies[self.varlenslice:]):
            length = elem.length
            if length == types.VARLEN:
                (length, offset) = types.decode_varlen(buf, offset)
            if i in packplan.indices:
                vals.append(elem.type.decode_single_value_from(
                                buf, offset, length))
            # skipped IEs still occupy space in the record
            offset += length

        return (vals, offset)

    def decode_namedict_from(self, buf, offset, recinf = None):
        """
        Decodes a record from a buffer into a dict keyed by IE name.

        recinf is accepted but not used here.

        """
        (vals, offset) = self.decode_from(buf, offset)
        return (dict(izip((elem.name for elem in self.ies), vals)), offset)

    def decode_tuple_from(self, buf, offset, recinf = None):
        """
        Decodes a record from a buffer into a tuple,
        ordered as the IEs in the InformationElementList given as recinf.

        Without recinf, the default packing plan is used.

        """
        if recinf:
            packplan = self.packplan_for_ielist(recinf)
        else:
            packplan = self.packplan

        (vals, offset) = self.decode_from(buf, offset, packplan = packplan)

        # re-sort values in same order as packplan indices
        outvals = tuple(v for i, v in sorted(izip(packplan.ranks, vals)))
        return (outvals, offset)

    def encode_to(self, buf, offset, vals, packplan = None):
        """
        Encodes a record from a tuple containing values in template order

        :param buf: buffer to write into
        :param offset: offset in buf at which the record starts
        :param vals: sequence of values in template order
        :param packplan: plan selecting which IEs to encode; defaults to the
                         plan compiled by finalize()
        :return: offset just past the encoded record

        """
        # use default packplan unless someone hacked us not to
        if packplan is None:
            packplan = self.packplan

        # encode fixed values
        fixvals = [f(v) for f, v in izip(packplan.valenc, vals)]
        packplan.st.pack_into(buf, offset, *fixvals)
        offset += packplan.st.size

        # shortcircuit no varlen. Must compare against None: a varlenslice
        # of 0 means the very first IE is varlen and nothing has been
        # written yet.
        if self.varlenslice is None:
            return offset

        # direct iteration over remaining IEs
        for i, elem, val in izip(xrange(self.varlenslice, self.count()),
                                 self.ies[self.varlenslice:],
                                 vals[self.varlenslice:]):
            if i in packplan.indices:
                if elem.length == types.VARLEN:
                    # FIXME this arrangement requires double-encode of varlen
                    # values, one to get the length, one to do the encode.
                    # Fixing this requires a rearrangement of type encoding
                    # though. For now we'll just say that if you're exporting
                    # varlen you get to put up with some inefficiency. :)
                    offset = types.encode_varlen(buf, offset,
                                                 len(elem.type.valenc(val)))
                offset = elem.type.encode_single_value_to(val, buf, offset)

        return offset

    def encode_namedict_to(self, buf, offset, rec, recinf = None):
        """
        Encodes a record from a dict containing values keyed by IE name

        Every IE name in the template must be a key of rec. recinf is
        accepted but not used here.

        """
        return self.encode_to(buf, offset, [rec[elem.name] for elem in self.ies])

    def encode_tuple_to(self, buf, offset, rec, recinf = None):
        """
        Encodes a record from a tuple containing values ordered as the IEs
        in the template.

        recinf is accepted but not used here.

        """
        return self.encode_to(buf, offset, rec)

    def encode_template_to(self, buf, offset, setid):
        """
        Encodes the template to a buffer.
        Encodes as a Template if setid is TEMPLATE_SET_ID,
        as an Options Template if setid is OPTIONS_SET_ID.

        :return: offset just past the encoded template record
        :raises IpfixEncodeError: if setid is neither of the two

        """
        if setid == TEMPLATE_SET_ID:
            _tmplhdr_st.pack_into(buf, offset, self.tid, self.count())
            offset += _tmplhdr_st.size
        elif setid == OPTIONS_SET_ID:
            _otmplhdr_st.pack_into(buf, offset, self.tid,
                                   self.count(), self.scopecount)
            offset += _otmplhdr_st.size
        else:
            raise IpfixEncodeError("bad template set id "+str(setid))

        for e in self.ies:
            if e.pen:
                # high bit of the IE number flags a following PEN
                _iespec_st.pack_into(buf, offset, e.num | 0x8000, e.length)
                offset += _iespec_st.size
                _iepen_st.pack_into(buf, offset, e.pen)
                offset += _iepen_st.size
            else:
                _iespec_st.pack_into(buf, offset, e.num, e.length)
                offset += _iespec_st.size

        return offset

    def native_setid(self):
        """
        Return the set ID matching this template's kind: OPTIONS_SET_ID if
        it has a nonzero scope count, TEMPLATE_SET_ID otherwise.

        """
        if self.scopecount:
            return OPTIONS_SET_ID
        else:
            return TEMPLATE_SET_ID
def withdrawal_length(setid):
    """
    Return the encoded length of a template withdrawal for the given set ID.

    A withdrawal is just a template header, so this is the header size for
    TEMPLATE_SET_ID or OPTIONS_SET_ID.

    :raises IpfixEncodeError: if setid is neither of the two

    """
    if setid == TEMPLATE_SET_ID:
        return _tmplhdr_st.size
    elif setid == OPTIONS_SET_ID:
        return _otmplhdr_st.size
    else:
        # raise, don't return: handing the exception object back as a
        # "length" would only blow up later in the caller's arithmetic
        raise IpfixEncodeError("bad template set id "+str(setid))


def encode_withdrawal_to(buf, offset, setid, tid):
    """
    Encode a withdrawal for template ID tid into buf at offset.

    Writes a template header with an IE count of zero (and, for options
    templates, a scope count of zero).

    :return: offset just past the encoded withdrawal
    :raises IpfixEncodeError: if setid is neither TEMPLATE_SET_ID nor
                              OPTIONS_SET_ID

    """
    if setid == TEMPLATE_SET_ID:
        _tmplhdr_st.pack_into(buf, offset, tid, 0)
        offset += _tmplhdr_st.size
    elif setid == OPTIONS_SET_ID:
        _otmplhdr_st.pack_into(buf, offset, tid, 0, 0)
        offset += _otmplhdr_st.size
    else:
        raise IpfixEncodeError("bad template set id "+str(setid))

    return offset
def decode_template_from(buf, offset, setid):
    """
    Decodes a template from a buffer.
    Decodes as a Template if setid is TEMPLATE_SET_ID,
    as an Options Template if setid is OPTIONS_SET_ID.

    The V9_TEMPLATE_SET_ID and V9_OPTIONS_SET_ID set IDs are accepted too
    and decoded with the same respective header layouts.

    :param buf: buffer to read from
    :param offset: offset of the template record in buf
    :param setid: ID of the set the record was found in
    :return: (finalized Template, offset just past the template record)
    :raises IpfixDecodeError: if setid is not a template set ID

    """
    if setid in (TEMPLATE_SET_ID, V9_TEMPLATE_SET_ID):
        (tid, count) = _tmplhdr_st.unpack_from(buf, offset)
        scopecount = 0
        offset += _tmplhdr_st.size
    elif setid in (OPTIONS_SET_ID, V9_OPTIONS_SET_ID):
        (tid, count, scopecount) = _otmplhdr_st.unpack_from(buf, offset)
        offset += _otmplhdr_st.size
    else:
        raise IpfixDecodeError("bad template set id "+str(setid))

    tmpl = Template(tid)
    tmpl.scopecount = scopecount

    for _ in xrange(count):
        (num, length) = _iespec_st.unpack_from(buf, offset)
        offset += _iespec_st.size
        if num & 0x8000:
            # high bit set: enterprise-specific IE, a PEN follows
            num &= 0x7fff
            pen = _iepen_st.unpack_from(buf, offset)[0]
            # advance by the size of what was just read (the PEN), not by
            # the size of the IE specifier
            offset += _iepen_st.size
        else:
            pen = 0
        tmpl.append(ie.for_template_entry(pen, num, length))

    tmpl.finalize()

    return (tmpl, offset)
def from_ielist(tid, ielist):
    """
    Create a finalized template from a template ID and a list of IEs.

    :param tid: Template ID, passed on to Template
    :param ielist: the IEs the template should contain, in order
    :return: A new Template with its default packing plan compiled

    """
    template = Template(tid, ielist)
    # compile the default packing plan
    template.finalize()
    return template
def for_specs(tid, *specs):
    """
    Create a template from a template ID and a list of IESpecs

    :param tid: Template ID, must be between 256 and 65535.
    :param *specs: List of IESpecs
    :return: A new Template, ready to use for writing to a Message

    """
    # turn the specs into an IE list, then build the template from it
    ielist = ie.spec_list(specs)
    return from_ielist(tid, ielist)