1

我想使用 RxJS 监控活动的 chrome 运行时连接,以下工作但必须有一种更清洁的方法。

import { Subject, merge } from "rxjs";
import { scan, map } from "rxjs/operators";

type Port = chrome.runtime.Port;

const onConnect$ = new Subject<Port>();
const onDisconnect$ = new Subject<Port>();

onConnect$.subscribe(port => port.onDisconnect.addListener(port => onDisconnect$.next(port)));

const connectedPorts$ = merge(
  onConnect$.pipe(map(port => ({ type: "connected", port }))),
  onDisconnect$.pipe(map(port => ({ type: "disconnected", port })))
).pipe(
  scan((connections, action) => {
    if (action.type == "connected") return [...connections, action.port];

    return connections.filter(c => c.sender?.id != action.port.sender?.id);
  }, [] as Port[])
);

connectedPorts$.subscribe(conns => console.log("Connected Ports Updated", conns));
4

1 回答 1

2

(我自己没有测试过,但它看起来与 Chrome 运行时 API 一致。)

您当前的方法可能更简洁的一种方法是更好地概括运行时事件流的创建。使用主题(以及在另一个订阅调用中订阅侦听器)会增加复杂性,而且通常是不必要的。

此代码设置了 的两个实例Observable<Port>,一个是刚刚连接的端口,一个是刚刚断开的端口。

const fromRuntimeEvent = (target, eventType) =>
  new Observable((observer) => {
    const listener = (event) => observer.next(event);
    try {
      target[eventType].addListener(listener);
    } catch (e) {
      observer.error(e);
    }
    return () => target[eventType].removeListener(listener);
  });

const connections$ = fromRuntimeEvent(chrome.runtime, 'onConnect');
const disconnectsFor = (port) => fromRuntimeEvent(port, 'onDisconnect');

const disconnections$ = connections$.pipe(
  mergeMap((port) => disconnectsFor(port))
);

从这里开始,scan按照您的方式使用对我来说是有意义的,只要确保也考虑断开连接。

于 2020-01-31T17:35:07.373 回答