async_network.cpp #3

  • //
  • guest/
  • ShadauxCat/
  • Sprawl/
  • Mainline/
  • network/
  • async_network.cpp
  • View
  • Commits
  • Open Download .zip Download (30 KB)
#ifdef _WIN32
#	define _CRT_SECURE_NO_WARNINGS
#else
#	include <string.h>
#endif

#include "async_network.hpp"
#include <sstream>
#include "../common/logging.hpp"

#ifdef _WIN32
#	define close closesocket
#	include <time.h>

#	if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
#		define DELTA_EPOCH_IN_MICROSECS  11644473600000000Ui64
#	else
#		define DELTA_EPOCH_IN_MICROSECS  11644473600000000ULL
#	endif

	namespace
	{
		int gettimeofday(struct timeval* tv, void* unusedArg)
		{
			if(tv == nullptr)
				return 0;

			FILETIME ft;
			unsigned __int64 asInt = 0;

			GetSystemTimeAsFileTime(&ft);

			asInt = ft.dwHighDateTime;
			asInt <<= 32;
			asInt |= ft.dwLowDateTime;
			asInt -= DELTA_EPOCH_IN_MICROSECS;
			asInt /= 10;

			tv->tv_sec = (long)(asInt / 1000000UL);
			tv->tv_usec = (long)(asInt % 1000000UL);
			return 0;
		}
	}

#include <WinBase.h>
#include <Winsock2.h>
#endif

namespace sprawl
{
	namespace async_network
	{
		namespace
		{
			static void PrintLastError(char const* prefixText)
			{
#	ifdef _WIN32
				char buf[512];
				FormatMessageA( FORMAT_MESSAGE_FROM_SYSTEM, NULL, WSAGetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), buf, 512, NULL);
				SPRAWL_LOG_TRACE( "%s: %s\n", prefixText, buf );
#	else
				SPRAWL_LOG_TRACE("%s: %s\n", prefixText, strerror(errno));
#	endif
			}
		}
		SOCKET Connection::GetDescriptor()
		{
			return m_desc;
		}

		std::string Connection::GetHostname()
		{
			char buf[256];
			inet_ntop( AF_INET, &((sockaddr_in*)&m_dest)->sin_addr, buf, sizeof(buf) );
			return buf;
		}

		int Connection::GetPort()
		{
			return ((sockaddr_in*)&m_dest)->sin_port;
		}

		void Connection::Send(const std::string& data, SendCallback onSendFunction /*= nullptr*/)
		{
			//Store this data to be sent later on the network thread
			std::lock_guard<std::mutex> lock(m_outDataMutex);

			SPRAWL_LOG_TRACE("Request to send %d bytes of data, adding to queue", int(data.length()));

			m_outData.push_back(std::make_pair(data, onSendFunction));

			if( m_parentClientSocket )
			{
				m_parentClientSocket->NotifySend();
			}
			else if( m_parentServerSocket )
			{
				m_parentServerSocket->NotifySend();
			}
		}

