使用变更数据流读取实时数据

借助与 MongoDB 兼容的 Firestore 的变更流,应用可以访问对集合或整个数据库所做的实时更改(插入、更新和删除)。变更流会按修改时间对更新进行排序。

您可以通过与 MongoDB 兼容的 API 和传统 MongoDB 驱动程序访问变更流。与 MongoDB 兼容的 Firestore 的变更数据流实现可以通过对写入和读取并行执行自动分区这一独特实现来处理任何吞吐量的写入和读取。这样,您就可以构建高吞吐量工作负载。 您还可以改进 Cloud Firestore 与其他存储解决方案之间的迁移和数据同步基础架构。

除了与 MongoDB 驱动程序兼容之外, 您还可以使用 Cloud Firestore 并行读取变更数据流。这样,您就可以构建并行的高吞吐量读取工作负载。每个流都代表结果的分布良好的分区。

变更流支持以下功能:

  • 可配置的变更数据流,其范围可以是数据库或集合。
  • 在创建时指定的变更流的保留时长。 默认保留时长为 7 天,最短保留时长为 1 天。保留时长必须是 1 天的倍数,最长为 7 天。创建后无法更改保留时长。如需更改保留期限,您必须删除并重新创建变更流。
  • 可以使用 db.collection.watch()db.watch() 观察到的 deleteinsertupdatedrop 变更事件。
  • updateDescription.updatedFields 包含更新差异。
  • 所有 fullDocumentfullDocumentBeforeChange 选项。
    • 查找更新的完整文档。
    • 文档在被替换、更新或删除之前的预映像。
    • 文档在被替换或更新之后的后映像。
    • 如需获取超过一小时的预映像和后映像,您需要启用时间点恢复 (PITR)。
  • 所有恢复选项,包括 resumeAfterstartAfter
  • 使用 watch() 观察更改时,您可以链接聚合阶段,例如 $addFields$match$project$replaceRoot$replaceWith$set$unset

配置变更流

如需创建、删除或查看数据库的现有变更流,请使用 Google Cloud 控制台。

角色与权限

如需创建、删除和列出变更流,正文分别需要 datastore.schemas.createdatastore.schemas.deletedatastore.schemas.list Identity and Access Management (IAM) 权限。

例如,Datastore Index Admin (roles/datastore.indexAdmin) 角色会授予这些权限。

创建变更流

您必须先创建变更流,然后才能打开相应的变更流游标。系统不支持在创建集合或数据库时自动启用变更流。

如需创建变更流,请使用 Google Cloud 控制台。

  1. 在 Google Cloud 控制台中,前往数据库 页面。

    前往“数据库”

  2. 从列表中,选择一个与 MongoDB 兼容的 Firestore 数据库。Firestore Studio 面板随即打开。
  3. 探索器 面板中,找到 变更数据流 节点,点击 更多操作 ,然后选择创建变更数据流
  4. 输入唯一的变更流名称、范围和保留期限,然后点击保存

查看变更流

您可以在 Google Cloud 控制台中查看有关变更流的详细信息。

  1. 在 Google Cloud 控制台中,前往数据库 页面。

    前往“数据库”

  2. 从列表中,选择一个与 MongoDB 兼容的 Firestore 数据库。Firestore Studio 面板随即打开。
  3. 探索器 面板中,找到 变更数据流 节点。
  4. 如需打开或关闭该节点,请点击 切换节点

删除变更流

如需删除变更流,请使用 Google Cloud 控制台。

  1. 在 Google Cloud 控制台中,前往数据库 页面。

    前往“数据库”

  2. 从列表中,选择一个与 MongoDB 兼容的 Firestore 数据库。Firestore Studio 面板随即打开。
  3. 探索器 面板中,找到 变更数据流 节点。
  4. 如需打开或关闭该节点,请点击 切换节点
  5. 探索器中,找到要删除的变更流。
  6. 点击 更多操作 ,然后选择 删除变更流
  7. 在对话框中,输入变更流名称以确认删除,然后点击删除

打开或恢复变更流游标

以下示例展示了如何创建、恢复和配置变更流游标。

在创建变更流游标之前,您必须显式 创建变更流为数据库或集合。

创建变更流游标

如需创建新的变更流游标,请使用 MongoDB 驱动程序中的 watch 方法。 如需监听数据库的所有更改,请创建数据库范围的变更流,并对 db 对象调用 watch 方法。

let cursor = db.watch()

如需创建范围限定为集合的游标,您必须先为该集合创建变更流。然后,对相应的集合调用 watch 方法。

let cursor = db.my_collection.watch()

现在,您已创建变更流游标,可以开始流式传输了。 例如,如果您插入文档并对游标调用 tryNext,您会在变更流中看到更改。

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

如果您更新并删除文档,您会在变更流中看到这些更改:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

恢复变更流

如需恢复变更流,请使用 resumeAfterstartAfter 选项。 如需确定从 resumeAfterstartAfter 恢复的更新日志中的位置,请使用恢复令牌。

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

如需使用 startAfter,请执行以下操作:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

在更新和删除中包含预映像和后映像

如果需要,您可以在更新和删除变更事件中包含文档的预映像和后映像。 映像可用性取决于 时间点恢复 (PITR) 窗口,如需读取超过一小时的文档映像,您必须启用 PITR。

变更流利用 PITR 窗口来提供给定变更事件发生之前和之后的文档视图。默认情况下,更新事件包含 updateDescription 字段,该字段是更新操作修改的字段的增量。

如需在变更事件中包含预映像和后映像, 您必须 在变更流查询中指定 fullDocumentBeforeChangefullDocument 选项。

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

如果查询尝试读取 PITR 保留窗口之外的文档,或者未启用 PITR,则 required 值会抛出服务器端错误消息。

除了抛出错误之外,您还可以使用 whenAvailable 值,以便在映像不再可用时返回 null 值。

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

在更新中包含当前映像

默认情况下,更新事件包含 updateDescription 字段,该字段是更新操作修改的字段的增量。如需改为查找整个文档的最新版本,请在 fullDocument 选项中使用 updateLookup 值。

此功能不需要 PITR,并且会执行文档查找。

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

并行读取

如需提高吞吐量,您可以使用 firestoreWorkerConfig 选项将变更流查询拆分到多个工作器中。每个工作器负责处理一组不同的文档的更改。您必须通过 runCommandaggregate 查询创建并行游标。

例如,您可以按如下方式将变更流分布到 3 个工作器中:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

变更流和备份

备份恢复操作中既没有变更流配置,也没有变更流数据。如果您恢复包含变更流的数据库,则必须在目标数据库中重新创建这些变更流,才能打开该数据库的游标。

结算

行为差异

以下部分介绍了与 MongoDB 兼容的 Firestore 和 MongoDB 之间的变更流差异。

updateDescription

updateDescriptionupdate 事件中的文档,用于描述更新操作更新或移除的字段 。在 Cloud Firestore中,值得注意的差异如下:

  • updateDescription 中,truncatedArraysdisambiguatedPaths 字段未填充。
  • updateDescription.updatedFields 表示应用突变之前和之后文档的预映像和后映像之间的规范差异。

请考虑以下文档的初始状态:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

场景 1:仅突变数组的第一个元素。

在此场景中,Cloud Firestore 行为与 MongoDB 匹配。

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

场景 2:使用整个数组覆盖

在此场景中,该操作仅更新第一个数组字段,但会覆盖整个数组。

Cloud Firestore 更新差异不会区分这两种 场景,并且会为这两种场景返回相同的 updateDescription.updatedFields

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

后续步骤