fprintf(stderr,
"Request %lx: target[%u](xptr: %llu): %s, data[%llu](xptr: %llu): %s \n\t"
"offset: %llu, size: %llu, serviced; %llu, op: %u, state: %u, flags: %u \n\t"
- "src: %u, src_transit: %u, dst: %u, dst_transit: %u\n",
+ "src: %u, transit: %u, dst: %u, effective dst: %u\n",
(unsigned long) req, req->targetlen, (unsigned long long)req->target,
target,
(unsigned long long) req->datalen, (unsigned long long) req->data,
data,
(unsigned long long) req->offset, (unsigned long long) req->size,
(unsigned long long) req->serviced, req->op, req->state, req->flags,
- (unsigned int) req->src_portno, (unsigned int) req->src_transit_portno,
- (unsigned int) req->dst_portno, (unsigned int) req->dst_transit_portno);
+ (unsigned int) req->src_portno, (unsigned int) req->transit_portno,
+ (unsigned int) req->dst_portno, (unsigned int) req->effective_dst_portno);
}
lock_status(&port->rq_lock, rls, 64);
lock_status(&port->pq_lock, pls, 64);
fprintf(stderr, "port %u:\n"
- " requests: %llu/%llu src gw: %u dst gw: %u\n"
+ " requests: %llu/%llu next: %u dst gw: %u\n"
" free_queue [%p] count : %4llu | %s\n"
" request_queue [%p] count : %4llu | %s\n"
" reply_queue [%p] count : %4llu | %s\n",
portno, (unsigned long long)port->alloc_reqs,
(unsigned long long)port->max_alloc_reqs,
- xseg->src_gw[portno],
+ xseg->path_next[portno],
xseg->dst_gw[portno],
(void *)fq, (unsigned long long)xq_count(fq), fls,
(void *)rq, (unsigned long long)xq_count(rq), rls,
return xseg_signal(xseg, portno);
}
+int cmd_set_next(xport portno, xport next) /* tool command: record 'next' as the path_next entry of 'portno'; always returns 0 */
+{
+ xseg->path_next[portno] = next; /* NOTE(review): direct store with no range check on portno or next visible here -- confirm callers pass valid port numbers */
+ return 0;
+}
+
int parse_ports(char *str)
{
int ret = 0;
continue;
}
+ if (!strcmp(argv[i], "set-next") && (i + 2 < argc)) {
+ ret = cmd_set_next(atol(argv[i+1]), atol(argv[i+2]));
+ i += 2;
+ continue;
+ }
+
if (!strcmp(argv[i], "signal") && (i + 1 < argc)) {
ret = cmd_signal(atol(argv[i+1]));
i += 1;
return -1;
gw = mem;
for (i = 0; i < cfg->nr_ports; i++) {
- gw[i] = i;
+ gw[i] = NoPort;
}
- xseg->src_gw = (xport *) XPTR_MAKE(mem, segment);
+ xseg->path_next = (xport *) XPTR_MAKE(mem, segment);
mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
if (!mem)
return -1;
gw = mem;
for (i = 0; i < cfg->nr_ports; i++) {
- gw[i] = i;
+ gw[i] = NoPort;
}
xseg->dst_gw = (xport *) XPTR_MAKE(mem, segment);
xseg->request_h = XPTR_TAKE(__xseg->request_h, __xseg);
xseg->port_h = XPTR_TAKE(__xseg->port_h, __xseg);
xseg->ports = XPTR_TAKE(__xseg->ports, __xseg);
- xseg->src_gw = XPTR_TAKE(__xseg->src_gw, __xseg);
+ xseg->path_next = XPTR_TAKE(__xseg->path_next, __xseg);
xseg->dst_gw = XPTR_TAKE(__xseg->dst_gw, __xseg);
xseg->heap = XPTR_TAKE(__xseg->heap, __xseg);
xseg->object_handlers = XPTR_TAKE(__xseg->object_handlers, __xseg);
port->peer_type = 0; //FIXME what here ??? NoType??
port->alloc_reqs = 0;
port->max_alloc_reqs = XSEG_DEF_MAX_ALLOCATED_REQS;
+ port->flags = 0;
return port;
return i;
}
-int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
+int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
uint32_t src_portno, uint32_t dst_portno)
{
if (!__validate_port(xseg, src_portno))
return -1;
xreq->src_portno = src_portno;
- xreq->src_transit_portno = src_portno;
+ xreq->transit_portno = src_portno; /* the transit port starts out equal to the source port */
xreq->dst_portno = dst_portno;
- xreq->dst_transit_portno = dst_portno;
+ xreq->effective_dst_portno = dst_portno; /* the effective destination starts out equal to dst_portno */
return 0;
}
-struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
+struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
xport dst_portno, uint32_t flags)
{
/*
* Flags:
- * X_ALLOC Allocate more requests if object handler
+ * X_ALLOC Allocate more requests if object handler
* does not have any avaiable
* X_LOCAL Use only local - preallocated reqs
* (Maybe we want this as default, to give a hint to a peer
xreq->state = 0;
xreq->src_portno = NoPort;
xreq->dst_portno = NoPort;
- xreq->src_transit_portno = NoPort;
- xreq->dst_transit_portno = NoPort;
+ xreq->transit_portno = NoPort;
+ xreq->effective_dst_portno = NoPort;
if (xreq->elapsed != 0) {
__lock_segment(xseg);
xreq->timestamp.tv_usec = tv.tv_usec;
}
-//FIXME cur should match portno ??
//FIXME should we add NON_BLOCK flag?
-xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
+xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
xport portno, uint32_t flags)
{
xserial serial = NoSerial;
xport next, cur;
struct xseg_port *port;
- /* discover next and current ports */
- if (!__validate_port(xseg, xreq->src_transit_portno)){
- XSEGLOG("couldn't validate src_transit_portno");
+ /* discover where to submit: follow path_next from the transit port until a port with CAN_ACCEPT set is found; returns that port, or NoPort on failure */
+
+ if (!__validate_port(xseg, xreq->transit_portno)){
+ XSEGLOG("Couldn't validate transit_portno (portno: %u)",
+ (unsigned int) xreq->transit_portno);
return NoPort;
}
- //FIXME if path changes, this does not work
- next = xseg->src_gw[xreq->src_transit_portno];
- if (next != xreq->src_portno) {
- cur = xreq->src_transit_portno;
- goto submit;
- }
-
- if (!__validate_port(xseg, xreq->dst_transit_portno)){
- XSEGLOG("couldn't validate dst_transit_portno");
+ if (!__validate_port(xseg, xreq->effective_dst_portno)){
+ XSEGLOG("Couldn't validate effective_dst_portno (portno: %u)",
+ (unsigned int) xreq->effective_dst_portno);
return NoPort;
}
- next = xseg->dst_gw[xreq->dst_transit_portno];
- if (xreq->dst_transit_portno == xreq->dst_portno)
- cur = xreq->src_transit_portno;
- else
- cur = xreq->dst_transit_portno;
+ cur = xreq->transit_portno;
+ next = cur;
+ //FIXME assert(cur == portno);
+ do {
+ if (next == xreq->effective_dst_portno){ /* already at the effective destination: no further hop to try */
+ XSEGLOG("Path ended with no one willing to accept");
+ return NoPort;
+ }
-submit:
- port = xseg_get_port(xseg, next);
- if (!port){
- XSEGLOG("couldnt get port (next :%u)", next);
- return NoPort;
- }
+ if (xseg->path_next[next] != NoPort){
+ next = xseg->path_next[next];
+ } else { /* no next hop configured: go straight to the effective destination */
+ next = xreq->effective_dst_portno;
+ }
+
+ port = xseg_get_port(xseg, next);
+ if (!port){
+ XSEGLOG("Couldnt get port (next :%u)", next);
+ return NoPort;
+ }
+ } while (!(port->flags & CAN_ACCEPT)); /* negate the masked bit, not the whole flags word */
+
+ /* submit */
__update_timestamp(xreq);
-
+
xqi = XPTR_MAKE(xreq, xseg->segment);
/* add current port to path */
serial = __xq_append_head(&xreq->path, cur);
if (serial == Noneidx){
- XSEGLOG("couldn't append path head");
+ XSEGLOG("Couldn't append path head");
return NoPort;
}
serial = __xq_append_tail(q, xqi);
if (flags & X_ALLOC && serial == Noneidx) {
/* double up queue size */
- XSEGLOG("trying to double up queue");
+ XSEGLOG("Trying to double up queue");
newq = __alloc_queue(xseg, xq_size(q)*2);
if (!newq)
goto out_rel;
out_rel:
xlock_release(&port->rq_lock);
if (serial == Noneidx){
- XSEGLOG("couldn't append request to queue");
+ XSEGLOG("Couldn't append request to queue");
__xq_pop_head(&xreq->path);
next = NoPort;
}
return next;
-
+
}
struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
return NULL;
req = XPTR_TAKE(xqi, xseg->segment);
-
- if (xseg->src_gw[req->src_transit_portno] == portno)
- req->src_transit_portno = portno;
- else
- req->dst_transit_portno = portno;
-
+ req->transit_portno = portno;
return req;
}
struct xseg_port *port;
xport dst;
+retry:
serial = __xq_peek_head(&xreq->path);
if (serial == Noneidx)
return NoPort;
port = xseg_get_port(xseg, dst);
if (!port)
return NoPort;
+ if (!(port->flags & CAN_RECEIVE)){
+ //XSEGLOG("Port %u cannot receive", dst);
+ /* Port cannot receive. Try next one in path */
+ __xq_pop_head(&xreq->path);
+ goto retry;
+ }
xqi = XPTR_MAKE(xreq, xseg->segment);
}
-xport xseg_set_srcgw(struct xseg *xseg, xport portno, xport srcgw)
+xport xseg_forward(struct xseg *xseg, struct xseg_request *req, xport new_dst,
+ xport portno, uint32_t flags)
{
- if (!__validate_port(xseg, portno))
+ if (!__validate_port(xseg, new_dst)){
+ XSEGLOG("Couldn't validate new destination (new_dst %u)",
+ (unsigned int) new_dst);
return NoPort;
- xseg->src_gw[portno] = srcgw;
- return srcgw;
-}
+ }
+ req->effective_dst_portno = new_dst; /* retarget the request, then resubmit it; xseg_submit's result is returned unchanged */
+ return xseg_submit(xseg, req, portno, flags);
-xport xseg_getandset_srcgw(struct xseg *xseg, xport portno, xport srcgw)
-{
- xport prev_portno;
- do {
- prev_portno = xseg->src_gw[portno];
- xseg->src_gw[srcgw] = prev_portno;
- }while(!(__sync_bool_compare_and_swap(&xseg->src_gw[portno], prev_portno, srcgw)));
- return prev_portno;
}
-xport xseg_set_dstgw(struct xseg *xseg, xport portno, xport dstgw)
+int xseg_set_path_next(struct xseg *xseg, xport portno, xport next)
{
if (!__validate_port(xseg, portno))
- return NoPort;
- xseg->dst_gw[portno] = dstgw;
- return dstgw;
-}
-
-xport xseg_getandset_dstgw(struct xseg *xseg, xport portno, xport dstgw)
-{
- xport prev_portno;
- do {
- prev_portno = xseg->dst_gw[portno];
- xseg->dst_gw[dstgw] = prev_portno;
- }while(!(__sync_bool_compare_and_swap(&xseg->dst_gw[portno], prev_portno, dstgw)));
- return prev_portno;
+ return -1; /* portno failed validation */
+ if (!__validate_port(xseg, next)) /* NOTE(review): presumably this also rejects NoPort, so an entry cannot be cleared through this call -- confirm intended */
+ return -1; /* next failed validation */
+ xseg->path_next[portno] = next; /* plain store of the next hop for portno */
+ return 0; /* 0 on success, -1 when either port fails validation */
}
int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data)
port->peer_type = (uint64_t)driver;
port->owner = id;
port->portno = portno;
+ port->flags = CAN_ACCEPT | CAN_RECEIVE;
xseg->ports[portno] = XPTR_MAKE(port, xseg->segment);
goto out;
}
uint64_t alloc_reqs;
struct xlock port_lock;
xptr signal_desc;
+ uint32_t flags;
};
struct xseg_request;
#define X_CLOSE 15
#define X_SNAPSHOT 16
-/* FLAGS */
-#define XF_NOSYNC (1 << 0)
-#define XF_FLUSH (1 << 1)
-#define XF_FUA (1 << 2)
-#define XF_FORCE (1 << 3)
+/* REQ FLAGS */
+#define XF_NOSYNC (1 << 0)
+#define XF_FLUSH (1 << 1)
+#define XF_FUA (1 << 2)
+#define XF_FORCE (1 << 3)
+
+/* PORT FLAGS */
+
+#define CAN_ACCEPT (1 << 0)
+#define CAN_RECEIVE (1 << 1)
/* STATES */
#define XS_SERVED (1 << 0)
uint32_t state;
uint32_t flags;
xport src_portno;
- xport src_transit_portno;
+ xport transit_portno;
xport dst_portno;
- xport dst_transit_portno;
+ xport effective_dst_portno;
struct xq path;
xqindex path_bufs[MAX_PATH_LEN];
/* pad */
struct xobject_h *request_h;
struct xobject_h *port_h;
xptr *ports;
- xport *src_gw, *dst_gw;
+ xport *path_next, *dst_gw;
struct xseg_shared *shared;
struct xseg_private *priv;
#endif
-xport xseg_set_srcgw (struct xseg *xseg, xport portno, xport srcgw);
-xport xseg_getandset_srcgw (struct xseg *xseg, xport portno, xport srcgw);
-xport xseg_set_dstgw (struct xseg *xseg, xport portno, xport dstgw);
-xport xseg_getandset_dstgw (struct xseg *xseg, xport portno, xport dstgw);
+int xseg_set_path_next(struct xseg *xseg, xport portno, xport next);
-int xseg_set_req_data (struct xseg *xseg, struct xseg_request *xreq, void *data);
-int xseg_get_req_data (struct xseg *xseg, struct xseg_request *xreq, void **data);
+int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data);
+int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data);
int xseg_init_local_signal(struct xseg *xseg, xport portno);
void xseg_quit_local_signal(struct xseg *xseg, xport portno);
int xseg_set_freequeue_size(struct xseg *xseg, xport portno, xqindex size,
uint32_t flags);
-
+xport xseg_forward(struct xseg *xseg, struct xseg_request *req, xport new_dst,
+ xport portno, uint32_t flags);