问题描述
限时送ChatGPT账号..我正在尝试合并以下 2 个 JSON 输入(此示例来自文件,但稍后将来自 Google Pub Sub 输入):
I am trying to merge 2 JSON inputs (this example is from a file, but it will be from a Google Pub Sub input later) from these:
orderID.json:
{"orderID":"test1","orderPacked":"Yes","orderSubmitted":"Yes","orderVerified":"Yes","stage":1}
combined.json:
{"barcode":"95590","name":"Ash","quantity":6,"orderID":"test1"}
{"barcode":"95591","name":"Beat","quantity":6,"orderID":"test1"}
{"barcode":"95592","name":"Cat","quantity":6,"orderID":"test1"}
{"barcode":"95593","name":"Dog","quantity":6,"orderID":"test2"}
{"barcode":"95594","name":"Scar","quantity":6,"orderID":"test2"}
像这样(使用 orderID 作为唯一键和主键):
To something like this (using orderID as the unique and primary key):
output.json:
{"orderID":"test1","orderPacked":"Yes","orderSubmitted":"Yes","orderVerified":"Yes","stage":1,"barcode":"95590","name":"Ash","quantity":6}
{"orderID":"test1","orderPacked":"Yes","orderSubmitted":"Yes","orderVerified":"Yes","stage":1,"barcode":"95591","name":"Beat","quantity":6}
{"orderID":"test1","orderPacked":"Yes","orderSubmitted":"Yes","orderVerified":"Yes","stage":1,"barcode":"95592","name":"Cat","quantity":6}
我现在有这样的代码,它改编自 使用数据流在 Google Cloud Platform 中加入两个 json
I have my codes like this now which was adapted from join two json in Google Cloud Platform with dataflow
from __future__ import absolute_import
import argparse
import apache_beam as beam
import json
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from google.api_core import datetime_helpers
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.exceptions import TooManyRequests
from google.cloud import bigquery
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--topic',
type=str,
help='Pub/Sub topic to read from')
parser.add_argument(
'--topic2',
type=str,
help='Pub/Sub topic to match with'
)
parser.add_argument(
'--output',
help=('Output local filename'))
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)
orderID = (p | 'read from text1' >> beam.io.ReadFromText('orderID.json')
#'Read from orderID PubSub' >> beam.io.ReadFromPubSub(topic=args.topic2)
| 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
| 'key_orderID' >> beam.Map(lambda orders: (orders['orderID'], orders))
)
orders_si = beam.pvalue.AsDict(orderID)
orderDetails = (p | 'read from text' >> beam.io.ReadFromText('combined.json')
| 'Parse JSON to Dict1' >> beam.Map(lambda e: json.loads(e)))
#'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic))
def join_orderID_orderDetails(order, order_dict):
return order.update(order_dict[order['orderID']])
joined_dicts = orderDetails | beam.Map(join_orderID_orderDetails, order_dict=orders_si)
joined_dicts | beam.io.WriteToText('beam.output')
p.run()
#result.wait_until_finish()
if __name__ == '__main__':
run()
但我现在在 beam.output 中的输出只显示:
But my output now in beam.output just shows:
None
None
None
有人能指出我在这方面做错了什么吗?
Can someone point out to me what I am doing wrong about this ?
与报告的重复帖子不同的问题是:
The question that is different from the reported duplicate post is:
为什么我的结果是无"?我在这里做错了什么?我怀疑是这些问题: Why are my results "None"? What am I doing wrong here?
I suspect these are the issues: order"变量 - 是否在join_orderID_orderDetails"中正确引用在join_dicts"中列出项目join_orderID_orderDetails"? - 是否也正确引用?
推荐答案
试试下面的方法,希望对你有所帮助.
Try the below, Hope this will help you a little.
这里我使用了您的订单数组并进行了组合,而不是使用文件.
Here i have used an array of your order and combined, instead of using a file.
order = [{"orderID":"test1","orderPacked":"Yes","orderSubmitted":"Yes","orderVerified":"Yes","stage":1}]
combined = [
{"barcode":"95590","name":"Ash","quantity":6,"orderID":"test1"},
{"barcode":"95591","name":"Beat","quantity":6,"orderID":"test1"},
{"barcode":"95592","name":"Cat","quantity":6,"orderID":"test1"},
{"barcode":"95593","name":"Dog","quantity":6,"orderID":"test2"},
{"barcode":"95594","name":"Scar","quantity":6,"orderID":"test2"}
]
def joinjson(repl, tobeCombined):
newarr = []
for data in tobeCombined:
replData = getOrderData(repl,data['orderID'])
if replData is not None:
data.update(replData)
newarr.append(data)
return newarr
def getOrderData(order, orderID):
for data in order:
print("Data OrderID : ",data['orderID'])
if data['orderID'] == orderID:
return data
print(joinjson(order,combined))
这篇关于加入由主键链接的 2 个 JSON 输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论