diff options
author | Suren A. Chilingaryan <csa@suren.me> | 2019-11-17 16:58:02 +0100 |
---|---|---|
committer | Suren A. Chilingaryan <csa@suren.me> | 2019-11-17 16:58:02 +0100 |
commit | ea9626b60092f2d2c79431718c3ca8bc383429a6 (patch) | |
tree | f76a6dcf118fc3977eda1cbcf368018715ebe01c | |
parent | 23f22348c5281fff685c1fa89255e7e1e76266a4 (diff) | |
download | ufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.tar.gz ufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.tar.bz2 ufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.tar.xz ufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.zip |
Networking setup
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | src/DEVELOPMENT | 22 | ||||
-rw-r--r-- | src/ufo-roof-buffer.c | 36 | ||||
-rw-r--r-- | src/ufo-roof-buffer.h | 1 | ||||
-rw-r--r-- | src/ufo-roof-build-task.c | 28 | ||||
-rw-r--r-- | src/ufo-roof-config.c | 8 | ||||
-rw-r--r-- | src/ufo-roof-config.h | 1 | ||||
-rw-r--r-- | src/ufo-roof-read-socket.c | 1 | ||||
-rw-r--r-- | src/ufo-roof-read-task.c | 1 | ||||
-rw-r--r-- | tests/config.sh | 15 | ||||
-rw-r--r-- | tests/roof.json | 7 | ||||
-rw-r--r-- | tests/roof.py | 33 | ||||
-rwxr-xr-x | tests/roof.sh | 16 | ||||
-rw-r--r-- | tests/roof.yaml | 11 | ||||
-rw-r--r-- | tests/test_file.sh | 10 | ||||
-rwxr-xr-x | tests/vma-analyze.sh | 35 |
16 files changed, 182 insertions, 45 deletions
@@ -2,10 +2,12 @@ *.pyc *.so *.o +*.raw CMakeFiles/ CTestTestfile.cmake cmake_install.cmake CMakeCache.txt +install_manifest.txt Makefile /tags /tests/venv diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT index f3381fb..a40e2bb 100644 --- a/src/DEVELOPMENT +++ b/src/DEVELOPMENT @@ -10,6 +10,28 @@ Architecture for missing bits. +Problems +======== + - When streaming at high speed (~ 16 data streams; 600 Mbit & 600 kpck each), the data streams quickly get + desynchronized (but all packets are delivered). + * It is unclear if problem is on the receiver side (no overloaded CPU cores) or de-synchronization is first + appear on the simmulation sender. The test with real hardware is required. + * For border case scenarios, increasing number of buffers from 2 to 10-20 helps. But at full speed, even 1000s + buffers are not enough. Packets counts are quickly going appart. + * Further increase of packet buffer provided to 'recvmmsg' does not help (even if blocking is enforced until + all packets are received) + * At the speed specified above, the system works also without libvma. + * Actually, with libvma a larger buffer is required. In the beginning the performance of libvma is gradually + speeding up (that was always like that). And during this period a significant desynchronization happens. To + compensate it, we need about 400 buffers with libvma as compared to only 10 required if standard Linux + networking is utilized. + - Can we pre-heat to avoid this speeding-up problem? Or it will be also not a problem with hardware? + - Communication breaks with small MTU sizes (bellow 1500), but this is probably not important (Packets are + delivered but with extreme latencies. Probably some tunning of network stack is required). + - Technically, everything should work if we start UFO server when data is already streamed. However, the first + dataset could be any. Therefore, the check fails as the data is shifted by a random number of datasets. + + Questions ========= - Can we pre-allocate several UFO buffers for forth-comming events. Currently, we need to buffer out-of-order diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c index eaf9b35..f83885e 100644 --- a/src/ufo-roof-buffer.c +++ b/src/ufo-roof-buffer.c @@ -13,6 +13,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) { if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); buffer->ring_size = cfg->buffer_size; + buffer->drop_buffers = cfg->drop_buffers; buffer->fragment_size = cfg->payload_size; buffer->dataset_size = cfg->dataset_size; buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; @@ -46,6 +47,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { // fragment_id is numbered from 1 (0 - means auto) gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) { + gboolean ready = FALSE; guint buffer_id; guint dataset_id; @@ -58,25 +60,32 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; buffer_id = dataset_id % buffer->ring_size; - // Late arrived packed -// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id); + // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? if (dataset_id < buffer->current_id) - roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + return FALSE; +// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); // We are not fast enough, new packets are arrvining to fast if (dataset_id >= (buffer->current_id + buffer->ring_size)) { // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - if (error) - root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed", - buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed", + dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); // FIXME: Send semi-complete buffers further? // FIXME: Or shall we drop more if larger buffers are allocated? - for (guint i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++) - buffer->n_fragments[i%buffer->ring_size] = 0; - buffer->current_id = dataset_id - buffer->ring_size + 1; - + if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) { + memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint)); + buffer->current_id = dataset_id; + } else { + for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++) + buffer->n_fragments[i%buffer->ring_size] = 0; + buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1; + } + + if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset) + ready = TRUE; + // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value // The updates may happen after writting/reading is finished. @@ -97,12 +106,11 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? - if (dataset_id == buffer->current_id) { - return TRUE; - } + if (dataset_id == buffer->current_id) + ready = TRUE; } - return FALSE; + return ready; } diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h index 367f2d5..f7b2124 100644 --- a/src/ufo-roof-buffer.h +++ b/src/ufo-roof-buffer.h @@ -7,6 +7,7 @@ struct _UfoRoofBuffer { guint current_id; // The ID of the first (active) dataset in the buffer guint ring_size; // Number of datasets to buffer + guint drop_buffers; // If we need to catch up uint8_t *ring_buffer; // The ring buffer _Atomic guint *n_fragments; // Number of completed fragments in each buffer guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams) diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c index fa2fdcd..821c761 100644 --- a/src/ufo-roof-build-task.c +++ b/src/ufo-roof-build-task.c @@ -37,6 +37,8 @@ struct _UfoRoofBuildTaskPrivate { guint number; // Number of datasets to read gboolean stop; // Stop flag + + guint announced; // For debugging }; static void ufo_task_interface_init (UfoTaskIface *iface); @@ -162,31 +164,35 @@ ufo_roof_build_task_process (UfoTask *task, // UfoRequisition in_req; // ufo_buffer_get_requisition (inputs[0], &in_req); - uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL); + const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL); UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg); if (priv->stop) return FALSE; - + + const uint8_t *fragment = data; for (guint i = 0; i < header->n_packets; i++) { guint packet_id = 0; // Otherwise considered consecutive and handled by the buffer if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { - UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(data); + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); packet_id = pheader->packet_id + 1; } - ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, data, &gerr); + // FIXME: Can we kill here the dataset finished during the previous step of iteration + ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr); if (gerr) roof_print_error(gerr); - data += cfg->max_packet_size; + fragment += cfg->max_packet_size; } // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one? +/* + printf("proc (%s) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", header->channel_id, header->n_packets, + (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0); +*/ -// printf("proc (%s) - channel: %i, packets: %i\n", ready?"yes":" no", header->channel_id, header->n_packets); - return !ready; } @@ -214,8 +220,12 @@ ufo_roof_build_task_generate (UfoTask *task, priv->stop = TRUE; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); } - -// printf("gen(%s) %i\n", ready?"yes":" no", buf->current_id); + + + if ((buf->current_id - priv->announced) > 1000) { + printf("Generating dataset %i (%s)\n", buf->current_id, ready?"yes":" no"); + priv->announced = buf->current_id; + } return ready; } diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c index 11f8bd4..812d4a2 100644 --- a/src/ufo-roof-config.c +++ b/src/ufo-roof-config.c @@ -83,6 +83,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { cfg->max_packets = 100; cfg->dataset_size = 0; cfg->buffer_size = 2; + cfg->drop_buffers = 0; cfg->path = NULL; // Read configuration @@ -101,6 +102,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { roof_config_node_get(hardware, root, object, "hardware"); roof_config_node_get(network, root, object, "network"); roof_config_node_get(simulation, root, object, "simulation"); + roof_config_node_get(performance, root, object, "performance"); } if (hardware) { @@ -123,6 +125,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { if (performance) { roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once"); roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size"); + roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers"); } if (simulation) { @@ -167,6 +170,11 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { if (!cfg->dataset_size) cfg->dataset_size = cfg->payload_size; + if (cfg->buffer_size < 4) { + cfg->drop_buffers = 0; + } else if (cfg->drop_buffers >= cfg->buffer_size) { + cfg->drop_buffers = cfg->buffer_size / 2; + } return cfg; } diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h index a22c84f..f90c5f3 100644 --- a/src/ufo-roof-config.h +++ b/src/ufo-roof-config.h @@ -25,6 +25,7 @@ typedef struct { guint max_packets; // limits maximum number of packets which are read at once guint max_packet_size; // payload_size + header_size + ...? guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected + guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up. guint network_timeout; // Maximum time (us) to wait for data on the socket diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c index f213d99..7bbe8ef 100644 --- a/src/ufo-roof-read-socket.c +++ b/src/ufo-roof-read-socket.c @@ -51,6 +51,7 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr msg[i].msg_hdr.msg_iovlen = 1; } + // Timeout seems broken, see BUGS in 'recvmmsg' bugs page int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c index ebff9de..1582437 100644 --- a/src/ufo-roof-read-task.c +++ b/src/ufo-roof-read-task.c @@ -172,6 +172,7 @@ ufo_roof_read_task_generate (UfoTask *task, guint packets = priv->reader->read(priv->reader, output_buffer, &gerr); if (gerr) { g_warning("Error reciving data: %s", gerr->message); + g_error_free(gerr); return FALSE; } diff --git a/tests/config.sh b/tests/config.sh new file mode 100644 index 0000000..c1c656b --- /dev/null +++ b/tests/config.sh @@ -0,0 +1,15 @@ +uname -r | grep el7 &> /dev/null +el7=$(($? == 0)) + +ods_path=/mnt/ands/ods/bin-fedora/ +vma_path=/mnt/ands/ +vma_lib_path=/mnt/ands/lib64-fedora/ + +[ $el7 -eq 1 ] && ods_path="${ods_path/fedora/centos}" +[ $el7 -eq 1 ] && vma_lib_path="${vma_lib_path/fedora/centos}" + +# Standard library +vma_lib=${vma_lib_path}/libvma.so.8.6.10 + +# With Mellanox OFED extensions (./configure --enable-socketxtreme) +#vma_lib=${vma_lib_path}/mlx/libvma.so.8.6.10 diff --git a/tests/roof.json b/tests/roof.json index 0dcd1e7..784fb6d 100644 --- a/tests/roof.json +++ b/tests/roof.json @@ -3,17 +3,14 @@ "protocol": "udp", "port": 4000, "streams": 16, + "header_size": 10, "payload_size": 1280, "dataset_size": 1024000 }, "performance": { - "buffer_size": 2, + "buffer_size": 10, "packets_at_once": 100 }, - "simulation": { - "first_file_number": 1, - "path": "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat" - }, "setup": { "planes": 2, "modules": 16, diff --git a/tests/roof.py b/tests/roof.py index 2138931..a859069 100644 --- a/tests/roof.py +++ b/tests/roof.py @@ -1,8 +1,9 @@ import gi import sys import json -import gobject +import argparse +gi.require_version('Ufo', '0.0') from gi.repository import Ufo from gi.repository import GObject @@ -16,24 +17,40 @@ class RoofConfig: self.streams = cfg["network"]["streams"] elif cfg.get("setup", {}).get("modules") != None: self.streams = cfg["setup"]["modules"] - + + config = "roof.json" -cfg = RoofConfig(config) +output = None + +parser = argparse.ArgumentParser() +parser.add_argument('-c', '--config', dest="config", default="roof.json", help="ROOF configuration (JSON)") +parser.add_argument('-o', '--output', dest="output", default=None, help="Output file") +parser.add_argument('-n', '--number', dest="number", default=None, help="Specify number of frames to capture") +args = parser.parse_args() + + +cfg = RoofConfig(args.config) pm = Ufo.PluginManager() graph = Ufo.TaskGraph() scheduler = Ufo.Scheduler() +if args.output is None: + print ("Starting ROOF using NULL writter") + write = pm.get_task('null') + if args.number is None: args.number = 0 +else: + print ("Starting ROOF streaming to {}".format(args.output)) + write = pm.get_task('write') + write.set_properties(filename=args.output) + if args.number is None: args.number = 5 build = pm.get_task('roof-build') -build.set_properties(config=config, number=0) - -write = pm.get_task('write') -write.set_properties(filename="test.raw") +build.set_properties(config=args.config, number=args.number) for id in range(cfg.streams): read = pm.get_task('roof-read') - read.set_properties(config=config, id=id) + read.set_properties(config=args.config, id=id) graph.connect_nodes(read, build) build.bind_property('stop', read, 'stop', GObject.BindingFlags.DEFAULT) diff --git a/tests/roof.sh b/tests/roof.sh index d0ec30a..df51679 100755 --- a/tests/roof.sh +++ b/tests/roof.sh @@ -1,3 +1,17 @@ +#! /bin/bash + +. config.sh + +bufs=800000 +bufs=$((bufs * 4)) + cat roof.yaml | yq . > roof.json -GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" python roof.py +ulimit -l unlimited +echo 1000000000 > /proc/sys/kernel/shmmax # 18446744073692774399 +echo 8000 > /proc/sys/vm/nr_hugepages # 0 + + +#VMA_THREAD_MODE=3 VMA_MTU=0 VMA_RX_POLL=0 VMA_SELECT_POLL=0 VMA_RING_ALLOCATION_LOGIC_RX=20 VMA_RX_BUFS=$bufs LD_PRELOAD=$vma_lib \ + LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib64" GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" \ + python roof.py "$@" diff --git a/tests/roof.yaml b/tests/roof.yaml index a49d53b..2a24d08 100644 --- a/tests/roof.yaml +++ b/tests/roof.yaml @@ -2,16 +2,17 @@ network: protocol: udp port: 4000 streams: 16 -# header_size: 0 + header_size: 10 payload_size: 1280 # max_packet_size: 1284 dataset_size: 1024000 performance: - buffer_size: 2 + buffer_size: 10 +# drop_buffers: 0 packets_at_once: 100 -simulation: - first_file_number: 1 - path: "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat" +#simulation: +# first_file_number: 1 +# path: "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat" setup: planes: 2 modules: 16 diff --git a/tests/test_file.sh b/tests/test_file.sh index 5f47e45..1eb29ca 100644 --- a/tests/test_file.sh +++ b/tests/test_file.sh @@ -1,11 +1,15 @@ #! /bin/bash +shopt -s extglob + packet_size=1280 packets_per_dataset=50 +rm -f roof_test.raw for packet in $(seq 0 24); do - for id in $(seq 0 15); do - name=$(ls *$id.dat | grep -P "_0?$id.dat") - dd if=$name of="roof_test.raw" bs=$packet_size count=$packets_per_dataset skip=$((packet * $packets_per_dataset)) oflag=append conv=notrunc + for id in $(seq 1 16); do + name=$(ls -- *$id.@(fx|dat) | grep -P "_0?$id\.\w+") + echo "Appending packet $packet from $name " + dd if=$name of="roof_test.raw" bs=$packet_size count=$packets_per_dataset skip=$((packet * $packets_per_dataset)) oflag=append conv=notrunc status=none done done diff --git a/tests/vma-analyze.sh b/tests/vma-analyze.sh new file mode 100755 index 0000000..7000922 --- /dev/null +++ b/tests/vma-analyze.sh @@ -0,0 +1,35 @@ +#! /bin/bash + +sleep=1 + +# This doesn't work properly... Something is wrong with counters... + +path=/mnt/ands/bin/vma_stats_mlx +#-z seems ignored +#$path -p $(pidof onlineDetectorSimulatorServer) -c 1 -z &> /dev/null +stats1=($($path -p $(pidof python) -c 1 | grep Rx | awk '{ print $3, $4 }')) +sleep $sleep +stats2=($($path -p $(pidof python) -c 1 | grep Rx | awk '{ print $3, $4 }')) + +pksum=0 +bwsum=0 +for i in "${!stats2[@]}"; do + if [ -n "$stats1" ]; then + diff=$(bc <<< "(${stats2[$i]} - ${stats1[$i]}) / $sleep") + else + diff=$(bc <<< "${stats2[$i]} / $sleep") + fi + + if [ $((i & 1)) -eq 0 ]; then + echo -n "Queue: " + printf "packets: %9.3f kpps" $(bc -l <<< "1. * $diff / 1000") + pksum=$(($pksum + diff)) + else + printf ", bandwidth: %9.3f Gb/s\n" $(bc -l <<< "8. * $diff / 1024 / 1024") + bwsum=$(($bwsum + diff)) + fi +done + +echo -n "Total: " +printf "packets: %9.3f kpps" $(bc -l <<< "1. * $pksum / 1000") +printf ", bandwidth: %9.3f Gb/s\n" $(bc -l <<< "8. * $bwsum / 1024 / 1024") |