diff options
-rw-r--r-- | src/ReceiverThreads/ReceiverThreads.cpp | 97 | ||||
-rw-r--r-- | src/ReceiverThreads/ReceiverThreads.h | 5 | ||||
-rw-r--r-- | src/UDPServer/UDPServer.h | 5 |
3 files changed, 94 insertions, 13 deletions
diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp index e5c339b..e86fae5 100644 --- a/src/ReceiverThreads/ReceiverThreads.cpp +++ b/src/ReceiverThreads/ReceiverThreads.cpp @@ -7,13 +7,32 @@ * Author: Tobias Frust */ +#include <iostream> + #include "ReceiverThreads.h" #include "../UDPServer/UDPServer.h" -#include <boost/log/trivial.hpp> +#include <sys/epoll.h> +#include <vma_extra.h> + +//#define USE_VMA ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort) - : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} { + : timeIntervall_{timeIntervall}, firstPort_{firstPort}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0}, nbufs_(100) { + + int max_packet_size = 65535; + +#ifdef USE_VMA + vma_ = vma_get_api(); + // First call fails? + if (!vma_) vma_ = vma_get_api(); +#else + vma_ = NULL; +#endif + + ringbuf_.resize(nbufs_); + for (int i = 0; i < nbufs_; i++) + ringbuf_[i].resize(numberOfDetectorModules * max_packet_size); for(auto i = 0; i < numberOfDetectorModules; i++){ receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, firstPort+i); @@ -22,12 +41,14 @@ ReceiverThreads::ReceiverThreads(const std::string& address, const int timeInter for(auto i = 0; i < numberOfDetectorModules; i++){ receiverModules_[i].join(); } - } auto ReceiverThreads::receiverThread(const int port) -> void { + int vma_ring_fd; + int max_packets = 100; int max_packet_size = 65535; + int id = port - firstPort_; UDPServer server = UDPServer(address_, port); std::vector<std::vector<char>> buffers; @@ -55,29 +76,75 @@ auto ReceiverThreads::receiverThread(const int port) -> void { msg[i].msg_hdr.msg_iovlen = 1; } - - printf("Listening %d\n", port); - BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; + if (vma_) { + vma_->get_socket_rings_fds(server.f_socket, &vma_ring_fd, 1); + if (vma_ring_fd < 0) throw "Can't get ring fds"; + } + + printf("ID %d listening %d\n", id, port); double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; auto ts_last = std::chrono::high_resolution_clock::now(); + int rbuf = 0; while(true){ - int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); + int packets; + struct vma_completion_t vma_comps[max_packets]; + struct vma_packet_desc_t vma_packs[max_packets]; + + if (vma_) { + // Seems crashes on ConnectX-3, requires later cards according to documentation (section 8.2) + packets = vma_->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0); + } else { + packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); + } if (packets >= 0) { for (int i = 0; i < packets; i++) { - int bytes = msg[i].msg_len; - unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base); + int bytes; + unsigned short *buf; + char *ringptr = ringbuf_[rbuf++].data() + id * max_packet_size; + if (rbuf == nbufs_) rbuf = 0; + if (vma_) { + vma_packs[i] = vma_comps[i].packet; + + switch (vma_comps[i].events) { + case VMA_SOCKETXTREME_PACKET: + break; + case EPOLLERR: + case EPOLLRDHUP: + printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); + throw "Polling error"; + default: + printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); + throw "Polling error"; + } + + bytes = vma_comps[i].packet.total_len; + buf = reinterpret_cast<unsigned short*>(vma_comps[i].packet.buff_lst->payload); + + int n_bufs = vma_comps[i].packet.num_bufs; + struct vma_buff_t *vma_buf = vma_comps[i].packet.buff_lst; + + for (int j = 0; j < n_bufs; j++) { + memcpy(ringptr, vma_buf->payload, vma_buf->len); + ringptr += vma_buf->len; + vma_buf = vma_buf->next; + } + } else { + bytes = msg[i].msg_len; + buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base); + + memcpy(ringptr, buf, bytes); + } + rcv_packets++; rcv_size += bytes; -// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; std::size_t index =*((std::size_t *)buf); int diff = index - lastIndex - 1; if(diff > 0){ loss += diff; - BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; } /* if (port == 4000) { @@ -86,12 +153,16 @@ auto ReceiverThreads::receiverThread(const int port) -> void { lastIndex = index; } + + if (vma_) { + vma_->socketxtreme_free_vma_packets(vma_packs, packets); + } } auto ts = std::chrono::high_resolution_clock::now(); std::chrono::nanoseconds d = ts - ts_last; if (d.count() >= 1000000000) { - printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000); + printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms [VMA: %i]\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000, (vma_?1:0)); rcv_packets = 0; rcv_size = 0; rcv_index = lastIndex; @@ -100,7 +171,7 @@ auto ReceiverThreads::receiverThread(const int port) -> void { } } - BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; + std::cout << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; loss_ += loss; } diff --git a/src/ReceiverThreads/ReceiverThreads.h b/src/ReceiverThreads/ReceiverThreads.h index 094652a..9b90b40 100644 --- a/src/ReceiverThreads/ReceiverThreads.h +++ b/src/ReceiverThreads/ReceiverThreads.h @@ -21,11 +21,16 @@ public: private: auto receiverThread(const int port) -> void; + struct vma_api_t *vma_; + + int nbufs_; + std::vector<std::vector<char>> ringbuf_; std::vector<std::thread> receiverModules_; std::size_t loss_; int timeIntervall_; + int firstPort_; int numberOfDetectorModules_; std::string address_; diff --git a/src/UDPServer/UDPServer.h b/src/UDPServer/UDPServer.h index ed0e033..60e8f19 100644 --- a/src/UDPServer/UDPServer.h +++ b/src/UDPServer/UDPServer.h @@ -16,6 +16,9 @@ #include <stdexcept> #include <cstring> +#include "../ReceiverThreads/ReceiverThreads.h" + + class udp_client_server_runtime_error : public std::runtime_error { public: @@ -27,6 +30,8 @@ class UDPServer public: UDPServer(const std::string& addr, int port); ~UDPServer(); + + friend ReceiverThreads; int get_socket() const; int get_port() const; |