如何使用 rusoto 将文件上传到 s3,而不将文件内容读取到内存(流式传输)?
使用此代码:
使用 std::fs::File;使用 std::io::BufReader;使用 rusoto_core::Region;使用 rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};fn 主(){let file = File::open("input.txt").unwrap();让 mut reader = BufReader::new(file);让 s3_client = S3Client::new(Region::UsEast1);让结果 = s3_client.put_object(PutObjectRequest {存储桶:String::from("example_bucket"),键:example_filename".to_string(),//这有效://body: Some("example string".to_owned().into_bytes().into()),//这不是:正文:一些(StreamingBody::new(读者)),..默认::默认()}).sync().expect("无法上传");}我收到以下错误:
error[E0277]: trait bound `std::io::BufReader: futures::stream::Stream` 不满足-->src/bin/example.rs:18:20|18 |正文:一些(StreamingBody::new(读者)),|^^^^^^^^^^^^^^^^^^ 特性 `futures::stream::Stream` 没有为 `std::io::BufReader<std::fs::File> 实现.`|= 注意:`rusoto_core::stream::ByteStream::new` 需要 解决方案好的.系好安全带,这很有趣.
StreamingBody 是 ByteStream 的别名,它本身采用参数类型S:Stream+ 发送 + '静态.简而言之,它需要是一个字节流.
BufReader,显然, 没有实现这个特性,因为它早于期货和流很长一段时间.也没有简单的转换到 Stream 可以用来隐式转换成这个.
第一个(注释)示例之所以有效是因为 String::into_bytes().into() 将遵循类型转换链:String -> Vec -> ByteStream 感谢 From
既然我们知道为什么这不起作用,我们可以修复它.有一个快速的方法,然后有一个正确的方法.两个我都给你看.
快捷方式快速(但不是最佳)方法是简单地调用 File::read_to_end().这将填充一个 Vec,然后您可以像以前一样使用它:
let mut buf:Vec= vec![];file.read_to_end(&mut buf)?;//buf 现在包含整个文件这是低效和次优的,原因有两个:
- read_to_end() 是一个阻塞调用.根据您从何处读取文件,此阻塞时间可能被证明是不合理的
- 您需要拥有比文件中字节数更多的可用 RAM(+ 64 位或 128 位用于 Vec 定义+ 一些我们并不真正关心的额外内容)
将您的文件转换为实现 AsyncRead.由此,我们可以形成一个Stream.
既然你已经有了一个std::fs::File,我们会先把它转换成一个tokio::fs::File.这实现了AsyncRead,这对后面很重要:
let tokio_file = tokio::fs::File::from_std(file);因此,遗憾的是,我们需要做一些管道工作才能将其转换为 Stream.多个板条箱已经实现了它;从头开始的方法如下:
使用 tokio_util::codec;让 byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new()).map(|r| r.as_ref().to_vec());byte_stream 是 tokio_util::codec::FramedRead 的一个实例,其中 使用基于我们的解码器的特定项目实现 Stream.因为我们的解码器是 BytesCodec,因此您的流是 Stream.
由于 Playground 不知道 rusoto_core,我无法向您展示完整流程.不过,我可以告诉你,你可以生成一个Stream
How can I upload file to s3 using rusoto, without reading file content to memory (streamed)?
With this code:
use std::fs::File; use std::io::BufReader; use rusoto_core::Region; use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody}; fn main() { let file = File::open("input.txt").unwrap(); let mut reader = BufReader::new(file); let s3_client = S3Client::new(Region::UsEast1); let result = s3_client.put_object(PutObjectRequest { bucket: String::from("example_bucket"), key: "example_filename".to_string(), // this works: // body: Some("example string".to_owned().into_bytes().into()), // this doesn't: body: Some(StreamingBody::new(reader)), ..Default::default() }).sync().expect("could not upload"); }I receive the following error:
error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied --> src/bin/example.rs:18:20 | 18 | body: Some(StreamingBody::new(reader)), | ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>` | = note: required by `rusoto_core::stream::ByteStream::new`
解决方案
Okay. Strap yourself in, this is a fun one.
StreamingBody is an alias for ByteStream, which itself takes a parameter type S: Stream<Item = Bytes, Error = Error> + Send + 'static. In short, it needs to be a stream of bytes.
BufReader, evidently, does not implement this trait, as it predates futures and streams by a long while. There is also no easy conversion to Stream<Item = Bytes> that you can use to implicitly convert into this.
The reason the first (commented) example works is because String::into_bytes().into() will follow the typecast chain: String -> Vec<u8> -> ByteStream thanks to the implementation of From<Vec<u8>> on ByteStream.
Now that we know why this doesn't work, we can fix it. There is a fast way, and then there is a right way. I'll show you both.
The fast wayThe fast (but not optimal) way is simply to call File::read_to_end(). This will fill up a Vec<u8>, which you can then use like you did before:
let mut buf:Vec<u8> = vec![]; file.read_to_end(&mut buf)?; // buf now contains the entire fileThis is inefficient and suboptimal for two reasons:
- read_to_end() is a blocking call. Based on where you are reading the file from, this blocking time may prove unreasonable
- You are required to have more free RAM than you have bytes in your file (+ either 64 or 128 bits for the Vec definition + some extra we don't really care about)
The good way turns your file into a structure implementing AsyncRead. From this, we can then form a Stream.
Since you already have a std::fs::File, we will first convert it into a tokio::fs::File. This implements AsyncRead, which is very important for later:
let tokio_file = tokio::fs::File::from_std(file);From this, we sadly need to do some pipework to get it into a Stream. Multiple crates have implemented it; the way to do so from scratch is the following:
use tokio_util::codec; let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new()) .map(|r| r.as_ref().to_vec());byte_stream is an instance of tokio_util::codec::FramedRead which implements Stream with a specific item based on our decoder. As our decoder is BytesCodec, your stream is therefore Stream<Item = BytesMut>.
As the playground doesn't know rusoto_core, I cannot show you the full flow. I can, however, show you that you can generate a Stream<Item = Vec<u8>, Error = io::Error>, which is the crux of this: play.rust-lang/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133
更多推荐
使用 rusoto 流式上传到 s3
发布评论