diff options
Diffstat (limited to 'src/ufo-roof-buffer.c')
-rw-r--r-- | src/ufo-roof-buffer.c | 79 |
1 files changed, 66 insertions, 13 deletions
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c index 32598c9..0e0a890 100644 --- a/src/ufo-roof-buffer.c +++ b/src/ufo-roof-buffer.c @@ -19,6 +19,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d buffer->max_datasets = max_datasets; buffer->ring_size = cfg->buffer_size; buffer->drop_buffers = cfg->drop_buffers; + buffer->latency_buffers = cfg->latency_buffers; buffer->n_dims = n_dims; buffer->dataset_size = cfg->dataset_size; buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8; @@ -35,6 +36,15 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int)); buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); +#ifdef UFO_ROOF_INDEPENDENT_STREAMS + buffer->first_id = malloc(buffer->ring_size * sizeof(guint64)); + if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets"); + for (guint i = 0; i < buffer->ring_size; i++) + buffer->first_id[i] = (guint64)-1; +#else + buffer->first_id = (guint64)-1; +#endif + if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { ufo_roof_buffer_free(buffer); roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); @@ -45,6 +55,10 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { if (buffer) { +#ifdef UFO_ROOF_INDEPENDENT_STREAMS + if (buffer->first_id) + free(buffer->first_id); +#endif if (buffer->ring_buffer) free(buffer->ring_buffer); if (buffer->n_fragments) @@ -57,10 +71,11 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { } // fragment_id is numbered from 1 (0 - means auto) -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) { +gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) { gboolean ready = FALSE; guint buffer_id; - guint dataset_id; + guint64 first_id; + guint64 dataset_id; if (!fragment_id) { fragment_id = ++buffer->stream_fragment[stream_id]; @@ -69,21 +84,37 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required) dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; + +#ifdef UFO_ROOF_INDEPENDENT_STREAMS + if (buffer->first_id[stream_id] == (guint64)-1) + buffer->first_id[stream_id] = dataset_id; + first_id = buffer->first_id[stream_id]; +#else + if (buffer->first_id == (guint64)-1) + buffer->first_id = dataset_id; + first_id = buffer->first_id; +#endif + if (dataset_id < first_id) + return FALSE; + + dataset_id -= first_id; buffer_id = dataset_id % buffer->ring_size; // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? - if (dataset_id < buffer->current_id) + if (dataset_id < buffer->current_id) { + roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id); return FALSE; -// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + } - if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) + if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) { +// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id); return FALSE; - + } // We are not fast enough, new packets are arrvining to fast if (dataset_id >= (buffer->current_id + buffer->ring_size)) { // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed", + root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed", dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); // FIXME: Send semi-complete buffers further? @@ -105,10 +136,14 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // The updates may happen after writting/reading is finished. } -/* printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n", - buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size, - ((uint32_t*)fragment)[0] - );*/ +/* + printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n", + dataset_id, buffer_id, + buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset, + stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, + buffer->fragment_size, ((uint32_t*)fragment)[0] + ); +*/ uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; if (buffer->n_dims == 2) { @@ -116,8 +151,8 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu stream_id * buffer->fragment_dims[0] + // x-coordinate (fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0]; // y-coordinate - for (int i = 0; i < buffer->fragment_dims[1]; ++i) { - memcpy(fragment_buffer + i * buffer->dataset_dims[0], fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); } } else { // 1D stracture, simply putting fragment at the appropriate position in the stream @@ -132,11 +167,29 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? if (dataset_id == buffer->current_id) ready = TRUE; + else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) + ready = ufo_roof_buffer_skip_to_ready(buffer); } return ready; } +gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) { + for (guint i = 0; i < buffer->ring_size; i++) { + guint64 id = buffer->current_id + i; + guint buffer_id = id % buffer->ring_size; + + if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { + buffer->current_id = id; + return TRUE; + } + +// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + buffer->n_fragments[buffer_id] = 0; + } + + return FALSE; +} gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) { |