#include #include #include #include "ufo-roof-buffer.h" #include "ufo-roof-read-thread.h" #include "ufo-roof-read.h" #include #include #include #include static void ufo_roof_read_optimize(UfoRoofConfig *cfg) { // FIXME: request real-time permissions? // FIXME: do we need this? /* uint32_t lat = 0; int fd = open("/dev/cpu_dma_latency", O_RDWR); if (fd == -1) { fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno)); } else { write(fd, &lat, sizeof(lat)); close(fd); } */ } const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) { assert(ctx); return ctx->settings; } UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) { guint i; GError *gerr = NULL; UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead)); if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context"); ufo_roof_read_optimize(ctx); ctx->n_threads = cfg->n_read_threads; ctx->cfg = cfg; ctx->buf = buf; ctx->rdi = rdi; ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread)); if (!ctx->thr) { ufo_roof_read_free(ctx); roof_new_error(error, "Error allocating UfoRoofReadThread contexts"); } // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++; ctx->n_threads = n_threads; guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads; if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread; guint from, to; for (i = 0, from = 0; i < n_threads; i++, from = to) { guint to = from + sockets_per_thread; if (i < extra) to++; ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr); if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); } return ctx; } void ufo_roof_read_free(UfoRoofRead *ctx) { if (!ctx) return; if (ctx->thr) { int i; ufo_roof_read_stop(ctx); for (i = 0; i < ctx->n_threads; i++) { if (ctx->thr[i]) ufo_roof_read_thread_free(ctx->thr[i]); } free(ctx->thr); } free(ctx); } gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) { gboolean ret; GError *gerr; if ((!ctx)||(!ctx->thr)) return FALSE; for (int i = 0; i < ctx->n_threads; i++) { if (!ctx->thr[i]) return FALSE; ret = ufo_roof_read_thread_start(ctx, &gerr); if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i); } return TRUE; } gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) { gboolean ret, res = FALSE; GError *gerr; if ((!ctx)||(!ctx->thr)) return FALSE; for (int i = 0; i < ctx->n_threads; i++) { if (!ctx->thr[i]) return FALSE; ret = ufo_roof_read_thread_stop(ctx, &gerr); if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i); res |= !ret; } return !res; }