Flink DataSet API Development (Getting-Started Example)


Table of Contents

  • 1. General flow of a Flink batch program
    • Steps
    • Implementation
    • Reference code
    • Package the program and submit it to YARN

1. General flow of a Flink batch program

  1. Obtain the Flink batch execution environment
  2. Build the source
  3. Process the data
  4. Build the sink
  • Example
    Write a Flink program that counts the number of occurrences of each word. A minimal sketch of the four stages follows.
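
Below is a minimal sketch of these four stages with the Scala DataSet API; the object name and sample data are illustrative placeholders, not part of the case study itself.

import org.apache.flink.api.scala._

object BatchSkeleton {
  def main(args: Array[String]): Unit = {
    // 1. Obtain the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Build a source (in-memory sample data)
    val lines: DataSet[String] = env.fromElements("hello flink", "hello hadoop")
    // 3. Process the data
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    // 4. Build a sink; print() is itself a sink and triggers execution
    counts.print()
  }
}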

Steps

  1. Create a project in IDEA
  2. Import the Maven dependencies required by Flink
  3. Create a Scala singleton object and add a main method
  4. Obtain the Flink batch execution environment
  5. Build a collection source
  6. Count the words with Flink operations
  7. Print the result

Implementation

  1. Create the flink-base project in IDEA
  2. Import the Flink Maven dependencies
  3. Create a scala folder under both the main and test directories
  4. Add a main method
  5. Obtain the batch execution environment

val env = ExecutionEnvironment.getExecutionEnvironment

  6. Import Flink's implicit conversions; they provide the TypeInformation instances the Scala API needs, so the import has to precede the operators below

import org.apache.flink.api.scala._

  7. Build a collection source

val wordDataSet = env.fromCollection {
  List("hadoop hive spark", "flink mapreduce hadoop hive", "flume spark spark hive")
}
  8. Use flatMap to split each string and flatten the result

val words: DataSet[String] = wordDataSet.flatMap(_.split(" "))

  9. Use map to turn each word into a (word, count) tuple

val wordNumDataSet: DataSet[(String, Int)] = words.map(_ -> 1)

  10. Use groupBy to group on the first field

val wordGroupDataSet: GroupedDataSet[(String, Int)] = wordNumDataSet.groupBy(0)

  11. Use sum to aggregate the counts within each group

val wordCountDataSet: AggregateDataSet[(String, Int)] = wordGroupDataSet.sum(1)

  12. Print the result

wordCountDataSet.print()

  13. Run and test
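
For the sample collection above, print() emits one (word, count) tuple per distinct word, along these lines (output order is not guaranteed):

(flink,1)
(flume,1)
(hadoop,2)
(hive,3)
(mapreduce,1)
(spark,3)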

Reference code

package com.czxy.flink.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode

/**
 * Word count, getting-started example.
 *
 * Outline:
 * 1. Create the execution environment
 * 2. Connect the data source
 * 3. Process the data
 * 4. Save or output the data
 */
object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data source
    // Import the implicit conversions
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = env.fromCollection(List("hadoop hadoop spark", "flink flink hive"))
    // 3. Process the data
    val wordAndOne: DataSet[(String, Int)] = sourceDataSet.flatMap(x => x.split(" ")).map((_, 1))
    val resultDataSet: DataSet[(String, Int)] = wordAndOne.groupBy(0).reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    // 4. Output (print for local testing, or write to HDFS)
    // resultDataSet.print()
    resultDataSet
      .writeAsText("hdfs://node01:8020/test/output/wordcount0708", WriteMode.OVERWRITE)
      .setParallelism(1)
    env.execute("BatchWordCount")
  }
}
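
One detail worth noting in this code: in the DataSet API, print() triggers job execution by itself, while a sink such as writeAsText only runs once env.execute(...) is called. If you switch back to the commented-out print() for local testing, also drop the env.execute call; otherwise Flink complains that no new sinks have been defined since the last execution.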

Package the program and submit it to YARN

  • Add the required dependencies and the Maven packaging plugins to pom.xml:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.2</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <hadoop.version>2.6.0</hadoop.version>
    <flink.version>1.7.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <iheart.version>1.4.3</iheart.version>
    <fastjson.version>1.2.7</fastjson.version>
</properties>
<dependencies>
    <!-- Scala library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Flink streaming (Scala) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink batch (Scala) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink client API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Hadoop client API -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>xml-apis</groupId>
                <artifactId>xml-apis</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- MySQL connector -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.60</version>
    </dependency>
    <dependency>
        <groupId>com.jayway.jsonpath</groupId>
        <artifactId>json-path</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON/XML-to-object mapping -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.9.9</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9.3</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.9.9</version>
    </dependency>
    <!-- Redis client -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.7.1</version> <!-- adjust the version to your environment -->
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF-->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.czxy.flink.batch.BatchWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
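
With the shade plugin bound to the package phase, running mvn clean package builds the fat jar under target/. The day01-1.0-SNAPSHOT.jar name used below assumes the module is called day01 with version 1.0-SNAPSHOT; substitute your own artifact name.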

Upload the jar to the server, then run the program:

bin/flink run -m yarn-cluster -yn 2 -c com.czxy.flink.batch.BatchWordCount \
/export/servers/flink-1.7.2/jar/day01-1.0-SNAPSHOT.jar
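
Here -m yarn-cluster submits the job to YARN as a per-job cluster, -yn 2 requests two TaskManager containers (a Flink 1.7-era option; later releases allocate containers on demand), and -c names the entry class inside the jar.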

The submitted application can be observed in the YARN web UI on port 8088.
