Fix filed and xseg-tool to use the new xseg API
author: Filippos Giannakos <philipgian@grnet.gr>
Tue, 4 Sep 2012 13:02:52 +0000 (16:02 +0300)
committer: Filippos Giannakos <philipgian@grnet.gr>
Tue, 4 Sep 2012 13:02:52 +0000 (16:02 +0300)
xseg/peers/user/Makefile
xseg/peers/user/filed.c
xseg/peers/user/xseg-tool.c
xseg/sys/util.h
xseg/xseg/xseg.c
xseg/xseg/xseg.h

index 08cf843..dfd8519 100644 (file)
@@ -4,7 +4,9 @@ include $(XSEG_HOME)/base.mk
 
 default: all
 
-all: filed xseg sosd vlmcd mapperd
+#all: filed xseg sosd vlmcd mapperd
+all: filed xseg vlmcd mapperd
+
 
 filed: filed.c $(BASE)/xseg/xseg.h
        $(CC) $(CFLAGS) -o $@ $< $(INC) -L$(LIB) -lxseg -lpthread
@@ -15,9 +17,6 @@ vlmcd: vlmcd.c common.c $(BASE)/xseg/xseg.h $(BASE)/xseg/protocol.h common.h
 mapperd: mapperd.c common.c $(BASE)/xseg/xseg.h $(BASE)/xseg/protocol.h common.h 
        $(CC) $(CFLAGS) -o $@ $< common.c $(INC) -L$(LIB) -lxseg
 
-mapperd: mapperd.c common.c $(BASE)/xseg/xseg.h $(BASE)/xseg/protocol.h common.h 
-       $(CC) $(CFLAGS) -o $@ $< common.c $(INC) -L$(LIB) -lxseg
-
 sosd: sosd.c $(BASE)/xseg/xseg.h $(BASE)/util_libs/user/sos/sos.h
        $(CC) $(CFLAGS) -o $@ $< $(INC) -L$(LIB) -lxseg -lsos
 
index 901b508..751b0f4 100644 (file)
@@ -134,24 +134,26 @@ static inline void free_io(struct store *store, struct io *io)
 static void complete(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_SERVED;
        if (verbose)
                log_io("complete", io);
-       while (xseg_respond(store->xseg, req->portno, req) == Noneidx)
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
                ;
-       xseg_signal(store->xseg, req->portno);
+       xseg_signal(store->xseg, p);
        __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
 }
 
 static void fail(struct store *store, struct io *io)
 {
        struct xseg_request *req = io->req;
+       xport p;
        req->state |= XS_FAILED;
        if (verbose)
                log_io("fail", io);
-       while (xseg_respond(store->xseg, req->portno, req) == Noneidx)
+       while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
                ;
-       xseg_signal(store->xseg, req->portno);
+       xseg_signal(store->xseg, p);
        if (io->fdcacheidx >= 0) {
                __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
        }
index cc7a1a6..41194cc 100644 (file)
@@ -193,11 +193,11 @@ int cmd_info(char *target)
        uint32_t targetlen = strlen(target);
        size_t size = sizeof(uint64_t);
        int r;
-       xserial srl;
+       xport p;
        struct xseg_request *req;
        char *req_target;
 
-       req = xseg_get_request(xseg, srcport);
+       req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
        if (!req) {
                fprintf(stderr, "No request!\n");
                return -1;
@@ -207,7 +207,7 @@ int cmd_info(char *target)
        if (r < 0) {
                fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
                        (unsigned long) targetlen, (unsigned long) size);
-               xseg_put_request(xseg, srcport, req);
+               xseg_put_request(xseg, req, srcport);
                return -1;
        }
 
@@ -217,11 +217,11 @@ int cmd_info(char *target)
        req->size = size;
        req->op = X_INFO;
 
-       srl = xseg_submit(xseg, dstport, req);
-       if (srl == Noneidx)
+       p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort)
                return -1;
 
