Flink_输入数据集 Data Sources

编程入门 行业动态 更新时间:2024-10-23 18:34:38

Flink_输入<a href=https://www.elefans.com/category/jswz/34/1771445.html style=数据集 Data Sources"/>

Flink_输入数据集 Data Sources

文章目录

    • 1.Flink 在流处理上常见的 Source
    • 2.基于集合的 source
    • 3.基于文件的 source(File-based-source)
    • 4. 基于网络套接字的 source(Socket-based-source)
    • 5. 自定义的 source(Custom-source)
      • 5.1 SourceFunction:创建非并行数据源。
      • 5.2 ParallelSourceFunction:创建并行数据源。
      • 5.3 RichParallelSourceFunction:创建并行数据源。
    • 6. 基于 kafka 的 source 操作
      • 0、kafka 集群启动与停止
      • 1、创建 topic
      • 2、查看主题命令
      • 3、生产者生产数据
      • 4、消费者消费数据
      • 5、运行 describe topics 命令
      • 6、增加 topic 分区数
      • 7、增加配置
      • 8、删除配置
      • 9、删除 topic
      • 代码示例
    • 7 .基于 mysql 的 source 操作

Flink 中你可以使用 StreamExecutionEnvironment.addSource(source) 来为你的程序添 加数据来源。 Flink 已 经 提 供 了 若 干 实 现 好 了 的 source functions ,当 然 你 也 可 以 通 过 实 现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

1.Flink 在流处理上常见的 Source

Flink 在流处理上常见的 Source ,Flink 在流处理上的 source 和在批处理上的 source 基本一致。
大致有 4 大类

  • 基于本地集合的 source(Collection-based-source)
  • 基于文件的 source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范 的文件,并将其作为字符串返回
  • 基于网络套接字的 source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
  • 自定义的 source(Custom-source)

2.基于集合的 source

单个示例

package com.czxy.flink.stream.source.collectionimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}//DataStream[String]
object StreamFromElementsSource {def main(args: Array[String]): Unit = {//1.创建流处理的执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.构建数据源import org.apache.flink.api.scala._val elementDataStream: DataStream[String] = env.fromElements("hadoop hadoop hive flink")//3.打印输出elementDataStream.print()//4.执行程序env.execute("StreamFromElementsSource")//env.execute(this.getClass.getSimpleName)}
}

多个示例

package cn.czxy.stream.sourceimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import scala.collection.immutable.{Queue, Stack} import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.apache.flink.api.scala._object StreamDataSourceDemo {def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironment//0.用element创建DataStream(fromElements)val ds0: DataStream[String] = senv.fromElements("spark", "flink") ds0.print()//1.用Tuple创建DataStream(fromElements)val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink"))
ds1.print()//2.用Array创建DataStreamval ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink"))
ds2.print()//3.用ArrayBuffer创建DataStream val ds3: DataStream[String] =
senv.fromCollection(ArrayBuffer("spark", "flink"))ds3.print()//4.用List创建DataStreamval ds4: DataStream[String] = senv.fromCollection(List("spark", "flink"))
ds4.print()//5.用List创建DataStreamval ds5: DataStream[String] =
senv.fromCollection(ListBuffer("spark", "flink")) ds5.print()//6.用Vector创建DataStreamval ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink"))
ds6.print()
//7.用Queue创建DataStreamval ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink"))
ds7.print()//8.用Stack创建DataStreamval ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))
ds8.print()//9.用Stream创建DataStream(Stream相当于lazy List,避免在中间过程中生成不必要的集合)val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink"))
ds9.print()//10.用Seq创建DataStreamval ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))
ds10.print()//11.用Set创建DataStream(不支持)//val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink"))//ds11.print()//12.用Iterable创建DataStream(不支持)//val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))//ds12.print()//13.用ArraySeq创建DataStream val ds13: DataStream[String] =
senv.fromCollection(mutable.ArraySeq("spark", "flink")) ds13.print()//14.用ArrayStack创建DataStream val ds14: DataStream[String] =
senv.fromCollection(mutable.ArrayStack("spark", "flink")) ds14.print()//15.用Map创建DataStream(不支持)//val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1
-> "spark", 2 -> "flink"))//ds15.print()//16.用Range创建DataStreamval ds16: DataStream[Int] = senv.fromCollection(Range(1, 9)) ds16.print()//17.用fromElements创建DataStreamval ds17: DataStream[Long] = senv.generateSequence(1, 9) ds17.print()
}
}

