使用变更数据流读取实时数据

借助与 MongoDB 兼容的 Firestore 的变更数据流,应用可以访问对集合或整个数据库所做的实时更改(插入、更新和删除)。变更数据流会按修改时间对更新进行排序。

可通过与 MongoDB 兼容的 API 和传统 MongoDB 驱动程序访问更改流。Firestore 与 MongoDB 兼容的变更数据流实现可以通过独特的写入自动分区和读取并行化实现来处理任何写入和读取吞吐量。这样,您就可以构建高吞吐量工作负载。 您还可以改进 Cloud Firestore 与其他存储解决方案之间的迁移和数据同步基础架构。

除了与 MongoDB 驱动程序兼容之外,您还可以使用 Cloud Firestore 并行读取变更数据流。这样,您就可以构建并行的高吞吐量读取工作负载。每个流都代表一个分布均匀的结果分区。

变更数据流支持以下功能:

  • 可配置的变更数据流,具有数据库或集合范围。
  • 创建时指定的变更数据流的保留时长。 默认保留期限为 7 天,最短保留期限为 1 天。保留期限必须是 1 天的倍数,最长为 7 天。保留时长一经创建便无法更改。如需更改保留期限,您必须舍弃并重新创建变更数据流。
  • deleteinsertupdatedrop 变更事件,可使用 db.collection.watch()db.watch() 进行观测。
  • updateDescription.updatedFields 包含更新差异。
  • 所有 fullDocumentfullDocumentBeforeChange 选项。
    • 正在查找完整文档以进行更新。
    • 文档在被替换、更新或删除之前的预映像。
    • 替换或更新后的文档的后映像。
    • 如果前映像和后映像的时间相差超过 1 小时,则需要启用时间点恢复 (PITR)。
  • 所有恢复选项,包括 resumeAfterstartAfter
  • 使用 watch() 观察更改时,您可以链接 $addFields$match$project$replaceRoot$replaceWith$set$unset 等聚合阶段。

配置变更数据流

如需创建、删除或查看数据库的现有更改流,请使用 Google Cloud 控制台。

角色与权限

如需创建、删除和列出更改流,相应的主账号分别需要 datastore.schemas.createdatastore.schemas.deletedatastore.schemas.list Identity and Access Management (IAM) 权限。

例如,Datastore Index Admin (roles/datastore.indexAdmin) 角色可授予这些权限。

创建变更流

您必须先创建变更数据流,然后才能打开相应的变更数据流游标。不支持在创建集合或数据库时自动启用变更数据流。

如需创建更改流,请使用 Google Cloud 控制台。

  1. 在 Google Cloud 控制台中,转到数据库页面。

    前往“数据库”

  2. 从列表中选择一个与 MongoDB 兼容的 Firestore 数据库。系统会打开 Firestore Studio 面板。
  3. 探索器面板中,找到变更数据流节点,点击 更多操作,然后选择创建变更数据流
  4. 输入唯一的更改流名称、范围和保留期限,然后点击保存

查看变更数据流

您可以在 Google Cloud 控制台中查看有关变更数据流的详细信息。

  1. 在 Google Cloud 控制台中,转到数据库页面。

    前往“数据库”

  2. 从列表中选择一个与 MongoDB 兼容的 Firestore 数据库。系统会打开 Firestore Studio 面板。
  3. 探索器面板中,找到变更数据流节点。
  4. 如需打开或关闭节点,请点击 切换节点

删除变更数据流

如需删除更改流,请使用 Google Cloud 控制台。

  1. 在 Google Cloud 控制台中,转到数据库页面。

    前往“数据库”

  2. 从列表中选择一个与 MongoDB 兼容的 Firestore 数据库。系统会打开 Firestore Studio 面板。
  3. 探索器面板中,找到变更数据流节点。
  4. 如需打开或关闭节点,请点击 切换节点
  5. 探索器中,找到要删除的更改流。
  6. 点击 更多操作,然后选择删除更改数据流
  7. 在对话框中,输入变更数据流名称以确认删除,然后点击删除