-       xseg_signal(xseg, dstport);
+       xseg_signal(xseg, p);
 
        return 0;
 }
@@ -230,9 +230,9 @@ int cmd_read(char *target, uint64_t offset, uint64_t size)
 {
        uint32_t targetlen = strlen(target);
        int r;
-       xserial srl;
+       xport p;
        char *req_target;
-       struct xseg_request *req = xseg_get_request(xseg, srcport);
+       struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
        printf("%x\n", req);
        if (!req) {
                fprintf(stderr, "No request\n");
@@ -243,7 +243,7 @@ int cmd_read(char *target, uint64_t offset, uint64_t size)
        if (r < 0) {
                fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
                        (unsigned long)targetlen, (unsigned long long)size);
-               xseg_put_request(xseg, srcport, req);
+               xseg_put_request(xseg, req, srcport);
                return -1;
        }
 
@@ -253,11 +253,11 @@ int cmd_read(char *target, uint64_t offset, uint64_t size)
        req->size = size;
        req->op = X_READ;
        report_request(req);
-       srl = xseg_submit(xseg, dstport, req);
-       if (srl == Noneidx)
+       p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort)
                return -1;
 
-       xseg_signal(xseg, dstport);
+       xseg_signal(xseg, p);
        return 0;
 }
 
@@ -265,7 +265,7 @@ int cmd_write(char *target, uint64_t offset)
 {
        char *buf = NULL;
        int r;
-       xserial srl;
+       xport p;
        uint64_t size = 0;
        char *req_target, *req_data;
        uint32_t targetlen = strlen(target);
@@ -277,7 +277,7 @@ int cmd_write(char *target, uint64_t offset)
                return -1;
        }
 
-       req = xseg_get_request(xseg, srcport);
+       req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
        if (!req) {
                fprintf(stderr, "No request\n");
                return -1;
@@ -287,7 +287,7 @@ int cmd_write(char *target, uint64_t offset)
        if (r < 0) {
                fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
                        (unsigned long)targetlen, (unsigned long long)size);
-               xseg_put_request(xseg, srcport, req);
+               xseg_put_request(xseg, req, srcport);
                return -1;
        }
 
@@ -300,8 +300,8 @@ int cmd_write(char *target, uint64_t offset)
        req->size = size;
        req->op = X_WRITE;
 
-       srl = xseg_submit(xseg, dstport, req);
-       if (srl == Noneidx) {
+       p = xseg_submit(xseg, req, srcport, X_ALLOC);
+       if (p == NoPort) {
                fprintf(stderr, "Cannot submit\n");
                return -1;
        }
@@ -428,30 +428,31 @@ int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how)
                for (;;) {
                        active = 0;
 
+                       //FIXME
                        req = xseg_accept(xseg, portno1);
                        if (req) {
-                               xseg_submit(xseg, portno2, req);
+                               xseg_submit(xseg, req, portno2, X_ALLOC);
                                log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
                                active += 1;
                        }
 
                        req = xseg_accept(xseg, portno2);
                        if (req) {
-                               xseg_submit(xseg, portno1, req);
+                               xseg_submit(xseg, req, portno1, X_ALLOC);
                                log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
                                active += 1;
                        }
 
                        req = xseg_receive(xseg, portno1);
                        if (req) {
-                               xseg_respond(xseg, portno2, req);
+                               xseg_respond(xseg, req, portno2, X_ALLOC);
                                log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
                                active += 1;
                        }
 
                        req = xseg_receive(xseg, portno2);
                        if (req) {
-                               xseg_respond(xseg, portno1, req);
+                               xseg_respond(xseg, req, portno1, X_ALLOC);
                                log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
                                active += 1;
                        }
@@ -506,20 +507,20 @@ int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksiz
        long nr_submitted = 0, nr_received = 0, nr_failed = 0;
        int reported = 0, r;
        uint64_t offset;
-       xserial srl;
+       xport port;
        char *req_data, *req_target;
        seed = random();
 
        for (;;) {
                xseg_prepare_wait(xseg, srcport);
                if (nr_submitted < loops &&
-                   (submitted = xseg_get_request(xseg, srcport))) {
+                   (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
                        xseg_cancel_wait(xseg, srcport);
                        r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
                        if (r < 0) {
                                fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
                                        targetlen, chunksize);
-                               xseg_put_request(xseg, submitted->portno, submitted);
+                               xseg_put_request(xseg, submitted, srcport);
                                return -1;
                        }
                        
@@ -539,13 +540,13 @@ int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksiz
                        submitted->op = X_WRITE;
                        submitted->flags |= XF_NOSYNC;
 
-                       srl = xseg_submit(xseg, dstport, submitted);
-                       if (srl == Noneidx) {
-                               xseg_put_request(xseg, submitted->portno, submitted);
+                       port =  xseg_submit(xseg, submitted, srcport, X_ALLOC);
+                       if (port == NoPort) {
+                               xseg_put_request(xseg, submitted, srcport);
                        } else {
                                seed = random();
                                nr_submitted += 1;
-                               xseg_signal(xseg, dstport);
+                               xseg_signal(xseg, port);
                        }
                }
 
@@ -557,8 +558,8 @@ int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksiz
                                nr_failed += 1;
                                report_request(received);
                        }
-                       if (xseg_put_request(xseg, received->portno, received))
-                               fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+                       if (xseg_put_request(xseg, received, srcport))
+                               fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
                }
 
                if (!submitted && !received)
@@ -615,7 +616,7 @@ int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize
        long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
        int reported = 0, r;
        uint64_t offset;
-       xserial srl;
+       xport port;
        char *req_data, *req_target;
 
        seed = random();
@@ -623,13 +624,13 @@ int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize
                submitted = NULL;
                xseg_prepare_wait(xseg, srcport);
                if (nr_submitted < loops &&
-                   (submitted = xseg_get_request(xseg, srcport))) {
+                   (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
                        xseg_cancel_wait(xseg, srcport);
                        r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
                        if (r < 0) {
                                fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
                                        targetlen, chunksize);
-                               xseg_put_request(xseg, submitted->portno, submitted);
+                               xseg_put_request(xseg, submitted, srcport);
                                return -1;
                        }
 
@@ -644,14 +645,13 @@ int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize
                        submitted->offset = offset;
                        submitted->size = chunksize;
                        submitted->op = X_READ;
-                       srl = xseg_submit(xseg, dstport, submitted);
-                       if (srl == Noneidx) {
-                               printf("foo\n");
-                               xseg_put_request(xseg, submitted->portno, submitted);
+                       port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+                       if (port == NoPort) {
+                               xseg_put_request(xseg, submitted, srcport);
                        } else {
                                seed = random();
                                nr_submitted += 1;
-                               xseg_signal(xseg, dstport);
+                               xseg_signal(xseg, port);
                        }
                }
 
@@ -670,8 +670,8 @@ int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize
                                nr_mismatch += 1;
                        }
 
-                       if (xseg_put_request(xseg, received->portno, received))
-                               fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+                       if (xseg_put_request(xseg, received, srcport))
+                               fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
                }
 
                if (!submitted && !received)
@@ -703,7 +703,7 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
        uint64_t offset;
        uint32_t targetlen = 10, chunksize = 4096;
        struct timeval tv1, tv2;
-       xserial srl;
+       xport p;
        char *req_data, *req_target;
 
        xseg_bind_port(xseg, srcport);
