Temporal GO SDK
Temporal Server 和特定语言的SDK(本例中是Go SDK)为现代应用程序开发带来的复杂性提供了全面的解决方案。
您可以将Temporal看作是一种“万能药”,可以解决作为开发人员在尝试构建可靠应用程序时所经历的痛苦。
Temporal提供了开箱即用的可靠性原语,比如无缝和容错的应用程序状态跟踪、自动重试、超时、由于流程失败而回滚等等。
让我们运行我们的第一个Temporal Workflow 应用程序,并永远地改变您进行应用程序开发的方式。
这个项目示例模拟了一个“转账”应用程序,它有一个单一的Workflow函数,编排Withdraw()和Deposit()函数的执行,表示从一个账户到另一个账户的转账。
Tempory称这些实际的函数为Activity functions.
要运行这个程序,你需要做到以下几点:
Workflow功能是应用程序的入口点。这是我们的转账的workflow看起来如下:
func TransferMoney(ctx workflow.Context, transferDetails TransferDetails) error {
// RetryPolicy specifies how to automatically handle retries if an Activity fails.
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 500,
}
options := workflow.ActivityOptions{
// Timeout options specify when to automatically timeout Actvitivy functions.
StartToCloseTimeout: time.Minute,
// Optionally provide a customized RetryPolicy.
// Temporal retries failures by default, this is just an example.
RetryPolicy: retrypolicy,
}
ctx = workflow.WithActivityOptions(ctx, options)
err := workflow.ExecuteActivity(ctx, Withdraw, transferDetails).Get(ctx, nil)
if err != nil {
return err
}
err = workflow.ExecuteActivity(ctx, Deposit, transferDetails).Get(ctx, nil)
if err != nil {
return err
}
return nil
}
当你“启动”一个工作流时,你基本上是在告诉Temporal Server:"track the state of the Workflow with this function signature"。Workers 将依次执行下面的Workflow代码,将执行事件和结果传回服务器。
在使用Temporal Server的时候,有两种方式来启动工作流:一种是通过SDK,一种是通过CLI,在本教程中,我们使用SDK来启动工作流,这是大多数工作流在活动环境中启动的方式。对Temporal服务器的调用可以是 synchronously or asynchronously。这里我们是异步执行的,因此将看到程序运行,告诉我们事务正在处理,然后退出。
func main() {
// Create the client object just once per process
c, err := client.NewClient(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()
options := client.StartWorkflowOptions{
ID: "transfer-money-workflow",
TaskQueue: app.TransferMoneyTaskQueue,
}
transferDetails := app.TransferDetails{
Amount: 54.99,
FromAccount: "001-001",
ToAccount: "002-002",
ReferenceID: uuid.New().String(),
}
we, err := c.ExecuteWorkflow(context.Background(), options, app.TransferMoney, transferDetails)
if err != nil {
log.Fatalln("error starting TransferMoney workflow", err)
}
printResults(transferDetails, we.GetID(), we.GetRunID())
}
确保Temporal Server 在终端中运行,然后运行start/main。使用以下命令从项目根目录进入
go run start/main.go
如果这是第一次运行这个应用程序,开始时可能会下载一些依赖项,但最终你会得到一些类似这样的反馈:
2021/03/12 01:34:36 INFO No logger configured for temporal client. Created default one.
2021/03/12 01:34:37
Transfer of $54.990002 from account 001-001 to account 002-002 is processing. ReferenceID: 8e37aafe-5fb8-4649-99e3-3699e35e6c32
2021/03/12 01:34:37
WorkflowID: transfer-money-workflow RunID: 0730a9a3-d17b-4cb5-a4a0-279c9759dfa1
现在是时候看看Temporal提供的一个非常酷的功能:应用程序状态可见性。访问时态Web UI,您将看到您的工作流列表。
接下来,单击工作流的“Run Id”。现在,我们可以看到关于我们告诉Temporal 服务器要跟踪的Workflow代码执行的所有我们想知道的信息,比如给出了什么参数值、超时配置、计划重试、尝试次数、堆栈可跟踪错误等等。
状态可视化
状态可视化
看起来我们的工作流正在“运行”,但是为什么工作流和活动代码还没有执行呢?通过单击任务队列名称进行调查,查看为处理这些任务而注册的有效的“轮询器”。该列表将为空。没有worker轮询任务队列!
该启动Worker了。Worker负责执行工作流和活动代码。
在The Worker执行代码之后,它将结果返回给Temporal Server。注意,Worker监听Workflow和Activity任务发送到的同一个任务队列。这被称为“任务路由”,是一种用于负载平衡的内置机制。
func main() {
// Create the client object just once per process
c, err := client.NewClient(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()
// This worker hosts both Workflow and Activity functions
w := worker.New(c, app.TransferMoneyTaskQueue, worker.Options{})
w.RegisterWorkflow(app.TransferMoney)
w.RegisterActivity(app.Withdraw)
w.RegisterActivity(app.Deposit)
// Start listening to the Task Queue
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("unable to start Worker", err)
}
}
任务队列由一个简单的字符串名称定义:
const TransferMoneyTaskQueue = "TRANSFER_MONEY_TASK_QUEUE"
go run worker/main.go
当您启动Worker时,它将开始轮询任务队列(如果您再次检查Temporal Web,您将看到一个新的轮询器在以前没有注册的地方注册)。
终端输出看起来是这样的:
2021/03/12 01:43:49 INFO No logger configured for temporal client. Created default one.
2021/03/12 01:43:49 INFO Started Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41326@swyxs-Mac-mini@
2021/03/12 01:43:50 DEBUG ExecuteActivity Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41326@swyxs-Mac-mini@ WorkflowType TransferMoney WorkflowID transfer-money-workflow RunID 0730a9a3-d17b-4cb5-a4a0-279c9759dfa1 ActivityID 5 ActivityType Withdraw
Withdrawing $54.990002 from account 001-001. ReferenceId: 8e37aafe-5fb8-4649-99e3-3699e35e6c32
内部机制:
这些都是可以在Temporal Web中审计的History Events(在Summary旁边的历史选项卡下)。一旦工作流完成并关闭,在被删除之前,完整的历史记录将保留一段设置的保留期(通常是7-30天)。您可以设置archive特性,以便将它们发送到长期存储,以满足合规/审计需求。
祝贺您,您刚刚运行了一个Temporal 工作流应用程序!
您刚刚领略了Temporal的重要功能之一:工作流的可见性和执行代码的Workers的状态。让我们探讨另一个重要的功能特性:即使在失败面前,也要保持工作流的状态。为了演示这一点,我们将为我们的工作流模拟一些失败。确保在继续之前停止Worker。
与许多需要复杂的领导者选举过程和外部数据库来处理故障的现代应用程序不同,Temporal会自动保存工作流的状态,即使服务器宕机。你可以很容易地通过以下步骤进行测试(同样,确保你的Worker已经停止,这样你的Workflow就不会完成):
您的工作流仍然存在!
接下来让我们在Deposit() Activity函数中模拟一个错误。让您的工作流继续运行。打开activity.go文件并切换return语句上的注释,这样Deposit()函数就会返回一个错误:
func Deposit(ctx context.Context, transferDetails TransferDetails) error {
fmt.Printf(
"
Depositing $%f into account %s. ReferenceId: %s
",
transferDetails.Amount,
transferDetails.ToAccount,
transferDetails.ReferenceID,
)
// Switch out comments on the return statements to simulate an error
//return fmt.Errorf("deposit did not occur due to an issue")
return nil
}
保存更改并运行Worker。您将看到Worker完成Withdraw() Activity函数,但在尝试Deposit() Activity 函数时出错。这里需要注意的重要一点是,Worker一直在重试Deposit()函数:
021/03/12 02:03:23 INFO No logger configured for temporal client. Created default one.
2021/03/12 02:03:23 INFO Started Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@
2021/03/12 02:04:25 DEBUG ExecuteActivity Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowType TransferMoney WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityID 5 ActivityType Withdraw
Withdrawing $54.990002 from account 001-001. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:04:25 DEBUG ExecuteActivity Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowType TransferMoney WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityID 11 ActivityType Deposit
Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:04:25 ERROR Activity error. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityType Deposit Error deposit did not occur due to an issue
Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:04:26 ERROR Activity error. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityType Deposit Error deposit did not occur due to an issue
# it keeps retrying... with the RetryPolicy specified in workflow.go
你可以查看更多关于UI中发生的事情的信息。单击工作流的RunId。您将看到挂起的Activity列在那里,其中包含详细信息,如它的状态、它被尝试的次数和下一次计划的尝试
传统上,应用程序开发人员被迫在服务代码本身中实现超时和重试逻辑。这是重复和容易出错的。对于Temporal,一个关键的价值主张是超时配置和重试策略在Workflow代码中作为Activity选项指定。在workflow.go,您可以看到我们已经为活动指定了一个StartToCloseTimeout,并设置了一个重试策略,告诉服务器重试它们最多500次。你可以在我们的文档中阅读更多关于活动和工作流重试的内容。
因此,您的工作流正在运行,但只有Withdraw() Activity 函数成功。在任何其他应用程序中,整个流程可能都必须放弃并回滚。因此,这是本教程的最后一个价值主张:使用Temporal,我们可以在工作流运行时调试问题!假设你找到了解决这个问题的潜在方法;在activity.go切换回Deposit()函数的return语句上的注释。进入文件并保存更改。
我们怎么可能更新一个已经完成了一半的工作流?对于Temporal,它实际上非常简单:只要重新启动Worker!
# continuing logs from previous retries...
Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:09:28 ERROR Activity error. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityType Deposit Error deposit did not occur due to an issue
# cancel and restart...
^C2021/03/12 02:10:17 INFO Worker has been stopped. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ Signal interrupt
2021/03/12 02:10:17 INFO Stopped Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@
$ go run worker/main.go
2021/03/12 02:10:23 INFO No logger configured for temporal client. Created default one.
2021/03/12 02:10:23 INFO Started Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41708@swyxs-Mac-mini@
Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
在下一次计划的尝试中,Worker将从工作流失败的地方开始,并成功执行新编译的Deposit()Activity函数,完成工作流。基本上,您只是在不丢失工作流状态的情况下“动态地”修复了一个bug。
现在您已经了解了如何运行Temporal Workflow,并了解了Temporal提供的一些关键值。让我们快速复习一下,确保你记住了一些更重要的内容:
利用相同的 Task Queue
重启Worker
留言与评论(共有 0 条评论) “” |