#ifdef _WIN32
# define _CRT_SECURE_NO_WARNINGS
#else
# include <string.h>
#endif
#include "async_network.hpp"
#include <sstream>
#include "../common/logging.hpp"
#include "../common/compat.hpp"
#ifdef _WIN32
# define close closesocket
# include <time.h>
# if SPRAWL_COMPILER_MSVC || defined(_MSC_EXTENSIONS)
# define DELTA_EPOCH_IN_MICROSECS 11644473600000000Ui64
# else
# define DELTA_EPOCH_IN_MICROSECS 11644473600000000ULL
# endif
namespace
{
int gettimeofday(struct timeval* tv, void* unusedArg)
{
if(tv == nullptr)
return 0;
FILETIME ft;
unsigned __int64 asInt = 0;
GetSystemTimeAsFileTime(&ft);
asInt = ft.dwHighDateTime;
asInt <<= 32;
asInt |= ft.dwLowDateTime;
asInt -= DELTA_EPOCH_IN_MICROSECS;
asInt /= 10;
tv->tv_sec = (long)(asInt / 1000000UL);
tv->tv_usec = (long)(asInt % 1000000UL);
return 0;
}
}
#include <WinBase.h>
#include <Winsock2.h>
#endif
namespace sprawl
{
namespace async_network
{
namespace
{
static void PrintLastError(char const* prefixText)
{
# ifdef _WIN32
char buf[512];
FormatMessageA( FORMAT_MESSAGE_FROM_SYSTEM, NULL, WSAGetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), buf, 512, NULL);
SPRAWL_LOG_TRACE( "%s: %s\n", prefixText, buf );
# else
SPRAWL_LOG_TRACE("%s: %s\n", prefixText, strerror(errno));
# endif
}
}
SOCKET Connection::GetDescriptor()
{
return m_desc;
}
std::string Connection::GetHostname()
{
char buf[256];
inet_ntop( AF_INET, &((sockaddr_in*)&m_dest)->sin_addr, buf, sizeof(buf) );
return buf;
}
int Connection::GetPort()
{
return ((sockaddr_in*)&m_dest)->sin_port;
}
void Connection::Send(std::string const& data, SendCallback onSendFunction /*= nullptr*/)
{
//Store this data to be sent later on the network thread
std::lock_guard<std::mutex> lock(m_outDataMutex);
SPRAWL_LOG_TRACE("Request to send %d bytes of data, adding to queue", int(data.length()));
m_outData.push_back(std::make_pair(data, onSendFunction));
if( m_parentClientSocket )
{
m_parentClientSocket->NotifySend();
}
else if( m_parentServerSocket )
{
m_parentServerSocket->NotifySend();
}
}
Connection::Connection(ServerSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
: m_desc(desc_)
, m_closeSocket(false)
, m_partialPacket()
, m_onReceive(onReceive_)
, m_validatePacket(validatePacket_)
, m_parentServerSocket(parent)
, m_parentClientSocket(nullptr)
{
if(addr != nullptr)
{
m_dest = *addr;
}
}
Connection::Connection(ClientSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
: m_desc(desc_)
, m_closeSocket(false)
, m_partialPacket()
, m_onReceive(onReceive_)
, m_validatePacket(validatePacket_)
, m_parentServerSocket(nullptr)
, m_parentClientSocket(parent)
{
if(addr != nullptr)
{
m_dest = *addr;
}
}
void Connection::Send()
{
//Send all the things
std::vector<std::pair<std::string, SendCallback>> outData;
{
std::lock_guard<std::mutex> lock(m_outDataMutex);
outData = std::move( m_outData );
}
for(auto& data : outData)
{
SPRAWL_LOG_TRACE("TCP: Performing send with %d bytes of data", int(data.first.length()));
int ret = send(m_desc, data.first.c_str(), (int)data.first.length(), 0);
if(ret == -1)
{
PrintLastError("TCP: Send failed");
}
if(data.second)
{
data.second();
}
}
}
int Connection::Recv()
{
if(m_closeSocket)
{
close(m_desc);
return 0;
}
int ret;
char buf[32768];
char* pbuf = buf;
SPRAWL_LOG_TRACE("TCP: Told there was data on socket %d, receiving.", m_desc);
ret = recv(m_desc, pbuf, 32768, 0);
SPRAWL_LOG_TRACE("TCP: Received %d bytes of data on socket %d.", ret, m_desc);
if(ret <= 0)
{
if(ret == -1)
{
PrintLastError("TCP: Recv failed");
}
return ret;
}
if(m_onReceive)
{
//If there's no onReceive callback, what can we do? Nothing.
m_partialPacket.append(buf, ret);
const char* cPacket = m_partialPacket.c_str();
int packetEnd = (int)m_partialPacket.length();
while(packetEnd > 0)
{
int newEnd = packetEnd;
//if we don't have a validator we'll just give them the full packet.
if(m_validatePacket)
{
int totalLength = -1;
newEnd = m_validatePacket(cPacket, packetEnd, totalLength);
if(totalLength > 0 && totalLength < (int)m_partialPacket.capacity())
{
m_partialPacket.reserve(totalLength);
}
}
if(newEnd > 0)
{
SPRAWL_LOG_TRACE("TCP: Client code informs of complete packet, calling receive callback.");
m_onReceive(shared_from_this(), cPacket, newEnd);
cPacket += newEnd;
packetEnd -= newEnd;
m_partialPacket = std::string(cPacket, packetEnd);
cPacket = m_partialPacket.c_str();
}
else
{
SPRAWL_LOG_TRACE("TCP: Client code informs packet is incomplete, returning and waiting for more.");
break;
}
}
}
return ret;
}
void Connection::Close()
{
SPRAWL_LOG_TRACE("Socket close: %d", m_desc);
if( m_parentClientSocket )
{
m_parentClientSocket->Close();
}
else if( m_parentServerSocket )
{
m_parentServerSocket->CloseConnection( shared_from_this() );
}
}
/*virtual*/ void UDPConnection::Send(std::string const& str, FailType behavior, SendCallback callback /*= nullptr*/) /*override final*/
{
SPRAWL_LOG_TRACE("UDP: Request to send %d bytes of data, adding to queue", int(str.length()));
SendPacketWithID(str, behavior, ++m_currentId, callback);
if( m_parentClientSocket )
{
m_parentClientSocket->NotifySend();
}
else if( m_parentServerSocket )
{
m_parentServerSocket->NotifySend();
}
}
UDPConnection::UDPConnection(ServerSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
: Connection(parent, desc_, addr, onReceive_, validatePacket_)
, m_packets()
, m_highId(-1)
, m_currentId(0)
, m_slen(sizeof(sockaddr_in))
{
gettimeofday(&m_lastRcvd, nullptr);
m_lastSent.tv_sec = 0;
m_lastSent.tv_usec = 0;
}
UDPConnection::UDPConnection(ServerSocket* parent, SOCKET desc_, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
: Connection(parent, desc_, nullptr, onReceive_, validatePacket_)
, m_packets()
, m_highId(-1)
, m_currentId(0)
, m_slen(sizeof(sockaddr_in))
{
m_lastRcvd.tv_sec = 0;
m_lastRcvd.tv_usec = 0;
m_lastSent.tv_sec = 0;
m_lastSent.tv_usec = 0;
}
UDPConnection::UDPConnection(ClientSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
: Connection(parent, desc_, addr, onReceive_, validatePacket_)
, m_packets()
, m_highId(-1)
, m_currentId(0)
, m_slen(sizeof(sockaddr_in))
{
gettimeofday(&m_lastRcvd, nullptr);
m_lastSent.tv_sec = 0;
m_lastSent.tv_usec = 0;
}
UDPConnection::UDPConnection(ClientSocket* parent, SOCKET desc_, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
: Connection(parent, desc_, nullptr, onReceive_, validatePacket_)
, m_packets()
, m_highId(-1)
, m_currentId(0)
, m_slen(sizeof(sockaddr_in))
{
m_lastRcvd.tv_sec = 0;
m_lastRcvd.tv_usec = 0;
m_lastSent.tv_sec = 0;
m_lastSent.tv_usec = 0;
}
/*virtual*/ void UDPConnection::Send() /*override final*/
{
std::unordered_map<int32_t, std::pair<packet, SendCallback> > outPackets;
{
std::lock_guard<std::mutex> lock(m_outDataMutex);
outPackets = std::move( m_outPackets );
}
std::lock_guard<std::mutex> lock(m_packetMutex);
for(auto& kvp: outPackets)
{
SPRAWL_LOG_TRACE("UDP: Performing send with %d bytes of data", int(kvp.second.first.m_content.length()));
//Send the packet...
std::string content = kvp.second.first.m_header + kvp.second.first.m_content;
int ret = sendto(m_desc, content.c_str(), (int)content.length(), 0, &m_dest, m_slen);
if(ret == -1)
{
PrintLastError("UDP: Send failed");
}
if(kvp.second.second)
{
kvp.second.second();
}
if(kvp.second.first.m_behavior != FailType::ignore)
{
if(m_packets.count(kvp.first))
{
m_packets.at(kvp.first) = kvp.second.first;
}
else
{
m_packets.insert(std::make_pair(kvp.first, kvp.second.first));
}
}
}
if(!outPackets.empty())
{
gettimeofday(&m_lastSent, nullptr);
}
}
/*virtual*/ int UDPConnection::Recv() /*override final*/
{
int ret;
char abuf[32768];
char* buf = abuf;
//Check the data to see where it's from. Don't pull it yet.
ret = recvfrom(m_desc, buf, 32768, MSG_PEEK, (sockaddr*)&m_src, &m_slen);
if(ret == -1)
{
PrintLastError("UDP: recv failed");
return ret;
}
if(m_lastRcvd.tv_sec == 0 && m_lastRcvd.tv_usec == 0)
{
//New connection. Remember who's on the other end.
m_dest = m_src;
}
SPRAWL_LOG_TRACE("UDP: Receiving %d bytes of data on socket %d.", ret, m_dest);
//If we're not pulling from the person who's actually on the other end of this connection, ignore the data.
if(((sockaddr_in*)&m_dest)->sin_addr.s_addr == ((sockaddr_in*)&m_src)->sin_addr.s_addr
&& ((sockaddr_in*)&m_dest)->sin_port == ((sockaddr_in*)&m_src)->sin_port)
{
ret = recvfrom(m_desc, buf, 32768, 0, (sockaddr*)&m_src, &m_slen);
int32_t id, ack;
uint32_t bits;
const int32_t* header = reinterpret_cast<const int32_t*>(buf);
id = header[0];
ack = header[1];
bits = header[2];
gettimeofday(&m_lastRcvd, nullptr);
buf += (3 * sizeof(uint32_t));
ret -= (3 * sizeof(uint32_t));
if(id > m_highId)
{
m_highId = id;
}
if(ack >= 0)
{
std::lock_guard<std::mutex> dataLock(m_outDataMutex);
std::lock_guard<std::mutex> packetLock(m_packetMutex);
if(m_packets.count(ack))
{
m_packets.erase(ack);
}
if(m_outPackets.count(ack))
{
m_outPackets.erase(ack);
}
for(int i=0; i<32; i++)
{
int ackid = ack - i - 1;
if(ackid < 0)
{
break;
}
if((bits & (1 << i)) != 0)
{
if(m_packets.count(ackid))
{
m_packets.erase(ackid);
}
if(m_outPackets.count(ackid))
{
m_outPackets.erase(ackid);
}
}
}
fflush(stdout);
}
if(ret <= 0)
{
return ret;
}
if(m_received.count(id))
{
//We already received this packet, so we can ignore it now.
return ret;
}
m_received.insert(id);
if(m_onReceive)
{
//If there's no onReceive callback, what can we do? Nothing.
m_partialPacket.append(buf, ret);
const char* cPacket = m_partialPacket.c_str();
int packetEnd = (int)m_partialPacket.length();
while(packetEnd > 0)
{
int newEnd = packetEnd;
//if we don't have a validator we'll just give them the full packet.
if(m_validatePacket)
{
int totalLength = -1;
newEnd = m_validatePacket(cPacket, packetEnd, totalLength);
if(totalLength > 0 && totalLength < (int)m_partialPacket.capacity())
{
m_partialPacket.reserve(totalLength);
}
}
if(newEnd > 0)
{
m_onReceive(shared_from_this(), cPacket, newEnd);
cPacket += newEnd;
packetEnd -= newEnd;
m_partialPacket = std::string(cPacket, packetEnd);
cPacket = m_partialPacket.c_str();
}
else
{
break;
}
}
}
return ret;
}
//-2 indicates a valid request, but invalid for this connection. Move on and try the next connection.
return -2;
}
bool UDPConnection::CheckClosed()
{
struct timeval now;
gettimeofday(&now, nullptr);
int secs = now.tv_sec - m_lastRcvd.tv_sec;
int usecs = now.tv_usec - m_lastRcvd.tv_usec;
while(usecs < 0)
{
usecs += 1000000;
secs -= 1;
}
if((m_lastRcvd.tv_sec != 0 || m_lastRcvd.tv_usec != 0) && secs >= 5)
{
return true;
}
return false;
}
void UDPConnection::SendKeepAlive()
{
struct timeval now;
gettimeofday(&now, nullptr);
int secs, usecs;
std::lock_guard<std::mutex> lock(m_packetMutex);
auto it = m_packets.begin();
while( it != m_packets.end() )
{
secs = now.tv_sec - it->second.m_sentTime.tv_sec;
int usecs = now.tv_usec - it->second.m_sentTime.tv_usec;
while(usecs < 0)
{
usecs += 1000000;
secs -= 1;
}
//Resend any packet that hasn't been ACKed for a full second or more.
if(secs >= 1)
{
SendPacketWithID(it->second.m_content, FailType::resend, it->first, nullptr);
}
++it;
}
secs = now.tv_sec - m_lastSent.tv_sec;
usecs = now.tv_usec - m_lastSent.tv_usec;
while(usecs < 0)
{
usecs += 1000000;
secs -= 1;
}
//Pulse four times a second.
if(secs >= 1 || usecs >= 250000)
{
Send("", FailType::ignore);
}
}
UDPConnection::packet::packet(uint32_t _id, FailType _behavior, std::string const& _content, std::string const& header_)
: m_ID(_id)
, m_behavior(_behavior)
, m_content(_content)
, m_header(header_)
{
gettimeofday(&m_sentTime, nullptr);
}
UDPConnection::packet::packet(UDPConnection::packet&& other)
: m_ID(std::move(other.m_ID))
, m_behavior(std::move(other.m_behavior))
, m_content(std::move(other.m_content))
, m_header(std::move(other.m_header))
{
}
UDPConnection::packet::packet(UDPConnection::packet const& other)
: m_ID(other.m_ID)
, m_behavior(other.m_behavior)
, m_content(other.m_content)
, m_header(other.m_header)
{
}
UDPConnection::packet& UDPConnection::packet::operator=(UDPConnection::packet const& other)
{
m_ID = other.m_ID;
m_behavior = other.m_behavior;
m_content = other.m_content;
m_header = other.m_header;
return *this;
}
void UDPConnection::SendPacketWithID(std::string const& str, FailType behavior, int32_t sendid, SendCallback callback)
{
char header[3*sizeof(uint32_t)];
char* ptr = header;
//Construct header: ID, ACK, Ack bits
memcpy(ptr, &sendid, sizeof(uint32_t));
int32_t id = m_highId;
memcpy(ptr+sizeof(uint32_t), &id, sizeof(uint32_t));
uint32_t bits = 0;
if(id >= 0)
{
std::vector<int> ids_to_erase;
for(auto& rcvd : m_received)
{
if(rcvd == id)
{
continue;
}
int bit = id - rcvd - 1;
//Old bits we don't care about.
if(bit > 31)
{
ids_to_erase.push_back(rcvd);
continue;
}
bits |= (1 << bit);
}
for(auto& rcvd : ids_to_erase)
{
m_received.erase(rcvd);
}
}
memcpy(ptr+(sizeof(uint32_t)*2), &bits, sizeof(uint32_t));
//Create a packet from header + str...
std::string headerStr(header, 3*sizeof(uint32_t));
{
std::lock_guard<std::mutex> lock(m_outDataMutex);
m_outPackets.insert(std::make_pair(sendid, std::make_pair(packet(sendid, behavior, str, headerStr), callback)));
}
}
#define SOCK_ERROR(errorStr) do{ m_lastError = errorStr; return false; }while(false)
ServerSocket::ServerSocket(const ConnectionType connectionType)
: m_inSock(-1)
, m_onConnect(nullptr)
, m_onClose(nullptr)
, m_onReceive(nullptr)
, m_packetValidator(nullptr)
, m_running(false)
, m_connections()
, m_inSet()
, m_excSet()
, m_hints()
, m_servInfo(nullptr)
, m_sendThread()
, m_recvThread()
, m_mtx()
, m_sendLock()
, m_connectionType(connectionType)
, m_lastError(nullptr)
, m_sendReady(false)
{
memset(&m_hints, 0, sizeof m_hints);
m_hints.ai_family = AF_UNSPEC;
if(m_connectionType == ConnectionType::TCP)
{
m_hints.ai_socktype = SOCK_STREAM;
}
else
{
m_hints.ai_socktype = SOCK_DGRAM;
}
m_hints.ai_flags = AI_PASSIVE;
}
void ServerSocket::SetOnReceive(ReceiveCallback c)
{
m_onReceive = c;
}
void ServerSocket::SetOnConnect(ConnectionCallback c)
{
m_onConnect = c;
}
void ServerSocket::SetOnClose(ConnectionCallback c)
{
m_onClose = c;
}
void ServerSocket::SetPacketValidator(PacketValidationCallback c)
{
m_packetValidator = c;
}
bool ServerSocket::listen(int port)
{
if( m_inSock != -1 )
{
SOCK_ERROR("Socket already open!");
}
//Ports lower than 1024 require root access, just not going to support them
if( port < 1024 || port > 65535 )
{
SOCK_ERROR("Port out of range.");
}
//Get the localhost address info for the requested port
std::stringstream s;
s << port;
int status = getaddrinfo(nullptr, s.str().c_str(), &m_hints, &m_servInfo);
if( status != 0 )
{
SOCK_ERROR(gai_strerror(status));
}
//Open the socket
m_inSock = socket(m_servInfo->ai_family, m_servInfo->ai_socktype, m_servInfo->ai_protocol);
if( m_inSock == -1 )
{
PrintLastError("Could not open socket");
return false;
}
int yes = 1;
#ifndef _WIN32
setsockopt(m_inSock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
#else
int no = 0;
setsockopt(m_inSock, SOL_SOCKET, SO_REUSEADDR, (char*) &yes, sizeof(int));
setsockopt(m_inSock, IPPROTO_IPV6, IPV6_V6ONLY, (char*) &no, sizeof(int));
#endif
//Bind the port
if( ::bind(m_inSock, m_servInfo->ai_addr, (int)m_servInfo->ai_addrlen) == -1 )
{
SOCK_ERROR("Port already in use.");
}
//Open the port for incoming connections
::listen(m_inSock, 5);
m_running = true;
//And start up the network thread to actually handle them
m_sendThread = std::thread(&ServerSocket::SendThread, this );
m_recvThread = std::thread(&ServerSocket::RecvThread, this );
return true;
}
void ServerSocket::Close()
{
if(m_running)
{
m_running = false;
m_sendNotifier.notify_one();
m_sendThread.join();
close(m_inSock);
m_recvThread.join();
m_inSock = -1;
freeaddrinfo(m_servInfo);
}
}
ServerSocket::~ServerSocket()
{
Close();
}
std::vector<ConnectionWPtr > ServerSocket::GetConnections()
{
std::lock_guard<std::mutex> lock(m_mtx);
std::vector< ConnectionWPtr > ret;
for( auto& connection : m_connections )
{
ret.push_back(connection);
}
return std::move(ret);
}
ConnectionWPtr ServerSocket::GetConnection(int i)
{
std::lock_guard<std::mutex> lock(m_mtx);
return m_connections[i];
}
ConnectionWPtr ServerSocket::GetConnectionByDesc(int d)
{
std::lock_guard<std::mutex> lock(m_mtx);
for( auto& connection : m_connections )
{
if (connection->GetDescriptor() == d)
{
return connection;
}
}
return ConnectionWPtr();
}
ConnectionWPtr ServerSocket::GetConnectionByPort(int p)
{
std::lock_guard<std::mutex> lock(m_mtx);
for( auto& connection : m_connections )
{
if (connection->GetPort() == p)
{
return connection;
}
}
return ConnectionWPtr();
}
size_t ServerSocket::GetNumConnections()
{
std::lock_guard<std::mutex> lock(m_mtx);
return m_connections.size();
}
void ServerSocket::CloseConnection(ConnectionPtr c)
{
if(!c)
{
return;
}
std::lock_guard<std::mutex> lock(m_mtx);
for( auto it = m_connections.begin(); it != m_connections.end(); it++ )
{
if( *it == c )
{
if(m_connectionType == ConnectionType::TCP)
{
close(c->GetDescriptor());
}
if(m_onClose)
{
m_onClose(c);
}
m_connections.erase(it);
return;
}
}
}
void ServerSocket::SendThread()
{
while(m_running)
{
{
std::unique_lock<std::mutex> lock(m_sendLock);
while(!m_sendReady)
{
if(m_connectionType == ConnectionType::UDP)
{
m_sendNotifier.wait_for( lock, std::chrono::milliseconds(250) );
}
else
{
m_sendNotifier.wait( lock );
}
}
m_sendReady = false;
}
std::lock_guard<std::mutex> lock(m_mtx);
if(m_connectionType == ConnectionType::TCP)
{
for( size_t i = 0; i < m_connections.size(); i++ )
{
m_connections[i]->Send();
}
}
else
{
std::vector<size_t> indexes_to_erase;
for( size_t i = 0; i < m_connections.size(); i++ )
{
auto& connection = m_connections[i];
connection->Send();
if(std::static_pointer_cast<UDPConnection>(connection)->CheckClosed())
{
if(m_onClose)
{
m_onClose(connection);
}
indexes_to_erase.push_back(i);
continue;
}
std::static_pointer_cast<UDPConnection>(connection)->SendKeepAlive();
}
for( int i = (int)indexes_to_erase.size() - 1; i >= 0; --i )
{
m_connections.erase(m_connections.begin() + indexes_to_erase[i]);
}
}
}
}
void ServerSocket::RecvThread()
{
while(m_running)
{
struct sockaddr_storage addr;
socklen_t addr_size = sizeof(addr);
ConnectionPtr c;
FD_ZERO(&m_inSet);
FD_ZERO(&m_excSet);
FD_SET(m_inSock, &m_inSet);
SOCKET newcon = -1;
SOCKET max = m_inSock;
{
std::lock_guard<std::mutex> lock(m_mtx);
for( auto& connection : m_connections )
{
SOCKET desc = connection->GetDescriptor();
if (desc > max)
max = desc;
FD_SET( desc, &m_inSet );
FD_SET( desc, &m_excSet );
}
}
int ret = select((int)(max + 1), &m_inSet, NULL, &m_excSet, nullptr);
SPRAWL_LOG_TRACE("Select informs of %d sockets with data ready to receive.", ret);
std::lock_guard<std::mutex> lock(m_mtx);
if( ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK )
{
m_lastError = strerror(errno);
break;
}
if( FD_ISSET( m_inSock, &m_excSet ) )
{
continue;
FD_CLR( m_inSock, &m_inSet );
}
else if( FD_ISSET( m_inSock, &m_inSet ) )
{
if (m_connectionType == ConnectionType::TCP)
{
newcon = accept(m_inSock, (struct sockaddr *)& addr, &addr_size);
if(newcon != -1)
{
int yes = 1;
#ifndef _WIN32
setsockopt(newcon, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(int));
#else
setsockopt(newcon, SOL_SOCKET, SO_KEEPALIVE, (char*) &yes, sizeof(int));
#endif
c.reset(new Connection(this, newcon, ((sockaddr*)&addr), m_onReceive, m_packetValidator));
if(m_onConnect)
{
m_onConnect(c);
}
m_connections.push_back(c);
}
else if(errno != EWOULDBLOCK && errno != EAGAIN)
{
m_lastError = strerror(errno);
continue;
}
else
{
PrintLastError("Could not accept connection");
}
}
else
{
bool bFound = false;
for (auto it = m_connections.begin(); it != m_connections.end(); it++)
{
if((*it)->Recv() != -2)
{
bFound = true;
}
}
if(!bFound)
{
c.reset(new UDPConnection(this, m_inSock, m_onReceive, m_packetValidator));
if(m_onConnect)
{
m_onConnect(c);
}
m_connections.push_back(c);
c->Recv();
}
}
}
if(m_connectionType == ConnectionType::TCP)
{
std::vector<size_t> indexes_to_erase;
for( size_t i = 0; i < m_connections.size(); i++ )
{
auto& connection = m_connections[i];
if( FD_ISSET( connection->GetDescriptor(), &m_inSet ) )
{
if(connection->Recv() <= 0)
{
if(m_onClose)
{
m_onClose(connection);
}
indexes_to_erase.push_back(i);
}
}
}
for( int i = (int)indexes_to_erase.size() - 1; i >= 0; --i )
{
m_connections.erase(m_connections.begin() + indexes_to_erase[i]);
}
}
}
}
void ServerSocket::CloseConnection(int i)
{
std::lock_guard<std::mutex> lock(m_mtx);
ConnectionPtr c = m_connections[i];
if(m_connectionType == ConnectionType::TCP)
{
close(c->GetDescriptor());
}
if(m_onClose)
{
m_onClose(c);
}
m_connections.erase(m_connections.begin() + i);
}
void ServerSocket::NotifySend()
{
std::lock_guard<std::mutex> lock(m_sendLock);
m_sendReady = true;
m_sendNotifier.notify_one();
}
ClientSocket::ClientSocket(ConnectionType connectionType)
: m_onConnect(nullptr)
, m_onClose(nullptr)
, m_onReceive(nullptr)
, m_packetValidator(nullptr)
, m_con()
, m_sock(-1)
, m_inSet()
, m_excSet()
, m_hints()
, m_servInfo(nullptr)
, m_running(false)
, m_sendThread()
, m_recvThread()
, m_sendLock()
, m_connectionType(connectionType)
, m_lastError(nullptr)
, m_sendReady(false)
{
memset(&m_hints, 0, sizeof m_hints);
m_hints.ai_family = AF_UNSPEC;
if(m_connectionType == ConnectionType::TCP)
{
m_hints.ai_socktype = SOCK_STREAM;
}
else
{
m_hints.ai_socktype = SOCK_DGRAM;
}
}
ClientSocket::~ClientSocket()
{
Close();
freeaddrinfo(m_servInfo);
}
bool ClientSocket::Connect(std::string const& addr, int port)
{
struct addrinfo* p;
if(port < 1 || port > 65535)
{
SOCK_ERROR("Port out of range.");
}
std::stringstream s;
s << port;
getaddrinfo(addr.c_str(), s.str().c_str(), &m_hints, &m_servInfo);
for(p = m_servInfo; p != nullptr; p = p->ai_next)
{
if ((m_sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
{
PrintLastError("Could not open socket.");
return false;
}
if (connect(m_sock, p->ai_addr, (int)p->ai_addrlen) == -1)
{
PrintLastError("Socket connect attempt failed (non-fatal)");
close(m_sock);
continue;
}
break;
}
if(p == nullptr)
{
SOCK_ERROR("All socket connect attempts failed. Could not establish a connection.");
}
if(m_connectionType == ConnectionType::TCP)
{
m_con.reset(new Connection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
}
else
{
m_con.reset(new UDPConnection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
}
m_running = true;
m_sendThread = std::thread(&ClientSocket::SendThread, this );
m_recvThread = std::thread(&ClientSocket::RecvThread, this );
if(m_onConnect)
{
m_onConnect(m_con);
}
return true;
}
bool ClientSocket::Reconnect()
{
struct addrinfo* p;
if(m_con != nullptr)
{
SOCK_ERROR("Already connected.");
}
for (p = m_servInfo; p != nullptr; p = p->ai_next)
{
if ((m_sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
{
PrintLastError("Could not open socket.");
return false;
}
if (connect(m_sock, p->ai_addr, (int)p->ai_addrlen) == -1)
{
PrintLastError("Socket connect attempt failed (non-fatal)");
close(m_sock);
continue;
}
break;
}
if (p == nullptr)
{
SOCK_ERROR("All socket connect attempts failed. Could not establish a connection.");
}
if(m_connectionType == ConnectionType::TCP)
{
m_con.reset(new Connection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
}
else
{
m_con.reset(new UDPConnection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
}
m_running = true;
m_sendThread = std::thread(&ClientSocket::SendThread, this );
m_recvThread = std::thread(&ClientSocket::RecvThread, this );
if(m_onConnect)
{
m_onConnect(m_con);
}
return true;
}
void ClientSocket::SetOnReceive(ReceiveCallback c)
{
m_onReceive = c;
}
void ClientSocket::SetOnConnect(ConnectionCallback c)
{
m_onConnect = c;
}
void ClientSocket::SetOnClose(ConnectionCallback c)
{
m_onClose = c;
}
void ClientSocket::SetPacketValidator(PacketValidationCallback c)
{
m_packetValidator = c;
}
void ClientSocket::Close()
{
m_running = false;
if (m_sendThread.joinable())
{
if (m_connectionType == ConnectionType::UDP)
{
close(m_con->GetDescriptor());
}
m_sendNotifier.notify_one();
m_sendThread.join();
}
if(m_recvThread.joinable())
{
if (m_connectionType == ConnectionType::TCP)
{
close(m_con->GetDescriptor());
}
m_recvThread.join();
}
m_con.reset();
}
void ClientSocket::SendThread()
{
while(m_running)
{
{
std::unique_lock<std::mutex> lock(m_sendLock);
while(!m_sendReady)
{
if(m_connectionType == ConnectionType::UDP)
{
m_sendNotifier.wait_for( lock, std::chrono::milliseconds(250) );
}
else
{
m_sendNotifier.wait( lock );
}
}
m_sendReady = false;
}
if (!m_running)
{
return;
}
m_con->Send();
if(m_connectionType == ConnectionType::UDP)
{
if(std::static_pointer_cast<UDPConnection>(m_con)->CheckClosed())
{
SPRAWL_LOG_TRACE("Received disconnect signal for socket %d.", m_con->GetDescriptor());
close(m_con->GetDescriptor());
if (m_onClose)
{
m_onClose(m_con);
}
m_running = false;
return;
}
std::static_pointer_cast<UDPConnection>(m_con)->SendKeepAlive();
}
}
}
void ClientSocket::RecvThread()
{
while(m_running)
{
if(m_con == nullptr)
{
m_running = false;
break;
}
FD_ZERO(&m_inSet);
FD_ZERO(&m_excSet);
FD_SET( m_sock, &m_inSet );
FD_SET( m_sock, &m_excSet );
int ret = select((int)(m_sock + 1), &m_inSet, NULL, &m_excSet, nullptr);
if (!m_running)
{
return;
}
SPRAWL_LOG_TRACE("Select informs client socket has data ready to receive.");
if( ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK )
{
m_lastError = strerror(errno);
}
if( FD_ISSET( m_sock, &m_inSet ) )
{
if( m_con->Recv() <= 0 && m_connectionType == ConnectionType::TCP )
{
SPRAWL_LOG_TRACE("Received disconnect signal for socket %d.", m_con->GetDescriptor());
close(m_con->GetDescriptor());
if (m_onClose)
{
m_onClose(m_con);
}
m_running = false;
m_sendNotifier.notify_one();
return;
}
}
}
}
void ClientSocket::NotifySend()
{
std::lock_guard<std::mutex> lock(m_sendLock);
m_sendReady = true;
m_sendNotifier.notify_one();
}
}
}
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #1 | 23398 | ququlala | "Forking branch Mainline of shadauxcat-libsprawl to ququlala-libsprawl." | ||
| //guest/ShadauxCat/Sprawl/Mainline/network/async_network.cpp | |||||
| #5 | 19906 | ShadauxCat |
- Added tag, compile time string type - Since tag requires visual studio 2015, removed compatibility code for earlier versions of visual studio - Improved compiler detection - Added endianness detection - Added template if/else helper - Fixed bug with murmur3 64 bit - Added seed argument for murmur3 #review-19907 |
||
| #4 | 14783 | ShadauxCat |
Style corrections (placement of const) #review-14784 |
||
| #3 | 12508 | ShadauxCat |
-Added threading library. Currently only functional for Linux; Windows will fail to link. (I will fix this soon.) -Fixed missing move and copy constructors in List and ForwardList -Fixed broken move constructor in HashMap -Fixed missing const get() in HashMap -Fixed broken operator-> in ListIterator -Added sprawl::noncopyable -Added sketch headers for filesystem library -Made StringLiteral hashable, added special hashes for pointers and integers in murmur3 -Fixed compiler warning in async_network -Updated memory allocators to use new threading library for mutexes -Added accessibility to sprawl::StringLiteral to be able toa ccess its pointer and length and perform pointer comparisons #review-12504 |
||
| #2 | 11516 | ShadauxCat | Network: Fix for race condition that could cause messages to fail to send. | ||
| #1 | 11496 | ShadauxCat | Initial checkin: Current states for csbuild and libSprawl | ||