diff options
Diffstat (limited to 'src/ufo-roof-read-task.c')
-rw-r--r-- | src/ufo-roof-read-task.c | 68 |
1 files changed, 52 insertions, 16 deletions
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c index 7d55b79..83b9627 100644 --- a/src/ufo-roof-read-task.c +++ b/src/ufo-roof-read-task.c @@ -35,7 +35,7 @@ struct _UfoRoofReadTaskPrivate { gchar *config; // ROOF configuration file name UfoRoofConfig *cfg; // Parsed ROOF parameters - UfoRoofReadInterface *reader; + UfoRoofReadInterface *reader[16]; guint id; // Reader ID (defince sequential port number) gboolean stop; // Flag requiring termination @@ -76,6 +76,7 @@ ufo_roof_read_task_setup (UfoTask *task, UfoResources *resources, GError **error) { + guint i; GError *gerr = NULL; UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task); @@ -91,17 +92,19 @@ ufo_roof_read_task_setup (UfoTask *task, roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams); // Start actual reader - if (priv->simulate) { - if (!priv->path) - roof_setup_error(error, "Path to simulated data should be specified"); - - priv->reader = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id + priv->first_file_number, &gerr); - } else - priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr); - if (!priv->reader) - roof_propagate_error(error, gerr, "roof_read_new: "); + for (i = 0; (i < priv->cfg->sockets_per_thread)&&((priv->id * priv->cfg->sockets_per_thread + i) < priv->cfg->n_streams); i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->reader[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr); + } else + priv->reader[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr); + if (!priv->reader[i]) + roof_propagate_error(error, gerr, "roof_read_new: "); + } } @@ -110,8 +113,10 @@ ufo_roof_read_task_finalize (GObject *object) { UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object); - if (priv->reader) { - priv->reader->close(priv->reader); + for (guint i = 0; i < priv->cfg->sockets_per_thread; i++) { + if (priv->reader[i]) { + priv->reader[i]->close(priv->reader[i]); + } } if (priv->cfg) { @@ -180,16 +185,47 @@ ufo_roof_read_task_generate (UfoTask *task, void *output_buffer = ufo_buffer_get_host_array(output, NULL); UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg); + uint64_t current_id[16] = {0}; + uint64_t grabbed[16] = {0}; + + static uint64_t errors = 0; +retry: if (priv->stop) return FALSE; - guint packets = priv->reader->read(priv->reader, output_buffer, &gerr); + for (guint sid = 0; (sid < cfg->sockets_per_thread)&&((priv->id * cfg->sockets_per_thread + sid) < priv->cfg->n_streams); sid++) { + + guint packets = priv->reader[sid]->read(priv->reader[sid], output_buffer, &gerr); if (gerr) { g_warning("Error reciving data: %s", gerr->message); g_error_free(gerr); return FALSE; } + const uint8_t *fragment = output_buffer; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + + if ((current_id[sid])&&(current_id[sid] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + current_id[sid] = packet_id; + grabbed[sid]++; + if ((grabbed[sid]%1000000)==0) printf("Channel %i(%i): Grabbed %lu Mpackets\n", priv->id * cfg->sockets_per_thread + sid, sid, grabbed[sid]/1000000); + + fragment += cfg->max_packet_size; + } + } + + goto retry; + #ifdef UFO_ROOF_DEBUG // Store first received packet on each channel... static int debug = 1; @@ -206,12 +242,12 @@ ufo_roof_read_task_generate (UfoTask *task, #endif /* UFO_ROOF_DEBUG */ // FIXME: End of data (shall we restart in the network case?) - if (!packets) - return FALSE; +// if (!packets) +// return FALSE; // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? header->channel_id = priv->id; - header->n_packets = packets; +// header->n_packets = packets; return TRUE; } |