SPARK安装配置及介绍"/>
SPARK安装配置及介绍
##################
# spark 安装配置 #
##################
#解压重命名
cd /opt/download/
tar -zxvf spark-3.1.2-bin-hadoop3.2.tgz -C /opt/software/
cd /opt/software/
mv spark-3.1.2-bin-hadoop3.2/ spark312
#配置环境变量
vim /etc/profile.d/my.sh
---------------------------------------------------------
#spark312
export HADOOP_CONF_DIR=/opt/software/hadoop313/etc/hadoop
export SPARK_HOME=/opt/software/spark312
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
---------------------------------------------------------
#激活环境变量
source /etc/profile
#添加配置信息
cd /opt/software/spark312/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh //添加
---------------------------------
export SPARK_MASTER_HOST=single01
export SPARK_MASTER_PORT=7077
---------------------------------
#启动与关闭
start-master.sh
stop-master.sh
start-worker.sh spark://single01:7077
stop-worker.sh
#查看是否启动成功
jps -ml
-----------------------------------------------------------------------------------
org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://single01:7077
org.apache.spark.deploy.master.Master --host single01 --port 7077 --webui-port 8080
-----------------------------------------------------------------------------------
#web 访问
http://single01:8081
#客户端访问
spark-shell 本地模式
spark-shell --master yarn YARN模式
MapReduce基于磁盘数据交换 5次IO 从磁盘读数据->map->shuffle->reduce->落盘
Spark 基于内存的(以内存进行数据交换)
transform、shuffle、action
DAG:有向无环图
Driver驱动端(客户端) {DAG->DAG}
RDD => 弹性数据集(一次性的,用了就没了,像迭代器一样,,支持序列化但是要持久化,既可以放在内存中也可以放在磁盘中)
RDD持久化操作:
var rdd =... .cache() //加(.cache())结果在内存里保留一份
var rdd =... .persist() //落盘
var rdd =... .checkpoint() //设置检查点:容错
rdd1.cache()//等同于=> persist(StorageLevel.MEMORY_ONLY)
rdd1.persist(StorageLevel.MEMORY_AND_DISK_SER_2)//内存放不下,可以放磁盘,序列化,多备份一份
//设置检查点(1.指定hdfs目录;2.设置检查点)
sparkSession.sparkContext.setCheckpointDir("hdfs://single01:9000/spark/checkpoint/app_name")
rdd1.checkpoint()
persist vs checkpoint
生命周期 临时存储(kill after task ) 持久化存储(手动删除)
容错方式 根据血统重新计算 利用hdfs多副本机制
#CONTROL 5
*rdd:RDD[T]=rdd[T].cache #persist(StorageLevel.MEMORY_ONLY)
***rdd:RDD[T]=rdd[T].persist([newLevel:StorageLevel])
rdd[T].isCheckpoint:Boolean
***rdd[T].checkpoint():Unit #sc.setCheckpointDir(String hdfsPath)
opt:Option[String]=rdd[T].getCheckpointFile()
DateSet
DateFrame
import org.apache.spark.sql.functions._ 可以引入高阶SQL算子
#spark为SparkSession对象
import spark.implicits._ 隐式工具类
把结构外挂
scheme->StructType[StructField...]
RDD<= case class -> Row =>Dataset
通过样例类把数据以行为单位转化为Row对象 DataFrame
val table:DataFrame|DataSet=...
table.select(cols:Column*)
table.where(stringExpr:String)
#joinType:【默认:inner】 left right full cross
#[leftright] semi 左或右表为主,关联条件匹配,显示左或右表数据
#[left|right] anti左或右表为主,关联条件不匹配,显示左或右表数据
#cols:Seq[String] 要求两表关联字段同名Seq("user_id")
table.join(tab2:DataFrame|DataSet,cols:Seq[String],joinType:String)
#colnditionExpr
#$"leftFiled" === $"rightField"
#$"leftFiled" =!= $"rightField"
#$"leftFiled" >|>=|<|<= $"rightField"
table.join(tab2:DataFrame|DataSet,colnditionExpr:Column[,joinType:String])
#$"field" === VALUE $"A.user_id"===$"B.user_id"
#$"field" =!= VALUE
#$"field" >|>=|<|<= VALUE
#$"field".bertween(lower:Any,upper:Any)
#$"field".isin(list:Any*)
#$"field".like(str:String)
#$"field".rlike(regex:String)
#exp1 and|or exp2
#not(expr)
table.where(stringExpr:String)
table.groupBy(col:String*)
table.sum|max|min(col:String*)
table.count
table.agg( //aggl里可以取别名
sum(col:Column)
max(col:Column)
min(col:Column)
avg(col:Column)
count(col:Column)
concat_ws(col:Column)
collect_list(col:Column)
collect_set(col:Column)
stddev_pop
stddev_sample
var_pop
var_sample
covar_pop
covar_sample
)
.sort($"field".desc)
.sortWithinPartitions($"field".desc)
#***宽窄依赖
#查看依赖
val ds:Seq[Dependency]=rdd.dependencies
#依赖抽象层设置
dependency
窄:FatherRDD:SonRDD= 1|N :1 NarrowDependency
OneToOneDependency 1:1
RangeDependency N:1
宽:FatherRDD:SonRDD= 1 :N shuffleDependency
#优化:规避宽窄依赖的发生
1、前置设置Partitioner(根据shuffle字段控制数据进入指定分区)
2、shuffle时:引指定分区的数据流入指定的后续分区
可能发生shuffle的算子: xxxBy
xxxByKey
join
#重新分区
repartition:调用coalesce,会有shuffle过程
coalesce() :shuffle 可以设置为false
#分区优化
计算数据规模
单分区数据规模一般为int BLOCK_SIZE=128M
Hadoop FileSystem 统计文件字节大小 Long FILE_SIZE
FILE_SIZE/BLOCK_SIZE>physicalCores*2 ? physicalCores*2 : Math.ceil(FILE_SIZE*1.0/BLOCK_SIZE)
***rdd[T].checkpoint():Unit #=>sc..setCheckpointDir(hdfsPath:String)
*
#*********************************************
groupBy指定键分组
groupByKey默认面对键值对,对键分组
反射,动态代理,自定义注解
集合
泛型
线程和锁
配置文件解析
日志
正则
idea :
install大资源型jar包,jar包达到maven的本地仓库里
package打可执行jar包
#***********************************************************
spark算子
map
flatMap
rdd[T].mapPartitions[U](f:Iterator[T]=>Iterator[U][.presever1]}
makeRDD
//makeRDD造数据,Seq类型,里面可以放样例类或元组
val rdd1: RDD[(Int, String)] = sc.makeRDD(Seq((1, "henry"), (2, "pola"), (4, "ariel"), (7, "jack"), (10, "rose")))
//并集
rdd1.union(rdd2).foreach(println)
//交集
rdd1.intersection(rdd2).foreach(println)
//差集
rdd1.subtract(rdd2).foreach(println)
rdd2.subtract(rdd1).foreach(println)
collect()
textFile
reduceByKey
*rdd[T].foreach(f:T=>Unit)
*rdd[T].foreachPartition(f:lterator[T]=>Unit)
count()
countByKey()
countByValue()
*** rdd [T].saveAsTextFi1e(path:String[,codec:Class [_<:CompressionCodec]])
#********************************************************************************
saveAsTextFile(s"hdfs://single01:9000/spark/source/src/tags_${System.currentTimeMillis().toDate()}_${System.currentTimeMillis()}")
#***********************************************************************************************
*rdd [T].saveAsSequenceFi1e(path:String[,codec:Class [_<:CompressionCodec]])
rdd[T].saveAsObjectFiIe(path:String)
groupWith :不可以分区
cogroup:可以分区
这2个方法都是 在 rdd间根据key进行合并时的方法;
其实 groupWith 和 cogroup 调用的都是同一个内部函数 python_cogroup; 而且 python_cogroup 是多个rdd间的操作;
因此 groupWith,cogroup都是对 python_cogroup的一层封装,方便用户针对不同情况调用不同的方法;
cogroup: 对2个rdd 先进行合并 然后根据key进行分组, 分组的rdd每个元素的value是 再转为可迭代对象;
groupWith:对多个rdd 先进行合并 然后根据key进行分组, 分组的rdd每个元素的value是 再转为可迭代对象;
Spark on Hive
#创建scala工程
mian
src
recource
core-site.xml
hdfs-site.xml
hive-site.xml
log4j.properties
val spark:SparkSession=SparkSession
.builder()
.appName("spark_sql_01")
.master("local[*]")
.enableHiveSupport()//开启spark对hive支持,自动解析hadoop和hive配置信息
.getOrCreate()
f.foreach(_(spark))
spark.close()
数据存储
.coalesce(numPartitions:Int)
.write.csv(hdfsPath:String)
新建表
.coalesce(numPartitions:Int)
.write.mode(SaveMode.Append|Overwrite).saveAsTable(hiveTable:String)
普通表
.coalesce(1)
.write.mode(SaveMode.Append).insertInto(hiveTable:String)
分区表
.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.option("hive.exec.dynamic.partition.mode","nonstrict")
.insertInto("kb16.par_order_user_diff2_avg3")
spark RDD:
sparkContext
sparkSession
spark Sql
spark 操作hbase
//spark 样例类放在main方法外面
case class UserBehavior(user_id:Int,item_id:Int,category_id:Int,behavior_type:String,time:Long)
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("spark_hbase_02")
.getOrCreate()
//公共配置 : hadoop ,hbase
val config: Configuration = HBaseConfiguration.create()
config.addResource(new Path("core-site.xml"))
config.addResource(new Path("hbase-site.xml"))
config.set(TableInputFormat.INPUT_TABLE,"exam:userbehavior")
config.set(TableOutputFormat.OUTPUT_TABLE,"exam:userbehavior_analysis")
//spark_hbase 输出配置
val job: Job = Job.getInstance(config)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
import spark.implicits._
import org.apache.spark.sql.functions._
//spark_hbase 输入配置
val frm: DataFrame = spark.sparkContext.newAPIHadoopRDD(
config,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)//数据处理
.mapPartitions(_.map(tp2 => {
val rst: Result = tp2._2
val ps: Array[String] = Bytes.toString(rst.getRow).split("_")
val time = ps(0).toLong
val user_id = ps(1).toInt
val item_id = ps(2).toInt
val behavior_type: String = Bytes.toString(rst.getValue(Bytes.toBytes("info")
, Bytes.toBytes("behavior_type")))
val category_id: Int = Bytes.toString(rst.getValue(Bytes.toBytes("info")
, Bytes.toBytes("category_id"))).toInt
UserBehavior(user_id, item_id, category_id, behavior_type, time)
})).toDF().cache()
spark 图结构计算
社交网络: 推特, 脸书, QQ, 微型,博客,游戏
图结构: 梳理人物关系,人物权重(平台,用户相对权重top10),用户画像,找关联用户
图:链表,树,DAG(有向无环图)
SparkGraph
VertexId=Long
Vertex[VertexId,(String, Int,Strinq)]
Graph(
vertices:RDD[VertexId,T],
edges:RDD[Edge[U]],
defaultVertexAttr:T,
edgeStorageLevels:StorageLevel,
vertexStorageLevels:StorageLevel
)
1、点RDD:RDD[Vertex]
2、边RDD:RDD[Edge[(String, Int)]]
3、关系RDD:RDD[EdgeTriplet[]]
EdgeTriplet extends Edge[(String, Int)]
srcId:VertexId 源点ID
dstId:VertexId 目标点ID
attr:(String, Int)边属性
srcAttr:(String, Int,Strinq) 源点属性
dstAttr:(String, Int,Strinq) 目标点属性
分区切割策略
PartitionStrategy
EdgeCut=>保证点在同一分区
EdgePartition1D => (同顶边(出入度相同),同分区) 从同一个点出发的边会被分配到同一个分区
EdgePartition2D=>邻接矩阵:与顶点关联的边最多会被划分到2 * sqrt(分区总数)个分区中,即每个顶点最多有2 * sqrt(分区总数)个副本
VertexCut=>保证边在同一分区(Spark优选切割策略,因为边较为复杂)
RandomVertexCut =>同点同方向(同边),同分区
CanonicalRandomVertexCut=>同点边,同分区
numVertex //点数量
numEdges //边数量
vertices //点RDD
edges //边RDD
triplets //关系RDD
degrees //出入度
outDegrees //出度
inDegrees //入度
更多推荐
SPARK安装配置及介绍
发布评论