新疆工程建设云网站百度天津seo顾问
一、引言
MongoDB 的聚合管道(Aggregation Pipeline)是一种强大的数据处理工具,它允许你对文档进行一系列的操作,如过滤、转换、分组和聚合等。聚合管道由多个管道组成,每个管道对输入的文档进行特定的处理,并将处理后的结果传递给下一个管道。
二、基础管道
1. 过滤管道 $match
功能
$match
管道用于过滤文档,它的作用类似于 find()
方法的条件查询。只有满足指定条件的文档才会进入下一个管道。
示例代码
假设我们有一个名为 students
的集合,包含以下文档:
[{ "name": "Alice", "age": 20, "status": "A" },{ "name": "Bob", "age": 17, "status": "B" },{ "name": "Charlie", "age": 22, "status": "A" }
]
以下是使用 $match
管道过滤出年龄大于等于 18 且状态为 "A" 的学生的聚合管道代码:
from pymongo import MongoClient# 连接到 MongoDB 数据库
uri = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $match 管道,过滤出 age 大于等于 18 且 status 为 "A" 的文档'$match': {'age': {'$gte': 18},'status': "A"}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)# 关闭连接
client.close()
’$gte‘是比较操作符(>=),类似的比较操作符如下:
名称 | 说明 |
---|---|
$eq | 匹配等于指定值的值。 |
$gt | 匹配大于指定值的值。 |
$gte | 匹配大于等于指定值的值。 |
$in | 匹配数组中指定的任何值。 |
$lt | 匹配小于指定值的值。 |
$lte | 匹配小于等于指定值的值。 |
$ne | 匹配所有不等于指定值的值。 |
$nin | 不匹配数组中指定的任何值。 |
结果
执行上述代码后,预期的结果是返回年龄大于等于 18 且状态为 "A" 的学生文档,即:
[{ "name": "Alice", "age": 20, "status": "A" },{ "name": "Charlie", "age": 22, "status": "A" }
]
2. 分组管道 $group
功能
$group
管道用于按指定字段对文档进行分组,并可以对每个组进行聚合操作,如求和、计数、求平均值等。
示例代码
继续使用 students
集合,以下是按 status
字段分组并统计每组学生数量的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $group 管道,按 status 字段分组'$group': {# _id 是分组的依据字段'_id': "$status",# 统计每组的学生数量'studentCount': {'$sum': 1}}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是按 status
字段分组并统计每组的学生数量,结果如下:
[{ "_id": "A", "studentCount": 2 },{ "_id": "B", "studentCount": 1 }
]
3.投影管道 $project
功能
$project
管道用于选择、重命名或删除字段,还可以计算新的字段。
示例代码
以下是使用 $project
管道选择 name
和 age
字段,并计算 ageInDays
字段(假设一年按 365 天计算)的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $project 管道,选择和计算字段'$project': {# 保留 name 字段'name': 1,# 保留 age 字段'age': 1,# 计算 ageInDays 字段'ageInDays': {'$multiply': ["$age", 365]}}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是返回包含 name
、age
和 ageInDays
字段的文档,结果如下:
[{ "name": "Alice", "age": 20, "ageInDays": 7300 },{ "name": "Bob", "age": 17, "ageInDays": 6205 },{ "name": "Charlie", "age": 22, "ageInDays": 8030 }
]
4. 排序管道 $sort
功能
$sort
管道用于按指定字段对文档进行排序,排序规则为 1 表示升序, -1 表示降序。
示例代码
以下是按 age
字段降序排序的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $sort 管道,按 age 字段降序排序'$sort': {'age': -1}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是按 age
字段降序排列的文档,结果如下:
[{ "name": "Charlie", "age": 22, "status": "A" },{ "name": "Alice", "age": 20, "status": "A" },{ "name": "Bob", "age": 17, "status": "B" }
]
5. 分页管道 $limit
和 $skip
功能
$limit
管道用于限制返回的文档数量,$skip
管道用于跳过指定数量的文档,通常一起使用来实现分页功能。
示例代码
假设我们要获取第二页的数据,每页显示 1 条记录,以下是对应的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $skip 管道,跳过 1 条记录'$skip': 1},{# $limit 管道,限制返回 1 条记录'$limit': 1}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是跳过第一条记录,返回第二条记录,结果如下:
[{ "name": "Bob", "age": 17, "status": "B" }
]
三、高级管道
1.展开管道 $unwind
功能
$unwind
管道用于展开数组字段,将包含数组字段的文档拆分成多个文档,每个文档包含数组中的一个元素。
示例代码
假设我们有一个名为 books
的集合,包含以下文档:
[{ "title": "Book 1", "authors": ["Author 1", "Author 2"] },{ "title": "Book 2", "authors": ["Author 3"] }
]
以下是使用 $unwind
管道展开 authors
数组的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['books']# 定义聚合管道
pipeline = [{# $unwind 管道,展开 authors 数组'$unwind': "$authors"}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是将包含数组字段的文档拆分成多个文档,每个文档包含数组中的一个元素,结果如下:
[{ "title": "Book 1", "authors": "Author 1" },{ "title": "Book 1", "authors": "Author 2" },{ "title": "Book 2", "authors": "Author 3" }
]
2.连接管道 $lookup
功能
$lookup
管道用于在不同的集合之间进行左外连接,类似于 SQL 中的 JOIN 操作。
示例代码
假设我们有两个集合:orders
和 customers
,orders
集合包含以下文档:
[{ "orderId": 1, "customerId": 1, "amount": 100 },{ "orderId": 2, "customerId": 2, "amount": 200 }
]
customers
集合包含以下文档:
[{ "customerId": 1, "name": "Customer 1" },{ "customerId": 2, "name": "Customer 2" }
]
以下是使用 $lookup
管道将 orders
集合和 customers
集合进行连接的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
orders_collection = database['orders']# 定义聚合管道
pipeline = [{# $lookup 管道,连接 orders 集合和 customers 集合'$lookup': {# 要连接的集合名称'from': "customers",# 当前集合中用于连接的字段'localField': "customerId",# 要连接的集合中用于连接的字段'foreignField': "customerId",# 连接结果存储的字段名'as': "customerInfo"}}
]# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是将 orders
集合和 customers
集合进行左外连接,结果如下:
[{"orderId": 1,"customerId": 1,"amount": 100,"customerInfo": [{ "customerId": 1, "name": "Customer 1" }]},{"orderId": 2,"customerId": 2,"amount": 200,"customerInfo": [{ "customerId": 2, "name": "Customer 2" }]}
]
3.新增管道 $addFields
功能
$addFields
管道用于向文档中添加新的字段,而不影响原有的字段。
示例代码
继续使用 students
集合,以下是使用 $addFields
管道添加 isAdult
字段的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $addFields 管道,添加 isAdult 字段'$addFields': {'isAdult': {'$gte': ["$age", 18]}}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是向文档中添加 isAdult
字段,该字段表示学生是否成年,结果如下:
[{ "name": "Alice", "age": 20, "status": "A", "isAdult": true },{ "name": "Bob", "age": 17, "status": "B", "isAdult": false },{ "name": "Charlie", "age": 22, "status": "A", "isAdult": true }
]
4.设置管道 $set
功能
$set
管道用于更新或添加字段,如果字段已经存在,则更新其值;如果字段不存在,则添加该字段。
示例代码
以下是使用 $set
管道更新 status
字段为 "completed" 的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $set 管道,更新 status 字段'$set': {'status': "completed"}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是将所有文档的 status
字段更新为 "completed",结果如下:
[{ "name": "Alice", "age": 20, "status": "completed" },{ "name": "Bob", "age": 17, "status": "completed" },{ "name": "Charlie", "age": 22, "status": "completed" }
]
5. 地图管道 $geoNear
功能
$geoNear
管道用于基于地理位置进行查询,返回距离指定坐标点最近的文档。
示例代码
假设我们有一个名为 places
的集合,包含以下文档:
[{"name": "Place 1","location": {"type": "Point","coordinates": [116.4074, 39.9042]}},{"name": "Place 2","location": {"type": "Point","coordinates": [116.4174, 39.9142]}}
]
以下是使用 $geoNear
管道查询距离坐标点 [116.4074, 39.9042]
最近的地点的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['places']# 创建 2dsphere 索引,用于地理空间查询
collection.create_index([("location", "2dsphere")])# 定义聚合管道
pipeline = [{# $geoNear 管道,查询距离指定坐标点最近的地点'$geoNear': {# 指定查询的中心点坐标'near': {'type': "Point", 'coordinates': [116.4074, 39.9042]},# 存储计算出的距离的字段名'distanceField': "distance",# 最大查询距离(单位:米)'maxDistance': 10000,# 是否返回包含中心点的文档'spherical': True}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是返回距离指定坐标点最近的地点文档,并在文档中添加 distance
字段表示距离中心点的距离。因为 Place 1
的坐标与查询中心点坐标相同,所以它会排在第一位,Place 2
排在第二位,结果大致如下(distance
值会根据实际计算得出):
[{"name": "Place 1","location": {"type": "Point","coordinates": [116.4074, 39.9042]},"distance": 0},{"name": "Place 2","location": {"type": "Point","coordinates": [116.4174, 39.9142]},"distance": 1234 // 实际距离值}
]
四、其他常用管道
1. 分组连接管道 $graphLookup
功能
$graphLookup
管道用于处理图结构数据,例如树状关系。它可以从一个集合中递归地查找相关文档。
示例代码
假设我们有一个名为 employees
的集合,存储员工及其上级关系,文档结构如下:
[{ "_id": 1, "name": "Alice", "managerId": null },{ "_id": 2, "name": "Bob", "managerId": 1 },{ "_id": 3, "name": "Charlie", "managerId": 2 }
]
以下是使用 $graphLookup
管道查找每个员工的所有上级的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['employees']# 定义聚合管道
pipeline = [{# $graphLookup 管道,查找每个员工的所有上级'$graphLookup': {# 要查找的集合'from': "employees",# 起始字段,即当前文档中用于开始查找的字段'startWith': "$managerId",# 连接字段,用于递归查找'connectFromField': "managerId",# 被连接字段,与 connectFromField 对应'connectToField': "_id",# 存储查找结果的字段名'as': "managers"}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是为每个员工文档添加一个 managers
数组字段,其中包含该员工的所有上级信息。例如:
[{"_id": 1,"name": "Alice","managerId": null,"managers": []},{"_id": 2,"name": "Bob","managerId": 1,"managers": [{ "_id": 1, "name": "Alice", "managerId": null }]},{"_id": 3,"name": "Charlie","managerId": 2,"managers": [{ "_id": 2, "name": "Bob", "managerId": 1 },{ "_id": 1, "name": "Alice", "managerId": null }]}
]
2.面管道 $facet
功能
$facet
管道允许在单个聚合管道中执行多个不同的聚合操作,并将结果分组返回。这对于在一个查询中进行多维度分析非常有用。
示例代码
继续使用 students
集合,以下是使用 $facet
管道同时进行分页和统计学生总数的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $facet 管道,同时进行分页和统计'$facet': {# 分页操作'paginatedResults': [{'$skip': 0},{'$limit': 2}],# 统计学生总数'totalCount': [{'$count': "total"}]}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是返回一个包含两个字段的文档,paginatedResults
字段包含分页后的学生文档,totalCount
字段包含学生总数。例如:
[{"paginatedResults": [{ "name": "Alice", "age": 20, "status": "A" },{ "name": "Bob", "age": 17, "status": "B" }],"totalCount": [{ "total": 3 }]}
]
3. 分阶管道 $bucket
和 $bucketAuto
功能
$bucket
管道用于将文档分到不同的区间(桶)中,需要手动指定区间边界;$bucketAuto
管道则会自动将文档分到指定数量的区间中。
示例代码($bucket
)
以下是使用 $bucket
管道按 age
字段将学生文档分到不同区间的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $bucket 管道,按 age 字段分组'$bucket': {# 分组依据字段'groupBy': "$age",# 区间边界'boundaries': [15, 20, 25],# 缺省桶,用于存储不在指定区间内的文档'default': "Other",# 每个桶中要统计的信息'output': {'count': {'$sum': 1}}}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是将学生文档按 age
字段分到不同的区间中,并统计每个区间的文档数量。例如:
[{ "_id": 15, "count": 1 },{ "_id": 20, "count": 1 },{ "_id": "Other", "count": 1 }
]
示例代码($bucketAuto
)
以下是使用 $bucketAuto
管道按 age
字段自动将学生文档分到 2 个区间的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $bucketAuto 管道,自动分组'$bucketAuto': {# 分组依据字段'groupBy': "$age",# 区间数量'buckets': 2,# 每个桶中要统计的信息'output': {'count': {'$sum': 1}}}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
结果
执行上述代码后,预期的结果是将学生文档按 age
字段自动分到 2 个区间中,并统计每个区间的文档数量。例如:
[{"_id": {"min": 17,"max": 20},"count": 2},{"_id": {"min": 20,"max": 22},"count": 1}
]
4.输出管道 $out
功能
$out
管道用于将聚合结果输出到一个新的集合中。
示例代码
以下是使用 $out
管道将过滤后的学生文档输出到一个新集合 filtered_students
中的聚合管道代码:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $match 管道,过滤出 age 大于等于 18 的学生'$match': {'age': {'$gte': 18}}},{# $out 管道,将结果输出到新集合'$out': "filtered_students"}
]# 执行聚合操作
list(collection.aggregate(pipeline))
print("聚合结果已输出到 filtered_students 集合")client.close()
结果
执行上述代码后,会在数据库中创建一个名为 filtered_students
的新集合,其中包含年龄大于等于 18 的学生文档。可以通过查询该集合来验证结果:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['filtered_students']# 查询新集合中的文档
result = list(collection.find())
print(result)client.close()
预期结果与 $match
管道过滤后的结果相同:
[{ "name": "Alice", "age": 20, "status": "A" },{ "name": "Charlie", "age": 22, "status": "A" }
]
五、管道使用建议
1. 管道的组合使用
通过多个管道的组合可以实现复杂的逻辑。例如,先使用 $match
管道过滤数据,减少后续管道处理的数据量,再使用 $group
管道进行分组统计。以下是一个组合示例:
from pymongo import MongoClienturi = 'mongodb://localhost:27017'
client = MongoClient(uri)
database = client['testdb']
collection = database['students']# 定义聚合管道
pipeline = [{# $match 管道,过滤出 age 大于等于 18 的学生'$match': {'age': {'$gte': 18}}},{# $group 管道,按 status 字段分组并统计每组学生数量'$group': {'_id': "$status",'studentCount': {'$sum': 1}}}
]# 执行聚合操作
result = list(collection.aggregate(pipeline))
print(result)client.close()
2. 性能优化
- 优先使用
$match
或$limit
管道:在聚合管道的开头使用$match
管道过滤数据,减少后续管道处理的数据量;使用$limit
管道限制返回的文档数量,避免处理过多不必要的数据。 - 避免在
$group
管道中使用复杂表达式:复杂的表达式会增加计算量,影响性能。尽量在$group
管道之前进行数据处理,将复杂的计算放到$project
管道。
3. 参考官方文档
MongoDB 的官方文档提供了完整的聚合管道列表和详细的示例,建议参考 MongoDB Aggregation Pipeline 获取更多信息。
通过本教程,你应该对 MongoDB 聚合管道的常用管道有了更深入的了解,可以根据具体需求选择合适的管道组合来完成数据处理任务。