分片时间戳

如果某个集合中包含值已依序编入索引的文档,则 Cloud Firestore 会将写入速率限制为每秒 500 次写入。本页面介绍了如何通过将文档字段分片来克服这项限制。首先,我们将定义“依序编入索引的字段”,并阐明这项限制适用的情况。

依序编入索引的字段

“依序编入索引的字段”是指包含以单调递增或递减方式编入索引的字段的任何文档集合。在许多情况下,依序编入索引的字段都是指 timestamp 字段,但任何单调递增或递减的字段值都会触发写入限制(即每秒 500 次写入)。

例如,如果应用按如下所示为字段 userid 分配值,那么包含编入索引的字段 useriduser 文档集合就会应用这项限制:

  • 1281, 1282, 1283, 1284, 1285, ...

另一方面,并非所有 timestamp 字段都会触发此限制。如果某个 timestamp 字段跟踪的是随机分布值,那么这项写入限制就不适用。该字段的实际值也无关紧要,唯一紧要的是其值是单调递增或递减的情况。例如,以下两组单调递增的字段值都会触发写入限制:

  • 100000, 100001, 100002, 100003, ...
  • 0, 1, 2, 3, ...

将 timestamp 字段分片

假设您的应用使用单调递增的 timestamp 字段。 如果您的应用未在任何查询中使用 timestamp 字段,您可以不将 timestamp 字段编入索引,从而移除每秒 500 次写入这项限制。如果您的查询确实需要 timestamp 字段,您可以使用分片时间戳来解决此限制问题:

  1. timestamp 字段的旁边添加 shard 字段。在 shard 字段中使用 1..n 的不同的值。这样做可以将该集合的写入限制提高至 500*n,但是您需要对 n 次查询进行聚合。
  2. 更新您的写入逻辑,以便为每个文档随机分配一个 shard 值。
  3. 更新您的查询以聚合各分片结果集。
  4. shard 字段和 timestamp 字段停用单字段索引。删除包含 timestamp 字段的现有复合索引。
  5. 创建新的复合索引以用于支持更新后的查询。索引中各字段的顺序非常重要,其中 shard 字段必须位于 timestamp 字段之前。任何包含 timestamp 字段的索引都必须包含 shard字段。

分片时间戳实现应仅适用于持续写入速率超过每秒 500 次写入的使用场景。在其他使用场景中,这种实现属于过早优化。将 timestamp 字段分片可消除每秒 500 次写入这项限制,但需要对客户端查询进行聚合操作。

以下示例展示了如何将 timestamp 字段分片以及如何查询分片结果集。

示例数据模型和查询

例如,假设有一个应用可以对货币、普通股和 ETF 等金融工具进行近乎实时的分析。该应用会将文档写入 instruments 集合,具体如下所示:

