Reimplement xseg paths and add xseg_forward
[archipelago] / xseg / peers / user / xseg-tool.c
index 1dfd049..f2eca51 100644 (file)
@@ -1,3 +1,37 @@
+/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
 #define _GNU_SOURCE
 #include <stdio.h>
 #include <stdlib.h>
@@ -217,15 +251,15 @@ void report_request(struct xseg_request *req)
        fprintf(stderr,
                "Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
                "offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
-               "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
+               "src: %u, transit: %u, dst: %u, effective dst: %u\n",
                (unsigned long) req, req->targetlen, (unsigned long long)req->target,
                target,
                (unsigned long long) req->datalen, (unsigned long long) req->data,
                data,
                (unsigned long long) req->offset, (unsigned long long) req->size,
                (unsigned long long) req->serviced, req->op, req->state, req->flags,
-               (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
-               (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
+               (unsigned int) req->src_portno, (unsigned int) req->transit_portno,
+               (unsigned int) req->dst_portno, (unsigned int) req->effective_dst_portno);
 
 
 }
@@ -396,16 +430,111 @@ int cmd_delete(char *target)
 
+/* Submit an X_OPEN request for target (srcport -> dstport); 0 if submitted, -1 on error. */
 int cmd_acquire(char *target)
 {
+       uint32_t targetlen = strlen(target);
+       struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+       if (!req) {
+               fprintf(stderr, "No request\n");
+               return -1;
+       }
+
+       int r = xseg_prep_request(xseg, req, targetlen, 0);
+       if (r < 0) {
+               fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
+                       (unsigned long)targetlen);
+               xseg_put_request(xseg, req, srcport);
+               return -1;
+       }
+
+       char *req_target = xseg_get_target(xseg, req);
+       strncpy(req_target, target, targetlen); /* exactly targetlen bytes, no NUL stored */
+       req->offset = 0;
+       req->size = 0;
+       req->op = X_OPEN;
+       xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort) {
+               fprintf(stderr, "Cannot submit request\n");
+               xseg_put_request(xseg, req, srcport); /* not queued: hand it back, as on prep failure */
+               return -1;
+       }
+       xseg_signal(xseg, p);
         return 0;
 }
 
+/* Submit an X_CLOSE request flagged XF_FORCE for target (srcport -> dstport).
+ * Returns 0 once the request is submitted, -1 on error. */
 int cmd_release(char *target)
 {
+       uint32_t targetlen = strlen(target);
+       struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+       if (!req) {
+               fprintf(stderr, "No request\n");
+               return -1;
+       }
+
+       int r = xseg_prep_request(xseg, req, targetlen, 0);
+       if (r < 0) {
+               fprintf(stderr, "Cannot prepare request! (%lu, 0)\n",
+                       (unsigned long)targetlen);
+               xseg_put_request(xseg, req, srcport);
+               return -1;
+       }
+
+       char *req_target = xseg_get_target(xseg, req);
+       strncpy(req_target, target, targetlen); /* exactly targetlen bytes, no NUL stored */
+       req->offset = 0;
+       req->size = 0;
+       req->op = X_CLOSE;
+       req->flags = XF_FORCE;
+       xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort) {
+               fprintf(stderr, "Cannot submit request\n");
+               xseg_put_request(xseg, req, srcport); /* not queued: hand it back, as on prep failure */
+               return -1;
+       }
+       xseg_signal(xseg, p);
         return 0;
 }
 
+/* Submit an X_COPY request: dst is the request target, src travels in the
+ * xseg_request_copy payload. Returns 0 if submitted, -1 on error. */
 int cmd_copy(char *src, char *dst)
 {
+       uint32_t targetlen = strlen(dst);
+       uint32_t parentlen = strlen(src);
+       struct xseg_request_copy *xcopy;
+       if (parentlen > sizeof(xcopy->target)) { /* src must fit the payload field it is copied into */
+               fprintf(stderr, "Source name too long\n");
+               return -1;
+       }
+
+       struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+       if (!req) {
+               fprintf(stderr, "No request\n");
+               return -1;
+       }
+
+       int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_copy));
+       if (r < 0) {
+               fprintf(stderr, "Cannot prepare request!\n");
+               xseg_put_request(xseg, req, srcport);
+               return -1;
+       }
+
+       strncpy(xseg_get_target(xseg, req), dst, targetlen); /* no NUL stored */
+       xcopy = (struct xseg_request_copy *) xseg_get_data(xseg, req);
+       strncpy(xcopy->target, src, parentlen);
+       xcopy->targetlen = parentlen;
+       req->offset = 0;
+       req->size = sizeof(struct xseg_request_copy);
+       req->op = X_COPY;
+       xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort){
+               fprintf(stderr, "Cannot submit request\n");
+               xseg_put_request(xseg, req, srcport); /* not queued: hand it back, as on prep failure */
+               return -1;
+       }
+       xseg_signal(xseg, p);
         return 0;
 }
 
