4c322097dd24e0bf41c0962fac8b2ac35c0c7587
[archipelago] / xseg / peers / user / mt-vlmcd.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <xseg/xseg.h>
5 #include <xseg/protocol.h>
6 #include <mpeer.h>
7 #include <sched.h>
8 #include <sys/syscall.h>
9
10 enum io_state_enum {
11         ACCEPTED = 0,
12         MAPPING = 1,
13         SERVING = 2,
14         CONCLUDED = 3
15 };
16
17 struct vlmcd {
18         xport mportno;
19         xport bportno;
20 };
21
22 struct vlmc_io {
23         int err;
24         struct xlock lock;
25         volatile enum io_state_enum state;
26         struct xseg_request *mreq;
27         struct xseg_request **breqs;
28         unsigned long breq_len, breq_cnt;
29 };
30
31 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
32 {
33 //      xlock_acquire(&vio->lock, 1);
34         vio->state = state;
35 //      xlock_release(&vio->lock);
36 }
37
38 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
39 {
40         enum io_state_enum state;
41 //      xlock_acquire(&vio->lock, 1);
42         state = vio->state;
43 //      xlock_release(&vio->lock);
44         return state;
45 }
46
47 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
48 {
49         return (struct vlmc_io *) pr->priv;
50 }
51
52 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
53 {
54         return (struct vlmcd *) peer->priv;
55 }
56
57 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
58                                 struct xseg_request *req)
59 {
60         struct vlmc_io *vio = __get_vlmcio(pr);
61         struct vlmcd *vlmc = __get_vlmcd(peer);
62         int r;
63         xport p;
64         char *target, *mtarget;
65         void *dummy;
66
67         if (pr->req->op == X_WRITE && !req->size && (pr->req->flags & (XF_FLUSH|XF_FUA))){
68                 //hanlde flush requests here, so we don't mess with mapper
69                 //because of the -1 offset
70                 fprintf(stderr, "completing flush request\n");
71                 pr->req->serviced = pr->req->size;
72                 __set_vio_state(vio, CONCLUDED);
73                 complete(peer, pr);
74                 return 0;               
75         }
76         vio->err = 0; //reset error state
77         vio->mreq = xseg_get_request(peer->xseg, peer->portno, 
78                                         vlmc->mportno, X_ALLOC);
79         if (!vio->mreq)
80                 goto out_err;
81
82         /* use dalalen 0. let mapper allocate buffer space as needed */
83         r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0); 
84         if (r < 0) {
85                 goto out_put;
86         }
87         target = xseg_get_target(peer->xseg, pr->req);
88         if (!target)
89                 goto out_put;
90         mtarget = xseg_get_target(peer->xseg, vio->mreq);
91         if (!mtarget)
92                 goto out_put;
93
94         strncpy(mtarget, target, pr->req->targetlen);
95         vio->mreq->size = pr->req->size;
96         vio->mreq->offset = pr->req->offset;
97         vio->mreq->flags = 0;
98         switch (pr->req->op) {
99                 case X_READ: vio->mreq->op = X_MAPR; break;
100                 case X_WRITE: vio->mreq->op = X_MAPW; break;
101                 case X_INFO: vio->mreq->op = X_INFO; break;
102                 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
103                 default: goto out_put;
104         }
105         xseg_set_req_data(peer->xseg, vio->mreq, pr);
106         __set_vio_state(vio, MAPPING);
107         p = xseg_submit(peer->xseg, vio->mreq, peer->portno, X_ALLOC);
108         if (p == NoPort)
109                 goto out_unset;
110         r = xseg_signal(peer->xseg, p);
111         if (r < 0) {
112                 /* since submission is successful, just print a warning message */
113                 fprintf(stderr, "couldnt signal port %u", p);
114         }
115
116         return 0;
117
118 out_unset:
119         xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
120 out_put:
121         xseg_put_request(peer->xseg, vio->mreq, peer->portno);
122 out_err:
123         __set_vio_state(vio, CONCLUDED);
124         fail(peer, pr);
125         return -1;
126 }
127
128 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
129                                 struct xseg_request *req)
130 {
131         struct vlmc_io *vio = __get_vlmcio(pr);
132         struct vlmcd *vlmc = __get_vlmcd(peer);
133         uint64_t pos, datalen, offset;
134         uint32_t targetlen;
135         struct xseg_request *breq;
136         char *target;
137         int i,r;
138         xport p;
139         
140         //assert vio>mreq == req 
141         if (vio->mreq != req){
142                 printf("vio->mreq %lx, req: %lx state: %d breq[0]: %lx\n", vio->mreq, req, vio->state, vio->breqs[0]);
143                 r = *(volatile int *)0;
144                 return -1;
145         }
146         /* FIXME shouldn's XS_FAILED be sufficient ?? */
147         if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
148                 fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
149                 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
150                 vio->mreq = NULL;
151                 __set_vio_state(vio, CONCLUDED);
152                 fail(peer, pr);
153         } else if (vio->mreq->op == X_INFO) {
154                 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
155                 char *data = xseg_get_data(peer->xseg, pr->req);
156                 *(off_t *)data = xinfo->size;
157                 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
158                 vio->mreq = NULL;
159                 __set_vio_state(vio, CONCLUDED);
160                 complete(peer, pr);
161         } else if (vio->mreq->op == X_CLOSE) {
162                 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
163                 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
164                 vio->mreq = NULL;
165                 __set_vio_state(vio, CONCLUDED);
166                 complete(peer, pr);
167         } else {
168                 struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
169                 if (!mreply->cnt){
170                         printf("foo2\n");
171                         xseg_put_request(peer->xseg, vio->mreq, peer->portno);
172                         vio->mreq = NULL;
173                         __set_vio_state(vio, CONCLUDED);
174                         fail(peer, pr);
175                         goto out;
176                 }
177                 vio->breq_len = mreply->cnt;
178                 vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
179                 if (!vio->breqs) {
180                         printf("foo3\n");
181                         xseg_put_request(peer->xseg, vio->mreq, peer->portno);
182                         vio->mreq = NULL;
183                         __set_vio_state(vio, CONCLUDED);
184                         fail(peer, pr);
185                         goto out_err;
186                 }
187                 pos = 0;
188                 __set_vio_state(vio, SERVING); 
189                 for (i = 0; i < vio->breq_len; i++) {
190                         datalen = mreply->segs[i].size;
191                         offset = mreply->segs[i].offset;
192                         targetlen = mreply->segs[i].targetlen;
193                         breq = xseg_get_request(peer->xseg, peer->portno, vlmc->bportno, X_ALLOC);
194                         if (!breq) {
195                                 vio->err = 1;
196                                 break;
197                         }
198                         r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
199                         if (r < 0) {
200                                 vio->err = 1;
201                                 xseg_put_request(peer->xseg, breq, peer->portno);
202                                 break;
203                         }
204                         breq->offset = offset;
205                         breq->size = datalen;
206                         breq->op = pr->req->op;
207                         target = xseg_get_target(peer->xseg, breq);
208                         if (!target) {
209                                 vio->err = 1;
210                                 xseg_put_request(peer->xseg, breq, peer->portno);
211                                 break;
212                         }
213                         strncpy(target, mreply->segs[i].target, targetlen);
214                         r = xseg_set_req_data(peer->xseg, breq, pr);
215                         if (r<0) {
216                                 vio->err = 1;
217                                 xseg_put_request(peer->xseg, breq, peer->portno);
218                                 break;
219                         }
220
221                         // this should work, right ?
222                         breq->data = pr->req->data + pos;
223                         pos += datalen;
224                         p = xseg_submit(peer->xseg, breq, peer->portno, X_ALLOC);
225                         if (p == NoPort){
226                                 void *dummy;
227                                 vio->err = 1;
228                                 xseg_get_req_data(peer->xseg, breq, &dummy);
229                                 xseg_put_request(peer->xseg, breq, peer->portno);
230                                 break;
231                         }
232                         r = xseg_signal(peer->xseg, p);
233                         if (r < 0){
234                                 //XSEGLOG("couldn't signal port %u", p);
235                         }
236                         vio->breqs[i] = breq;
237                 }
238                 vio->breq_cnt = i;
239                 xseg_put_request(peer->xseg, vio->mreq, peer->portno);
240                 vio->mreq = NULL;
241                 if (i == 0) {
242                         printf("foo4\n");
243                         __set_vio_state(vio, CONCLUDED);
244                         free(vio->breqs);
245                         vio->breqs = NULL;
246                         fail(peer, pr);
247                         goto out_err;
248                 }
249         }
250
251 out:
252         return 0;
253
254 out_err:
255         return -1;
256 }
257
258 static int handle_serving(struct peerd *peer, struct peer_req *pr, 
259                                 struct xseg_request *req)
260 {
261         struct vlmc_io *vio = __get_vlmcio(pr);
262         struct vlmcd *vlmc = __get_vlmcd(peer);
263         struct xseg_request *breq = req;
264
265         if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
266                 fprintf(stderr, "req %lx (op: %d) failed at offset \n", req, req->op, req->offset);
267                 vio->err = 1;
268         } else {
269                 //assert breq->serviced == breq->size
270                 __sync_fetch_and_add(&pr->req->serviced, breq->serviced);
271         }
272         xseg_put_request(peer->xseg, breq, peer->portno);
273
274         if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
275                 __set_vio_state(vio, CONCLUDED);
276                 free(vio->breqs);
277                 vio->breqs = NULL;
278                 vio->breq_len = 0;
279                 if (vio->err)
280                         fail(peer, pr);
281                 else
282                         complete(peer, pr);
283         }
284
285         return 0;
286 }
287
288 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
289 {
290         struct vlmc_io *vio = __get_vlmcio(pr);
291         struct vlmcd *vlmc = __get_vlmcd(peer);
292
293         xlock_acquire(&vio->lock,1);
294         if (pr->req == req)
295                 __set_vio_state(vio, ACCEPTED);
296
297         enum io_state_enum state = __get_vio_state(vio);
298         switch (state) {
299                 case ACCEPTED:
300                         handle_accepted(peer, pr, req);
301                         break;
302                 case MAPPING:
303                         handle_mapping(peer, pr, req);
304                         break;
305                 case SERVING:
306                         handle_serving(peer, pr, req);
307                         break;
308                 case CONCLUDED:
309                         fprintf(stderr, "invalid state. dispatch called for concluded\n");
310                         break;
311                 default:
312                         fprintf(stderr, "wtf dude? invalid state\n");
313                         break;
314         }
315         xlock_release(&vio->lock);
316         return 0;
317 }
318
319
320 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
321 {
322         struct vlmc_io *vio;
323         struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
324         int i, j;
325
326         if (!vlmc) {
327                 perror("malloc");
328                 return -1;
329         }
330         peer->priv = (void *) vlmc;
331
332         for (i = 0; i < peer->nr_ops; i++) {
333                 vio = malloc(sizeof(struct vlmc_io));
334                 if (!vio) {
335                         break;
336                 }
337                 vio->mreq = NULL;
338                 vio->breqs = NULL;
339                 vio->breq_cnt = 0;
340                 vio->breq_len = 0;
341                 xlock_release(&vio->lock);
342                 peer->peer_reqs[i].priv = (void *) vio;
343         }
344         if (i < peer->nr_ops) {
345                 for (j = 0; j < i; j++) {
346                         free(peer->peer_reqs[i].priv);
347                 }
348                 return -1;
349         }
350
351         for (i = 0; i < argc; i++) {
352                 if (!strcmp(argv[i], "-mp") && (i+1) < argc){
353                         vlmc->mportno = atoi(argv[i+1]);
354                         i += 1;
355                         continue;
356                 }
357                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
358                         vlmc->bportno = atoi(argv[i+1]);
359                         i += 1;
360                         continue;
361                 }
362         }
363
364         const struct sched_param param = { .sched_priority = 99 };
365         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
366
367         return 0;
368 }