3.基于文件的 source(File-based-source)

package com.czxy.flink.stream.source.fileimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}//基于文件构建数据源  基于文件的source
object StreamFromFileSource {def main(args: Array[String]): Unit = {//1.创建执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.构建数据源 基于文件的sourceval fileDataStream: DataStream[String] = env.readTextFile("day03/data/input/wordcount.txt")//3.输出打印fileDataStream.print()//4.执行程序env.execute("StreamFromFileSource")}
}

4. 基于网络套接字的 source(Socket-based-source)

val source = env.socketTextStream("IP", PORT)

5. 自定义的 source(Custom-source)

除了预定义的 Source 外,我们还可以通过实现 SourceFunction 来自定义 Source,然 后通过 StreamExecutionEnvironment.addSource(sourceFunction)添加进来。 比如读取 Kafka 数据的 Source: addSource(new FlinkKafkaConsumer08<>); 我们可以实现以下三个接口来自定义 Source:

5.1 SourceFunction:创建非并行数据源。

参考代码

package com.czxy.flink.stream.source.customerimport org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._//自定义非并行数据源
object StreamCustomerNoParallelSource {def main(args: Array[String]): Unit = {//1.创建执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.构建数据源val NoParallelDataStream: DataStream[Long] = env.addSource( new NoParallelSource()).setParallelism(1)//3.打印输出NoParallelDataStream.print()//4.执行程序env.execute("StreamCustomerNoParallelSource")}//实现一个单线程的,数据从1开始递增的数据集class NoParallelSource extends  SourceFunction[Long]() {var number:Long=1Lvar isRunning:Boolean=trueoverride def run(ctx: SourceFunction.SourceContext[Long]): Unit = {while (isRunning){ctx.collect(number)number+=1Thread.sleep(1)if (number>5){cancel()}}}override def cancel(): Unit = {isRunning=false}}
}

5.2 ParallelSourceFunction:创建并行数据源。

参考代码

package com.czxy.flink.stream.source.customerimport org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}/*** 自定义创建并行数据源 */object StreamCustomerParallelSource {def main(args: Array[String]): Unit = {//1.创建执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.构建数据源import org.apache.flink.api.scala._val parallelSourceDataStream: DataStream[Long] = env.addSource(new ParallelSource()).setParallelism(2)//3.打印输出parallelSourceDataStream.print()//4.执行 程序env.execute("StreamCustomerParallelSource")}//创建一个并行度为1的数据源//实现从1开始产生递增数字class ParallelSource extends ParallelSourceFunction[Long]() {//声明一个Long类型的变量var number: Long = 1L//声明一个初始化为true的Boolean变量var isRunning: Boolean = trueoverride def run(ctx: SourceFunction.SourceContext[Long]): Unit = {while (isRunning) {ctx.collect(number)number += 1Thread.sleep(1)if (number > 5) {cancel()}}}override def cancel(): Unit = {isRunning = false}}
}

5.3 RichParallelSourceFunction:创建并行数据源。

参考代码

package com.czxy.flink.stream.source.customerimport org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala._//自定义扩展类的并行数据源
object StreamCustomerRichParallelSource {def main(args: Array[String]): Unit = {//1.创建执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.添加数据源val richParallelSourceDataStream: DataStream[Long] = env.addSource(new RichParallelSource()).setParallelism(2)//3.打印输出richParallelSourceDataStream.print()//4.执行程序env.execute("StreamCustomerRichParallelSource")}//自定义的数据源,实现从1开递增的数据集class RichParallelSource extends RichParallelSourceFunction[Long]() {var number: Long = 1Lvar isRunning: Boolean = trueoverride def run(ctx: SourceFunction.SourceContext[Long]): Unit = {while (isRunning) {ctx.collect(number)number += 1Thread.sleep(1)if (number > 5) {cancel()}}}override def cancel(): Unit = {isRunning = false}override def open(parameters: Configuration): Unit = { super.close() }}
}

6. 基于 kafka 的 source 操作

0、kafka 集群启动与停止

注意事项:在 kafka 启动前,一定要让 zookeeper 启动起来。
node01、node02、node03 执行以下命令将 kafka 进程启动在后台

cd /export/servers/kafka_2.11-1.0.0 nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

三台机器也可以执行以下命令停止 kafka 集群

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-server-stop.sh

1、创建 topic

创建一个名字为 test 的主题, 有三个分区,有两个副本 node01 执行以下命令来创建 topic

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test

2、查看主题命令

查看 kafka 当中存在的主题
node01 使用以下命令来查看 kafka 当中存在的 topic 主题

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181

3、生产者生产数据

模拟生产者来生产数据
node01 服务器执行以下命令来模拟生产者进行生产数据

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

4、消费者消费数据

node02 服务器执行以下命令来模拟消费者进行消费数据

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181

5、运行 describe topics 命令

node01 执行以下命令运行 describe 查看 topic 的相关信息

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
  • 结果说明:
    这是输出的解释。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由
    于我们只有一个分 区用于此主题,因此只有一行。 “leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的 领导者。(因为在 kafka 中 如果有多个副本的话,就会存在 leader 和 follower 的关系,表 示当前这个副本为 leader 所在的 broker 是哪一个) “replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活 动状态。(所有副本列表 0 ,1,2) “isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领 导者捕获。(可用的列表 数)

6、增加 topic 分区数

任意 kafka 服务器执行以下命令可以增加 topic 分区数

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8

7、增加配置

动态修改 kakfa 的配置
任意 kafka 服务器执行以下命令可以增加 topic 分区数

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1

8、删除配置

动态删除 kafka 集群配置

cd /export/servers/kafka_2.11-1.0.0 bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages

9、删除 topic

目前删除 topic 在默认情况下知识打上一个删除的标记,在重新启动 kafka 后才删除。如果 需要立即删除,
则需要在 server.properties 中配置: delete.topic.enable=true
然后执行以下命令进行删除 topic

bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test

代码示例

package com.czxy.flink.stream.source.customerimport java.util.Properties
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
/*** 基于 kafka 的 source 操作 */
object StreamKafkaSource {def main(args: Array[String]): Unit = {//1.创建执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.设置Kafka  指定消费者主题val topic="test"//设置参数val props = new Propertiesprops.setProperty("bootstrap.servers", "node01:9092")props.setProperty("group.id", "test01")props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")//基于Flink,创建kafka消费者val consumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic,new SimpleStringSchema(),props)//设置Flink 从 topic 中最新的数据开始消费consumer.setStartFromLatest()//2.添加数据val kafkaSource: DataStream[String] = env.addSource(consumer)//3.打印输出kafkaSource.print()//4.执行程序env.execute("StreamKafkaSource")}
}

7 .基于 mysql 的 source 操作

上面就是 Flink 自带的 Kafka source,那么接下来就模仿着写一个从 MySQL 中读取数据 的 Source。

代码参考

package com.czxy.flink.stream.source.customerimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}//自定义从mysql获取数据源
object StreamFromMysqlSource {case class Student(stuId: Int, stuName: String, stuAddr: String, stuSex: String)def main(args: Array[String]): Unit = {//1.创建执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//2.添加数据源val mysqlSource: DataStream[Student] = env.addSource(new MysqlSource())//3.打印输出mysqlSource.print()//4.执行程序env.execute("StreamFromMysqlSource")}//3.创建mysql自定义数据源对象class MysqlSource extends RichSourceFunction[Student]() {//声明一些对象var connection: Connection = nullvar ps: PreparedStatement = null//这个方法在初始化的时候被执行一次override def open(parameters: Configuration): Unit = {val driver = "com.mysql.jdbc.Driver"val url = "jdbc:mysql://localhost:3306/test"val username = "root"val password = "root"Class.forName(driver)connection = DriverManager.getConnection(url, username, password)val sql ="""|select id,name,addr,sex|from student|""".stripMarginps = connection.prepareStatement(sql)}// 在run方法中进行查询,结果封装成样例类  每条数据执行一次override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {val queryResultSet: ResultSet = ps.executeQuery()while (queryResultSet.next()) {val stuId: Int = queryResultSet.getInt("id")val stuName: String = queryResultSet.getString("name")val stuAddr: String = queryResultSet.getString("addr")val stuSex: String = queryResultSet.getString("sex")val student: Student = Student(stuId, stuName, stuAddr, stuSex)ctx.collect(student)}}override def cancel(): Unit = {}}
}

更多推荐

Flink_输入数据集 Data Sources

本文发布于:2023-07-27 22:08:21,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1225149.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:数据   Sources   Data

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!