Combining Spark Streaming + MLlib

Question

I've tried to use a Random Forest model to predict a stream of examples, but it appears that I cannot use the model to classify them. Here is the code used in PySpark:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.tree import RandomForest

sc = SparkContext(appName="App")

# Train a Random Forest classifier on static training data
model = RandomForest.trainClassifier(
    trainingData, numClasses=2, categoricalFeaturesInfo={},
    impurity='gini', numTrees=150
)

ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(hostname, int(port))

parsedLines = lines.map(parse)
parsedLines.pprint()

# Calling model.predict inside map() is what raises the error below
predictions = parsedLines.map(lambda event: model.predict(event.features))

And the error returned when running it on the cluster:

    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Is there a way to use a model generated from static data to predict streaming examples?

Thanks guys, I really appreciate it!

Answer

Yes, you can use a model generated from static data. The problem you are experiencing is not related to streaming at all. You simply cannot use a JVM-based model inside an action or a transformation (see How to use Java/Scala function from an action or a transformation? for an explanation why). Instead, you should apply the predict method to a complete RDD, for example using transform on the DStream:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils
from operator import attrgetter


sc = SparkContext("local[2]", "foo")
ssc = StreamingContext(sc, 1)

# Train on static data
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
trainingData, testData = data.randomSplit([0.7, 0.3])

model = RandomForest.trainClassifier(
    trainingData, numClasses=2, categoricalFeaturesInfo={}, numTrees=3
)

(ssc
    .queueStream([testData])
    # Extract the feature vector from each LabeledPoint
    .map(attrgetter("features"))
    # Predict on the whole batch RDD on the driver, not per record on workers
    .transform(lambda _, rdd: model.predict(rdd))
    .pprint())

ssc.start()
ssc.awaitTerminationOrTimeout(10)
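
Applied to the socket stream from the original question, the same pattern would look roughly like the sketch below. This is only a sketch under the question's own assumptions: trainingData, hostname, port, and a parse function that returns objects with a features attribute are taken as given, and the imports are the same as above.

sc = SparkContext(appName="App")
ssc = StreamingContext(sc, 1)

# Train once on the static data, as in the question
model = RandomForest.trainClassifier(
    trainingData, numClasses=2, categoricalFeaturesInfo={},
    impurity='gini', numTrees=150
)

lines = ssc.socketTextStream(hostname, int(port))
parsedLines = lines.map(parse)

# Call predict once per micro-batch on the driver, over the whole RDD,
# instead of once per record inside a worker-side lambda
predictions = parsedLines.transform(
    lambda rdd: model.predict(rdd.map(lambda event: event.features))
)
predictions.pprint()

ssc.start()
ssc.awaitTermination()

The key difference from the code in the question is that the model is invoked on the driver, once per batch RDD, which is what transform allows; calling model.predict inside map would ship the model, and with it a reference to the SparkContext, to the workers, which is what triggers the SPARK-5063 error shown above.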
