Reimplement xseg paths and add xseg_forward
authorFilippos Giannakos <philipgian@grnet.gr>
Fri, 22 Feb 2013 16:50:13 +0000 (18:50 +0200)
committerFilippos Giannakos <philipgian@grnet.gr>
Wed, 27 Feb 2013 15:37:01 +0000 (17:37 +0200)
Reimplement xseg paths. Paths are a tool for the administrator to dynamically
alter a running xseg setup, by forcing a request to be submitted to a different
port than the original destination.

xseg_forward, on the other hand, allows a peer to alter the normal path of a
request by changing the effective destination of the request, and thus allows
the dynamic creation of temporal paths based on various criteria.

Also, add CAN_RECEIVE and CAN_ACCEPT port flags, allowing each peer to notify
the segment whether it is able to accept or receive on the specified port.

xseg/peers/user/peer.c
xseg/peers/user/xseg-tool.c
xseg/sys/util.h
xseg/xseg/xseg.c
xseg/xseg/xseg.h
xseg/xseg/xseg_exports.h

index defa168..a9ec9b3 100644 (file)
@@ -194,7 +194,7 @@ void print_req(struct xseg *xseg, struct xseg_request *req)
                strncpy(data, req_data, 63);
                data[63] = 0;
                printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
-                               "src: %u, st: %u, dst: %u dt: %u\n"
+                               "src: %u, transit: %u, dst: %u effective dst: %u\n"
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
                                (unsigned long)(req),
                                (unsigned int)req->op,
@@ -203,9 +203,9 @@ void print_req(struct xseg *xseg, struct xseg_request *req)
                                (unsigned long)req->serviced,
                                (unsigned int)req->state,
                                (unsigned int)req->src_portno,
-                               (unsigned int)req->src_transit_portno,
+                               (unsigned int)req->transit_portno,
                                (unsigned int)req->dst_portno,
-                               (unsigned int)req->dst_transit_portno,
+                               (unsigned int)req->effective_dst_portno,
                                (unsigned int)req->targetlen, target,
                                (unsigned long long)req->datalen, data);
        }
