Get started with Firestore Pipeline operations

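Background

Pipeline queries are a new query interface for Firestore. They provide advanced query functionality, including complex expressions. They also add support for many new functions, such as min, max, substring, regex_match, and array_contains_all. With pipeline queries, creating indexes is entirely optional, which simplifies developing new queries. Pipeline queries also remove many query-shape limitations, letting you specify large IN and OR queries.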

Get started

To install and initialize the client SDKs, see the instructions in the Get started guide.

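Syntax

The following sections give a brief overview of the syntax for pipeline queries.

Concepts

One notable difference with pipeline queries is the introduction of explicit "stage" ordering, which lets you express more complex queries. This is a significant departure from the existing query interface, where the order of stages is implicit. Consider the following pipeline query example: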

Web

const pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort(field("name").ascending())
  // Step 4: Return the top 10. Note that applying the limit earlier in the
  // pipeline would produce unintended results.
  .limit(10);
Swift
let pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(Field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort([Field("name").ascending()])
  // Step 4: Return the top 10. Note that applying the limit earlier in the pipeline would
  // produce unintended results.
  .limit(10)

Kotlin

val pipeline = db.pipeline()
    // Step 1: Start a query with collection scope
    .collection("cities")
    // Step 2: Filter the collection
    .where(field("population").greaterThan(100000))
    // Step 3: Sort the remaining documents
    .sort(field("name").ascending())
    // Step 4: Return the top 10. Note that applying the limit earlier in the pipeline would
    // produce unintended results.
    .limit(10)

Java

Pipeline pipeline = db.pipeline()
    // Step 1: Start a query with collection scope
    .collection("cities")
    // Step 2: Filter the collection
    .where(field("population").greaterThan(100000))
    // Step 3: Sort the remaining documents
    .sort(field("name").ascending())
    // Step 4: Return the top 10. Note that applying the limit earlier in the pipeline would
    // produce unintended results.
    .limit(10);
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("population").greater_than(100_000))
    .sort(Field.of("name").ascending())
    .limit(10)
)
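To make the effect of stage order concrete, here is a minimal sketch that models stages as plain Python functions applied to an in-memory list. The `where`, `sort_by`, `limit`, and `run_stages` helpers and the sample data are hypothetical stand-ins, not the Firestore SDK; they only illustrate why placing `limit` before `sort` changes the result.

```python
from typing import Any, Callable, Dict, List

# Hypothetical in-memory model of a pipeline; not the Firestore SDK.
Doc = Dict[str, Any]
Stage = Callable[[List[Doc]], List[Doc]]


def run_stages(docs: List[Doc], stages: List[Stage]) -> List[Doc]:
    """Feed the output of each stage into the next one, in order."""
    for stage in stages:
        docs = stage(docs)
    return docs


def where(predicate: Callable[[Doc], bool]) -> Stage:
    return lambda docs: [d for d in docs if predicate(d) is True]


def sort_by(key: str) -> Stage:
    return lambda docs: sorted(docs, key=lambda d: d[key])


def limit(n: int) -> Stage:
    return lambda docs: docs[:n]


cities = [
    {"name": "Toronto", "population": 2_800_000},
    {"name": "Ajax", "population": 120_000},
    {"name": "Smalltown", "population": 5_000},
    {"name": "Boston", "population": 650_000},
]

# Filter, sort, then take the top 2: the intended order.
intended = run_stages(
    cities,
    [where(lambda d: d["population"] > 100_000), sort_by("name"), limit(2)],
)

# Limit first: only the first 2 input documents are ever considered.
too_early = run_stages(
    cities,
    [limit(2), where(lambda d: d["population"] > 100_000), sort_by("name")],
)

print([d["name"] for d in intended])   # ['Ajax', 'Boston']
print([d["name"] for d in too_early])  # ['Ajax', 'Toronto']
```

The sketch relies on list order for determinism; since Firestore doesn't guarantee input order, a limit applied too early could select different documents from run to run.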

Initialization

The syntax of pipeline queries is very similar to that of existing Cloud Firestore queries. To start, initialize a query by writing the following:

Web

const { getFirestore } = require("firebase/firestore");
const { execute } = require("firebase/firestore/pipelines");
const database = getFirestore(app, "enterprise");
const pipeline = database.pipeline();
Swift
let firestore = Firestore.firestore(database: "enterprise")
let pipeline = firestore.pipeline()

Kotlin

val firestore = Firebase.firestore("enterprise")
val pipeline = firestore.pipeline()

Java

FirebaseFirestore firestore = FirebaseFirestore.getInstance("enterprise");
PipelineSource pipeline = firestore.pipeline();
Python
firestore_client = firestore.client(default_app, "your-new-enterprise-database")
pipeline = firestore_client.pipeline()

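Structure

There are a few terms that are important to understand when creating pipeline queries: stages, expressions, and functions.

[Figure: example showing the stages and expressions in a query]

Stages: A pipeline can consist of one or more stages. Logically, these represent the series of steps (or stages) taken to execute the query. Note: In practice, stages may be executed out of order to improve performance. However, this does not change the intent or correctness of the query.

Expressions: Stages often accept expressions, which let you express more complex queries. An expression can be simple, consisting of a single function such as eq("a", 1). You can also build more complex expressions by nesting them, such as and(eq("a", 1), eq("b", 2)).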
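A rough way to picture how expressions nest is as a small tree whose leaves compare a field with a value and whose inner nodes combine the results. The `Eq` and `And` classes and the `eq`/`and_` helpers below are hypothetical (Python reserves `and`, hence the trailing underscore); they mimic only the logical meaning of eq(...) and and(...), not the SDK's classes.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple

Doc = Dict[str, Any]


@dataclass(frozen=True)
class Eq:
    """Leaf expression: true when the document's field equals the value."""
    path: str
    value: Any

    def evaluate(self, doc: Doc) -> bool:
        # A missing field never matches.
        return self.path in doc and doc[self.path] == self.value


@dataclass(frozen=True)
class And:
    """Inner node: true only when every nested expression is true."""
    operands: Tuple[Any, ...]

    def evaluate(self, doc: Doc) -> bool:
        return all(op.evaluate(doc) for op in self.operands)


def eq(path: str, value: Any) -> Eq:
    return Eq(path, value)


def and_(*operands: Any) -> And:
    return And(tuple(operands))


simple = eq("a", 1)
nested = and_(eq("a", 1), eq("b", 2))

doc = {"a": 1, "b": 3}
print(simple.evaluate(doc))  # True
print(nested.evaluate(doc))  # False, because b != 2
```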

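Field vs. constant references

Pipeline queries support complex expressions. As a result, it may be necessary to distinguish whether a value represents a field or a constant. Consider the following example: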

Web

const pipeline = db.pipeline()
  .collection("cities")
  .where(field("name").equal(constant("Toronto")));
Swift
let pipeline = db.pipeline()
  .collection("cities")
  .where(Field("name").equal(Constant("Toronto")))

Kotlin

val pipeline = db.pipeline()
    .collection("cities")
    .where(field("name").equal(constant("Toronto")))

Java

Pipeline pipeline = db.pipeline()
    .collection("cities")
    .where(field("name").equal(constant("Toronto")));
Python
from google.cloud.firestore_v1.pipeline_expressions import Field, Constant

pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("name").equal(Constant.of("Toronto")))
)
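The distinction matters because the same string can mean two different things. In the hypothetical sketch below (plain Python, not the SDK), `FieldRef` reads a value out of the document while `ConstantRef` is taken literally. Comparing `name` with "Toronto" as a constant matches, but treating "Toronto" as a field path compares against whatever that field happens to hold.

```python
from dataclasses import dataclass
from typing import Any, Dict, Union

Doc = Dict[str, Any]


@dataclass(frozen=True)
class FieldRef:
    """Resolves to the value stored at `path` in the document (None if absent)."""
    path: str

    def resolve(self, doc: Doc) -> Any:
        return doc.get(self.path)


@dataclass(frozen=True)
class ConstantRef:
    """Resolves to its literal value, whatever the document contains."""
    value: Any

    def resolve(self, doc: Doc) -> Any:
        return self.value


Operand = Union[FieldRef, ConstantRef]


def equal(left: Operand, right: Operand, doc: Doc) -> bool:
    return left.resolve(doc) == right.resolve(doc)


city = {"name": "Toronto", "Toronto": "a field that happens to share the name"}

print(equal(FieldRef("name"), ConstantRef("Toronto"), city))  # True
print(equal(FieldRef("name"), FieldRef("Toronto"), city))     # False
```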

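Stages

Input stages

The input stage is the first stage of a query. It defines the initial set of documents you are querying. For pipeline queries, this is largely similar to existing queries, where most queries start with a collection(...) or collection_group(...) stage. Two new input stages are database() and documents(...): database() returns all documents in the database, while documents(...) works the same as a batch read.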

Web

let results;

// Return all restaurants in San Francisco
results = await execute(db.pipeline().collection("cities/sf/restaurants"));

// Return all restaurants
results = await execute(db.pipeline().collectionGroup("restaurants"));

// Return all documents across all collections in the database (the entire database)
results = await execute(db.pipeline().database());

// Batch read of 3 documents
results = await execute(db.pipeline().documents([
  doc(db, "cities", "SF"),
  doc(db, "cities", "DC"),
  doc(db, "cities", "NY")
]));
Swift
var results: Pipeline.Snapshot

// Return all restaurants in San Francisco
results = try await db.pipeline().collection("cities/sf/restaurants").execute()

// Return all restaurants
results = try await db.pipeline().collectionGroup("restaurants").execute()

// Return all documents across all collections in the database (the entire database)
results = try await db.pipeline().database().execute()

// Batch read of 3 documents
results = try await db.pipeline().documents([
  db.collection("cities").document("SF"),
  db.collection("cities").document("DC"),
  db.collection("cities").document("NY")
]).execute()

Kotlin

var results: Task<Pipeline.Snapshot>

// Return all restaurants in San Francisco
results = db.pipeline().collection("cities/sf/restaurants").execute()

// Return all restaurants
results = db.pipeline().collectionGroup("restaurants").execute()

// Return all documents across all collections in the database (the entire database)
results = db.pipeline().database().execute()

// Batch read of 3 documents
results = db.pipeline().documents(
    db.collection("cities").document("SF"),
    db.collection("cities").document("DC"),
    db.collection("cities").document("NY")
).execute()

Java

Task<Pipeline.Snapshot> results;

// Return all restaurants in San Francisco
results = db.pipeline().collection("cities/sf/restaurants").execute();

// Return all restaurants
results = db.pipeline().collectionGroup("restaurants").execute();

// Return all documents across all collections in the database (the entire database)
results = db.pipeline().database().execute();

// Batch read of 3 documents
results = db.pipeline().documents(
    db.collection("cities").document("SF"),
    db.collection("cities").document("DC"),
    db.collection("cities").document("NY")
).execute();
Python
# Return all restaurants in San Francisco
results = client.pipeline().collection("cities/sf/restaurants").execute()

# Return all restaurants
results = client.pipeline().collection_group("restaurants").execute()

# Return all documents across all collections in the database (the entire database)
results = client.pipeline().database().execute()

# Batch read of 3 documents
results = (
    client.pipeline()
    .documents(
        client.collection("cities").document("SF"),
        client.collection("cities").document("DC"),
        client.collection("cities").document("NY"),
    )
    .execute()
)

As with all other stages, the order of results from these input stages is not stable. If you need a specific ordering, always add a sort(...) stage.
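One way to get a fully deterministic order is to sort on the field you care about and then on the document name as a tie-breaker. This plain-Python sketch (hypothetical data and helper, not the SDK) sorts by `state` and then by `__name__`, the document-name field used elsewhere in this guide, and checks every possible input ordering.

```python
from itertools import permutations
from typing import Any, Dict, List

Doc = Dict[str, Any]

docs = [
    {"__name__": "cities/NY", "state": "NY"},
    {"__name__": "cities/BUF", "state": "NY"},
    {"__name__": "cities/SF", "state": "CA"},
]


def sort_deterministic(items: List[Doc], key: str) -> List[Doc]:
    # Ties on `key` are broken by the unique document name.
    return sorted(items, key=lambda d: (d[key], d["__name__"]))


# Sorting on `state` alone: tied documents keep whatever order they arrived in.
state_only = {
    tuple(d["__name__"] for d in sorted(p, key=lambda d: d["state"]))
    for p in permutations(docs)
}

# With the tie-breaker, every input order yields the same output order.
with_tiebreak = {
    tuple(d["__name__"] for d in sort_deterministic(list(p), "state"))
    for p in permutations(docs)
}

print(len(state_only))  # 2
print(with_tiebreak)    # {('cities/SF', 'cities/BUF', 'cities/NY')}
```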

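Where

The where(...) stage acts as a traditional filter on the documents produced by the previous stage, and largely mirrors the existing "where" syntax for queries. Any document for which the given expression evaluates to a non-true value is filtered out of the returned documents.

Multiple where(...) statements can be chained together and act as an and(...) expression. For example, the following two queries are logically equivalent and can be used interchangeably.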

Web

let results;

results = await execute(db.pipeline().collection("books")
  .where(field("rating").equal(5))
  .where(field("published").lessThan(1900))
);

results = await execute(db.pipeline().collection("books")
  .where(and(field("rating").equal(5), field("published").lessThan(1900)))
);
Swift
var results: Pipeline.Snapshot

results = try await db.pipeline().collection("books")
  .where(Field("rating").equal(5))
  .where(Field("published").lessThan(1900))
  .execute()

results = try await db.pipeline().collection("books")
  .where(Field("rating").equal(5) && Field("published").lessThan(1900))
  .execute()

Kotlin

var results: Task<Pipeline.Snapshot>

results = db.pipeline().collection("books")
    .where(field("rating").equal(5))
    .where(field("published").lessThan(1900))
    .execute()

results = db.pipeline().collection("books")
    .where(Expression.and(field("rating").equal(5),
      field("published").lessThan(1900)))
    .execute()

Java

Task<Pipeline.Snapshot> results;

results = db.pipeline().collection("books")
    .where(field("rating").equal(5))
    .where(field("published").lessThan(1900))
    .execute();

results = db.pipeline().collection("books")
    .where(Expression.and(
        field("rating").equal(5),
        field("published").lessThan(1900)
    ))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import And, Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("rating").equal(5))
    .where(Field.of("published").less_than(1900))
    .execute()
)

results = (
    client.pipeline()
    .collection("books")
    .where(And(Field.of("rating").equal(5), Field.of("published").less_than(1900)))
    .execute()
)
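The filtering rule and the equivalence of chained where(...) stages can be checked with a small in-memory sketch. The `where` helper and the sample books below are hypothetical; the sketch keeps a document only when the predicate returns exactly `True` and, as a simplifying assumption, treats a missing field (a raised `KeyError`) as a non-true result.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]


def where(docs: List[Doc], predicate: Callable[[Doc], Any]) -> List[Doc]:
    kept = []
    for doc in docs:
        try:
            result = predicate(doc)
        except (KeyError, TypeError):
            result = None  # an evaluation error counts as non-true
        if result is True:
            kept.append(doc)
    return kept


books = [
    {"title": "A", "rating": 5, "published": 1880},
    {"title": "B", "rating": 5, "published": 1950},
    {"title": "C", "rating": 4, "published": 1850},
    {"title": "D", "rating": 5},  # no "published" field
]

# Two chained filters...
chained = where(
    where(books, lambda d: d["rating"] == 5),
    lambda d: d["published"] < 1900,
)
# ...keep the same documents as one combined conjunction.
combined = where(books, lambda d: d["rating"] == 5 and d["published"] < 1900)

print([d["title"] for d in chained], [d["title"] for d in combined])  # ['A'] ['A']
```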

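Select / add and remove fields

select(...), add_fields(...), and remove_fields(...) let you modify the fields returned from the previous stage. These three are commonly referred to as projection-style stages.

select(...) and add_fields(...) let you assign the result of an expression to a user-provided field name. An expression that results in an error produces a null value. select(...) returns documents containing only the specified field names, while add_fields(...) extends the schema of the previous stage (possibly overwriting values that have the same field name).

remove_fields(...) lets you specify a set of fields to remove from the previous stage. Specifying field names that don't exist has no effect.

See the Limit the fields to return section below; in general, using these stages to restrict results to only the fields the client needs helps reduce cost and latency for most queries.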
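The three projection-style stages can be approximated on a single document with plain dictionaries. The `select`, `add_fields`, and `remove_fields` helpers below are hypothetical Python functions that model the behavior described above (an erroring expression becomes `None`, standing in for null); they are not the SDK's stage methods.

```python
from typing import Any, Callable, Dict

Doc = Dict[str, Any]
Expr = Callable[[Doc], Any]


def _evaluate(expr: Expr, doc: Doc) -> Any:
    try:
        return expr(doc)
    except Exception:  # any evaluation error yields null (None)
        return None


def select(doc: Doc, **exprs: Expr) -> Doc:
    # Output contains only the named fields.
    return {name: _evaluate(expr, doc) for name, expr in exprs.items()}


def add_fields(doc: Doc, **exprs: Expr) -> Doc:
    # Keep every existing field; same-named fields are overwritten.
    return {**doc, **select(doc, **exprs)}


def remove_fields(doc: Doc, *names: str) -> Doc:
    # Names that don't exist are simply ignored.
    return {k: v for k, v in doc.items() if k not in names}


book = {"title": "Dune", "author": "Frank Herbert", "rating": 4.5}

print(select(book, title=lambda d: d["title"], shout=lambda d: d["missing"].upper()))
# {'title': 'Dune', 'shout': None}
print(add_fields(book, rating=lambda d: d["rating"] * 2))
# {'title': 'Dune', 'author': 'Frank Herbert', 'rating': 9.0}
print(remove_fields(book, "rating", "does_not_exist"))
# {'title': 'Dune', 'author': 'Frank Herbert'}
```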

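Aggregate / distinct

The aggregate(...) stage lets you perform a series of aggregations over the input documents. By default, all documents are aggregated together, but you can provide an optional grouping argument to aggregate the input documents into different buckets.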

Web

// Average rating per genre (group by genre, like the other platforms)
const results = await execute(db.pipeline()
  .collection("books")
  .aggregate({
    accumulators: [field("rating").average().as("avg_rating")],
    groups: [field("genre")]
  })
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .aggregate([
    Field("rating").average().as("avg_rating")
  ], groups: [
    Field("genre")
  ])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .aggregate(
        AggregateStage
            .withAccumulators(AggregateFunction.average("rating").alias("avg_rating"))
            .withGroups(field("genre"))
    )
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .aggregate(AggregateStage
        .withAccumulators(
            AggregateFunction.average("rating").alias("avg_rating"))
        .withGroups(field("genre")))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .aggregate(
        Field.of("rating").average().as_("avg_rating"), groups=[Field.of("genre")]
    )
    .execute()
)

If no groupings are specified, this stage produces a single document; otherwise, it produces one document for each unique combination of grouping values.
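The grouping rule can be illustrated with a small group-by in plain Python. `aggregate_average` and the sample books are hypothetical; the function averages one field per bucket of grouping values and produces a single document when no groups are given. As a simplifying assumption, it skips documents that lack the averaged field.

```python
from collections import defaultdict
from statistics import mean
from typing import Any, Dict, List, Sequence, Tuple

Doc = Dict[str, Any]


def aggregate_average(
    docs: List[Doc], value_field: str, alias: str, groups: Sequence[str] = ()
) -> List[Doc]:
    buckets: Dict[Tuple[Any, ...], List[float]] = defaultdict(list)
    for doc in docs:
        if value_field not in doc:
            continue  # simplification: ignore documents without the field
        key = tuple(doc.get(g) for g in groups)  # () when there are no groups
        buckets[key].append(doc[value_field])
    results = []
    for key, values in buckets.items():
        row = dict(zip(groups, key))  # the grouping values...
        row[alias] = mean(values)     # ...plus the accumulator result
        results.append(row)
    return results


books = [
    {"genre": "Fantasy", "rating": 4},
    {"genre": "Fantasy", "rating": 5},
    {"genre": "Science Fiction", "rating": 3},
]

print(aggregate_average(books, "rating", "avg_rating"))
# [{'avg_rating': 4}]
print(aggregate_average(books, "rating", "avg_rating", groups=["genre"]))
# [{'genre': 'Fantasy', 'avg_rating': 4.5}, {'genre': 'Science Fiction', 'avg_rating': 3}]
```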

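The distinct(...) stage is a simplified aggregation operator that produces only the unique groupings, without any accumulators. In all other respects, it behaves exactly like aggregate(...). An example follows: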

Web

const results = await execute(db.pipeline()
  .collection("books")
  .distinct(
    field("author").toUpper().as("author"),
    field("genre")
  )
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .distinct([
    Field("author").toUpper().as("author"),
    Field("genre")
  ])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .distinct(
        field("author").toUpper().alias("author"),
        field("genre")
    )
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .distinct(
        field("author").toUpper().alias("author"),
        field("genre")
    )
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .distinct(Field.of("author").to_upper().as_("author"), "genre")
    .execute()
)
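Because distinct(...) behaves like aggregate(...) without accumulators, it can be sketched as collecting unique tuples of computed grouping values. The `distinct` helper and sample data below are hypothetical plain Python; the `author` key applies an upper-casing expression first, so differently cased spellings collapse into one result, similar to the toUpper()/to_upper() example above.

```python
from typing import Any, Callable, Dict, List, Tuple

Doc = Dict[str, Any]


def distinct(docs: List[Doc], **keys: Callable[[Doc], Any]) -> List[Doc]:
    seen: Dict[Tuple[Tuple[str, Any], ...], Doc] = {}
    for doc in docs:
        row = tuple((name, fn(doc)) for name, fn in keys.items())
        seen.setdefault(row, dict(row))  # first occurrence wins; order preserved
    return list(seen.values())


books = [
    {"author": "Ursula K. Le Guin", "genre": "Fantasy"},
    {"author": "ursula k. le guin", "genre": "Fantasy"},
    {"author": "Frank Herbert", "genre": "Science Fiction"},
]

result = distinct(books, author=lambda d: d["author"].upper(), genre=lambda d: d["genre"])
print(result)
# [{'author': 'URSULA K. LE GUIN', 'genre': 'Fantasy'},
#  {'author': 'FRANK HERBERT', 'genre': 'Science Fiction'}]
```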

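Functions

Functions are the building blocks for creating expressions and complex queries. For a complete list of functions with examples, see the Functions reference. As a quick recap, a typical query has the following structure:

[Figure: example demonstrating the stages and functions in a query]

Many stages accept expressions that contain one or more functions. Functions are most commonly used in the where(...) and select(...) stages. There are two main types of functions you should be familiar with: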

Web

let results;

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = await execute(db.pipeline().collection("books")
  .select(field("current").logicalMinimum(field("updated")).as("price_min"))
);

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = await execute(db.pipeline().collection("books")
  .aggregate(field("price").minimum().as("min_price"))
);
Swift
var results: Pipeline.Snapshot

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = try await db.pipeline().collection("books")
  .select([
    Field("current").logicalMinimum(["updated"]).as("price_min")
  ])
  .execute()

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = try await db.pipeline().collection("books")
  .aggregate([Field("price").minimum().as("min_price")])
  .execute()

Kotlin

var results: Task<Pipeline.Snapshot>

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = db.pipeline().collection("books")
    .select(
        field("current").logicalMinimum("updated").alias("price_min")
    )
    .execute()

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = db.pipeline().collection("books")
    .aggregate(AggregateFunction.minimum("price").alias("min_price"))
    .execute()

Java

Task<Pipeline.Snapshot> results;

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = db.pipeline().collection("books")
    .select(
        field("current").logicalMinimum("updated").alias("price_min")
    )
    .execute();

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = db.pipeline().collection("books")
    .aggregate(AggregateFunction.minimum("price").alias("min_price"))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Type 1: Scalar (for use in non-aggregation stages)
# Example: Return the min store price for each book.
results = (
    client.pipeline()
    .collection("books")
    .select(
        Field.of("current").logical_minimum(Field.of("updated")).as_("price_min")
    )
    .execute()
)

# Type 2: Aggregation (for use in aggregate stages)
# Example: Return the min price of all books.
results = (
    client.pipeline()
    .collection("books")
    .aggregate(Field.of("price").minimum().as_("min_price"))
    .execute()
)
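The difference between the two kinds of functions is the shape of the output: a scalar function produces one value per input document, while an aggregate function reduces all input documents to one value. This plain-Python sketch (hypothetical data, with Python's built-in `min` standing in for logical_minimum and minimum) shows the contrast.

```python
books = [
    {"title": "A", "current": 12.0, "updated": 10.0, "price": 11.0},
    {"title": "B", "current": 8.0, "updated": 9.5, "price": 8.5},
    {"title": "C", "current": 20.0, "updated": 21.0, "price": 19.0},
]

# Scalar: one output per book, comparing two fields of the same document.
per_book = [{"price_min": min(b["current"], b["updated"])} for b in books]

# Aggregate: a single output for the whole input set.
overall = {"min_price": min(b["price"] for b in books)}

print(per_book)  # [{'price_min': 10.0}, {'price_min': 8.0}, {'price_min': 20.0}]
print(overall)   # {'min_price': 8.5}
```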

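Limits

In most cases, Enterprise edition doesn't impose limits on the shape of a query. In other words, you aren't restricted to a small number of values in an IN or OR query. However, there are two main limits you should be aware of:

  • Deadline: 60 seconds (same as Standard edition).
  • Memory usage: the amount of materialized data during query execution is limited to 128 MiB.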
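For a rough feel of the memory limit, the arithmetic below converts 128 MiB to bytes and checks a hypothetical workload against it. It assumes, purely for illustration, that materialized size is document count times average document size; the guide doesn't specify how materialized data is measured, so treat this only as a back-of-the-envelope check.

```python
MIB = 1024 * 1024
KIB = 1024
MEMORY_LIMIT_BYTES = 128 * MIB  # 134,217,728 bytes


def fits_in_memory_limit(doc_count: int, avg_doc_bytes: int) -> bool:
    # Assumption: materialized size ~= count * average size, ignoring overhead.
    return doc_count * avg_doc_bytes <= MEMORY_LIMIT_BYTES


# How many ~1 KiB documents fit under the limit?
max_docs_at_1kib = MEMORY_LIMIT_BYTES // KIB

print(MEMORY_LIMIT_BYTES)                  # 134217728
print(max_docs_at_1kib)                    # 131072
print(fits_in_memory_limit(100_000, KIB))  # True
print(fits_in_memory_limit(200_000, KIB))  # False
```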

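Errors

Queries can fail for a number of reasons. The following table lists common errors and the actions you can take:

Error code | Action
DEADLINE_EXCEEDED | The query you executed exceeded the 60-second deadline and needs additional optimization. See the Performance section for tips. If you can't determine the root cause of the problem, contact the team.
RESOURCE_EXHAUSTED | The query you executed exceeded the memory limit and needs additional optimization. See the Performance section for tips. If you can't determine the root cause of the problem, contact the team.
INTERNAL | Contact the team for support.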

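Performance

Unlike existing queries, pipeline queries don't require an index to always be present. This means that where an existing query would fail immediately with a FAILED_PRECONDITION missing-index error, a pipeline query may instead run and show higher latency. There are a few steps you can take to improve the performance of pipeline queries.

Creating indexes

Indexes used

Query Explain lets you determine whether a query is served by an index or falls back to a less efficient operation such as a table scan. If your query isn't fully served by an index, you can create one by following the instructions.

Creating an index

You can create indexes by following the existing index management documentation. Before creating an index, familiarize yourself with the general best practices for indexes in Firestore. To make sure your query can take advantage of indexes, follow these best practices and create indexes with fields in the following order:

  1. All fields used in equality filters (in any order)
  2. All fields used for sorting (in the same order as the query)
  3. Fields used in range or inequality filters, in descending order of query constraint selectivity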
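The ordering rule can be expressed as a small helper. `recommended_index_fields` is hypothetical: it takes the equality fields, the sort fields with directions, and range fields paired with an illustrative selectivity score (higher means more selective), skips fields already placed, and returns `(field, direction)` pairs where `None` stands for "[...]" (any order). The default `DESC` direction for range fields is an assumption, not a rule from this guide.

```python
from typing import List, Optional, Sequence, Tuple

IndexField = Tuple[str, Optional[str]]


def recommended_index_fields(
    equality: Sequence[str],
    sort: Sequence[Tuple[str, str]],
    ranges: Sequence[Tuple[str, float]],
    range_direction: str = "DESC",
) -> List[IndexField]:
    fields: List[IndexField] = []
    placed = set()

    def place(name: str, direction: Optional[str]) -> None:
        if name not in placed:
            fields.append((name, direction))
            placed.add(name)

    for name in equality:            # 1. equality fields, any order
        place(name, None)
    for name, direction in sort:     # 2. sort fields, same order as the query
        place(name, direction)
    # 3. range/inequality fields, most selective first
    for name, _ in sorted(ranges, key=lambda r: r[1], reverse=True):
        place(name, range_direction)
    return fields


# Equality on genre, sort on published DESC, ranges on published and rating.
# The selectivity scores are made up for illustration.
print(recommended_index_fields(
    equality=["genre"],
    sort=[("published", "DESC")],
    ranges=[("published", 0.9), ("rating", 0.5)],
))
# [('genre', None), ('published', 'DESC'), ('rating', 'DESC')]
```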

For example, given the following query:

Web

const results = await execute(db.pipeline()
  .collection("books")
  .where(field("published").lessThan(1900))
  .where(field("genre").equal("Science Fiction"))
  .where(field("rating").greaterThan(4.3))
  .sort(field("published").descending())
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .where(Field("published").lessThan(1900))
  .where(Field("genre").equal("Science Fiction"))
  .where(Field("rating").greaterThan(4.3))
  .sort([Field("published").descending()])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .where(field("published").lessThan(1900))
    .where(field("genre").equal("Science Fiction"))
    .where(field("rating").greaterThan(4.3))
    .sort(field("published").descending())
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .where(field("published").lessThan(1900))
    .where(field("genre").equal("Science Fiction"))
    .where(field("rating").greaterThan(4.3))
    .sort(field("published").descending())
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("published").less_than(1900))
    .where(Field.of("genre").equal("Science Fiction"))
    .where(Field.of("rating").greater_than(4.3))
    .sort(Field.of("published").descending())
    .execute()
)

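We recommend creating a collection-scoped index on books for (genre [...], published DESC, rating DESC).

Index density

Cloud Firestore supports sparse and non-sparse indexes. For more information, see Index density.

Covered queries + secondary indexes

If all the fields being returned are present in a secondary index, Firestore can skip fetching the full document and return results from the index alone. This typically reduces latency (and cost) significantly. Take the following sample query: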

Web

const results = await execute(db.pipeline()
  .collection("books")
  .where(field("category").like("%fantasy%"))
  .where(field("title").exists())
  .where(field("author").exists())
  .select(field("title"), field("author"))
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .where(Field("category").like("%fantasy%"))
  .where(Field("title").exists())
  .where(Field("author").exists())
  .select([Field("title"), Field("author")])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .where(field("category").like("%fantasy%"))
    .where(field("title").exists())
    .where(field("author").exists())
    .select(field("title"), field("author"))
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .where(field("category").like("%fantasy%"))
    .where(field("title").exists())
    .where(field("author").exists())
    .select(field("title"), field("author"))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("category").like("%fantasy%"))
    .where(Field.of("title").exists())
    .where(Field.of("author").exists())
    .select("title", "author")
    .execute()
)

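If the database already has a collection-scoped index on books for (category [...], title [...], author [...]), the query can avoid fetching anything from the main documents themselves. In this case, the order of fields within the index doesn't matter, which is what [...] indicates.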
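As a simplified mental model, a query is covered when every field it filters on or returns is present in the index. The `is_covered` helper below is a hypothetical plain-Python check of that set-containment condition; the real query planner may take more into account.

```python
from typing import Iterable


def is_covered(index_fields: Iterable[str], filter_fields: Iterable[str],
               returned_fields: Iterable[str]) -> bool:
    needed = set(filter_fields) | set(returned_fields)
    return needed <= set(index_fields)


index = ["category", "title", "author"]

# The example query filters on category, title, author and returns title, author.
print(is_covered(index, ["category", "title", "author"], ["title", "author"]))  # True
# Returning a field outside the index, e.g. rating, would need the full document.
print(is_covered(index, ["category", "title", "author"], ["title", "rating"]))  # False
```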

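Limit the fields to return

By default, Firestore queries return all fields in a document, similar to SELECT * in traditional systems. However, if your application needs only a subset of the fields, you can use the select(...) or restrict(...) stages to push this filtering to the server side. This reduces the response size (lowering network egress costs) and improves latency.

Troubleshooting tools

Query Explain

Query Explain gives you visibility into execution metrics and details about the indexes used.

Metrics

Pipeline queries are fully integrated with existing Firestore metrics.

Limitations / known issues

Specialized indexes

Pipeline queries don't yet support the existing array-contains and vector index types. Rather than rejecting such queries outright, Firestore tries to use other existing ascending and descending indexes. During the private preview, pipeline queries containing array_contains or find_nearest expressions are expected to be slower than their existing equivalents.

Pagination

During the private preview, there is no built-in support for easily paginating over a result set. You can work around this by chaining equivalent where(...) and sort(...) stages, as shown below.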

Web

// Existing pagination via `startAt()`
const q =
  query(collection(db, "cities"), orderBy("population"), startAt(1000000));

// Private preview workaround using pipelines
const pageSize = 2;
const pipeline = db.pipeline()
  .collection("cities")
  .select("name", "population", "__name__")
  .sort(field("population").descending(), field("__name__").ascending());

// Page 1 results
let snapshot = await execute(pipeline.limit(pageSize));

// End of page marker
const lastDoc = snapshot.results[snapshot.results.length - 1];

// Page 2 results
snapshot = await execute(
  pipeline
    .where(
      or(
        and(
          field("population").equal(lastDoc.get("population")),
          field("__name__").greaterThan(lastDoc.ref)
        ),
        field("population").lessThan(lastDoc.get("population"))
      )
    )
    .limit(pageSize)
);
Swift
// Existing pagination via `start(at:)`
let query = db.collection("cities").order(by: "population").start(at: [1000000])

// Private preview workaround using pipelines
let pipeline = db.pipeline()
  .collection("cities")
  .where(Field("population").greaterThanOrEqual(1000000))
  // Ascending, to match order(by: "population").start(at: [1000000])
  .sort([Field("population").ascending()])

Kotlin

// Existing pagination via `startAt()`
val query = db.collection("cities").orderBy("population").startAt(1000000)

// Private preview workaround using pipelines
val pipeline = db.pipeline()
    .collection("cities")
    .where(field("population").greaterThanOrEqual(1000000))
    // Ascending, to match orderBy("population").startAt(1000000)
    .sort(field("population").ascending())

Java

// Existing pagination via `startAt()`
Query query = db.collection("cities").orderBy("population").startAt(1000000);

// Private preview workaround using pipelines
Pipeline pipeline = db.pipeline()
    .collection("cities")
    .where(field("population").greaterThanOrEqual(1000000))
    // Ascending, to match orderBy("population").startAt(1000000)
    .sort(field("population").ascending());
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Existing pagination via `start_at()`
query = (
    client.collection("cities")
    .order_by("population")
    .start_at({"population": 1_000_000})
)

# Private preview workaround using pipelines
pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("population").greater_than_or_equal(1_000_000))
    # Ascending, to match order_by("population").start_at(...)
    .sort(Field.of("population").ascending())
)
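The Web workaround above is a form of keyset (cursor) pagination: each page is ordered by population descending with __name__ as a tie-breaker, and the next page keeps only documents that sort strictly after the last one returned. The plain-Python sketch below mirrors that or(and(equal, greaterThan), lessThan) predicate on hypothetical in-memory data; comparing __name__ as a plain string is a simplification of how document references compare.

```python
from typing import Any, Dict, List, Optional

Doc = Dict[str, Any]

cities = [
    {"__name__": "cities/A", "population": 500},
    {"__name__": "cities/B", "population": 300},
    {"__name__": "cities/C", "population": 300},
    {"__name__": "cities/D", "population": 300},
    {"__name__": "cities/E", "population": 100},
]


def ordered(docs: List[Doc]) -> List[Doc]:
    # population descending, then __name__ ascending to break ties
    return sorted(docs, key=lambda d: (-d["population"], d["__name__"]))


def after_cursor(doc: Doc, last: Doc) -> bool:
    # Same shape as the Web example's where(...) predicate.
    same_population_later_name = (
        doc["population"] == last["population"] and doc["__name__"] > last["__name__"]
    )
    return same_population_later_name or doc["population"] < last["population"]


def fetch_page(docs: List[Doc], page_size: int, last: Optional[Doc] = None) -> List[Doc]:
    candidates = docs if last is None else [d for d in docs if after_cursor(d, last)]
    return ordered(candidates)[:page_size]


def all_pages(docs: List[Doc], page_size: int) -> List[List[str]]:
    pages: List[List[str]] = []
    last: Optional[Doc] = None
    while True:
        page = fetch_page(docs, page_size, last)
        if not page:
            return pages
        pages.append([d["__name__"] for d in page])
        last = page[-1]  # end-of-page marker, like lastDoc in the Web example


print(all_pages(cities, 2))
# [['cities/A', 'cities/B'], ['cities/C', 'cities/D'], ['cities/E']]
```

The tie-breaker is what keeps cities/C and cities/D from being skipped or repeated even though they share a population with the last document of page 1.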

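Emulator support

The emulator doesn't yet support pipeline queries.

Real-time and offline support

Pipeline queries don't yet have real-time or offline capabilities.

Next steps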