5 #include <xseg/protocol.h>
8 #include <sys/syscall.h>
/* struct vlmc_io members (struct header is outside this chunk).
 * Per-request I/O context tracked across the ACCEPTED -> MAPPING -> SERVING
 * -> CONCLUDED state machine driven by dispatch(). */
25 volatile enum io_state_enum state; /* current state-machine state; NOTE(review): volatile is not thread-sync — verify vio->lock covers all accesses */
26 struct xseg_request *mreq; /* in-flight mapper request (X_MAPR/X_MAPW/X_INFO/X_CLOSE), NULL-ownership rules not visible here */
27 struct xseg_request **breqs; /* calloc'd array of in-flight blocker requests, one per map segment */
28 unsigned long breq_len, breq_cnt; /* breq_len: segments returned by mapper; breq_cnt: blocker requests still pending (decremented atomically in handle_serving) */
/* Print this peer's extra command-line options to stderr.
 * Called by the generic peer framework's usage path. */
31 void custom_peer_usage()
33 	fprintf(stderr, "Custom peer options: \n"
35 		"-bp : blocker port for blocks\n"
/* Set vio->state. The per-vio lock acquire/release is commented out —
 * presumably because dispatch() already holds vio->lock around every call
 * into the state handlers (see dispatch); TODO confirm no other caller
 * reaches this without the lock. */
39 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
41 //	xlock_acquire(&vio->lock, 1);
43 //	xlock_release(&vio->lock);
/* Read and return vio->state. Locking commented out for the same reason
 * as __set_vio_state: dispatch() holds vio->lock — TODO confirm. */
46 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
48 	enum io_state_enum state;
49 //	xlock_acquire(&vio->lock, 1);
51 //	xlock_release(&vio->lock);
/* Recover the per-request vlmc_io context stashed in pr->priv
 * (set up in custom_peer_init). */
55 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
57 	return (struct vlmc_io *) pr->priv;
/* Recover the daemon-wide vlmcd context stashed in peer->priv
 * (set up in custom_peer_init). */
60 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
62 	return (struct vlmcd *) peer->priv;
/*
 * handle_accepted: first stage of the state machine, entered when a client
 * request is accepted. Completes flush requests immediately; otherwise
 * allocates a mapper request mirroring the client request (target, size,
 * offset), translates the client op to the corresponding mapper op,
 * submits it to the mapper port and moves the vio to MAPPING.
 * On any failure (cleanup lines partially outside this view) the mapper
 * request is released and the vio is CONCLUDED.
 */
65 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
66 		struct xseg_request *req)
68 	struct vlmc_io *vio = __get_vlmcio(pr);
69 	struct vlmcd *vlmc = __get_vlmcd(peer);
72 	char *target, *mtarget;
	/* Zero-size flushes are completed here so the -1 offset used for
	 * flushes never reaches the mapper. */
