1

我在 kafka 应用程序中有一个 gRPC 客户端。这意味着客户端将不断打开和关闭通道。

public class UserAgentClient {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private static final Config uaparserConfig = ConfigFactory.load().getConfig(ua);
    private final ManagedChannel channel;
    private final UserAgentServiceGrpc.UserAgentServiceBlockingStub userAgentBlockingStub;

    public UserAgentParserClient() {
        this(ManagedChannelBuilder.forAddress(uaConfig.getString("host"), uaConfig.getInt("port")).usePlaintext());
    }

    public UserAgentClient(ManagedChannelBuilder<?> usePlaintext) {
        channel = usePlaintext.build();
        userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
    }

    public UserAgentParseResponse getUserAgent(String userAgent ) {
        UserAgentRequest request = UserAgentRequest.newBuilder().setUserAgent(userAgent).build();
        UserAgentParseResponse response = null;
        try {
            response = userAgentBlockingStub.parseUserAgent(request);
        } catch(Exception e) {
            logger.warn("An exception has occurred during gRPC call to the user agent.", e.getMessage());
        }
        shutdown();
        return response;
    }

    public void shutdown() {
        try {
            channel.shutdown();
        } catch (InterruptedException ie) {
            logger.warn("Interrupted exception during gRPC channel close", ie);
        }
    }
}

我想知道我是否可以一直保持频道打开?还是每次拨打新电话都必须打开频道?我想知道,因为我正在测试性能,如果我只是保持通道打开,它似乎会大大改善。另一方面,有什么我想念的吗?

4

2 回答 2

0

创建新频道的开销很大,您应该尽可能长时间地保持频道打开。

于 2020-02-06T18:10:31.077 回答
0

由于通道的打开和关闭很昂贵,我channel = usePlaintext.build();完全从我的客户中删除了它,而是在我的 kafka 中打开和关闭它Transformer。在我UserAgentDataEnricher的实现Transformer.

public class UserAgentDataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    private UserAgentParserClient userAgentParserClient;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        open();
        // schedule a punctuate() method every 15 minutes
        this.context.schedule(900000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            close();
            open();
            logger.info("Re-opening of user agent channel is initialized");
        });
    }

    @Override
    public void close() {
        userAgentParserClient.shutdown();
    }

    private void open() {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        userAgentClient = new UserAgentClient(channel);
    }

    ...
}

现在我像这样初始化我的客户:

public UserAgentClient(ManagedChannel channel) {
    this.channel = channel;
    userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
}
于 2020-03-27T12:29:28.510 回答