下面是我的代码,我试图通过 Redis 主题从另一个使用 Redisson 库的应用程序向 Process OutputStream 提供输入,但我无法在控制台上看到任何输出。
是因为来自 Redisson 库的消息被拉到不同的线程中,而 exec 进程在不同的线程中运行,还是因为该进程无法获取 System.in/System.out 以外的其他输出。
如您所见,我正在尝试通过 redis 主题发布命令,因为我的另一个应用程序负责将命令发送到该应用程序,该应用程序已使用 pod 启动了 exec 进程。
这两个应用程序都使用 Spring Boot 2.0。我被困在这里很长时间了,感谢您的帮助。
package com.demo.kubeconnector.service;
import com.google.api.client.util.ByteStreams;
import com.demo.kubeconnector.model.Exec;
import com.demo.kubeconnector.utils.SocResponse;
import com.demo.kubeconnector.utils.WebSocket;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.util.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ExecStartThread extends Thread {
private V1Pod mPod;
private String mContainer;
private RTopic<Exec> mRTopic;
private SimpMessagingTemplate mSimpMessagingTemplate;
private Process mProcess;
private BufferedReader mBufferedReader;
private ByteArrayInputStream mByteArrayInputStream;
private int mListenerId;
private String mToken;
private RBucket<String> mSessionBucket;
public ExecStartThread(RedissonClient redissonClient, RBucket<String> sessionBucket, String session, String topic, V1Pod pod, String container, SimpMessagingTemplate simpMessagingTemplate) {
mPod = pod;
mRTopic = redissonClient.getTopic(topic);
mContainer = container;
mSimpMessagingTemplate = simpMessagingTemplate;
mToken = session;
mSessionBucket = sessionBucket;
}
@Override
public void run() {
String destination = String.format(WebSocket.POD_EXEC, mToken);
try {
log.info(String.format("Starting a thread {%s} for process", Thread.currentThread().getName()));
mProcess = getProcess();
mBufferedReader = new BufferedReader(new InputStreamReader(mProcess.getInputStream()));
mRTopic.addListener(new MessageListener<Exec>() {
@Override
public void onMessage(String channel, Exec msg) {
String command = msg.getCommand();
try {
if (!command.equals("exit")) {
log.info("Executing command in Thread " + Thread.currentThread().getName() + "-->:" + msg);
mByteArrayInputStream = new ByteArrayInputStream(command.getBytes());
ByteStreams.copy(mByteArrayInputStream, mProcess.getOutputStream());
} else {
log.info("Exiting thread.... " + Thread.currentThread().getId());
mSessionBucket.deleteAsync();
mRTopic.removeListener(mListenerId);
IOUtils.closeQuietly(mBufferedReader);
IOUtils.closeQuietly(mByteArrayInputStream);
mProcess.destroy();
interrupt();
}
} catch (IOException ex) {
log.info(String.format("IOException occurred in the SSH topic {%s}", mRTopic.getChannelNames()));
}
}
});
/* Return the created session.
* */
mSimpMessagingTemplate.convertAndSend(destination, SocResponse.ok(mToken));
String line;
while ((line = mBufferedReader.readLine()) != null) {
System.out.println("Thread is " + Thread.currentThread().getName() + "-->:" + line);
mSimpMessagingTemplate.convertAndSend(destination, SocResponse.ok(line));
}
log.info("Buffer reader has ended");
} catch (IOException e) {
log.info(String.format("Exception occurred while getting process ---> \n %s", e.getMessage()), e);
} catch (ApiException e) {
log.error(String.format("APIException occurred while getting process ---> \n %s", e.getResponseBody()), e);
}
}
private Process getProcess() throws IOException, ApiException {
ApiClient client = Config.defaultClient();
client.getHttpClient().setReadTimeout(0l, TimeUnit.MILLISECONDS);
// Configuration.setDefaultApiClient(client);
io.kubernetes.client.Exec exec = new io.kubernetes.client.Exec();
exec.setApiClient(client);
return exec.exec(
mPod,
new String[]{"sh"},
mContainer,
Boolean.TRUE,
Boolean.TRUE);
}
}