Problem Description
I am working on an Apache Beam pipeline that reads a bunch of events from Pub/Sub and then, based on the event type, writes them into separate BigQuery tables.
I know that WriteToBigQuery supports dynamic destinations, but the problem in my case is that the destination is derived from the data read from the event. For example, an event looks like:
{
  "object_id": 123,
  ... some metadata,
  "object_data": {object related info}
}
Data that should be written to the BigQuery table is under the object_data key of the event, but the table name is derived from other fields in the metadata. I tried to use the table_side_inputs parameter, but the issue is that, because each event can have a different destination, the side input doesn't update accordingly. The code is below:
class DumpToBigQuery(PTransform):
    def _choose_table(self, element, table_names):
        # table_names = {"table_name": "project_name.dataset.table_name"}
        table_name = table_names["table_name"]
        return table_name

    def expand(self, pcoll):
        # Key events by object type, window them, and keep only the latest event per key.
        events = (
            pcoll
            | "GroupByObjectType" >> Map(lambda e: (e["object_type"], e))
            | "Window" >> WindowInto(windowfn=FixedWindows(self.window_interval_seconds))
            | "GroupByKey" >> GroupByKey()
            | "KeepLastEventOnly" >> ParDo(WillTakeLatestEventForKey())
        )

        # Side-input branch: derive the destination table name from the event key.
        table_name = events | Map(
            lambda e: ["table_name", f"{self.project}:{self.dataset}.{e[0]}"]
        )
        table_names_dct = AsDict(table_name)

        events_to_write = events | Map(lambda e: e[1]) | Map(self._drop_unwanted_fields)

        return events_to_write | "toBQ" >> WriteToBigQuery(
            table=self._choose_table,
            table_side_inputs=(table_names_dct,),
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )
You can see that the side input is taken from the other branch of the pipeline, table_name, which basically extracts the table name from the event. This is then given as an input to WriteToBigQuery. Unfortunately, this doesn't really work: under load, the side input is not updated and some events end up in the wrong destinations.
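For reference, the dynamic-destinations form mentioned above also accepts a callable for the table argument that is invoked per element, so a destination derived from the element itself would not need a side input. A minimal sketch, assuming the rows being written still carry an object_type field and using placeholder project/dataset names:

import apache_beam as beam

# Minimal sketch, not the original pipeline: the table callable receives each row,
# so the destination can be computed from the row itself. The "object_type" field
# and the "my-project:my_dataset" prefix are assumptions for illustration.
def route_to_table(row):
    return f"my-project:my_dataset.{row['object_type']}"

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create([{"object_type": "order", "value": 1}])
        | "Write" >> beam.io.WriteToBigQuery(
            table=route_to_table,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )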
What other approach can I use in this specific case? All the docs use static examples and don't really cover this dynamic approach.
The other thing I tried is writing a custom DoFn that uses the HTTP BigQuery client and inserts the rows; the issue here is the speed of the pipeline, as it inserts only around 6-7 events per second.
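A per-element insert is the likely bottleneck there. A minimal, hedged sketch of a bundle-batched variant of such a DoFn, using the google-cloud-bigquery client's insert_rows_json; how the table name is derived from the event here is an assumption:

import apache_beam as beam
from google.cloud import bigquery


class BatchedBigQueryInsert(beam.DoFn):
    # Sketch only: buffer rows per destination table for each bundle and flush
    # them with one streaming-insert call, instead of inserting row by row.
    def __init__(self, project, dataset):
        self.project = project
        self.dataset = dataset

    def setup(self):
        self.client = bigquery.Client(project=self.project)

    def start_bundle(self):
        self.buffer = {}

    def process(self, element):
        # Assumption: the destination table is derived from the event's metadata.
        table = f"{self.project}.{self.dataset}.{element['object_type']}"
        self.buffer.setdefault(table, []).append(element["object_data"])

    def finish_bundle(self):
        for table, rows in self.buffer.items():
            # One insert_rows_json call per table per bundle.
            errors = self.client.insert_rows_json(table, rows)
            if errors:
                raise RuntimeError(f"BigQuery insert errors: {errors}")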
Recommended Answer
I had a similar problem, for which I have a workaround.
I see that you have create_disposition=BigQueryDisposition.CREATE_NEVER, so the list of tables is known before the code runs. Perhaps it is unwieldy, but it is known. I have a DoFn which yields many TaggedOutputs in its process method. Then my pipeline looks like:
parser_outputs = ['my', 'list', 'of', 'tables']

with beam.Pipeline(options=PipelineOptions(), argv=args) as p:
    pipe = (
        p
        | "Start" >> beam.Create(["example row"])
        | "Split" >> beam.ParDo(MySplitFn()).with_outputs(*parser_outputs)
    )

    # One WriteToBigQuery sink per tagged output / destination table.
    for output in parser_outputs:
        pipe[output] | "write {}".format(output) >> beam.io.WriteToBigQuery(
            bigquery.TableReference(
                projectId=options.projectId, datasetId=DATASET_ID, tableId=output
            ),
            schema=padl_shared.getBQSchema(parser.getSchemaForDataflow(rowTypeName=output)),
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )

    p.run().wait_until_finish()
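MySplitFn is not shown above; a minimal sketch of such a split DoFn that routes each element to a tagged output named after its destination table, with the parsing and the choice of tag as assumptions, could look like:

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class MySplitFn(beam.DoFn):
    # Sketch only: yield each parsed row under the tag of its destination table.
    def process(self, element):
        row = {"raw": element}  # hypothetical parsing of the input row
        table = "my"            # in practice derived from the element; must be one of parser_outputs
        yield TaggedOutput(table, row)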