@@ -452,6 +581,48 @@ int cmd_clone(char *src, char *dst)
        return 0;
 }
 
+/* Submit an X_SNAPSHOT request: src is the target, dst goes in the payload and
+ * block_size becomes the request size. Returns 0 if submitted, -1 on error. */
+int cmd_snapshot(char *src, char *dst, long block_size)
+{
+       uint32_t srclen = strlen(src);
+       uint32_t dstlen = strlen(dst);
+       struct xseg_request_snapshot *xsnapshot;
+       if (dstlen > sizeof(xsnapshot->target)) { /* dst must fit the payload field it is copied into */
+               fprintf(stderr, "Snapshot name too long\n");
+               return -1;
+       }
+       xseg_bind_port(xseg, srcport, NULL); /* NOTE(review): result is not checked - confirm */
+       struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+       if (!req) {
+               fprintf(stderr, "No request\n");
+               return -1;
+       }
+
+       int r = xseg_prep_request(xseg, req, srclen, sizeof(struct xseg_request_snapshot));
+       if (r < 0) {
+               fprintf(stderr, "Cannot prepare request!\n");
+               xseg_put_request(xseg, req, srcport);
+               return -1;
+       }
+
+       strncpy(xseg_get_target(xseg, req), src, srclen); /* no NUL stored */
+       xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(xseg, req);
+       strncpy(xsnapshot->target, dst, dstlen);
+       xsnapshot->targetlen = dstlen;
+       req->offset = 0;
+       req->size = (uint64_t) block_size;
+       req->op = X_SNAPSHOT;
+       xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort){
+               fprintf(stderr, "Cannot submit request\n");
+               xseg_put_request(xseg, req, srcport); /* not queued: hand it back, as on prep failure */
+               return -1;
+       }
+       xseg_signal(xseg, p);
+       return 0;
+}
+
 void log_req(int logfd, uint32_t portno2, uint32_t portno1, int op, int method,
                struct xseg_request *req)
 {
@@ -899,7 +1070,7 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
 
        struct xseg_request *submitted = NULL, *received;
        long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0, nr_flying = 0;
-       int reported = 0, r;
+       int r;
        uint64_t offset;
        uint32_t targetlen = 10, chunksize = 4096;
        struct timeval tv1, tv2;
@@ -926,7 +1097,6 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
                        //FIXME
                        ++nr_flying;
                        nr_submitted += 1;
-                       reported = 0;
                        offset = 0;//pick(size);
 
                        submitted->offset = offset;
@@ -1009,13 +1179,13 @@ int cmd_report(uint32_t portno)
        lock_status(&port->rq_lock, rls, 64);
        lock_status(&port->pq_lock, pls, 64);
        fprintf(stderr, "port %u:\n"
-               "   requests: %llu/%llu  src gw: %u  dst gw: %u\n"
+               "   requests: %llu/%llu  next: %u  dst gw: %u\n"
                "       free_queue [%p] count : %4llu | %s\n"
                "    request_queue [%p] count : %4llu | %s\n"
                "      reply_queue [%p] count : %4llu | %s\n",
                portno, (unsigned long long)port->alloc_reqs, 
                (unsigned long long)port->max_alloc_reqs,
-               xseg->src_gw[portno],
+               xseg->path_next[portno],
                xseg->dst_gw[portno],
                (void *)fq, (unsigned long long)xq_count(fq), fls,
                (void *)rq, (unsigned long long)xq_count(rq), rls,
@@ -1190,28 +1360,63 @@ static int isDangling(struct xseg_request *req)
        return 1;
 }
 
+/* Ask "<msg> [y/n]: " until a line holding y/Y or n/N is read from stdin.
+ * Returns 1 (yes) or 0 (no); end of input without an answer counts as no. */
+int prompt_user(char *msg)
+{
+       int c, r = -1;
+       printf("%s [y/n]: ", msg);
+       fflush(stdout); /* the prompt has no newline, so push it out explicitly */
+       while ((c = fgetc(stdin)) != EOF && !(c == '\n' && r != -1)) { /* stop at EOF or an answered line */
+               if (c == 'y' || c == 'Y')
+                       r = 1;
+               else if (c == 'n' || c == 'N')
+                       r = 0;
+               else if (c == '\n') { /* line ended with no answer: ask again */
+                       printf("%s [y/n]: ", msg);
+                       fflush(stdout);
+               }
+       }
+       return r == -1 ? 0 : r; /* callers test truthiness, so never return -1 */
+}
+
 //FIXME this should be in xseg lib?
-int cmd_verify(enum req_action action)
+int cmd_verify(int fix)
 {
        if (cmd_join())
                return -1;
        //segment lock
-       if (xseg->shared->flags & XSEG_F_LOCK)
+       if (xseg->shared->flags & XSEG_F_LOCK){
                fprintf(stderr, "Segment lock: Locked\n");
+               if (fix && prompt_user("Unlock it ?"))
+                       xseg->shared->flags &= ~XSEG_F_LOCK;
+       }
        //heap lock
-       if (xseg->heap->lock.owner != Noone)
+       if (xseg->heap->lock.owner != Noone){
                fprintf(stderr, "Heap lock: Locked (Owner: %llu)\n",
                        (unsigned long long)xseg->heap->lock.owner);
+               if (fix && prompt_user("Unlock it ?"))
+                       xlock_release(&xseg->heap->lock);
+       }
        //obj_h locks
-       if (xseg->request_h->lock.owner != Noone)
+       if (xseg->request_h->lock.owner != Noone){
                fprintf(stderr, "Requests handler lock: Locked (Owner: %llu)\n",
                        (unsigned long long)xseg->request_h->lock.owner);
-       if (xseg->port_h->lock.owner != Noone)
+               if (fix && prompt_user("Unlock it ?"))
+                       xlock_release(&xseg->request_h->lock);
+       }
+       if (xseg->port_h->lock.owner != Noone){
                fprintf(stderr, "Ports handler lock: Locked (Owner: %llu)\n",
                        (unsigned long long)xseg->port_h->lock.owner);
-       if (xseg->object_handlers->lock.owner != Noone)
+               if (fix && prompt_user("Unlock it ?"))
+                       xlock_release(&xseg->port_h->lock);
+       }
+       if (xseg->object_handlers->lock.owner != Noone){
                fprintf(stderr, "Objects handler lock: Locked (Owner: %llu)\n",
                        (unsigned long long)xseg->object_handlers->lock.owner);
+               if (fix && prompt_user("Unlock it ?"))
+                       xlock_release(&xseg->object_handlers->lock);
+       }
        //take segment lock?
        xport i;
        struct xseg_port *port;
@@ -1225,35 +1430,40 @@ int cmd_verify(enum req_action action)
                        if (port->fq_lock.owner != Noone) {
                                fprintf(stderr, "Free queue lock of port %u locked (Owner %llu)\n",
                                                i, (unsigned long long)port->fq_lock.owner);
+                               if (fix && prompt_user("Unlock it ?"))
+                                       xlock_release(&port->fq_lock);
                        }
                        if (port->rq_lock.owner != Noone) {
                                fprintf(stderr, "Request queue lock of port %u locked (Owner %llu)\n",
                                                i, (unsigned long long)port->rq_lock.owner);
+                               if (fix && prompt_user("Unlock it ?"))
+                                       xlock_release(&port->rq_lock);
                        }
                        if (port->pq_lock.owner != Noone) {
                                fprintf(stderr, "Reply queue lock of port %u locked (Owner %llu)\n",
                                                i, (unsigned long long)port->pq_lock.owner);
+                               if (fix && prompt_user("Unlock it ?"))
+                                       xlock_release(&port->pq_lock);
                        }
                }
        }
 
        struct xobject_h *obj_h = xseg->request_h;
        struct xobject_iter it;
