We need two tasks to be done:
Short version:
1) Create a schema migration script
2) Aggregate data using the db’s aggregation pipeline
Long version:
1) Create a schema migration script
The **current data schema** of the collection containing time series data is:
Collection: data_raw_old
{
"_id" : ObjectId("558a7b789d712bd5f43e515d"),
"dt" : {
"week_of_year" : 26,
"day_of_week" : 2,
"hour" : 0,
"year" : 2015,
"timestamp" : ISODate("2015-06-24T00:05:00.000Z"),
"day_of_year" : 175,
"day" : 24,
"minute" : 5,
"month" : 6
},
"owner" : {
"version" : "1.1",
"type" : "plugin_object",
"name" : "FunctionSourcePlugin",
"id" : ObjectId("558a79d0d3da461d003c6ebb")
},
"dtype" : "real",
"modified" : ISODate("2015-06-24T11:42:16.440Z"),
"values" : {
"vel" : 5.6840797248015100
"temp" : 12.1
},
"created" : ISODate("2015-06-24T11:42:16.440Z")
}
The values sub-document contains a varying number of key-value pairs, one per measurement. In the new schema, we want to have one document per value. The document given above would yield two documents:
Collection: data_raw_new
{
"_id" : "20150624000500000_FunctionSourceObject. [login to view URL]",
"dtype" : "real",
"modified" : ISODate("2015-06-24T11:42:16.440Z"),
"value" : 5.6840797248015100,
"sid" : "FunctionSourceObject. [login to view URL]",
"owner" : "FunctionSourceObject. 558a79d0d3da461d003c6ebb",
"dt" : ISODate("2015-06-24T00:05:00.000Z"),
"created" : ISODate("2015-06-24T11:42:16.440Z")
}, {
"_id" : "20150624000500000_FunctionSourceObject. [login to view URL]",
"dtype" : "real",
"modified" : ISODate("2015-06-24T11:42:16.440Z"),
"value" : 12.1,
"sid" : "FunctionSourceObject. [login to view URL]",
"owner" : "FunctionSourceObject. 558a79d0d3da461d003c6ebb",
"dt" : ISODate("2015-06-24T00:05:00.000Z"),
"created" : ISODate("2015-06-24T11:42:16.440Z")
}
Please note:
- The new _id is a combination of the dt timestamp and the sid value (sid = series id)
- To construct the sid value, use [login to view URL], [login to view URL] and the respective value key. In the initial schema, all [login to view URL] values end with “Plugin”; this must be replaced with “Object”
- A lot of fields and documents are removed
The migration script can be executed directly in MongoDB, in Python (using pymongo), or in Node.js.
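As a starting point, the per-document transformation could be written as a pure function like the sketch below. Note that the exact composition of the sid value is obscured in this brief, so the sketch assumes sid = owner name (with the trailing “Plugin” replaced by “Object”) plus the value key; the `split_document` helper name and that layout are my assumptions, not part of the spec.

```python
from datetime import datetime

def split_document(old_doc):
    """Split one old-schema document into one new-schema document per
    key in its "values" sub-document (hypothetical helper).

    ASSUMPTION: sid = <owner name, "Plugin" -> "Object"> + "." + <value key>.
    """
    ts = old_doc["dt"]["timestamp"]
    name = old_doc["owner"]["name"]
    if name.endswith("Plugin"):                   # e.g. "FunctionSourcePlugin"
        name = name[:-len("Plugin")] + "Object"   # -> "FunctionSourceObject"
    owner = "%s. %s" % (name, old_doc["owner"]["id"])
    # _id prefix: timestamp as YYYYMMDDHHMMSS plus three millisecond digits,
    # matching "20150624000500000" in the example.
    stamp = ts.strftime("%Y%m%d%H%M%S") + "%03d" % (ts.microsecond // 1000)
    new_docs = []
    for key, value in old_doc["values"].items():
        sid = "%s.%s" % (name, key)  # assumed series-id layout
        new_docs.append({
            "_id": "%s_%s" % (stamp, sid),
            "dtype": old_doc["dtype"],
            "modified": old_doc["modified"],
            "value": value,
            "sid": sid,
            "owner": owner,
            "dt": ts,
            "created": old_doc["created"],
        })
    return new_docs

# Usage with pymongo (connection details assumed):
#   for doc in db.data_raw_old.find():
#       db.data_raw_new.insert_many(split_document(doc))
```

Keeping the transformation pure makes it easy to unit-test against the example documents before running it over the whole collection.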
2) Aggregate data using the aggregation pipeline
The documents in the collection data_raw_new shall be aggregated into 1min, 5min, 15min, 1h, 24h and 7d buckets (collection names data_1min_new, data_5min_new, etc.).
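The bucket boundary for a given timestamp can be computed by flooring the epoch offset to a multiple of the bucket width. A minimal sketch (only data_1min_new and data_5min_new are named above; the remaining collection names follow the same pattern and are my assumption):

```python
from datetime import datetime, timedelta

# Bucket widths in seconds for the six target collections.
BUCKETS = {
    "data_1min_new": 60,
    "data_5min_new": 300,
    "data_15min_new": 900,
    "data_1h_new": 3600,
    "data_24h_new": 86400,
    "data_7d_new": 604800,  # note: epoch-aligned weeks start on a Thursday
}

def bucket_start(ts, width_seconds):
    """Floor a timestamp to its bucket boundary by truncating the
    offset from the Unix epoch to a multiple of the bucket width."""
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    seconds = int((ts - epoch).total_seconds())
    return epoch + timedelta(seconds=seconds - seconds % width_seconds)
```

For example, 2015-06-24T00:07:30Z falls into the 5-minute bucket starting at 2015-06-24T00:05:00Z.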
An aggregated document in the data_1min_new collection might look as follows:
{
"_id" : "20150624000500000_FunctionSourceObject. [login to view URL]",
"dtype" : "real",
"modified" : ISODate("2015-06-24T11:42:16.440Z"),
"avg" : 5.1,
"sum" : 8,
"count" : 2,
"min" : 2.9,
"max" : 5.1,
"sid" : "FunctionSourceObject. [login to view URL]",
"owner" : "FunctionSourceObject. 558a79d0d3da461d003c6ebb",
"dt" : ISODate("2015-06-24T00:05:00.000Z"),
"created" : ISODate("2015-06-24T11:42:16.440Z")
}
The aggregation script will be executed regularly (e.g., every other minute). It should query for documents in data_raw_new whose modified date is newer than the time the aggregation command was last executed, compute the aggregations, and add them to the corresponding aggregation collection.
It would be best if the aggregation could be run directly in MongoDB; I do not know, however, whether this is possible. If not, the command will be triggered from a Python script.