5 #include <xseg/protocol.h>
8 #include <sys/syscall.h>
/* Per-request I/O context fields (enclosing struct begins outside this view).
 * NOTE(review): `state` is volatile and nominally guarded by a lock that is
 * commented out in the accessors below — volatile is not a synchronization
 * primitive; confirm all accesses happen under vio->lock. */
25 volatile enum io_state_enum state;
/* Outstanding map request sent to the mapper port, if any. */
26 struct xseg_request *mreq;
/* Array of in-flight block requests (one per map segment). */
27 struct xseg_request **breqs;
/* breq_len: number of entries in breqs; breq_cnt: remaining un-completed
 * block requests (decremented atomically in handle_serving). */
28 unsigned long breq_len, breq_cnt;
/* Store a new I/O state into vio.
 * NOTE(review): the lock acquire/release around the store has been commented
 * out — presumably callers already hold vio->lock (dispatch() does); verify
 * no unlocked caller exists before relying on this. */
31 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
33 // xlock_acquire(&vio->lock, 1);
35 // xlock_release(&vio->lock);
/* Read the current I/O state of vio.
 * NOTE(review): locking is commented out, mirroring __set_vio_state —
 * same caveat: assumes the caller serializes access via vio->lock. */
38 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
40 enum io_state_enum state;
41 // xlock_acquire(&vio->lock, 1);
43 // xlock_release(&vio->lock);
/* Retrieve the vlmc_io context stashed in a peer request's priv pointer
 * (set up in custom_peer_init). */
47 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
49 return (struct vlmc_io *) pr->priv;
/* Retrieve the daemon-wide vlmcd context stashed in peer->priv
 * (set up in custom_peer_init). */
52 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
54 return (struct vlmcd *) peer->priv;
/* First stage of the vlmc state machine: a freshly accepted client request.
 * Flush requests are completed immediately; everything else is translated
 * into a map request (X_MAPR/X_MAPW/X_INFO/X_CLOSE) and submitted to the
 * mapper port, moving the vio to MAPPING state. */