@@ -713,16 +713,17 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
                submitted = NULL;
                xseg_prepare_wait(xseg, srcport);
                if (nr_submitted < loops &&  nr_flying < concurrent_reqs &&
-                   (submitted = xseg_get_request(xseg, srcport))) {
+                   (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
                        xseg_cancel_wait(xseg, srcport);
                        r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
                        if (r < 0) {
                                fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
                                        targetlen, chunksize);
-                               xseg_put_request(xseg, submitted->portno, submitted);
+                               xseg_put_request(xseg, submitted, srcport);
                                return -1;
                        }
-
+                       
+                       //FIXME
                        ++nr_flying;
                        nr_submitted += 1;
                        reported = 0;
@@ -742,10 +743,11 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
                                mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
                        }
 
-                       srl = xseg_submit(xseg, dstport, submitted);
-                       (void)srl;
-                       if (xseg_signal(xseg, dstport) < 0)
-                               perror("Cannot signal peer");
+                       p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+                       if ( p != NoPort){
+                               if (xseg_signal(xseg, p) < 0)
+                                       perror("Cannot signal peer");
+                       }
                }
                received = xseg_receive(xseg, srcport);
                if (received) {
@@ -760,8 +762,8 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
                                //report_request(received);
                        }
 
-                       if (xseg_put_request(xseg, received->portno, received))
-                               fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+                       if (xseg_put_request(xseg, received, srcport))
+                               fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
                }
 
                if (!submitted && !received)
@@ -792,11 +794,12 @@ int cmd_report(uint32_t portno)
        rq = xseg_get_queue(xseg, port, request_queue);
        pq = xseg_get_queue(xseg, port, reply_queue);
        fprintf(stderr, "port %u:\n"
-               "   requests: %llu/%llu\n"
+               "   requests: %llu/%llu  src gw: %lu  dst gw: %lu\n"
                "       free_queue [%p] count : %u\n"
                "    request_queue [%p] count : %u\n"
                "      reply_queue [%p] count : %u\n",
                portno, port->alloc_reqs, port->max_alloc_reqs,
+               xseg->src_gw[portno], xseg->dst_gw[portno],
                (void *)fq, xq_count(fq),
                (void *)rq, xq_count(rq),
                (void *)pq, xq_count(pq));
@@ -872,8 +875,8 @@ int cmd_put_requests(void)
                req = xseg_accept(xseg, dstport);
                if (!req)
                        break;
-               if (xseg_put_request(xseg, req->portno, req))
-                       fprintf(stderr, "Cannot put request at port %u\n", req->portno);
+               if (xseg_put_request(xseg, req, srcport))
+                       fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
        }
 
        return 0;
@@ -885,6 +888,7 @@ int cmd_finish(unsigned long nr, int fail)
        char *buf = malloc(sizeof(char) * 8128);
        char *req_target, *req_data;
        xseg_bind_port(xseg, srcport);
+       xport p;
 
        for (; nr--;) {
                xseg_prepare_wait(xseg, srcport);
@@ -907,8 +911,8 @@ int cmd_finish(unsigned long nr, int fail)
                                req->serviced = req->size;
                        }
 
-                       xseg_respond(xseg, dstport, req);
-                       xseg_signal(xseg, dstport);
+                       p = xseg_respond(xseg, req, srcport, X_ALLOC);
+                       xseg_signal(xseg, p);
                        continue;
                }
                ++nr;
@@ -948,8 +952,8 @@ void handle_reply(struct xseg_request *req)
        }
 
 put:
-       if (xseg_put_request(xseg, req->portno, req))
-               fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
+       if (xseg_put_request(xseg, req, srcport))
+               fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
 }
 
 int cmd_wait(uint32_t nr)
@@ -998,7 +1002,7 @@ int cmd_put_replies(void)
 
                //fwrite(req->buffer, 1, req->bufferlen, stdout);
 
-               if (xseg_put_request(xseg, req->portno, req))
+               if (xseg_put_request(xseg, req, srcport))
                        fprintf(stderr, "Cannot put reply\n");
        }
 
index 46d6467..2f48e7a 100644 (file)
@@ -10,6 +10,7 @@
 
 /* general purpose xflags */
 #define X_ALLOC ((uint32_t) (1 << 0))
+#define X_LOCAL ((uint32_t) (1 << 1))
 
 
 typedef uint64_t xpointer;
index 2bc8a06..57bf23c 100644 (file)
@@ -991,12 +991,16 @@ int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
 }
 
 struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno, 
-                                       xport dst_portno)
+                                       xport dst_portno, uint32_t flags)
 {
-       //TODO add flags option 
-       //X_ALLOC
-       //X_LOCAL_ONLY (Maybe we want this as default, to give a hint to a peer
-       //              how many requests it can have flying)
+       /*
+        * Flags:
+        * X_ALLOC Allocate more requests if object handler 
+        *         does not have any available
+        * X_LOCAL Use only local - preallocated reqs
+        *         (Maybe we want this as default, to give a hint to a peer
+        *          how many requests it can have flying)
+        */
        struct xseg_request *req = NULL;
        struct xseg_port *port;
        struct xq *q;
@@ -1015,10 +1019,14 @@ struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
                goto done;
        }
 
+       if (flags & X_LOCAL)
+               return NULL;
+
        //else try to allocate from global heap
+       //FIXME
        xlock_acquire(&port->port_lock, src_portno);
        if (port->alloc_reqs < port->max_alloc_reqs) {
-               req = xobj_get_obj(xseg->request_h, X_ALLOC);
+               req = xobj_get_obj(xseg->request_h, flags & X_ALLOC);
                if (req)
                        port->alloc_reqs++;
        }
@@ -1033,7 +1041,7 @@ done:
        req->datalen = 0;
        req->targetlen = 0;
        if (xseg_prep_ports(xseg, req, src_portno, dst_portno) < 0) {
-               xseg_put_request(xseg, src_portno, req);
+               xseg_put_request(xseg, req, src_portno);
                return NULL;
        }
        req->state = 0;
@@ -1046,13 +1054,12 @@ done:
        return req;
 }
 
-int xseg_put_request (  struct xseg *xseg,
-                       uint32_t portno,
-                       struct xseg_request *xreq )
+int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
+                       xport portno)
 {
        xqindex xqi = XPTR_MAKE(xreq, xseg->segment);
        struct xq *q;
-       struct xseg_port *port = xseg_get_port(xseg, portno);
+       struct xseg_port *port = xseg_get_port(xseg, xreq->src_portno);
        if (!port) 
                return -1;
 
@@ -1128,8 +1135,8 @@ static void __update_timestamp(struct xseg_request *xreq)
        xreq->timestamp.tv_usec = tv.tv_usec;
 }
 
