I want to execute spark submit job on AWS EMR cluster based on the file upload event on S3. I am using AWS Lambda function to capture the event but I have no idea how to submit spark submit job on EMR cluster from Lambda function.
Most of the answers that I found talked about adding a step to the EMR cluster, but I do not know whether I can add a step that fires "spark-submit --with args".
You can. I had to do the same thing last week!
Using boto3 for Python (other languages would definitely have a similar solution) you can either start a cluster with the defined step, or attach a step to an already up cluster.
```python
import boto3

def lambda_handler(event, context):
    conn = boto3.client("emr")
    # start a new cluster with the Spark step already defined
    response = conn.run_job_flow(
        Name='ClusterName',
        ServiceRole='EMR_DefaultRole',
        JobFlowRole='EMR_EC2_DefaultRole',
        VisibleToAllUsers=True,
        LogUri='s3n://some-log-uri/elasticmapreduce/',
        ReleaseLabel='emr-5.8.0',
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'Master nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm3.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': 'Slave nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm3.xlarge',
                    'InstanceCount': 2,
                }
            ],
            'Ec2KeyName': 'key-name',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False
        },
        Applications=[{'Name': 'Spark'}],
        Configurations=[{
            "Classification": "spark-env",
            "Properties": {},
            "Configurations": [{
                "Classification": "export",
                "Properties": {
                    "PYSPARK_PYTHON": "python35",
                    "PYSPARK_DRIVER_PYTHON": "python35"
                }
            }]
        }],
        BootstrapActions=[{
            'Name': 'Install',
            'ScriptBootstrapAction': {
                'Path': 's3://path/to/bootstrap.script'
            }
        }],
        Steps=[{
            'Name': 'StepName',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
                'Args': [
                    "/usr/bin/spark-submit", "--deploy-mode", "cluster",
                    's3://path/to/code.file',
                    '-i', 'input_arg',
                    '-o', 'output_arg'
                ]
            }
        }],
    )
    return "Started cluster {}".format(response['JobFlowId'])
```

Attaching a step to an already running cluster
As per here:
```python
import sys
import time

import boto3

def lambda_handler(event, context):
    conn = boto3.client("emr")
    # chooses the first cluster which is Running or Waiting
    # possibly can also choose by name or already have the cluster id
    clusters = conn.list_clusters()
    # choose the correct cluster
    clusters = [c["Id"] for c in clusters["Clusters"]
                if c["Status"]["State"] in ["RUNNING", "WAITING"]]
    if not clusters:
        sys.stderr.write("No valid clusters\n")
        sys.exit(1)
    # take the first relevant cluster
    cluster_id = clusters[0]
    # code location on your emr master node
    CODE_DIR = "/home/hadoop/code/"
    # spark configuration example
    step_args = ["/usr/bin/spark-submit", "--conf", "your-configuration",
                 CODE_DIR + "your_file.py", '--your-parameters', 'parameters']
    step = {
        "Name": "what_you_do-" + time.strftime("%Y%m%d-%H:%M"),
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args': step_args
        }
    }
    action = conn.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return "Added step: %s" % (action)
```
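Since the Lambda is triggered by an S3 upload, you will usually want to pass the uploaded file's location to spark-submit (e.g. in place of the hard-coded 'input_arg'). This is a minimal sketch, not part of the original answer, assuming the standard S3 event notification layout; the helper name is my own:

```python
def s3_path_from_event(event):
    """Return the s3:// path of the object that triggered the Lambda.

    Assumes the standard S3 event notification structure:
    event["Records"][0]["s3"] holds the bucket name and object key.
    """
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    return "s3://{}/{}".format(bucket, key)
```

You can then build `step_args` with this path, e.g. `["-i", s3_path_from_event(event)]`, instead of a fixed input argument.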