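I've written a basic server in C++ using the sys/socket.h header. The method that runs the server and responds to requests is shown below. The code runs in a continuous loop for as long as a member variable (Server::keep_server_) is set to true. On receiving a JSON request, it looks up the appropriate handler in a map, calls it to generate a JSON response, and uses send/write to return that response to the client: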
/**
* Run server, listen for client requests, send responses.
*/
void Run() {
int sock_fd, client_sock;
struct sockaddr_in server{}, client{};
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("Socket creation failed.");
}
int enable = 1;
setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(port_);
// Bind socket.
if (bind(sock_fd, (struct sockaddr *) &server, sizeof(server)) < 0) {
throw std::runtime_error("Socket bind failed.");
}
// Listen and accept incoming connections.
listen(sock_fd, max_clients_);
listening_ = true;
int c = sizeof(struct sockaddr_in);
// Char buffer to which clients requests will be written.
char client_req[2048];
// JSON parser and serializer.
JSONCPP_STRING parse_err;
while ((client_sock = accept(sock_fd,
(struct sockaddr *) &client,
(socklen_t *) &c))
&& keep_server_)
{
// JSON objects to hold parsed request and serialized response.
Json::Value json_req, json_resp;
json_resp["SUCCESS"] = true;
if (client_sock < 0) {
throw std::runtime_error("Socket accept failed.");
}
bzero(client_req, 2048);
long req_size = recv(client_sock, client_req, 2048, 0);
if (req_size > 0) {
std::string client_req_str(client_req);
// Read clientReq as JSON first, write response to json_resp.
if (reader_->parse(client_req_str.c_str(),
client_req_str.c_str() +
client_req_str.length(),
&json_req, &parse_err))
{
try {
// Get JSON response.
json_resp = ProcessRequest(json_req);
} catch (const std::exception &ex) {
// If the request handler threw an exception.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(ex.what());
}
} else {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(parse_err);
}
std::string resp = Json::writeString(writer_, json_resp);
if(send(client_sock, strdup(resp.c_str()), MAX_SOCKET_WRITE, 0) == -1)
throw std::runtime_error("Socket write failed.");
} else if (req_size == -1) {
throw std::runtime_error("Socket receive failed.");
}
}
listening_ = false;
close(sock_fd);
}
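This code works as expected for a few hundred request/response cycles, at which point send or write (I've tried both, and they have the same problem) returns -1 and the process exits.
Why would a socket write suddenly fail after working for so long? How should I handle it?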
EDIT: To clarify, all client/server activity runs locally (i.e., the client and server are just two different threads on my machine).
EDIT 2: errno is 14, which is apparently "bad file descriptor".
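Numeric errno values are platform-specific, so it may be worth double-checking the name behind the number. On Linux, for instance, 14 is EFAULT ("Bad address"), while EBADF ("Bad file descriptor") is 9. A small sketch for printing the system's own description next to the number follows. DescribeErrno is a hypothetical helper name, not something from the code in this question:

```cpp
#include <cerrno>
#include <cstring>
#include <string>

// Hypothetical helper: formats an errno value as "<number> (<system message>)".
// Capture errno right after the failing call, before any other library
// call has a chance to overwrite it.
std::string DescribeErrno(int err) {
    return std::to_string(err) + " (" + std::strerror(err) + ")";
}
```

In the server, the diagnostic line could then read `std::cout << "ERRNO " << DescribeErrno(errno) << std::endl;`.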
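EDIT 3: Another user suggested providing a minimal reproducible example. The error seems to be triggered by repeatedly sending requests to the server.
To that end, I'll first provide the full code for my server class, which is a template class that takes some class as a template parameter. On construction, the server is passed an instance of that class along with a number of std::mem_fns from that class, each bound to a command type. The server then handles requests by calling the relevant handler on the provided instance of the template-parameter class: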
[SERVER.H]
#pragma once
#ifndef CHORD_FINAL_SERVER
#define CHORD_FINAL_SERVER
/// When reading from / writing to sockets, it's best to just read/write as many chars as possible.
/// We'll just define this as a macro.
#define MAX_SOCKET_WRITE 999999
#include <json/json.h>
#include <map>
#include <string>
#include <functional>
#include <iostream>
#include <cstring>
#include <netinet/in.h>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <cerrno>
#include <memory>
#include <stdexcept>
/**
* A simple server which responds to requests using a pre-defined set of methods.
*
* @tparam RequestHandler Type of the method responding to requests.
* @tparam RequestClass The class for which Request Handler is a method.
*/
template<class RequestHandler, class RequestClass>
class Server {
public:
/**
* Initialize server object. (Does not run server or listen for reqs.)
*
* @param port Port on which to run server.
* @param commands A map of request strings (sent by clients) to function ptrs
* to handle them.
*/
Server(int port, int max_clients,
const std::map<std::string, RequestHandler> *commands,
RequestClass *request_class_inst) :
port_(port),
max_clients_(max_clients),
commands_(*commands),
request_class_inst_(request_class_inst),
keep_server_(true),
listening_(false),
reader_((new Json::CharReaderBuilder)->newCharReader()) {}
/**
* Run server, listen for client requests, send responses.
*/
void Run() {
int sock_fd, client_sock;
struct sockaddr_in server{}, client{};
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("Socket creation failed.");
}
int enable = 1;
setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(port_);
// Bind socket.
if (bind(sock_fd, (struct sockaddr *) &server, sizeof(server)) < 0) {
throw std::runtime_error("Socket bind failed.");
}
// Listen and accept incoming connections.
if(listen(sock_fd, max_clients_) < 0)
throw std::runtime_error("Listen failed");
listening_ = true;
int c = sizeof(struct sockaddr_in);
// Char buffer to which clients requests will be written.
char client_req[2048];
// JSON parser and serializer.
JSONCPP_STRING parse_err;
while ((client_sock = accept(sock_fd,
(struct sockaddr *) &client,
(socklen_t *) &c))
&& keep_server_)
{
// JSON objects to hold parsed request and serialized response.
Json::Value json_req, json_resp;
json_resp["SUCCESS"] = true;
if (client_sock < 0) {
throw std::runtime_error("Socket accept failed.");
}
bzero(client_req, 2048);
long req_size = recv(client_sock, client_req, 2048, 0);
if (req_size > 0) {
std::string client_req_str(client_req);
// Read clientReq as JSON first, write response to json_resp.
if (reader_->parse(client_req_str.c_str(),
client_req_str.c_str() +
client_req_str.length(),
&json_req, &parse_err))
{
try {
// Get JSON response.
json_resp = ProcessRequest(json_req);
} catch (const std::exception &ex) {
// If the request handler threw an exception.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(ex.what());
}
} else {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(parse_err);
}
std::string resp = Json::writeString(writer_, json_resp);
errno = 0;
if(send(client_sock, strdup(resp.c_str()), MAX_SOCKET_WRITE, 0) == -1) {
std::cout << "ERRNO " << errno << std::endl;
throw std::runtime_error("Socket write failed.");
}
} else if (req_size == -1) {
throw std::runtime_error("Socket receive failed.");
}
if(close(client_sock) < 0)
throw std::runtime_error("Socket close error");
}
listening_ = false;
close(sock_fd);
}
/**
* Run server as daemon.
*/
void RunInBackground()
{
std::thread t([this] { Run(); });
t.detach();
}
/**
* Kill server, whether it runs in a detached thread or the main one.
*/
void Kill()
{
keep_server_ = false;
}
/// Set to true when server has begun listening.
bool listening_;
protected:
/// Since the commands in commands are methods, this is the "this" we pass to them.
RequestClass *request_class_inst_;
/// Port on which server runs.
const int port_;
/// Maximum clients for which server will listen.
const int max_clients_;
/// When "true", server thread will continue running in background. When set
/// to 0, server dies.
bool keep_server_;
/// A map of potential server commands (sent by clients) to RequestHandlers
/// (func ptrs).
std::map<std::string, RequestHandler> commands_;
/**
* Take a client's JSON request, generate an appropriate JSON response.
*
* @param request JSON object outlining request from a client to this server.
* @return a JSON object to be transmitted as a response to the client's req,
* with a key "SUCCESS" set to either 1 or 0 to indicate whether
* request was valid and successfully executed, along with any
* other relevant return info from the request.
*/
Json::Value ProcessRequest(Json::Value request)
{
Json::Value response;
std::string command = request["COMMAND"].asString();
// If command is not valid, give a response with an error.
if(commands_.find(command) == commands_.end()) {
response["SUCCESS"] = false;
response["ERRORS"] = "Invalid command.";
}
// Otherwise, run the relevant handler.
else {
RequestHandler handler = commands_.at(command);
response = handler(*request_class_inst_, request);
}
return response;
}
private:
/// Reads JSON.
const std::unique_ptr<Json::CharReader> reader_;
/// Writes JSON.
Json::StreamWriterBuilder writer_;
};
#endif
I've also defined a basic client class, as follows:
[CLIENT.H]
#pragma once
#ifndef CHORD_FINAL_CLIENT_H
#define CHORD_FINAL_CLIENT_H
#include <json/json.h>
#include <memory>
class Client {
public:
/**
* Constructor, initializes JSON parser and serializer.
*/
Client();
/**
* Send a request to a server at a given port-IP pairing.
*
* @param recipient_ip IP to send request to.
* @param port Port to send request to.
* @param request JSON object detailing request to server.
* @return JSON response from server or throw an error.
*/
Json::Value make_request(const char *recipient_ip, int port, const Json::Value &request);
/**
* Determine whether or not a node is alive.
*
* @param ip_addr IP address of node to ping.
* @param port Port of node to ping
* @return Value of whether or not a server is running on ip_addr:port.
*/
static bool is_alive(const char *ip_addr, int port);
protected:
/// Reads JSON.
const std::unique_ptr<Json::CharReader> reader_;
/// Writes JSON.
Json::StreamWriterBuilder writer_;
};
#endif //CHORD_FINAL_CLIENT_H
[CLIENT.CPP]
#include "client.h"
#include <iostream>
#include <arpa/inet.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <stdexcept>
Client::Client()
: reader_((new Json::CharReaderBuilder)->newCharReader())
{}
Json::Value Client::make_request(const char *recipient_ip, int port,
const Json::Value &request)
{
int sock_fd;
struct sockaddr_in destination{};
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("Socket creation failed");
}
destination.sin_addr.s_addr = inet_addr(recipient_ip);
destination.sin_family = AF_INET;
destination.sin_port = htons(port);
if (connect(sock_fd, (struct sockaddr *) &destination, sizeof(destination)) < 0) {
close(sock_fd);
throw std::runtime_error("Host " + std::string(recipient_ip) +
+ ":" + std::to_string(port) + " down.");
}
std::string serialized_req = Json::writeString(writer_, request);
if (send(sock_fd, serialized_req.c_str(), strlen(serialized_req.c_str()), 0) < 0) {
close(sock_fd);
throw std::runtime_error("Socket write failed.");
}
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
char resp_buff[2048];
if (recv(sock_fd, resp_buff, 2048, 0) < 0) {
throw std::runtime_error("Socket receive error.");
}
Json::Value json_resp;
JSONCPP_STRING parse_err;
std::string resp_str(resp_buff);
if (reader_->parse(resp_str.c_str(), resp_str.c_str() + resp_str.length(),
&json_resp, &parse_err))
{
close(sock_fd);
return json_resp;
}
throw std::runtime_error("Error parsing response.");
}
bool Client::is_alive(const char *ip_addr, int port)
{
int sock_fd;
struct sockaddr_in destination{};
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("Socket creation failed");
}
destination.sin_addr.s_addr = inet_addr(ip_addr);
destination.sin_family = AF_INET;
destination.sin_port = htons(port);
if (connect(sock_fd, (struct sockaddr *) &destination, sizeof(destination)) < 0) {
return false;
}
return true;
}
To reproduce my problem, you only need to define a basic request class with methods designed to handle JSON requests:
/// NOTE: This class exists exclusively for unit testing.
class RequestClass {
public:
/**
* Initialize class with value n to add to or subtract from input values.
*
* @param n Value to add/sub from input values.
*/
explicit RequestClass(int n) : n_(n) {}
/// Value to add/sub from
int n_;
/**
* Add n to value in JSON request.
*
* @param request JSON request with field "value".
* @return JSON response containing modified field "value" = [original_value] + n.
*/
[[nodiscard]] Json::Value add_n(const Json::Value &request) const
{
Json::Value resp;
resp["SUCCESS"] = true;
// If value is present in request, return value + n, else return error.
if (request.get("VALUE", NULL) != NULL) {
resp["VALUE"] = request["VALUE"].asInt() + this->n_;
} else {
resp["SUCCESS"] = false;
resp["ERRORS"] = "Invalid value.";
}
return resp;
}
/**
* Subtract n from value in JSON request.
*
* @param request JSON request with field "value".
* @return JSON response containing modified field "value" = [original_value] - n.
*/
[[nodiscard]] Json::Value sub_n(const Json::Value &request) const
{
Json::Value resp, value;
resp["SUCCESS"] = true;
// If value is present in request, return value - n, else return error.
if (request.get("VALUE", NULL) != NULL) {
resp["VALUE"] = request["VALUE"].asInt() - this->n_;
} else {
resp["SUCCESS"] = false;
resp["ERRORS"] = "Invalid value.";
}
return resp;
}
};
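Then, instantiate a server that maps these methods to command strings. Send it a large number of requests and, in less than a second, send or write will return -1, causing the program to throw a runtime error: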
Server<RequestClassMethod, RequestClass> *server_;
auto *request_inst = new RequestClass(1);
std::map<std::string, RequestClassMethod> commands {{"ADD_1", std::mem_fn(&RequestClass::add_n)},
{"SUB_1", std::mem_fn(&RequestClass::sub_n)}};
server_ = new Server<RequestClassMethod, RequestClass>(5000, 50, &commands, request_inst);
server_->RunInBackground();
// Since the server is running in a new thread, spin until it completes setup.
while(! server_->listening_) {}
Client *request_maker_ = new Client;
// Formulate request asking for value of 1 - 1.
Json::Value sub_one_req;
sub_one_req["COMMAND"] = "SUB_1";
sub_one_req["VALUE"] = 1;
// Send request, expect value of 0 and successful return code.
Json::Value sub_one_resp = request_maker_->make_request("127.0.0.1", 5000, sub_one_req);
while(true) {
Json::Value sub_one_resp = request_maker_->make_request("127.0.0.1", 5000, sub_one_req);
}
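The snippet above uses RequestClassMethod without showing its definition. As a rough sketch of the dispatch idea only, the following assumes it is a std::function that takes the instance and the request. To stay self-contained it uses a stand-in struct (Adder) and plain int values instead of RequestClass and Json::Value. All names here (Adder, Handler, MakeCommands, Dispatch) are hypothetical:

```cpp
#include <functional>
#include <map>
#include <string>

// Stand-in for RequestClass; int replaces Json::Value so that the
// sketch compiles without jsoncpp.
struct Adder {
    int n;
    int add_n(const int &value) const { return value + n; }
    int sub_n(const int &value) const { return value - n; }
};

// Assumed shape of the handler type: callable as handler(instance, request).
using Handler = std::function<int(const Adder &, const int &)>;

// Builds the command table by wrapping member functions with std::mem_fn.
std::map<std::string, Handler> MakeCommands() {
    return {{"ADD_1", std::mem_fn(&Adder::add_n)},
            {"SUB_1", std::mem_fn(&Adder::sub_n)}};
}

// Looks up `command` and invokes its handler on `inst`.
// Returns false, leaving *response untouched, if the command is unknown.
bool Dispatch(const std::map<std::string, Handler> &commands,
              const Adder &inst, const std::string &command,
              int request, int *response) {
    auto it = commands.find(command);
    if (it == commands.end()) return false;
    *response = it->second(inst, request);
    return true;
}
```

With Json::Value in place of int, the alias would presumably look like `std::function<Json::Value(const RequestClass &, const Json::Value &)>`, though that is a guess at the missing definition.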
Hopefully this is enough to reproduce the problem. Sorry if this question was poorly formatted before.
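EDIT 4: It occurred to me that this may be an issue related to the maximum number of file descriptors per program. However, my program completes exactly 2031 request-response cycles before failing. That doesn't seem consistent with a maximum file descriptor limit.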
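The descriptor-limit hypothesis can be checked directly by asking the system for the process's limit and comparing it with the cycle count. A minimal sketch using getrlimit follows. OpenFileSoftLimit is a hypothetical helper name:

```cpp
#include <sys/resource.h>
#include <climits>

// Hypothetical helper: returns the soft limit on open file descriptors
// for this process, LLONG_MAX if the limit is unlimited, or -1 if the
// limit cannot be queried.
long long OpenFileSoftLimit() {
    struct rlimit lim{};
    if (getrlimit(RLIMIT_NOFILE, &lim) != 0) return -1;
    if (lim.rlim_cur == RLIM_INFINITY) return LLONG_MAX;
    return static_cast<long long>(lim.rlim_cur);
}
```

Running out of descriptors would normally be expected to surface as an EMFILE error from socket or accept rather than as a failure in send, so printing this value alongside errno may help separate the two cases.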