uint32_t targetlen = strlen(target);
size_t size = sizeof(uint64_t);
int r;
- xserial srl;
+ xport p;
struct xseg_request *req;
char *req_target;
- req = xseg_get_request(xseg, srcport);
+ req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
if (!req) {
fprintf(stderr, "No request!\n");
return -1;
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
(unsigned long) targetlen, (unsigned long) size);
- xseg_put_request(xseg, srcport, req);
+ xseg_put_request(xseg, req, srcport);
return -1;
}
req->size = size;
req->op = X_INFO;
- srl = xseg_submit(xseg, dstport, req);
- if (srl == Noneidx)
+ p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort)
return -1;
- xseg_signal(xseg, dstport);
+ xseg_signal(xseg, p);
return 0;
}
{
uint32_t targetlen = strlen(target);
int r;
- xserial srl;
+ xport p;
char *req_target;
- struct xseg_request *req = xseg_get_request(xseg, srcport);
+ struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
printf("%x\n", req);
if (!req) {
fprintf(stderr, "No request\n");
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
(unsigned long)targetlen, (unsigned long long)size);
- xseg_put_request(xseg, srcport, req);
+ xseg_put_request(xseg, req, srcport);
return -1;
}
req->size = size;
req->op = X_READ;
report_request(req);
- srl = xseg_submit(xseg, dstport, req);
- if (srl == Noneidx)
+ p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort)
return -1;
- xseg_signal(xseg, dstport);
+ xseg_signal(xseg, p);
return 0;
}
{
char *buf = NULL;
int r;
- xserial srl;
+ xport p;
uint64_t size = 0;
char *req_target, *req_data;
uint32_t targetlen = strlen(target);
return -1;
}
- req = xseg_get_request(xseg, srcport);
+ req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
if (!req) {
fprintf(stderr, "No request\n");
return -1;
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
(unsigned long)targetlen, (unsigned long long)size);
- xseg_put_request(xseg, srcport, req);
+ xseg_put_request(xseg, req, srcport);
return -1;
}
req->size = size;
req->op = X_WRITE;
- srl = xseg_submit(xseg, dstport, req);
- if (srl == Noneidx) {
+ p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort) {
fprintf(stderr, "Cannot submit\n");
return -1;
}
for (;;) {
active = 0;
+ //FIXME
req = xseg_accept(xseg, portno1);
if (req) {
- xseg_submit(xseg, portno2, req);
+ xseg_submit(xseg, req, portno2, X_ALLOC);
log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
active += 1;
}
req = xseg_accept(xseg, portno2);
if (req) {
- xseg_submit(xseg, portno1, req);
+ xseg_submit(xseg, req, portno1, X_ALLOC);
log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
active += 1;
}
req = xseg_receive(xseg, portno1);
if (req) {
- xseg_respond(xseg, portno2, req);
+ xseg_respond(xseg, req, portno2, X_ALLOC);
log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
active += 1;
}
req = xseg_receive(xseg, portno2);
if (req) {
- xseg_respond(xseg, portno1, req);
+ xseg_respond(xseg, req, portno1, X_ALLOC);
log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
active += 1;
}
long nr_submitted = 0, nr_received = 0, nr_failed = 0;
int reported = 0, r;
uint64_t offset;
- xserial srl;
+ xport port;
char *req_data, *req_target;
seed = random();
for (;;) {
xseg_prepare_wait(xseg, srcport);
if (nr_submitted < loops &&
- (submitted = xseg_get_request(xseg, srcport))) {
+ (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
xseg_cancel_wait(xseg, srcport);
r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
targetlen, chunksize);
- xseg_put_request(xseg, submitted->portno, submitted);
+ xseg_put_request(xseg, submitted, srcport);
return -1;
}
submitted->op = X_WRITE;
submitted->flags |= XF_NOSYNC;
- srl = xseg_submit(xseg, dstport, submitted);
- if (srl == Noneidx) {
- xseg_put_request(xseg, submitted->portno, submitted);
+ port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+ if (port == NoPort) {
+ xseg_put_request(xseg, submitted, srcport);
} else {
seed = random();
nr_submitted += 1;
- xseg_signal(xseg, dstport);
+ xseg_signal(xseg, port);
}
}
nr_failed += 1;
report_request(received);
}
- if (xseg_put_request(xseg, received->portno, received))
- fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+ if (xseg_put_request(xseg, received, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
}
if (!submitted && !received)
long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
int reported = 0, r;
uint64_t offset;
- xserial srl;
+ xport port;
char *req_data, *req_target;
seed = random();
submitted = NULL;
xseg_prepare_wait(xseg, srcport);
if (nr_submitted < loops &&
- (submitted = xseg_get_request(xseg, srcport))) {
+ (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
xseg_cancel_wait(xseg, srcport);
r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
targetlen, chunksize);
- xseg_put_request(xseg, submitted->portno, submitted);
+ xseg_put_request(xseg, submitted, srcport);
return -1;
}
submitted->offset = offset;
submitted->size = chunksize;
submitted->op = X_READ;
- srl = xseg_submit(xseg, dstport, submitted);
- if (srl == Noneidx) {
- printf("foo\n");
- xseg_put_request(xseg, submitted->portno, submitted);
+ port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+ if (port == NoPort) {
+ xseg_put_request(xseg, submitted, srcport);
} else {
seed = random();
nr_submitted += 1;
- xseg_signal(xseg, dstport);
+ xseg_signal(xseg, port);
}
}
nr_mismatch += 1;
}
- if (xseg_put_request(xseg, received->portno, received))
- fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+ if (xseg_put_request(xseg, received, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
}
if (!submitted && !received)
uint64_t offset;
uint32_t targetlen = 10, chunksize = 4096;
struct timeval tv1, tv2;
- xserial srl;
+ xport p;
char *req_data, *req_target;
xseg_bind_port(xseg, srcport);
submitted = NULL;
xseg_prepare_wait(xseg, srcport);
if (nr_submitted < loops && nr_flying < concurrent_reqs &&
- (submitted = xseg_get_request(xseg, srcport))) {
+ (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
xseg_cancel_wait(xseg, srcport);
r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
targetlen, chunksize);
- xseg_put_request(xseg, submitted->portno, submitted);
+ xseg_put_request(xseg, submitted, srcport);
return -1;
}
-
+
+ //FIXME
++nr_flying;
nr_submitted += 1;
reported = 0;
mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
}
- srl = xseg_submit(xseg, dstport, submitted);
- (void)srl;
- if (xseg_signal(xseg, dstport) < 0)
- perror("Cannot signal peer");
+ p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+ if ( p != NoPort){
+ if (xseg_signal(xseg, p) < 0)
+ perror("Cannot signal peer");
+ }
}
received = xseg_receive(xseg, srcport);
if (received) {
//report_request(received);
}
- if (xseg_put_request(xseg, received->portno, received))
- fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+ if (xseg_put_request(xseg, received, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
}
if (!submitted && !received)
rq = xseg_get_queue(xseg, port, request_queue);
pq = xseg_get_queue(xseg, port, reply_queue);
fprintf(stderr, "port %u:\n"
- " requests: %llu/%llu\n"
+ " requests: %llu/%llu src gw: %lu dst gw: %lu\n"
" free_queue [%p] count : %u\n"
" request_queue [%p] count : %u\n"
" reply_queue [%p] count : %u\n",
portno, port->alloc_reqs, port->max_alloc_reqs,
+ xseg->src_gw[portno], xseg->dst_gw[portno],
(void *)fq, xq_count(fq),
(void *)rq, xq_count(rq),
(void *)pq, xq_count(pq));
req = xseg_accept(xseg, dstport);
if (!req)
break;
- if (xseg_put_request(xseg, req->portno, req))
- fprintf(stderr, "Cannot put request at port %u\n", req->portno);
+ if (xseg_put_request(xseg, req, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
}
return 0;
char *buf = malloc(sizeof(char) * 8128);
char *req_target, *req_data;
xseg_bind_port(xseg, srcport);
+ xport p;
for (; nr--;) {
xseg_prepare_wait(xseg, srcport);
req->serviced = req->size;
}
- xseg_respond(xseg, dstport, req);
- xseg_signal(xseg, dstport);
+ p = xseg_respond(xseg, req, srcport, X_ALLOC);
+ xseg_signal(xseg, p);
continue;
}
++nr;
}
put:
- if (xseg_put_request(xseg, req->portno, req))
- fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
+ if (xseg_put_request(xseg, req, srcport))
+ fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
}
int cmd_wait(uint32_t nr)
//fwrite(req->buffer, 1, req->bufferlen, stdout);
- if (xseg_put_request(xseg, req->portno, req))
+ if (xseg_put_request(xseg, req, srcport))
fprintf(stderr, "Cannot put reply\n");
}
}
struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
- xport dst_portno)
+ xport dst_portno, uint32_t flags)
{
- //TODO add flags option
- //X_ALLOC
- //X_LOCAL_ONLY (Maybe we want this as default, to give a hint to a peer
- // how many requests it can have flying)
+ /*
+ * Flags:
+ * X_ALLOC Allocate more requests if object handler
+ * does not have any avaiable
+ * X_LOCAL Use only local - preallocated reqs
+ * (Maybe we want this as default, to give a hint to a peer
+ * how many requests it can have flying)
+ */
struct xseg_request *req = NULL;
struct xseg_port *port;
struct xq *q;
goto done;
}
+ if (flags & X_LOCAL)
+ return NULL;
+
//else try to allocate from global heap
+ //FIXME
xlock_acquire(&port->port_lock, src_portno);
if (port->alloc_reqs < port->max_alloc_reqs) {
- req = xobj_get_obj(xseg->request_h, X_ALLOC);
+ req = xobj_get_obj(xseg->request_h, flags & X_ALLOC);
if (req)
port->alloc_reqs++;
}
req->datalen = 0;
req->targetlen = 0;
if (xseg_prep_ports(xseg, req, src_portno, dst_portno) < 0) {
- xseg_put_request(xseg, src_portno, req);
+ xseg_put_request(xseg, req, src_portno);
return NULL;
}
req->state = 0;
return req;
}
-int xseg_put_request ( struct xseg *xseg,
- uint32_t portno,
- struct xseg_request *xreq )
+int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
+ xport portno)
{
xqindex xqi = XPTR_MAKE(xreq, xseg->segment);
struct xq *q;
- struct xseg_port *port = xseg_get_port(xseg, portno);
+ struct xseg_port *port = xseg_get_port(xseg, xreq->src_portno);
if (!port)
return -1;
xreq->timestamp.tv_usec = tv.tv_usec;
}
-xserial xseg_submit ( struct xseg *xseg, uint32_t portno,
- struct xseg_request *xreq )
+xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
+ xport portno, uint32_t flags)
{
xserial serial = NoSerial;
xqindex xqi, r;
/* discover next and current ports */
if (!__validate_port(xseg, xreq->src_transit_portno))
- return serial;
+ return NoPort;
next = xseg->src_gw[xreq->src_transit_portno];
if (next != xreq->src_portno) {
cur = xreq->src_transit_portno;
}
if (!__validate_port(xseg, xreq->dst_transit_portno))
- return serial;
+ return NoPort;
next = xseg->dst_gw[xreq->dst_transit_portno];
if (xreq->dst_transit_portno == xreq->dst_portno)
cur = xreq->src_transit_portno;
/* add current port to path */
serial = __xq_append_head(&xreq->path, cur);
if (serial == Noneidx){
- return serial;
+ return NoPort;
}
xlock_acquire(&port->rq_lock, portno);
q = XPTR_TAKE(port->request_queue, xseg->segment);
serial = __xq_append_tail(q, xqi);
- if (serial == Noneidx) {
- //TODO make it flag controlled
+ if (flags & X_ALLOC && serial == Noneidx) {
/* double up queue size */
newq = __alloc_queue(xseg, xq_size(q)*2);
if (!newq)
goto out_rel;
r = __xq_resize(q, newq);
- if (r == Noneidx)
+ if (r == Noneidx){
+ xheap_free(newq);
goto out_rel;
+ }
port->request_queue = XPTR_MAKE(newq, xseg->segment);
xheap_free(q);
serial = __xq_append_tail(newq, xqi);
if (serial == Noneidx)
__xq_pop_head(&xreq->path);
out:
- return serial;
+ return next;
}
-struct xseg_request *xseg_receive(struct xseg *xseg, uint32_t portno)
+struct xseg_request *xseg_receive(struct xseg *xseg, xport portno)
{
xqindex xqi;
xserial serial = NoSerial;
return req;
}
-struct xseg_request *xseg_accept(struct xseg *xseg, uint32_t portno)
+struct xseg_request *xseg_accept(struct xseg *xseg, xport portno)
{
xqindex xqi;
struct xq *q;
return req;
}
-xserial xseg_respond ( struct xseg *xseg, uint32_t portno,
- struct xseg_request *xreq )
+xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
+ xport portno, uint32_t flags)
{
xserial serial = NoSerial;
xqindex xqi, r;
serial = __xq_peek_head(&xreq->path);
if (serial == Noneidx)
- return serial;
+ return NoPort;
dst = (xport) serial;
port = xseg_get_port(xseg, dst);
if (!port)
- goto out;
+ return NoPort;
xqi = XPTR_MAKE(xreq, xseg->segment);
xlock_acquire(&port->pq_lock, portno);
q = XPTR_TAKE(port->reply_queue, xseg->segment);
serial = __xq_append_tail(q, xqi);
- if (serial == Noneidx) {
+ if (flags & X_ALLOC && serial == Noneidx) {
newq = __alloc_queue(xseg, xq_size(q)*2);
- if (!newq)
+ if (!newq)
goto out_rel;
r = __xq_resize(q, newq);
- if (r == Noneidx)
+ if (r == Noneidx) {
+ xheap_free(newq);
goto out_rel;
+ }
port->reply_queue = XPTR_MAKE(newq, xseg->segment);
xheap_free(q);
serial = __xq_append_tail(newq, xqi);
out_rel:
xlock_release(&port->pq_lock);
-out:
- return serial;
+
+ if (serial == Noneidx)
+ dst = NoPort;
+ return dst;
}
r = xhash_lookup(req_data, (ul_t) xreq, &val);
*data = (void *) val;
if (r >= 0) {
+ // delete or update to NULL ?
r = xhash_delete(req_data, (ul_t) xreq);
if (r == -XHASH_ERESIZE) {
req_data = xhash_resize(req_data, shrink_size_shift(req_data), NULL);
return r;
}
+/*
+int xseg_complete_req(struct xseg_request *req)
+{
+ req->state |= XS_SERVED;
+ req->state &= ~XS_FAILED;
+}
+
+int xseg_fail_req(struct xseg_request *req)
+{
+ req->state &= ~XS_SERVED;
+ req->state |= XS_FAILED;
+}
+*/
+
struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req)
{
uint32_t portno, maxno, id = __get_id(), force;
return port;
}
+int xseg_leave_port(struct xseg *xseg, struct xseg_port *port)
+{
+ /* To be implemented */
+ return -1;
+}
+
int xseg_initialize(void)
{
return __xseg_preinit(); /* with or without lock ? */