-       xobj_iter_init(obj_h, &it);
 
        struct xseg_request *req;
        xlock_acquire(&obj_h->lock, srcport);
+       xobj_iter_init(obj_h, &it);
        while (xobj_iterate(obj_h, &it, (void **)&req)){
                //FIXME this will not work cause obj->magic - req->serial is not
                //touched when a request is get
                /* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
                if (isDangling(req) && !__xobj_isFree(obj_h, req)){
-                       if (action == REPORT)
-                               report_request(req);
-                       else if (action == FAIL)
-                               finish_req(req, action);
-                       else if (action == COMPLETE)
-                               finish_req(req, COMPLETE);
+                       report_request(req);
+                       if (fix && prompt_user("Fail it ?")){
+                               printf("Finishing ...\n");
+                               finish_req(req, FAIL);
+                       }
                }
        }
        xlock_release(&obj_h->lock);
@@ -1304,40 +1514,31 @@ int cmd_inspectq(xport portno, enum queue qt)
 }
 
 
-int cmd_requests(xport portno, enum req_action action )
+int cmd_request(struct xseg_request *req, enum req_action action) /* report, fail or complete the one request at req */
 {
        if (cmd_join())
                return -1;
 
        struct xobject_h *obj_h = xseg->request_h;
-       void *container = XPTR(&obj_h->container);
-       xhash_t *allocated = XPTR_TAKE(obj_h->allocated, container);
-       xhash_iter_t it;
-       xhash_iter_init(allocated, &it);
-       xhashidx key, val;
-       int i;
-       xlock_acquire(&obj_h->lock, srcport);
-       while (xhash_iterate(allocated, &it, &key, &val)){
-               void *mem = XPTR_TAKE(val, container);
-               struct xseg_request *req = mem, *t;
-               for (i = 0; i < xheap_get_chunk_size(mem)/obj_h->obj_size; i++) {
-                       t = req + i;
-                       struct xobject *obj = (struct xobject *)t;
-                       //FIXME. obj->magic is not touched by req->serial...
-                       /* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
-                       if (t->src_portno == portno){
-                               if (action == REPORT)
-                                       report_request(t);
-                               else if (action == FAIL)
-                                       finish_req(t, action);
-                               else if (action == COMPLETE)
-                                       finish_req(t, COMPLETE);
-                       }
+       if (!xobj_check(obj_h, req)) /* req comes from the caller; bail out if xobj_check() rejects it */
+               return -1; /* -1 is also returned above when cmd_join() fails */
+
+       if (action == REPORT) /* REPORT only prints the request */
+               report_request(req);
+       else if (action == FAIL){ /* FAIL and COMPLETE print the request, then ask before finishing it */
+               report_request(req);
+               if (prompt_user("fail it ?")){
+                       printf("Finishing ...\n");
+                       finish_req(req, FAIL);
+               }
+       }
+       else if (action == COMPLETE){
+               report_request(req);
+               if (prompt_user("Complete it ?")){
+                       printf("Finishing ...\n");
+                       finish_req(req, COMPLETE);
                }
        }
-       xlock_release(&obj_h->lock);
-
-       fprintf(stderr, "\n");
        return 0;
 }
 
@@ -1461,7 +1662,18 @@ void handle_reply(struct xseg_request *req)
        case X_INFO:
                fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
                break;
-
+       case X_COPY:
+               fprintf(stderr, "copied %s\n", ((struct xseg_request_copy *)req_data)->target);
+               break;
+       case X_CLOSE:
+               fprintf(stderr, "Closed %s\n", req_target);
+               break;
+       case X_OPEN:
+               fprintf(stderr, "Opened %s\n", req_target);
+               break;
+       case X_SNAPSHOT:
+               fprintf(stderr, "Snapshotted %s\n", req_target);
+               break;
        default:
                break;
        }
@@ -1542,6 +1754,12 @@ int cmd_signal(uint32_t portno)
        return xseg_signal(xseg, portno);
 }
 
+int cmd_set_next(xport portno, xport next) /* store next in xseg->path_next[portno]; always returns 0 */
+{
+       xseg->path_next[portno] = next; /* NOTE(review): portno is not range-checked and cmd_join() is not called here - confirm callers cover both */
+       return 0;
+}
+
 int parse_ports(char *str)
 {
        int ret = 0;
@@ -1634,6 +1852,12 @@ int main(int argc, char **argv)
                        continue;
                }
 
+               if (!strcmp(argv[i], "set-next") && (i + 2 < argc)) {
+                       ret = cmd_set_next(atol(argv[i+1]), atol(argv[i+2]));
+                       i += 2;
+                       continue;
+               }
+
                if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
                        ret = cmd_signal(atol(argv[i+1]));
                        i += 1;
@@ -1661,24 +1885,20 @@ int main(int argc, char **argv)
                        continue;
                }
 
