#include <unistd.h>
#include <xseg/xseg.h>
-
+#include <xseg/protocol.h>
int help(void)
{
printf("xseg <spec> [[[<src_port>]:[<dst_port>]] [<command> <arg>*] ]*\n"
* ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l
*/
+xport sport = NoPort; /* port that init_local_signal() last initialised signalling for; NoPort until then */
+static void init_local_signal() /* initialise local signalling for srcport unless already done for that port */
+{
+ if (xseg && sport != srcport){ /* skip when xseg is unset or srcport has not changed since the last call */
+ xseg_init_local_signal(xseg, srcport);
+ sport = srcport; /* remember the port so repeated calls for it do nothing */
+ }
+}
+
void mkname_heavy(char *name, uint32_t namelen, uint32_t seed)
{
int i;
void report_request(struct xseg_request *req)
{
- uint32_t max = req->datalen;
- if (max > 128)
- max = 128;
- req->data[max-1] = 0;
+ // Payload dump is disabled (this line and the four below); only serial and state are printed: uint32_t max = req->datalen;
+ //char *data = xseg_get_data(xseg, req);
+ //if (max > 128)
+ // max = 128;
+ //data[max-1] = 0;
fprintf(stderr, "request %llu state %u\n", (unsigned long long)req->serial, req->state);
- fprintf(stderr, "data: %s\n", req->data);
}
int cmd_info(char *target)
uint32_t targetlen = strlen(target);
size_t size = sizeof(uint64_t);
int r;
- xserial srl;
+ xport p;
struct xseg_request *req;
+ char *req_target;
- req = xseg_get_request(xseg, srcport);
+ req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
if (!req) {
fprintf(stderr, "No request!\n");
return -1;
}
- r = xseg_prep_request(req, targetlen, size);
+ r = xseg_prep_request(xseg, req, targetlen, size);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
(unsigned long) targetlen, (unsigned long) size);
- xseg_put_request(xseg, srcport, req);
+ xseg_put_request(xseg, req, srcport);
return -1;
}
- strncpy(req->target, target, targetlen);
+ req_target = xseg_get_target(xseg, req);
+ strncpy(req_target, target, targetlen);
req->offset = 0;
req->size = size;
req->op = X_INFO;
- srl = xseg_submit(xseg, dstport, req);
- if (srl == None)
+ p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort)
return -1;
- xseg_signal(xseg, dstport);
+ xseg_signal(xseg, p);
return 0;
}
{
uint32_t targetlen = strlen(target);
int r;
- xserial srl;
- struct xseg_request *req = xseg_get_request(xseg, srcport);
+ xport p;
+ char *req_target;
+ struct xseg_request *req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+ printf("%x\n", req);
if (!req) {
fprintf(stderr, "No request\n");
return -1;
}
- r = xseg_prep_request(req, targetlen, size);
+ r = xseg_prep_request(xseg, req, targetlen, size);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
(unsigned long)targetlen, (unsigned long long)size);
- xseg_put_request(xseg, srcport, req);
+ xseg_put_request(xseg, req, srcport);
return -1;
}
- strncpy(req->target, target, targetlen);
+ req_target = xseg_get_target(xseg, req);
+ strncpy(req_target, target, targetlen);
req->offset = offset;
req->size = size;
req->op = X_READ;
-
- srl = xseg_submit(xseg, dstport, req);
- if (srl == None)
+ report_request(req);
+ p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort)
return -1;
- xseg_signal(xseg, dstport);
+ xseg_signal(xseg, p);
return 0;
}
{
char *buf = NULL;
int r;
- xserial srl;
+ xport p;
uint64_t size = 0;
+ char *req_target, *req_data;
uint32_t targetlen = strlen(target);
struct xseg_request *req;
return -1;
}
- req = xseg_get_request(xseg, srcport);
+ req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
if (!req) {
fprintf(stderr, "No request\n");
return -1;
}
- r = xseg_prep_request(req, targetlen, size);
+ r = xseg_prep_request(xseg, req, targetlen, size);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
(unsigned long)targetlen, (unsigned long long)size);
- xseg_put_request(xseg, srcport, req);
+ xseg_put_request(xseg, req, srcport);
return -1;
}
- strncpy(req->target, target, targetlen);
- memcpy(req->buffer, buf, size);
+ req_target = xseg_get_target(xseg, req);
+ strncpy(req_target, target, targetlen);
+
+ req_data = xseg_get_data(xseg, req);
+ memcpy(req_data, buf, size);
req->offset = offset;
req->size = size;
req->op = X_WRITE;
- srl = xseg_submit(xseg, dstport, req);
- if (srl == None) {
+ p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort) {
fprintf(stderr, "Cannot submit\n");
return -1;
}
+ xseg_signal(xseg, p);
return 0;
}
int cmd_delete(char *target)
{
+ uint32_t targetlen = strlen(target); /* cmd_delete submits an X_DELETE for this name; length excludes the terminator */
+ int r;
+ struct xseg_request *req;
+ init_local_signal(); /* make sure local signalling is initialised for srcport first */
+ xseg_bind_port(xseg, srcport, NULL); /* the return value is not checked here */
+
+ req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+ if (!req) {
+ fprintf(stderr, "No request!\n");
+ return -1;
+ }
+
+ r = xseg_prep_request(xseg, req, targetlen, 0); /* room for the target name only; data length is 0 */
+ if (r < 0) {
+ fprintf(stderr, "Cannot prepare request! (%lu, %lu)\n",
+ (unsigned long) targetlen, (unsigned long) req->bufferlen - targetlen); /* NOTE(review): second value comes from req->bufferlen, not the 0 requested above -- confirm intent */
+ xseg_put_request(xseg, req, srcport); /* hand the unprepared request back before failing */
+ return -1;
+ }
+
+ char *reqtarget = xseg_get_target(xseg, req);
+ strncpy(reqtarget, target, targetlen); /* copies exactly targetlen bytes; no NUL terminator is written */
+ req->op = X_DELETE;
+
+ xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort){
+ fprintf(stderr, "Couldn't submit request\n");
+ xseg_put_request(xseg, req, srcport); /* submit failed, so the request is still ours to return */
+ return -1;
+ }
+
+ xseg_signal(xseg, p); /* signal the port xseg_submit returned; the reply is not awaited here */
+
return 0;
}
int cmd_clone(char *src, char *dst)
{
+
+ uint32_t targetlen = strlen(dst);
+ uint32_t parentlen = strlen(src);
+ struct xseg_request *req;
+ struct xseg_request_clone *xclone;
+ xseg_bind_port(xseg, srcport, NULL);
+ req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
+ if (!req) {
+ fprintf(stderr, "No request\n");
+ return -1;
+ }
+
+ int r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_request_clone));
+ if (r < 0) {
+ fprintf(stderr, "Cannot prepare request!\n");
+ xseg_put_request(xseg, req, srcport);
+ return -1;
+ }
+
+ char *target = xseg_get_target(xseg, req);
+ char *data = xseg_get_data(xseg, req);
+
+ strncpy(target, dst, targetlen);
+ xclone = (struct xseg_request_clone *) data;
+ strncpy(xclone->target, src, parentlen);
+ xclone->targetlen = parentlen;
+ xclone->size = -1;
+ req->offset = 0;
+ req->size = sizeof(struct xseg_request_clone);
+ req->op = X_CLONE;
+
+ xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+ if (p == NoPort){
+ fprintf(stderr, "Cannot submit request\n");
+ return -1;
+ }
+ xseg_signal(xseg, p);
+
return 0;
}
{
FILE *logfp;
char target[64], data[64];
+ char *req_target, *req_data;
/* null terminate name in case of req->target is less than 63 characters,
* and next character after name (aka first byte of next buffer) is not
* null
*/
unsigned int end = (req->targetlen > 63) ? 63 : req->targetlen;
+
+ req_target = xseg_get_target(xseg, req);
+ req_data = xseg_get_data(xseg, req);
logfp = fdopen(logfd, "a");
if (!logfp)
switch(method) {
case 0:
- strncpy(target, req->target, end);
+ strncpy(target, req_target, end);
target[end] = 0;
- strncpy(data, req->data, 63);
+ strncpy(data, req_data, 63);
data[63] = 0;
fprintf(logfp,
for (;;) {
active = 0;
- req = xseg_accept(xseg, portno1);
+ //FIXME
+ req = xseg_accept(xseg, portno1, 0);
if (req) {
- xseg_submit(xseg, portno2, req);
+ xseg_submit(xseg, req, portno2, X_ALLOC);
log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
active += 1;
}
- req = xseg_accept(xseg, portno2);
+ req = xseg_accept(xseg, portno2, 0);
if (req) {
- xseg_submit(xseg, portno1, req);
+ xseg_submit(xseg, req, portno1, X_ALLOC);
log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
active += 1;
}
- req = xseg_receive(xseg, portno1);
+ req = xseg_receive(xseg, portno1, 0);
if (req) {
- xseg_respond(xseg, portno2, req);
+ xseg_respond(xseg, req, portno2, X_ALLOC);
log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
active += 1;
}
- req = xseg_receive(xseg, portno2);
+ req = xseg_receive(xseg, portno2, 0);
if (req) {
- xseg_respond(xseg, portno1, req);
+ xseg_respond(xseg, req, portno1, X_ALLOC);
log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
active += 1;
}
long nr_submitted = 0, nr_received = 0, nr_failed = 0;
int reported = 0, r;
uint64_t offset;
- xserial srl;
+ xport port;
+ char *req_data, *req_target;
+ seed = random();
+ init_local_signal();
for (;;) {
xseg_prepare_wait(xseg, srcport);
if (nr_submitted < loops &&
- (submitted = xseg_get_request(xseg, srcport))) {
+ (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
xseg_cancel_wait(xseg, srcport);
- r = xseg_prep_request(submitted, targetlen, chunksize);
+ r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
targetlen, chunksize);
- xseg_put_request(xseg, submitted->portno, submitted);
+ xseg_put_request(xseg, submitted, srcport);
return -1;
}
+
+ req_target = xseg_get_target(xseg, submitted);
+ req_data = xseg_get_data(xseg, submitted);
- nr_submitted += 1;
reported = 0;
- seed = random();
mkname(namebuf, targetlen, seed);
namebuf[targetlen] = 0;
//printf("%ld: %s\n", nr_submitted, namebuf);
- strncpy(submitted->target, namebuf, targetlen);
+ strncpy(req_target, namebuf, targetlen);
offset = 0;// pick(size);
- mkchunk(submitted->buffer, chunksize, namebuf, targetlen, offset);
+ mkchunk(req_data, chunksize, namebuf, targetlen, offset);
submitted->offset = offset;
submitted->size = chunksize;
submitted->op = X_WRITE;
submitted->flags |= XF_NOSYNC;
- srl = xseg_submit(xseg, dstport, submitted);
- (void)srl;
- xseg_signal(xseg, dstport);
+ port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+ if (port == NoPort) {
+ xseg_put_request(xseg, submitted, srcport);
+ } else {
+ seed = random();
+ nr_submitted += 1;
+ xseg_signal(xseg, port);
+ }
}
- received = xseg_receive(xseg, srcport);
+ received = xseg_receive(xseg, srcport, 0);
if (received) {
xseg_cancel_wait(xseg, srcport);
nr_received += 1;
nr_failed += 1;
report_request(received);
}
- if (xseg_put_request(xseg, received->portno, received))
- fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+ if (xseg_put_request(xseg, received, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
}
if (!submitted && !received)
long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0;
int reported = 0, r;
uint64_t offset;
- xserial srl;
+ xport port;
+ char *req_data, *req_target;
+ init_local_signal();
+ seed = random();
for (;;) {
submitted = NULL;
xseg_prepare_wait(xseg, srcport);
if (nr_submitted < loops &&
- (submitted = xseg_get_request(xseg, srcport))) {
+ (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
xseg_cancel_wait(xseg, srcport);
- r = xseg_prep_request(submitted, targetlen, chunksize);
+ r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
targetlen, chunksize);
- xseg_put_request(xseg, submitted->portno, submitted);
+ xseg_put_request(xseg, submitted, srcport);
return -1;
}
- nr_submitted += 1;
+ req_target = xseg_get_target(xseg, submitted);
reported = 0;
- seed = random();
mkname(namebuf, targetlen, seed);
namebuf[targetlen] = 0;
//printf("%ld: %s\n", nr_submitted, namebuf);
offset = 0;//pick(size);
- strncpy(submitted->target, namebuf, targetlen);
+ strncpy(req_target, namebuf, targetlen);
submitted->offset = offset;
submitted->size = chunksize;
submitted->op = X_READ;
-
- srl = xseg_submit(xseg, dstport, submitted);
- (void)srl;
- xseg_signal(xseg, dstport);
+ port = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+ if (port == NoPort) {
+ xseg_put_request(xseg, submitted, srcport);
+ } else {
+ seed = random();
+ nr_submitted += 1;
+ xseg_signal(xseg, port);
+ }
}
- received = xseg_receive(xseg, srcport);
+ received = xseg_receive(xseg, srcport, 0);
if (received) {
xseg_cancel_wait(xseg, srcport);
nr_received += 1;
+ req_target = xseg_get_target(xseg, received);
+ req_data = xseg_get_data(xseg, received);
if (!(received->state & XS_SERVED)) {
nr_failed += 1;
report_request(received);
- } else if (!chkchunk(received->data, received->datalen,
- received->target, received->targetlen, received->offset)) {
+ } else if (!chkchunk(req_data, received->datalen,
+ req_target, received->targetlen, received->offset)) {
+ // report_request(received);
nr_mismatch += 1;
}
- if (xseg_put_request(xseg, received->portno, received))
- fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+ if (xseg_put_request(xseg, received, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
}
if (!submitted && !received)
uint64_t offset;
uint32_t targetlen = 10, chunksize = 4096;
struct timeval tv1, tv2;
- xserial srl;
+ xport p;
+ char *req_data, *req_target;
- xseg_bind_port(xseg, srcport);
+ xseg_bind_port(xseg, srcport, NULL);
gettimeofday(&tv1, NULL);
for (;;) {
submitted = NULL;
xseg_prepare_wait(xseg, srcport);
if (nr_submitted < loops && nr_flying < concurrent_reqs &&
- (submitted = xseg_get_request(xseg, srcport))) {
+ (submitted = xseg_get_request(xseg, srcport, dstport, X_ALLOC))) {
xseg_cancel_wait(xseg, srcport);
- r = xseg_prep_request(submitted, targetlen, chunksize);
+ r = xseg_prep_request(xseg, submitted, targetlen, chunksize);
if (r < 0) {
fprintf(stderr, "Cannot prepare request! (%u, %u)\n",
targetlen, chunksize);
- xseg_put_request(xseg, submitted->portno, submitted);
+ xseg_put_request(xseg, submitted, srcport);
return -1;
}
-
+
+ //FIXME
++nr_flying;
nr_submitted += 1;
reported = 0;
submitted->offset = offset;
submitted->size = chunksize;
+ req_target = xseg_get_target(xseg, submitted);
+ req_data = xseg_get_data(xseg, submitted);
if (op == 0)
submitted->op = X_INFO;
submitted->op = X_READ;
else if (op == 2) {
submitted->op = X_WRITE;
- mkchunk(submitted->buffer, submitted->datalen, submitted->target, submitted->targetlen, submitted->offset);
+ mkchunk(req_data, submitted->datalen, req_target, submitted->targetlen, submitted->offset);
}
- srl = xseg_submit(xseg, dstport, submitted);
- (void)srl;
- if (xseg_signal(xseg, dstport) < 0)
- perror("Cannot signal peer");
+ p = xseg_submit(xseg, submitted, srcport, X_ALLOC);
+ if ( p != NoPort){
+ if (xseg_signal(xseg, p) < 0)
+ perror("Cannot signal peer");
+ }
}
- received = xseg_receive(xseg, srcport);
+ received = xseg_receive(xseg, srcport, 0);
if (received) {
xseg_cancel_wait(xseg, srcport);
--nr_flying;
//report_request(received);
}
- if (xseg_put_request(xseg, received->portno, received))
- fprintf(stderr, "Cannot put request at port %u\n", received->portno);
+ if (xseg_put_request(xseg, received, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", received->src_portno);
}
if (!submitted && !received)
return 0;
}
-int cmd_report(uint32_t port)
+int cmd_report(uint32_t portno)
{
+ struct xseg_port *port = xseg_get_port(xseg, portno);
+ if (!port) {
+ printf("port %u is not assigned\n", portno);
+ return 0;
+ }
struct xq *fq, *rq, *pq;
- fq = &xseg->ports[port].free_queue;
- rq = &xseg->ports[port].request_queue;
- pq = &xseg->ports[port].reply_queue;
+ fq = xseg_get_queue(xseg, port, free_queue);
+ rq = xseg_get_queue(xseg, port, request_queue);
+ pq = xseg_get_queue(xseg, port, reply_queue);
fprintf(stderr, "port %u:\n"
+ " requests: %llu/%llu src gw: %u dst gw: %u\n"
" free_queue [%p] count : %u\n"
" request_queue [%p] count : %u\n"
" reply_queue [%p] count : %u\n",
- port,
+ portno, port->alloc_reqs, port->max_alloc_reqs,
+ xseg->src_gw[portno], xseg->dst_gw[portno],
(void *)fq, xq_count(fq),
(void *)rq, xq_count(rq),
(void *)pq, xq_count(pq));
if (cmd_join())
return -1;
- fprintf(stderr, "global free requests: %u\n", xq_count(xseg->free_requests));
+
+ fprintf(stderr, "Heap usage: %llu / %llu\n", xseg->heap->cur, xseg->config.heap_size);
for (t = 0; t < xseg->config.nr_ports; t++)
cmd_report(t);
struct xseg_request *req;
for (;;) {
- req = xseg_accept(xseg, dstport);
+ req = xseg_accept(xseg, dstport, 0);
if (!req)
break;
- if (xseg_put_request(xseg, req->portno, req))
- fprintf(stderr, "Cannot put request at port %u\n", req->portno);
+ if (xseg_put_request(xseg, req, srcport))
+ fprintf(stderr, "Cannot put request at port %u\n", req->src_portno);
}
return 0;
{
struct xseg_request *req;
char *buf = malloc(sizeof(char) * 8128);
-
- xseg_bind_port(xseg, srcport);
+ char *req_target, *req_data;
+ xseg_bind_port(xseg, srcport, NULL);
+ xport p;
for (; nr--;) {
xseg_prepare_wait(xseg, srcport);
- req = xseg_accept(xseg, srcport);
+ req = xseg_accept(xseg, srcport, 0);
if (req) {
+ req_target = xseg_get_target(xseg, req);
+ req_data = xseg_get_data(xseg, req);
xseg_cancel_wait(xseg, srcport);
if (fail == 1)
req->state &= ~XS_SERVED;
else {
if (req->op == X_READ)
- mkchunk(req->buffer, req->datalen, req->target, req->targetlen, req->offset);
+ mkchunk(req_data, req->datalen, req_target, req->targetlen, req->offset);
else if (req->op == X_WRITE)
- memcpy(buf, req->data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
+ memcpy(buf, req_data, (sizeof(*buf) > req->datalen) ? req->datalen : sizeof(*buf));
else if (req->op == X_INFO)
*((uint64_t *) req->data) = 4294967296;
req->serviced = req->size;
}
- xseg_respond(xseg, dstport, req);
- xseg_signal(xseg, dstport);
+ p = xseg_respond(xseg, req, srcport, X_ALLOC);
+ xseg_signal(xseg, p);
continue;
}
++nr;
void handle_reply(struct xseg_request *req)
{
+ char *req_data = xseg_get_data(xseg, req);
+ char *req_target = xseg_get_target(xseg, req);
if (!(req->state & XS_SERVED)) {
report_request(req);
goto put;
switch (req->op) {
case X_READ:
- fwrite(req->data, 1, req->datalen, stdout);
+ fwrite(req_data, 1, req->datalen, stdout);
break;
case X_WRITE:
+ fprintf(stdout, "wrote: ");
+ fwrite(req_data, 1, req->datalen, stdout);
+ break;
case X_SYNC:
case X_DELETE:
+ fprintf(stderr, "deleted %s\n", req_target);
+ break;
case X_TRUNCATE:
case X_COMMIT:
case X_CLONE:
+ fprintf(stderr, "cloned %s\n", ((struct xseg_request_clone *)req_data)->target);
+ break;
case X_INFO:
- fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req->data));
+ fprintf(stderr, "size: %llu\n", (unsigned long long)*((uint64_t *)req_data));
break;
default:
}
put:
- if (xseg_put_request(xseg, req->portno, req))
- fprintf(stderr, "Cannot put reply at port %u\n", req->portno);
+ if (xseg_put_request(xseg, req, srcport))
+ fprintf(stderr, "Cannot put reply at port %u\n", req->src_portno);
}
int cmd_wait(uint32_t nr)
{
struct xseg_request *req;
long ret;
+ init_local_signal();
for (;;) {
- req = xseg_receive(xseg, srcport);
+ req = xseg_receive(xseg, srcport, 0);
if (req) {
handle_reply(req);
nr--;
struct xseg_request *req;
for (;;) {
- req = xseg_receive(xseg, dstport);
+ req = xseg_receive(xseg, dstport, 0);
if (!req)
break;
fprintf(stderr, "request: %08llx%08llx\n"
//fwrite(req->buffer, 1, req->bufferlen, stdout);
- if (xseg_put_request(xseg, req->portno, req))
+ if (xseg_put_request(xseg, req, srcport))
fprintf(stderr, "Cannot put reply\n");
}
int cmd_bind(long portno)
{
- struct xseg_port *port = xseg_bind_port(xseg, portno);
+ struct xseg_port *port = xseg_bind_port(xseg, portno, NULL);
if (!port) {
fprintf(stderr, "failed to bind port %ld\n", portno);
return 1;