diff options
author | Suren A. Chilingaryan <csa@suren.me> | 2018-07-25 15:57:55 +0200 |
---|---|---|
committer | Suren A. Chilingaryan <csa@suren.me> | 2018-07-25 15:57:55 +0200 |
commit | 76affa8334acbd21f3a1186fdaace1efe93e2e31 (patch) | |
tree | 32065ef3866c4759f00f86e91cc01a6e24134269 /src/ReceiverThreads | |
parent | 41d41b7deb416167da5a0de2638ecea078c13ea6 (diff) | |
download | ods-76affa8334acbd21f3a1186fdaace1efe93e2e31.tar.gz ods-76affa8334acbd21f3a1186fdaace1efe93e2e31.tar.bz2 ods-76affa8334acbd21f3a1186fdaace1efe93e2e31.tar.xz ods-76affa8334acbd21f3a1186fdaace1efe93e2e31.zip |
Recieve multiple packets in one system call
Diffstat (limited to 'src/ReceiverThreads')
-rw-r--r-- | src/ReceiverThreads/ReceiverThreads.cpp | 88 |
1 files changed, 72 insertions, 16 deletions
diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp index 3d22c66..e60971e 100644 --- a/src/ReceiverThreads/ReceiverThreads.cpp +++ b/src/ReceiverThreads/ReceiverThreads.cpp @@ -26,24 +26,80 @@ ReceiverThreads::ReceiverThreads(const std::string& address, const int timeInter } auto ReceiverThreads::receiverThread(const int port) -> void { - UDPServer server = UDPServer(address_, port); - std::vector<unsigned short> buf(33000); - std::size_t lastIndex{0}; + int max_packets = 100; + int max_packet_size = 65535; + + UDPServer server = UDPServer(address_, port); + std::vector<std::vector<char>> buffers; + + std::size_t rcv_index = 0; + std::size_t rcv_packets = 0; + std::size_t rcv_size = 0; + + std::size_t lastIndex{0}; + std::size_t loss = 0; + + struct mmsghdr msg[max_packets]; + struct iovec msgvec[max_packets]; + + buffers.resize(max_packets); + + memset(msg, 0, sizeof(msg)); + memset(msgvec, 0, sizeof(msgvec)); + for (int i = 0; i < max_packets; i++) { + buffers[i].resize(max_packet_size); + + msgvec[i].iov_base = buffers[i].data(); + msgvec[i].iov_len = buffers[i].size(); + msg[i].msg_hdr.msg_iov = &msgvec[i]; + msg[i].msg_hdr.msg_iovlen = 1; + } + + BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; + + double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; + auto ts_last = std::chrono::high_resolution_clock::now(); while(true){ - int bytes = server.timed_recv((char*)buf.data(), 65536, timeIntervall_); - if(bytes < 0){ - break; - } - BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; - std::size_t index = *((std::size_t *)buf.data()); - int diff = index - lastIndex - 1; - if(diff > 0){ - loss_ += diff; - BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; - } - lastIndex = index; + int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); + + if (packets >= 0) { + for (int i = 0; i < packets; i++) { + int bytes = msg[i].msg_len; + unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base); + + rcv_packets++; + rcv_size += bytes; + +// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; + std::size_t index =*((std::size_t *)buf); + int diff = index - lastIndex - 1; + if(diff > 0){ + loss += diff; + BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; + } + +/* if (port == 4000) { + printf("%i:%i:%i:%i,", index, diff, loss, i); + }*/ + + lastIndex = index; + } + } + + auto ts = std::chrono::high_resolution_clock::now(); + std::chrono::nanoseconds d = ts - ts_last; + if (d.count() >= 1000000000) { + printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000); + rcv_packets = 0; + rcv_size = 0; + rcv_index = lastIndex; + loss = 0; + ts_last = ts; + } } - BOOST_LOG_TRIVIAL(info) << "Lost " << loss_ << " from " << lastIndex << " packets; (" << loss_/(double)lastIndex*100.0 << "%)"; + + BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; + loss_ += loss; } |