根据Dataproc docos ,它具有自动与自动集成与BigQuery 。
我在BigQuery中有一个表格。我想读取该表并使用我创建的Dataproc集群(使用PySpark作业)对其进行一些分析。然后将此分析的结果写回到BigQuery。您可能会问为什么不直接在BigQuery中进行分析!? - 原因是因为我们正在创建复杂的统计模型,而SQL对于开发它们来说太高。我们需要像Python或R,ergo Dataproc之类的东西。
他们是否有Dataproc + BigQuery示例?我找不到任何。 解决方案
首先,如这个问题 BigQuery连接器预装在 Cloud Dataproc 集群。
这里是一个关于如何将数据从BigQuery读取到Spark中的例子。在这个例子中,我们将从BigQuery读取数据来执行字数统计。 您可以使用 SparkContext.newAPIHadoopRDD 从Spark中的BigQuery中读取数据。 Spark文档有有关使用 SparkContext.newAPIHadoopRDD 的更多信息。 '
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat import com.google.gson.JsonObject import org.apache.hadoop.io.LongWritable $ b $ val projectId =< your-project-id> val fullyQualifiedInputTableId =publicdata:samples.shakespeare val fullyQualifiedOutputTableId =< your-fully-qualified-table-id> val outputTableSchema = [{'name':'Word','type':'STRING'},{'name':'Count','type':'INTEGER'}] val jobName =wordcount val conf = sc.hadoopConfiguration //设置作业级别的projectId。 conf.set(BigQueryConfiguration.PROJECT_ID_KEY,projectId) //使用systemBucket获取InputFormat使用的临时BigQuery导出数据。 val systemBucket = conf.get(fs.gs.system.bucket) conf.set(BigQueryConfiguration.GCS_BUCKET_KEY,systemBucket) //将输入和输出配置为BigQuery访问。 BigQueryConfiguration.configureBigQueryInput(conf,fullyQualifiedInputTableId) BigQueryConfiguration.configureBigQueryOutput(conf, fullyQualifiedOutputTableId,outputTableSchema) $ b $ val fieldName =word val tableData = sc.newAPIHadoopRDD(conf, classOf [GsonBigQueryInputFormat],classOf [LongWritable],classOf [JsonObject]) tableData.cache() tableData.count() tableData.map(entry =>(entry._1.toString(),entry._2.toString()))。take(10)您需要使用您的设置自定义此示例,其中包括您在< your-project-id> 和你的输出表ID在< your-fully-qualified-table-id> 。
如果最终在MapReduce中使用BigQuery连接器,此页面如何编写MapReduce作业的示例使用BigQuery连接器。
According to the Dataproc docos, it has "native and automatic integrations with BigQuery".
I have a table in BigQuery. I want to read that table and perform some analysis on it using the Dataproc cluster that I've created (using a PySpark job). Then write the results of this analysis back to BigQuery. You may be asking "why not just do the analysis in BigQuery directly!?" - the reason is because we are creating complex statistical models, and SQL is too high level for developing them. We need something like Python or R, ergo Dataproc.
Are they any Dataproc + BigQuery examples available? I can't find any.
解决方案To begin, as noted in this question the BigQuery connector is preinstalled on Cloud Dataproc clusters.
Here is an example on how to read data from BigQuery into Spark. In this example, we will read data from BigQuery to perform a word count. You read data from BigQuery in Spark using SparkContext.newAPIHadoopRDD. The Spark documentation has more information about using SparkContext.newAPIHadoopRDD. '
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat import com.google.gson.JsonObject import org.apache.hadoop.io.LongWritable val projectId = "<your-project-id>" val fullyQualifiedInputTableId = "publicdata:samples.shakespeare" val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>" val outputTableSchema = "[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]" val jobName = "wordcount" val conf = sc.hadoopConfiguration // Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId) // Use the systemBucket for temporary BigQuery export data used by the InputFormat. val systemBucket = conf.get("fs.gs.system.bucket") conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket) // Configure input and output for BigQuery access. BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId) BigQueryConfiguration.configureBigQueryOutput(conf, fullyQualifiedOutputTableId, outputTableSchema) val fieldName = "word" val tableData = sc.newAPIHadoopRDD(conf, classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject]) tableData.cache() tableData.count() tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)You will need to customize this example with your settings, including your Cloud Platform project ID in <your-project-id> and your output table ID in <your-fully-qualified-table-id>.
Finally, if you end up using the BigQuery connector with MapReduce, this page has examples for how to write MapReduce jobs with the BigQuery connector.
更多推荐
Dataproc + BigQuery示例
发布评论