Flow 是封装函数,与直接调用相比具有一些额外的特性:它们是强类型、可流式传输、可在本地和远程调用,并且可完全观测。Firebase Genkit 提供了用于运行和调试 flow 的 CLI 和开发者界面工具。

定义 flow

形式最简单的 flow 仅封装一个函数:

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

这样一来,您就可以从 Genkit CLI 和开发者界面运行该函数,并且这是 Genkit 的许多功能(包括部署和可观测性)的要求。

与直接调用模型 API 相比,Genkit 流的一个重要优势是输入和输出均具有类型安全性。Flow 的参数和结果类型可以是简单值或结构化值。Genkit 将使用 invopop/jsonschema 为这些值生成 JSON 架构。

以下 flow 以 string 作为输入,并输出 struct

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

运行 flow

如需在代码中运行 flow,请使用以下代码:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

您还可以使用 CLI 运行 flow:

genkit flow:run menuSuggestionFlow '"French"'

流式

下面是一个可流式处理值的 flow 的简单示例:

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

请注意,流式回调可以未定义。只有在调用客户端请求流式回答时才会定义。

如需在流式模式下调用 flow,请使用以下代码:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

如果 flow 未实现流式处理,则 StreamFlow() 的行为与 RunFlow() 相同。

您还可以使用 CLI 对 flow 进行流式处理:

genkit flow:run menuSuggestionFlow '"French"' -s

部署 flow

如果您希望能够通过 HTTP 访问 flow,则需要先部署 flow。 如需使用 Cloud Run 和类似服务部署 flow,请定义 flow,然后调用 Init()

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init 会启动 net/http 服务器,该服务器将 flow 作为 HTTP 端点公开(例如 http://localhost:3400/menuSuggestionFlow)。

第二个参数是可选的 Options,用于指定以下各项:

  • FlowAddr:要监听的地址和端口。如果未指定,服务器会监听 PORT 环境变量指定的端口;如果为空,则系统会使用默认端口 3400。
  • Flows:要处理的 flow。如果未指定,则 Init 会处理您定义的所有 flow。

如果您希望在与其他端点相同的主机和端口上处理 flow,可以将 FlowAddr 设置为 -,并改为调用 NewFlowServeMux() 以获取 Genkit flow 的处理程序,您可以将其与其他路由处理程序进行多路复用:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Flow 可观测性

有时,在使用未针对可观测性进行插桩的第三方 SDK 时,您可能希望在开发者界面中将它们视为单独的跟踪步骤。您只需将代码封装在 run 函数中即可。

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})