diff options
Diffstat (limited to 'src/save/ufo-roof-read.c')
-rw-r--r-- | src/save/ufo-roof-read.c | 123 |
1 files changed, 123 insertions, 0 deletions
diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c new file mode 100644 index 0000000..f3d790d --- /dev/null +++ b/src/save/ufo-roof-read.c @@ -0,0 +1,123 @@ +#include <stdio.h> +#include <assert.h> + +#include <ufo/ufo.h> + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + + +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +static void ufo_roof_read_optimize(UfoRoofConfig *cfg) { + // FIXME: request real-time permissions? + // FIXME: do we need this? +/* + uint32_t lat = 0; + int fd = open("/dev/cpu_dma_latency", O_RDWR); + if (fd == -1) { + fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno)); + } else { + write(fd, &lat, sizeof(lat)); + close(fd); + } +*/ +} + + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) { + assert(ctx); + return ctx->settings; +} + + +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) { + guint i; + GError *gerr = NULL; + + UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context"); + + ufo_roof_read_optimize(ctx); + + ctx->n_threads = cfg->n_read_threads; + ctx->cfg = cfg; + ctx->buf = buf; + ctx->rdi = rdi; + + ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread)); + if (!ctx->thr) { + ufo_roof_read_free(ctx); + roof_new_error(error, "Error allocating UfoRoofReadThread contexts"); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++; + ctx->n_threads = n_threads; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads; + if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread; + + guint from, to; + for (i = 0, from = 0; i < n_threads; i++, from = to) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + + return ctx; +} + +void ufo_roof_read_free(UfoRoofRead *ctx) { + if (!ctx) return; + + if (ctx->thr) { + int i; + ufo_roof_read_stop(ctx); + for (i = 0; i < ctx->n_threads; i++) { + if (ctx->thr[i]) + ufo_roof_read_thread_free(ctx->thr[i]); + } + free(ctx->thr); + } + free(ctx); +} + +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) { + gboolean ret; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_start(ctx, &gerr); + if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i); + } + return TRUE; +} + +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) { + gboolean ret, res = FALSE; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_stop(ctx, &gerr); + if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i); + res |= !ret; + } + return !res; +} + + |