Dataproc + BigQuery示例

编程入门 行业动态 更新时间:2024-10-10 02:24:03
本文介绍了Dataproc + BigQuery示例 - 任何可用的?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

根据Dataproc docos ,它具有自动与自动集成与BigQuery 。

我在BigQuery中有一个表格。我想读取该表并使用我创建的Dataproc集群(使用PySpark作业)对其进行一些分析。然后将此分析的结果写回到BigQuery。您可能会问为什么不直接在BigQuery中进行分析!? - 原因是因为我们正在创建复杂的统计模型,而SQL对于开发它们来说太高。我们需要像Python或R,ergo Dataproc之类的东西。

他们是否有Dataproc + BigQuery示例?我找不到任何。 解决方案

首先,如这个问题 BigQuery连接器预装在 Cloud Dataproc 集群。

这里是一个关于如何将数据从BigQuery读取到Spark中的例子。在这个例子中,我们将从BigQuery读取数据来执行字数统计。 您可以使用 SparkContext.newAPIHadoopRDD 从Spark中的BigQuery中读取数据。 Spark文档有有关使用 SparkContext.newAPIHadoopRDD 的更多信息。 '

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat import com.google.gson.JsonObject import org.apache.hadoop.io.LongWritable $ b $ val projectId =< your-project-id> val fullyQualifiedInputTableId =publicdata:samples.shakespeare val fullyQualifiedOutputTableId =< your-fully-qualified-table-id> val outputTableSchema = [{'name':'Word','type':'STRING'},{'name':'Count','type':'INTEGER'}] val jobName =wordcount val conf = sc.hadoopConfiguration //设置作业级别的projectId。 conf.set(BigQueryConfiguration.PROJECT_ID_KEY,projectId) //使用systemBucket获取InputFormat使用的临时BigQuery导出数据。 val systemBucket = conf.get(fs.gs.system.bucket) conf.set(BigQueryConfiguration.GCS_BUCKET_KEY,systemBucket) //将输入和输出配置为BigQuery访问。 BigQueryConfiguration.configureBigQueryInput(conf,fullyQualifiedInputTableId) BigQueryConfiguration.configureBigQueryOutput(conf, fullyQualifiedOutputTableId,outputTableSchema) $ b $ val fieldName =word val tableData = sc.newAPIHadoopRDD(conf, classOf [GsonBigQueryInputFormat],classOf [LongWritable],classOf [JsonObject]) tableData.cache() tableData.count() tableData.map(entry =>(entry._1.toString(),entry._2.toString()))。take(10)

您需要使用您的设置自定义此示例,其中包括您在< your-project-id> 和你的输出表ID在< your-fully-qualified-table-id> 。

如果最终在MapReduce中使用BigQuery连接器,此页面如何编写MapReduce作业的示例使用BigQuery连接器。

According to the Dataproc docos, it has "native and automatic integrations with BigQuery".

I have a table in BigQuery. I want to read that table and perform some analysis on it using the Dataproc cluster that I've created (using a PySpark job). Then write the results of this analysis back to BigQuery. You may be asking "why not just do the analysis in BigQuery directly!?" - the reason is because we are creating complex statistical models, and SQL is too high level for developing them. We need something like Python or R, ergo Dataproc.

Are they any Dataproc + BigQuery examples available? I can't find any.

解决方案

To begin, as noted in this question the BigQuery connector is preinstalled on Cloud Dataproc clusters.

Here is an example on how to read data from BigQuery into Spark. In this example, we will read data from BigQuery to perform a word count. You read data from BigQuery in Spark using SparkContext.newAPIHadoopRDD. The Spark documentation has more information about using SparkContext.newAPIHadoopRDD. '

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat import com.google.gson.JsonObject import org.apache.hadoop.io.LongWritable val projectId = "<your-project-id>" val fullyQualifiedInputTableId = "publicdata:samples.shakespeare" val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>" val outputTableSchema = "[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]" val jobName = "wordcount" val conf = sc.hadoopConfiguration // Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId) // Use the systemBucket for temporary BigQuery export data used by the InputFormat. val systemBucket = conf.get("fs.gs.system.bucket") conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket) // Configure input and output for BigQuery access. BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId) BigQueryConfiguration.configureBigQueryOutput(conf, fullyQualifiedOutputTableId, outputTableSchema) val fieldName = "word" val tableData = sc.newAPIHadoopRDD(conf, classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject]) tableData.cache() tableData.count() tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)

You will need to customize this example with your settings, including your Cloud Platform project ID in <your-project-id> and your output table ID in <your-fully-qualified-table-id>.

Finally, if you end up using the BigQuery connector with MapReduce, this page has examples for how to write MapReduce jobs with the BigQuery connector.

更多推荐

Dataproc + BigQuery示例

本文发布于:2023-05-31 10:29:39,感谢您对本站的认可!
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:示例   Dataproc   BigQuery

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!