diff options
| author | Timo Dritschler <timo.dritschler@kit.edu> | 2014-12-10 15:27:32 +0100 | 
|---|---|---|
| committer | Timo Dritschler <timo.dritschler@kit.edu> | 2014-12-10 15:27:32 +0100 | 
| commit | 50a297a290c78c7feb0a4918efba82edd019590b (patch) | |
| tree | 2407fb62f377a2212df46fdae5f13b9219a2adac | |
| parent | b948c151b9f45db2b90a9ecf95ffd4f54edf3924 (diff) | |
| download | kiro-50a297a290c78c7feb0a4918efba82edd019590b.tar.gz kiro-50a297a290c78c7feb0a4918efba82edd019590b.tar.bz2 kiro-50a297a290c78c7feb0a4918efba82edd019590b.tar.xz kiro-50a297a290c78c7feb0a4918efba82edd019590b.zip | |
Fixed KIRO client getting stuck in the RDMA event handler
Fix #8: KIRO Server and Client now have routines to handle async communication
Fix #6: Added kiro_client_ping_server and respective event handling to server
Changed kiro-test-latency to use the new kiro_client_ping_server instead

| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/kiro-client.c | 152 |
| -rw-r--r-- | src/kiro-client.h | 13 |
| -rw-r--r-- | src/kiro-rdma.h | 14 |
| -rw-r--r-- | src/kiro-server.c | 19 |
| -rw-r--r-- | test/test-client-latency.c | 32 |

5 files changed, 195 insertions, 35 deletions
| diff --git a/src/kiro-client.c b/src/kiro-client.c index 084e4b8..6e140b5 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -65,6 +65,11 @@ struct _KiroClientPrivate {  G_DEFINE_TYPE (KiroClient, kiro_client, G_TYPE_OBJECT); +// Temporary storage and lock for PING timing +G_LOCK_DEFINE (ping_time); +volatile struct timeval ping_time; + +  KiroClient *  kiro_client_new (void)  { @@ -92,6 +97,8 @@ kiro_client_init (KiroClient *self)      memset (priv, 0, sizeof (&priv));      //Hack to make the 'unused function' from the kiro-rdma include go away...      kiro_attach_qp (NULL); +    ping_time.tv_sec = -1; +    ping_time.tv_usec = -1;  } @@ -160,10 +167,15 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)      KiroClientPrivate *priv = (KiroClientPrivate *)data;      struct ibv_wc wc; -    if (rdma_get_recv_comp (priv->conn, &wc) < 0) { -        g_critical ("Failure waiting for POST from server: %s", strerror (errno)); -        return FALSE;  +    if (ibv_poll_cq (priv->conn->recv_cq, 1, &wc) < 0) { +        g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); +        return FALSE;      } +    void *cq_ctx; +    struct ibv_cq *cq; +    int err = ibv_get_cq_event (priv->conn->recv_cq_channel, &cq, &cq_ctx); +    if (!err) +        ibv_ack_cq_events (cq, 1);      struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context;      guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; @@ -185,6 +197,22 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)              return FALSE;          }      } +    if (type == KIRO_PONG) { +        G_LOCK (ping_time); +        struct timeval local_time; +        gettimeofday (&local_time, NULL); + +        if (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) { +            g_debug ("Received PONG message from server"); +            ping_time.tv_sec = local_time.tv_sec; +        
    ping_time.tv_usec = local_time.tv_usec; +        } +        else { +            g_debug ("Received unexpected PONG message from server"); +        } + +        G_UNLOCK (ping_time); +    }      //Post a generic receive in order to stay responsive to any messages from      //the server @@ -202,6 +230,7 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)      // for this in the kiro client main loop.      ibv_req_notify_cq (priv->conn->recv_cq, 0); +    g_debug ("Finished RDMA event handling");      return TRUE;  } @@ -287,6 +316,7 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port)      }      g_message ("Connection to server established. Waiting for response."); +    ibv_req_notify_cq (priv->conn->recv_cq, 0); // Make the respective Queue push events onto the channel      if (!process_rdma_event (NULL, 0, (gpointer)priv)) {          g_critical ("No RDMA access information received from the server. Failed to connect.");          goto fail; @@ -355,11 +385,119 @@ kiro_client_sync (KiroClient *self)      }  fail: -    kiro_destroy_connection (&(priv->conn));  +    kiro_destroy_connection (&(priv->conn));      return -1;  } +gboolean +ping_timeout (gpointer data) { + +    //Not needed. Void it to prevent 'unused variable' warning +    (void) data; + +    G_LOCK (ping_time); + +    // Maybe the server did answer while dispatching the timeout? +    if (ping_time.tv_sec != 0 || ping_time.tv_usec != 0) { +        goto done; +    } + +    ping_time.tv_usec = -1; +    ping_time.tv_sec = -1; + + +done: +    G_UNLOCK (ping_time); + +    // Return FALSE to automtically stop the timeout from reoccuring +    return FALSE; +} + + +gint +kiro_client_ping_server (KiroClient *self) +{ +    // Will be returned. -1 for error. 
