14

今天我尝试重构这段代码,它从目录中的文件中读取 id,

Set<Long> ids = new HashSet<>();
for (String fileName : fileSystem.list("my-directory")) {
    InputStream stream = fileSystem.openInputStream(fileName);
    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
    String line;
    while ((line = br.readLine()) != null) {
        ids.add(Long.valueOf(line.trim()));
    }
    br.close();
}

使用流 api

Set<Long> ids = fileSystem.list("my-directory").stream()
    .map(fileName -> fileSystem::openInputStream)
    .map(is -> new BufferedReader(new InputStreamReader(is)))
    .flatMap(BufferedReader::lines)
    .map(String::trim)
    .map(Long::valueOf)
    .collect(Collectors.toSet());

然后我发现IO流不会被关闭并且我没有看到关闭它们的简单方法,因为它们是在管道内部创建的。

有任何想法吗?

upd:例子中的FileSystem是HDFS,Files#lines类似的方法不能用。

4

3 回答 3

17

一旦流的所有元素都被消耗完,就可以挂接到流中以“关闭”资源。因此,可以通过以下修改在读取所有行后关闭阅读器:

.flatMap(reader -> reader.lines().onClose(() -> close(reader)))

Whereclose(AutoClosable)处理 IOException。

作为概念证明,以下代码和输出已经过测试:

import java.util.stream.Stream;

class Test {
    public static void main(String[] args) {
        Stream.of(1, 2, 3).flatMap(i ->
                Stream.of(i, i * 2).onClose(() ->
                        System.out.println("Closed!")
                )
        ).forEach(System.out::println);
    }
}

1
2
Closed!
2
4
Closed!
3
6
Closed!
于 2017-04-25T11:39:30.090 回答
4

为什么不简单一点,通过Files.lines

try (Stream<String> s = Files.lines(Paths.get("yourpath" + fileName))) {
    s.map(String::trim)
      .map(Long::valueOf)
      .collect(Collectors.toSet());
}
于 2017-04-25T11:22:42.100 回答
0

我还没有测试过实际的代码,但也许是这些方面的东西?

Set<Long> ids = fileSystem.list("my-directory").stream()
.map(fileName -> fileSystem::openInputStream)
.flatMap(is -> {
    try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
      return is.lines().map(String::trim).map(Long::valueOf);
    }
 })
.collect(Collectors.toSet());

当然不如你的漂亮,但我相信它是最接近的,可以让你关闭它。

于 2017-04-25T11:14:19.973 回答