2

下面是我的代码。我试图借助 Redisson 库,通过 Redis 主题(topic)把另一个应用程序发来的命令写入 Process 的 OutputStream,但在控制台上看不到任何输出。

这是因为 Redisson 库的消息是在另一个线程中接收的,而 exec 进程运行在不同的线程中?还是因为该进程只能使用 System.in/System.out,无法接收其他来源的输入?

如您所见,我通过 Redis 主题发布命令:我的另一个应用程序负责把命令发送给本应用程序,而本应用程序已经在 pod 中启动了 exec 进程。

这两个应用程序都使用 Spring Boot 2.0。我被困在这里很长时间了,感谢您的帮助。

    package com.demo.kubeconnector.service;

import com.demo.kubeconnector.model.Exec;
import com.demo.kubeconnector.utils.SocResponse;
import com.demo.kubeconnector.utils.WebSocket;
import com.google.api.client.util.ByteStreams;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.util.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ExecStartThread extends Thread {

    private V1Pod mPod;
    private String mContainer;
    private RTopic<Exec> mRTopic;
    private SimpMessagingTemplate mSimpMessagingTemplate;
    private Process mProcess;
    private BufferedReader mBufferedReader;
    private ByteArrayInputStream mByteArrayInputStream;
    private int mListenerId;
    private String mToken;
    private RBucket<String> mSessionBucket;

    public ExecStartThread(RedissonClient redissonClient, RBucket<String> sessionBucket, String session, String topic, V1Pod pod, String container, SimpMessagingTemplate simpMessagingTemplate) {
        mPod = pod;
        mRTopic = redissonClient.getTopic(topic);
        mContainer = container;
        mSimpMessagingTemplate = simpMessagingTemplate;
        mToken = session;
        mSessionBucket = sessionBucket;
    }

    @Override
    public void run() {

        String destination = String.format(WebSocket.POD_EXEC, mToken);

        try {

            log.info(String.format("Starting a thread {%s} for process", Thread.currentThread().getName()));

            mProcess = getProcess();
            mBufferedReader = new BufferedReader(new InputStreamReader(mProcess.getInputStream()));

            mRTopic.addListener(new MessageListener<Exec>() {
                @Override
                public void onMessage(String channel, Exec msg) {
                    String command = msg.getCommand();

                    try {
                        if (!command.equals("exit")) {
                            log.info("Executing command in Thread " + Thread.currentThread().getName() + "-->:" + msg);
                            mByteArrayInputStream = new ByteArrayInputStream(command.getBytes());
                            ByteStreams.copy(mByteArrayInputStream, mProcess.getOutputStream());
                        } else {
                            log.info("Exiting thread.... " + Thread.currentThread().getId());

                            mSessionBucket.deleteAsync();
                            mRTopic.removeListener(mListenerId);

                            IOUtils.closeQuietly(mBufferedReader);
                            IOUtils.closeQuietly(mByteArrayInputStream);
                            mProcess.destroy();

                            interrupt();
                        }
                    } catch (IOException ex) {
                        log.info(String.format("IOException occurred in the SSH topic {%s}", mRTopic.getChannelNames()));
                    }
                }
            });

            /* Return the created session.
             * */
            mSimpMessagingTemplate.convertAndSend(destination, SocResponse.ok(mToken));

            String line;
            while ((line = mBufferedReader.readLine()) != null) {
                System.out.println("Thread is " + Thread.currentThread().getName() + "-->:" + line);
                mSimpMessagingTemplate.convertAndSend(destination, SocResponse.ok(line));
            }

            log.info("Buffer reader has ended");
        } catch (IOException e) {
            log.info(String.format("Exception occurred while getting process ---> \n %s", e.getMessage()), e);
        } catch (ApiException e) {
            log.error(String.format("APIException occurred while getting process ---> \n %s", e.getResponseBody()), e);
        }
    }

    private Process getProcess() throws IOException, ApiException {

        ApiClient client = Config.defaultClient();
        client.getHttpClient().setReadTimeout(0l, TimeUnit.MILLISECONDS);

//        Configuration.setDefaultApiClient(client);

        io.kubernetes.client.Exec exec = new io.kubernetes.client.Exec();
        exec.setApiClient(client);

        return exec.exec(
                mPod,
                new String[]{"sh"},
                mContainer,
                Boolean.TRUE,
                Boolean.TRUE);
    }
}
4

0 回答 0