summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2018-08-06 20:20:46 +0200
committerSuren A. Chilingaryan <csa@suren.me>2018-08-06 20:20:46 +0200
commit254f1dc9e629e9de818672174a6614c6595fb11a (patch)
tree39786da7f579f3098c5a79fe9c129dae2fd3ef5c /src
parent7ae5d89c48bd3982626afe03891eab2e2d31e746 (diff)
downloadods-254f1dc9e629e9de818672174a6614c6595fb11a.tar.gz
ods-254f1dc9e629e9de818672174a6614c6595fb11a.tar.bz2
ods-254f1dc9e629e9de818672174a6614c6595fb11a.tar.xz
ods-254f1dc9e629e9de818672174a6614c6595fb11a.zip
Support arbitrary ports and port-range splitting
Diffstat (limited to 'src')
-rw-r--r--src/Detector/Detector.cpp6
-rw-r--r--src/Detector/Detector.h2
-rw-r--r--src/DetectorModule/DetectorModule.cpp6
-rw-r--r--src/DetectorModule/DetectorModule.h2
-rw-r--r--src/ReceiverThreads.vma/ReceiverThreads.cpp166
-rw-r--r--src/ReceiverThreads.vma/ReceiverThreads.h34
-rw-r--r--src/ReceiverThreads/ReceiverThreads.cpp5
-rw-r--r--src/ReceiverThreads/ReceiverThreads.h2
-rw-r--r--src/main_client.cpp15
-rw-r--r--src/main_server.cpp14
10 files changed, 230 insertions, 22 deletions
diff --git a/src/Detector/Detector.cpp b/src/Detector/Detector.cpp
index 5dde6d1..43402ee 100644
--- a/src/Detector/Detector.cpp
+++ b/src/Detector/Detector.cpp
@@ -10,12 +10,12 @@
#include "Detector.h"
-Detector::Detector(const std::string& address, const std::string& configPath, const unsigned int timeIntervall) :
+Detector::Detector(const std::string& address, const std::string& configPath, const int firstPort, const int numPorts, const unsigned int timeIntervall) :
timeIntervall_{timeIntervall},
- numberOfDetectorModules_{27} {
+ numberOfDetectorModules_{numPorts} {
modules_.reserve(numberOfDetectorModules_);
for(auto i = 0; i < numberOfDetectorModules_; i++){
- modules_.emplace_back(i, address, configPath);
+ modules_.emplace_back(i, address, firstPort + i, configPath);
}
}
diff --git a/src/Detector/Detector.h b/src/Detector/Detector.h
index 1969dbd..4295e63 100644
--- a/src/Detector/Detector.h
+++ b/src/Detector/Detector.h
@@ -17,7 +17,7 @@
class Detector {
public:
- Detector(const std::string& address, const std::string& configPath, const unsigned int timeIntervall);
+ Detector(const std::string& address, const std::string& configPath, const int firstPort, const int numPorts, const unsigned int timeIntervall);
auto run() -> void;
private:
diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp
index e7e0272..6767eb9 100644
--- a/src/DetectorModule/DetectorModule.cpp
+++ b/src/DetectorModule/DetectorModule.cpp
@@ -40,15 +40,13 @@ void timer_start(std::function<void(int)> func, unsigned int interval, unsigned
}).detach();
}
-DetectorModule::DetectorModule(const int detectorID, const std::string& address, const std::string& configPath) :
+DetectorModule::DetectorModule(const int detectorID, const std::string& address, const int port, const std::string& configPath) :
detectorID_{detectorID},
numberOfDetectorsPerModule_{16},
index_{0u},
- client_{address, detectorID+4000},
+ client_{address, port},
max_packets_{1000u}{
- printf("Creating %d\n", detectorID);
-
if (readConfig(configPath)) {
throw std::runtime_error("DetectorModule: Configuration file could not be loaded successfully. Please check!");
}
diff --git a/src/DetectorModule/DetectorModule.h b/src/DetectorModule/DetectorModule.h
index f959857..fe25727 100644
--- a/src/DetectorModule/DetectorModule.h
+++ b/src/DetectorModule/DetectorModule.h
@@ -21,7 +21,7 @@
class DetectorModule {
public:
- DetectorModule(const int detectorID, const std::string& address, const std::string& configPath);
+ DetectorModule(const int detectorID, const std::string& address, const int port, const std::string& configPath);
auto sendPeriodically(unsigned int timeIntervall) -> void;
diff --git a/src/ReceiverThreads.vma/ReceiverThreads.cpp b/src/ReceiverThreads.vma/ReceiverThreads.cpp
new file mode 100644
index 0000000..650a840
--- /dev/null
+++ b/src/ReceiverThreads.vma/ReceiverThreads.cpp
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2016
+ *
+ * ReceiverThreads.cpp
+ *
+ * Created on: 21.07.2016
+ * Author: Tobias Frust
+ */
+
+#include "ReceiverThreads.h"
+#include "../UDPServer/UDPServer.h"
+
+#include <boost/log/trivial.hpp>
+
+ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules)
+ : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} {
+
+ for(auto i = 0; i < numberOfDetectorModules; i++){
+ receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i);
+ }
+
+ for(auto i = 0; i < numberOfDetectorModules; i++){
+ receiverModules_[i].join();
+ }
+
+}
+
+auto ReceiverThreads::receiverThread(const int port) -> void {
+ int max_packets = 100;
+ int max_packet_size = 65535;
+
+
+ struct vma_ap_t *vma = vma_get_api();
+ if (!vma) throw "Can't get LibVMA API"
+
+ struct vma_packet_desc_t vma_packet;
+ struct vma_buff_t *vma_buf;
+ int vma_buf_offset;
+
+ int vma_ring_fd;
+ vma->get_socket_rings_fds(fd?, &vma_ring_fd, 1);
+ if (vma_ring_fd < 0) throw "Can't get ring fds";
+
+ while (true) {
+ // free vma packets
+
+
+ struct vma_completion_t vma_comps[max_packets];
+ int n_packets = vma->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0);
+ if (!n_packets) continue;
+
+ for (int i = 0; i < n_packets; i++) {
+ switch (vma_comps[i].events) {
+ case VMA_SOCKETXTREME_PACKET:
+ break;
+ case EPOLLERR:
+ case EPOLLRDHUP:
+ printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
+ throw "Polling error";
+ default:
+ printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
+ throw "Polling error";
+ }
+
+ int packet_len = vma_comps[i].packet.total_len;
+ int n_buffs = vma_comps[i].packet.num_bufs;
+
+ vma_comps.packet.buff_lst
+
+ vma_comps[i].packet.buff_lst->payload
+/*
+<------><------><------><------><------>conn = conns_in[vma_comps.user_data] vma_comps.packet.total_len;
+<------><------><------><------><------>conn->vma_packet.num_bufs = vma_comps.packet.num_bufs;
+<------><------><------><------><------>conn->vma_packet.total_len = vma_comps.packet.total_len;
+<------><------><------><------><------>conn->vma_packet.buff_lst = vma_comps.packet.buff_lst;
+<------><------><------><------><------>conn->vma_buf = conn->vma_packet.buff_lst;
+<------><------><------><------><------>conn->vma_buf_offset = 0;
+*/
+/*
+ ret = _min((_config.msg_size - conn->msg_len), (conn->vma_buf->len - conn->vma_buf_offset));
+ memcpy(((uint8_t *)msg_hdr) + conn->msg_len,
+ ((uint8_t *)conn->vma_buf->payload) + conn->vma_buf_offset,
+ ret);
+*/
+ _vma_api->socketxtreme_free_vma_packets(&conn->vma_packet, 1);
+ }
+
+
+auto ReceiverThreads::receiverThread(const int port) -> void {
+ int max_packets = 100;
+ int max_packet_size = 65535;
+
+ UDPServer server = UDPServer(address_, port);
+ std::vector<std::vector<char>> buffers;
+
+ std::size_t rcv_index = 0;
+ std::size_t rcv_packets = 0;
+ std::size_t rcv_size = 0;
+
+ std::size_t lastIndex{0};
+ std::size_t loss = 0;
+
+ struct mmsghdr msg[max_packets];
+ struct iovec msgvec[max_packets];
+
+ buffers.resize(max_packets);
+
+ memset(msg, 0, sizeof(msg));
+ memset(msgvec, 0, sizeof(msgvec));
+ for (int i = 0; i < max_packets; i++) {
+ buffers[i].resize(max_packet_size);
+
+ msgvec[i].iov_base = buffers[i].data();
+ msgvec[i].iov_len = buffers[i].size();
+ msg[i].msg_hdr.msg_iov = &msgvec[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+
+ BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;
+
+ double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
+ auto ts_last = std::chrono::high_resolution_clock::now();
+ while(true){
+ int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);
+
+ if (packets >= 0) {
+ for (int i = 0; i < packets; i++) {
+ int bytes = msg[i].msg_len;
+ unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base);
+
+ rcv_packets++;
+ rcv_size += bytes;
+
+// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes.";
+ std::size_t index =*((std::size_t *)buf);
+ int diff = index - lastIndex - 1;
+ if(diff > 0){
+ loss += diff;
+ BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
+ }
+
+/* if (port == 4000) {
+ printf("%i:%i:%i:%i,", index, diff, loss, i);
+ }*/
+
+ lastIndex = index;
+ }
+ }
+
+ auto ts = std::chrono::high_resolution_clock::now();
+ std::chrono::nanoseconds d = ts - ts_last;
+ if (d.count() >= 1000000000) {
+ printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000);
+ rcv_packets = 0;
+ rcv_size = 0;
+ rcv_index = lastIndex;
+ loss = 0;
+ ts_last = ts;
+ }
+ }
+
+ BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
+ loss_ += loss;
+}
+
diff --git a/src/ReceiverThreads.vma/ReceiverThreads.h b/src/ReceiverThreads.vma/ReceiverThreads.h
new file mode 100644
index 0000000..3cc986c
--- /dev/null
+++ b/src/ReceiverThreads.vma/ReceiverThreads.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2016
+ *
+ * ReceiverThreads.h
+ *
+ * Created on: 21.07.2016
+ * Author: Tobias Frust
+ */
+
+#ifndef RECEIVERTHREADS_H_
+#define RECEIVERTHREADS_H_
+
+#include <vector>
+#include <thread>
+
+class ReceiverThreads {
+public:
+ ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules);
+
+ auto run() -> void;
+private:
+ auto receiverThread(const int port) -> void;
+
+ std::vector<std::thread> receiverModules_;
+
+ std::size_t loss_;
+
+ int timeIntervall_;
+ int numberOfDetectorModules_;
+
+ std::string address_;
+};
+
+#endif /* RECEIVERTHREADS_H_ */
diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp
index e60971e..e5c339b 100644
--- a/src/ReceiverThreads/ReceiverThreads.cpp
+++ b/src/ReceiverThreads/ReceiverThreads.cpp
@@ -12,11 +12,11 @@
#include <boost/log/trivial.hpp>
-ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules)
+ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort)
: timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} {
for(auto i = 0; i < numberOfDetectorModules; i++){
- receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i);
+ receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, firstPort+i);
}
for(auto i = 0; i < numberOfDetectorModules; i++){
@@ -56,6 +56,7 @@ auto ReceiverThreads::receiverThread(const int port) -> void {
}
+ printf("Listening %d\n", port);
BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;
double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
diff --git a/src/ReceiverThreads/ReceiverThreads.h b/src/ReceiverThreads/ReceiverThreads.h
index 7cb04c0..094652a 100644
--- a/src/ReceiverThreads/ReceiverThreads.h
+++ b/src/ReceiverThreads/ReceiverThreads.h
@@ -15,7 +15,7 @@
class ReceiverThreads {
public:
- ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules);
+ ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort = 4000);
auto run() -> void;
private:
diff --git a/src/main_client.cpp b/src/main_client.cpp
index 01b3aad..fb0c1e3 100644
--- a/src/main_client.cpp
+++ b/src/main_client.cpp
@@ -10,23 +10,28 @@
#include <string>
void initLog() {
+/*
#ifndef NDEBUG
boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::debug);
#else
boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info);
#endif
+*/
+ boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info);
}
int main (int argc, char *argv[]){
- if(argc < 3){
- BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorClient <packets_per_second> <address>!";
+ if(argc < 5){
+ BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorClient <address> <first_port> <num_ports> <packets_per_second>";
return 0;
}
- int imagesPerSec = std::stoi(argv[1]);
+ std::string address = argv[1];
+ int firstPort = std::stoi(argv[2]);
+ int numPorts = std::stoi(argv[3]);
+ int imagesPerSec = std::stoi(argv[4]);
- std::string address = argv[2];
double timegap = 1./(double)imagesPerSec;
unsigned int intervall = timegap*1000*1000;
@@ -37,7 +42,7 @@ int main (int argc, char *argv[]){
auto configPath = std::string { "config.cfg" };
- Detector detector{address, configPath, intervall};
+ Detector detector{address, configPath, firstPort, numPorts, intervall};
//DetectorModule detModule0 = DetectorModule(1, address, configPath);
diff --git a/src/main_server.cpp b/src/main_server.cpp
index cd84cb9..b2f9425 100644
--- a/src/main_server.cpp
+++ b/src/main_server.cpp
@@ -10,11 +10,14 @@
#include <thread>
void initLog() {
+/*
#ifndef NDEBUG
boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::debug);
#else
boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info);
#endif
+*/
+ boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info);
}
void start(std::function<void(void)> func){
@@ -30,12 +33,14 @@ int main (int argc, char *argv[]){
initLog();
- if(argc < 2){
- BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorServer <address>!";
+ if(argc < 4){
+ BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorServer <address> <first_port> <num_ports>";
return 0;
}
std::string address = argv[1];
+ int firstPort = std::stoi(argv[2]);
+ int numPorts = std::stoi(argv[3]);
// int port = 4002;
//
@@ -46,9 +51,8 @@ int main (int argc, char *argv[]){
std::vector<unsigned short> buf(16000);
- std::cout << "Receiving UDP packages: " << std::endl;
-
- ReceiverThreads(address, 10, 27);
+ printf("Receving udp packets on ports %u - %u\n", firstPort, firstPort + numPorts);
+ ReceiverThreads(address, 10, numPorts, firstPort);
// for(auto i = 0; i < 27; i++){
// std::function<void(void)> f = [=]() {