Akka Streams: Reading Multiple Files

Updated: 2024-10-28 14:29:39

Question

I have a list of files. I want:

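  • To read all of them as a single Source.
  • Files should be read sequentially, in order (no round-robin).
  • At no point should any file be required to be entirely in memory.
  • An error reading from a file should collapse the stream.

It felt like this should work (Scala, akka-streams v2.4.7):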

    val sources = Seq("file1", "file2").map(new File(_)).map(f =>
      FileIO.fromPath(f.toPath)
        .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
        .map(bs => bs.utf8String)
    )
    val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
    source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines

But that results in a compile error, since FileIO has a materialized value associated with it, and Source.combine doesn't support that.

Mapping the materialized value away makes me wonder how file-read errors get handled, but it does compile:

    val sources = Seq("file1", "file2").map(new File(_)).map(f =>
      FileIO.fromPath(f.toPath)
        .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
        .map(bs => bs.utf8String)
        .mapMaterializedValue(f => NotUsed.getInstance())
    )
    val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
    source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines

But it throws an IllegalArgumentException at runtime:

    java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
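
One plausible reading of the exception above: `MergePreferred` has an extra `preferred` inlet in addition to its regular inputs, and the combine call only wires up the regular ones, so the graph is left with an unconnected port. `MergePreferred` also interleaves its inputs rather than reading them in order. Below is a minimal sketch of sequential concatenation with `concat`, assuming in-memory sources as stand-ins for the files and the pre-2.6 `ActorMaterializer` setup. The object name `ConcatSketch` and the sample data are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatSketch {
  // Concatenates two in-memory sources in order and collects the elements.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("concat-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-ins for the per-file line sources in the question (assumption).
      val sources = Seq(
        Source(List("a1", "a2")),
        Source(List("b1", "b2", "b3"))
      )

      // concat emits every element of the left source before pulling from the right one.
      val source = sources.reduce(_ concat _)

      Await.result(source.runWith(Sink.seq[String]), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Sources joined with `concat` are all materialized when the stream starts, which for `FileIO` likely means every file is opened up front. That is one reason to prefer a `flatMapConcat`-based approach for long file lists.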

Recommended answer

The code below is not as terse as it could be, in order to clearly modularize the different concerns.

    import java.io.File
    import java.nio.file.Path

    import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
    import akka.util.ByteString

    // Given a stream of ByteStrings delimited by the system line separator,
    // we can get lines represented as Strings.
    val lines = Framing
      .delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)
      .map(bs => bs.utf8String)

    // Given a stream of Paths, we read those files one after another and count the lines.
    val lineCounter = Flow[Path]
      .flatMapConcat(path => FileIO.fromPath(path).via(lines))
      .fold(0L)((count, line) => count + 1)
      .toMat(Sink.head)(Keep.right)

    // Here's our test data source (replace paths with real paths).
    val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))

    // Runs the line counter over the test files. This returns a Future containing the
    // number of lines, which we print to the console when it completes.
    // Needs an implicit Materializer and ExecutionContext in scope.
    testFiles runWith lineCounter foreach println
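
To try the `flatMapConcat` approach end to end, here is a self-contained sketch under a few assumptions: it writes its own temporary files, frames on `"\n"` rather than the system line separator so the result does not depend on the platform, and uses the pre-2.6 `ActorMaterializer` setup. The names `LineCountSketch`, `tempFile`, and `countLines` are hypothetical and not part of the answer.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object LineCountSketch {
  // Hypothetical helper: writes the given lines to a temp file, each terminated by "\n".
  def tempFile(lines: String*): Path = {
    val path = Files.createTempFile("akka-lines", ".txt")
    Files.write(path, lines.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8))
  }

  // Reads the files one after another as a single stream and counts the lines.
  def countLines(paths: List[Path]): Long = {
    implicit val system: ActorSystem = ActorSystem("line-count-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val total = Source(paths)
        .flatMapConcat { path =>
          // Each file becomes a sub-stream that is consumed fully before the next one starts.
          FileIO.fromPath(path)
            .via(Framing.delimiter(ByteString("\n"), 10000, allowTruncation = true))
        }
        .toMat(Sink.fold[Long, ByteString](0L)((count, _) => count + 1))(Keep.right)
        .run()
      Await.result(total, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(countLines(List(tempFile("a", "b"), tempFile("c", "d", "e"))))
}
```

Because each file is a sub-stream of one outer stream, a failure in any sub-stream fails the outer stream, so a read error should surface as a failed `Future` rather than a silently short count. How `FileIO` itself reports a given I/O problem may vary between Akka versions.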

Published: 2023-11-25 05:05:46
Link: https://www.elefans.com/category/jswz/34/1628447.html