I am trying to write to Amazon S3 using an assumed role, through FileIO with ParquetIO.

Step 1: Assume the role

public static AWSCredentialsProvider getCredentials() {
    // Assume the configured role when a role ARN is set; otherwise use the local profile.
    if (roleARN.length() > 0) {
        STSAssumeRoleSessionCredentialsProvider credentialsProvider = new STSAssumeRoleSessionCredentialsProvider
                .Builder(roleARN, Constants.SESSION_NAME)
                .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient())
                .build();
        return credentialsProvider;
    }
    return new ProfileCredentialsProvider();
}

Step 2: Set the credentials on the pipeline

AWSCredentialsProvider credentials = getCredentials();
AWSCredentials resolved = credentials.getCredentials();
AwsOptions awsOptions = pipeline.getOptions().as(AwsOptions.class);
awsOptions.setAwsRegion(Regions.US_WEST_2.getName());
// BasicAWSCredentials carries only the access key ID and the secret key; it has no session token field.
awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials(resolved.getAWSAccessKeyId(), resolved.getAWSSecretKey())));

Step 3: Run the pipeline to write to S3

PCollection<GenericRecord> parquetRecord = formattedEvent
        .apply("ParquetRecord", ParDo.of(new ParquetWriter()))
        .setCoder(AvroCoder.of(getOutput_schema()));

parquetRecord.apply(FileIO.<GenericRecord, GenericRecord>writeDynamic()
        .by(elm -> elm)
        .via(ParquetIO.sink(getOutput_schema()))
        .to(outputPath).withNumShards(1)
        .withNaming(type -> FileNaming.getNaming("part", ".snappy.parquet", "" + DateTime.now().getMillisOfSecond()))
        .withDestinationCoder(AvroCoder.of(getOutput_schema())));

I am using 'org.apache.beam:beam-sdks-java-io-parquet:jar:2.22.0' and 'org.apache.beam:beam-sdks-java-io-amazon-web-services:jar:2.22.0'.

Problem: The assumed role does not seem to take effect.

Error:

org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records.

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'awsCredentialsProvider' with value 'com.amazonaws.auth.InstanceProfileCredentialsProvider@71262020'

Solution

The recently released Beam version (2.24.0) supports assuming a role.
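
As a rough illustration of what that could look like, here is a minimal sketch, assuming Beam 2.24.0 or later with the matching beam-sdks-java-io-amazon-web-services artifact on the classpath. It hands the assume-role provider to AwsOptions directly instead of copying its keys into static credentials. The class name AssumeRoleOptionsSketch, the method configure, and the explicit STS region are illustrative assumptions, not part of the original post.

```java
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical helper class, named here for illustration only.
public class AssumeRoleOptionsSketch {

    // Builds AwsOptions whose credentials provider assumes the given role.
    public static AwsOptions configure(String[] args, String roleArn, String sessionName) {
        AwsOptions options = PipelineOptionsFactory.fromArgs(args).as(AwsOptions.class);
        options.setAwsRegion(Regions.US_WEST_2.getName());

        // Assumption: an STS client pinned to the same region as the pipeline.
        AWSSecurityTokenService sts = AWSSecurityTokenServiceClientBuilder.standard()
                .withRegion(Regions.US_WEST_2)
                .build();

        // Pass the provider itself, so the temporary credentials (including the
        // session token) are resolved by the provider rather than copied by hand.
        options.setAwsCredentialsProvider(
                new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
                        .withStsClient(sts)
                        .build());
        return options;
    }
}
```

The returned options would then be used to create the pipeline, for example with Pipeline.create(options), before applying the FileIO write from Step 3.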
