prefs->src_port = peer->portno_start; //TODO: allow user to change this
prefs->dst_port = (xport) dst_port;
- /**************************
- * Customize struct peerd *
- **************************/
+ /*********************************
+ * Create timers for all metrics *
+ *********************************/
prefs->total_tm = malloc(sizeof(struct timer));
prefs->get_tm = malloc(sizeof(struct timer));
memset(prefs->sub_tm, 0, sizeof(struct timer));
memset(prefs->rec_tm, 0, sizeof(struct timer));
+ /**************************
+ * Customize struct peerd *
+ **************************/
+
peer->peerd_loop = custom_peerd_loop;
peer->priv = (void *) prefs;
return 0;
}
-int send_request(struct peerd *peer, struct bench *prefs)
+static int send_request(struct peerd *peer, struct bench *prefs)
{
struct xseg_request *req;
struct xseg *xseg = peer->xseg;
+ struct peer_req *pr;
xport srcport = prefs->src_port;
xport dstport = prefs->dst_port;
xport p;
//srcport and dstport must already be provided by the user.
//returns struct xseg_request with basic initializations
- XSEGLOG2(&lc, I, "Get request %lu\n", prefs->get_tm->completed);
+ XSEGLOG("Get request %lu\n", prefs->get_tm->completed);
timer_start(prefs->get_tm);
req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
if (!req) {
}
timer_stop(prefs->get_tm);
- XSEGLOG2(&lc, I, "Prepare request\n");
+ XSEGLOG("Prepare request\n");
//Allocate enough space for the data and the target's name
r = xseg_prep_request(xseg, req, targetlen, size);
if (r < 0) {
req->op = X_WRITE;
#endif
- XSEGLOG2(&lc, I, "Submit request %lu\n", prefs->sub_tm->completed);
+ //Measure this?
+ XSEGLOG("Set req data\n");
+ pr = alloc_peer_req(peer);
+ r = xseg_set_req_data(xseg, req, pr);
+
+
+ XSEGLOG("Submit request %lu\n", prefs->sub_tm->completed);
//Submit the request from the source port to the target port
timer_start(prefs->sub_tm);
p = xseg_submit(xseg, req, srcport, X_ALLOC);
return -1;
}
timer_stop(prefs->sub_tm);
+
timer_start(prefs->rec_tm);
//Send SIGIO to the process that has binded this port to inform that
//IO is possible
XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
xseg_init_local_signal(xseg, peer->portno_start);
uint64_t loops;
+ unsigned long max_completed = prefs->ts / prefs->bs;
timer_start(prefs->total_tm);
- while (!isTerminate()
- && xq_count(&peer->free_reqs) == peer->nr_ops
- && prefs->rec_tm->completed != prefs->ts / prefs->bs ) {
+ while (!isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops) {
#ifdef MT
if (t->func) {
XSEGLOG2(&lc, D, "%s executes function\n", id);
continue;
}
#endif
+send_request:
while (prefs->sub_tm->completed - prefs->rec_tm->completed <
prefs->iodepth){
XSEGLOG2(&lc, I, "Start sending new request\n");
//Heart of peerd_loop. This loop is common for everyone.
for (loops = threshold; loops > 0; loops--) {
- if (loops == 1)
- xseg_prepare_wait(xseg, peer->portno_start);
- if (check_ports(peer))
- loops = threshold;
+ if (check_ports(peer)) {
+ if (max_completed == prefs->rec_tm->completed)
+ return 0;
+ else
+ //If an old request has just been acked, the most sensible
+ //thing to do is immediately to send a new one
+ goto send_request;
+ }
}
+ xseg_prepare_wait(xseg, peer->portno_start);
#ifdef ST_THREADS
if (ta){
st_sleep(0);
xseg_cancel_wait(xseg, peer->portno_start);
XSEGLOG2(&lc, I, "%s woke up\n", id);
}
+
+ XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
+ xq_count(&peer->free_reqs), peer->nr_ops);
return 0;
}
}
-void handle_received(struct peerd *peer, struct peer_req *pr)
+static void handle_received(struct peerd *peer, struct peer_req *pr)
{
//FIXME: handle null pointer
struct bench *prefs = peer->priv;
//This is wrong, benchmarking peer should not accept requests,
//only receive them.
complete(peer, pr);
+ break;
case dispatch_receive:
handle_received(peer, pr);
+ break;
default:
fail(peer, pr);
}