假设您有一个基于 Kafka 的强大流媒体平台,该平台在将客户的事件数据写入某个仓库之前对其进行清理和丰富。有一天,在一次临时的计划会议上,您的产品经理提出了对传入数据使用机器学习模型(由数据科学团队开发)并为模型标记的消息生成警报的要求。“没问题”,你回答。“我们可以从数据仓库中选择我们想要的任何数据集,然后运行我们想要的任何模型”。“不完全是”,总经理回答。“我们希望它能尽可能实时运行。我们希望 ML 模型的结果在我们收到事件后不到一分钟内就可以在 Kafka 主题中使用”。
这是一个普遍的要求,而且只会越来越受欢迎。对于许多必须对模型结果做出时间敏感决策的客户来说,对流量数据进行实时 ML 推理的要求变得很重要。
似乎大数据工程和数据科学可以很好地结合在一起,应该有一些简单的解决方案,但通常情况并非如此,并且使用 ML 对繁重的数据工作负载进行近乎实时的推理涉及相当多的挑战。例如,在这些挑战中,作为 ML 的主要语言的 Python 作为大数据工程和数据流的主要环境的 JVM 环境(Java/Scala)之间的区别。另一个挑战与我们用于工作负载的数据平台有关。如果您已经在使用 Spark,那么您可以使用 Spark ML 库,但有时它还不够好,有时(如我们的例子)Spark 不是我们堆栈或基础架构的一部分。
确实,生态系统已经意识到了这些挑战,并正在通过新功能慢慢解决这些挑战,尽管我们的特定和常见场景目前为您提供了一些常见的选择。例如,一种是将 Spark 添加到您的堆栈并编写一个 pySpark 作业,将 ML 推理阶段添加到您的管道中。这将为您的数据科学团队提供更好的服务 Python 支持,但这也意味着您的数据处理流程可能需要更长的时间,并且您还需要在堆栈中添加和维护 Spark 集群。另一种选择是使用一些第三方模型服务平台,该平台将根据您的模型公开推理服务端点。这可能会帮助您保持性能,但也可能需要额外的基础设施成本,同时对某些任务来说是过度杀伤力。
常见的解决方案——将 Spark 集群添加到堆栈以运行 ML 推理
在这篇文章中,我想展示另一种使用方法 Kafka Streams 完成这项任务的方法。将 Kafka Streams 用于此任务的优势在于,与 Flink 或 Spark 不同,它不需要专用的计算集群。相反,它可以在您已经使用的任何应用程序服务器或容器环境上运行,如果您已经在使用 Kafka 进行流处理,那么它可以非常无缝地嵌入到您的流中。
虽然 Spark 和 Flink 都有自己的机器学习库和教程,但是使用 Kafka Streams 来完成这项任务似乎不太常见,我的目标是展示它的实现是多么容易。具体来说,我展示了我们如何使用 XGBoost 模型——一种生产级机器学习模型,在 Python 环境中训练,对 Kafka 主题的事件流进行实时推理。
对于此示例,我们首先基于 Kaggle信用欺诈数据集训练一个简单的分类模型。您可以在此处找到完整的模型训练代码。重要的一点(如下)是,在我们(或我们的数据科学家)对我们模型的结果感到满意后,我们只需将其保存为简单的二进制形式。这个二进制文件是我们在 Kafka Streams 应用程序中加载模型所需的全部内容。
# ....
from xgboost import XGBClassifier
# arrange data into train and test sets
# ....
# train xgb model
xgb = XGBClassifier(
learning_rate =0.01,
n_estimators=1000,
objective= 'binary:logistic',
max_depth=5)
xgb.fit(X_train, y_train)
# save the trained model
xgb.save_model('fraud_model.bin')
在本节中,我们首先将机器学习模型包装在一个 Scala 对象(单例)中,开始实现我们的 Kafka Streams 应用程序,我们将使用该对象对传入记录进行推理。该对象将实现一个 predict()方法,我们的流处理应用程序将在每个流事件上使用该方法。该方法将接收一个记录 ID 和一个字段或特征数组,并将返回一个由自己记录 ID 和模型给出的分数组成的元组。
Scala 中的 XGBoost 模型加载和预测非常简单(尽管应该注意,在最近的 Scala 版本中的支持可能会受到限制)。初始导入后,我们首先将 *trained* 模型加载到 Booster 变量中。
import ml.dmlc.xgboost4j.LabeledPoint
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, XGBoost}
// Load model from binary file
object Classifier {
var model: Option[Booster] = None
def Init(modelFile:String): Unit = {
if (model.isEmpty) {
model = Some(XGBoost.loadModel(modelFile))
logger.info(s"Model loaded from $modelFile")
}
}
//.....
实现predict()方法也相当简单。我们的每个事件都包含一个包含 10 个特征或字段的数组,我们需要将它们作为输入提供给我们的模型。
XGboost 用于包装输入向量以进行预测的对象类型是DMatrix,可以通过多种方式构建。我将使用密集矩阵格式,它基于提供表示模型特征或字段的平面浮点数组;每个向量的长度(nCols);以及数据集中的向量数量(nRows)。例如,如果我们的模型用于对具有 10 个特征或字段的向量进行推理,并且我们希望一次预测一个向量,那么我们的 DMatrix 将使用长度 = 10,nCols = 10 的浮点数组实例化, 并且 nRows = 1(因为集合中只有一个向量)。
private def getInputVector(rawVector:Seq[Float]): DMatrix = {
val nRows = 1
val nCols = rawVector.length
val missingVal = Float.NaN
new DMatrix(rawVector.toArray[Float], nRows, nCols, missingVal)
}
def predict(recordID:String, features:Seq[Float]): (String, Float) = {
val xgbInput = getInputVector(features)
val result:Array[Array[Float]] = model.get.predict(xgbInput)
val score:Float = result(0)(0)
(recordID, score)
}
以 Spark 为例,计算的分配是由集群管理器完成的,它接收来自驱动程序应用程序的指令,并将计算任务分配给专用集群中的执行程序节点。每个 Spark 执行器负责处理一组数据分区。Kafka Streams (KS) 的强大之处在于,尽管它通过并行实现了类似的规模——即,通过运行流处理应用程序的多个副本,它并不依赖于专用集群,而仅依赖于 Kafka。换句话说,计算节点的生命周期可以由任何容器编排系统(如 K8S)或任何其他应用服务器管理,而将协调和管理留给 Kafka(和 KS 库)。这似乎是一个次要的优势,但这正是 Spark 最大的痛苦。
事实上,与 Spark 不同的是,KS 是一个可以导入到任何基于 JVM 的应用程序中的库,最重要的是,它可以在任何应用程序基础架构上运行。KS 应用程序通常从 Kafka 主题读取流式消息,执行其转换,并将结果写入输出主题。状态和有状态的转换,例如聚合或窗口计算,由 Kafka 持久化和管理,并且通过简单地运行更多应用程序实例来实现扩展(受主题拥有的分区数量和消费者策略的限制)。
KS 应用程序的基础是拓扑,它定义了应用程序的流处理逻辑或如何将输入数据转换为输出数据。在我们的例子中,拓扑将运行如下
这里的拓扑相当简单。它首先从 Kafka 上的输入主题读取流记录,然后使用 map 操作对每个记录运行模型的预测方法,最后拆分流,并将从模型中获得高分的记录 id 发送到“可疑事件”输出主题,其余为另一个。让我们看看它在代码中的样子。
def getStreamTopology(inputTopic:String):Topology = {
val builder = new StreamsBuilder()
val reqStream = builder.stream[String, PredictRequest](inputTopic)
reqStream
.map( (_, request) => {
Classifier.predict(request.recordID, request.featuresVector)
})
.split()
.branch((key, risk) => risk >= 0.5 ,
Branched.withConsumer(stream => stream.to("suspects-topic")))
.branch((key, risk) => risk < 0.5 ,
Branched.withConsumer(stream => stream.to("regular-topic")))
builder.build()
}
我们的出发点是从Kafka 上的主题builder.stream开始读取消息的方法。inputTopic我将很快解释更多,但请注意,我们将每个 kafka 记录键序列化为 String 并将其有效负载序列化为 type 的对象PredictRequest。PredictRequest 是一个 Scala 案例类,对应于下面的 protobuf 模式。这确保了与消息生产者的集成是直接的,但也使得更容易生成我们在处理自定义对象时需要提供的反序列化方法。
对实时机器学习预测的需求变得越来越流行,并且常常给数据流管道带来不少挑战。常见和最可靠的方法通常是使用 Spark 或 Flink,主要是因为它们支持 ML 和一些 Python 用例。然而,这些方法的缺点之一是它们通常需要维护一个专用的计算集群,这有时成本太高或过度杀伤力。
在这篇文章中,我试图勾勒出一种基于 Kafka Streams 的不同方法,除了您的应用程序服务器和您已经在使用的流平台之外,它不需要额外的计算集群。作为生产级 ML 模型的示例,我使用了 XGBoost 分类器,并展示了如何在 Python 环境中训练的模型可以轻松地包装在 Scala 对象中并用于对流数据进行推理。当 Kafka 用作流媒体平台时,使用 KS 应用程序在所需的开发、维护和性能方面几乎总是具有竞争力。
留言与评论(共有 0 条评论) “” |