任务队列函数充分利用 Google Cloud Tasks 来帮助您的应用在主应用流之外异步运行耗时、资源密集型或带宽受限的任务。
例如,假设您想要为当前通过某个 API 托管的大量图片文件创建备份,但该 API 设有速率限制。为了负责任地使用该 API,您需要遵循其速率限制。此外,由于超时和内存限制,这种长时间运行的作业可能很容易发生错误。
为了降低这种复杂性,您可以编写一个任务队列函数来设置 scheduleTime 和 dispatchDeadline 等基本任务选项,然后将该函数传递到 Cloud Tasks 中的某个队列。Cloud Tasks 环境经过特别设计,可确保针对此类操作实现有效的拥塞控制和重试政策。
Firebase SDK for Cloud Functions for Firebase v3.20.1 及更高版本可与 Firebase Admin SDK v10.2.0 及更高版本进行交互,以支持任务队列函数。
在 Firebase 中使用任务队列函数可能会产生 Cloud Tasks 处理费用。如需了解详情,请参阅 Cloud Tasks 价格。
创建任务队列函数
如需使用任务队列函数,请按以下流程操作:
- 使用 Firebase SDK for Cloud Functions 编写任务队列函数。
- 使用 HTTP 请求触发您的函数,以测试该函数。
- 使用 Firebase CLI 部署您的函数。首次部署任务队列函数时,CLI 会在 Cloud Tasks 中创建一个采用源代码中指定的选项(速率限制和重试)的任务队列。
- 将任务添加到新建的任务队列中,并传递参数以设置执行时间(如果需要)。您可以使用 Admin SDK 编写代码并将其部署到 Cloud Functions for Firebase 来实现此目的。
编写任务队列函数
本部分中的代码示例基于一个应用,该应用设置了一项服务,可备份美国国家航空航天局 (NASA) 的每日一天文图中的所有图片。首先,导入所需的模块:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");
// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
对任务队列函数使用 onTaskDispatched 或 on_task_dispatched。在编写任务队列函数时,您可以设置每个队列的重试和速率限制配置。
配置任务队列函数
任务队列函数附带一组强大的配置,可用于精确控制任务队列的速率限制和重试行为:
Node.js
exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
- retryConfig.maxAttempts=5:任务队列中的每个任务最多自动重试 5 次。这有助于减少暂时性错误,例如网络错误或所依赖的外部服务临时中断。
- retryConfig.minBackoffSeconds=60:每个任务的重试间隔时间至少为 60 秒。这样可以在上一次和下一次尝试之间留出较长的缓冲时间,避免过早用尽 5 次重试机会。
- rateLimits.maxConcurrentDispatch=6:同时最多调度 6 个任务。这有助于确保稳定地向底层函数发送请求,同时有助于减少活跃实例数和冷启动次数。
测试任务队列函数
在大多数情况下,Cloud Functions 模拟器是测试任务队列函数的最佳方式。请参阅 Emulator Suite 文档,了解如何针对任务队列函数模拟对您的应用进行插桩处理。
此外,任务队列函数以简单 HTTP 函数的形式在 Firebase Local Emulator Suite 中公开。 您可以通过发送包含 JSON 数据载荷的 HTTP POST 请求来测试模拟任务函数:
 # start the Local Emulator Suite
 firebase emulators:start
 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data
部署任务队列函数
使用 Firebase CLI 部署任务队列函数:
$ firebase deploy --only functions:backupapod
首次部署任务队列函数时,CLI 会在 Cloud Tasks 中创建一个采用源代码中指定的选项(速率限制和重试)的任务队列。
如果在部署函数时遇到权限错误,请确保已将适当的 IAM 角色分配给运行部署命令的用户。
将任务队列函数加入队列
可以使用 Node.js 版 Firebase Admin SDK 或 Python 版 Google Cloud 库在 Cloud Tasks 中将任务队列函数从受信任的服务器环境(如 Cloud Functions for Firebase)加入队列。如果您刚开始接触 Admin SDK,请先参阅将 Firebase 添加到服务器。
典型流程会创建新任务,在 Cloud Tasks 中将其加入队列,并设置该任务的配置:
Node.js
exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");
      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);
        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")
    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE
        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay
        dispatch_deadline_seconds = 60 * 5  # 5 minutes
        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
- 此示例代码会尝试将第 N 个任务推迟在第 N 分钟执行,从而分散执行任务。这意味着,每分钟大约会触发 1 个任务。请注意,如果您希望 Cloud Tasks 在特定时间触发任务,也可以使用 - scheduleTime(Node.js) 或- schedule_time(Python)。
- 示例代码会设置 Cloud Tasks 等待任务完成的最长时间。Cloud Tasks 会根据队列的重试配置或此截止期限来重试任务。在该示例中,队列配置为最多重试任务 5 次,但如果整个过程(包括重试过程)超过 5 分钟,则任务会自动取消。 
问题排查
开启 Cloud Tasks 日志记录
Cloud Tasks 中的日志包含有用的诊断信息,例如与任务关联的请求的状态。默认情况下,Cloud Tasks 日志处于关闭状态,因为项目可能会生成大量日志。我们建议您在开发和调试任务队列函数时开启调试日志。请参阅开启日志记录。
IAM 权限
将任务加入队列或在 Cloud Tasks 尝试调用任务队列函数时,您可能会看到 PERMISSION DENIED 错误。请确保您的项目具有以下 IAM 绑定:
- 用于将任务加入 Cloud Tasks 队列的身份需要 - cloudtasks.tasks.createIAM 权限。- 在本示例中,该身份为 App Engine 默认服务账号 
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
- 用于将任务加入 Cloud Tasks 队列的身份需要相应权限,才能使用与 Cloud Tasks 中的任务关联的服务账号。 - 在本示例中,该身份为 App Engine 默认服务账号。 
请参阅 Google Cloud IAM 文档,了解如何将 App Engine 默认服务账号添加为该 App Engine 默认服务账号本身的用户。
- 用于触发任务队列函数的身份需要 - cloudfunctions.functions.invoke权限。- 在本示例中,该身份为 App Engine 默认服务账号 
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker