我正在使用需要关闭的不同资源的应用程序,同时使用反应流。
我有一个基于享元模式的工厂,它保留对对象的引用,它们实现了 AutoCloseable 接口。问题是我在 Autocloseable 类中使用 close() ,这是我的问题:删除对工厂内封闭资源的引用的最佳解决方案是什么?我可以抛出某种事件并在工厂中捕获它,还是在每个可以关闭资源的操作之后我应该遍历引用映射并删除关闭的资源?
为了获得更好的上下文:我正在使用发出目录事件(创建、删除文件/目录)的 reactivex Observable,并且在每个订阅者取消订阅它之后,我正在关闭我正在使用的 WatchService。
编辑#1
我的工厂类如下所示:
public final class Factory {
private final ConcurrentHashMap<String, ReactiveStream> reactiveStreams = new ConcurrentHashMap<>();
public ReactiveStream getReactiveStream(Path path) throws IOException {
ReactiveStream stream = reactiveStreams.get(path.toString());
if (stream != null) return stream;
stream = new ReactiveStream(path);
reactiveStreams.put(path.toString(), stream);
return stream;
}
}
这是我的 ReactiveStream 类的样子:
public class ReactiveStream implements AutoCloseable {
(...)
private WatchService service;
private Observable<Event> observable;
public Observable<Event> getObservable() throws IOException {
(...) // where i create observable
return observable;
}
(...)
@Override
public void close() throws IOException {
service.close();
}
}
如您所见,我有一个工厂,它保留对 ReactiveStream 类的引用,该类在可观察后关闭,将不再被订阅(我这样做的方式是在使用 share 之前使用 doOnUnsubscribe(() -> close()) () 在 observable 上,因此当没有订阅者时,将调用 doOnUnsubscribe)。
我的问题是,如何在工厂关闭后从工厂中删除对已关闭 ReactiveStream 的引用?
编辑#2
observable = Observable.fromCallable(new EventObtainer()).flatMap(Observable::from).subscribeOn(Schedulers.io()).repeat().doOnUnsubscribe(() -> {
try {
close();
} catch (IOException e) {
e.printStackTrace();
}
}).share();
这是我创建可观察对象的方式。EventObtainer 是 ReactiveStream 中的嵌套类,它使用 WatchService 需要在每个订阅者停止订阅后关闭。