1
0
mirror of https://github.com/Kitware/CMake.git synced 2025-10-19 11:18:40 +08:00

server: Add support for connections that aren't event based

This commit is contained in:
Justin Berger
2017-02-25 15:06:34 -07:00
parent 5ddfb6a472
commit cf0ae55dcb
7 changed files with 88 additions and 67 deletions

View File

@@ -14,7 +14,8 @@ struct write_req_t
uv_buf_t buf; uv_buf_t buf;
}; };
void cmConnection::on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, void cmEventBasedConnection::on_alloc_buffer(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) uv_buf_t* buf)
{ {
(void)(handle); (void)(handle);
@@ -22,10 +23,10 @@ void cmConnection::on_alloc_buffer(uv_handle_t* handle, size_t suggested_size,
*buf = uv_buf_init(rawBuffer, static_cast<unsigned int>(suggested_size)); *buf = uv_buf_init(rawBuffer, static_cast<unsigned int>(suggested_size));
} }
void cmConnection::on_read(uv_stream_t* stream, ssize_t nread, void cmEventBasedConnection::on_read(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) const uv_buf_t* buf)
{ {
auto conn = reinterpret_cast<cmConnection*>(stream->data); auto conn = reinterpret_cast<cmEventBasedConnection*>(stream->data);
if (conn) { if (conn) {
if (nread >= 0) { if (nread >= 0) {
conn->ReadData(std::string(buf->base, buf->base + nread)); conn->ReadData(std::string(buf->base, buf->base + nread));
@@ -37,16 +38,16 @@ void cmConnection::on_read(uv_stream_t* stream, ssize_t nread,
delete[](buf->base); delete[](buf->base);
} }
void cmConnection::on_close_delete(uv_handle_t* handle) void cmEventBasedConnection::on_close_delete(uv_handle_t* handle)
{ {
delete handle; delete handle;
} }
void cmConnection::on_close(uv_handle_t*) void cmEventBasedConnection::on_close(uv_handle_t* /*handle*/)
{ {
} }
void cmConnection::on_write(uv_write_t* req, int status) void cmEventBasedConnection::on_write(uv_write_t* req, int status)
{ {
(void)(status); (void)(status);
@@ -56,22 +57,22 @@ void cmConnection::on_write(uv_write_t* req, int status)
delete wr; delete wr;
} }
void cmConnection::on_new_connection(uv_stream_t* stream, int status) void cmEventBasedConnection::on_new_connection(uv_stream_t* stream, int status)
{ {
(void)(status); (void)(status);
auto conn = reinterpret_cast<cmConnection*>(stream->data); auto conn = reinterpret_cast<cmEventBasedConnection*>(stream->data);
if (conn) { if (conn) {
conn->Connect(stream); conn->Connect(stream);
} }
} }
bool cmConnection::IsOpen() const bool cmEventBasedConnection::IsOpen() const
{ {
return this->WriteStream != CM_NULLPTR; return this->WriteStream != CM_NULLPTR;
} }
void cmConnection::WriteData(const std::string& data) void cmEventBasedConnection::WriteData(const std::string& data)
{ {
assert(this->WriteStream); assert(this->WriteStream);
@@ -86,12 +87,7 @@ void cmConnection::WriteData(const std::string& data)
on_write); on_write);
} }
cmConnection::~cmConnection() void cmEventBasedConnection::ReadData(const std::string& data)
{
OnServerShuttingDown();
}
void cmConnection::ReadData(const std::string& data)
{ {
this->RawReadBuffer += data; this->RawReadBuffer += data;
if (BufferStrategy) { if (BufferStrategy) {
@@ -107,21 +103,39 @@ void cmConnection::ReadData(const std::string& data)
} }
} }
void cmConnection::SetServer(cmServerBase* s) cmEventBasedConnection::cmEventBasedConnection(
{ cmConnectionBufferStrategy* bufferStrategy)
Server = s;
}
cmConnection::cmConnection(cmConnectionBufferStrategy* bufferStrategy)
: BufferStrategy(bufferStrategy) : BufferStrategy(bufferStrategy)
{ {
} }
void cmConnection::Connect(uv_stream_t*) void cmEventBasedConnection::Connect(uv_stream_t* server)
{ {
(void)server;
Server->OnConnected(nullptr); Server->OnConnected(nullptr);
} }
void cmEventBasedConnection::OnDisconnect(int onerror)
{
(void)onerror;
this->OnConnectionShuttingDown();
this->Server->OnDisconnect(this);
}
cmConnection::~cmConnection()
{
}
bool cmConnection::OnConnectionShuttingDown()
{
return true;
}
void cmConnection::SetServer(cmServerBase* s)
{
Server = s;
}
void cmConnection::ProcessRequest(const std::string& request) void cmConnection::ProcessRequest(const std::string& request)
{ {
Server->ProcessRequest(this, request); Server->ProcessRequest(this, request);
@@ -133,13 +147,7 @@ bool cmConnection::OnServeStart(std::string* errString)
return true; return true;
} }
void cmConnection::OnDisconnect(int errorCode) bool cmEventBasedConnection::OnConnectionShuttingDown()
{
(void)errorCode;
this->Server->OnDisconnect(this);
}
bool cmConnection::OnServerShuttingDown()
{ {
this->WriteStream->data = nullptr; this->WriteStream->data = nullptr;
this->ReadStream->data = nullptr; this->ReadStream->data = nullptr;

View File

@@ -46,43 +46,57 @@ public:
// TODO: There should be a callback / flag set for errors // TODO: There should be a callback / flag set for errors
}; };
/***
* Abstraction of a connection; ties in event callbacks from libuv and notifies
* the server when appropriate
*/
class cmConnection class cmConnection
{ {
CM_DISABLE_COPY(cmConnection) CM_DISABLE_COPY(cmConnection)
public: public:
cmConnection() {}
virtual void WriteData(const std::string& data) = 0;
virtual ~cmConnection(); virtual ~cmConnection();
virtual bool OnConnectionShuttingDown();
virtual bool IsOpen() const = 0;
virtual void SetServer(cmServerBase* s);
virtual void ProcessRequest(const std::string& request);
virtual bool OnServeStart(std::string* pString);
protected:
cmServerBase* Server = nullptr;
};
/***
* Abstraction of a connection; ties in event callbacks from libuv and notifies
* the server when appropriate
*/
class cmEventBasedConnection : public cmConnection
{
public:
/*** /***
* @param bufferStrategy If no strategy is given, it will process the raw * @param bufferStrategy If no strategy is given, it will process the raw
* chunks as they come in. The connection * chunks as they come in. The connection
* owns the pointer given. * owns the pointer given.
*/ */
cmConnection(cmConnectionBufferStrategy* bufferStrategy = nullptr); cmEventBasedConnection(cmConnectionBufferStrategy* bufferStrategy = nullptr);
virtual void Connect(uv_stream_t* server); virtual void Connect(uv_stream_t* server);
virtual void ReadData(const std::string& data); virtual void ReadData(const std::string& data);
virtual bool OnServeStart(std::string* errString); bool IsOpen() const override;
virtual bool OnServerShuttingDown(); void WriteData(const std::string& data) override;
bool OnConnectionShuttingDown() override;
virtual bool IsOpen() const;
virtual void WriteData(const std::string& data);
virtual void ProcessRequest(const std::string& request);
virtual void SetServer(cmServerBase* s);
virtual void OnDisconnect(int errorCode); virtual void OnDisconnect(int errorCode);
uv_stream_t* ReadStream = nullptr; uv_stream_t* ReadStream = nullptr;
cmServerBase* Server = nullptr;
uv_stream_t* WriteStream = nullptr; uv_stream_t* WriteStream = nullptr;
static void on_close(uv_handle_t* handle); static void on_close(uv_handle_t* handle);

View File

@@ -7,7 +7,7 @@
cmPipeConnection::cmPipeConnection(const std::string& name, cmPipeConnection::cmPipeConnection(const std::string& name,
cmConnectionBufferStrategy* bufferStrategy) cmConnectionBufferStrategy* bufferStrategy)
: cmConnection(bufferStrategy) : cmEventBasedConnection(bufferStrategy)
, PipeName(name) , PipeName(name)
{ {
} }
@@ -26,8 +26,7 @@ void cmPipeConnection::Connect(uv_stream_t* server)
this->ClientPipe = new uv_pipe_t(); this->ClientPipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->ClientPipe, 0); uv_pipe_init(this->Server->GetLoop(), this->ClientPipe, 0);
this->ClientPipe->data = static_cast<cmConnection*>(this); this->ClientPipe->data = static_cast<cmEventBasedConnection*>(this);
auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe); auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe);
if (uv_accept(server, client) != 0) { if (uv_accept(server, client) != 0) {
uv_close(reinterpret_cast<uv_handle_t*>(client), &on_close_delete); uv_close(reinterpret_cast<uv_handle_t*>(client), &on_close_delete);
@@ -45,7 +44,7 @@ bool cmPipeConnection::OnServeStart(std::string* errorMessage)
{ {
this->ServerPipe = new uv_pipe_t(); this->ServerPipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->ServerPipe, 0); uv_pipe_init(this->Server->GetLoop(), this->ServerPipe, 0);
this->ServerPipe->data = static_cast<cmConnection*>(this); this->ServerPipe->data = static_cast<cmEventBasedConnection*>(this);
int r; int r;
if ((r = uv_pipe_bind(this->ServerPipe, this->PipeName.c_str())) != 0) { if ((r = uv_pipe_bind(this->ServerPipe, this->PipeName.c_str())) != 0) {
@@ -63,7 +62,7 @@ bool cmPipeConnection::OnServeStart(std::string* errorMessage)
return cmConnection::OnServeStart(errorMessage); return cmConnection::OnServeStart(errorMessage);
} }
bool cmPipeConnection::OnServerShuttingDown() bool cmPipeConnection::OnConnectionShuttingDown()
{ {
if (this->ClientPipe) { if (this->ClientPipe) {
uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe), uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe),
@@ -77,5 +76,5 @@ bool cmPipeConnection::OnServerShuttingDown()
this->WriteStream = nullptr; this->WriteStream = nullptr;
this->ReadStream = nullptr; this->ReadStream = nullptr;
return cmConnection::OnServerShuttingDown(); return cmConnection::OnConnectionShuttingDown();
} }

View File

@@ -9,7 +9,7 @@
#include <string> #include <string>
class cmPipeConnection : public cmConnection class cmPipeConnection : public cmEventBasedConnection
{ {
public: public:
cmPipeConnection(const std::string& name, cmPipeConnection(const std::string& name,
@@ -17,7 +17,7 @@ public:
bool OnServeStart(std::string* pString) override; bool OnServeStart(std::string* pString) override;
bool OnServerShuttingDown() override; bool OnConnectionShuttingDown() override;
void Connect(uv_stream_t* server) override; void Connect(uv_stream_t* server) override;

View File

@@ -28,7 +28,7 @@ static void on_walk_to_shutdown(uv_handle_t* handle, void* arg)
{ {
(void)arg; (void)arg;
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
uv_close(handle, &cmConnection::on_close); uv_close(handle, &cmEventBasedConnection::on_close);
} }
} }
@@ -478,7 +478,7 @@ void cmServerBase::StartShutDown()
} }
for (auto& connection : Connections) { for (auto& connection : Connections) {
connection->OnServerShuttingDown(); connection->OnConnectionShuttingDown();
} }
Connections.clear(); Connections.clear();

View File

@@ -7,7 +7,7 @@
cmStdIoConnection::cmStdIoConnection( cmStdIoConnection::cmStdIoConnection(
cmConnectionBufferStrategy* bufferStrategy) cmConnectionBufferStrategy* bufferStrategy)
: cmConnection(bufferStrategy) : cmEventBasedConnection(bufferStrategy)
, Input() , Input()
, Output() , Output()
{ {
@@ -23,13 +23,13 @@ void cmStdIoConnection::SetServer(cmServerBase* s)
this->Input.tty = new uv_tty_t(); this->Input.tty = new uv_tty_t();
uv_tty_init(this->Server->GetLoop(), this->Input.tty, 0, 1); uv_tty_init(this->Server->GetLoop(), this->Input.tty, 0, 1);
uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL); uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL);
this->Input.tty->data = static_cast<cmConnection*>(this); this->Input.tty->data = static_cast<cmEventBasedConnection*>(this);
this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty); this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty);
this->Output.tty = new uv_tty_t(); this->Output.tty = new uv_tty_t();
uv_tty_init(this->Server->GetLoop(), this->Output.tty, 1, 0); uv_tty_init(this->Server->GetLoop(), this->Output.tty, 1, 0);
uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL); uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL);
this->Output.tty->data = static_cast<cmConnection*>(this); this->Output.tty->data = static_cast<cmEventBasedConnection*>(this);
this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty); this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty);
} else { } else {
usesTty = false; usesTty = false;
@@ -37,13 +37,13 @@ void cmStdIoConnection::SetServer(cmServerBase* s)
this->Input.pipe = new uv_pipe_t(); this->Input.pipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->Input.pipe, 0); uv_pipe_init(this->Server->GetLoop(), this->Input.pipe, 0);
uv_pipe_open(this->Input.pipe, 0); uv_pipe_open(this->Input.pipe, 0);
this->Input.pipe->data = static_cast<cmConnection*>(this); this->Input.pipe->data = static_cast<cmEventBasedConnection*>(this);
this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe); this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe);
this->Output.pipe = new uv_pipe_t(); this->Output.pipe = new uv_pipe_t();
uv_pipe_init(this->Server->GetLoop(), this->Output.pipe, 0); uv_pipe_init(this->Server->GetLoop(), this->Output.pipe, 0);
uv_pipe_open(this->Output.pipe, 1); uv_pipe_open(this->Output.pipe, 1);
this->Output.pipe->data = static_cast<cmConnection*>(this); this->Output.pipe->data = static_cast<cmEventBasedConnection*>(this);
this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe); this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe);
} }
} }
@@ -55,9 +55,9 @@ bool cmStdIoConnection::OnServeStart(std::string* pString)
return cmConnection::OnServeStart(pString); return cmConnection::OnServeStart(pString);
} }
bool cmStdIoConnection::OnServerShuttingDown() bool cmStdIoConnection::OnConnectionShuttingDown()
{ {
cmConnection::OnServerShuttingDown(); cmEventBasedConnection::OnConnectionShuttingDown();
if (usesTty) { if (usesTty) {
uv_read_stop(reinterpret_cast<uv_stream_t*>(this->Input.tty)); uv_read_stop(reinterpret_cast<uv_stream_t*>(this->Input.tty));

View File

@@ -32,14 +32,14 @@ private:
/*** /***
* Generic connection over std io interfaces -- tty * Generic connection over std io interfaces -- tty
*/ */
class cmStdIoConnection : public cmConnection class cmStdIoConnection : public cmEventBasedConnection
{ {
public: public:
cmStdIoConnection(cmConnectionBufferStrategy* bufferStrategy); cmStdIoConnection(cmConnectionBufferStrategy* bufferStrategy);
void SetServer(cmServerBase* s) override; void SetServer(cmServerBase* s) override;
bool OnServerShuttingDown() override; bool OnConnectionShuttingDown() override;
bool OnServeStart(std::string* pString) override; bool OnServeStart(std::string* pString) override;