Last modified on 08/08/2012 11:41:27 AM


Work in progress: This page describes the internal API of the datastream module.

Datastream API

Basic API operations:

  • init(backend, callback)
    • API initialization
    • takes a backend instance and an optional callback function, which is called every time a new datapoint is inserted or downsampled
  • ensure_metric(query_tags, tags, downsamplers, highest_granularity) -> metric_id
    • query_tags: tags which must uniquely identify a metric; if they match more than one metric, a MultipleMetricsReturned exception is thrown
    • if metric does not yet exist, it is created using downsamplers, highest_granularity, and union of query_tags and tags
    • highest_granularity must be one of the constant strings: seconds, minutes, hours, days
      • predicted highest granularity of the data the metric will store, may be used to optimize data storage
    • downsamplers is a set of names of downsampler functions supported by the backend
      • an UnsupportedDownsampler exception is thrown if downsampler function is unsupported by the backend
  • get_tags(metric_id) -> tags
    • metric metadata are presented as read-only tags: metric_id, metric_type, highest_granularity
  • update_tags(metric_id, tags)
    • updates metric tags with new tags, overriding existing ones
    • tag names used for metric metadata are reserved and cannot be used
  • remove_tag(metric_id, tag)
  • clear_tags(metric_id)
    • removes all tags
  • find_metrics(query_tags) -> [tags]
    • finds all metrics matching given query tags (metadata tags can also be used)
    • each metric is represented as a set of tags, same as returned from get_tags
  • insert(metric_id, value)
    • value can be any value supported by the metric_type
  • get_data(metric_id, granularity, start, end, downsamplers) -> data
    • end is optional
    • downsamplers is optional; if specified, it limits the values returned for each datapoint
  • downsample_metrics(query_tags)
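The ensure_metric semantics listed above can be sketched with a toy in-memory stand-in. This is only an illustration, not the real backend: the InMemoryBackend class, its _matching helper, the integer metric IDs and the representation of tags as a list of values are all assumptions made for the example.

```python
import itertools


class MultipleMetricsReturned(Exception):
    """Raised when query tags match more than one metric."""


class InMemoryBackend:
    """Hypothetical toy backend that keeps metrics in a dict."""

    GRANULARITIES = ("seconds", "minutes", "hours", "days")

    def __init__(self):
        self._metrics = {}              # metric_id -> metric record
        self._ids = itertools.count(1)  # assumption: integer metric IDs

    def _matching(self, query_tags):
        # A metric matches when it carries every one of the query tags.
        return [m for m in self._metrics.values()
                if all(tag in m["tags"] for tag in query_tags)]

    def ensure_metric(self, query_tags, tags, downsamplers, highest_granularity):
        if highest_granularity not in self.GRANULARITIES:
            raise ValueError("unknown granularity: %r" % (highest_granularity,))
        matches = self._matching(query_tags)
        if len(matches) > 1:
            raise MultipleMetricsReturned(query_tags)
        if matches:
            return matches[0]["metric_id"]
        # The metric does not exist yet: create it with the union of
        # query_tags and tags.
        metric_id = next(self._ids)
        all_tags = list(query_tags) + [t for t in tags if t not in query_tags]
        self._metrics[metric_id] = {
            "metric_id": metric_id,
            "tags": all_tags,
            "downsamplers": set(downsamplers),
            "highest_granularity": highest_granularity,
        }
        return metric_id
```

Calling ensure_metric twice with the same query tags returns the same metric_id, while query tags shared by several metrics raise MultipleMetricsReturned.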

Tags are a set of arbitrary JSON-serializable values that can be assigned to each metric. Although tags can be complex values, simple values like strings or dicts of strings are preferred.

The above basic API operations are implemented in backends. Backends are responsible for storing the datapoints, performing downsampling and executing time span queries. Currently, the only supported backends are MongoDB and Tempo, and the internal stream API is modeled after them, so it might not be usable for other backends. If a need for another backend arises, this internal API should be revised.


Currently defined downsampling functions are:

  • mean: average of all datapoints (key: m)
  • median: median of all datapoints (key: e)
  • sum: sum of all datapoints (key: s)
  • min: minimum value of all datapoints (key: l, for lower)
  • max: maximum value of all datapoints (key: u, for upper)
  • sum_squares: sum of squares of all datapoints (key: q)
  • std_dev: standard deviation of all datapoints (key: d)
  • count: number of all datapoints (key: c)
  • most_often: the most often occurring value of all datapoints (key: o, for often)
  • least_often: the least often occurring value of all datapoints (key: r, for rare)
  • frequencies: for each value number of occurrences in all datapoints (key: f)

(Not all values are necessarily stored in the backend storage; some could be computed on the fly.)
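As a rough illustration of the functions listed above, the following sketch computes all of them for one interval of numeric values and returns them under their short keys. The downsample function name is hypothetical, and so is the choice of the population (rather than sample) standard deviation. Note how d is derived only from c, s and q, which is one way a value could be computed on the fly instead of being stored.

```python
import math
import statistics
from collections import Counter


def downsample(values):
    """Compute every listed downsampler for a non-empty list of numbers."""
    freq = Counter(values)
    ranked = freq.most_common()  # sorted by number of occurrences, descending
    c = len(values)
    s = sum(values)
    q = sum(v * v for v in values)
    return {
        "m": s / c,                      # mean
        "e": statistics.median(values),  # median
        "s": s,                          # sum
        "l": min(values),                # min (lower)
        "u": max(values),                # max (upper)
        "q": q,                          # sum_squares
        # std_dev, assumed here to be the population standard deviation,
        # derived from count, sum and sum of squares only.
        "d": math.sqrt(max(q / c - (s / c) ** 2, 0.0)),
        "c": c,                          # count
        "o": ranked[0][0],               # most_often (ties broken arbitrarily)
        "r": ranked[-1][0],              # least_often (ties broken arbitrarily)
        "f": dict(freq),                 # frequencies
    }
```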



Metrics are stored in the metrics collection; datapoints are stored in the datapoints.<granularity> collections, where <granularity> is one of seconds, minutes, hours, or days.

TODO: Describe the schema.

This backend supports all defined downsampling functions.

There are two ways in which updates can be handled:

  • At the end of each measurement interval, downsampling is performed for all values in the previous interval.
  • Incrementally, after inserting each point.
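The two update strategies can be contrasted with a small sketch. The RunningAggregate class and its method names are hypothetical; it only covers the aggregates that can be maintained in constant space (count, sum, sum of squares, min, max). Values such as the exact median generally need the full set of datapoints in the interval and are left out.

```python
class RunningAggregate:
    """Hypothetical aggregate that can be updated one datapoint at a time."""

    def __init__(self):
        self.count = 0
        self.sum = 0
        self.sum_squares = 0
        self.min = None
        self.max = None

    def add(self, value):
        """Incremental mode: fold one newly inserted datapoint into the aggregate."""
        self.count += 1
        self.sum += value
        self.sum_squares += value * value
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)

    @classmethod
    def from_interval(cls, values):
        """Batch mode: aggregate all values of a finished interval at once."""
        aggregate = cls()
        for value in values:
            aggregate.add(value)
        return aggregate

    def as_dict(self):
        # Keys follow the downsampler keys used elsewhere on this page.
        return {"c": self.count, "s": self.sum, "q": self.sum_squares,
                "l": self.min, "u": self.max}
```

Both modes end up with the same stored values; they differ only in when the work is done and in how soon a partially filled interval becomes visible.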

When performing downsampling, we have to differentiate between two timestamps:

  • Datapoint raw timestamp is the timestamp of the raw datapoint that has been inserted for a given metric. It always has second granularity.
  • Datapoint downsampled timestamp is generated from the raw timestamp by rounding it down to the given granularity. For example, if the raw timestamp is 31-07-2012 12:23:52, then the downsampled timestamp for hour granularity would be 31-07-2012 12:00:00 and for month granularity would be 01-07-2012 00:00:00.
    • TODO: I am not sure if this is the best approach. It discards information about the time spread of datapoints at the higher granularity: missing datapoints, an interval containing only one datapoint, datapoints not arriving at regular intervals, and so on. Alternatives: take the average (or median?) of the timestamps of the datapoints in the interval; or expose the whole interval, encoded by simply storing the timestamp of the first datapoint in the interval, so that the client knows that all the values (mean, deviation, ...) apply to the interval between this and the next timestamp; or provide both the interval and the mean/median of the timestamps. In some way, we should have functions which downsample timestamps and provide those different values, so not just downsampling of values but also downsampling of timestamps should be implemented.
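A minimal sketch of deriving the downsampled timestamp from a raw one, assuming timestamps are Python datetime objects and covering only the four granularities used by the API. The function name downsampled_timestamp and the _RESET table are hypothetical.

```python
from datetime import datetime

# Which datetime fields are reset to zero for each granularity.
_RESET = {
    "seconds": {"microsecond": 0},
    "minutes": {"second": 0, "microsecond": 0},
    "hours": {"minute": 0, "second": 0, "microsecond": 0},
    "days": {"hour": 0, "minute": 0, "second": 0, "microsecond": 0},
}


def downsampled_timestamp(raw, granularity):
    """Round a raw timestamp down to the start of its interval."""
    return raw.replace(**_RESET[granularity])


raw = datetime(2012, 7, 31, 12, 23, 52)
print(downsampled_timestamp(raw, "hours"))  # 2012-07-31 12:00:00
```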

Raw datapoints are stored in the collection selected by the metric's highest_granularity value, and downsampling is performed only for lower granularities. Requests for a granularity higher than highest_granularity simply return values from the highest_granularity collection.
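The fallback rule for requested granularities might look roughly like this. The helper names collection_for and downsampled_granularities are hypothetical; the collection naming follows the datapoints.<granularity> pattern described earlier.

```python
# Ordered from highest (finest) to lowest (coarsest) granularity.
GRANULARITIES = ["seconds", "minutes", "hours", "days"]


def collection_for(requested, highest_granularity):
    """Return the collection name that serves a request for a granularity."""
    requested_index = GRANULARITIES.index(requested)
    highest_index = GRANULARITIES.index(highest_granularity)
    # A request finer than what the metric stores falls back to the
    # metric's highest_granularity collection.
    return "datapoints." + GRANULARITIES[max(requested_index, highest_index)]


def downsampled_granularities(highest_granularity):
    """Granularities for which downsampling is performed."""
    return GRANULARITIES[GRANULARITIES.index(highest_granularity) + 1:]
```

For a metric with highest_granularity set to minutes, a request for seconds would be served from datapoints.minutes, and only hours and days would be downsampled.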

TODO: Describe how downsampled metadata is stored and updated by downsampling functions.

Tempo backend



TODO: There should be API and functions to retrieve data from multiple metrics at the same time.

We should probably not support arbitrary aggregations, but only those which are defined in advance. We could create something like virtual metrics which are based on some other metrics and get updated whenever new data is inserted into those other metrics, through some additional function.

TODO: It would also be useful to be able to define such aggregated metrics in advance, for example, an aggregated metric of all future gateway traffic.


The HTTP API is a REST-like API based on Django. In this document we assume that the API is nested under the /api/ URL prefix.

A list of all metrics can be obtained at:


A particular metric is accessed through its ID:


Along with some metadata, datapoints are returned as a list of dictionaries with t (time) and v (value) keys. Which data is returned can be configured with query parameters:

  • granularity (s, m, h, and d, for seconds, minutes, hours, and days, respectively)
  • start time (in number of seconds since the UNIX epoch)
  • end time (in number of seconds since the UNIX epoch)
  • downsamplers, to limit the returned downsampled values to only those specified; can be a comma-separated list or specified multiple times in the query; possible values are the same as the downsamplers' keys
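How the two accepted forms of the downsamplers parameter could be normalized on the server side is sketched below. This is not the actual view code: the parse_downsamplers helper and the literal parameter name downsamplers are assumptions, and only the standard library is used.

```python
from urllib.parse import parse_qs

# Keys of the downsampling functions defined earlier on this page.
DOWNSAMPLER_KEYS = set("mesluqdcorf")


def parse_downsamplers(query_string):
    """Return the requested downsampler keys, or None if none were given."""
    # parse_qs collects repeated parameters into a list of values.
    raw_values = parse_qs(query_string).get("downsamplers", [])
    # Each value may itself be a comma-separated list.
    keys = [key for value in raw_values for key in value.split(",") if key]
    unknown = set(keys) - DOWNSAMPLER_KEYS
    if unknown:
        raise ValueError("unknown downsamplers: %s" % ", ".join(sorted(unknown)))
    return keys or None
```

With this sketch, downsamplers=m,l,u and downsamplers=m&downsamplers=l&downsamplers=u yield the same list of keys.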

For example, a query to return minutes granularity with only average, min, and max values could be:


TODO: Support for data HTTP streaming.

High-level Interface

The operations described above are fairly low-level, so a higher-level interface for managing Datastream API metrics is provided for the nodewatcher monitoring system. The nodewatcher's Datastream API connector module implements a monitoring processor called DatastreamProcessor, which is a node processor that iterates over all registry items in node.monitoring and attempts to insert their data into the Datastream API.

Any object can be transformed into a datastream object by adding a special connect_datastream attribute of type datastream.ConnectDatastream to it. This attribute contains the definition of the fields that will be connected to the Datastream API when the object is processed.

Fields are specified as follows:

class MyItem(object):
  # ...
  connect_datastream = datastream.ConnectDatastream(
    latency = datastream.IntegerField(),
    # packet_loss reads its value from the "ploss" attribute instead
    packet_loss = datastream.IntegerField(attribute = "ploss"),
    traffic = datastream.RateField()
  )

Fields are part of the higher-level interface and provide a unified way to define metrics which are automatically derived from fields.

The following fields are supported:

  • IntegerField
  • FloatField
  • RateField
  • EnumField

Fields are responsible for the following things:

  • Acquiring data from the object they are defined on and transforming it into an appropriate data type.
  • Generating metrics from fields.
  • Defining which downsampler functions will be used.

Data is retrieved from an object by calling getattr on the attribute that has the same name as the field (unless an attribute is defined for a field in which case that name is used for attribute access).
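The attribute lookup described above can be sketched as follows. The Field base class, its get_value and to_python methods and the Item class are hypothetical stand-ins written for this example; they are not the actual datastream field classes.

```python
class Field:
    """Hypothetical base field: reads a value from the object it is defined on."""

    def __init__(self, attribute=None):
        # Optional attribute name overriding the field name.
        self.attribute = attribute

    def get_value(self, obj, name):
        # Use the explicit attribute if given, otherwise the field's own name.
        return self.to_python(getattr(obj, self.attribute or name))

    def to_python(self, value):
        return value


class IntegerField(Field):
    def to_python(self, value):
        # Transform the raw attribute value into the field's data type.
        return int(value)


class Item:
    latency = "12"
    ploss = 3.0


item = Item()
print(IntegerField().get_value(item, "latency"))                      # 12
print(IntegerField(attribute="ploss").get_value(item, "packet_loss"))  # 3
```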

Each field can create one or more metrics in the Datastream API (multiple metrics are needed, for example, for the RateField).

When fields create their metrics, they populate their tags with relevant data:

  • tag type mapping the field type to a string (for example, integer, rate, enum)
  • tag name (in the example above, one of latency, packet_loss, traffic)
  • tags returned by calling the get_metric_tags() method on the datastream object
    • it should return a dictionary containing a set of tags to give metrics generated for this object
    • the monitoring registry item by default returns:
      • tag node containing the node's UUID
      • tag registry_id
      • tag object_id
    • if more appropriate unique values are available, monitoring registry items use those instead of registry_id and object_id (implemented by simply overriding default get_metric_tags method)
  • TODO: Add more metadata, so that a stand-alone metric browser can support independent browsing of metrics. For example, for the enum type, the possible values.

Fields can also add their own tags, for example:

  • RateField adds derived_from tag pointing to another metric_id

For a field to access its metric(s), it calls the get_metric_query_tags() method on the datastream object, which should return tags uniquely identifying a metric. These are a subset of the tags returned by get_metric_tags(). For example, the monitoring registry items by default return the subset of the tags returned by get_metric_tags() that uniquely identifies a metric.

The field can augment these query tags to access a particular metric, for example, in the case of RateField, to choose between the original and the derived metric.
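The relationship between the two tag methods and the tags a field adds can be sketched like this. Everything here is illustrative: the RegistryItem class, the metric_tags and metric_query_tags helpers, the representation of tags as a plain dictionary, and the assumption that node, registry_id and object_id together are enough to identify the object.

```python
class RegistryItem:
    """Hypothetical datastream object standing in for a monitoring registry item."""

    def __init__(self, node_uuid, registry_id, object_id):
        self.node_uuid = node_uuid
        self.registry_id = registry_id
        self.object_id = object_id

    def get_metric_tags(self):
        return {
            "node": self.node_uuid,
            "registry_id": self.registry_id,
            "object_id": self.object_id,
        }

    def get_metric_query_tags(self):
        # Assumption: all three default tags are needed to identify the
        # object; a subclass could return a smaller subset.
        return self.get_metric_tags()


def metric_tags(obj, field_type, name):
    """Tags a field gives to a metric it creates."""
    tags = {"type": field_type, "name": name}
    tags.update(obj.get_metric_tags())
    return tags


def metric_query_tags(obj, name, **extra):
    """Tags a field uses to look its metric up again."""
    tags = dict(obj.get_metric_query_tags())
    tags["name"] = name
    # Field-specific additions, e.g. to tell a derived metric from the original.
    tags.update(extra)
    return tags
```

In this sketch, every query tag produced without extra additions also appears among the tags the metric was created with, which is what lets ensure_metric find the existing metric instead of creating a duplicate.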