summaryrefslogtreecommitdiffstats
path: root/src/save/ufo-roof-read-thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/save/ufo-roof-read-thread.c')
-rw-r--r--src/save/ufo-roof-read-thread.c155
1 files changed, 155 insertions, 0 deletions
diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c
new file mode 100644
index 0000000..60868d2
--- /dev/null
+++ b/src/save/ufo-roof-read-thread.c
@@ -0,0 +1,155 @@
+#include <stdio.h>
+#include <threads.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) {
+ int i;
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context");
+
+ thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size);
+ if (!thr->rdbuf) {
+ ufo_roof_read_thread_free(thr);
+ roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer");
+ }
+
+ thr->rd = rd;
+ thr->from = from;
+ thr->to = to;
+
+
+ return thr;
+
+}
+
+void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) {
+ if (!thr) return;
+ if (thr->rdbuf) free(thr->rdbuf);
+
+ ufo_roof_thread_stop(thr, error);
+ free(thr);
+}
+
+static int ufo_roof_read_thread_run(void *arg) {
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)arg;
+
+ UfoRoofConfig *cfg = thr->rd->cfg;
+ UfoRoofBuffer *buf = thr->rd->buf;
+ UfoRoofReadInterface *rdi = thr->rd->rdi;
+
+ guint from = thr->from;
+ guint to = thr->to;
+
+ void *rdbuf = thr->rdbuf;
+
+ uint64_t current_id[to - from] = {0};
+ uint64_t grabbed[to - from] = {0};
+
+ static uint64_t errors = 0;
+
+ while (thr->op != UFO_ROOF_OP_STOP) {
+ for (guint sid = from; sid < to; sid++) {
+ // FIXME break waiting on stop? If no packets are send
+ guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ guint ready = false;
+ const uint8_t *fragment = (uint8_t*)rdbuf;
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ }
+
+#ifdef UFO_ROOF_DEBUG
+ if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) {
+ printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
+ //if (++errors > 1000) exit(1);
+ }
+
+ current_id[sid - from] = packet_id;
+ grabbed[sid - from]++;
+ if ((grabbed[sid - from]%1000000)==0)
+ printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000);
+#endif
+
+ ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+ } // fragment-loop
+
+ // send notification? Broadcast blocks, we don't want it.
+ if (ready) {
+ }
+
+ } // socket-loop
+ } // operation-loop
+
+
+#ifdef UFO_ROOF_DEBUG
+ // Store first received packet on each channel...
+ static int debug = 1;
+ if (debug) {
+ char fname[256];
+ sprintf(fname, "channel%i_packet0.raw", priv->id);
+ FILE *f = fopen(fname, "w");
+ if (f) {
+ fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f);
+ fclose(f);
+ }
+ debug = 0;
+ }
+#endif /* UFO_ROOF_DEBUG */
+
+ // FIXME: End of data (shall we restart in the network case?)
+// if (!packets)
+// return FALSE;
+
+ // Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
+ header->channel_id = priv->id;
+// header->n_packets = packets;
+
+ return TRUE;
+}
+
+
+}
+
+gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) {
+ int err;
+ if (!thr) return FALSE;
+
+ err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err);
+
+ ctx->launched = TRUE;
+ return TRUE;
+}
+
+gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) {
+ int err, ret;
+ if (!thr) return FALSE;
+ if (!thr->launched) return TRUE;
+
+ // Signal thread termination
+
+ err = thrd_join(&thr->thread, &ret);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err);
+
+ return TRUE;
+}
+