		Connection::Connection(ServerSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
			: m_desc(desc_)
			, m_closeSocket(false)
			, m_partialPacket()
			, m_onReceive(onReceive_)
			, m_validatePacket(validatePacket_)
			, m_parentServerSocket(parent)
			, m_parentClientSocket(nullptr)
		{
			if(addr != nullptr)
			{
				m_dest = *addr;
			}
		}

		Connection::Connection(ClientSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
			: m_desc(desc_)
			, m_closeSocket(false)
			, m_partialPacket()
			, m_onReceive(onReceive_)
			, m_validatePacket(validatePacket_)
			, m_parentServerSocket(nullptr)
			, m_parentClientSocket(parent)
		{
			if(addr != nullptr)
			{
				m_dest = *addr;
			}
		}

		void Connection::Send()
		{
			//Send all the things
			std::vector<std::pair<std::string, SendCallback>> outData;

			{
				std::lock_guard<std::mutex> lock(m_outDataMutex);
				outData = std::move( m_outData );
			}

			for(auto& data : outData)
			{
				SPRAWL_LOG_TRACE("TCP: Performing send with %d bytes of data", int(data.first.length()));
				int ret = send(m_desc, data.first.c_str(), (int)data.first.length(), 0);
				if(ret == -1)
				{
					PrintLastError("TCP: Send failed");
				}

				if(data.second)
				{
					data.second();
				}
			}
		}

		int Connection::Recv()
		{
			if(m_closeSocket)
			{
				close(m_desc);
				return 0;
			}
			int ret;
			char buf[32768];
			char* pbuf = buf;

			SPRAWL_LOG_TRACE("TCP: Told there was data on socket %d, receiving.", m_desc);
			ret = recv(m_desc, pbuf, 32768, 0);
			SPRAWL_LOG_TRACE("TCP: Received %d bytes of data on socket %d.", ret, m_desc);
			if(ret <= 0)
			{
				if(ret == -1)
				{
					PrintLastError("TCP: Recv failed");
				}
				return ret;
			}

			if(m_onReceive)
			{
				//If there's no onReceive callback, what can we do? Nothing.
				m_partialPacket.append(buf, ret);
				const char* cPacket = m_partialPacket.c_str();
				int packetEnd = (int)m_partialPacket.length();

				while(packetEnd > 0)
				{
					int newEnd = packetEnd;
					//if we don't have a validator we'll just give them the full packet.
					if(m_validatePacket)
					{
						int totalLength = -1;
						newEnd = m_validatePacket(cPacket, packetEnd, totalLength);
						if(totalLength > 0 && totalLength < (int)m_partialPacket.capacity())
						{
							m_partialPacket.reserve(totalLength);
						}
					}
					if(newEnd > 0)
					{
						SPRAWL_LOG_TRACE("TCP: Client code informs of complete packet, calling receive callback.");
						m_onReceive(shared_from_this(), cPacket, newEnd);
						cPacket += newEnd;
						packetEnd -= newEnd;
						m_partialPacket = std::string(cPacket, packetEnd);
						cPacket = m_partialPacket.c_str();
					}
					else
					{
						SPRAWL_LOG_TRACE("TCP: Client code informs packet is incomplete, returning and waiting for more.");
						break;
					}
				}
			}
			return ret;
		}

		void Connection::Close()
		{
			SPRAWL_LOG_TRACE("Socket close: %d", m_desc);
			if( m_parentClientSocket )
			{
				m_parentClientSocket->Close();
			}
			else if( m_parentServerSocket )
			{
				m_parentServerSocket->CloseConnection( shared_from_this() );
			}
		}

		/*virtual*/ void UDPConnection::Send(const std::string& str, FailType behavior, SendCallback callback /*= nullptr*/) /*override final*/
		{
			SPRAWL_LOG_TRACE("UDP: Request to send %d bytes of data, adding to queue", int(str.length()));
			SendPacketWithID(str, behavior, ++m_currentId, callback);

			if( m_parentClientSocket )
			{
				m_parentClientSocket->NotifySend();
			}
			else if( m_parentServerSocket )
			{
				m_parentServerSocket->NotifySend();
			}
		}

		UDPConnection::UDPConnection(ServerSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
			: Connection(parent, desc_, addr, onReceive_, validatePacket_)
			, m_packets()
			, m_highId(-1)
			, m_currentId(0)
			, m_slen(sizeof(sockaddr_in))
		 {
			gettimeofday(&m_lastRcvd, nullptr);
			m_lastSent.tv_sec = 0;
			m_lastSent.tv_usec = 0;
		 }

		UDPConnection::UDPConnection(ServerSocket* parent, SOCKET desc_, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
			: Connection(parent, desc_, nullptr, onReceive_, validatePacket_)
			, m_packets()
			, m_highId(-1)
			, m_currentId(0)
			, m_slen(sizeof(sockaddr_in))
		{
			m_lastRcvd.tv_sec = 0;
			m_lastRcvd.tv_usec = 0;
			m_lastSent.tv_sec = 0;
			m_lastSent.tv_usec = 0;
		}

		UDPConnection::UDPConnection(ClientSocket* parent, SOCKET desc_, struct sockaddr* addr, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
			: Connection(parent, desc_, addr, onReceive_, validatePacket_)
			, m_packets()
			, m_highId(-1)
			, m_currentId(0)
			, m_slen(sizeof(sockaddr_in))
		 {
			gettimeofday(&m_lastRcvd, nullptr);
			m_lastSent.tv_sec = 0;
			m_lastSent.tv_usec = 0;
		 }

		UDPConnection::UDPConnection(ClientSocket* parent, SOCKET desc_, ReceiveCallback onReceive_, PacketValidationCallback validatePacket_)
			: Connection(parent, desc_, nullptr, onReceive_, validatePacket_)
			, m_packets()
			, m_highId(-1)
			, m_currentId(0)
			, m_slen(sizeof(sockaddr_in))
		{
			m_lastRcvd.tv_sec = 0;
			m_lastRcvd.tv_usec = 0;
			m_lastSent.tv_sec = 0;
			m_lastSent.tv_usec = 0;
		}

		/*virtual*/ void UDPConnection::Send() /*override final*/
		{
			std::unordered_map<int32_t, std::pair<packet, SendCallback> > outPackets;

			{
				std::lock_guard<std::mutex> lock(m_outDataMutex);
				outPackets = std::move( m_outPackets );
			}

			std::lock_guard<std::mutex> lock(m_packetMutex);
			for(auto& kvp: outPackets)
			{
				SPRAWL_LOG_TRACE("UDP: Performing send with %d bytes of data", int(kvp.second.first.m_content.length()));
				//Send the packet...
				std::string content = kvp.second.first.m_header + kvp.second.first.m_content;
				int ret = sendto(m_desc, content.c_str(), (int)content.length(), 0, &m_dest, m_slen);
				if(ret == -1)
				{
					PrintLastError("UDP: Send failed");
				}
				if(kvp.second.second)
				{
					kvp.second.second();
				}
				if(kvp.second.first.m_behavior != FailType::ignore)
				{
					if(m_packets.count(kvp.first))
					{
						m_packets.at(kvp.first) = kvp.second.first;
					}
					else
					{
						m_packets.insert(std::make_pair(kvp.first, kvp.second.first));
					}
				}
			}
			if(!outPackets.empty())
			{
				gettimeofday(&m_lastSent, nullptr);
			}
		}

		/*virtual*/ int UDPConnection::Recv() /*override final*/
		{
			int ret;
			char abuf[32768];
			char* buf = abuf;
			//Check the data to see where it's from. Don't pull it yet.
			ret = recvfrom(m_desc, buf, 32768, MSG_PEEK, (sockaddr*)&m_src, &m_slen);
			if(ret == -1)
			{
				PrintLastError("UDP: recv failed");
				return ret;
			}
			if(m_lastRcvd.tv_sec == 0 && m_lastRcvd.tv_usec == 0)
			{
				//New connection. Remember who's on the other end.
				m_dest = m_src;
			}
			SPRAWL_LOG_TRACE("UDP: Receiving %d bytes of data on socket %d.", ret, m_dest);
			//If we're not pulling from the person who's actually on the other end of this connection, ignore the data.
			if(((sockaddr_in*)&m_dest)->sin_addr.s_addr == ((sockaddr_in*)&m_src)->sin_addr.s_addr
					&& ((sockaddr_in*)&m_dest)->sin_port == ((sockaddr_in*)&m_src)->sin_port)
			{
				ret = recvfrom(m_desc, buf, 32768, 0, (sockaddr*)&m_src, &m_slen);
				int32_t id, ack;
				uint32_t bits;

				const int32_t* header = reinterpret_cast<const int32_t*>(buf);
				id = header[0];
				ack = header[1];
				bits = header[2];

				gettimeofday(&m_lastRcvd, nullptr);

				buf += (3 * sizeof(uint32_t));
				ret -= (3 * sizeof(uint32_t));

				if(id > m_highId)
				{
					m_highId = id;
				}
				if(ack >= 0)
				{
					std::lock_guard<std::mutex> dataLock(m_outDataMutex);
					std::lock_guard<std::mutex> packetLock(m_packetMutex);
					if(m_packets.count(ack))
					{
						m_packets.erase(ack);
					}
					if(m_outPackets.count(ack))
					{
						m_outPackets.erase(ack);
					}
					for(int i=0; i<32; i++)
					{
						int ackid = ack - i - 1;
						if(ackid < 0)
						{
							break;
						}
						if((bits & (1 << i)) != 0)
						{
							if(m_packets.count(ackid))
							{
								m_packets.erase(ackid);
							}
							if(m_outPackets.count(ackid))
							{
								m_outPackets.erase(ackid);
							}
						}
					}
					fflush(stdout);
				}
				if(ret <= 0)
				{
					return ret;
				}

				if(m_received.count(id))
				{
					//We already received this packet, so we can ignore it now.
					return ret;
				}

				m_received.insert(id);

				if(m_onReceive)
				{
					//If there's no onReceive callback, what can we do? Nothing.
					m_partialPacket.append(buf, ret);
					const char* cPacket = m_partialPacket.c_str();
					int packetEnd = (int)m_partialPacket.length();

					while(packetEnd > 0)
					{
						int newEnd = packetEnd;
						//if we don't have a validator we'll just give them the full packet.
						if(m_validatePacket)
						{
							int totalLength = -1;
							newEnd = m_validatePacket(cPacket, packetEnd, totalLength);
							if(totalLength > 0 && totalLength < (int)m_partialPacket.capacity())
							{
								m_partialPacket.reserve(totalLength);
							}
						}
						if(newEnd > 0)
						{
							m_onReceive(shared_from_this(), cPacket, newEnd);
							cPacket += newEnd;
							packetEnd -= newEnd;
							m_partialPacket = std::string(cPacket, packetEnd);
							cPacket = m_partialPacket.c_str();
						}
						else
						{
							break;
						}
					}
				}
				return ret;
			}
			//-2 indicates a valid request, but invalid for this connection. Move on and try the next connection.
			return -2;
		}
		bool UDPConnection::CheckClosed()
		{
			struct timeval now;
			gettimeofday(&now, nullptr);
			int secs = now.tv_sec - m_lastRcvd.tv_sec;
			int usecs = now.tv_usec - m_lastRcvd.tv_usec;
			while(usecs < 0)
			{
				usecs += 1000000;
				secs -= 1;
			}
			if((m_lastRcvd.tv_sec != 0 || m_lastRcvd.tv_usec != 0) && secs >= 5)
			{
				return true;
			}
			return false;
		}
		void UDPConnection::SendKeepAlive()
		{
			struct timeval now;
			gettimeofday(&now, nullptr);
			int secs, usecs;

			std::lock_guard<std::mutex> lock(m_packetMutex);
			auto it = m_packets.begin();
			while( it != m_packets.end() )
			{
				secs = now.tv_sec - it->second.m_sentTime.tv_sec;
				int usecs = now.tv_usec - it->second.m_sentTime.tv_usec;
				while(usecs < 0)
				{
					usecs += 1000000;
					secs -= 1;
				}
				//Resend any packet that hasn't been ACKed for a full second or more.
				if(secs >= 1)
				{
					SendPacketWithID(it->second.m_content, FailType::resend, it->first, nullptr);
				}
				++it;
			}
			secs = now.tv_sec - m_lastSent.tv_sec;
			usecs = now.tv_usec - m_lastSent.tv_usec;
			while(usecs < 0)
			{
				usecs += 1000000;
				secs -= 1;
			}
			//Pulse four times a second.
			if(secs >= 1 || usecs >= 250000)
			{
				Send("", FailType::ignore);
			}
		}

		UDPConnection::packet::packet(uint32_t _id, FailType _behavior, const std::string& _content, const std::string& header_)
			: m_ID(_id)
			, m_behavior(_behavior)
			, m_content(_content)
			, m_header(header_)
		{
			gettimeofday(&m_sentTime, nullptr);
		}

		UDPConnection::packet::packet(UDPConnection::packet&& other)
			: m_ID(std::move(other.m_ID))
			, m_behavior(std::move(other.m_behavior))
			, m_content(std::move(other.m_content))
			, m_header(std::move(other.m_header))
		{
		}

		UDPConnection::packet::packet(const UDPConnection::packet& other)
			: m_ID(other.m_ID)
			, m_behavior(other.m_behavior)
			, m_content(other.m_content)
			, m_header(other.m_header)
		{
		}

		UDPConnection::packet& UDPConnection::packet::operator=(const UDPConnection::packet& other)
		{
			m_ID = other.m_ID;
			m_behavior = other.m_behavior;
			m_content = other.m_content;
			m_header = other.m_header;
			return *this;
		}

		void UDPConnection::SendPacketWithID(const std::string& str, FailType behavior, int32_t sendid, SendCallback callback)
		{
			char header[3*sizeof(uint32_t)];
			char* ptr = header;

			//Construct header: ID, ACK, Ack bits
			memcpy(ptr, &sendid, sizeof(uint32_t));
			int32_t id = m_highId;
			memcpy(ptr+sizeof(uint32_t), &id, sizeof(uint32_t));
			uint32_t bits = 0;
			if(id >= 0)
			{
				std::vector<int> ids_to_erase;
				for(auto& rcvd : m_received)
				{
					if(rcvd == id)
					{
						continue;
					}
					int bit = id - rcvd - 1;
					//Old bits we don't care about.
					if(bit > 31)
					{
						ids_to_erase.push_back(rcvd);
						continue;
					}
					bits |= (1 << bit);
				}
				for(auto& rcvd : ids_to_erase)
				{
					m_received.erase(rcvd);
				}
			}
			memcpy(ptr+(sizeof(uint32_t)*2), &bits, sizeof(uint32_t));

			//Create a packet from header + str...
			std::string headerStr(header, 3*sizeof(uint32_t));

			{
				std::lock_guard<std::mutex> lock(m_outDataMutex);
				m_outPackets.insert(std::make_pair(sendid, std::make_pair(packet(sendid, behavior, str, headerStr), callback)));
			}
		}

#define SOCK_ERROR(errorStr) do{ m_lastError = errorStr; return false; }while(false)

		ServerSocket::ServerSocket(const ConnectionType connectionType)
			: m_inSock(-1)
			, m_onConnect(nullptr)
			, m_onClose(nullptr)
			, m_onReceive(nullptr)
			, m_packetValidator(nullptr)
			, m_running(false)
			, m_connections()
			, m_inSet()
			, m_excSet()
			, m_hints()
			, m_servInfo(nullptr)
			, m_sendThread()
			, m_recvThread()
			, m_mtx()
			, m_sendLock()
			, m_connectionType(connectionType)
			, m_lastError(nullptr)
			, m_sendReady(false)
		{
			memset(&m_hints, 0, sizeof m_hints);
			m_hints.ai_family = AF_UNSPEC;
			if(m_connectionType == ConnectionType::TCP)
			{
				m_hints.ai_socktype = SOCK_STREAM;
			}
			else
			{
				m_hints.ai_socktype = SOCK_DGRAM;
			}
			m_hints.ai_flags = AI_PASSIVE;
		}

		void ServerSocket::SetOnReceive(ReceiveCallback c)
		{
			m_onReceive = c;
		}

		void ServerSocket::SetOnConnect(ConnectionCallback c)
		{
			m_onConnect = c;
		}

		void ServerSocket::SetOnClose(ConnectionCallback c)
		{
			m_onClose = c;
		}

		void ServerSocket::SetPacketValidator(PacketValidationCallback c)
		{
			m_packetValidator = c;
		}

		bool ServerSocket::listen(int port)
		{
			if( m_inSock != -1 )
			{
				SOCK_ERROR("Socket already open!");
			}

			//Ports lower than 1024 require root access, just not going to support them
			if( port < 1024 || port > 65535 )
			{
				SOCK_ERROR("Port out of range.");
			}

			//Get the localhost address info for the requested port
			std::stringstream s;
			s << port;
			int status = getaddrinfo(nullptr, s.str().c_str(), &m_hints, &m_servInfo);

			if( status != 0 )
			{
				SOCK_ERROR(gai_strerror(status));
			}

			//Open the socket
			m_inSock = socket(m_servInfo->ai_family, m_servInfo->ai_socktype, m_servInfo->ai_protocol);
			if( m_inSock == -1 )
			{
				PrintLastError("Could not open socket");
				return false;
			}

			int yes = 1;
#ifndef _WIN32
			setsockopt(m_inSock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
#else
			int no = 0;
			setsockopt(m_inSock, SOL_SOCKET, SO_REUSEADDR, (char*) &yes, sizeof(int));
			setsockopt(m_inSock, IPPROTO_IPV6, IPV6_V6ONLY, (char*) &no, sizeof(int));
#endif

			//Bind the port
			if( ::bind(m_inSock, m_servInfo->ai_addr, (int)m_servInfo->ai_addrlen) == -1 )
			{
				SOCK_ERROR("Port already in use.");
			}

			//Open the port for incoming connections
			::listen(m_inSock, 5);

			m_running = true;

			//And start up the network thread to actually handle them
			m_sendThread = std::thread(&ServerSocket::SendThread, this );
			m_recvThread = std::thread(&ServerSocket::RecvThread, this );
			return true;
		}

		void ServerSocket::Close()
		{
			if(m_running)
			{
				m_running = false;
				m_sendNotifier.notify_one();
				m_sendThread.join();
				close(m_inSock);
				m_recvThread.join();
				m_inSock = -1;
				freeaddrinfo(m_servInfo);

			}
		}

		ServerSocket::~ServerSocket()
		{
			Close();
		}

		std::vector<ConnectionWPtr > ServerSocket::GetConnections()
		{
			std::lock_guard<std::mutex> lock(m_mtx);
			std::vector< ConnectionWPtr > ret;
			for( auto& connection : m_connections )
			{
				ret.push_back(connection);
			}
			return std::move(ret);
		}

		ConnectionWPtr ServerSocket::GetConnection(int i)
		{
			std::lock_guard<std::mutex> lock(m_mtx);
			return m_connections[i];
		}

		ConnectionWPtr ServerSocket::GetConnectionByDesc(int d)
		{
			std::lock_guard<std::mutex> lock(m_mtx);
			for( auto& connection : m_connections )
			{
				if (connection->GetDescriptor() == d)
				{
					return connection;
				}
			}
			return ConnectionWPtr();
		}

		ConnectionWPtr ServerSocket::GetConnectionByPort(int p)
		{
			std::lock_guard<std::mutex> lock(m_mtx);
			for( auto& connection : m_connections )
			{
				if (connection->GetPort() == p)
				{
					return connection;
				}
			}
			return ConnectionWPtr();
		}

		size_t ServerSocket::GetNumConnections()
		{
			std::lock_guard<std::mutex> lock(m_mtx);
			return m_connections.size();
		}

		void ServerSocket::CloseConnection(ConnectionPtr c)
		{
			if(!c)
			{
				return;
			}

			std::lock_guard<std::mutex> lock(m_mtx);
			for( auto it = m_connections.begin(); it != m_connections.end(); it++ )
			{
				if( *it == c )
				{
					if(m_connectionType == ConnectionType::TCP)
					{
						close(c->GetDescriptor());
					}
					if(m_onClose)
					{
						m_onClose(c);
					}
					m_connections.erase(it);

					return;
				}
			}
		}

		void ServerSocket::SendThread()
		{
			while(m_running)
			{
				{
					std::unique_lock<std::mutex> lock(m_sendLock);
					while(!m_sendReady)
					{
						if(m_connectionType == ConnectionType::UDP)
						{
							m_sendNotifier.wait_for( lock, std::chrono::milliseconds(250) );
						}
						else
						{
							m_sendNotifier.wait( lock );
						}
					}
					m_sendReady = false;
				}

				std::lock_guard<std::mutex> lock(m_mtx);

				if(m_connectionType == ConnectionType::TCP)
				{
					for( size_t i = 0; i < m_connections.size(); i++ )
					{
						m_connections[i]->Send();
					}
				}
				else
				{
					std::vector<size_t> indexes_to_erase;
					for( size_t i = 0; i < m_connections.size(); i++ )
					{
						auto& connection = m_connections[i];
						connection->Send();
						if(std::static_pointer_cast<UDPConnection>(connection)->CheckClosed())
						{
							if(m_onClose)
							{
								m_onClose(connection);
							}
							indexes_to_erase.push_back(i);
							continue;
						}
						std::static_pointer_cast<UDPConnection>(connection)->SendKeepAlive();
					}
					for( int i = (int)indexes_to_erase.size() - 1; i >= 0; --i )
					{
						m_connections.erase(m_connections.begin() + indexes_to_erase[i]);
					}
				}
			}
		}

		void ServerSocket::RecvThread()
		{
			while(m_running)
			{
				struct sockaddr_storage addr;
				socklen_t addr_size = sizeof(addr);
				ConnectionPtr c;

				FD_ZERO(&m_inSet);
				FD_ZERO(&m_excSet);
				FD_SET(m_inSock, &m_inSet);
				SOCKET newcon = -1;
				SOCKET max = m_inSock;

				{
					std::lock_guard<std::mutex> lock(m_mtx);
					for( auto& connection : m_connections )
					{
						SOCKET desc = connection->GetDescriptor();
						if (desc > max)
							max = desc;
						FD_SET( desc, &m_inSet );
						FD_SET( desc, &m_excSet );
					}
				}

				int ret = select((int)(max + 1), &m_inSet, NULL, &m_excSet, nullptr);

				SPRAWL_LOG_TRACE("Select informs of %d sockets with data ready to receive.", ret);

				std::lock_guard<std::mutex> lock(m_mtx);

				if( ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK )
				{
					m_lastError = strerror(errno);
					break;
				}

				if( FD_ISSET( m_inSock, &m_excSet ) )
				{
					continue;
					FD_CLR( m_inSock, &m_inSet );
				}
				else if( FD_ISSET( m_inSock, &m_inSet ) )
				{
					if (m_connectionType == ConnectionType::TCP)
					{
						newcon = accept(m_inSock, (struct sockaddr *)& addr, &addr_size);
						if(newcon != -1)
						{
							int yes = 1;
#ifndef _WIN32
							setsockopt(newcon, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(int));
#else
							setsockopt(newcon, SOL_SOCKET, SO_KEEPALIVE, (char*) &yes, sizeof(int));
#endif
							c.reset(new Connection(this, newcon, ((sockaddr*)&addr), m_onReceive, m_packetValidator));
							if(m_onConnect)
							{
								m_onConnect(c);
							}
							m_connections.push_back(c);
						}
						else if(errno != EWOULDBLOCK && errno != EAGAIN)
						{
							m_lastError = strerror(errno);
							continue;
						}
						else
						{
							PrintLastError("Could not accept connection");
						}
					}
					else
					{
						bool bFound = false;
						for (auto it = m_connections.begin(); it != m_connections.end(); it++)
						{
							if((*it)->Recv() != -2)
							{
								bFound = true;
							}
						}
						if(!bFound)
						{
							c.reset(new UDPConnection(this, m_inSock, m_onReceive, m_packetValidator));
							if(m_onConnect)
							{
								m_onConnect(c);
							}
							m_connections.push_back(c);
							c->Recv();
						}
					}
				}

				if(m_connectionType == ConnectionType::TCP)
				{
					std::vector<size_t> indexes_to_erase;
					for( size_t i = 0; i < m_connections.size(); i++ )
					{
						auto& connection = m_connections[i];
						if( FD_ISSET( connection->GetDescriptor(), &m_inSet ) )
						{
							if(connection->Recv() <= 0)
							{
								if(m_onClose)
								{
									m_onClose(connection);
								}
								indexes_to_erase.push_back(i);
							}
						}
					}
					for( int i = (int)indexes_to_erase.size() - 1; i >= 0; --i )
					{
						m_connections.erase(m_connections.begin() + indexes_to_erase[i]);
					}
				}
			}
		}

		void ServerSocket::CloseConnection(int i)
		{
			std::lock_guard<std::mutex> lock(m_mtx);
			ConnectionPtr c = m_connections[i];
			if(m_connectionType == ConnectionType::TCP)
			{
				close(c->GetDescriptor());
			}
			if(m_onClose)
			{
				m_onClose(c);
			}
			m_connections.erase(m_connections.begin() + i);
		}

		void ServerSocket::NotifySend()
		{
			std::lock_guard<std::mutex> lock(m_sendLock);
			m_sendReady = true;
			m_sendNotifier.notify_one();
		}

		ClientSocket::ClientSocket(ConnectionType connectionType)
			: m_onConnect(nullptr)
			, m_onClose(nullptr)
			, m_onReceive(nullptr)
			, m_packetValidator(nullptr)
			, m_con()
			, m_sock(-1)
			, m_inSet()
			, m_excSet()
			, m_hints()
			, m_servInfo(nullptr)
			, m_running(false)
			, m_sendThread()
			, m_recvThread()
			, m_sendLock()
			, m_connectionType(connectionType)
			, m_lastError(nullptr)
			, m_sendReady(false)
		{
			memset(&m_hints, 0, sizeof m_hints);
			m_hints.ai_family = AF_UNSPEC;
			if(m_connectionType == ConnectionType::TCP)
			{
				m_hints.ai_socktype = SOCK_STREAM;
			}
			else
			{
				m_hints.ai_socktype = SOCK_DGRAM;
			}
		}

		ClientSocket::~ClientSocket()
		{
			Close();
			freeaddrinfo(m_servInfo);
		}

		bool ClientSocket::Connect(const std::string& addr, int port)
		{
			struct addrinfo* p;
			if(port < 1 || port > 65535)
			{
				SOCK_ERROR("Port out of range.");
			}
			std::stringstream s;
			s << port;
			getaddrinfo(addr.c_str(), s.str().c_str(), &m_hints, &m_servInfo);

			for(p = m_servInfo; p != nullptr; p = p->ai_next)
			{
				if ((m_sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
				{
					PrintLastError("Could not open socket.");
					return false;
				}

				if (connect(m_sock, p->ai_addr, (int)p->ai_addrlen) == -1)
				{
					PrintLastError("Socket connect attempt failed (non-fatal)");
					close(m_sock);
					continue;
				}

				break;
			}

			if(p == nullptr)
			{
				SOCK_ERROR("All socket connect attempts failed. Could not establish a connection.");
			}

			if(m_connectionType == ConnectionType::TCP)
			{
				m_con.reset(new Connection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
			}
			else
			{
				m_con.reset(new UDPConnection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
			}

			m_running = true;

			m_sendThread = std::thread(&ClientSocket::SendThread, this );
			m_recvThread = std::thread(&ClientSocket::RecvThread, this );

			if(m_onConnect)
			{
				m_onConnect(m_con);
			}
			return true;
		}

		bool ClientSocket::Reconnect()
		{
			struct addrinfo* p;

			if(m_con != nullptr)
			{
				SOCK_ERROR("Already connected.");
			}

			for (p = m_servInfo; p != nullptr; p = p->ai_next)
			{
				if ((m_sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
				{
					PrintLastError("Could not open socket.");
					return false;
				}

				if (connect(m_sock, p->ai_addr, (int)p->ai_addrlen) == -1)
				{
					PrintLastError("Socket connect attempt failed (non-fatal)");
					close(m_sock);
					continue;
				}

				break;
			}

			if (p == nullptr)
			{
				SOCK_ERROR("All socket connect attempts failed. Could not establish a connection.");
			}

			if(m_connectionType == ConnectionType::TCP)
			{
				m_con.reset(new Connection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
			}
			else
			{
				m_con.reset(new UDPConnection(this, m_sock, (sockaddr*)p->ai_addr, m_onReceive, m_packetValidator));
			}

			m_running = true;

			m_sendThread = std::thread(&ClientSocket::SendThread, this );
			m_recvThread = std::thread(&ClientSocket::RecvThread, this );

			if(m_onConnect)
			{
				m_onConnect(m_con);
			}
			return true;
		}

		void ClientSocket::SetOnReceive(ReceiveCallback c)
		{
			m_onReceive = c;
		}

		void ClientSocket::SetOnConnect(ConnectionCallback c)
		{
			m_onConnect = c;
		}

		void ClientSocket::SetOnClose(ConnectionCallback c)
		{
			m_onClose = c;
		}

		void ClientSocket::SetPacketValidator(PacketValidationCallback c)
		{
			m_packetValidator = c;
		}

		void ClientSocket::Close()
		{
			m_running = false;
			if (m_sendThread.joinable())
			{
				if (m_connectionType == ConnectionType::UDP)
				{
					close(m_con->GetDescriptor());
				}
				m_sendNotifier.notify_one();
				m_sendThread.join();
			}
			if(m_recvThread.joinable())
			{
				if (m_connectionType == ConnectionType::TCP)
				{
					close(m_con->GetDescriptor());
				}
				m_recvThread.join();
			}
			m_con.reset();
		}

		void ClientSocket::SendThread()
		{
			while(m_running)
			{
				{
					std::unique_lock<std::mutex> lock(m_sendLock);
					while(!m_sendReady)
					{
						if(m_connectionType == ConnectionType::UDP)
						{
							m_sendNotifier.wait_for( lock, std::chrono::milliseconds(250) );
						}
						else
						{
							m_sendNotifier.wait( lock );
						}
					}
					m_sendReady = false;
				}
				if (!m_running)
				{
					return;
				}
				m_con->Send();
				if(m_connectionType == ConnectionType::UDP)
				{
					if(std::static_pointer_cast<UDPConnection>(m_con)->CheckClosed())
					{
						SPRAWL_LOG_TRACE("Received disconnect signal for socket %d.", m_con->GetDescriptor());
						close(m_con->GetDescriptor());
						if (m_onClose)
						{
							m_onClose(m_con);
						}
						m_running = false;
						return;
					}
					std::static_pointer_cast<UDPConnection>(m_con)->SendKeepAlive();
				}
			}
		}

		void ClientSocket::RecvThread()
		{
			while(m_running)
			{
				if(m_con == nullptr)
				{
					m_running = false;
					break;
				}
				FD_ZERO(&m_inSet);
				FD_ZERO(&m_excSet);
				FD_SET( m_sock, &m_inSet );
				FD_SET( m_sock, &m_excSet );

				int ret = select((int)(m_sock + 1), &m_inSet, NULL, &m_excSet, nullptr);

				if (!m_running)
				{
					return;
				}

				SPRAWL_LOG_TRACE("Select informs client socket has data ready to receive.");

				if( ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK )
				{
					m_lastError = strerror(errno);
				}

				if( FD_ISSET( m_sock, &m_inSet ) )
				{
					if( m_con->Recv() <= 0 && m_connectionType == ConnectionType::TCP )
					{
						SPRAWL_LOG_TRACE("Received disconnect signal for socket %d.", m_con->GetDescriptor());
						close(m_con->GetDescriptor());
						if (m_onClose)
						{
							m_onClose(m_con);
						}
						m_running = false;
						m_sendNotifier.notify_one();
						return;
					}
				}
			}
		}

		void ClientSocket::NotifySend()
		{
			std::lock_guard<std::mutex> lock(m_sendLock);
			m_sendReady = true;
			m_sendNotifier.notify_one();
		}
	}
}
# Change User Description Committed
#5 19906 ShadauxCat - Added tag, compile time string type
- Since tag requires visual studio 2015, removed compatibility code for earlier versions of visual studio
- Improved compiler detection
- Added endianness detection
- Added template if/else helper
- Fixed bug with murmur3 64 bit
- Added seed argument for murmur3

#review-19907
#4 14783 ShadauxCat Style corrections (placement of const)

#review-14784
#3 12508 ShadauxCat -Added threading library.
Currently only functional for Linux; Windows will fail to link. (I will fix this soon.)
-Fixed missing move and copy constructors in List and ForwardList
-Fixed broken move constructor in HashMap
-Fixed missing const get() in HashMap
-Fixed broken operator-> in ListIterator
-Added sprawl::noncopyable
-Added sketch headers for filesystem library
-Made StringLiteral hashable, added special hashes for pointers and integers in murmur3
-Fixed compiler warning in async_network
-Updated memory allocators to use new threading library for mutexes
-Added accessibility to sprawl::StringLiteral to be able toa ccess its pointer and length and perform pointer comparisons

#review-12504
#2 11516 ShadauxCat Network: Fix for race condition that could cause messages to fail to send.
#1 11496 ShadauxCat Initial checkin: Current states for csbuild and libSprawl