diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/DetectorModule/DetectorModule.cpp | 92 | ||||
| -rw-r--r-- | src/DetectorModule/DetectorModule.h | 8 | ||||
| -rw-r--r-- | src/UDPClient/UDPClient.cpp | 13 | ||||
| -rw-r--r-- | src/UDPClient/UDPClient.h | 1 | 
4 files changed, 92 insertions, 22 deletions
| diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp index 169c5a5..e7e0272 100644 --- a/src/DetectorModule/DetectorModule.cpp +++ b/src/DetectorModule/DetectorModule.cpp @@ -15,12 +15,27 @@  #include <exception>  #include <fstream> -void timer_start(std::function<void(void)> func, unsigned int interval){ -    std::thread([func, interval]() { +void timer_start(std::function<void(int)> func, unsigned int interval, unsigned int max_packets){ +    std::thread([func, interval, max_packets]() { +	int packets = 1; +	auto next = std::chrono::high_resolution_clock::now();          while (true)          { -            func(); -            std::this_thread::sleep_for(std::chrono::microseconds(interval)); +            func(packets); + +            next += std::chrono::microseconds(packets * interval); +            auto now = std::chrono::high_resolution_clock::now(); +            if (now > next) { +        	std::chrono::nanoseconds late = now - next; +        	packets = 1 + (late.count() / interval / 1000); +        	if (packets > max_packets) +        	    packets = max_packets; +            } else { +        	packets = 1; +            } + +	    std::this_thread::sleep_until(next); +//            std::this_thread::sleep_for(std::chrono::microseconds(interval));          }      }).detach();  } @@ -29,7 +44,8 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,     detectorID_{detectorID},     numberOfDetectorsPerModule_{16},     index_{0u}, -   client_{address, detectorID+4000}{ +   client_{address, detectorID+4000}, +   max_packets_{1000u}{        printf("Creating %d\n", detectorID); @@ -37,7 +53,10 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,        throw std::runtime_error("DetectorModule: Configuration file could not be loaded successfully. Please check!");     } -   sendBuffer_.resize(numberOfProjectionsPerPacket_*numberOfDetectorsPerModule_*sizeof(unsigned short)+sizeof(std::size_t)); +   sendBuffer_.resize(max_packets_); +   for(auto &it: sendBuffer_) { +      it.resize(numberOfProjectionsPerPacket_ * numberOfDetectorsPerModule_ * sizeof(unsigned short) + sizeof(size_t) + sizeof(short int)); +   }     //read the input data from the file corresponding to the detectorModuleID     readInput(); @@ -46,7 +65,7 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,     printf("Created %d\n", detectorID);  } -auto DetectorModule::send() -> void{ +auto DetectorModule::send(int packets = 1) -> void{     BOOST_LOG_TRIVIAL(debug) << "Detectormodule " << detectorID_ << " :sending udp packet with index " << index_ << ".";     int numberOfParts = numberOfProjections_/numberOfProjectionsPerPacket_;  //   sendBuffer_[0] = (sizeof(std::size_t))       & 0xff; @@ -57,23 +76,56 @@ auto DetectorModule::send() -> void{  //   sendBuffer_[5] = (sizeof(std::size_t) >> 40) & 0xff;  //   sendBuffer_[6] = (sizeof(std::size_t) >> 48) & 0xff;  //   sendBuffer_[7] = (sizeof(std::size_t) >> 56) & 0xff; -   unsigned int bufferSizeIndex = index_ % 1000; -   unsigned int sinoSize = numberOfDetectorsPerModule_*numberOfProjectionsPerPacket_; -   *reinterpret_cast<int*>(sendBuffer_.data()) = index_;	 -   *reinterpret_cast<unsigned short*>(sendBuffer_.data()+sizeof(std::size_t)) = partID_; -   std::copy(((char*)buffer_.data())+sinoSize*(bufferSizeIndex*numberOfParts+partID_)*sizeof(unsigned short), ((char*)buffer_.data())+(sinoSize*(1+bufferSizeIndex*numberOfParts+partID_))*sizeof(unsigned short), sendBuffer_.begin()+sizeof(std::size_t)+sizeof(unsigned short)); -   BOOST_LOG_TRIVIAL(debug) << "INDEX: " << (bufferSizeIndex*numberOfParts+partID_); -   client_.send(sendBuffer_.data(), sendBuffer_.size()); -   partID_ = (partID_+1) % numberOfParts; -   if(partID_ == 0) -    ++index_; + + +    struct mmsghdr msg[packets]; +    struct iovec msgvec[packets]; + +    unsigned int hdrSize = sizeof(size_t) + sizeof(short int); +    unsigned int sinoSize = numberOfDetectorsPerModule_ * numberOfProjectionsPerPacket_; + +    memset(msg, 0, sizeof(msg)); +    memset(msgvec, 0, sizeof(msgvec)); +    for (int i = 0; i < packets; i++) { +	unsigned int bufferSizeIndex = index_ % 1000; +	 +	char *ptr = sendBuffer_[i].data(); + +	msgvec[i].iov_base = sendBuffer_[i].data(); +	msgvec[i].iov_len = sendBuffer_[i].size(); +	msg[i].msg_hdr.msg_iov = &msgvec[i]; +	msg[i].msg_hdr.msg_iovlen = 1; +	 + +	*reinterpret_cast<size_t*>(ptr) = index_ * numberOfParts + partID_; +	*reinterpret_cast<unsigned short*>(ptr + sizeof(size_t)) = partID_; +	memcpy(ptr + hdrSize, buffer_.data() + sinoSize * (bufferSizeIndex * numberOfParts + partID_), sinoSize * sizeof(unsigned short)); + +	partID_ = (partID_ + 1) % numberOfParts; +	if (partID_ == 0) ++index_; +    } + +    client_.msend(packets, msg); +     +   auto ts = std::chrono::high_resolution_clock::now(); +   std::chrono::nanoseconds d = ts - ts_; +    counter_ += packets; +   if (d.count() >= 1000000000) { +	printf("Packets %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", counter_, sendBuffer_[0].size(), 8. * counter_ * sendBuffer_[0].size() / 1024 / 1024 / 1024, 1. * d.count() / 1000000); +	counter_ = 0; +	ts_ = ts; +   }  }  auto DetectorModule::sendPeriodically(unsigned int timeIntervall) -> void { -   std::function<void(void)> f = [=]() { -      this->send(); +   counter_ = 0; +   ips_ = 1000000. / ((double)timeIntervall); +   ts_ = std::chrono::high_resolution_clock::now(); + +   std::function<void(int)> f = [=](int packets = 1) { +      this->send(packets);     }; -   timer_start(f, timeIntervall); +   timer_start(f, timeIntervall, max_packets_);  }  auto DetectorModule::readInput() -> void { diff --git a/src/DetectorModule/DetectorModule.h b/src/DetectorModule/DetectorModule.h index afe4d04..f959857 100644 --- a/src/DetectorModule/DetectorModule.h +++ b/src/DetectorModule/DetectorModule.h @@ -27,7 +27,7 @@ public:  private:     std::vector<unsigned short> buffer_; -   std::vector<char> sendBuffer_; +   std::vector<std::vector<char>> sendBuffer_;     int detectorID_;     UDPClient client_; @@ -40,12 +40,16 @@ private:     unsigned int numberOfFrames_;     std::string path_, fileName_, fileEnding_; +   unsigned int max_packets_;     std::size_t index_; +   std::size_t ips_; +   std::size_t counter_; +   std::chrono::high_resolution_clock::time_point ts_;     unsigned short partID_{0};     auto readConfig(const std::string& configFile) -> bool;     auto readInput() -> void; -   auto send() -> void; +   auto send(int packets) -> void;  }; diff --git a/src/UDPClient/UDPClient.cpp b/src/UDPClient/UDPClient.cpp index 1d427ba..b9d55d0 100644 --- a/src/UDPClient/UDPClient.cpp +++ b/src/UDPClient/UDPClient.cpp @@ -64,12 +64,21 @@       {           throw udp_client_server_runtime_error(("invalid address or port: \"" + addr + ":" + decimal_port + "\"").c_str());       } +       f_socket = socket(f_addrinfo->ai_family, SOCK_DGRAM | SOCK_CLOEXEC, IPPROTO_UDP);       if(f_socket == -1)       {           freeaddrinfo(f_addrinfo);           throw udp_client_server_runtime_error(("could not create socket for: \"" + addr + ":" + decimal_port + "\"").c_str());       } +  +     if (connect(f_socket, f_addrinfo->ai_addr, f_addrinfo->ai_addrlen)) +     { +	close(f_socket); +        freeaddrinfo(f_addrinfo); +        throw udp_client_server_runtime_error(("could not connect socket for: \"" + addr + ":" + decimal_port + "\"").c_str()); +     } +            printf("Created client %d\n", f_port);   } @@ -140,3 +149,7 @@   int UDPClient::send(const char *msg, std::size_t size){       return sendto(f_socket, msg, size, 0, f_addrinfo->ai_addr, f_addrinfo->ai_addrlen);   } + + int UDPClient::msend(int n, struct mmsghdr *msg){ +     return sendmmsg(f_socket, msg, n, 0); + } diff --git a/src/UDPClient/UDPClient.h b/src/UDPClient/UDPClient.h index f6cf0d6..f1c8a8d 100644 --- a/src/UDPClient/UDPClient.h +++ b/src/UDPClient/UDPClient.h @@ -33,6 +33,7 @@ public:    std::string         get_addr() const;    int                 send(const char *msg, size_t size); +  int		      msend(int n, struct mmsghdr *msg);  private:    int                 f_socket; | 
