1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
/*
* Copyright 2016
*
* ReceiverThreads.cpp
*
* Created on: 21.07.2016
* Author: Tobias Frust
*/
#include "ReceiverThreads.h"
#include "../UDPServer/UDPServer.h"
#include <boost/log/trivial.hpp>
ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules)
: timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} {
for(auto i = 0; i < numberOfDetectorModules; i++){
receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i);
}
for(auto i = 0; i < numberOfDetectorModules; i++){
receiverModules_[i].join();
}
}
auto ReceiverThreads::receiverThread(const int port) -> void {
int max_packets = 100;
int max_packet_size = 65535;
UDPServer server = UDPServer(address_, port);
std::vector<std::vector<char>> buffers;
std::size_t rcv_index = 0;
std::size_t rcv_packets = 0;
std::size_t rcv_size = 0;
std::size_t lastIndex{0};
std::size_t loss = 0;
struct mmsghdr msg[max_packets];
struct iovec msgvec[max_packets];
buffers.resize(max_packets);
memset(msg, 0, sizeof(msg));
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < max_packets; i++) {
buffers[i].resize(max_packet_size);
msgvec[i].iov_base = buffers[i].data();
msgvec[i].iov_len = buffers[i].size();
msg[i].msg_hdr.msg_iov = &msgvec[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;
double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
auto ts_last = std::chrono::high_resolution_clock::now();
while(true){
int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);
if (packets >= 0) {
for (int i = 0; i < packets; i++) {
int bytes = msg[i].msg_len;
unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base);
rcv_packets++;
rcv_size += bytes;
// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes.";
std::size_t index =*((std::size_t *)buf);
int diff = index - lastIndex - 1;
if(diff > 0){
loss += diff;
BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
}
/* if (port == 4000) {
printf("%i:%i:%i:%i,", index, diff, loss, i);
}*/
lastIndex = index;
}
}
auto ts = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds d = ts - ts_last;
if (d.count() >= 1000000000) {
printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000);
rcv_packets = 0;
rcv_size = 0;
rcv_index = lastIndex;
loss = 0;
ts_last = ts;
}
}
BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
loss_ += loss;
}
|