summaryrefslogtreecommitdiffstats
path: root/src/save/ufo-roof-read.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/save/ufo-roof-read.c')
-rw-r--r--src/save/ufo-roof-read.c123
1 files changed, 123 insertions, 0 deletions
diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c
new file mode 100644
index 0000000..f3d790d
--- /dev/null
+++ b/src/save/ufo-roof-read.c
@@ -0,0 +1,123 @@
+#include <stdio.h>
+#include <assert.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static void ufo_roof_read_optimize(UfoRoofConfig *cfg) {
+ // FIXME: request real-time permissions?
+ // FIXME: do we need this?
+/*
+ uint32_t lat = 0;
+ int fd = open("/dev/cpu_dma_latency", O_RDWR);
+ if (fd == -1) {
+ fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno));
+ } else {
+ write(fd, &lat, sizeof(lat));
+ close(fd);
+ }
+*/
+}
+
+
+const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) {
+ assert(ctx);
+ return ctx->settings;
+}
+
+
+UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context");
+
+ ufo_roof_read_optimize(ctx);
+
+ ctx->n_threads = cfg->n_read_threads;
+ ctx->cfg = cfg;
+ ctx->buf = buf;
+ ctx->rdi = rdi;
+
+ ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread));
+ if (!ctx->thr) {
+ ufo_roof_read_free(ctx);
+ roof_new_error(error, "Error allocating UfoRoofReadThread contexts");
+ }
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread;
+ if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++;
+ ctx->n_threads = n_threads;
+
+ guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads;
+ if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0, from = 0; i < n_threads; i++, from = to) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i);
+ }
+
+ return ctx;
+}
+
+void ufo_roof_read_free(UfoRoofRead *ctx) {
+ if (!ctx) return;
+
+ if (ctx->thr) {
+ int i;
+ ufo_roof_read_stop(ctx);
+ for (i = 0; i < ctx->n_threads; i++) {
+ if (ctx->thr[i])
+ ufo_roof_read_thread_free(ctx->thr[i]);
+ }
+ free(ctx->thr);
+ }
+ free(ctx);
+}
+
+gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) {
+ gboolean ret;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_start(ctx, &gerr);
+ if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i);
+ }
+ return TRUE;
+}
+
+gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) {
+ gboolean ret, res = FALSE;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_stop(ctx, &gerr);
+ if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i);
+ res |= !ret;
+ }
+ return !res;
+}
+
+