SparkStreaming WordCount
The programming entry points in Spark are:
- The entry point for Spark Core is SparkContext
- The entry point for Spark SQL is SparkSession
- The entry point for Spark Streaming is StreamingContext
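As a sketch (assuming Spark 2.x with the corresponding modules on the classpath), creating each entry point looks like this; note that a JVM normally hosts a single SparkContext, so a real application picks only one of these:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("demo").setMaster("local[*]")

// Spark Core
val sc = new SparkContext(conf)

// Spark SQL (getOrCreate reuses an existing context if present)
val spark = SparkSession.builder().config(conf).getOrCreate()

// Spark Streaming, with a batch interval of 1 second
val ssc = new StreamingContext(conf, Seconds(1))
```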
First, add the dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Simulate stream data with netcat; this is a stateless transformation
    val lineDStream = ssc.socketTextStream("hadoop02", 9999)
    val wordsDStream = lineDStream.flatMap(_.split(" "))
    val result = wordsDStream.map((_, 1)).reduceByKey(_ + _)
    // An output operation must be registered, otherwise start() throws an error
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
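The transformation chain itself (flatMap, map, reduceByKey) can be sanity-checked on a plain Scala collection, where reduceByKey becomes groupBy plus a sum; this sketch uses made-up input lines:

```scala
object WordCountLocal {
  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world", "hello spark")
    val counts = lines
      .flatMap(_.split(" "))               // split lines into words
      .map((_, 1))                         // pair each word with 1
      .groupBy(_._1)                       // group pairs by word
      .map { case (w, ps) => (w, ps.map(_._2).sum) } // sum the ones
    // counts is Map(hello -> 2, world -> 1, spark -> 1), in no fixed order
    println(counts)
  }
}
```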
Package it the same way as a Spark Core application, upload the jar to the Spark machine, and run it (the class name must match the object defined above):
bin/spark-submit --class com.atguigu.streaming.WordCountDemo ~/wordcount-jar-with-dependencies.jar
Send data via netcat:
TERMINAL 1: Running Netcat
$ nc -lk 9999
hello world
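Provided an output operation such as result.print() is registered on the DStream, the driver console prints the counts for each one-second batch in a format like this (the timestamp is illustrative):

```
-------------------------------------------
Time: 1497685717000 ms
-------------------------------------------
(hello,1)
(world,1)
```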
If the program produces too much log output while running, change the log level to WARN in the log4j.properties file under Spark's conf directory.
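Assuming the default Spark 2.x layout, this means copying conf/log4j.properties.template to conf/log4j.properties and raising the root level; a minimal fragment:

```properties
# conf/log4j.properties (copied from log4j.properties.template)
# Change the root logger level from INFO to WARN
log4j.rootCategory=WARN, console
```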