75 	if (pr->req->op == X_WRITE && !req->size && (pr->req->flags & (XF_FLUSH|XF_FUA))){
76 		//hanlde flush requests here, so we don't mess with mapper
77 		//because of the -1 offset
78 		fprintf(stderr, "completing flush request\n");
79 		pr->req->serviced = pr->req->size;
80 		__set_vio_state(vio, CONCLUDED);
84 	vio->err = 0; //reset error state
	/* Allocate the mapper request on this peer's port, destined for the
	 * mapper port configured via -mp. */
85 	vio->mreq = xseg_get_request(peer->xseg, pr->portno,
86 					vlmc->mportno, X_ALLOC);
90 	/* use dalalen 0. let mapper allocate buffer space as needed */
91 	r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
95 	target = xseg_get_target(peer->xseg, pr->req);
98 	mtarget = xseg_get_target(peer->xseg, vio->mreq);
	/* NOTE(review): strncpy does not NUL-terminate; OK only if xseg
	 * targets are length-prefixed (targetlen) rather than C strings —
	 * confirm against the xseg protocol. */
102 	strncpy(mtarget, target, pr->req->targetlen);
103 	vio->mreq->size = pr->req->size;
104 	vio->mreq->offset = pr->req->offset;
105 	vio->mreq->flags = 0;
	/* Translate client op -> mapper op; unknown ops bail to cleanup. */
106 	switch (pr->req->op) {
107 		case X_READ: vio->mreq->op = X_MAPR; break;
108 		case X_WRITE: vio->mreq->op = X_MAPW; break;
109 		case X_INFO: vio->mreq->op = X_INFO; break;
110 		case X_CLOSE: vio->mreq->op = X_CLOSE; break;
111 		default: goto out_put;
	/* Attach pr so the mapper's reply can be routed back to this
	 * request in handle_mapping. */
113 	xseg_set_req_data(peer->xseg, vio->mreq, pr);
114 	__set_vio_state(vio, MAPPING);
115 	p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
118 	r = xseg_signal(peer->xseg, p);
120 	/* since submission is successful, just print a warning message */
121 	fprintf(stderr, "couldnt signal port %u", p);
	/* Error cleanup: detach our data pointer and release the mapper
	 * request, then conclude the vio with the error flag set. */
127 	xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
129 	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
131 	__set_vio_state(vio, CONCLUDED);
/*
 * handle_mapping: second stage, entered when the mapper replies to
 * vio->mreq. Handles mapper failure, X_INFO (copy size back to the
 * client buffer) and X_CLOSE directly; for read/write maps it allocates
 * one blocker request per returned segment, points each at the proper
 * slice of the client's data buffer, submits them all to the blocker
 * port and moves the vio to SERVING.
 */
136 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
137 		struct xseg_request *req)
139 	struct vlmc_io *vio = __get_vlmcio(pr);
140 	struct vlmcd *vlmc = __get_vlmcd(peer);
141 	uint64_t pos, datalen, offset;
143 	struct xseg_request *breq;
148 	//assert vio>mreq == req
	/* Invariant check: the replied request must be our mapper request.
	 * NOTE(review): %lx with pointer args is a format mismatch (UB);
	 * use %p. The null dereference below is a deliberate crash-on-
	 * violation ("poor man's assert"), not a recoverable path. */
149 	if (vio->mreq != req){
150 		printf("vio->mreq %lx, req: %lx state: %d breq[0]: %lx\n", vio->mreq, req, vio->state, vio->breqs[0]);
151 		r = *(volatile int *)0;
154 	/* FIXME shouldn's XS_FAILED be sufficient ?? */
155 	if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
156 		fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
157 		xseg_put_request(peer->xseg, vio->mreq, pr->portno);
159 		__set_vio_state(vio, CONCLUDED);
	/* X_INFO reply: the mapper's data buffer starts with an
	 * xseg_reply_info; forward its size to the client's data buffer. */
161 	} else if (vio->mreq->op == X_INFO) {
162 		struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
163 		char *data = xseg_get_data(peer->xseg, pr->req);
164 		*(uint64_t *)data = xinfo->size;
165 		xseg_put_request(peer->xseg, vio->mreq, pr->portno);
167 		__set_vio_state(vio, CONCLUDED);
169 	} else if (vio->mreq->op == X_CLOSE) {
170 		xseg_put_request(peer->xseg, vio->mreq, pr->portno);
172 		__set_vio_state(vio, CONCLUDED);
	/* Read/write map reply: a list of (target, offset, size) segments. */
175 	struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
178 	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
180 	__set_vio_state(vio, CONCLUDED);
184 	vio->breq_len = mreply->cnt;
	/* One slot per segment; allocation-failure path (lines not all
	 * visible here) releases mreq and concludes. */
185 	vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
188 	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
190 	__set_vio_state(vio, CONCLUDED);
195 	__set_vio_state(vio, SERVING);
196 	for (i = 0; i < vio->breq_len; i++) {
197 		datalen = mreply->segs[i].size;
198 		offset = mreply->segs[i].offset;
199 		targetlen = mreply->segs[i].targetlen;
200 		breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
205 		r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
208 			xseg_put_request(peer->xseg, breq, pr->portno);
211 		breq->offset = offset;
212 		breq->size = datalen;
213 		breq->op = pr->req->op;
214 		target = xseg_get_target(peer->xseg, breq);
217 			xseg_put_request(peer->xseg, breq, pr->portno);
		/* NOTE(review): strncpy without NUL-termination — see the
		 * same pattern in handle_accepted; relies on length-prefixed
		 * xseg targets. */
220 		strncpy(target, mreply->segs[i].target, targetlen);
221 		r = xseg_set_req_data(peer->xseg, breq, pr);
224 			xseg_put_request(peer->xseg, breq, pr->portno);
		/* Share the client's data buffer: each blocker request reads/
		 * writes its slice in place at offset pos. Presumably pos is
		 * advanced by datalen each iteration (update not visible in
		 * this chunk) — TODO confirm. */
228 		// this should work, right ?
229 		breq->data = pr->req->data + pos;
231 		p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
		/* Submission failure: detach and release this breq; remaining
		 * cleanup lines fall outside this view. */
235 			xseg_get_req_data(peer->xseg, breq, &dummy);
236 			xseg_put_request(peer->xseg, breq, pr->portno);
239 		r = xseg_signal(peer->xseg, p);
241 			//XSEGLOG("couldn't signal port %u", p);
243 		vio->breqs[i] = breq;
	/* Mapper request no longer needed once blocker requests are out. */
246 	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
250 	__set_vio_state(vio, CONCLUDED);
/*
 * handle_serving: third stage, entered once per completed blocker
 * request. Accumulates serviced bytes into the client request and, when
 * the last outstanding blocker request finishes (breq_cnt reaches 0),
 * concludes the vio.
 */
265 static int handle_serving(struct peerd *peer, struct peer_req *pr,
266 		struct xseg_request *req)
268 	struct vlmc_io *vio = __get_vlmcio(pr);
269 	struct vlmcd *vlmc = __get_vlmcd(peer);
270 	struct xseg_request *breq = req;
	/* NOTE(review): this fprintf passes three variadic args (req,
	 * req->op, req->offset) but the format has only two conversions —
	 * "failed at offset \n" is missing a specifier (and %lx for a
	 * pointer should be %p). Undefined-behavior-adjacent; fix the
	 * format string. */
272 	if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
273 		fprintf(stderr, "req %lx (op: %d) failed at offset \n", req, req->op, req->offset);
276 	//assert breq->serviced == breq->size
	/* Atomic add: multiple blocker completions may race on the shared
	 * client request. */
277 	__sync_fetch_and_add(&pr->req->serviced, breq->serviced);
279 	xseg_put_request(peer->xseg, breq, pr->portno);
	/* Last completion concludes the whole client request. */
281 	if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
282 		__set_vio_state(vio, CONCLUDED);
/*
 * dispatch: framework entry point for this peer. Serializes all state
 * transitions for a vio under vio->lock (which is why the __set/__get
 * helpers above skip locking), resets newly accepted requests to
 * ACCEPTED, then routes to the handler matching the current state.
 * CONCLUDED should never be dispatched; anything else is a bug.
 */
295 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
296 		enum dispatch_reason reason)
298 	struct vlmc_io *vio = __get_vlmcio(pr);
299 	struct vlmcd *vlmc = __get_vlmcd(peer);
301 	xlock_acquire(&vio->lock,1);
303 		__set_vio_state(vio, ACCEPTED);
305 	enum io_state_enum state = __get_vio_state(vio);
	/* switch/case labels fall outside this view; handlers by state: */
308 			handle_accepted(peer, pr, req);
311 			handle_mapping(peer, pr, req);
314 			handle_serving(peer, pr, req);
317 			fprintf(stderr, "invalid state. dispatch called for concluded\n");
320 			fprintf(stderr, "wtf dude? invalid state\n");
323 	xlock_release(&vio->lock);
/*
 * custom_peer_init: allocate the daemon-wide vlmcd context and one
 * vlmc_io per peer request slot, parse -mp (mapper port) / -bp (blocker
 * port), and raise the thread to SCHED_FIFO real-time priority.
 */
328 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
331 	struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
338 	peer->priv = (void *) vlmc;
340 	for (i = 0; i < peer->nr_ops; i++) {
341 		vio = malloc(sizeof(struct vlmc_io));
349 		xlock_release(&vio->lock);
350 		peer->peer_reqs[i].priv = (void *) vio;
	/* Partial-allocation cleanup (reached when the loop above bails
	 * early — break/goto lines not visible here).
	 * NOTE(review): this frees peer_reqs[i].priv on every iteration;
	 * it almost certainly should free peer_reqs[j].priv — as written
	 * it double-frees one slot and leaks the rest. */
352 	if (i < peer->nr_ops) {
353 		for (j = 0; j < i; j++) {
354 			free(peer->peer_reqs[i].priv);
	/* NOTE(review): atoi silently yields 0 on bad input; prefer strtol
	 * with error checking for port numbers. */
359 	for (i = 0; i < argc; i++) {
360 		if (!strcmp(argv[i], "-mp") && (i+1) < argc){
361 			vlmc->mportno = atoi(argv[i+1]);
365 		if (!strcmp(argv[i], "-bp") && (i+1) < argc){
366 			vlmc->bportno = atoi(argv[i+1]);
	/* NOTE(review): "¶m" looks like a mis-encoded "&param" — must be
	 * repaired for this to compile. Also, sched_setscheduler takes a
	 * pid_t; passing gettid() affects only this thread on Linux —
	 * confirm that is the intent, and check the return value (SCHED_FIFO
	 * at priority 99 requires CAP_SYS_NICE and can starve the system). */
372 	const struct sched_param param = { .sched_priority = 99 };
373 	sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m);