diff options
Diffstat (limited to 'src/roof-read-socket.c')
-rw-r--r-- | src/roof-read-socket.c | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/src/roof-read-socket.c b/src/roof-read-socket.c new file mode 100644 index 0000000..1b98b44 --- /dev/null +++ b/src/roof-read-socket.c @@ -0,0 +1,159 @@ +#define _GNU_SOURCE + +#include <stdio.h> +#include <unistd.h> +#include <assert.h> +#include <errno.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-read-socket.h" + +typedef struct { + RoofReadInterface iface; + + RoofConfig *cfg; + + int socket; + struct mmsghdr *msg; + struct iovec *msgvec; + uint8_t *buf; + + int libvma; +} RoofReadSocket; + +static void roof_read_socket_free(RoofReadInterface *iface) { + RoofReadSocket *reader = (RoofReadSocket*)iface; + + if (reader) { + if (reader->socket >= 0) + close(reader->socket); + if (reader->msgvec) + free(reader->msgvec); + if (reader->msg) + free(reader->msg); + if (reader->buf) + free(reader->buf); + free(reader); + } +} + +static guint roof_read_socket(RoofReadInterface *iface, uint8_t **buf, GError **error) { + int packets; + struct timespec timeout_ts; + + assert(iface); + assert(buf); + + RoofReadSocket *reader = (RoofReadSocket*)iface; + RoofConfig *cfg = reader->cfg; + + timeout_ts.tv_sec = cfg->network_timeout / 1000000; + timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000); + + +//retry: + // Timeout seems broken, see BUGS in 'recvmmsg' bugs page + packets = recvmmsg(reader->socket, reader->msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); + if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); + +/* + // FIXME: Shall we verify packets consistency here? We can check at least the sizes... + if ((packets == 1)&&(msg[0].msg_len < 16)) { + goto retry; + } +*/ + + *buf = reader->buf; + return (guint)packets; +} + +RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error) { + int err; + int port = cfg->port + id; + char port_str[16]; + const char *addr_str = "192.168.100.8"; + struct addrinfo sockaddr_hints; + struct addrinfo *sockaddr_info; + + RoofReadSocket *reader = (RoofReadSocket*)calloc(1, sizeof(RoofReadSocket)); + if (!reader) roof_new_error(error, "Can't allocate RoofReadSocket"); + + reader->cfg = cfg; + reader->iface.close = roof_read_socket_free; + reader->iface.read =roof_read_socket; + + snprintf(port_str, sizeof(port_str), "%d", port); + port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0'; + + memset(&sockaddr_hints, 0, sizeof(sockaddr_hints)); + if (!strncmp(cfg->protocol, "udp", 3)) { + sockaddr_hints.ai_family = AF_UNSPEC; + sockaddr_hints.ai_socktype = SOCK_DGRAM; + sockaddr_hints.ai_protocol = IPPROTO_UDP; + } else if (!strncmp(cfg->protocol, "tcp", 3)) { + sockaddr_hints.ai_family = AF_UNSPEC; + sockaddr_hints.ai_socktype = SOCK_STREAM; + sockaddr_hints.ai_protocol = IPPROTO_TCP; + } else { + roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol); + } + + err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); + if (err || !sockaddr_info) { + free(reader); + roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str); + } + + reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol); + if(reader->socket == -1) { + freeaddrinfo(sockaddr_info); + free(reader); + roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str); + } + + err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); + if(err != 0) { + freeaddrinfo(sockaddr_info); + close(reader->socket); + free(reader); + roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str); + } + +/* + // Check that FPGA module is ready + char msg[4]; + addr_str = "192.168.34.83"; + getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); + sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); +*/ + freeaddrinfo(sockaddr_info); + + if (reader->libvma) { + } else { + reader->buf = (uint8_t*)malloc(cfg->max_packets * cfg->max_packet_size); + reader->msg = (struct mmsghdr*)malloc(cfg->max_packets * sizeof(struct mmsghdr)); + reader->msgvec = (struct iovec*)malloc(cfg->max_packets * sizeof(struct iovec)); + + if ((!reader->buf)||(!reader->msg)||(!reader->msgvec)) { + roof_read_socket_free((RoofReadInterface*)reader); + roof_new_error(error, "Can't allocate socket buffer"); + } + + memset(reader->msg, 0, cfg->max_packets * sizeof(struct mmsghdr)); + memset(reader->msgvec, 0, cfg->max_packets * sizeof(struct iovec)); + for (guint i = 0; i < cfg->max_packets; i++) { + reader->msgvec[i].iov_base = reader->buf + i * cfg->max_packet_size; + reader->msgvec[i].iov_len = cfg->max_packet_size; + reader->msg[i].msg_hdr.msg_iov = &reader->msgvec[i]; + reader->msg[i].msg_hdr.msg_iovlen = 1; + } + } + + + return (RoofReadInterface*)reader; +} |