Files
Lixinzhe_lib/Socket/UDP.cpp
2025-06-16 08:52:40 +08:00

208 lines
5.1 KiB
C++

#include "UDP.h"
#include <memory>
#include <mutex>
/// Opens a new IPv4 UDP socket and stores its handle in sockfd_.
///
/// socket() is retried a bounded number of times. The previous implementation
/// retried without limit, which turned any lasting failure of socket() into an
/// endless busy loop.
///
/// @throws std::runtime_error if no attempt produced a valid socket.
void UDPSocketBase::create_socket() {
    // Upper bound on immediate retries before giving up.
    constexpr int kMaxAttempts = 16;

    for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
        sockfd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
#if defined(_WIN32)
        if (sockfd_ != INVALID_SOCKET_VALUE) {
            return;
        }
#else
        if (sockfd_ >= 0) {
            return;
        }
#endif
    }
    throw std::runtime_error("Socket creation failed");
}
/// Uses an existing socket handle instead of opening a new one.
///
/// The handle is stored in sockfd_ without any validation; the destructor of
/// this class closes whatever sockfd_ holds.
///
/// @param socket handle to store.
void UDPSocketBase::create_socket(socket_t socket) {
    this->sockfd_ = socket;
}
/// Switches the socket between blocking and non-blocking I/O using FIONBIO.
///
/// The outcome of the ioctl call is not reported to the caller; is_nonblock_
/// always records the mode that was requested.
///
/// @param enable true to request non-blocking mode, false for blocking mode.
void UDPSocketBase::set_nonblock(bool enable) {
#if defined(_WIN32)
    // ioctlsocket() takes its argument as u_long*.
    u_long mode = enable ? 1 : 0;
    static_cast<void>(ioctlsocket(sockfd_, FIONBIO, &mode));
#else
    // FIONBIO reads an int through the pointer. Passing a u_long that is wider
    // than int would only give the right answer on little-endian targets.
    int mode = enable ? 1 : 0;
    static_cast<void>(ioctl(sockfd_, FIONBIO, &mode));
#endif
    is_nonblock_ = enable;
}
/// Binds sockfd_ to the given local IPv4 address.
///
/// @param addr local address (family, port and IP) to bind to.
/// @throws std::runtime_error with the message "Bind failed" if bind() does
///         not return 0.
void UDPSocketBase::bind_socket(const sockaddr_in &addr) const {
    const auto *generic_addr = reinterpret_cast<const struct sockaddr *>(&addr);
    const int rc = bind(sockfd_, generic_addr, sizeof(addr));
    if (rc == 0) {
        return;
    }
    throw std::runtime_error("Bind failed");
}
/// Closes the socket held in sockfd_, unless it is INVALID_SOCKET_VALUE.
UDPSocketBase::~UDPSocketBase() {
    if (sockfd_ == INVALID_SOCKET_VALUE) {
        return;  // nothing to release
    }
#ifdef _WIN32
    closesocket(sockfd_);
#else
    close(sockfd_);
#endif
}
/// Builds a sender around an existing socket handle or a freshly created one.
///
/// @param socket   handle to use; INVALID_SOCKET_VALUE requests a new socket.
/// @param nonblock blocking mode passed on to set_nonblock().
UDPSender::UDPSender(socket_t socket, bool nonblock) {
    if (socket == INVALID_SOCKET_VALUE) {
        // No handle supplied: open our own UDP socket.
        create_socket();
    } else {
        create_socket(socket);
    }
    set_nonblock(nonblock);
}
/// Records the destination and fills dest_addr_ for later send() calls.
///
/// NOTE(review): the result of the text-to-address conversion is not checked,
/// so a malformed `ip` is not reported to the caller.
///
/// @param ip   destination IPv4 address in dotted-decimal text form.
/// @param port destination port in host byte order.
void UDPSender::set_destination(const std::string &ip, uint16_t port) {
    dstIp = ip;
    dstPort = port;

    const char *ip_text = ip.c_str();
    dest_addr_.sin_port = htons(port);
    dest_addr_.sin_family = AF_INET;
#if defined(_WIN32_WINNT) && _WIN32_WINNT < 0x600
    // Targets older than 0x600 use inet_addr() instead of inet_pton().
    dest_addr_.sin_addr.S_un.S_addr = inet_addr(ip_text);
#else
    inet_pton(AF_INET, ip_text, &dest_addr_.sin_addr);
#endif
}
void UDPSender::set_source(const std::string &ip, uint16_t port) {
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
#if defined(_WIN32_WINNT) && _WIN32_WINNT < 0x600
addr.sin_addr.S_un.S_addr = inet_addr(ip.c_str());
#else
inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
#endif
bind_socket(addr);
}
size_t UDPSender::send(const std::vector<uint8_t> &data) {
auto sent = sendto(sockfd_,
reinterpret_cast<const char *>(data.data()),
static_cast<int>(data.size()),
0,
(struct sockaddr *) &dest_addr_,
sizeof(dest_addr_));
if (sent == SOCKET_ERROR) {
throw std::runtime_error("Send failed");
}
return static_cast<size_t>(sent);
}
size_t UDPSender::send(const std::string &data) {
auto sent = sendto(sockfd_,
reinterpret_cast<const char *>(data.data()),
static_cast<int>(data.size()),
0,
(struct sockaddr *) &dest_addr_,
sizeof(dest_addr_));
if (sent == SOCKET_ERROR) {
throw std::runtime_error("Send failed");
}
return static_cast<size_t>(sent);
}
size_t UDPSender::send(uint8_t *data, size_t size) {
int nleft, nwritten;
const char *ptr;
ptr = (char *) data; /* can't do pointer arithmetic on void* */
nleft = size;
while (nleft > 0) {
#if defined(_WIN32)
if ((nwritten = sendto(sockfd_, ptr, nleft, 0, (const struct sockaddr *) &dest_addr_, sizeof(dest_addr_))) <= 0)
#else
if ((nwritten = sendto(sockfd_, ptr, nleft, MSG_NOSIGNAL, (const struct sockaddr *) &dest_addr_,
sizeof(struct sockaddr))) <= 0)
#endif
{
if (!(errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR))
return -1; /* error */
else
continue;
}
nleft -= nwritten;
ptr += nwritten;
}
return (size);
}
/// Creates a UDP socket bound to `port` on INADDR_ANY.
///
/// @param port        local port to listen on, in host byte order.
/// @param nonblock    blocking mode passed on to set_nonblock().
/// @param buffer_size size in bytes of the internal receive buffer.
/// @throws std::runtime_error if bind_socket() fails.
UDPReceiver::UDPReceiver(uint16_t port, bool nonblock, size_t buffer_size)
    : buffer_(buffer_size), running_(false), srcPort(port) {
    create_socket();

    sockaddr_in listen_addr{};
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port = htons(port);
    listen_addr.sin_family = AF_INET;
    bind_socket(listen_addr);

    set_nonblock(nonblock);
}
/// Starts a background thread that keeps calling receive() and passes every
/// non-empty datagram to `callback` together with the sender's address and
/// address length.
///
/// A worker left over from an earlier call is stopped and joined first.
/// The loop runs until running_ becomes false.
///
/// @param callback invoked on the worker thread; a copy is stored in the thread.
void UDPReceiver::start_receiving(const UDPReceiver::ReceiveCallback &callback) {
    // Assigning to a std::thread that is still joinable calls std::terminate,
    // so wind down any previous worker before starting a new one.
    running_.store(false);
    if (receiver_thread_.joinable()) {
        receiver_thread_.join();
    }

    running_ = true;
    receiver_thread_ = std::thread([this, callback]() {
        while (running_.load(std::memory_order_relaxed)) {
            try {
                auto result = receive();
                const ssize_t size = result.first;
                const auto &from = result.second;
                if (size > 0) {
                    std::vector<uint8_t> data(buffer_.begin(),
                                              buffer_.begin() + size);
                    callback(data, from.first, from.second);
                }
            }
            catch (const std::runtime_error &) {
                // receive() throws on every failed recvfrom(), including when
                // its receive timeout expires. Rethrowing from the thread body
                // would reach std::terminate, so treat the failure as "nothing
                // received this round" and re-check running_.
                continue;
            }
        }
    });
}
/// Clears running_ and, if the receiver thread is joinable, waits for it to exit.
void UDPReceiver::stop_receiving() {
    running_.store(false);
    if (!receiver_thread_.joinable()) {
        return;
    }
    receiver_thread_.join();
}
std::pair<ssize_t, std::pair<sockaddr_in, socklen_t>> UDPReceiver::receive() {
sockaddr_in from_addr{};
socklen_t from_len = sizeof(from_addr);
#ifdef _WIN32
DWORD timeout = UDPTimeout * 1000; // 3秒(单位:毫秒)
setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, (const char *) &timeout, sizeof(timeout));
#else
struct timeval tv;
tv.tv_sec = UDPTimeout; // 秒
tv.tv_usec = 0; // 微秒
setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
#endif
auto size = recvfrom(sockfd_,
reinterpret_cast<char *>(buffer_.data()),
static_cast<int>(buffer_.size()),
0,
(struct sockaddr *) &from_addr,
&from_len);
if (size == SOCKET_ERROR) {
throw std::runtime_error("Receive Timeout");
}
return {size, {from_addr, from_len}};
}