summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/ReceiverThreads/ReceiverThreads.cpp97
-rw-r--r--src/ReceiverThreads/ReceiverThreads.h5
-rw-r--r--src/UDPServer/UDPServer.h5
3 files changed, 94 insertions, 13 deletions
diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp
index e5c339b..e86fae5 100644
--- a/src/ReceiverThreads/ReceiverThreads.cpp
+++ b/src/ReceiverThreads/ReceiverThreads.cpp
@@ -7,13 +7,32 @@
* Author: Tobias Frust
*/
+#include <iostream>
+
#include "ReceiverThreads.h"
#include "../UDPServer/UDPServer.h"
-#include <boost/log/trivial.hpp>
+#include <sys/epoll.h>
+#include <vma_extra.h>
+
+//#define USE_VMA
ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort)
- : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} {
+ : timeIntervall_{timeIntervall}, firstPort_{firstPort}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0}, nbufs_(100) {
+
+ int max_packet_size = 65535;
+
+#ifdef USE_VMA
+ vma_ = vma_get_api();
+ // First call fails?
+ if (!vma_) vma_ = vma_get_api();
+#else
+ vma_ = NULL;
+#endif
+
+ ringbuf_.resize(nbufs_);
+ for (int i = 0; i < nbufs_; i++)
+ ringbuf_[i].resize(numberOfDetectorModules * max_packet_size);
for(auto i = 0; i < numberOfDetectorModules; i++){
receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, firstPort+i);
@@ -22,12 +41,14 @@ ReceiverThreads::ReceiverThreads(const std::string& address, const int timeInter
for(auto i = 0; i < numberOfDetectorModules; i++){
receiverModules_[i].join();
}
-
}
auto ReceiverThreads::receiverThread(const int port) -> void {
+ int vma_ring_fd;
+
int max_packets = 100;
int max_packet_size = 65535;
+ int id = port - firstPort_;
UDPServer server = UDPServer(address_, port);
std::vector<std::vector<char>> buffers;
@@ -55,29 +76,75 @@ auto ReceiverThreads::receiverThread(const int port) -> void {
msg[i].msg_hdr.msg_iovlen = 1;
}
-
- printf("Listening %d\n", port);
- BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;
+ if (vma_) {
+ vma_->get_socket_rings_fds(server.f_socket, &vma_ring_fd, 1);
+ if (vma_ring_fd < 0) throw "Can't get ring fds";
+ }
+
+ printf("ID %d listening %d\n", id, port);
double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
auto ts_last = std::chrono::high_resolution_clock::now();
+ int rbuf = 0;
while(true){
- int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);
+ int packets;
+ struct vma_completion_t vma_comps[max_packets];
+ struct vma_packet_desc_t vma_packs[max_packets];
+
+ if (vma_) {
+ // Seems crashes on ConnectX-3, requires later cards according to documentation (section 8.2)
+ packets = vma_->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0);
+ } else {
+ packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);
+ }
if (packets >= 0) {
for (int i = 0; i < packets; i++) {
- int bytes = msg[i].msg_len;
- unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base);
+ int bytes;
+ unsigned short *buf;
+ char *ringptr = ringbuf_[rbuf++].data() + id * max_packet_size;
+ if (rbuf == nbufs_) rbuf = 0;
+ if (vma_) {
+ vma_packs[i] = vma_comps[i].packet;
+
+ switch (vma_comps[i].events) {
+ case VMA_SOCKETXTREME_PACKET:
+ break;
+ case EPOLLERR:
+ case EPOLLRDHUP:
+ printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
+ throw "Polling error";
+ default:
+ printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
+ throw "Polling error";
+ }
+
+ bytes = vma_comps[i].packet.total_len;
+ buf = reinterpret_cast<unsigned short*>(vma_comps[i].packet.buff_lst->payload);
+
+ int n_bufs = vma_comps[i].packet.num_bufs;
+ struct vma_buff_t *vma_buf = vma_comps[i].packet.buff_lst;
+
+ for (int j = 0; j < n_bufs; j++) {
+ memcpy(ringptr, vma_buf->payload, vma_buf->len);
+ ringptr += vma_buf->len;
+ vma_buf = vma_buf->next;
+ }
+ } else {
+ bytes = msg[i].msg_len;
+ buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base);
+
+ memcpy(ringptr, buf, bytes);
+ }
+
rcv_packets++;
rcv_size += bytes;
-// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes.";
std::size_t index =*((std::size_t *)buf);
int diff = index - lastIndex - 1;
if(diff > 0){
loss += diff;
- BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
}
/* if (port == 4000) {
@@ -86,12 +153,16 @@ auto ReceiverThreads::receiverThread(const int port) -> void {
lastIndex = index;
}
+
+ if (vma_) {
+ vma_->socketxtreme_free_vma_packets(vma_packs, packets);
+ }
}
auto ts = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds d = ts - ts_last;
if (d.count() >= 1000000000) {
- printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000);
+ printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms [VMA: %i]\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000, (vma_?1:0));
rcv_packets = 0;
rcv_size = 0;
rcv_index = lastIndex;
@@ -100,7 +171,7 @@ auto ReceiverThreads::receiverThread(const int port) -> void {
}
}
- BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
+ std::cout << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
loss_ += loss;
}
diff --git a/src/ReceiverThreads/ReceiverThreads.h b/src/ReceiverThreads/ReceiverThreads.h
index 094652a..9b90b40 100644
--- a/src/ReceiverThreads/ReceiverThreads.h
+++ b/src/ReceiverThreads/ReceiverThreads.h
@@ -21,11 +21,16 @@ public:
private:
auto receiverThread(const int port) -> void;
+ struct vma_api_t *vma_;
+
+ int nbufs_;
+ std::vector<std::vector<char>> ringbuf_;
std::vector<std::thread> receiverModules_;
std::size_t loss_;
int timeIntervall_;
+ int firstPort_;
int numberOfDetectorModules_;
std::string address_;
diff --git a/src/UDPServer/UDPServer.h b/src/UDPServer/UDPServer.h
index ed0e033..60e8f19 100644
--- a/src/UDPServer/UDPServer.h
+++ b/src/UDPServer/UDPServer.h
@@ -16,6 +16,9 @@
#include <stdexcept>
#include <cstring>
+#include "../ReceiverThreads/ReceiverThreads.h"
+
+
class udp_client_server_runtime_error : public std::runtime_error
{
public:
@@ -27,6 +30,8 @@ class UDPServer
public:
UDPServer(const std::string& addr, int port);
~UDPServer();
+
+ friend ReceiverThreads;
int get_socket() const;
int get_port() const;