+    gint t_usec = 0; + +    KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); +    if (!priv->conn) { +        g_warning ("Client not connected"); +        return -1; +    } + +    struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + +    struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *)(ctx->cf_mr_send->mem); +    msg->msg_type = KIRO_PING; + +    G_LOCK (ping_time); +    ping_time.tv_sec = 0; +    ping_time.tv_usec = 0; +    struct timeval local_time; +    gettimeofday (&local_time, NULL); + +    if (rdma_post_send (priv->conn, priv->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { +        g_warning ("Failure while trying to post SEND for PING: %s", strerror (errno)); +        t_usec = -1; +        goto end; +    } +    G_UNLOCK (ping_time); + +    struct ibv_wc wc; +    if (rdma_get_send_comp (priv->conn, &wc) < 0) { +        g_warning ("Failure during PING send: %s", strerror (errno)); +        t_usec = -1; +        goto end; +    } + +    // Set a two-second timeout for the ping +    guint timeout = g_timeout_add_seconds (2, ping_timeout, NULL); + +    //Wait for ping response +    while (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) {}; + + +    G_LOCK (ping_time); +    // No response from the server. Timeout kicked in +    // (Note: The timeout callback has already deregistered itself. We don't +    // need to do that here again) +    if (ping_time.tv_sec == -1 && ping_time.tv_usec == -1) { +        g_message ("PING timed out."); +        G_UNLOCK (ping_time); +        t_usec = -1; +        goto end; +    } + +    // Remove the timeout +    GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout); +    if (timeout_source) { +        g_source_destroy (timeout_source); +    } + +    gint secs = ping_time.tv_sec - local_time.tv_sec; + +    // tv_usecs wraps back to 0 at 1000000us (1s). 
+    // This might cause our calculation to produce negative numbers when time > 1s. +    for (int i = 0; i < secs; i++) { +        ping_time.tv_usec += 1000 * 1000; +    } +    t_usec = ping_time.tv_usec - local_time.tv_usec; +    gint millis = (gint)(t_usec/1000.); +    G_UNLOCK (ping_time); + +    g_debug ("Server responded to PING in: %is, %ims, %ius", secs, millis, t_usec); + +end: +    G_LOCK (ping_time); +    ping_time.tv_sec = -1; +    ping_time.tv_usec = -1; +    G_UNLOCK (ping_time); +    return t_usec; +} + +  void *  kiro_client_get_memory (KiroClient *self)  { @@ -377,7 +515,7 @@ kiro_client_get_memory (KiroClient *self)  } -size_t  +size_t  kiro_client_get_memory_size (KiroClient *self)  {      KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); @@ -431,13 +569,13 @@ kiro_client_disconnect (KiroClient *self)      priv->close_signal = FALSE;      //kiro_destroy_connection does not free RDMA memory. Therefore, we need to -    //cache the memory pointer and free the memory afterwards manually  +    //cache the memory pointer and free the memory afterwards manually      struct kiro_connection_context *ctx = (struct kiro_connection_context *) (priv->conn->context);      void *rdma_mem = ctx->rdma_mr->mem;      kiro_destroy_connection (&(priv->conn));      free (rdma_mem); -    // priv->ec is just an easy-access pointer. Don't free it. Just NULL it  +    // priv->ec is just an easy-access pointer. Don't free it. 
Just NULL it      priv->ec = NULL;      g_message ("Client disconnected from server");  } diff --git a/src/kiro-client.h b/src/kiro-client.h index 9c6036d..3be2621 100644 --- a/src/kiro-client.h +++ b/src/kiro-client.h @@ -160,6 +160,19 @@ void        kiro_client_disconnect             (KiroClient *client);  int         kiro_client_sync                (KiroClient *client);  /** + * kiro_client_ping_server - Sends a PING to the server + * @client: (transfer none): The #KiroServer to send the PING from + * Returns: + *   A #guint telling the time (in microseconds) how long it took for the + *   connected #KiroServer to reply + * Description: + *   Sends a PING package to the connected #KiroServer and waits for a PONG + *   package from that server. The time between sending the PING and receiving + *   the PONG (in microseconds) is measured and returned by this function. + */ +gint        kiro_client_ping_server         (KiroClient *client); + +/**   * kiro_client_get_memory - Return a pointer to the current client memory   * @client: (transfer none): The #KiroClient to get the memory from   * Returns: (transfer none): diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h index 361dabc..5b4895f 100644 --- a/src/kiro-rdma.h +++ b/src/kiro-rdma.h @@ -19,6 +19,7 @@  #include <stdio.h>  #include <stdlib.h>  #include <unistd.h> +#include <sys/time.h>  #ifndef __KIRO_RDMA_H__  #define __KIRO_RDMA_H__ @@ -42,9 +43,7 @@ struct kiro_connection_context {          KIRO_IDLE,          KIRO_MRI_REQUESTED,                         // Memory Region Information Requested          KIRO_RDMA_ESTABLISHED,                      // MRI Exchange complete. 
RDMA is ready -        KIRO_RDMA_ACTIVE,                           // RDMA Operation is being performed -        KIRO_PING,                                  // PING Message -        KIRO_PONG                                   // PONG Message (PING reply) +        KIRO_RDMA_ACTIVE                            // RDMA Operation is being performed      } rdma_state;  }; @@ -55,11 +54,12 @@ struct kiro_ctrl_msg {      enum {          KIRO_REQ_RDMA,                              // Requesting RDMA Access to/from the peer          KIRO_ACK_RDMA,                              // acknowledge RDMA Request and provide Memory Region Information -        KIRO_REJ_RDMA                               // RDMA Request rejected :(  (peer_mri will be invalid) +        KIRO_REJ_RDMA,                              // RDMA Request rejected :(  (peer_mri will be invalid) +        KIRO_PING,                                  // PING Message +        KIRO_PONG                                   // PONG Message (PING reply)      } msg_type;      struct ibv_mr peer_mri; -  }; @@ -89,8 +89,8 @@ kiro_attach_qp (struct rdma_cm_id *id)      qp_attr.send_cq = id->send_cq;      qp_attr.recv_cq = id->recv_cq;      qp_attr.qp_type = IBV_QPT_RC; -    qp_attr.cap.max_send_wr = 1; -    qp_attr.cap.max_recv_wr = 1; +    qp_attr.cap.max_send_wr = 10; +    qp_attr.cap.max_recv_wr = 10;      qp_attr.cap.max_send_sge = 1;      qp_attr.cap.max_recv_sge = 1;      return rdma_create_qp (id, id->pd, &qp_attr); diff --git a/src/kiro-server.c b/src/kiro-server.c index ad4593b..e7a3908 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -247,18 +247,35 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)      guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type;      g_debug ("Received a message from Client %u of type %u", cc->id, type); +    if (type == KIRO_PING) { +        struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem); +        
msg->msg_type = KIRO_PONG; + +        if (rdma_post_send (cc->conn, cc->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { +            g_warning ("Failure while trying to post PONG send: %s", strerror (errno)); +            goto done; +        } + +        if (rdma_get_send_comp (cc->conn, &wc) < 0) { +            g_warning ("An error occured while sending PONG: %s", strerror (errno)); +        } +    } + +done:      //Post a generic receive in order to stay responsive to any messages from      //the client      if (rdma_post_recv (cc->conn, cc->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {          //TODO: Connection teardown in an event handler routine? Not a good          //idea... -        g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); +        g_critical ("Posting generic receive for event handling failed: %s", strerror (errno));          kiro_destroy_connection_context (&ctx);          rdma_destroy_ep (cc->conn);          return FALSE;      }      ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel + +    g_debug ("Finished RDMA event handling");      return TRUE;  } diff --git a/test/test-client-latency.c b/test/test-client-latency.c index d05747d..208c37c 100644 --- a/test/test-client-latency.c +++ b/test/test-client-latency.c @@ -6,7 +6,7 @@  #include <assert.h> -int  +int  main ( int argc, char *argv[] )  {      if (argc < 3) { @@ -15,38 +15,30 @@ main ( int argc, char *argv[] )      }      KiroClient *client = kiro_client_new (); -    KiroTrb *trb = kiro_trb_new ();      if (-1 == kiro_client_connect (client, argv[1], argv[2])) {          kiro_client_free (client);          return -1;      } -    kiro_client_sync (client); -    kiro_trb_adopt (trb, kiro_client_get_memory (client)); +    int iterations = 10000; -    GTimer *timer = g_timer_new (); -while (1) {  -    g_timer_reset (timer); +while (1) 
{      int i = 0; -    while(i < 50000) { -        kiro_client_sync (client); +    float ping_us = 0; +    int fail_count = 0; +    while(i < iterations) { +        float tmp = kiro_client_ping_server (client); +        if (tmp < 0) +            fail_count++; +        else +            ping_us += tmp;          i++;      } -    double elapsed = g_timer_elapsed (timer, NULL); -    printf ("Average Latency: %fus\n", (elapsed/50000.)*1000*1000); +    printf ("Average Latency: %fus\n", ping_us/(float)(iterations - fail_count));  } -    g_timer_stop (timer);      kiro_client_free (client); -    kiro_trb_free (trb);      return 0;  } - - - - - - - | 
