diff options
Diffstat (limited to 'src/ReceiverThreads.vma/ReceiverThreads.cpp')
-rw-r--r-- | src/ReceiverThreads.vma/ReceiverThreads.cpp | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/src/ReceiverThreads.vma/ReceiverThreads.cpp b/src/ReceiverThreads.vma/ReceiverThreads.cpp new file mode 100644 index 0000000..650a840 --- /dev/null +++ b/src/ReceiverThreads.vma/ReceiverThreads.cpp @@ -0,0 +1,166 @@ +/* + * Copyright 2016 + * + * ReceiverThreads.cpp + * + * Created on: 21.07.2016 + * Author: Tobias Frust + */ + +#include "ReceiverThreads.h" +#include "../UDPServer/UDPServer.h" + +#include <boost/log/trivial.hpp> + +ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules) + : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} { + + for(auto i = 0; i < numberOfDetectorModules; i++){ + receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i); + } + + for(auto i = 0; i < numberOfDetectorModules; i++){ + receiverModules_[i].join(); + } + +} + +auto ReceiverThreads::receiverThread(const int port) -> void { + int max_packets = 100; + int max_packet_size = 65535; + + + struct vma_ap_t *vma = vma_get_api(); + if (!vma) throw "Can't get LibVMA API" + + struct vma_packet_desc_t vma_packet; + struct vma_buff_t *vma_buf; + int vma_buf_offset; + + int vma_ring_fd; + vma->get_socket_rings_fds(fd?, &vma_ring_fd, 1); + if (vma_ring_fd < 0) throw "Can't get ring fds"; + + while (true) { + // free vma packets + + + struct vma_completion_t vma_comps[max_packets]; + int n_packets = vma->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0); + if (!n_packets) continue; + + for (int i = 0; i < n_packets; i++) { + switch (vma_comps[i].events) { + case VMA_SOCKETXTREME_PACKET: + break; + case EPOLLERR: + case EPOLLRDHUP: + printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); + throw "Polling error"; + default: + printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); + throw "Polling error"; + } + + int packet_len = vma_comps[i].packet.total_len; + int n_buffs = vma_comps[i].packet.num_bufs; + + vma_comps.packet.buff_lst + + vma_comps[i].packet.buff_lst->payload +/* +<------><------><------><------><------>conn = conns_in[vma_comps.user_data] vma_comps.packet.total_len; +<------><------><------><------><------>conn->vma_packet.num_bufs = vma_comps.packet.num_bufs; +<------><------><------><------><------>conn->vma_packet.total_len = vma_comps.packet.total_len; +<------><------><------><------><------>conn->vma_packet.buff_lst = vma_comps.packet.buff_lst; +<------><------><------><------><------>conn->vma_buf = conn->vma_packet.buff_lst; +<------><------><------><------><------>conn->vma_buf_offset = 0; +*/ +/* + ret = _min((_config.msg_size - conn->msg_len), (conn->vma_buf->len - conn->vma_buf_offset)); + memcpy(((uint8_t *)msg_hdr) + conn->msg_len, + ((uint8_t *)conn->vma_buf->payload) + conn->vma_buf_offset, + ret); +*/ + _vma_api->socketxtreme_free_vma_packets(&conn->vma_packet, 1); + } + + +auto ReceiverThreads::receiverThread(const int port) -> void { + int max_packets = 100; + int max_packet_size = 65535; + + UDPServer server = UDPServer(address_, port); + std::vector<std::vector<char>> buffers; + + std::size_t rcv_index = 0; + std::size_t rcv_packets = 0; + std::size_t rcv_size = 0; + + std::size_t lastIndex{0}; + std::size_t loss = 0; + + struct mmsghdr msg[max_packets]; + struct iovec msgvec[max_packets]; + + buffers.resize(max_packets); + + memset(msg, 0, sizeof(msg)); + memset(msgvec, 0, sizeof(msgvec)); + for (int i = 0; i < max_packets; i++) { + buffers[i].resize(max_packet_size); + + msgvec[i].iov_base = buffers[i].data(); + msgvec[i].iov_len = buffers[i].size(); + msg[i].msg_hdr.msg_iov = &msgvec[i]; + msg[i].msg_hdr.msg_iovlen = 1; + } + + + BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; + + double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; + auto ts_last = std::chrono::high_resolution_clock::now(); + while(true){ + int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); + + if (packets >= 0) { + for (int i = 0; i < packets; i++) { + int bytes = msg[i].msg_len; + unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base); + + rcv_packets++; + rcv_size += bytes; + +// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; + std::size_t index =*((std::size_t *)buf); + int diff = index - lastIndex - 1; + if(diff > 0){ + loss += diff; + BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; + } + +/* if (port == 4000) { + printf("%i:%i:%i:%i,", index, diff, loss, i); + }*/ + + lastIndex = index; + } + } + + auto ts = std::chrono::high_resolution_clock::now(); + std::chrono::nanoseconds d = ts - ts_last; + if (d.count() >= 1000000000) { + printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000); + rcv_packets = 0; + rcv_size = 0; + rcv_index = lastIndex; + loss = 0; + ts_last = ts; + } + } + + BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; + loss_ += loss; +} + |