# coding: utf8
#
# python-ipfix (c) 2013-2014 Brian Trammell.
#
# Many thanks to the mPlane consortium (http://www.ict-mplane.eu) for
# its material support of this effort.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Implementation of IPFIX abstract data types (ADT) and mappings to Python types.
Maps each IPFIX ADT to the corresponding Python type, as below:
======================= =============
IPFIX Type Python Type
======================= =============
octetArray bytes
unsigned8 int
unsigned16 int
unsigned32 int
unsigned64 int
signed8 int
signed16 int
signed32 int
signed64 int
float32 float
float64 float
boolean bool
macAddress bytes
string str
dateTimeSeconds datetime
dateTimeMilliseconds datetime
dateTimeMicroseconds datetime
dateTimeNanoseconds datetime
ipv4Address ipaddress
ipv6Address ipaddress
======================= =============
Though client code generally will not use this module directly, it defines how
each IPFIX abstract data type will be represented in Python, and the concrete
IPFIX representation of each type. Type methods operate on buffers, as used
internally by the :class:`ipfix.message.MessageBuffer` class, so we'll create
one to illustrate encoding and decoding:
>>> from __future__ import unicode_literals
>>> import ipfix.compat
>>> import ipfix.types
>>> buf = ipfix.compat.get_buffer(16)
Each of the encoding methods returns the offset into the buffer of the first
byte after the encoded value; since we're always encoding to the beginning
of the buffer in this example, this is equivalent to the length.
We use this to bound the encoded value on subsequent decode.
Integers are represented by the python int type:
>>> unsigned32 = ipfix.types.for_name("unsigned32")
>>> length = unsigned32.encode_single_value_to(42, buf, 0)
>>> buf[0:length].tolist()
[0, 0, 0, 42]
>>> unsigned32.decode_single_value_from(buf, 0, length)
42
...floats by the float type, with the usual caveats about precision:
>>> float32 = ipfix.types.for_name("float32")
>>> length = float32.encode_single_value_to(42.03579, buf, 0)
>>> buf[0:length].tolist()
[66, 40, 36, 166]
>>> float32.decode_single_value_from(buf, 0, length)
42.035789489746094
...strings by the str type, encoded as UTF-8 (note that the Python 2
interpreter prints unicode characters as hex escapes, but using 'print'
avoids this, and produces the same result in Python 2/3):
>>> string = ipfix.types.for_name("string")
>>> length = string.encode_single_value_to("Grüezi", buf, 0)
>>> buf[0:length].tolist()
[71, 114, 195, 188, 101, 122, 105]
>>> print (string.decode_single_value_from(buf, 0, length))
Grüezi
...addresses as the IPv4Address and IPv6Address types in the ipaddress module:
>>> from ipaddress import ip_address
>>> ipv4Address = ipfix.types.for_name("ipv4Address")
>>> length = ipv4Address.encode_single_value_to(ip_address("198.51.100.27"), buf, 0)
>>> buf[0:length].tolist()
[198, 51, 100, 27]
>>> ipv4Address.decode_single_value_from(buf, 0, length)
IPv4Address('198.51.100.27')
>>> ipv6Address = ipfix.types.for_name("ipv6Address")
>>> length = ipv6Address.encode_single_value_to(ip_address("2001:db8::c0:ffee"), buf, 0)
>>> buf[0:length].tolist()
[32, 1, 13, 184, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 255, 238]
>>> ipv6Address.decode_single_value_from(buf, 0, length)
IPv6Address('2001:db8::c0:ffee')
...and timestamps of various precisions as a Python datetime,
encoded as per RFC 7011 (published from the RFC5101bis draft):
>>> from datetime import datetime
>>> dtfmt = "%Y-%m-%d %H:%M:%S.%f"
>>> dt = datetime.strptime("2013-06-21 14:00:03.456789", dtfmt)
dateTimeSeconds truncates microseconds:
>>> dateTimeSeconds = ipfix.types.for_name("dateTimeSeconds")
>>> length = dateTimeSeconds.encode_single_value_to(dt, buf, 0)
>>> buf[0:length].tolist()
[81, 196, 92, 99]
>>> dateTimeSeconds.decode_single_value_from(buf, 0, length).strftime(dtfmt)
'2013-06-21 14:00:03.000000'
dateTimeMilliseconds truncates microseconds to the nearest millisecond:
>>> dateTimeMilliseconds = ipfix.types.for_name("dateTimeMilliseconds")
>>> length = dateTimeMilliseconds.encode_single_value_to(dt, buf, 0)
>>> buf[0:length].tolist()
[0, 0, 1, 63, 103, 8, 228, 128]
>>> dateTimeMilliseconds.decode_single_value_from(buf, 0, length).strftime(dtfmt)
'2013-06-21 14:00:03.456000'
dateTimeMicroseconds exports microseconds fully in NTP format:
>>> dateTimeMicroseconds = ipfix.types.for_name("dateTimeMicroseconds")
>>> length = dateTimeMicroseconds.encode_single_value_to(dt, buf, 0)
>>> buf[0:length].tolist()
[213, 110, 218, 227, 116, 240, 32, 0]
>>> dateTimeMicroseconds.decode_single_value_from(buf, 0, length).strftime(dtfmt)
'2013-06-21 14:00:03.456789'
dateTimeNanoseconds is also supported, but is identical to
dateTimeMicroseconds, as the datetime class in Python only supports
microsecond-level timing.
"""
from __future__ import unicode_literals
from __future__ import division
from datetime import datetime, timedelta
from functools import total_ordering
from ipaddress import ip_address
import binascii
import struct
import math
# Length value marking a variable-length type; OctetArrayType instances
# use it as their default length.
VARLEN = 65535
# strftime/strptime pattern shared by the dateTime* string conversions.
# It covers whole seconds only; the millisecond/microsecond formatters
# append the fractional digits themselves.
ISO8601_FMT = "%Y-%m-%d %H:%M:%S"
# Seconds between NTP epoch and Unix epoch
# (0x83AA7E80 == 2208988800)
NTP_EPOCH_TO_UNIX_EPOCH = 0x83AA7E80
class IpfixTypeError(ValueError):
    """Raised when attempting to do an unsupported operation on a type.

    Accepts the same positional arguments as :class:`ValueError`; the first
    argument is conventionally the human-readable message.
    """

    def __init__(self, *args):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this exception is subclassed.
        # Unpack args so that str(exc) yields the message itself instead of
        # the repr of a nested one-element tuple.
        super(IpfixTypeError, self).__init__(*args)
# Table for downconverting struct elements for reduced length encoding
_stel_rle = { ('H', 1) : 'B',
('L', 2) : 'H',
('L', 1) : 'B',
('Q', 4) : 'L',
('Q', 2) : 'H',
('Q', 1) : 'B',
('h', 1) : 'b',
('l', 2) : 'h',
('l', 1) : 'b',
('q', 4) : 'l',
('q', 2) : 'h',
('q', 1) : 'b',
('d', 4) : 'f'}
# Builtin structs for varlen information
# One-octet length field (network byte order); encode_varlen uses it alone
# for lengths below 255 and writes the marker value 255 with it otherwise.
_varlen1_st = struct.Struct("!B")
# Two-octet length field that follows a first octet of 255.
_varlen2_st = struct.Struct("!H")
# builtin default encode/decode function
def _identity(x):
return x
@total_ordering
class IpfixType(object):
    # Builtin type implementation
    """Abstract interface for all IPFIX types. Used internally.

    Instances compare, order and hash by the pair ``(num, length)``.

    :param name: type name, as looked up through ``for_name``
    :param num: numeric identifier of the type
    :param valenc: callable converting a Python value before encoding
    :param valdec: callable converting a decoded value to a Python value
    :param valstr: callable rendering a Python value as a string
    :param valparse: callable parsing a string into a Python value
    :param roottype: type this one derives from; defaults to the instance
                     itself
    """

    def __init__(self, name, num, valenc, valdec, valstr, valparse, roottype=None):
        self.name = name
        self.num = num
        self.valenc = valenc
        self.valdec = valdec
        self.valstr = valstr
        self.valparse = valparse
        # A type without an explicit root is its own root.
        self.roottype = roottype or self
        # Subclasses overwrite this with their real encoded length.
        self.length = 0

    def _key(self):
        """Return the (num, length) pair used for comparison and hashing."""
        return (self.num, self.length)

    def __eq__(self, other):
        # Return NotImplemented for objects that do not look like a type,
        # so that e.g. ``ietype == None`` is False rather than an
        # AttributeError.
        try:
            other_key = (other.num, other.length)
        except AttributeError:
            return NotImplemented
        return self._key() == other_key

    def __ne__(self, other):
        # Python 2 does not derive != from ==; spell it out.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        try:
            other_key = (other.num, other.length)
        except AttributeError:
            return NotImplemented
        return self._key() < other_key

    def __hash__(self):
        # Defining __eq__ removes the default hash on Python 3; provide one
        # that agrees with equality so types can live in sets and dict keys.
        return hash(self._key())

    def __str__(self):
        return "<%s>" % self.name

    def __repr__(self):
        return "ipfix.types.for_name(%s)" % repr(self.name)
class StructType(IpfixType):
    """Type encoded by struct packing. Used internally.

    :param name: type name
    :param num: numeric identifier of the type
    :param stel: struct format for a single value, without byte-order
                 prefix ("!" is prepended here)
    :param valenc: callable applied to a Python value before packing
    :param valdec: callable applied to the unpacked value
    :param valstr: callable rendering a Python value as a string
    :param valparse: callable parsing a string into a Python value
    :param roottype: type this one derives from; defaults to itself
    """

    def __init__(self, name, num, stel, valenc=_identity, valdec=_identity,
                 valstr=str, valparse=int, roottype=None):
        # Name the class explicitly; super(self.__class__, self) would
        # recurse forever in any subclass.
        super(StructType, self).__init__(name, num, valenc, valdec,
                                         valstr, valparse, roottype)
        self.stel = stel
        self.st = struct.Struct("!" + stel)
        self.length = self.st.size
        # struct padding element covering one value of this type
        self.skipel = str(self.length) + "x"

    def _with_stel(self, stel):
        """Return a copy of this type that packs with struct format *stel*."""
        # Pass the converters by keyword: passing roottype positionally
        # would land in the valstr slot and silently drop the real
        # valstr/valparse/roottype.
        return StructType(self.name, self.num, stel,
                          valenc=self.valenc, valdec=self.valdec,
                          valstr=self.valstr, valparse=self.valparse,
                          roottype=self.roottype)

    def for_length(self, length):
        """Return the variant of this type encoded in *length* octets.

        :param length: requested length in octets; a false value (None, 0)
                       or the current length returns this instance
        :returns: a StructType of the requested length
        :raises: IpfixTypeError if no reduced-length encoding exists
        """
        if not length or length == self.length:
            return self
        elif self.roottype is _roottypes[0]:
            # FIXME this is kind of a hack to allow any-length encoding of octet arrays
            return self._with_stel(str(length) + "s")
        else:
            try:
                reduced_stel = _stel_rle[(self.stel, length)]
            except KeyError:
                raise IpfixTypeError("No RLE for <%s>[%u]" %
                                     (self.name, length))
            return self._with_stel(reduced_stel)

    def encode_single_value_to(self, val, buf, offset):
        """Pack *val* into *buf* at *offset*.

        :returns: the offset of the first byte after the encoded value
        """
        self.st.pack_into(buf, offset, self.valenc(val))
        return offset + self.length

    def decode_single_value_from(self, buf, offset, length):
        """Unpack and return one value from *buf* at *offset*.

        *length* must equal this type's length; callers are expected to have
        selected the right variant with :meth:`for_length` beforehand.
        """
        # NOTE(review): this check disappears under ``python -O``.
        assert self.length == length
        return self.valdec(self.st.unpack_from(buf, offset)[0])
class OctetArrayType(IpfixType):
    """Type encoded by byte array packing. Used internally.

    Instances have length ``VARLEN``; :meth:`for_length` returns a
    fixed-length :class:`StructType` for any other length.

    :param name: type name
    :param num: numeric identifier of the type
    :param valenc: callable converting a Python value to the bytes to store
    :param valdec: callable converting the stored bytes to a Python value
    :param valstr: callable rendering a Python value as a string
    :param valparse: callable parsing a string into a Python value
    :param roottype: type this one derives from; defaults to itself
    """

    def __init__(self, name, num, valenc=_identity, valdec=_identity,
                 valstr=binascii.hexlify, valparse=binascii.unhexlify,
                 roottype=None):
        # Name the class explicitly; super(self.__class__, self) would
        # recurse forever in any subclass.
        super(OctetArrayType, self).__init__(name, num, valenc, valdec,
                                             valstr, valparse, roottype)
        self.length = VARLEN

    def for_length(self, length):
        """Return the variant of this type encoded in *length* octets.

        A false value (None, 0) or ``VARLEN`` returns this instance;
        anything else yields a StructType packing exactly *length* bytes.
        """
        if not length or length == self.length:
            return self
        # Pass the converters by keyword: passing roottype positionally
        # would land in StructType's valstr slot and lose the real
        # valstr/valparse/roottype of this type.
        return StructType(self.name, self.num, str(length) + "s",
                          valenc=self.valenc, valdec=self.valdec,
                          valstr=self.valstr, valparse=self.valparse,
                          roottype=self.roottype)

    def encode_single_value_to(self, val, buf, offset):
        """Copy the encoded form of *val* into *buf* at *offset*.

        :returns: the offset of the first byte after the encoded value
        """
        enc = self.valenc(val)
        end = offset + len(enc)
        buf[offset:end] = enc
        return end

    def decode_single_value_from(self, buf, offset, length):
        """Decode *length* bytes of *buf* starting at *offset*.

        The slice must support ``tobytes()`` (as a memoryview slice does).
        """
        return self.valdec(buf[offset:offset + length].tobytes())
# Utility calls for builtin encoders/decoders
def dt2epoch(dt):
    """Return the number of seconds from the Unix epoch to *dt* as a float.

    Naive datetimes are taken to be UTC, as before. Timezone-aware
    datetimes are converted to UTC first; previously they raised TypeError
    because an aware value cannot be subtracted from the naive epoch.

    :param dt: a datetime.datetime
    :returns: seconds since 1970-01-01 00:00:00, including the fraction
    """
    offset = dt.utcoffset()
    if offset is not None:
        # Aware value: local wall time minus its UTC offset is UTC.
        dt = dt.replace(tzinfo=None) - offset
    return (dt - datetime(1970, 1, 1)).total_seconds()
# Builtin encoders/decoders
def _encode_smibool(bool):
if bool:
return 1
else:
return 2
def _decode_smibool(byte):
if byte == 1:
return True
else:
return False
def _str_bool(bool):
if bool:
return "true"
else:
return "false"
def _parse_bool(string):
if string == 'true':
return True
else:
return False
def _encode_utf8(string):
return string.encode('utf8')
def _decode_utf8(octets):
return octets.decode('utf8')
def _encode_sec(dt):
    """Return *dt* as whole seconds since the Unix epoch (fraction dropped)."""
    seconds = dt2epoch(dt)
    return int(seconds)
def _decode_sec(epoch):
return datetime.utcfromtimestamp(epoch)
def _str_sec(dt):
    """Format *dt* with ISO8601_FMT, i.e. to whole-second resolution."""
    text = dt.strftime(ISO8601_FMT)
    return text
def _parse_sec(string):
    """Parse a string in ISO8601_FMT into a naive datetime."""
    parsed = datetime.strptime(string, ISO8601_FMT)
    return parsed
def _encode_msec(dt):
return int(dt2epoch(dt) * 1000)
def _decode_msec(epoch):
return datetime.utcfromtimestamp(epoch/1000)
def _str_msec(dt):
    """Format *dt* with ISO8601_FMT plus a three-digit millisecond field."""
    stamp = dt.strftime(ISO8601_FMT)
    millis = dt.microsecond // 1000
    return "%s.%03u" % (stamp, millis)
def _parse_msec(string):
    """Parse "<ISO8601_FMT>[.fraction]" into a datetime, to the millisecond.

    The fraction is read as decimal digits after the point: it is
    right-padded or truncated to three digits, so ".5" means 500 ms
    (it used to be read as the integer 5, i.e. 5 ms) and digits beyond
    the third are dropped. A missing fraction means zero milliseconds.

    :param string: timestamp text
    :returns: naive datetime.datetime
    :raises: ValueError if the text does not match the format or the
             fraction contains non-digit characters
    """
    (ss, _, frac) = string.partition(".")
    if frac and not frac.isdigit():
        raise ValueError("invalid fractional seconds: %r" % (frac,))
    millis = int((frac + "000")[:3])
    return datetime.strptime(ss, ISO8601_FMT).replace(microsecond=millis * 1000)
def _encode_ntp(dt):
    """Return *dt* as a 64-bit NTP-format timestamp.

    The upper 32 bits carry whole seconds shifted to the NTP epoch, the
    lower 32 bits the fraction of a second scaled by 2**32. The sum is
    formed in floating point before conversion to int.
    """
    fraction, whole = math.modf(dt2epoch(dt))
    ntp_seconds = int(whole) + NTP_EPOCH_TO_UNIX_EPOCH
    return int((ntp_seconds << 32) + (fraction * 2**32))
def _decode_ntp(ntp):
    """Return the naive UTC datetime for a 64-bit NTP-format timestamp.

    The low 32 bits are the fraction of a second (in units of 2**-32),
    the high 32 bits the seconds counted from the NTP epoch.
    """
    fraction = ntp & (2**32 - 1)
    unix_seconds = (ntp >> 32) - NTP_EPOCH_TO_UNIX_EPOCH
    return datetime.utcfromtimestamp(unix_seconds + fraction / 2**32)
def _str_usec(dt):
    """Format *dt* with ISO8601_FMT plus a six-digit microsecond field."""
    stamp = dt.strftime(ISO8601_FMT)
    return "%s.%06u" % (stamp, dt.microsecond)
def _parse_usec(string):
    """Parse "<ISO8601_FMT>[.fraction]" into a datetime, to the microsecond.

    The fraction is read as decimal digits after the point: it is
    right-padded or truncated to six digits, so ".5" means 500000 us
    (it used to be read as the integer 5, i.e. 5 us) and longer fractions,
    such as nine-digit nanosecond text, are cut at the microsecond rather
    than rejected. A missing fraction means zero microseconds.

    :param string: timestamp text
    :returns: naive datetime.datetime
    :raises: ValueError if the text does not match the format or the
             fraction contains non-digit characters
    """
    (ss, _, frac) = string.partition(".")
    if frac and not frac.isdigit():
        raise ValueError("invalid fractional seconds: %r" % (frac,))
    micros = int((frac + "000000")[:6])
    return datetime.strptime(ss, ISO8601_FMT).replace(microsecond=micros)
def _encode_ip(ipaddr):
return ipaddr.packed
# builtin type registry
# The list is ordered so that each entry's position equals its type number
# (second constructor argument); code elsewhere in this module relies on
# that, e.g. for_length() tests against _roottypes[0] and
# use_integer_ipv4() replaces _roottypes[18].
_roottypes = [
    OctetArrayType("octetArray", 0),
    StructType("unsigned8", 1, "B"),
    StructType("unsigned16", 2, "H"),
    StructType("unsigned32", 3, "L"),
    StructType("unsigned64", 4, "Q"),
    StructType("signed8", 5, "b"),
    StructType("signed16", 6, "h"),
    StructType("signed32", 7, "l"),
    StructType("signed64", 8, "q"),
    StructType("float32", 9, "f", valparse=float),
    StructType("float64", 10, "d", valparse=float),
    # booleans travel as one octet: 1 for true, 2 for false
    StructType("boolean", 11, "B",
               valenc=_encode_smibool, valdec=_decode_smibool,
               valstr=_str_bool, valparse=_parse_bool),
    StructType("macAddress", 12, "6s"),
    # strings are variable-length octet arrays holding UTF-8
    OctetArrayType("string", 13,
                   valenc=_encode_utf8, valdec=_decode_utf8,
                   valstr=_identity, valparse=_identity),
    StructType("dateTimeSeconds", 14, "L",
               valenc=_encode_sec, valdec=_decode_sec,
               valstr=_str_sec, valparse=_parse_sec),
    StructType("dateTimeMilliseconds", 15, "Q",
               valenc=_encode_msec, valdec=_decode_msec,
               valstr=_str_msec, valparse=_parse_msec),
    # micro- and nanosecond types share the same NTP-format codec
    StructType("dateTimeMicroseconds", 16, "Q",
               valenc=_encode_ntp, valdec=_decode_ntp,
               valstr=_str_usec, valparse=_parse_usec),
    StructType("dateTimeNanoseconds", 17, "Q",
               valenc=_encode_ntp, valdec=_decode_ntp,
               valstr=_str_usec, valparse=_parse_usec),
    StructType("ipv4Address", 18, "4s",
               valenc=_encode_ip, valdec=ip_address,
               valparse=ip_address),
    StructType("ipv6Address", 19, "16s",
               valenc=_encode_ip, valdec=ip_address,
               valparse=ip_address)
]
# Name -> type lookup table backing for_name().
_TypeForName = dict((ietype.name, ietype) for ietype in _roottypes)
def use_integer_ipv4():
    """
    Use integers instead of ipaddress.IPv4Address to store IPv4 addresses.
    Changes behavior globally; should be called before using any IPFIX types.
    Designed for use with numpy arrays, to not require a Python object for
    storing IP addresses.
    """
    global _TypeForName
    # The replacement must keep the exact name "ipv4Address": the lookup
    # table is rebuilt from the entry names, so a differently-cased name
    # (previously "ipv4address") makes for_name("ipv4Address") fail after
    # this call.
    _roottypes[18] = StructType("ipv4Address", 18, "L")
    _TypeForName = dict((ietype.name, ietype) for ietype in _roottypes)
def for_name(name):
    """
    Return an IPFIX type for a given type name

    :param name: the name of the type to look up
    :returns: IpfixType -- type instance for that name
    :raises: IpfixTypeError

    """
    try:
        return _TypeForName[name]
    except (KeyError, TypeError):
        # TypeError covers unhashable names. Format with %s instead of
        # string concatenation so that a non-string name (e.g. None) still
        # produces the documented IpfixTypeError rather than a TypeError
        # raised while building the message.
        raise IpfixTypeError("no such type %s" % (name,))
def decode_varlen(buf, offset):
    """Decode an IPFIX varlen encoded length; used internally by template

    Reads one octet at *offset*; if it is 255, the length is taken from the
    two octets that follow instead.

    :returns: tuple (length, offset of the first byte after the length field)
    """
    (length,) = _varlen1_st.unpack_from(buf, offset)
    offset += _varlen1_st.size
    if length != 255:
        return (length, offset)
    (length,) = _varlen2_st.unpack_from(buf, offset)
    return (length, offset + _varlen2_st.size)
def encode_varlen(buf, offset, length):
    """Encode an IPFIX varlen encoded length; used internally by template

    Lengths below 255 take a single octet; longer ones are written as the
    marker octet 255 followed by the length in two octets.

    :returns: offset of the first byte after the length field
    """
    if length >= 255:
        _varlen1_st.pack_into(buf, offset, 255)
        long_offset = offset + _varlen1_st.size
        _varlen2_st.pack_into(buf, long_offset, length)
        return long_offset + _varlen2_st.size
    _varlen1_st.pack_into(buf, offset, length)
    return offset + _varlen1_st.size
def test_types_internals():
    """Check that looking up an unknown type name raises IpfixTypeError."""
    try:
        for_name("bogus")
    except IpfixTypeError:
        return
    # Reaching this point means the lookup unexpectedly succeeded.
    assert False