57 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
58 struct xseg_request *req)
60 struct vlmc_io *vio = __get_vlmcio(pr);
61 struct vlmcd *vlmc = __get_vlmcd(peer);
64 char *target, *mtarget;
/* Zero-sized write with FLUSH/FUA flags: nothing to map, complete here. */
67 if (pr->req->op == X_WRITE && !req->size && (pr->req->flags & (XF_FLUSH|XF_FUA))){
68 //handle flush requests here, so we don't mess with mapper
69 //because of the -1 offset
70 fprintf(stderr, "completing flush request\n");
71 pr->req->serviced = pr->req->size;
72 __set_vio_state(vio, CONCLUDED);
76 vio->err = 0; //reset error state
/* Allocate a request destined for the mapper port. */
77 vio->mreq = xseg_get_request(peer->xseg, peer->portno,
78 vlmc->mportno, X_ALLOC);
82 /* use datalen 0. let mapper allocate buffer space as needed */
83 r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
87 target = xseg_get_target(peer->xseg, pr->req);
90 mtarget = xseg_get_target(peer->xseg, vio->mreq);
/* NOTE(review): strncpy with exactly targetlen bytes does not
 * NUL-terminate; presumably xseg targets are length-delimited, not
 * C strings — confirm against the xseg protocol. */
94 strncpy(mtarget, target, pr->req->targetlen);
95 vio->mreq->size = pr->req->size;
96 vio->mreq->offset = pr->req->offset;
/* Map the client op onto the corresponding mapper op. */
98 switch (pr->req->op) {
99 case X_READ: vio->mreq->op = X_MAPR; break;
100 case X_WRITE: vio->mreq->op = X_MAPW; break;
101 case X_INFO: vio->mreq->op = X_INFO; break;
102 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
103 default: goto out_put;
/* Link the map request back to this peer request for the reply path. */
105 xseg_set_req_data(peer->xseg, vio->mreq, pr);
106 __set_vio_state(vio, MAPPING);
107 p = xseg_submit(peer->xseg, vio->mreq, peer->portno, X_ALLOC);
108 r = xseg_signal(peer->xseg, p);
110 r = xseg_signal(peer->xseg, p);
/* Second stage: the mapper has replied. On success for read/write, fan the
 * client request out into one block request per returned map segment and
 * submit them to the blocker port (SERVING state). X_INFO / X_CLOSE replies
 * are propagated back to the client and concluded directly. Failure paths
 * release the map request and conclude with vio->err set. */
128 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
129 struct xseg_request *req)
131 struct vlmc_io *vio = __get_vlmcio(pr);
132 struct vlmcd *vlmc = __get_vlmcd(peer);
133 uint64_t pos, datalen, offset;
135 struct xseg_request *breq;
140 //assert vio->mreq == req
141 if (vio->mreq != req){
/* NOTE(review): %lx with pointer arguments is a format mismatch;
 * %p (with casts to void *) is the portable spelling. */
142 printf("vio->mreq %lx, req: %lx state: %d breq[0]: %lx\n", vio->mreq, req, vio->state, vio->breqs[0]);
/* Deliberate null dereference: crash hard on state-machine corruption.
 * NOTE(review): prefer abort()/assert() — this is undefined behavior the
 * compiler may elide. */
143 r = *(volatile int *)0;
146 /* FIXME shouldn't XS_FAILED be sufficient ?? */
147 if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
148 fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
149 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
151 __set_vio_state(vio, CONCLUDED);
/* X_INFO reply: copy the reported size into the client data buffer. */
153 } else if (vio->mreq->op == X_INFO) {
154 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
155 char *data = xseg_get_data(peer->xseg, pr->req);
156 *(off_t *)data = xinfo->size;
157 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
159 __set_vio_state(vio, CONCLUDED);
/* X_CLOSE reply: nothing to copy back; just release and conclude. */
161 } else if (vio->mreq->op == X_CLOSE) {
162 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
163 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
165 __set_vio_state(vio, CONCLUDED);
/* Read/write reply: the mapper returned a list of physical segments. */
168 struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
171 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
173 __set_vio_state(vio, CONCLUDED);
/* One block request per map segment. */
177 vio->breq_len = mreply->cnt;
178 vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
181 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
183 __set_vio_state(vio, CONCLUDED);
/* Move to SERVING before submitting, so replies arriving on another
 * thread see the correct state. */
188 __set_vio_state(vio, SERVING);
189 for (i = 0; i < vio->breq_len; i++) {
190 datalen = mreply->segs[i].size;
191 offset = mreply->segs[i].offset;
192 targetlen = mreply->segs[i].targetlen;
193 breq = xseg_get_request(peer->xseg, peer->portno, vlmc->bportno, X_ALLOC);
198 r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
201 xseg_put_request(peer->xseg, breq, peer->portno);
204 breq->offset = offset;
205 breq->size = datalen;
206 breq->op = pr->req->op;
207 target = xseg_get_target(peer->xseg, breq);
210 xseg_put_request(peer->xseg, breq, peer->portno);
/* NOTE(review): strncpy here also leaves target unterminated when the
 * name fills targetlen — see note in handle_accepted. */
213 strncpy(target, mreply->segs[i].target, targetlen);
214 r = xseg_set_req_data(peer->xseg, breq, pr);
217 xseg_put_request(peer->xseg, breq, peer->portno);
/* Point the block request's data at the right slice of the client
 * buffer, so the blocker reads/writes in place. */
221 // this should work, right ?
222 breq->data = pr->req->data + pos;
224 p = xseg_submit(peer->xseg, breq, peer->portno, X_ALLOC);
/* Submission failed: unlink and release the block request. */
228 xseg_get_req_data(peer->xseg, breq, &dummy);
229 xseg_put_request(peer->xseg, breq, peer->portno);
/* Signal failure is non-fatal once the request is queued. */
232 r = xseg_signal(peer->xseg, p);
234 //XSEGLOG("couldn't signal port %u", p);
236 vio->breqs[i] = breq;
/* The map request is no longer needed once all breqs are in flight. */
239 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
243 __set_vio_state(vio, CONCLUDED);
/* Third stage: one block request has completed. Accumulate its serviced
 * byte count into the client request and, when the last outstanding block
 * request finishes (breq_cnt reaches zero), conclude the vio. The atomic
 * add/sub builtins make this safe when replies race in from the blocker. */
258 static int handle_serving(struct peerd *peer, struct peer_req *pr,
259 struct xseg_request *req)
261 struct vlmc_io *vio = __get_vlmcio(pr);
262 struct vlmcd *vlmc = __get_vlmcd(peer);
263 struct xseg_request *breq = req;
265 if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
/* NOTE(review): format string has no conversion for the third
 * argument (req->offset) — the offset is silently dropped; a
 * "%llu" / PRIu64 specifier is missing before the newline. */
266 fprintf(stderr, "req %lx (op: %d) failed at offset \n", req, req->op, req->offset);
269 //assert breq->serviced == breq->size
270 __sync_fetch_and_add(&pr->req->serviced, breq->serviced);
272 xseg_put_request(peer->xseg, breq, peer->portno);
/* Last block reply concludes the whole client request. */
274 if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
275 __set_vio_state(vio, CONCLUDED);
/* Peer entry point: route an incoming xseg request to the handler matching
 * the vio's current state (ACCEPTED -> MAPPING -> SERVING -> CONCLUDED).
 * The whole dispatch runs under vio->lock, which is why the state
 * accessors above can skip locking internally. */
288 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
290 struct vlmc_io *vio = __get_vlmcio(pr);
291 struct vlmcd *vlmc = __get_vlmcd(peer);
293 xlock_acquire(&vio->lock,1);
/* A brand-new request starts in ACCEPTED state. */
295 __set_vio_state(vio, ACCEPTED);
297 enum io_state_enum state = __get_vio_state(vio);
300 handle_accepted(peer, pr, req);
303 handle_mapping(peer, pr, req);
306 handle_serving(peer, pr, req);
/* CONCLUDED vios should never be dispatched again. */
309 fprintf(stderr, "invalid state. dispatch called for concluded\n");
312 fprintf(stderr, "wtf dude? invalid state\n");
315 xlock_release(&vio->lock);
/* Daemon init: allocate the vlmcd context, attach one vlmc_io to each
 * peer_req slot, parse -mp (mapper port) / -bp (blocker port) arguments,
 * and request SCHED_FIFO real-time scheduling for this thread. */
320 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
323 struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
330 peer->priv = (void *) vlmc;
332 for (i = 0; i < peer->nr_ops; i++) {
333 vio = malloc(sizeof(struct vlmc_io));
341 xlock_release(&vio->lock);
342 peer->peer_reqs[i].priv = (void *) vio;
/* Allocation-failure cleanup: free the vios created so far.
 * NOTE(review): the body frees peer_reqs[i].priv on every iteration of
 * the j loop — almost certainly should be peer_reqs[j].priv; as written
 * it double-frees slot i and leaks slots 0..i-1. */
344 if (i < peer->nr_ops) {
345 for (j = 0; j < i; j++) {
346 free(peer->peer_reqs[i].priv);
/* NOTE(review): atoi gives no error reporting; strtol with endptr/errno
 * checking would reject malformed port numbers. */
351 for (i = 0; i < argc; i++) {
352 if (!strcmp(argv[i], "-mp") && (i+1) < argc){
353 vlmc->mportno = atoi(argv[i+1]);
357 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
358 vlmc->bportno = atoi(argv[i+1]);
/* Elevate to real-time FIFO scheduling (Linux-specific: gettid via
 * syscall). NOTE(review): "¶m" below is mis-encoded text — it is the
 * HTML entity for "&param"; restore it to `&param` when editing the real
 * file. Also check sched_setscheduler's return value. */
364 const struct sched_param param = { .sched_priority = 99 };
365 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m);