Node.js
async function insertData() {
  const instruments = [
    {
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

该应用按 timestamp 字段运行以下查询和命令:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  return fs.collection('instruments')
      .where(fieldName, fieldOperator, fieldValue)
      .orderBy('timestamp', 'desc')
      .limit(limit)
      .get();
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

经过一番研究之后,您可以确定该应用每秒会收到 1000 至 1500 次工具更新。这一数量远远超过了带有编入索引的 timestamp 字段的文档集合所允许的每秒 500 次写入这项限制。为提高写入吞吐量,您需要 3 个分片值(即 MAX_INSTRUMENT_UPDATES/500 = 3)。此示例使用分片值 xyz。您也可以将分片值设为数字或其他字符。

添加 shard 字段

向您的文档添加一个 shard 字段。将 shard 字段的值设置为 xyz,以便将该集合的写入限制提高至每秒 1500 次写入。

Node.js
// Define our 'K' shard values
const shards = ['x', 'y', 'z'];
// Define a function to help 'chunk' our shards for use in queries.
// When using the 'in' query filter there is a max number of values that can be
// included in the value. If our number of shards is higher than that limit
// break down the shards into the fewest possible number of chunks.
function shardChunks() {
  const chunks = [];
  let start = 0;
  while (start < shards.length) {
    const elements = Math.min(MAX_IN_VALUES, shards.length - start);
    const end = start + elements;
    chunks.push(shards.slice(start, end));
    start = end;
  }
  return chunks;
}

// Add a convenience function to select a random shard
function randomShard() {
  return shards[Math.floor(Math.random() * Math.floor(shards.length))];
}
async function insertData() {
  const instruments = [
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

查询分片时间戳

如需添加一个 shard 字段,您需要更新查询以对分片结果进行聚合:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  // For each shard value, map it to a new query which adds an additional
  // where clause specifying the shard value.
  return Promise.all(shardChunks().map(shardChunk => {
        return fs.collection('instruments')
            .where('shard', 'in', shardChunk)  // new shard condition
            .where(fieldName, fieldOperator, fieldValue)
            .orderBy('timestamp', 'desc')
            .limit(limit)
            .get();
      }))
      // Now that we have a promise of multiple possible query results, we need
      // to merge the results from all of the queries into a single result set.
      .then((snapshots) => {
        // Create a new container for 'all' results
        const docs = [];
        snapshots.forEach((querySnapshot) => {
          querySnapshot.forEach((doc) => {
            // append each document to the new all container
            docs.push(doc);
          });
        });
        if (snapshots.length === 1) {
          // if only a single query was returned skip manual sorting as it is
          // taken care of by the backend.
          return docs;
        } else {
          // When multiple query results are returned we need to sort the
          // results after they have been concatenated.
          // 
          // since we're wanting the `limit` newest values, sort the array
          // descending and take the first `limit` values. By returning negated
          // values we can easily get a descending value.
          docs.sort((a, b) => {
            const aT = a.data().timestamp;
            const bT = b.data().timestamp;
            const secondsDiff = aT.seconds - bT.seconds;
            if (secondsDiff === 0) {
              return -(aT.nanoseconds - bT.nanoseconds);
            } else {
              return -secondsDiff;
            }
          });
          return docs.slice(0, limit);
        }
      });
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

更新索引定义

如需移除每秒 500 次写入这项限制条件,请删除使用 timestamp 字段的现有单字段索引和复合索引。

删除复合索引定义

Firebase 控制台

  1. 打开 Firebase 控制台中的 Cloud Firestore“复合索引”页面。

    转到“复合索引”

  2. 对于包含 timestamp 字段的每个索引,点击 按钮,然后点击删除

GCP Console

  1. 在 Google Cloud 控制台中,转到数据库页面。

    转到“数据库”

  2. 从数据库列表中选择所需的数据库。

  3. 在导航菜单中,点击索引,然后点击复合标签页。

  4. 使用过滤条件字段搜索包含 timestamp 字段的索引定义。

  5. 对于其中每个索引,点击 按钮,然后点击删除

Firebase CLI

  1. 如果您尚未设置 Firebase CLI,请按照此处的说明安装 CLI 并运行 firebase init 命令。在 init 命令运行期间,请务必选择 Firestore: Deploy rules and create indexes for Firestore
  2. 在设置过程中,Firebase CLI 会默认将您现有的索引定义下载到一个名为 firestore.indexes.json 的文件中。
  3. 移除所有包含 timestamp 字段的索引定义,例如:

    {
    "indexes": [
      // Delete composite index definition that contain the timestamp field
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "exchange",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "instrumentType",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "price.currency",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
     ]
    }
    
  4. 部署更新后的索引定义:

    firebase deploy --only firestore:indexes
    

更新单字段索引定义

Firebase 控制台

  1. 打开 Firebase 控制台中的 Cloud Firestore“单字段索引”页面。

    转到“单字段索引”

  2. 点击添加豁免项

  3. 集合 ID 中,输入 instruments。在字段路径中,输入 timestamp

  4. 查询范围下,同时选择集合集合组

  5. 点击下一步

  6. 将所有索引设置切换为已停用。点击保存

  7. shard 字段重复上述相同的步骤。

GCP Console

  1. 在 Google Cloud 控制台中,转到数据库页面。

    转到“数据库”

  2. 从数据库列表中选择所需的数据库。

  3. 在导航菜单中,点击索引,然后点击单个字段标签页。

  4. 点击单字段标签页。

  5. 点击添加豁免项

  6. 集合 ID 中,输入 instruments。在字段路径中,输入 timestamp

  7. 查询范围下,同时选择集合集合组

  8. 点击下一步

  9. 将所有索引设置切换为已停用。点击保存

  10. shard 字段重复上述相同的步骤。

Firebase CLI

  1. 在索引定义文件的 fieldOverrides 部分中添加以下内容:

    {
     "fieldOverrides": [
       // Disable single-field indexing for the timestamp field
       {
         "collectionGroup": "instruments",
         "fieldPath": "timestamp",
         "indexes": []
       },
     ]
    }
    
  2. 部署更新后的索引定义:

    firebase deploy --only firestore:indexes
    

创建新的复合索引

移除所有包含 timestamp 的旧索引之后,定义您的应用所需的新索引。任何包含 timestamp 字段的索引还必须包含 shard 字段。例如,如需支持上述查询,请添加以下索引:

集合 编入索引的字段 查询范围
instruments shard、 price.currency、 timestamp 集合
instruments shard、 exchange、 timestamp 集合
instruments shard、 instrumentType、 timestamp 集合

错误消息

您可以通过运行更新后的查询来构建这些索引。

每个查询都会返回一条错误消息,并提供一个链接,用于在 Firebase 控制台中创建所需的索引。

Firebase CLI

  1. 将以下索引添加到索引定义文件中:

     {
       "indexes": [
       // New indexes for sharded timestamps
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "exchange",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "instrumentType",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "price.currency",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
       ]
     }
    
  2. 部署更新后的索引定义:

    firebase deploy --only firestore:indexes
    

了解依序编入索引的字段的写入限制

依序编入索引的字段之所以存在写入速率限制,是因为 Cloud Firestore 存储索引值和扩缩索引写入次数的方式。对于每次索引写入操作,Cloud Firestore 都会定义一个键值对条目,该条目用于将文档名称和每个编入索引的字段值串联起来。Cloud Firestore 会将这些索引条目整理为数据组(称为“片”)。每个 Cloud Firestore 服务器可容纳一个或多个这样的片。如果某一特定片的写入负载过高,Cloud Firestore 会将该片拆分成若干较小的片,并将这些新片分布到不同的 Cloud Firestore 服务器,从而实现横向扩容。

Cloud Firestore 会按字典顺序将各索引条目紧挨着放入同一片上。如果某个片中的各索引值之间靠得太近(例如对于 timestamp 字段),则 Cloud Firestore 无法将该片有效拆分为若干较小的片。在这种情况下,单个片会收到过多流量,从而形成热点,而对该热点执行读写操作的速度会更慢。

通过将 timestamp 字段分片,您可以让 Cloud Firestore 在多个片之间高效分配工作负载。尽管各 timestamp 字段值之间可能仍然靠得很近,但串联的分片和索引值会为各索引条目提供足够的间隔空间,从而使 Cloud Firestore 能够将各条目拆分到多个片中。

后续步骤