As funções de fila de tarefas usam o Google Cloud Tasks para ajudar seu app a executar tarefas demoradas, que consomem muitos recursos ou limitadas por largura de banda de maneira assíncrona e fora do fluxo principal do app.
Por exemplo, imagine que você quer criar backups de um grande conjunto de arquivos de imagem hospedados atualmente em uma API com um limite de taxa. Para fazer o consumo responsável dessa API, precisamos respeitar esses limites. Além disso, esse tipo de job de longa duração pode ser vulnerável a falhas devido a tempos limite e limites de memória.
Para atenuar essa complexidade, crie uma função de fila de tarefas que defina opções
básicas de tarefa, como scheduleTime
e dispatchDeadline
,
que seja distribuída para uma fila no Cloud Tasks. O ambiente do Cloud Tasks
foi projetado especificamente para garantir um controle de congestionamento e
políticas de repetição eficientes para esses tipos de operações.
O SDK do Firebase para Cloud Functions for Firebase v3.20.1 e versões mais recentes interopera com o Firebase Admin SDK v10.2.0 e versões mais recentes para oferecer suporte a funções de fila de tarefas.
O uso de funções de fila de tarefas com o Firebase pode resultar em cobranças pelo processamento do Cloud Tasks. Para mais informações, consulte Preços do Cloud Tasks.
Criar funções da fila de tarefas
Para usar as funções da fila de tarefas, siga este fluxo de trabalho:
- Crie uma função de fila de tarefas usando o SDK do Firebase para Cloud Functions.
- Acione a função com uma solicitação HTTP para testá-la.
- Implante a função usando a CLI do Firebase. Ao implantar a função de fila de tarefas pela primeira vez, a CLI vai criar uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas no código-fonte.
- Adicione tarefas à fila de tarefas recém-criada, transmitindo parâmetros para configurar uma programação de execução. Para isso, escreva o código usando o Admin SDK e implante-o no Cloud Functions for Firebase.
Escrever funções da fila de tarefas
Os exemplos de código desta seção são baseados em um app que configura um serviço de backup de todas as imagens do site de fotos astronômicas diárias (em inglês) da NASA: Para começar, importe os módulos necessários:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Use onTaskDispatched
ou on_task_dispatched
para funções da fila de tarefas. Ao escrever uma função da fila de tarefas, você pode definir uma configuração de limitação de taxa e nova tentativa por fila.
Configurar funções da fila de tarefas
As funções da fila de tarefas têm um conjunto avançado de configurações para controlar com precisão os limites de taxa e o comportamento de repetição de uma fila de tarefas:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: cada tarefa na fila de tarefas é repetida automaticamente até cinco vezes. Isso nos ajuda a reduzir falhas transitórias, como erros de rede, ou a interrupção temporária de um serviço externo e dependente.retryConfig.minBackoffSeconds=60
: cada tarefa é executada novamente após um intervalo mínimo de 60 segundos. Com isso, temos um grande buffer entre cada tentativa, e não há pressa para realizar as cinco tentativas muito rapidamente.rateLimits.maxConcurrentDispatch=6
: são enviadas, no máximo, seis tarefas em um determinado momento. Isso ajuda a garantir um fluxo estável de solicitações para a função, além de reduzir o número de instâncias ativas e inicializações a frio.
Testar funções da fila de tarefas
Na maioria dos casos, o emulador do Cloud Functions é a melhor maneira de testar funções de fila de tarefas. Consulte a documentação do pacote de emuladores para saber como fazer a instrumentação do app para a emulação de funções de fila de tarefas.
Além disso, as funções de fila de tarefas do functions_sdk são expostas como funções HTTP simples no Firebase Local Emulator Suite. Para testar uma função de tarefa emulada, envie uma solicitação HTTP POST com um payload de dados JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Implantar funções da fila de tarefas
Implante a função de fila de tarefas usando a CLI do Firebase:
$ firebase deploy --only functions:backupapod
Ao implantar uma função de fila de tarefas pela primeira vez, a CLI cria uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas no código-fonte.
Se você encontrar erros de permissão ao implantar funções, verifique se os papéis do IAM apropriados estão atribuídos ao usuário que executa os comandos de implantação.
Enfileirar funções da fila de tarefas
As funções de fila de tarefas podem ser enfileiradas no Cloud Tasks a partir de um ambiente de servidor confiável, como Cloud Functions for Firebase, usando o Firebase Admin SDK para bibliotecas do Node.js ou do Google Cloud para Python. Se você não tem experiência com Admin SDKs, consulte Adicionar o Firebase a um servidor para saber mais.
Um fluxo típico cria uma tarefa e a enfileira no Cloud Tasks, definindo esta configuração para ela:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
O exemplo de código tenta distribuir a execução das tarefas associando um atraso de N minutos à Nª tarefa. Isso se traduz no acionamento de aproximadamente 1 tarefa/minuto. Observe que você também pode usar
scheduleTime
(Node.js) ouschedule_time
(Python) para que o Cloud Tasks acione uma tarefa em um horário específico.O exemplo de código define o tempo máximo que o Cloud Tasks vai esperar a conclusão de uma tarefa. O Cloud Tasks vai tentar realizar a tarefa novamente de acordo com a configuração de nova tentativa da fila ou até que esse prazo seja atingido. No exemplo, a fila é configurada para tentar realizar a tarefa novamente até cinco vezes, mas a tarefa é cancelada automaticamente se todo o processo (incluindo novas tentativas) levar mais de cinco minutos.
Recuperar e incluir o URI de destino
Como o Cloud Tasks cria um token de autenticação para autenticar as solicitações feitas à função de fila de tarefas, você precisa especificar o URL do Cloud Run da função ao enfileirar uma tarefa. Recomendamos que você recupere de maneira programática o URL da função, conforme demonstrado abaixo:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Solução de problemas
Ativar a geração de Cloud Tasks
Os registros do Cloud Tasks contêm informações de diagnóstico úteis, como o status da solicitação associada a uma tarefa. Por padrão, os registros do Cloud Tasks estão desativados devido ao grande volume de registros que podem ser gerados no projeto. Recomendamos ativar os registros de depuração enquanto você desenvolve e depura ativamente as funções da fila de tarefas. Consulte Como ativar a geração de registros.
Permissões do IAM
É possível encontrar erros PERMISSION DENIED
ao enfileirar tarefas ou quando o
Cloud Tasks tenta invocar as funções de fila de tarefas. Verifique se o projeto tem as
seguintes vinculações do IAM:
A identidade usada para enfileirar tarefas para o Cloud Tasks precisa da permissão
cloudtasks.tasks.create
do IAM.No exemplo, é a conta de serviço padrão do App Engine.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
A identidade usada para enfileirar tarefas para o Cloud Tasks precisa de permissão para usar a conta de serviço associada a uma tarefa no Cloud Tasks.
No exemplo, é a conta de serviço padrão do App Engine.
Consulte a documentação do Google Cloud IAM para saber como adicionar a conta de serviço padrão do App Engine como usuário da conta de serviço padrão do App Engine.
A identidade usada para acionar a função da fila de tarefas precisa da permissão
cloudfunctions.functions.invoke
.No exemplo, é a conta de serviço padrão do App Engine.
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker