summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--src/DEVELOPMENT22
-rw-r--r--src/ufo-roof-buffer.c36
-rw-r--r--src/ufo-roof-buffer.h1
-rw-r--r--src/ufo-roof-build-task.c28
-rw-r--r--src/ufo-roof-config.c8
-rw-r--r--src/ufo-roof-config.h1
-rw-r--r--src/ufo-roof-read-socket.c1
-rw-r--r--src/ufo-roof-read-task.c1
-rw-r--r--tests/config.sh15
-rw-r--r--tests/roof.json7
-rw-r--r--tests/roof.py33
-rwxr-xr-xtests/roof.sh16
-rw-r--r--tests/roof.yaml11
-rw-r--r--tests/test_file.sh10
-rwxr-xr-xtests/vma-analyze.sh35
16 files changed, 182 insertions, 45 deletions
diff --git a/.gitignore b/.gitignore
index 8e4b968..c8831d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,10 +2,12 @@
*.pyc
*.so
*.o
+*.raw
CMakeFiles/
CTestTestfile.cmake
cmake_install.cmake
CMakeCache.txt
+install_manifest.txt
Makefile
/tags
/tests/venv
diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT
index f3381fb..a40e2bb 100644
--- a/src/DEVELOPMENT
+++ b/src/DEVELOPMENT
@@ -10,6 +10,28 @@ Architecture
for missing bits.
+Problems
+========
+ - When streaming at high speed (~ 16 data streams; 600 Mbit & 600 kpck each), the data streams quickly get
+ desynchronized (but all packets are delivered).
+ * It is unclear if problem is on the receiver side (no overloaded CPU cores) or de-synchronization is first
+ appear on the simmulation sender. The test with real hardware is required.
+ * For border case scenarios, increasing number of buffers from 2 to 10-20 helps. But at full speed, even 1000s
+ buffers are not enough. Packets counts are quickly going appart.
+ * Further increase of packet buffer provided to 'recvmmsg' does not help (even if blocking is enforced until
+ all packets are received)
+ * At the speed specified above, the system works also without libvma.
+ * Actually, with libvma a larger buffer is required. In the beginning the performance of libvma is gradually
+ speeding up (that was always like that). And during this period a significant desynchronization happens. To
+ compensate it, we need about 400 buffers with libvma as compared to only 10 required if standard Linux
+ networking is utilized.
+ - Can we pre-heat to avoid this speeding-up problem? Or it will be also not a problem with hardware?
+ - Communication breaks with small MTU sizes (bellow 1500), but this is probably not important (Packets are
+ delivered but with extreme latencies. Probably some tunning of network stack is required).
+ - Technically, everything should work if we start UFO server when data is already streamed. However, the first
+ dataset could be any. Therefore, the check fails as the data is shifted by a random number of datasets.
+
+
Questions
=========
- Can we pre-allocate several UFO buffers for forth-comming events. Currently, we need to buffer out-of-order
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
index eaf9b35..f83885e 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/ufo-roof-buffer.c
@@ -13,6 +13,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
buffer->ring_size = cfg->buffer_size;
+ buffer->drop_buffers = cfg->drop_buffers;
buffer->fragment_size = cfg->payload_size;
buffer->dataset_size = cfg->dataset_size;
buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
@@ -46,6 +47,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
// fragment_id is numbered from 1 (0 - means auto)
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
+ gboolean ready = FALSE;
guint buffer_id;
guint dataset_id;
@@ -58,25 +60,32 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
buffer_id = dataset_id % buffer->ring_size;
- // Late arrived packed
-// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id);
+ // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
if (dataset_id < buffer->current_id)
- roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
+ return FALSE;
+// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
// We are not fast enough, new packets are arrvining to fast
if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
// FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
- if (error)
- root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed",
- buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed",
+ dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
// FIXME: Send semi-complete buffers further?
// FIXME: Or shall we drop more if larger buffers are allocated?
- for (guint i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++)
- buffer->n_fragments[i%buffer->ring_size] = 0;
- buffer->current_id = dataset_id - buffer->ring_size + 1;
-
+ if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
+ memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint));
+ buffer->current_id = dataset_id;
+ } else {
+ for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
+ buffer->n_fragments[i%buffer->ring_size] = 0;
+ buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
+ }
+
+ if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
+ ready = TRUE;
+
// FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
// To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value
// The updates may happen after writting/reading is finished.
@@ -97,12 +106,11 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
// FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
- if (dataset_id == buffer->current_id) {
- return TRUE;
- }
+ if (dataset_id == buffer->current_id)
+ ready = TRUE;
}
- return FALSE;
+ return ready;
}
diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h
index 367f2d5..f7b2124 100644
--- a/src/ufo-roof-buffer.h
+++ b/src/ufo-roof-buffer.h
@@ -7,6 +7,7 @@ struct _UfoRoofBuffer {
guint current_id; // The ID of the first (active) dataset in the buffer
guint ring_size; // Number of datasets to buffer
+ guint drop_buffers; // If we need to catch up
uint8_t *ring_buffer; // The ring buffer
_Atomic guint *n_fragments; // Number of completed fragments in each buffer
guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams)
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
index fa2fdcd..821c761 100644
--- a/src/ufo-roof-build-task.c
+++ b/src/ufo-roof-build-task.c
@@ -37,6 +37,8 @@ struct _UfoRoofBuildTaskPrivate {
guint number; // Number of datasets to read
gboolean stop; // Stop flag
+
+ guint announced; // For debugging
};
static void ufo_task_interface_init (UfoTaskIface *iface);
@@ -162,31 +164,35 @@ ufo_roof_build_task_process (UfoTask *task,
// UfoRequisition in_req;
// ufo_buffer_get_requisition (inputs[0], &in_req);
- uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL);
+ const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL);
UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg);
if (priv->stop)
return FALSE;
-
+
+ const uint8_t *fragment = data;
for (guint i = 0; i < header->n_packets; i++) {
guint packet_id = 0;
// Otherwise considered consecutive and handled by the buffer
if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
- UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(data);
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
packet_id = pheader->packet_id + 1;
}
- ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, data, &gerr);
+ // FIXME: Can we kill here the dataset finished during the previous step of iteration
+ ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr);
if (gerr) roof_print_error(gerr);
- data += cfg->max_packet_size;
+ fragment += cfg->max_packet_size;
}
// FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
+/*
+ printf("proc (%s) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", header->channel_id, header->n_packets,
+ (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0);
+*/
-// printf("proc (%s) - channel: %i, packets: %i\n", ready?"yes":" no", header->channel_id, header->n_packets);
-
return !ready;
}
@@ -214,8 +220,12 @@ ufo_roof_build_task_generate (UfoTask *task,
priv->stop = TRUE;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
}
-
-// printf("gen(%s) %i\n", ready?"yes":" no", buf->current_id);
+
+
+ if ((buf->current_id - priv->announced) > 1000) {
+ printf("Generating dataset %i (%s)\n", buf->current_id, ready?"yes":" no");
+ priv->announced = buf->current_id;
+ }
return ready;
}
diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c
index 11f8bd4..812d4a2 100644
--- a/src/ufo-roof-config.c
+++ b/src/ufo-roof-config.c
@@ -83,6 +83,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
cfg->max_packets = 100;
cfg->dataset_size = 0;
cfg->buffer_size = 2;
+ cfg->drop_buffers = 0;
cfg->path = NULL;
// Read configuration
@@ -101,6 +102,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
roof_config_node_get(hardware, root, object, "hardware");
roof_config_node_get(network, root, object, "network");
roof_config_node_get(simulation, root, object, "simulation");
+ roof_config_node_get(performance, root, object, "performance");
}
if (hardware) {
@@ -123,6 +125,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
if (performance) {
roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once");
roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size");
+ roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers");
}
if (simulation) {
@@ -167,6 +170,11 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
if (!cfg->dataset_size)
cfg->dataset_size = cfg->payload_size;
+ if (cfg->buffer_size < 4) {
+ cfg->drop_buffers = 0;
+ } else if (cfg->drop_buffers >= cfg->buffer_size) {
+ cfg->drop_buffers = cfg->buffer_size / 2;
+ }
return cfg;
}
diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h
index a22c84f..f90c5f3 100644
--- a/src/ufo-roof-config.h
+++ b/src/ufo-roof-config.h
@@ -25,6 +25,7 @@ typedef struct {
guint max_packets; // limits maximum number of packets which are read at once
guint max_packet_size; // payload_size + header_size + ...?
guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected
+ guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up.
guint network_timeout; // Maximum time (us) to wait for data on the socket
diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c
index f213d99..7bbe8ef 100644
--- a/src/ufo-roof-read-socket.c
+++ b/src/ufo-roof-read-socket.c
@@ -51,6 +51,7 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr
msg[i].msg_hdr.msg_iovlen = 1;
}
+ // Timeout seems broken, see BUGS in 'recvmmsg' bugs page
int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c
index ebff9de..1582437 100644
--- a/src/ufo-roof-read-task.c
+++ b/src/ufo-roof-read-task.c
@@ -172,6 +172,7 @@ ufo_roof_read_task_generate (UfoTask *task,
guint packets = priv->reader->read(priv->reader, output_buffer, &gerr);
if (gerr) {
g_warning("Error reciving data: %s", gerr->message);
+ g_error_free(gerr);
return FALSE;
}
diff --git a/tests/config.sh b/tests/config.sh
new file mode 100644
index 0000000..c1c656b
--- /dev/null
+++ b/tests/config.sh
@@ -0,0 +1,15 @@
+uname -r | grep el7 &> /dev/null
+el7=$(($? == 0))
+
+ods_path=/mnt/ands/ods/bin-fedora/
+vma_path=/mnt/ands/
+vma_lib_path=/mnt/ands/lib64-fedora/
+
+[ $el7 -eq 1 ] && ods_path="${ods_path/fedora/centos}"
+[ $el7 -eq 1 ] && vma_lib_path="${vma_lib_path/fedora/centos}"
+
+# Standard library
+vma_lib=${vma_lib_path}/libvma.so.8.6.10
+
+# With Mellanox OFED extensions (./configure --enable-socketxtreme)
+#vma_lib=${vma_lib_path}/mlx/libvma.so.8.6.10
diff --git a/tests/roof.json b/tests/roof.json
index 0dcd1e7..784fb6d 100644
--- a/tests/roof.json
+++ b/tests/roof.json
@@ -3,17 +3,14 @@
"protocol": "udp",
"port": 4000,
"streams": 16,
+ "header_size": 10,
"payload_size": 1280,
"dataset_size": 1024000
},
"performance": {
- "buffer_size": 2,
+ "buffer_size": 10,
"packets_at_once": 100
},
- "simulation": {
- "first_file_number": 1,
- "path": "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat"
- },
"setup": {
"planes": 2,
"modules": 16,
diff --git a/tests/roof.py b/tests/roof.py
index 2138931..a859069 100644
--- a/tests/roof.py
+++ b/tests/roof.py
@@ -1,8 +1,9 @@
import gi
import sys
import json
-import gobject
+import argparse
+gi.require_version('Ufo', '0.0')
from gi.repository import Ufo
from gi.repository import GObject
@@ -16,24 +17,40 @@ class RoofConfig:
self.streams = cfg["network"]["streams"]
elif cfg.get("setup", {}).get("modules") != None:
self.streams = cfg["setup"]["modules"]
-
+
+
config = "roof.json"
-cfg = RoofConfig(config)
+output = None
+
+parser = argparse.ArgumentParser()
+parser.add_argument('-c', '--config', dest="config", default="roof.json", help="ROOF configuration (JSON)")
+parser.add_argument('-o', '--output', dest="output", default=None, help="Output file")
+parser.add_argument('-n', '--number', dest="number", default=None, help="Specify number of frames to capture")
+args = parser.parse_args()
+
+
+cfg = RoofConfig(args.config)
pm = Ufo.PluginManager()
graph = Ufo.TaskGraph()
scheduler = Ufo.Scheduler()
+if args.output is None:
+ print ("Starting ROOF using NULL writter")
+ write = pm.get_task('null')
+ if args.number is None: args.number = 0
+else:
+ print ("Starting ROOF streaming to {}".format(args.output))
+ write = pm.get_task('write')
+ write.set_properties(filename=args.output)
+ if args.number is None: args.number = 5
build = pm.get_task('roof-build')
-build.set_properties(config=config, number=0)
-
-write = pm.get_task('write')
-write.set_properties(filename="test.raw")
+build.set_properties(config=args.config, number=args.number)
for id in range(cfg.streams):
read = pm.get_task('roof-read')
- read.set_properties(config=config, id=id)
+ read.set_properties(config=args.config, id=id)
graph.connect_nodes(read, build)
build.bind_property('stop', read, 'stop', GObject.BindingFlags.DEFAULT)
diff --git a/tests/roof.sh b/tests/roof.sh
index d0ec30a..df51679 100755
--- a/tests/roof.sh
+++ b/tests/roof.sh
@@ -1,3 +1,17 @@
+#! /bin/bash
+
+. config.sh
+
+bufs=800000
+bufs=$((bufs * 4))
+
cat roof.yaml | yq . > roof.json
-GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" python roof.py
+ulimit -l unlimited
+echo 1000000000 > /proc/sys/kernel/shmmax # 18446744073692774399
+echo 8000 > /proc/sys/vm/nr_hugepages # 0
+
+
+#VMA_THREAD_MODE=3 VMA_MTU=0 VMA_RX_POLL=0 VMA_SELECT_POLL=0 VMA_RING_ALLOCATION_LOGIC_RX=20 VMA_RX_BUFS=$bufs LD_PRELOAD=$vma_lib \
+ LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib64" GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" \
+ python roof.py "$@"
diff --git a/tests/roof.yaml b/tests/roof.yaml
index a49d53b..2a24d08 100644
--- a/tests/roof.yaml
+++ b/tests/roof.yaml
@@ -2,16 +2,17 @@ network:
protocol: udp
port: 4000
streams: 16
-# header_size: 0
+ header_size: 10
payload_size: 1280
# max_packet_size: 1284
dataset_size: 1024000
performance:
- buffer_size: 2
+ buffer_size: 10
+# drop_buffers: 0
packets_at_once: 100
-simulation:
- first_file_number: 1
- path: "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat"
+#simulation:
+# first_file_number: 1
+# path: "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat"
setup:
planes: 2
modules: 16
diff --git a/tests/test_file.sh b/tests/test_file.sh
index 5f47e45..1eb29ca 100644
--- a/tests/test_file.sh
+++ b/tests/test_file.sh
@@ -1,11 +1,15 @@
#! /bin/bash
+shopt -s extglob
+
packet_size=1280
packets_per_dataset=50
+rm -f roof_test.raw
for packet in $(seq 0 24); do
- for id in $(seq 0 15); do
- name=$(ls *$id.dat | grep -P "_0?$id.dat")
- dd if=$name of="roof_test.raw" bs=$packet_size count=$packets_per_dataset skip=$((packet * $packets_per_dataset)) oflag=append conv=notrunc
+ for id in $(seq 1 16); do
+ name=$(ls -- *$id.@(fx|dat) | grep -P "_0?$id\.\w+")
+ echo "Appending packet $packet from $name "
+ dd if=$name of="roof_test.raw" bs=$packet_size count=$packets_per_dataset skip=$((packet * $packets_per_dataset)) oflag=append conv=notrunc status=none
done
done
diff --git a/tests/vma-analyze.sh b/tests/vma-analyze.sh
new file mode 100755
index 0000000..7000922
--- /dev/null
+++ b/tests/vma-analyze.sh
@@ -0,0 +1,35 @@
+#! /bin/bash
+
+sleep=1
+
+# This doesn't work properly... Something is wrong with counters...
+
+path=/mnt/ands/bin/vma_stats_mlx
+#-z seems ignored
+#$path -p $(pidof onlineDetectorSimulatorServer) -c 1 -z &> /dev/null
+stats1=($($path -p $(pidof python) -c 1 | grep Rx | awk '{ print $3, $4 }'))
+sleep $sleep
+stats2=($($path -p $(pidof python) -c 1 | grep Rx | awk '{ print $3, $4 }'))
+
+pksum=0
+bwsum=0
+for i in "${!stats2[@]}"; do
+ if [ -n "$stats1" ]; then
+ diff=$(bc <<< "(${stats2[$i]} - ${stats1[$i]}) / $sleep")
+ else
+ diff=$(bc <<< "${stats2[$i]} / $sleep")
+ fi
+
+ if [ $((i & 1)) -eq 0 ]; then
+ echo -n "Queue: "
+ printf "packets: %9.3f kpps" $(bc -l <<< "1. * $diff / 1000")
+ pksum=$(($pksum + diff))
+ else
+ printf ", bandwidth: %9.3f Gb/s\n" $(bc -l <<< "8. * $diff / 1024 / 1024")
+ bwsum=$(($bwsum + diff))
+ fi
+done
+
+echo -n "Total: "
+printf "packets: %9.3f kpps" $(bc -l <<< "1. * $pksum / 1000")
+printf ", bandwidth: %9.3f Gb/s\n" $(bc -l <<< "8. * $bwsum / 1024 / 1024")