使用 MongoDB 聚合框架计算一阶导数

问题描述

是否可以使用聚合框架计算一阶导数?

Is it possible to calculate a first order derivative using the aggregate framework?

例如,我有数据:

{time_series : [10,20,40,70,110]}

我正在尝试获得如下输出:

I'm trying to obtain an output like:

{derivative : [10,20,30,40]}


解决方案

db.collection.aggregate(
    [
      {
        "$addFields": {
          "indexes": {
            "$range": [
              0,
              {
                "$size": "$time_series"
              }
            ]
          },
          "reversedSeries": {
            "$reverseArray": "$time_series"
          }
        }
      },
      {
        "$project": {
          "derivatives": {
            "$reverseArray": {
              "$slice": [
                {
                  "$map": {
                    "input": {
                      "$zip": {
                        "inputs": [
                          "$reversedSeries",
                          "$indexes"
                        ]
                      }
                    },
                    "in": {
                      "$subtract": [
                        {
                          "$arrayElemAt": [
                            "$$this",
                            0
                          ]
                        },
                        {
                          "$arrayElemAt": [
                            "$reversedSeries",
                            {
                              "$add": [
                                {
                                  "$arrayElemAt": [
                                    "$$this",
                                    1
                                  ]
                                },
                                1
                              ]
                            }
                          ]
                        }
                      ]
                    }
                  }
                },
                {
                  "$subtract": [
                    {
                      "$size": "$time_series"
                    },
                    1
                  ]
                }
              ]
            }
          },
          "time_series": 1
        }
      }
    ]
)

我们可以在 3.4+ 版本中使用上述管道来执行此操作.在管道中,我们使用 $addFields 流水线阶段.运算符添加time_series"的元素索引的数组来做文档,我们还反转了时间序列数组并将其添加到文档中分别使用 $range$reverseArray 运算符

We can use the pipeline above in version 3.4+ to do this. In the pipeline, we use the $addFields pipeline stage. operator to add the array of the "time_series"'s elements index to do document, we also reversed the time series array and add it to the document using respectively the $range and $reverseArray operators

我们在这里反转了数组,因为数组中 p 位置的元素总是大于 p+1 位置的元素,这意味着 [p] - [p+1] <0 并且我们不想使用 $multiply 这里.(请参阅 3.2 版的管道)

We reversed the array here because the element at position p in the array is always greater than the element at position p+1 which means that [p] - [p+1] < 0 and we do not want to use the $multiply here.(see pipeline for version 3.2)

接下来我们用索引数组$zipped 时间序列数据并应用 $map 运算符将 rel="nofollow noreferrer">substract 表达式添加到结果数组.

Next we $zipped the time series data with the indexes array and applied a substract expression to the resulted array using the $map operator.

我们然后$slice将结果从数组中丢弃null/None值并重新反转结果.

We then $slice the result to discard the null/None value from the array and re-reversed the result.

在 3.2 中我们可以使用 $unwind 运算符来展开我们的数组,并通过将文档指定为操作数而不是以 $ 为前缀的传统路径"来包含数组中每个元素的索引.

In 3.2 we can use the $unwind operator to unwind our array and include the index of each element in the array by specifying a document as operand instead of the traditional "path" prefixed by $.

接下来,我们需要 $group 我们的文档并使用 $push 累加器运算符返回一个子文档数组,如下所示:

Next in the pipeline, we need to $group our documents and use the $push accumulator operator to return an array of sub-documents that look like this:

{
    "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
    "time_series" : [
        { "value" : 10, "index" : NumberLong(0) },
        { "value" : 20, "index" : NumberLong(1) },
        { "value" : 40, "index" : NumberLong(2) },
        { "value" : 70, "index" : NumberLong(3) },
        { "value" : 110, "index" : NumberLong(4) }
    ]
}

<小时>

终于来了 $project 舞台.在这个阶段,我们需要使用 $map 运算符将一系列表达式应用于 $group 阶段中新计算的数组中的每个元素.


Finally comes the $project stage. In this stage, we need to use the $map operator to apply a series of expression to each element in the the newly computed array in the $group stage.

这是 $map 内部发生的事情(将 $map 视为 for 循环)in 表达式:

Here is what is going on inside the $map (see $map as a for loop) in expression:

对于每个子文档,我们使用 value 字段分配给一个变量="nofollow noreferrer">$let 变量运算符.然后我们从数组中下一个元素的value"字段的值中减去它的值.

For each subdocument, we assign the value field to a variable using the $let variable operator. We then subtract it value from the value of the "value" field of the next element in the array.

由于数组中的下一个元素是当前索引处的元素加一,我们所需要的只是 $arrayElemAt 运算符和一个简单的 $addition 当前元素的索引和 1.

Since the next element in the array is the element at the current index plus one, all we need is the help of the $arrayElemAt operator and a simple $addition of the current element's index and 1.

$subtract 表达式返回一个负值,因此我们需要使用 -1"nofollow noreferrer">$multiply 运算符.

The $subtract expression return a negative value so we need to multiply the value by -1 using the $multiply operator.

我们还需要$filter 结果数组,因为它的最后一个元素是 Nonenull.原因是当当前元素是最后一个元素时,$subtract 返回None,因为下一个元素的索引等于数组的大小.

We also need to $filter the resulted array because it the last element is None or null. The reason is that when the current element is the last element, $subtract return None because the index of the next element equal the size of the array.

db.collection.aggregate([
  {
    "$unwind": {
      "path": "$time_series",
      "includeArrayIndex": "index"
    }
  },
  {
    "$group": {
      "_id": "$_id",
      "time_series": {
        "$push": {
          "value": "$time_series",
          "index": "$index"
        }
      }
    }
  },
  {
    "$project": {
      "time_series": {
        "$filter": {
          "input": {
            "$map": {
              "input": "$time_series",
              "as": "el",
              "in": {
                "$multiply": [
                  {
                    "$subtract": [
                      "$$el.value",
                      {
                        "$let": {
                          "vars": {
                            "nextElement": {
                              "$arrayElemAt": [
                                "$time_series",
                                {
                                  "$add": [
                                    "$$el.index",
                                    1
                                  ]
                                }
                              ]
                            }
                          },
                          "in": "$$nextElement.value"
                        }
                      }
                    ]
                  },
                  -1
                ]
              }
            }
          },
          "as": "item",
          "cond": {
            "$gte": [
              "$$item",
              0
            ]
          }
        }
      }
    }
  }
])

<小时>

我认为效率较低的另一个选项是使用 map_reduce 方法.

>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
...               function() {
...                 var derivatives = [];
...                 for (var index=1; index<this.time_series.length; index++) {
...                   derivatives.push(this.time_series[index] - this.time_series[index-1]);
...                 }
...                 emit(this._id, derivatives);
...               }
...               """)
>>> reducer = Code("""
...                function(key, value) {}
...                """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
...     print(res)  # or do something with the document.
... 
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}

<小时>

您还可以检索所有文档并使用 numpy.diff 像这样返回导数:

import numpy as np


for document in collection.find({}, {'time_series': 1}):
    result = np.diff(document['time_series']) 

相关文章