PySpark-Note-SparkSubmit

Introduction

由于之前工作常用到spark, hadoop mapreduce,hive在集群上面的数据进行处理,所以这里总结一下这3者里面用到的一些配置的含义,以便之后能快速自己开发

Spark-submit

这里以一个spark-submit的命令进行解释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spark-submit  \
--name "train_data_v4" \
--master yarn \
--deploy-mode cluster \
--num-executors 200 \
--executor-memory ${exe_memory}G \
--executor-cores 4 \
--driver-memory 4G \
--queue ${queue} \
--archives s3://tools/py37_mr_min.tar.gz#python3 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python3/bin/python3 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python3/bin/python3 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.sql.shuffle.partitions=1600 \
--conf spark.debug.maxToStringFields=2000 \
--conf spark.speculation=true \
--py-files conf.py,create_mapping_dict.py,feature_extractor.py \
--files ${range_dict_pkl_file} \
map_reduce.py main ${input_path} ${output_path} ${goods_info_file}
  • –name “train_data_v4” : 应用名称Application name, 会在Hadoop cluster的节点信息里面看到

  • –master yarn : 指定master节点位置, 可以是 yarn / local / master node url (e.g. spark://23.195.26.187:7077).

    • 如果是local,就是直接在本地机跑
    • 如果是yarn就默认在yarn cluster上面跑
    • 如果是local可以不指定deploy-mode设定
  • –deploy-mode cluster :deploy-mode指定代码执行的环境。

    • cluster模式表示AM会随机的在Worker节点中的任意一台上启动运行。如果设置此参数,需要指定Master为yarn。
    • client模式表示作业的AM会放在Master节点上运行。如果设置此参数,需要指定Master为yarn。
      
  • –num-executors 200: 创建executor执行器个数为200

  • –executor-memory ${exe_memory}G : 指定各个Executor使用的最大内存,不可以超过单机的最大可使用内存,另外后面要加个G

  • –executor-cores 4: 各个Executor使用的并发线程数目,即每个Executor最大可并发执行的Task数目。

  • –driver-memory 4G: Driver使用的内存,不可超过单机的总内存。这里设置为4G

  • –queue ${queue} : 指定应用要排队的队列名称。在集群上面跑任务时有一个或多个任务队列对任务逐个执行。这里指定这个任务队列

  • –archives s3://tools/py37_mr_min.tar.gz#python3 : 从一个url路径里面上传压缩包到yarn cluster里面并解压。其中#python3 指把压缩包解压后的文件夹名称为python3,#起命名作用

  • –conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python3/bin/python3 :

    • –conf后面接spark配置参数,spark.yarn.appMasterEnv.PYSPARK_PYTHON这里把spark的PYTHON环境指定到./python3/bin/python路径
  • –conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python3/bin/python3 : 把spark的Driver的python环境指定到后面的python3路径

  • –conf spark.dynamicAllocation.enabled=true : 关闭spark的动态资源分配功能。 这个功能基于工作量workload来自动设定开启和关闭的executor的数量

  • –conf spark.sql.shuffle.partitions=1600 :shuffle data时aggregate和join操作的默认的数据分区数量(把数据分成多少份)

  • –conf spark.debug.maxToStringFields=2000 : 设定SQL表格最大的字段数目为2000

  • –conf spark.speculation=true : 开启spark推测(speculation)功能时,spark会在一个或多个任务在一个stage执行很慢时重启任务

  • –py-files conf.py,create_mapping_dict.py,feature_extractor.py : –py-files 后面接一个逗号相隔的多个python文件,把这些python文件上传到yarn cluster里面

  • –files ${range_dict_pkl_file} : 上传其他文件

  • map_reduce.py main ${input_path} ${output_path} ${goods_info_file} : spark执行 map_reduce.py 里面的main函数

Hadoop MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
-D mapreduce.job.queuename="ai" \
-D mapred.job.name="shuffle_tfrecord" \
-D mapred.map.tasks=200 \
-D mapred.reduce.tasks=100 \
-D mapreduce.job.reduce.slowstart.completedmaps=0.5 \
-D mapreduce.task.timeout=120000 \
-D mapreduce.job.runing.map.limit=200 \
-D mapreduce.reduce.failures.maxpercent=5 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.comparator.options="-k1,1" \
-D mapreduce.reduce.shuffle.memory.limit.percent=0.15 \
-D mapreduce.reduce.shuffle.merge.percent=0.9 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-file conf.py -file exec.sh -file shuffle_tfrecord.py -file start_tf_eval.sh -file start_tf_train.sh \
-input ${input}/${date_list} \
-output ${reduce_output} \
-mapper "/bin/sh -x exec.sh shuffle_tfrecord.py shuffle_mapper" \
-reducer "/bin/sh -x exec.sh shuffle_tfrecord.py shuffle_reducer ${tf_output}" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-cacheArchive s3://py37_mr_min.tar.gz#python3
  • jar /usr/lib/hadoop/hadoop-streaming.jar :
    指定hadoop-streaming数据流处理用到的java class文件

  • -D : 类似pyspark的 –conf后面接mapreduce的配置设定

  • -D mapreduce.job.queuename=”ai” :
    指定mapreduce job在集群上在哪个队列排队执行,和spark的–queue 类似

  • -D mapred.job.name=”shuffle_tfrecord” :
    指定Hadoop mapreduce job的名称

  • -D mapred.map.tasks=200 : mapper的数量

  • -D mapred.reduce.tasks=100 : reducer的数量

  • -D mapreduce.job.reduce.slowstart.completedmaps=0.5 :
    当Map Task完成的比例达到该值后才会为Reduce Task申请资源, 这里比例设置为0.5

  • -D mapreduce.task.timeout=120000 :
    mapreduce等待超时时间设置为120000 秒

  • -D mapreduce.job.runing.map.limit=200 :
    指定mapper同时跑的task数(资源足够时)

  • -D mapreduce.reduce.failures.maxpercent=5 :当失败的 Map Task 失败比例超过该值,整个作业则失败,默认值为 0.

  • -D stream.num.map.output.key.fields=1 :
    Mapper的输出使用.做分割符,并且第1个.之前的部分作为key, 剩余的部分作为value (包含剩余的.)

  • -D mapreduce.reduce.shuffle.memory.limit.percent=0.15 : 单个reduce shuffle能用的memory占总memory比例

  • -D mapreduce.reduce.shuffle.merge.percent=0.9 :
    reduce端的输入缓冲区使用达到多少比例时开始merge到磁盘的过程

  • -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator :
    如果需要对字段进行排序,需要指定使用mapred.text.key.comparator.opt提供所需的排序类型class。一些有用的是-n:数字排序,-r:反向排序

  • -D mapred.text.key.comparator.options=”-k1,1”:
    k1,1只用第一列排序,n数字排序,r倒序(从大到小)

  • -file conf.py -file exec.sh -file shuffle_tfrecord.py -file start_tf_eval.sh -file start_tf_train.sh :

  • file指定要上传到HDFS的文件

  • -input {input}/{date_list} :要输入到程序的数据的路径

  • -output {reduce_output} : mapreduce/reducer 输出的数据路径

  • -mapper “/bin/sh -x exec.sh shuffle_tfrecord.py shuffle_mapper” : 要执行的mapper程序函数。 这里输入string后hadoop会执行string里面的内容。

  • -reducer “/bin/sh -x exec.sh shuffle_tfrecord.py shuffle_reducer {tf_output}” :要执行的reducer 程序函数。 这里输入string后hadoop会执行string里面的内容。

  • -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner : 如果需要对字段排序、分区,默认都得加上此设置

  • -cacheArchive s3://py37_mr_min.tar.gz#python3 : 从url s3路径下载压缩包,并把压缩包解压后文件夹命名为python3, 这里# 后面的内容指代解压后的别称

Hive

在hadoop里面,可以直接用hive直接对数据表格进行读写,这里简单举个hive的例子

1
2
3
4
hive -e "select *
From table
where dt = '20211021'
limit 10"

hive 后面加 -e 可以直接执行后面string里面的sql代码对数据进行查询

Reference

[1] https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html

[2] https://spark.apache.org/docs/latest/configuration.html

Comments