#include "UDP.h" #include #include void UDPSocketBase::create_socket() { while (true) { sockfd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); #if defined(_WIN32) if (sockfd_ != INVALID_SOCKET_VALUE) { break; } #else if (sockfd_ >= 0) { break; } #endif } } void UDPSocketBase::create_socket(socket_t socket) { sockfd_ = socket; } void UDPSocketBase::set_nonblock(bool enable) { u_long ctl = enable ? 1 : 0; #if defined(_WIN32) ioctlsocket(sockfd_, FIONBIO, &ctl) == 0; #else ioctl(sockfd_, FIONBIO, &ctl) == 0; #endif is_nonblock_ = enable; } void UDPSocketBase::bind_socket(const sockaddr_in &addr) const { if (bind(sockfd_, (struct sockaddr *) &addr, sizeof(addr)) != 0) { throw std::runtime_error("Bind failed"); } } UDPSocketBase::~UDPSocketBase() { if (sockfd_ != INVALID_SOCKET_VALUE) { #ifdef _WIN32 closesocket(sockfd_); #else close(sockfd_); #endif } } UDPSender::UDPSender(socket_t socket, bool nonblock) { if (socket != INVALID_SOCKET_VALUE) { create_socket(socket); } else { create_socket(); } set_nonblock(nonblock); } void UDPSender::set_destination(const std::string &ip, uint16_t port) { dstIp = ip; dstPort = port; dest_addr_.sin_family = AF_INET; dest_addr_.sin_port = htons(port); #if defined(_WIN32_WINNT) && _WIN32_WINNT < 0x600 dest_addr_.sin_addr.S_un.S_addr = inet_addr(ip.c_str()); #else inet_pton(AF_INET, ip.c_str(), &dest_addr_.sin_addr); #endif } void UDPSender::set_source(const std::string &ip, uint16_t port) { sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(port); #if defined(_WIN32_WINNT) && _WIN32_WINNT < 0x600 addr.sin_addr.S_un.S_addr = inet_addr(ip.c_str()); #else inet_pton(AF_INET, ip.c_str(), &addr.sin_addr); #endif bind_socket(addr); } size_t UDPSender::send(const std::vector &data) { auto sent = sendto(sockfd_, reinterpret_cast(data.data()), static_cast(data.size()), 0, (struct sockaddr *) &dest_addr_, sizeof(dest_addr_)); if (sent == SOCKET_ERROR) { throw std::runtime_error("Send failed"); } return static_cast(sent); } size_t UDPSender::send(const std::string &data) { auto sent = sendto(sockfd_, reinterpret_cast(data.data()), static_cast(data.size()), 0, (struct sockaddr *) &dest_addr_, sizeof(dest_addr_)); if (sent == SOCKET_ERROR) { throw std::runtime_error("Send failed"); } return static_cast(sent); } size_t UDPSender::send(uint8_t *data, size_t size) { int nleft, nwritten; const char *ptr; ptr = (char *) data; /* can't do pointer arithmetic on void* */ nleft = size; while (nleft > 0) { #if defined(_WIN32) if ((nwritten = sendto(sockfd_, ptr, nleft, 0, (const struct sockaddr *) &dest_addr_, sizeof(dest_addr_))) <= 0) #else if ((nwritten = sendto(sockfd_, ptr, nleft, MSG_NOSIGNAL, (const struct sockaddr *) &dest_addr_, sizeof(struct sockaddr))) <= 0) #endif { if (!(errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR)) return -1; /* error */ else continue; } nleft -= nwritten; ptr += nwritten; } return (size); } UDPReceiver::UDPReceiver(uint16_t port, bool nonblock, size_t buffer_size) : buffer_(buffer_size), running_(false), srcPort(port) { create_socket(); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; bind_socket(addr); set_nonblock(nonblock); } void UDPReceiver::start_receiving(const UDPReceiver::ReceiveCallback &callback) { running_ = true; receiver_thread_ = std::thread([this, callback]() { while (running_.load(std::memory_order_relaxed)) { try { auto result = receive(); ssize_t size = result.first; auto &from = result.second; if (size > 0) { std::vector data(buffer_.begin(), buffer_.begin() + size); callback(data, from.first, from.second); } } catch (const std::runtime_error &e) { if (is_nonblock_) continue; throw; } } }); } void UDPReceiver::stop_receiving() { running_.store(false); if (receiver_thread_.joinable()) { receiver_thread_.join(); } } std::pair> UDPReceiver::receive() { sockaddr_in from_addr{}; socklen_t from_len = sizeof(from_addr); #ifdef _WIN32 DWORD timeout = UDPTimeout * 1000; // 3秒(单位:毫秒) setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, (const char *) &timeout, sizeof(timeout)); #else struct timeval tv; tv.tv_sec = UDPTimeout; // 秒 tv.tv_usec = 0; // 微秒 setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); #endif auto size = recvfrom(sockfd_, reinterpret_cast(buffer_.data()), static_cast(buffer_.size()), 0, (struct sockaddr *) &from_addr, &from_len); if (size == SOCKET_ERROR) { throw std::runtime_error("Receive Timeout"); } return {size, {from_addr, from_len}}; }