Data Sinks

Common operations for all vehicle data sinks.

class openxc.sinks.base.DataSink

A base interface for all data sinks. At a minimum, a data sink must implement a receive() method.

receive(message, **kwargs)

Handle an incoming vehicle data message.

Args:
message (dict) - a new OpenXC vehicle data message
Kwargs:
data_remaining (bool) - if the originating data source can peek ahead
in the data stream, this argument will be True if there is more data available.
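
As a rough illustration of the interface described above, the sketch below mirrors the documented receive() signature without importing openxc. The DataSink class here is a local stand-in, and CollectingSink and the sample message contents are hypothetical names chosen for the example.

```python
class DataSink(object):
    """Local stand-in for openxc.sinks.base.DataSink (an assumption, not the library class)."""

    def receive(self, message, **kwargs):
        raise NotImplementedError("Subclasses must implement receive()")


class CollectingSink(DataSink):
    """Hypothetical sink that stores every message it receives."""

    def __init__(self):
        self.messages = []
        self.more_expected = False

    def receive(self, message, **kwargs):
        # data_remaining is optional; treat it as False when the source
        # does not supply it.
        self.more_expected = kwargs.get("data_remaining", False)
        self.messages.append(message)


sink = CollectingSink()
# The message contents below are illustrative only.
sink.receive({"name": "vehicle_speed", "value": 42}, data_remaining=True)
```

Reading data_remaining with kwargs.get() keeps the sink usable with sources that cannot peek ahead and therefore never pass the argument.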

A data sink implementation for the core listener notification service of openxc.vehicle.Vehicle.

class openxc.sinks.notifier.MeasurementNotifierSink

Notify previously registered callbacks whenever measurements of a certain type have been received.

This data sink is the core of the asynchronous interface of openxc.vehicle.Vehicle.

class Notifier(queue, callback)
run()
MeasurementNotifierSink.register(measurement_class, callback)

Call the callback with any new values of measurement_class received.

MeasurementNotifierSink.unregister(measurement_class, callback)

Stop notifying callback of new values of measurement_class.

If the callback wasn’t previously registered, this method will have no effect.
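
The register/unregister behavior described above can be sketched with a small callback registry. This is not the library's implementation; CallbackRegistry, its notify() method, and the VehicleSpeed placeholder are hypothetical names used only to show the semantics, including the no-op when unregistering an unknown callback.

```python
from collections import defaultdict


class CallbackRegistry(object):
    """Hypothetical registry mapping a measurement class to its callbacks."""

    def __init__(self):
        self.callbacks = defaultdict(set)

    def register(self, measurement_class, callback):
        self.callbacks[measurement_class].add(callback)

    def unregister(self, measurement_class, callback):
        # discard() does nothing if the callback was never registered.
        self.callbacks[measurement_class].discard(callback)

    def notify(self, measurement):
        # Iterate over a copy so callbacks may unregister themselves.
        for callback in list(self.callbacks[type(measurement)]):
            callback(measurement)


class VehicleSpeed(object):
    """Placeholder measurement type for this example."""

    def __init__(self, value):
        self.value = value


seen = []


def on_speed(measurement):
    seen.append(measurement.value)


registry = CallbackRegistry()
registry.register(VehicleSpeed, on_speed)
registry.notify(VehicleSpeed(42))        # on_speed is called
registry.unregister(VehicleSpeed, on_speed)
registry.notify(VehicleSpeed(50))        # no callback remains
registry.unregister(VehicleSpeed, on_speed)  # already removed: no effect
```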

Common functionality for data sinks that work on a queue of incoming messages.

class openxc.sinks.queued.QueuedSink

Store every message received and any kwargs from the originating data source as a tuple in a queue.

The queue can be referenced in subclasses via the queue attribute.

receive(message, **kwargs)

Add the message and kwargs to the queue.
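
A minimal sketch of the queueing behavior described above, assuming a standard-library queue.Queue as the backing store (the actual queue type used by the library is not stated here). QueuedSinkSketch is a hypothetical stand-in, not the library class.

```python
import queue


class QueuedSinkSketch(object):
    """Hypothetical stand-in showing the (message, kwargs) tuple layout."""

    def __init__(self):
        self.queue = queue.Queue()

    def receive(self, message, **kwargs):
        # Store the message together with the kwargs from the data source.
        self.queue.put((message, kwargs))


sink = QueuedSinkSketch()
sink.receive({"name": "engine_speed", "value": 1500}, data_remaining=False)

# A subclass or worker thread would unpack each queued item like this.
message, kwargs = sink.queue.get()
```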

Trace file recording operations.

class openxc.sinks.recorder.FileRecorderSink

A sink to record trace files based on the messages received from all data sources.

FILENAME_DATE_FORMAT = '%Y-%m-%d-%H'
FILENAME_FORMAT = '%s.json'
class Recorder(queue)
run()
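
The two filename constants above suggest one trace file per hour. The sketch below shows one plausible way to combine them; the trace_filename() helper is hypothetical, and how the library itself applies the constants is an assumption.

```python
import datetime

FILENAME_DATE_FORMAT = '%Y-%m-%d-%H'
FILENAME_FORMAT = '%s.json'


def trace_filename(timestamp):
    """Hypothetical helper: build a trace filename for the given time."""
    return FILENAME_FORMAT % timestamp.strftime(FILENAME_DATE_FORMAT)


# 14:30 on 1 May 2013 falls in the file for the 14:00 hour.
example = trace_filename(datetime.datetime(2013, 5, 1, 14, 30))
# example == '2013-05-01-14.json'
```

Because the date format stops at the hour, every message received within the same hour maps to the same filename.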
class openxc.sinks.uploader.UploaderSink(url)

Uploads all incoming vehicle data to a remote web application via HTTP.

TODO document service side format

Args:
url (str) - the URL to which vehicle data is sent in HTTP POST requests

HTTP_TIMEOUT = 5000
UPLOAD_BATCH_SIZE = 25
class Uploader(queue, url)
run()
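
To illustrate what UPLOAD_BATCH_SIZE implies, the sketch below groups queued messages into batches of at most 25 before each upload. The take_batch() helper is hypothetical, and the actual HTTP request is left out because the server-side format is not documented here.

```python
import queue

UPLOAD_BATCH_SIZE = 25


def take_batch(message_queue, batch_size=UPLOAD_BATCH_SIZE):
    """Hypothetical helper: remove up to batch_size items from the queue."""
    batch = []
    while len(batch) < batch_size:
        try:
            batch.append(message_queue.get_nowait())
        except queue.Empty:
            break
    return batch


pending = queue.Queue()
for i in range(60):
    # Illustrative messages only.
    pending.put({"name": "vehicle_speed", "value": i})

batch_sizes = []
while True:
    batch = take_batch(pending)
    if not batch:
        break
    # An uploader would POST the batch to the configured URL at this point.
    batch_sizes.append(len(batch))
```

With 60 queued messages, this produces two full batches and one partial batch.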