5 #include <xseg/protocol.h>
8 #include <sys/syscall.h>
/* NOTE(review): fragment of struct vlmc_io — the struct header/footer lie
 * outside this chunk. */
/* Current position of this I/O in its state machine; handlers below move it
 * through ACCEPTED -> MAPPING -> SERVING -> CONCLUDED. */
25 volatile enum io_state_enum state;
/* In-flight request to the mapper peer for this I/O. */
26 struct xseg_request *mreq;
/* Block requests spawned from the mapper reply's segments (one per segment). */
27 struct xseg_request **breqs;
/* breq_len: number of entries in breqs.
 * breq_cnt: outstanding block requests — decremented atomically in
 * handle_serving(); the I/O concludes when it reaches zero. */
28 unsigned long breq_len, breq_cnt;
31 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
/* Move vio to a new state-machine position.  The per-vio locking here is
 * deliberately commented out — dispatch() already holds vio->lock around
 * every call.  (The assignment itself is on a line outside this view.) */
33 // xlock_acquire(&vio->lock, 1);
35 // xlock_release(&vio->lock);
38 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
/* Read vio's current state.  Locking is commented out for the same reason as
 * in __set_vio_state: callers hold vio->lock.  (The actual read and return
 * lines are outside this view.) */
40 enum io_state_enum state;
41 // xlock_acquire(&vio->lock, 1);
43 // xlock_release(&vio->lock);
47 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
/* Per-request private state: the vlmc_io stashed in pr->priv by
 * custom_peer_init(). */
49 return (struct vlmc_io *) pr->priv;
52 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
/* Per-peer private state: the vlmcd stashed in peer->priv by
 * custom_peer_init(). */
54 return (struct vlmcd *) peer->priv;
57 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
58 struct xseg_request *req)
/*
 * Turn a freshly accepted client request into a request to the mapper peer:
 * copy target/size/offset, translate the op (X_READ -> X_MAPR,
 * X_WRITE -> X_MAPW; X_INFO and X_CLOSE pass through unchanged), submit to
 * vlmc->mportno and signal that port.  Flush/FUA writes short-circuit and
 * conclude immediately.  (Several error-check and return lines of this
 * function are outside this view.)
 */
60 struct vlmc_io *vio = __get_vlmcio(pr);
61 struct vlmcd *vlmc = __get_vlmcd(peer);
64 char *target, *mtarget;
/* Flush/FUA writes carry no data (size == 0): complete them here so the
 * mapper never has to cope with the -1 offset such requests use. */
