path: root/src/roof.c
diff options
authorSuren A. Chilingaryan <>2020-09-03 03:00:30 +0200
committerSuren A. Chilingaryan <>2020-09-03 03:00:30 +0200
commit5172421d248250b4ab3b69eb57fd83656e23a4da (patch)
treea499d9f1dd0b74b754816884a59927b3171656fc /src/roof.c
parent7b2e6168b049be9e7852b2d364d897592eff69fc (diff)
This is unfinished work implemeting out-of-UFO network serversHEADmaster
Diffstat (limited to 'src/roof.c')
1 files changed, 133 insertions, 0 deletions
diff --git a/src/roof.c b/src/roof.c
new file mode 100644
index 0000000..505e3aa
--- /dev/null
+++ b/src/roof.c
@@ -0,0 +1,133 @@
+void roof_init() {
+ hw_sched_init();
+Roof *roof_new(UfoRoofConfig *cfg, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+ Roof *ctx = (Roof*)calloc(1, sizeof(Roof));
+ if (!ctx) roof_new_error(error, "Can't allocate Roof context");
+ ctx->cfg = cfg;
+ ctx->n_threads = cfg->n_streams / cfg->sockets_per_thread;
+ if (cfg->n_streams % cfg->sockets_per_thread) ctx->n_threads++;
+ ctx->rdi = (RoofReadInterface**)calloc(cfg->n_streams, sizeof(RoofReadInterface*));
+ ctx->rdc = (RoofReadContext**)calloc(cfg->n_streams, sizeof(RoofReadContext*));
+ ctx->rdt = (RoofThreadContext**)calloc(ctx->n_threads, sizeof(RoofThreadContext*));
+ ctx->sched = hw_sched_create(cfg->n_threads);
+ if ((!ctx->rdi)||(!ctx->rdc)||(!ctx->rdt)||(!ctx->sched)) {
+ roof_free(ctx);
+ roof_setup_error(error, "Failed to allocate memory for various Roof contexts");
+ }
+ return ctx;
+void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error) {
+ assert(ctx);
+ ctx->simulate = 1;
+ ctx->path = path;
+ ctx->first_file_number = first_file_number;
+void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error) {
+ assert(ctx);
+ ctx->max_datasets = max;
+void roof_setup(Roof *ctx, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+ assert(ctx);
+ RoofConfig *cfg = ctx->cfg;
+ ctx->buf = roof_buffer_new(cfg, 2, ctx->max_datasets, &gerr);
+ if ((gerr)||(!ctx->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+ for (i = 0; i < cfg->n_streams; i++) {
+ if (ctx->simulate) {
+ if (!ctx->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+ ctx->rdi[i] = roof_read_file_new(cfg, ctx->path, ctx->first_file_number + i, &gerr);
+ } else {
+ ctx->rdi[i] = roof_read_socket_new(cfg, i, &gerr);
+ }
+ if (!ctx->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+ ctx->rdc[i] = roof_read_context_new(cfg, ctx->rdi[i], &gerr);
+ if (!ctx->rdc[i])
+ roof_propagate_error(error, gerr, "roof_read_context_new: ");
+ }
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ guint extra = 0, sockets_per_thread = cfg->n_streams / ctx->n_threads;
+ if (cfg->n_streams % ctx->n_threads) extra = cfg->n_streams - ctx->n_threads * sockets_per_thread;
+ guint from, to;
+ for (i = 0; i < ctx->n_threads; i++) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+ ctx->thr[i]= roof_thread_new(cfg, ctx, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "roof_thread_new (%i): ", i);
+ }
+void roof_free(Roof *ctx) {
+ guint i;
+ if (ctx) {
+ RoofConfig *cfg = ctx->cfg;
+ if (ctx->sched) hw_sched_destroy(priv->sched);
+ if (ctx->rdt) {
+ for (i = 0; i < ctx->n_threads; i++)
+ if (ctx->rdt[i]) roof_thread_context_free(ctx->rdt[i]);
+ free(ctx->rdt);
+ }
+ if (ctx->rdc) {
+ for (i = 0; i < cfg->n_streams; i++)
+ if (ctx->rdc[i]) roof_read_context_free(ctx->rdc[i]);
+ free(ctx->rdc);
+ }
+ if (ctx->rdi) {
+ for (i = 0; i < cfg->n_streams; i++)
+ if (ctx->rdi[i]) roof_read_interface_free(ctx->rdi[i]);
+ free(ctx->rdi);
+ }
+ if (ctx->buf) roof_buffer_free(ctx->buf);
+ free(ctx);
+ }
+void roof_read_dataset(Roof *ctx, void *buffer, GError **error) {
+ priv->current_dataset;
+ priv->current_buffer = buffer;
+ err = hw_sched_schedule_thread_task(sched, (void*)ctx, roof_thread_read);
+ if (!err) err = hw_sched_wait_task(sched);
+ if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); }