-               if (!strcmp(argv[i], "showreqs") && (i + 1 < argc)) {
-                       ret = cmd_requests(atol(argv[i+1]), REPORT);
-                       i += 1;
-                       continue;
-               }
-
                if (!strcmp(argv[i], "verify")) {
-                       ret = cmd_verify(REPORT);
+                       ret = cmd_verify(0);
                        continue;
                }
 
                if (!strcmp(argv[i], "verify-fix")) {
-                       ret = cmd_verify(FAIL);
+                       ret = cmd_verify(1);
                        continue;
                }
 
-               if (!strcmp(argv[i], "failreqs") && (i + 1 < argc)) {
-                       ret = cmd_requests(atol(argv[i+1]), FAIL);
+               if (!strcmp(argv[i], "failreq") && (i + 1 < argc)) {
+                       struct xseg_request *req;
+                       sscanf(argv[i+1], "%lx", (unsigned long *)&req);
+                       ret = cmd_request(req, FAIL);
                        i += 1;
                        continue;
                }
@@ -1847,6 +2067,13 @@ int main(int argc, char **argv)
                        i += 2;
                        continue;
                }
+               if (!strcmp(argv[i], "snapshot") && (i + 2 < argc)) {
+                       char *src = argv[i+1];
+                       char *dst = argv[i+2];
+                       ret = cmd_snapshot(src, dst, 4096*1024);
+                       i += 2;
+                       continue;
+               }
 
                if (!strcmp(argv[i], "info") && (i + 1 < argc)) {
                        char *target = argv[i+1];