summaryrefslogtreecommitdiffstats
path: root/src/ufo-roof-buffer.c
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2019-11-17 16:58:02 +0100
committerSuren A. Chilingaryan <csa@suren.me>2019-11-17 16:58:02 +0100
commitea9626b60092f2d2c79431718c3ca8bc383429a6 (patch)
treef76a6dcf118fc3977eda1cbcf368018715ebe01c /src/ufo-roof-buffer.c
parent23f22348c5281fff685c1fa89255e7e1e76266a4 (diff)
downloadufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.tar.gz
ufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.tar.bz2
ufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.tar.xz
ufo-roof-temp-ea9626b60092f2d2c79431718c3ca8bc383429a6.zip
Networking setup
Diffstat (limited to 'src/ufo-roof-buffer.c')
-rw-r--r--src/ufo-roof-buffer.c36
1 files changed, 22 insertions, 14 deletions
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
index eaf9b35..f83885e 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/ufo-roof-buffer.c
@@ -13,6 +13,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
buffer->ring_size = cfg->buffer_size;
+ buffer->drop_buffers = cfg->drop_buffers;
buffer->fragment_size = cfg->payload_size;
buffer->dataset_size = cfg->dataset_size;
buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
@@ -46,6 +47,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
// fragment_id is numbered from 1 (0 - means auto)
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
+ gboolean ready = FALSE;
guint buffer_id;
guint dataset_id;
@@ -58,25 +60,32 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
buffer_id = dataset_id % buffer->ring_size;
- // Late arrived packed
-// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id);
+ // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
if (dataset_id < buffer->current_id)
- roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
+ return FALSE;
+// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
// We are not fast enough, new packets are arrvining to fast
if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
// FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
- if (error)
- root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed",
- buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed",
+ dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
// FIXME: Send semi-complete buffers further?
// FIXME: Or shall we drop more if larger buffers are allocated?
- for (guint i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++)
- buffer->n_fragments[i%buffer->ring_size] = 0;
- buffer->current_id = dataset_id - buffer->ring_size + 1;
-
+ if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
+ memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint));
+ buffer->current_id = dataset_id;
+ } else {
+ for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
+ buffer->n_fragments[i%buffer->ring_size] = 0;
+ buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
+ }
+
+ if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
+ ready = TRUE;
+
// FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
// To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value
// The updates may happen after writting/reading is finished.
@@ -97,12 +106,11 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
// FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
- if (dataset_id == buffer->current_id) {
- return TRUE;
- }
+ if (dataset_id == buffer->current_id)
+ ready = TRUE;
}
- return FALSE;
+ return ready;
}