index 1b0e502..f2eca51 100644 (file)
@@ -251,15 +251,15 @@ void report_request(struct xseg_request *req)
        fprintf(stderr,
                "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
                "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
-               "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
+               "src: %u, transit: %u, dst: %u, effective dst: %u\n",
                (unsigned long) req, req->targetlen, (unsigned long long)req->target,
                target,
                (unsigned long long) req->datalen, (unsigned long long) req->data,
                data,
                (unsigned long long) req->offset, (unsigned long long) req->size,
                (unsigned long long) req->serviced, req->op, req->state, req->flags,
-               (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
-               (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
+               (unsigned int) req->src_portno, (unsigned int) req->transit_portno,
+               (unsigned int) req->dst_portno, (unsigned int) req->effective_dst_portno);
 
 
 }
@@ -1179,13 +1179,13 @@ int cmd_report(uint32_t portno)
        lock_status(&port->rq_lock, rls, 64);
        lock_status(&port->pq_lock, pls, 64);
        fprintf(stderr, "port %u:\n"
-               "   requests: %llu/%llu  src gw: %u  dst gw: %u\n"
+               "   requests: %llu/%llu  next: %u  dst gw: %u\n"
                "       free_queue [%p] count : %4llu | %s\n"
                "    request_queue [%p] count : %4llu | %s\n"
                "      reply_queue [%p] count : %4llu | %s\n",
                portno, (unsigned long long)port->alloc_reqs, 
                (unsigned long long)port->max_alloc_reqs,
-               xseg->src_gw[portno],
+               xseg->path_next[portno],
                xseg->dst_gw[portno],
                (void *)fq, (unsigned long long)xq_count(fq), fls,
                (void *)rq, (unsigned long long)xq_count(rq), rls,
@@ -1754,6 +1754,12 @@ int cmd_signal(uint32_t portno)
        return xseg_signal(xseg, portno);
 }
 
+int cmd_set_next(xport portno, xport next)
+{
+       xseg->path_next[portno] = next;
+       return 0;
+}
+
 int parse_ports(char *str)
 {
        int ret = 0;
@@ -1846,6 +1852,12 @@ int main(int argc, char **argv)
                        continue;
                }
 
+               if (!strcmp(argv[i], "set-next") && (i + 2 < argc)) {
+                       ret = cmd_set_next(atol(argv[i+1]), atol(argv[i+2]));
+                       i += 2;
+                       continue;
+               }
+
                if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
                        ret = cmd_signal(atol(argv[i+1]));
                        i += 1;
index 4b438d7..5b2803a 100644 (file)
@@ -59,15 +59,15 @@ void log_request(struct log_context *lc, struct xseg *xseg,  struct xseg_request
        __xseg_log2(lc, I, "\n\t"
        "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
        "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
-       "src: %u, src_transit: %u, dst: %u, dst_transit: %u",
+       "src: %u, transit: %u, dst: %u, effective dst: %u",
        (unsigned long) req, req->targetlen, (unsigned long long)req->target,
        xseg_get_target(xseg, req),
        (unsigned long long) req->datalen, (unsigned long long) req->data,
        xseg_get_data(xseg, req),
        (unsigned long long) req->offset, (unsigned long long) req->size,
        (unsigned long long) req->serviced, req->op, req->state, req->flags,
-       (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
-       (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
+       (unsigned int) req->src_portno, (unsigned int) req->transit_portno,
+       (unsigned int) req->dst_portno, (unsigned int) req->effective_dst_portno);
 }
 */
 
index 2aa7954..3d48b42 100644 (file)
@@ -568,16 +568,16 @@ static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
                return -1;
        gw = mem;
        for (i = 0; i < cfg->nr_ports; i++) {
-               gw[i] = i;
+               gw[i] = NoPort;
        }
-       xseg->src_gw = (xport *) XPTR_MAKE(mem, segment);
+       xseg->path_next = (xport *) XPTR_MAKE(mem, segment);
 
        mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
        if (!mem)
                return -1;
        gw = mem;
        for (i = 0; i < cfg->nr_ports; i++) {
-               gw[i] = i;
+               gw[i] = NoPort;
        }
        xseg->dst_gw = (xport *) XPTR_MAKE(mem, segment);
        
@@ -805,7 +805,7 @@ struct xseg *xseg_join(     char *segtypename,
        xseg->request_h = XPTR_TAKE(__xseg->request_h, __xseg);
        xseg->port_h = XPTR_TAKE(__xseg->port_h, __xseg);
        xseg->ports = XPTR_TAKE(__xseg->ports, __xseg);
-       xseg->src_gw = XPTR_TAKE(__xseg->src_gw, __xseg);
+       xseg->path_next = XPTR_TAKE(__xseg->path_next, __xseg);
        xseg->dst_gw = XPTR_TAKE(__xseg->dst_gw, __xseg);
        xseg->heap = XPTR_TAKE(__xseg->heap, __xseg);
        xseg->object_handlers = XPTR_TAKE(__xseg->object_handlers, __xseg);
@@ -933,6 +933,7 @@ struct xseg_port *xseg_alloc_port(struct xseg *xseg, uint32_t flags, uint64_t nr
        port->peer_type = 0; //FIXME what  here ??? NoType??
        port->alloc_reqs = 0;
        port->max_alloc_reqs = XSEG_DEF_MAX_ALLOCATED_REQS;
+       port->flags = 0;
 
 
        return port;
@@ -1106,7 +1107,7 @@ int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
        return i;
 }
 
-int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq, 
+int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
                        uint32_t src_portno, uint32_t dst_portno)
 {
        if (!__validate_port(xseg, src_portno))
@@ -1116,19 +1117,19 @@ int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
                return -1;
 
        xreq->src_portno = src_portno;
-       xreq->src_transit_portno = src_portno;
+       xreq->transit_portno = src_portno;
        xreq->dst_portno = dst_portno;
-       xreq->dst_transit_portno = dst_portno;
+       xreq->effective_dst_portno = dst_portno;
 
        return 0;
 }
 
-struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno, 
+struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
                                        xport dst_portno, uint32_t flags)
 {
        /*
         * Flags:
-        * X_ALLOC Allocate more requests if object handler 
+        * X_ALLOC Allocate more requests if object handler
         *         does not have any avaiable
         * X_LOCAL Use only local - preallocated reqs
         *         (Maybe we want this as default, to give a hint to a peer
@@ -1220,8 +1221,8 @@ int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
        xreq->state = 0;
        xreq->src_portno = NoPort;
        xreq->dst_portno = NoPort;
-       xreq->src_transit_portno = NoPort;
-       xreq->dst_transit_portno = NoPort;      
+       xreq->transit_portno = NoPort;
+       xreq->effective_dst_portno = NoPort;    
        
        if (xreq->elapsed != 0) {
                __lock_segment(xseg);
@@ -1303,9 +1304,8 @@ static void __update_timestamp(struct xseg_request *xreq)
        xreq->timestamp.tv_usec = tv.tv_usec;
 }
 
-//FIXME cur should match portno ??
 //FIXME should we add NON_BLOCK flag?
-xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq, 
+xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
                        xport portno, uint32_t flags)
 {
        xserial serial = NoSerial;
@@ -1314,44 +1314,51 @@ xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
        xport next, cur;
        struct xseg_port *port;
 
-       /* discover next and current ports */
-       if (!__validate_port(xseg, xreq->src_transit_portno)){
-               XSEGLOG("couldn't validate src_transit_portno");
+       /* discover where to submit */
+
+       if (!__validate_port(xseg, xreq->transit_portno)){
+               XSEGLOG("Couldn't validate transit_portno (portno: %lu)",
+                               xreq->transit_portno);
                return NoPort;
        }
-       //FIXME if path changes, this does not work
-       next = xseg->src_gw[xreq->src_transit_portno];
-       if (next != xreq->src_portno) {
-               cur = xreq->src_transit_portno;
-               goto submit;
-       }
-       
-       if (!__validate_port(xseg, xreq->dst_transit_portno)){
-               XSEGLOG("couldn't validate dst_transit_portno");
+       if (!__validate_port(xseg, xreq->effective_dst_portno)){
+               XSEGLOG("Couldn't validate effective_dst_portno (portno: %lu)",
+                               xreq->effective_dst_portno);
                return NoPort;
        }
-       next = xseg->dst_gw[xreq->dst_transit_portno];
-       if (xreq->dst_transit_portno == xreq->dst_portno)
-               cur = xreq->src_transit_portno; 
-       else 
-               cur = xreq->dst_transit_portno;
 
+       cur = xreq->transit_portno;
+       next = cur;
+       //FIXME assert(cur == portno);
+       do {
+               if (next == xreq->effective_dst_portno){
+                       XSEGLOG("Path ended with no one willing to accept");
+                       return NoPort;
+               }
 
-submit:
-       port = xseg_get_port(xseg, next);
-       if (!port){
-               XSEGLOG("couldnt get port (next :%u)", next);
-               return NoPort;
-       }
+               if (xseg->path_next[next] != NoPort){
+                       next = xseg->path_next[next];
+               } else {
+                       next = xreq->effective_dst_portno;
+               }
+
+               port = xseg_get_port(xseg, next);
+               if (!port){
+                       XSEGLOG("Couldnt get port (next :%u)", next);
+                       return NoPort;
+               }
+       } while ((!port->flags & CAN_ACCEPT));
+
+       /* submit */
 
        __update_timestamp(xreq);
-       
+
        xqi = XPTR_MAKE(xreq, xseg->segment);
 
        /* add current port to path */
        serial = __xq_append_head(&xreq->path, cur);
        if (serial == Noneidx){
-               XSEGLOG("couldn't append path head");
+               XSEGLOG("Couldn't append path head");
                return NoPort;
        }
 
@@ -1360,7 +1367,7 @@ submit:
        serial = __xq_append_tail(q, xqi);
        if (flags & X_ALLOC && serial == Noneidx) {
                /* double up queue size */
-               XSEGLOG("trying to double up queue");
+               XSEGLOG("Trying to double up queue");
                newq = __alloc_queue(xseg, xq_size(q)*2);
                if (!newq)
                        goto out_rel;
@@ -1377,12 +1384,12 @@ submit:
 out_rel:
        xlock_release(&port->rq_lock);
        if (serial == Noneidx){
-               XSEGLOG("couldn't append request to queue");
+               XSEGLOG("Couldn't append request to queue");
                __xq_pop_head(&xreq->path);
                next = NoPort;
        }
        return next;
-       
+
 }
 
 struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
@@ -1443,12 +1450,7 @@ struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags
                return NULL;
 
        req = XPTR_TAKE(xqi, xseg->segment);
-
-       if (xseg->src_gw[req->src_transit_portno] == portno)
-               req->src_transit_portno = portno;
-       else
-               req->dst_transit_portno = portno;
-
+       req->transit_portno = portno;
 
        return req;
 }
@@ -1463,6 +1465,7 @@ xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
        struct xseg_port *port;
        xport dst;
 
+retry:
        serial = __xq_peek_head(&xreq->path);
        if (serial == Noneidx)
                return NoPort;
@@ -1471,6 +1474,12 @@ xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
        port = xseg_get_port(xseg, dst);
        if (!port)
                return NoPort;
+       if (!(port->flags & CAN_RECEIVE)){
+               //XSEGLOG("Port %u cannot receive", dst);
+               /* Port cannot receive. Try next one in path */
+               __xq_pop_head(&xreq->path);
+               goto retry;
+       }
 
        xqi = XPTR_MAKE(xreq, xseg->segment);
 
@@ -1500,40 +1509,27 @@ out_rel:
        
 }
 
-xport xseg_set_srcgw(struct xseg *xseg, xport portno, xport srcgw)
+xport xseg_forward(struct xseg *xseg, struct xseg_request *req, xport new_dst,
+               xport portno, uint32_t flags)
 {
-       if (!__validate_port(xseg, portno))
+       if (!__validate_port(xseg, new_dst)){
+               XSEGLOG("Couldn't validate new destination (new_dst %lu)",
+                               new_dst);
                return NoPort;
-       xseg->src_gw[portno] = srcgw;
-       return srcgw;
-}
+       }
+       req->effective_dst_portno = new_dst;
+       return xseg_submit(xseg, req, portno, flags);
 
-xport xseg_getandset_srcgw(struct xseg *xseg, xport portno, xport srcgw)
-{
-       xport prev_portno;
-       do {
-               prev_portno = xseg->src_gw[portno];
-               xseg->src_gw[srcgw] = prev_portno;
-       }while(!(__sync_bool_compare_and_swap(&xseg->src_gw[portno], prev_portno, srcgw)));
-       return prev_portno; 
 }
 
-xport xseg_set_dstgw(struct xseg *xseg, xport portno, xport dstgw)
+int xseg_set_path_next(struct xseg *xseg, xport portno, xport next)
 {
        if (!__validate_port(xseg, portno))
-               return NoPort;
-       xseg->dst_gw[portno] = dstgw;
-       return dstgw;
-}
-
-xport xseg_getandset_dstgw(struct xseg *xseg, xport portno, xport dstgw)
-{
-       xport prev_portno;
-       do {
-               prev_portno = xseg->dst_gw[portno];
-               xseg->dst_gw[dstgw] = prev_portno;
-       }while(!(__sync_bool_compare_and_swap(&xseg->dst_gw[portno], prev_portno, dstgw)));
-       return prev_portno;
+               return -1;
+       if (!__validate_port(xseg, next))
+               return -1;
+       xseg->path_next[portno] = next;
+       return 0;
 }
 
 int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data)
@@ -1672,6 +1668,7 @@ struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req, void * sd)
                port->peer_type = (uint64_t)driver;
                port->owner = id;
                port->portno = portno;
+               port->flags = CAN_ACCEPT | CAN_RECEIVE;
                xseg->ports[portno] = XPTR_MAKE(port, xseg->segment);
                goto out;
        }
