diff options
Diffstat (limited to 'src/ReceiverThreads.vma')
-rw-r--r-- | src/ReceiverThreads.vma/ReceiverThreads.cpp | 166 | ||||
-rw-r--r-- | src/ReceiverThreads.vma/ReceiverThreads.h | 34 |
2 files changed, 0 insertions, 200 deletions
diff --git a/src/ReceiverThreads.vma/ReceiverThreads.cpp b/src/ReceiverThreads.vma/ReceiverThreads.cpp deleted file mode 100644 index 650a840..0000000 --- a/src/ReceiverThreads.vma/ReceiverThreads.cpp +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright 2016 - * - * ReceiverThreads.cpp - * - * Created on: 21.07.2016 - * Author: Tobias Frust - */ - -#include "ReceiverThreads.h" -#include "../UDPServer/UDPServer.h" - -#include <boost/log/trivial.hpp> - -ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules) - : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} { - - for(auto i = 0; i < numberOfDetectorModules; i++){ - receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i); - } - - for(auto i = 0; i < numberOfDetectorModules; i++){ - receiverModules_[i].join(); - } - -} - -auto ReceiverThreads::receiverThread(const int port) -> void { - int max_packets = 100; - int max_packet_size = 65535; - - - struct vma_ap_t *vma = vma_get_api(); - if (!vma) throw "Can't get LibVMA API" - - struct vma_packet_desc_t vma_packet; - struct vma_buff_t *vma_buf; - int vma_buf_offset; - - int vma_ring_fd; - vma->get_socket_rings_fds(fd?, &vma_ring_fd, 1); - if (vma_ring_fd < 0) throw "Can't get ring fds"; - - while (true) { - // free vma packets - - - struct vma_completion_t vma_comps[max_packets]; - int n_packets = vma->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0); - if (!n_packets) continue; - - for (int i = 0; i < n_packets; i++) { - switch (vma_comps[i].events) { - case VMA_SOCKETXTREME_PACKET: - break; - case EPOLLERR: - case EPOLLRDHUP: - printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); - throw "Polling error"; - default: - printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); - throw "Polling error"; - } - - int packet_len = vma_comps[i].packet.total_len; - int n_buffs = vma_comps[i].packet.num_bufs; - - vma_comps.packet.buff_lst - - vma_comps[i].packet.buff_lst->payload -/* -<------><------><------><------><------>conn = conns_in[vma_comps.user_data] vma_comps.packet.total_len; -<------><------><------><------><------>conn->vma_packet.num_bufs = vma_comps.packet.num_bufs; -<------><------><------><------><------>conn->vma_packet.total_len = vma_comps.packet.total_len; -<------><------><------><------><------>conn->vma_packet.buff_lst = vma_comps.packet.buff_lst; -<------><------><------><------><------>conn->vma_buf = conn->vma_packet.buff_lst; -<------><------><------><------><------>conn->vma_buf_offset = 0; -*/ -/* - ret = _min((_config.msg_size - conn->msg_len), (conn->vma_buf->len - conn->vma_buf_offset)); - memcpy(((uint8_t *)msg_hdr) + conn->msg_len, - ((uint8_t *)conn->vma_buf->payload) + conn->vma_buf_offset, - ret); -*/ - _vma_api->socketxtreme_free_vma_packets(&conn->vma_packet, 1); - } - - -auto ReceiverThreads::receiverThread(const int port) -> void { - int max_packets = 100; - int max_packet_size = 65535; - - UDPServer server = UDPServer(address_, port); - std::vector<std::vector<char>> buffers; - - std::size_t rcv_index = 0; - std::size_t rcv_packets = 0; - std::size_t rcv_size = 0; - - std::size_t lastIndex{0}; - std::size_t loss = 0; - - struct mmsghdr msg[max_packets]; - struct iovec msgvec[max_packets]; - - buffers.resize(max_packets); - - memset(msg, 0, sizeof(msg)); - memset(msgvec, 0, sizeof(msgvec)); - for (int i = 0; i < max_packets; i++) { - buffers[i].resize(max_packet_size); - - msgvec[i].iov_base = buffers[i].data(); - msgvec[i].iov_len = buffers[i].size(); - msg[i].msg_hdr.msg_iov = &msgvec[i]; - msg[i].msg_hdr.msg_iovlen = 1; - } - - - BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; - - double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; - auto ts_last = std::chrono::high_resolution_clock::now(); - while(true){ - int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); - - if (packets >= 0) { - for (int i = 0; i < packets; i++) { - int bytes = msg[i].msg_len; - unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base); - - rcv_packets++; - rcv_size += bytes; - -// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; - std::size_t index =*((std::size_t *)buf); - int diff = index - lastIndex - 1; - if(diff > 0){ - loss += diff; - BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; - } - -/* if (port == 4000) { - printf("%i:%i:%i:%i,", index, diff, loss, i); - }*/ - - lastIndex = index; - } - } - - auto ts = std::chrono::high_resolution_clock::now(); - std::chrono::nanoseconds d = ts - ts_last; - if (d.count() >= 1000000000) { - printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000); - rcv_packets = 0; - rcv_size = 0; - rcv_index = lastIndex; - loss = 0; - ts_last = ts; - } - } - - BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; - loss_ += loss; -} - diff --git a/src/ReceiverThreads.vma/ReceiverThreads.h b/src/ReceiverThreads.vma/ReceiverThreads.h deleted file mode 100644 index 3cc986c..0000000 --- a/src/ReceiverThreads.vma/ReceiverThreads.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2016 - * - * ReceiverThreads.h - * - * Created on: 21.07.2016 - * Author: Tobias Frust - */ - -#ifndef RECEIVERTHREADS_H_ -#define RECEIVERTHREADS_H_ - -#include <vector> -#include <thread> - -class ReceiverThreads { -public: - ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules); - - auto run() -> void; -private: - auto receiverThread(const int port) -> void; - - std::vector<std::thread> receiverModules_; - - std::size_t loss_; - - int timeIntervall_; - int numberOfDetectorModules_; - - std::string address_; -}; - -#endif /* RECEIVERTHREADS_H_ */ |