如何使用AirFlow提取使用Apache Livy批处理POST方法提交的Spark作业客户端日志

编程入门 行业动态 更新时间:2024-10-27 02:27:46
本文介绍了如何使用AirFlow提取使用Apache Livy批处理POST方法提交的Spark作业客户端日志的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在使用Apache Livy批处理POST方法提交Spark作业。

此HTTP请求是使用AirFlow发送的。提交作业后,我正在使用批处理ID跟踪状态。 Livy /资源管理器。

使用Apache Livy REST API可以做到吗?

解决方案

Livy有一个端点来获取日志 / sessions / {sessionId} / log & / batches / {batchId} / log 。

文档:

  • https ://livy.incubator.apache/docs/latest/rest-api.html#get-sessionssessionidlog
  • livy.incubator.apache/docs/latest/rest-api.html#get -batchesbatchidlog

您可以创建如下所示的python函数来获取日志:

http = HttpHook( GET,http_conn_id = http_conn_id) def _http_rest_call(自身,方法,终结点,数据=无,标头=无,extra_options =无):(如果没有)extra_options: extra_options = {} self.http.method =方法响应= http.run(端点,json.dumps(数据),标头,extra_options = extra_options) 返回响应 def _get_batch_session_logs(self,batch_id): method = GET 端点=批处理/ + str(batch_id)+ / log 响应= self._http_rest_call(方法=方法,端点=端点)#返回response.json()返回响应

I am working on submitting Spark job using Apache Livy batches POST method.

This HTTP request is send using AirFlow. After submitting job, I am tracking status using batch Id.

I want to show driver ( client logs) logs on Air Flow logs to avoid going to multiple places AirFLow and Apache Livy/Resource Manager.

Is this possible to do using Apache Livy REST API?

解决方案

Livy has an endpoint to get logs /sessions/{sessionId}/log & /batches/{batchId}/log.

Documentation:

  • livy.incubator.apache/docs/latest/rest-api.html#get-sessionssessionidlog
  • livy.incubator.apache/docs/latest/rest-api.html#get-batchesbatchidlog

You can create python functions like the one shown below to get logs:

http = HttpHook("GET", http_conn_id=http_conn_id) def _http_rest_call(self, method, endpoint, data=None, headers=None, extra_options=None): if not extra_options: extra_options = {} self.http.method = method response = http.run(endpoint, json.dumps(data), headers, extra_options=extra_options) return response def _get_batch_session_logs(self, batch_id): method = "GET" endpoint = "batches/" + str(batch_id) + "/log" response = self._http_rest_call(method=method, endpoint=endpoint) # return response.json() return response

更多推荐

如何使用AirFlow提取使用Apache Livy批处理POST方法提交的Spark作业客户端日志

本文发布于:2023-11-24 05:09:07,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1624070.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:作业   批处理   如何使用   客户端   方法

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!