打开或恢复变更数据流光标

以下示例展示了如何创建、恢复和配置变更数据流游标。

在创建变更数据流游标之前,您必须明确为数据库或集合创建变更数据流

创建变更数据流游标

如需创建新的变更数据流游标,请使用 MongoDB 驱动程序中的 watch 方法。如需监听数据库中的所有更改,请创建数据库级变更数据流,并对 db 对象调用 watch 方法。

let cursor = db.watch()

如需创建范围限定为集合的游标,您必须先为该集合创建更改流。然后,对相应集合调用 watch 方法。

let cursor = db.my_collection.watch()

现在,您已经创建了变更数据流游标,可以开始进行流式传输了。 例如,如果您插入一个文档并对光标调用 tryNext,您会在更改流中看到相应更改。

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

如果您更新并删除文档,则会在更改流中看到这些更改:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

恢复变更数据流

如需恢复变更数据流,请使用 resumeAfterstartAfter 选项。 如需确定从 resumeAfterstartAfter 的哪个位置恢复更新日志,请使用恢复令牌。

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

使用 startAfter

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

在更新和删除中包含之前和之后的图片

如果需要,您可以在更新和删除变更事件中包含文档的更新前和更新后图片。图片可用性取决于时间点恢复 (PITR) 窗口,如需读取超过 1 小时的旧版文档图片,您必须启用 PITR。

变更流利用 PITR 时间窗口来提供给定变更事件发生之前和之后的文档视图。默认情况下,更新事件包含一个 updateDescription 字段,该字段是更新操作修改的字段的增量。

如需在更改事件中包含更改前后的图片,您必须在更改流查询中指定 fullDocumentBeforeChangefullDocument 选项。

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

如果查询尝试读取 PITR 保留期之外的文档,或者如果未启用 PITR,则 required 值会抛出服务器端错误消息。

除了抛出错误之外,您还可以使用 whenAvailable 值在图片不再可用时返回 null 值。

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

在更新中包含当前映像

默认情况下,更新事件包含一个 updateDescription 字段,该字段是更新操作修改的字段的增量。如需改为查找整个文档的最新版本,请在 fullDocument 选项中使用 updateLookup 值。

此功能不需要 PITR,并且会查找相应文档。

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

并行读取

如需提高吞吐量,您可以使用 firestoreWorkerConfig 选项将变更数据流查询拆分到多个工作器中。每个工作器负责提供一组不同的文档的更改。您必须通过 runCommandaggregate 查询创建并行游标。

例如,您可以按如下方式将变更数据流分配给 3 个工作器:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

变更流和备份

在备份恢复操作中,变更数据流配置和变更数据流数据均不可用。如果您恢复了包含变更数据流的数据库,则必须在目标数据库中重新创建这些变更数据流,才能打开指向该数据库的游标。

结算

行为差异

以下部分介绍了与 MongoDB 兼容的 Firestore 和 MongoDB 之间的更改流差异。

updateDescription

updateDescriptionupdate 事件中的一个文档,用于描述更新操作更新或移除的字段。在 Cloud Firestore 中,显著的区别在于:

  • updateDescription 中,字段 truncatedArraysdisambiguatedPaths 未填充。
  • updateDescription.updatedFields 表示在应用突变之前和之后,文档的前后图片之间的规范差异。

假设文档的初始状态如下:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

情形 1:仅更改数组的第一个元素。

在这种情况下,Cloud Firestore 的行为与 MongoDB 的行为一致。

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

场景 2:使用整个数组进行覆盖

在此场景中,该操作仅更新第一个数组字段,但会覆盖整个数组。

Cloud Firestore 更新差异不会区分这两种情况,并且会针对这两种情况返回相同的 updateDescription.updatedFields

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

后续步骤