Flow 是封装函数,与直接调用相比具有一些额外的特性:它们是强类型、可流式传输、可在本地和远程调用,并且可完全观测。Firebase Genkit 提供了用于运行和调试 flow 的 CLI 和开发者界面工具。
定义 flow
形式最简单的 flow 仅封装一个函数:
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
这样一来,您就可以从 Genkit CLI 和开发者界面运行该函数,并且这是 Genkit 的许多功能(包括部署和可观测性)的要求。
与直接调用模型 API 相比,Genkit 流的一个重要优势是输入和输出均具有类型安全性。Flow 的参数和结果类型可以是简单值或结构化值。Genkit 将使用 invopop/jsonschema
为这些值生成 JSON 架构。
以下 flow 以 string
作为输入,并输出 struct
:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
运行 flow
如需在代码中运行 flow,请使用以下代码:
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
您还可以使用 CLI 运行 flow:
genkit flow:run menuSuggestionFlow '"French"'
流式
下面是一个可流式处理值的 flow 的简单示例:
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
请注意,流式回调可以未定义。只有在调用客户端请求流式回答时才会定义。
如需在流式模式下调用 flow,请使用以下代码:
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if err != nil {
// handle err
return false
}
if !sfv.Done {
fmt.Print(sfv.Stream)
return true
} else {
fmt.Print(sfv.Output)
return false
}
})
如果 flow 未实现流式处理,则 StreamFlow()
的行为与 RunFlow()
相同。
您还可以使用 CLI 对 flow 进行流式处理:
genkit flow:run menuSuggestionFlow '"French"' -s
部署 flow
如果您希望能够通过 HTTP 访问 flow,则需要先部署 flow。
如需使用 Cloud Run 和类似服务部署 flow,请定义 flow,然后调用 Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
会启动 net/http
服务器,该服务器将 flow 作为 HTTP 端点公开(例如 http://localhost:3400/menuSuggestionFlow
)。
第二个参数是可选的 Options
,用于指定以下各项:
FlowAddr
:要监听的地址和端口。如果未指定,服务器会监听 PORT 环境变量指定的端口;如果为空,则系统会使用默认端口 3400。Flows
:要处理的 flow。如果未指定,则Init
会处理您定义的所有 flow。
如果您希望在与其他端点相同的主机和端口上处理 flow,可以将 FlowAddr
设置为 -
,并改为调用 NewFlowServeMux()
以获取 Genkit flow 的处理程序,您可以将其与其他路由处理程序进行多路复用:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
Flow 可观测性
有时,在使用未针对可观测性进行插桩的第三方 SDK 时,您可能希望在开发者界面中将它们视为单独的跟踪步骤。您只需将代码封装在 run
函数中即可。
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})