index 7699fa4..c890d42 100644 (file)
@@ -195,6 +195,7 @@ struct xseg_port {
        uint64_t alloc_reqs;
        struct xlock port_lock;
        xptr signal_desc;
+       uint32_t flags;
 };
 
 struct xseg_request;
@@ -226,11 +227,16 @@ struct xseg_task {
 #define X_CLOSE    15
 #define        X_SNAPSHOT 16
 
-/* FLAGS */
-#define XF_NOSYNC (1 << 0)
-#define XF_FLUSH  (1 << 1)
-#define XF_FUA    (1 << 2)
-#define XF_FORCE  (1 << 3)
+/* REQ FLAGS */
+#define XF_NOSYNC    (1 << 0)
+#define XF_FLUSH     (1 << 1)
+#define XF_FUA       (1 << 2)
+#define XF_FORCE     (1 << 3)
+
+/* PORT FLAGS */
+
+#define CAN_ACCEPT   (1 << 0)
+#define CAN_RECEIVE  (1 << 1)
 
 /* STATES */
 #define XS_SERVED      (1 << 0)
@@ -254,9 +260,9 @@ struct xseg_request {
        uint32_t state;
        uint32_t flags;
        xport src_portno;
-       xport src_transit_portno;
+       xport transit_portno;
        xport dst_portno;
-       xport dst_transit_portno;
+       xport effective_dst_portno;
        struct xq path;
        xqindex path_bufs[MAX_PATH_LEN];
        /* pad */
@@ -301,7 +307,7 @@ struct xseg {
        struct xobject_h *request_h;
        struct xobject_h *port_h;
        xptr *ports;
-       xport *src_gw, *dst_gw;
+       xport *path_next, *dst_gw;
 
        struct xseg_shared *shared;
        struct xseg_private *priv;
@@ -460,13 +466,10 @@ void xseg_put_objh(struct xseg *xseg, struct xobject_h *objh);
 
 #endif
 
-xport xseg_set_srcgw           (struct xseg *xseg, xport portno, xport srcgw);
-xport xseg_getandset_srcgw     (struct xseg *xseg, xport portno, xport srcgw);
-xport xseg_set_dstgw           (struct xseg *xseg, xport portno, xport dstgw);
-xport xseg_getandset_dstgw     (struct xseg *xseg, xport portno, xport dstgw);
+int xseg_set_path_next(struct xseg *xseg, xport portno, xport next);
 
-int xseg_set_req_data (struct xseg *xseg, struct xseg_request *xreq, void *data);
-int xseg_get_req_data (struct xseg *xseg, struct xseg_request *xreq, void **data);
+int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data);
+int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data);
 
 int xseg_init_local_signal(struct xseg *xseg, xport portno);
 void xseg_quit_local_signal(struct xseg *xseg, xport portno);
@@ -480,4 +483,5 @@ uint64_t xseg_get_allocated_requests(struct xseg *xseg, xport portno);
 int xseg_set_freequeue_size(struct xseg *xseg, xport portno, xqindex size,
                                uint32_t flags);
 
-
+xport xseg_forward(struct xseg *xseg, struct xseg_request *req, xport new_dst,
+               xport portno, uint32_t flags);
index f439ad5..e5997c5 100644 (file)
@@ -63,10 +63,8 @@ EXPORT_SYMBOL(xseg_signal);
 EXPORT_SYMBOL(xseg_get_port);
 EXPORT_SYMBOL(xseg_set_req_data);
 EXPORT_SYMBOL(xseg_get_req_data);
-EXPORT_SYMBOL(xseg_set_srcgw);
-EXPORT_SYMBOL(xseg_set_dstgw);
-EXPORT_SYMBOL(xseg_getandset_srcgw);
-EXPORT_SYMBOL(xseg_getandset_dstgw);
+EXPORT_SYMBOL(xseg_set_path_next);
+EXPORT_SYMBOL(xseg_forward);
 EXPORT_SYMBOL(xseg_init_local_signal);
 EXPORT_SYMBOL(xseg_quit_local_signal);
 EXPORT_SYMBOL(xseg_resize_request);