-xserial xseg_submit (  struct xseg *xseg, uint32_t portno,
-                       struct xseg_request *xreq       )
+xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq, 
+                       xport portno, uint32_t flags)
 {
        xserial serial = NoSerial;
        xqindex xqi, r;
@@ -1139,7 +1146,7 @@ xserial xseg_submit (     struct xseg *xseg, uint32_t portno,
 
        /* discover next and current ports */
        if (!__validate_port(xseg, xreq->src_transit_portno))
-               return serial;
+               return NoPort;
        next = xseg->src_gw[xreq->src_transit_portno];
        if (next != xreq->src_portno) {
                cur = xreq->src_transit_portno;
@@ -1147,7 +1154,7 @@ xserial xseg_submit (     struct xseg *xseg, uint32_t portno,
        }
        
        if (!__validate_port(xseg, xreq->dst_transit_portno))
-               return serial;
+               return NoPort;
        next = xseg->dst_gw[xreq->dst_transit_portno];
        if (xreq->dst_transit_portno == xreq->dst_portno)
                cur = xreq->src_transit_portno; 
@@ -1167,21 +1174,22 @@ submit:
        /* add current port to path */
        serial = __xq_append_head(&xreq->path, cur);
        if (serial == Noneidx){
-               return serial;
+               return NoPort;
        }
 
        xlock_acquire(&port->rq_lock, portno);
        q = XPTR_TAKE(port->request_queue, xseg->segment);
        serial = __xq_append_tail(q, xqi);
-       if (serial == Noneidx) {
-               //TODO make it flag controlled
+       if (flags & X_ALLOC && serial == Noneidx) {
                /* double up queue size */
                newq = __alloc_queue(xseg, xq_size(q)*2);
                if (!newq)
                        goto out_rel;
                r = __xq_resize(q, newq);
-               if (r == Noneidx)
+               if (r == Noneidx){
+                       xheap_free(newq);
                        goto out_rel;
+               }
                port->request_queue = XPTR_MAKE(newq, xseg->segment);
                xheap_free(q);
                serial = __xq_append_tail(newq, xqi);
@@ -1192,11 +1200,11 @@ out_rel:
        if (serial == Noneidx)
                __xq_pop_head(&xreq->path);
 out:
-       return serial;
+       return next;
        
 }
 
-struct xseg_request *xseg_receive(struct xseg *xseg, uint32_t portno)
+struct xseg_request *xseg_receive(struct xseg *xseg, xport portno)
 {
        xqindex xqi;
        xserial serial = NoSerial;
@@ -1227,7 +1235,7 @@ retry:
        return req;
 }
 
-struct xseg_request *xseg_accept(struct xseg *xseg, uint32_t portno)
+struct xseg_request *xseg_accept(struct xseg *xseg, xport portno)
 {
        xqindex xqi;
        struct xq *q;
@@ -1253,8 +1261,8 @@ struct xseg_request *xseg_accept(struct xseg *xseg, uint32_t portno)
        return req;
 }
 
-xserial xseg_respond (  struct xseg *xseg, uint32_t portno,
-                       struct xseg_request *xreq  )
+xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
+                       xport portno, uint32_t flags)
 {
        xserial serial = NoSerial;
        xqindex xqi, r;
@@ -1264,25 +1272,27 @@ xserial xseg_respond (  struct xseg *xseg, uint32_t portno,
 
        serial = __xq_peek_head(&xreq->path);
        if (serial == Noneidx)
-               return serial;
+               return NoPort;
        dst = (xport) serial;
        
        port = xseg_get_port(xseg, dst);
        if (!port)
-               goto out;
+               return NoPort;
        
        xqi = XPTR_MAKE(xreq, xseg->segment);
        
        xlock_acquire(&port->pq_lock, portno);
        q = XPTR_TAKE(port->reply_queue, xseg->segment);
        serial = __xq_append_tail(q, xqi);
-       if (serial == Noneidx) {
+       if (flags & X_ALLOC && serial == Noneidx) {
                newq = __alloc_queue(xseg, xq_size(q)*2);
-               if (!newq)
+               if (!newq) 
                        goto out_rel;
                r = __xq_resize(q, newq);
-               if (r == Noneidx)
+               if (r == Noneidx) {
+                       xheap_free(newq);
                        goto out_rel;
+               }
                port->reply_queue = XPTR_MAKE(newq, xseg->segment);
                xheap_free(q);
                serial = __xq_append_tail(newq, xqi);
@@ -1290,8 +1300,10 @@ xserial xseg_respond (  struct xseg *xseg, uint32_t portno,
 
 out_rel:
        xlock_release(&port->pq_lock);
-out:
-       return serial;
+       
+       if (serial == Noneidx)
+               dst = NoPort;
+       return dst;
        
 }
 
@@ -1355,6 +1367,7 @@ int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data)
        r = xhash_lookup(req_data, (ul_t) xreq, &val);
        *data = (void *) val;
        if (r >= 0) {
+               // delete or update to NULL ?
                r = xhash_delete(req_data, (ul_t) xreq);
                if (r == -XHASH_ERESIZE) {
                        req_data = xhash_resize(req_data, shrink_size_shift(req_data), NULL);
@@ -1367,6 +1380,20 @@ int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data)
        return r;
 }
 
+/*
+int xseg_complete_req(struct xseg_request *req)
+{
+       req->state |= XS_SERVED;
+       req->state &= ~XS_FAILED;
+}
+
+int xseg_fail_req(struct xseg_request *req)
+{
+       req->state &= ~XS_SERVED;
+       req->state |= XS_FAILED;
+}
+*/
+
 struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req)
 {
        uint32_t portno, maxno, id = __get_id(), force;
@@ -1414,6 +1441,12 @@ out:
        return port;
 }
 
+int xseg_leave_port(struct xseg *xseg, struct xseg_port *port)
+{
+       /* To be implemented: placeholder stub. Neither xseg nor port is used yet and the call always reports failure by returning -1. */
+       return -1;
+}
+
 int xseg_initialize(void)
 {
        return __xseg_preinit();        /* with or without lock ? */
index e769aa7..aae945f 100644 (file)
@@ -302,11 +302,12 @@ struct xseg {
 
 struct xseg_request *  xseg_get_request     ( struct xseg         * xseg,
                                               xport                 src_portno,
-                                             xport                 dst_portno);
+                                             xport                 dst_portno,
+                                             uint32_t              flags     );
 
                 int    xseg_put_request     ( struct xseg         * xseg,
-                                              uint32_t              portno,
-                                              struct xseg_request * xreq      );
+                                              struct xseg_request * xreq,
+                                              xport                 portno    );
 
                 int    xseg_prep_request    ( struct xseg        * xseg,
                                              struct xseg_request * xreq,
@@ -315,22 +316,24 @@ struct xseg_request *  xseg_get_request     ( struct xseg         * xseg,
 /*                    \___________________/                       \_________/ */
 /*                     ___________________                         _________  */
 /*                    /                   \                       /         \ */
-            xserial    xseg_submit          ( struct xseg         * xseg,
-                                              uint32_t              portno,
-                                              struct xseg_request * xreq      );
+              xport    xseg_submit          ( struct xseg         * xseg,
+                                              struct xseg_request * xreq,      
+                                              xport                 portno,
+                                             uint32_t              flags     );
 
 struct xseg_request *  xseg_receive         ( struct xseg         * xseg,
-                                              uint32_t              portno    );
+                                              xport                 portno    );
 /*                    \___________________/                       \_________/ */
 /*                     ___________________                         _________  */
 /*                    /                   \                       /         \ */
 
 struct xseg_request *  xseg_accept          ( struct xseg         * xseg,
-                                              uint32_t              portno    );
+                                              xport                 portno    );
 
-            xserial    xseg_respond         ( struct xseg         * xseg,
-                                              uint32_t              portno,
-                                              struct xseg_request * xreq      );
+              xport    xseg_respond         ( struct xseg         * xseg,
+                                              struct xseg_request * xreq,
+                                              xport                 portno,
+                                              uint32_t              flags     );
 /*                    \___________________/                       \_________/ */
 /*                     ___________________                         _________  */
 /*                    /                   \                       /         \ */
@@ -374,5 +377,10 @@ static inline char* xseg_get_data(struct xseg* xseg, struct xseg_request *req)
 
 #endif
 
-int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data);
-int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data);
+xport xseg_set_srcgw           (struct xseg *xseg, xport portno, xport srcgw);
+xport xseg_getandset_srcgw     (struct xseg *xseg, xport portno, xport srcgw);
+xport xseg_set_dstgw           (struct xseg *xseg, xport portno, xport dstgw);
+xport xseg_getandset_dstgw     (struct xseg *xseg, xport portno, xport dstgw);
+
+int xseg_set_req_data (struct xseg *xseg, struct xseg_request *xreq, void *data);
+int xseg_get_req_data (struct xseg *xseg, struct xseg_request *xreq, void **data);