Source code for ipfix.writer
#
# python-ipfix (c) 2013 Brian Trammell.
#
# Many thanks to the mPlane consortium (http://www.ict-mplane.eu) for
# its material support of this effort.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
from . import message
[docs]class MessageStreamWriter(object):
"""
Writes records to a stream of IPFIX messages.
Uses an :class:`ipfix.message.MessageBuffer` internally, and continually
writes records into messages, exporting messages to the stream each time the
maximum message size (MTU) is reached. Use :func:`to_stream` to get an
instance.
Suitable for writing to IPFIX files (see :rfc:`5655`) as well as to TCP
sockets. When writing a stream to a file, use mode='wb'.
..warning: This class is not yet suitable for UDP export; this is an open
issue to be fixed in a subsequent release.
"""
def __init__(self, stream, mtu=65535):
self.stream = stream
self.msg = message.MessageBuffer()
self.msg.mtu = mtu
self.msgcount = 0
def _retry_after_flush(self, fn, *args, **kwargs):
try:
return fn(*args, **kwargs)
except message.EndOfMessage:
self.flush()
return fn(*args, **kwargs)
[docs] def set_domain(self, odid):
"""
Sets the observation domain for subsequent messages sent with
this Writer.
:param odid: Observation domain ID to use for export. Note that
templates are scoped to observation domain, so
templates will need to be added after switching to a
new observation domain ID.
"""
if self.msg.export_needs_flush():
self.msg.write_message(self.stream)
self.msg.begin_export(odid)
[docs] def add_template(self, tmpl):
"""
Add a template to this Writer. Adding a template makes it
available for use for exporting records; see :meth:`set_export_template`.
:param tmpl: the template to add
"""
self.msg.add_template(tmpl)
[docs] def set_export_template(self, tid):
"""
Set the template to be used for export by subsequent calls to
:meth:`export_namedict` and :meth:`export_tuple`.
:param tid: Template ID of the Template that will be used to encode
records to the Writer. The corresponding Template must
have already been added to the Writer, see
:meth:`add_template`.
"""
self.curtid = tid
self._retry_after_flush(message.MessageBuffer.export_ensure_set,
self.msg, self.curtid)
[docs] def export_namedict(self, rec):
"""
Export a record to the message, using the current template
The record is a dictionary mapping IE names to values. The
dictionary must contain a value for each IE in the template. Keys in the
dictionary not in the template will be ignored.
:param rec: the record to export, as a dictionary
"""
self._retry_after_flush(message.MessageBuffer.export_ensure_set,
self.msg, self.curtid)
self._retry_after_flush(message.MessageBuffer.export_namedict,
self.msg, rec)
def export_tuple(self, rec):
self._retry_after_flush(message.MessageBuffer.export_ensure_set,
self.msg, self.curtid)
self._retry_after_flush(message.MessageBuffer.export_tuple,
self.msg, rec)
[docs] def flush(self):
"""
Export an in-progress Message immediately.
Used internally to manage message boundaries, but
can also be used to force immediate export (e.g. to reduce delay
due to buffer dwell time), as well as to finish write operations on
a Writer before closing the underlying stream.
"""
setid = self.msg.cursetid
self.msg.write_message(self.stream)
self.msgcount += 1
self.msg.begin_export()
self.msg.export_ensure_set(setid)
[docs]def to_stream(stream, mtu=65535):
"""
Get a MessageStreamWriter for a given stream
:param stream: stream to write
:param mtu: maximum message size in bytes; defaults to 65535,
the largest possible ipfix message.
:return: a :class:`MessageStreamWriter` wrapped around the stream.
"""
return MessageStreamWriter(stream, mtu)