67 if (pr->req->op == X_WRITE && !req->size && (pr->req->flags & (XF_FLUSH|XF_FUA))){
68 //handle flush requests here, so we don't mess with mapper
69 //because of the -1 offset
70 fprintf(stderr, "completing flush request\n");
71 pr->req->serviced = pr->req->size;
72 __set_vio_state(vio, CONCLUDED);
76 vio->err = 0; //reset error state
/* Allocate the request that will travel to the mapper port. */
77 vio->mreq = xseg_get_request(peer->xseg, pr->portno,
78 vlmc->mportno, X_ALLOC);
82 /* use datalen 0. let mapper allocate buffer space as needed */
83 r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
87 target = xseg_get_target(peer->xseg, pr->req);
90 mtarget = xseg_get_target(peer->xseg, vio->mreq);
/* xseg targets are length-delimited by targetlen, so a strncpy with no
 * explicit NUL terminator appears intentional here — presumably; confirm
 * against the xseg target conventions. */
94 strncpy(mtarget, target, pr->req->targetlen);
95 vio->mreq->size = pr->req->size;
96 vio->mreq->offset = pr->req->offset;
/* Translate the client op into the mapper-protocol op. */
98 switch (pr->req->op) {
99 case X_READ: vio->mreq->op = X_MAPR; break;
100 case X_WRITE: vio->mreq->op = X_MAPW; break;
101 case X_INFO: vio->mreq->op = X_INFO; break;
102 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
103 default: goto out_put;
/* Stash pr so the mapper's reply can be routed back to this request. */
105 xseg_set_req_data(peer->xseg, vio->mreq, pr);
106 __set_vio_state(vio, MAPPING);
107 p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
110 r = xseg_signal(peer->xseg, p);
112 /* since submission is successful, just print a warning message */
/* NOTE(review): runtime string says "couldnt" (typo) and lacks a trailing
 * newline — left untouched here since it is program output, not a comment. */
113 fprintf(stderr, "couldnt signal port %u", p);
/* Error path: detach our private data, give the mapper request back, and
 * conclude the I/O. */
119 xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
121 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
123 __set_vio_state(vio, CONCLUDED);
128 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
129 struct xseg_request *req)
/*
 * Handle the mapper's reply.  A failed mapper request concludes the I/O with
 * an error; X_INFO copies the reported size back into the client's data
 * buffer; X_CLOSE simply concludes; otherwise the map reply is expanded into
 * one block request (breq) per segment, each submitted to vlmc->bportno.
 * (Several error-check and return lines of this function are outside this
 * view.)
 */
131 struct vlmc_io *vio = __get_vlmcio(pr);
132 struct vlmcd *vlmc = __get_vlmcd(peer);
133 uint64_t pos, datalen, offset;
135 struct xseg_request *breq;
140 //assert vio->mreq == req
141 if (vio->mreq != req){
/* NOTE(review): %lx with pointer arguments is non-portable; %p would be the
 * correct conversion for vio->mreq, req and vio->breqs[0]. */
142 printf("vio->mreq %lx, req: %lx state: %d breq[0]: %lx\n", vio->mreq, req, vio->state, vio->breqs[0]);
/* Deliberate NULL dereference: crash on the spot when internal state is
 * inconsistent (poor man's assert). */
143 r = *(volatile int *)0;
146 /* FIXME shouldn't XS_FAILED be sufficient ?? */
147 if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
148 fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
149 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
151 __set_vio_state(vio, CONCLUDED);
/* X_INFO reply: forward the size straight into the client's data buffer. */
153 } else if (vio->mreq->op == X_INFO) {
154 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
155 char *data = xseg_get_data(peer->xseg, pr->req);
156 *(uint64_t *)data = xinfo->size;
157 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
159 __set_vio_state(vio, CONCLUDED);
161 } else if (vio->mreq->op == X_CLOSE) {
162 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
164 __set_vio_state(vio, CONCLUDED);
/* Mapped read/write: fan out one block request per reply segment. */
167 struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
170 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
172 __set_vio_state(vio, CONCLUDED);
176 vio->breq_len = mreply->cnt;
177 vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
/* calloc failure: give the mapper request back and fail the I/O. */
180 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
182 __set_vio_state(vio, CONCLUDED);
187 __set_vio_state(vio, SERVING);
188 for (i = 0; i < vio->breq_len; i++) {
189 datalen = mreply->segs[i].size;
190 offset = mreply->segs[i].offset;
191 targetlen = mreply->segs[i].targetlen;
192 breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
197 r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
200 xseg_put_request(peer->xseg, breq, pr->portno);
203 breq->offset = offset;
204 breq->size = datalen;
/* Block requests keep the client's original op (X_READ/X_WRITE). */
205 breq->op = pr->req->op;
206 target = xseg_get_target(peer->xseg, breq);
209 xseg_put_request(peer->xseg, breq, pr->portno);
/* Length-delimited target copy, as in handle_accepted. */
212 strncpy(target, mreply->segs[i].target, targetlen);
/* Route the blocker's reply back to this peer request. */
213 r = xseg_set_req_data(peer->xseg, breq, pr);
216 xseg_put_request(peer->xseg, breq, pr->portno);
/* Point the block request at the client buffer at running offset pos
 * instead of copying — flagged by the original author as unverified. */
220 // this should work, right ?
221 breq->data = pr->req->data + pos;
223 p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
/* Submission failed: detach our data and return the block request. */
227 xseg_get_req_data(peer->xseg, breq, &dummy);
228 xseg_put_request(peer->xseg, breq, pr->portno);
231 r = xseg_signal(peer->xseg, p);
233 //XSEGLOG("couldn't signal port %u", p);
235 vio->breqs[i] = breq;
/* All block requests are in flight; the mapper request is no longer needed. */
238 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
242 __set_vio_state(vio, CONCLUDED);
257 static int handle_serving(struct peerd *peer, struct peer_req *pr,
258 struct xseg_request *req)
/*
 * Completion of one block request (breq).  Accumulates serviced bytes into
 * the client request atomically and, when the last outstanding breq for this
 * I/O finishes (breq_cnt hits zero), concludes the I/O.  (Some lines of this
 * function are outside this view.)
 */
260 struct vlmc_io *vio = __get_vlmcio(pr);
261 struct vlmcd *vlmc = __get_vlmcd(peer);
262 struct xseg_request *breq = req;
264 if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
/* NOTE(review): format string has two conversions but three variadic
 * arguments — req->offset is never printed (a %llu or similar is missing
 * after "offset "); %lx on a pointer should be %p. */
265 fprintf(stderr, "req %lx (op: %d) failed at offset \n", req, req->op, req->offset);
268 //assert breq->serviced == breq->size
/* Atomic add: several breqs may complete concurrently for the same pr. */
269 __sync_fetch_and_add(&pr->req->serviced, breq->serviced);
271 xseg_put_request(peer->xseg, breq, pr->portno);
/* Last outstanding block request -> the whole client I/O is done. */
273 if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
274 __set_vio_state(vio, CONCLUDED);
287 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
288 enum dispatch_reason reason)
/*
 * Entry point invoked by the peer framework for every accept/reply event.
 * Serializes all work on a vlmc_io under vio->lock, moves freshly accepted
 * requests to ACCEPTED, then routes to the handler matching the current
 * state machine position.  (The switch scaffolding around the handler calls
 * is partly outside this view.)
 */
290 struct vlmc_io *vio = __get_vlmcio(pr);
291 struct vlmcd *vlmc = __get_vlmcd(peer);
/* One lock per vlmc_io: the state helpers below rely on this being held. */
293 xlock_acquire(&vio->lock,1);
295 __set_vio_state(vio, ACCEPTED);
297 enum io_state_enum state = __get_vio_state(vio);
300 handle_accepted(peer, pr, req);
303 handle_mapping(peer, pr, req);
306 handle_serving(peer, pr, req);
/* A reply arriving for an already concluded I/O indicates a protocol bug. */
309 fprintf(stderr, "invalid state. dispatch called for concluded\n");
312 fprintf(stderr, "wtf dude? invalid state\n");
315 xlock_release(&vio->lock);
320 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
/*
 * Per-peer setup: allocate the vlmcd private state, one vlmc_io per peer
 * request slot, parse -mp (mapper port) / -bp (blocker port) arguments, and
 * raise the calling thread to SCHED_FIFO priority 99.  (Allocation-failure
 * checks and the return lines are outside this view.)
 */
323 struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
330 peer->priv = (void *) vlmc;
332 for (i = 0; i < peer->nr_ops; i++) {
333 vio = malloc(sizeof(struct vlmc_io));
/* Leave each per-request lock in the released state before first use. */
341 xlock_release(&vio->lock);
342 peer->peer_reqs[i].priv = (void *) vio;
/* Cleanup after an early break from the allocation loop — presumably on
 * malloc failure; the break itself is outside this view. */
344 if (i < peer->nr_ops) {
345 for (j = 0; j < i; j++) {
/* NOTE(review): frees peer_reqs[i].priv on every iteration of the j loop;
 * this almost certainly should be free(peer->peer_reqs[j].priv). */
346 free(peer->peer_reqs[i].priv);
/* NOTE(review): atoi() silently maps malformed input to 0; strtol with
 * range/end-pointer validation would be safer for port numbers. */
351 for (i = 0; i < argc; i++) {
352 if (!strcmp(argv[i], "-mp") && (i+1) < argc){
353 vlmc->mportno = atoi(argv[i+1]);
357 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
358 vlmc->bportno = atoi(argv[i+1]);
/* Real-time scheduling for the peer thread. */
364 const struct sched_param param = { .sched_priority = 99 };
/* NOTE(review): "¶m" is a mis-encoded "&param" (an HTML &para; entity leaked
 * into the source) — this line cannot compile as written and must be
 * restored to &param.  The sched_setscheduler return value is also
 * unchecked. */
365 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m);