#define _GNU_SOURCE #include #include #include #include #include #include #include #include "glib.h" #include "ufo-roof.h" #include "ufo-roof-read-socket.h" typedef struct { RoofReadInterface iface; RoofConfig *cfg; int socket; struct mmsghdr *msg; struct iovec *msgvec; uint8_t *buf; int libvma; } RoofReadSocket; static void roof_read_socket_free(RoofReadInterface *iface) { RoofReadSocket *reader = (RoofReadSocket*)iface; if (reader) { if (reader->socket >= 0) close(reader->socket); if (reader->msgvec) free(reader->msgvec); if (reader->msg) free(reader->msg); if (reader->buf) free(reader->buf); free(reader); } } static guint roof_read_socket(RoofReadInterface *iface, uint8_t **buf, GError **error) { int packets; struct timespec timeout_ts; assert(iface); assert(buf); RoofReadSocket *reader = (RoofReadSocket*)iface; RoofConfig *cfg = reader->cfg; timeout_ts.tv_sec = cfg->network_timeout / 1000000; timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000); //retry: // Timeout seems broken, see BUGS in 'recvmmsg' bugs page packets = recvmmsg(reader->socket, reader->msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); /* // FIXME: Shall we verify packets consistency here? We can check at least the sizes... if ((packets == 1)&&(msg[0].msg_len < 16)) { goto retry; } */ *buf = reader->buf; return (guint)packets; } RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error) { int err; int port = cfg->port + id; char port_str[16]; const char *addr_str = "192.168.100.8"; struct addrinfo sockaddr_hints; struct addrinfo *sockaddr_info; RoofReadSocket *reader = (RoofReadSocket*)calloc(1, sizeof(RoofReadSocket)); if (!reader) roof_new_error(error, "Can't allocate RoofReadSocket"); reader->cfg = cfg; reader->iface.close = roof_read_socket_free; reader->iface.read =roof_read_socket; snprintf(port_str, sizeof(port_str), "%d", port); port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0'; memset(&sockaddr_hints, 0, sizeof(sockaddr_hints)); if (!strncmp(cfg->protocol, "udp", 3)) { sockaddr_hints.ai_family = AF_UNSPEC; sockaddr_hints.ai_socktype = SOCK_DGRAM; sockaddr_hints.ai_protocol = IPPROTO_UDP; } else if (!strncmp(cfg->protocol, "tcp", 3)) { sockaddr_hints.ai_family = AF_UNSPEC; sockaddr_hints.ai_socktype = SOCK_STREAM; sockaddr_hints.ai_protocol = IPPROTO_TCP; } else { roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol); } err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); if (err || !sockaddr_info) { free(reader); roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str); } reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol); if(reader->socket == -1) { freeaddrinfo(sockaddr_info); free(reader); roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str); } err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); if(err != 0) { freeaddrinfo(sockaddr_info); close(reader->socket); free(reader); roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str); } /* // Check that FPGA module is ready char msg[4]; addr_str = "192.168.34.83"; getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); */ freeaddrinfo(sockaddr_info); if (reader->libvma) { } else { reader->buf = (uint8_t*)malloc(cfg->max_packets * cfg->max_packet_size); reader->msg = (struct mmsghdr*)malloc(cfg->max_packets * sizeof(struct mmsghdr)); reader->msgvec = (struct iovec*)malloc(cfg->max_packets * sizeof(struct iovec)); if ((!reader->buf)||(!reader->msg)||(!reader->msgvec)) { roof_read_socket_free((RoofReadInterface*)reader); roof_new_error(error, "Can't allocate socket buffer"); } memset(reader->msg, 0, cfg->max_packets * sizeof(struct mmsghdr)); memset(reader->msgvec, 0, cfg->max_packets * sizeof(struct iovec)); for (guint i = 0; i < cfg->max_packets; i++) { reader->msgvec[i].iov_base = reader->buf + i * cfg->max_packet_size; reader->msgvec[i].iov_len = cfg->max_packet_size; reader->msg[i].msg_hdr.msg_iov = &reader->msgvec[i]; reader->msg[i].msg_hdr.msg_iovlen = 1; } } return (RoofReadInterface*)reader; }