diff options
Diffstat (limited to 'src/ufo-roof-buffer.c')
-rw-r--r-- | src/ufo-roof-buffer.c | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c new file mode 100644 index 0000000..f071481 --- /dev/null +++ b/src/ufo-roof-buffer.c @@ -0,0 +1,122 @@ +#include <stdio.h> +#include <stdint.h> + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" + +// This is currently not thread safe. With dual-filter architecture this will be called sequentially. + +UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) { + UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer)); + if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); + + buffer->ring_size = cfg->buffer_size; + buffer->fragment_size = cfg->payload_size; + buffer->dataset_size = cfg->dataset_size; + buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; + buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams; +// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size); + + buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size); + buffer->n_fragments = (_Atomic int*)calloc(buffer->ring_size, sizeof(_Atomic int)); + buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); + + if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { + ufo_roof_buffer_free(buffer); + roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); + } + + return buffer; +} + +void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { + if (buffer) { + if (buffer->ring_buffer) + free(buffer->ring_buffer); + if (buffer->n_fragments) + free(buffer->n_fragments); + if (buffer->stream_fragment) + free(buffer->stream_fragment); + + free(buffer); + } +} + + // fragment_id is numbered from 1 (0 - means auto) +gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) { + guint buffer_id; + guint dataset_id; + + if (!fragment_id) { + fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required) + dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; + fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; + buffer_id = dataset_id % buffer->ring_size; + + // Late arrived packed +// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id); + if (dataset_id < buffer->current_id) + roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + + // We are not fast enough, new packets are arrvining to fast + if (dataset_id >= (buffer->current_id + buffer->ring_size)) { + // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? + + if (error) + root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed", + buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + + // FIXME: Send semi-complete buffers further? + // FIXME: Or shall we drop more if larger buffers are allocated? + for (int i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++) + buffer->n_fragments[i%buffer->ring_size] = 0; + buffer->current_id = dataset_id - buffer->ring_size + 1; + + // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. + // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value + // The updates may happen after writting/reading is finished. + } + + // FIXME: This is builds events as it read from file in roof v.1 code. We can assemble fan projections directly here. + void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; + void *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size; + +/* printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n", + buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size, + ((uint32_t*)fragment)[0] + );*/ + memcpy(fragment_buffer, fragment, buffer->fragment_size); + + // FIXME: Sanity checks: verify is not a dublicate fragment? + atomic_fetch_add(&buffer->n_fragments[buffer_id], 1); + + if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + if (dataset_id == buffer->current_id) { + return TRUE; + } + } + + return FALSE; +} + + + +gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error) { + guint buffer_id = buffer->current_id % buffer->ring_size; + void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; + + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE; + + memcpy(output_buffer, dataset_buffer, buffer->dataset_size); + buffer->n_fragments[buffer_id] = 0; + buffer->current_id += 1; + + return TRUE; +} |