I have been using the Beam pipeline examples as a guide in an attempt to load files from S3 for my pipeline. As in the examples, I have defined my own PipelineOptions interface that also extends S3Options, and I am attempting to use the DefaultAWSCredentialsProviderChain. The code to configure this is:
```java
MyPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(MyPipelineOptions.class);
options.setAwsCredentialsProvider(new DefaultAWSCredentialsProviderChain());
options.setAwsRegion("us-east-1");
runPipeline(options);
```
When I run it from IntelliJ, it works fine using the Direct Runner, but when I package it as a jar and execute it (also using the Direct Runner), I see:
```
Exception in thread "main" java.lang.IllegalArgumentException: PipelineOptions specified failed to serialize to JSON.
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:166)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
	at a.b.c.beam.CleanSkeleton.runPipeline(CleanSkeleton.java:69)
	at a.b.c.beam.CleanSkeleton.main(CleanSkeleton.java:53)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'awsCredentialsProvider' with value 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain@40f33492'
	at com.fasterxml.jackson.databind.JsonMappingException.fromUnexpectedIOE(JsonMappingException.java:338)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3247)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:163)
	... 5 more
```
I am using Gradle to build my jar with the following task:
```groovy
jar {
    manifest {
        attributes(
            'Main-Class': 'a.b.c.beam.CleanSkeleton'
        )
    }
    from {
        configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
    }
    from('src') {
        include '/main/resources/*'
    }
    zip64 true
    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'
}
```

Recommended answer
The problem occurred because, when the fat/uber jar was being created, files in META-INF/services were being overwritten by duplicate files. Specifically, META-INF/services/com.fasterxml.jackson.databind.Module needed to list a number of Jackson modules, but some of them were missing. These include org.apache.beam.sdk.io.aws.options.AwsModule and com.fasterxml.jackson.datatype.joda.JodaModule. The code in the DirectRunner instantiates the ObjectMapper like so:
```java
new ObjectMapper()
    .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
```
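ObjectMapper::findModules relies on java.util.ServiceLoader, which locates services from META-INF/services/ files. Because AwsModule, which supplies the (de)serializer for the AWS credentials provider, was never registered, serializing the awsCredentialsProvider option failed.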
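To check which Jackson modules ServiceLoader can actually see in a packaged jar, a small diagnostic along these lines may help. It is a sketch that uses only the JDK; the class name ServiceFileInspector is hypothetical. It enumerates every copy of META-INF/services/com.fasterxml.jackson.databind.Module visible to the context class loader and prints the provider class names in each, applying the provider-configuration file rules from the java.util.ServiceLoader documentation (text after `#` is a comment, surrounding whitespace is ignored, blank lines are skipped).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical diagnostic class, not part of Beam or Jackson.
public class ServiceFileInspector {

    static final String JACKSON_MODULE_SERVICE = "com.fasterxml.jackson.databind.Module";

    // Applies the provider-configuration file rules described for java.util.ServiceLoader:
    // text after '#' is a comment, surrounding whitespace is ignored, blank lines are skipped.
    static List<String> parseProviders(List<String> lines) {
        List<String> providers = new ArrayList<>();
        for (String raw : lines) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (!line.isEmpty()) {
                providers.add(line);
            }
        }
        return providers;
    }

    // Prints every copy of META-INF/services/<serviceName> visible to the loader and
    // returns the provider class names found across all copies.
    static List<String> listProviders(String serviceName, ClassLoader loader) throws IOException {
        List<String> all = new ArrayList<>();
        Enumeration<URL> urls = loader.getResources("META-INF/services/" + serviceName);
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            System.out.println("Found " + url);
            try (InputStream in = url.openStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                for (String provider : parseProviders(reader.lines().collect(Collectors.toList()))) {
                    System.out.println("    " + provider);
                    all.add(provider);
                }
            }
        }
        return all;
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        List<String> providers = listProviders(JACKSON_MODULE_SERVICE, loader);
        System.out.println(providers.size() + " Jackson module provider(s) listed");
    }
}
```

If this class is compiled into the fat jar and run from it, the jar built with the original jar task would be expected to show a single copy of the services file missing some modules, whereas a jar with correctly merged service files should list AwsModule and JodaModule together.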
The solution was to use the Gradle Shadow plugin to build the fat/uber jar and configure it to merge the service files:
```groovy
apply plugin: 'com.github.johnrengelman.shadow'

shadowJar {
    mergeServiceFiles()
    zip64 true
}
```
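The difference between the two jars can be illustrated with a simplified model. The sketch below (plain JDK; the class ServiceMergeDemo and its methods are hypothetical) represents each dependency jar as a map from entry path to lines. The naive copy keys entries by path, so a later duplicate replaces an earlier one (modeled here as last-write-wins). The merging copy concatenates files under META-INF/services/ and drops duplicate provider names, which is the effect mergeServiceFiles() is meant to achieve. This is not Shadow's actual implementation, and the file contents are illustrative rather than read from the real Beam and Jackson jars.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical model of fat-jar assembly; not the Shadow plugin's code.
public class ServiceMergeDemo {

    static final String MODULE_SERVICE = "META-INF/services/com.fasterxml.jackson.databind.Module";

    // Naive copy: entries are keyed by path, so a later duplicate replaces an earlier one.
    static Map<String, List<String>> copyOverwriting(List<Map<String, List<String>>> jars) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map<String, List<String>> jar : jars) {
            out.putAll(jar); // same path: the previous content is lost
        }
        return out;
    }

    // Merging copy: service files with the same path are concatenated and de-duplicated;
    // any other duplicate path still keeps the last copy.
    static Map<String, List<String>> copyMergingServices(List<Map<String, List<String>>> jars) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map<String, List<String>> jar : jars) {
            for (Map.Entry<String, List<String>> e : jar.entrySet()) {
                String path = e.getKey();
                if (path.startsWith("META-INF/services/") && out.containsKey(path)) {
                    LinkedHashSet<String> union = new LinkedHashSet<>(out.get(path));
                    union.addAll(e.getValue());
                    out.put(path, new ArrayList<>(union));
                } else {
                    out.put(path, e.getValue());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Illustrative contents only; each map stands for one dependency jar.
        Map<String, List<String>> awsJar =
                Map.of(MODULE_SERVICE, List.of("org.apache.beam.sdk.io.aws.options.AwsModule"));
        Map<String, List<String>> jodaJar =
                Map.of(MODULE_SERVICE, List.of("com.fasterxml.jackson.datatype.joda.JodaModule"));
        List<Map<String, List<String>>> jars = List.of(awsJar, jodaJar);

        System.out.println("overwrite: " + copyOverwriting(jars).get(MODULE_SERVICE));
        System.out.println("merge:     " + copyMergingServices(jars).get(MODULE_SERVICE));
    }
}
```

Running main prints only JodaModule for the overwriting copy, while the merged copy lists AwsModule followed by JodaModule, mirroring why the Shadow-built jar can register the module that serializes awsCredentialsProvider.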