diff options
Diffstat (limited to 'src/save/ufo-roof-read-thread.c')
-rw-r--r-- | src/save/ufo-roof-read-thread.c | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c new file mode 100644 index 0000000..60868d2 --- /dev/null +++ b/src/save/ufo-roof-read-thread.c @@ -0,0 +1,155 @@ +#include <stdio.h> +#include <threads.h> + +#include <ufo/ufo.h> + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) { + int i; + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context"); + + thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size); + if (!thr->rdbuf) { + ufo_roof_read_thread_free(thr); + roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer"); + } + + thr->rd = rd; + thr->from = from; + thr->to = to; + + + return thr; + +} + +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) { + if (!thr) return; + if (thr->rdbuf) free(thr->rdbuf); + + ufo_roof_thread_stop(thr, error); + free(thr); +} + +static int ufo_roof_read_thread_run(void *arg) { + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)arg; + + UfoRoofConfig *cfg = thr->rd->cfg; + UfoRoofBuffer *buf = thr->rd->buf; + UfoRoofReadInterface *rdi = thr->rd->rdi; + + guint from = thr->from; + guint to = thr->to; + + void *rdbuf = thr->rdbuf; + + uint64_t current_id[to - from] = {0}; + uint64_t grabbed[to - from] = {0}; + + static uint64_t errors = 0; + + while (thr->op != UFO_ROOF_OP_STOP) { + for (guint sid = from; sid < to; sid++) { + // FIXME break waiting on stop? If no packets are send + guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + guint ready = false; + const uint8_t *fragment = (uint8_t*)rdbuf; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + +#ifdef UFO_ROOF_DEBUG + if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + + current_id[sid - from] = packet_id; + grabbed[sid - from]++; + if ((grabbed[sid - from]%1000000)==0) + printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000); +#endif + + ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + } // fragment-loop + + // send notification? Broadcast blocks, we don't want it. + if (ready) { + } + + } // socket-loop + } // operation-loop + + +#ifdef UFO_ROOF_DEBUG + // Store first received packet on each channel... + static int debug = 1; + if (debug) { + char fname[256]; + sprintf(fname, "channel%i_packet0.raw", priv->id); + FILE *f = fopen(fname, "w"); + if (f) { + fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f); + fclose(f); + } + debug = 0; + } +#endif /* UFO_ROOF_DEBUG */ + + // FIXME: End of data (shall we restart in the network case?) +// if (!packets) +// return FALSE; + + // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? + header->channel_id = priv->id; +// header->n_packets = packets; + + return TRUE; +} + + +} + +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) { + int err; + if (!thr) return FALSE; + + err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err); + + ctx->launched = TRUE; + return TRUE; +} + +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) { + int err, ret; + if (!thr) return FALSE; + if (!thr->launched) return TRUE; + + // Signal thread termination + + err = thrd_join(&thr->thread, &ret); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err); + + return TRUE; +} + |