rewrite mt-mapperd based on st threads
[archipelago] / xseg / peers / user / mt-vlmcd.c
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <xseg/xseg.h>
#include <xseg/protocol.h>
#include <speer.h>
9
/*
 * Life-cycle stage of one vlmc I/O, kept in vlmc_io.state.
 * The enumerators are numbered 0..3 in declaration order.
 */
enum io_state_enum {
        ACCEPTED,       /* 0: set by dispatch() when the original request shows up */
        MAPPING,        /* 1: map request submitted, reply not handled yet */
        SERVING,        /* 2: per-segment block requests submitted */
        CONCLUDED       /* 3: request has been completed or failed */
};
16
17 struct vlmcd {
18         xport mportno;
19         xport bportno;
20 };
21
22 struct vlmc_io {
23         int err;
24         struct xlock lock;
25         volatile enum io_state_enum state;
26         struct xseg_request *mreq;
27         struct xseg_request **breqs;
28         unsigned long breq_len, breq_cnt;
29 };
30
31 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
32 {
33 //      xlock_acquire(&vio->lock, 1);
34         vio->state = state;
35 //      xlock_release(&vio->lock);
36 }
37
38 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
39 {
40         enum io_state_enum state;
41 //      xlock_acquire(&vio->lock, 1);
42         state = vio->state;
43 //      xlock_release(&vio->lock);
44         return state;
45 }
46
47 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
48 {
49         return (struct vlmc_io *) pr->priv;
50 }
51
52 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
53 {
54         return (struct vlmcd *) peer->priv;
55 }
56
57 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
58                                 struct xseg_request *req)
59 {
60         struct vlmc_io *vio = __get_vlmcio(pr);
61         struct vlmcd *vlmc = __get_vlmcd(peer);
62         int r;
63         xport p;
64         char *target, *mtarget;
65         void *dummy;
66
67         if (pr->req->op == X_WRITE && !req->size && (pr->req->flags & (XF_FLUSH|XF_FUA))){
68                 //hanlde flush requests here, so we don't mess with mapper
69                 //because of the -1 offset
70                 fprintf(stderr, "completing flush request\n");
71                 pr->req->serviced = pr->req->size;
72                 __set_vio_state(vio, CONCLUDED);
73                 complete(peer, pr);
74                 return 0;               
75         }
76         vio->err = 0; //reset error state
77         vio->mreq = xseg_get_request(peer->xseg, pr->portno, 
78                                         vlmc->mportno, X_ALLOC);
79         if (!vio->mreq)
80                 goto out_err;
81
82         /* use dalalen 0. let mapper allocate buffer space as needed */
83         r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0); 
84         if (r < 0) {
85                 goto out_put;
86         }
87         target = xseg_get_target(peer->xseg, pr->req);
88         if (!target)
89                 goto out_put;
90         mtarget = xseg_get_target(peer->xseg, vio->mreq);
91         if (!mtarget)
92                 goto out_put;
93
94         strncpy(mtarget, target, pr->req->targetlen);
95         vio->mreq->size = pr->req->size;
96         vio->mreq->offset = pr->req->offset;
97         vio->mreq->flags = 0;
98         switch (pr->req->op) {
99                 case X_READ: vio->mreq->op = X_MAPR; break;
100                 case X_WRITE: vio->mreq->op = X_MAPW; break;
101                 case X_INFO: vio->mreq->op = X_INFO; break;
102                 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
103                 default: goto out_put;
104         }
105         xseg_set_req_data(peer->xseg, vio->mreq, pr);
106         __set_vio_state(vio, MAPPING);
107         p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
108         if (p == NoPort)
109                 goto out_unset;
110         r = xseg_signal(peer->xseg, p);
111         if (r < 0) {
112                 /* since submission is successful, just print a warning message */
113                 fprintf(stderr, "couldnt signal port %u", p);
114         }
115
116         return 0;
117
118 out_unset:
119         xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
120 out_put:
121         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
122 out_err:
123         __set_vio_state(vio, CONCLUDED);
124         fail(peer, pr);
125         return -1;
126 }
127
128 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
129                                 struct xseg_request *req)
130 {
131         struct vlmc_io *vio = __get_vlmcio(pr);
132         struct vlmcd *vlmc = __get_vlmcd(peer);
133         uint64_t pos, datalen, offset;
134         uint32_t targetlen;
135         struct xseg_request *breq;
136         char *target;
137         int i,r;
138         xport p;
139         
140         //assert vio>mreq == req 
141         if (vio->mreq != req){
142                 printf("vio->mreq %lx, req: %lx state: %d breq[0]: %lx\n", vio->mreq, req, vio->state, vio->breqs[0]);
143                 r = *(volatile int *)0;
144                 return -1;
145         }
146         /* FIXME shouldn's XS_FAILED be sufficient ?? */
147         if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
148                 fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
149                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
150                 vio->mreq = NULL;
151                 __set_vio_state(vio, CONCLUDED);
152                 fail(peer, pr);
153         } else if (vio->mreq->op == X_INFO) {
154                 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
155                 char *data = xseg_get_data(peer->xseg, pr->req);
156                 *(uint64_t *)data = xinfo->size;
157                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
158                 vio->mreq = NULL;
159                 __set_vio_state(vio, CONCLUDED);
160                 complete(peer, pr);
161         } else if (vio->mreq->op == X_CLOSE) {
162                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
163                 vio->mreq = NULL;
164                 __set_vio_state(vio, CONCLUDED);
165                 complete(peer, pr);
166         } else {
167                 struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
168                 if (!mreply->cnt){
169                         printf("foo2\n");
170                         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
171                         vio->mreq = NULL;
172                         __set_vio_state(vio, CONCLUDED);
173                         fail(peer, pr);
174                         goto out;
175                 }
176                 vio->breq_len = mreply->cnt;
177                 vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
178                 if (!vio->breqs) {
179                         printf("foo3\n");
180                         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
181                         vio->mreq = NULL;
182                         __set_vio_state(vio, CONCLUDED);
183                         fail(peer, pr);
184                         goto out_err;
185                 }
186                 pos = 0;
187                 __set_vio_state(vio, SERVING); 
188                 for (i = 0; i < vio->breq_len; i++) {
189                         datalen = mreply->segs[i].size;
190                         offset = mreply->segs[i].offset;
191                         targetlen = mreply->segs[i].targetlen;
192                         breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
193                         if (!breq) {
194                                 vio->err = 1;
195                                 break;
196                         }
197                         r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
198                         if (r < 0) {
199                                 vio->err = 1;
200                                 xseg_put_request(peer->xseg, breq, pr->portno);
201                                 break;
202                         }
203                         breq->offset = offset;
204                         breq->size = datalen;
205                         breq->op = pr->req->op;
206                         target = xseg_get_target(peer->xseg, breq);
207                         if (!target) {
208                                 vio->err = 1;
209                                 xseg_put_request(peer->xseg, breq, pr->portno);
210                                 break;
211                         }
212                         strncpy(target, mreply->segs[i].target, targetlen);
213                         r = xseg_set_req_data(peer->xseg, breq, pr);
214                         if (r<0) {
215                                 vio->err = 1;
216                                 xseg_put_request(peer->xseg, breq, pr->portno);
217                                 break;
218                         }
219
220                         // this should work, right ?
221                         breq->data = pr->req->data + pos;
222                         pos += datalen;
223                         p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
224                         if (p == NoPort){
225                                 void *dummy;
226                                 vio->err = 1;
227                                 xseg_get_req_data(peer->xseg, breq, &dummy);
228                                 xseg_put_request(peer->xseg, breq, pr->portno);
229                                 break;
230                         }
231                         r = xseg_signal(peer->xseg, p);
232                         if (r < 0){
233                                 //XSEGLOG("couldn't signal port %u", p);
234                         }
235                         vio->breqs[i] = breq;
236                 }
237                 vio->breq_cnt = i;
238                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
239                 vio->mreq = NULL;
240                 if (i == 0) {
241                         printf("foo4\n");
242                         __set_vio_state(vio, CONCLUDED);
243                         free(vio->breqs);
244                         vio->breqs = NULL;
245                         fail(peer, pr);
246                         goto out_err;
247                 }
248         }
249
250 out:
251         return 0;
252
253 out_err:
254         return -1;
255 }
256
257 static int handle_serving(struct peerd *peer, struct peer_req *pr, 
258                                 struct xseg_request *req)
259 {
260         struct vlmc_io *vio = __get_vlmcio(pr);
261         struct vlmcd *vlmc = __get_vlmcd(peer);
262         struct xseg_request *breq = req;
263
264         if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
265                 fprintf(stderr, "req %lx (op: %d) failed at offset \n", req, req->op, req->offset);
266                 vio->err = 1;
267         } else {
268                 //assert breq->serviced == breq->size
269                 __sync_fetch_and_add(&pr->req->serviced, breq->serviced);
270         }
271         xseg_put_request(peer->xseg, breq, pr->portno);
272
273         if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
274                 __set_vio_state(vio, CONCLUDED);
275                 free(vio->breqs);
276                 vio->breqs = NULL;
277                 vio->breq_len = 0;
278                 if (vio->err)
279                         fail(peer, pr);
280                 else
281                         complete(peer, pr);
282         }
283
284         return 0;
285 }
286
287 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
288                 enum dispatch_reason reason)
289 {
290         struct vlmc_io *vio = __get_vlmcio(pr);
291         struct vlmcd *vlmc = __get_vlmcd(peer);
292
293         xlock_acquire(&vio->lock,1);
294         if (pr->req == req)
295                 __set_vio_state(vio, ACCEPTED);
296
297         enum io_state_enum state = __get_vio_state(vio);
298         switch (state) {
299                 case ACCEPTED:
300                         handle_accepted(peer, pr, req);
301                         break;
302                 case MAPPING:
303                         handle_mapping(peer, pr, req);
304                         break;
305                 case SERVING:
306                         handle_serving(peer, pr, req);
307                         break;
308                 case CONCLUDED:
309                         fprintf(stderr, "invalid state. dispatch called for concluded\n");
310                         break;
311                 default:
312                         fprintf(stderr, "wtf dude? invalid state\n");
313                         break;
314         }
315         xlock_release(&vio->lock);
316         return 0;
317 }
318
319
320 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
321 {
322         struct vlmc_io *vio;
323         struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
324         int i, j;
325
326         if (!vlmc) {
327                 perror("malloc");
328                 return -1;
329         }
330         peer->priv = (void *) vlmc;
331
332         for (i = 0; i < peer->nr_ops; i++) {
333                 vio = malloc(sizeof(struct vlmc_io));
334                 if (!vio) {
335                         break;
336                 }
337                 vio->mreq = NULL;
338                 vio->breqs = NULL;
339                 vio->breq_cnt = 0;
340                 vio->breq_len = 0;
341                 xlock_release(&vio->lock);
342                 peer->peer_reqs[i].priv = (void *) vio;
343         }
344         if (i < peer->nr_ops) {
345                 for (j = 0; j < i; j++) {
346                         free(peer->peer_reqs[i].priv);
347                 }
348                 return -1;
349         }
350
351         for (i = 0; i < argc; i++) {
352                 if (!strcmp(argv[i], "-mp") && (i+1) < argc){
353                         vlmc->mportno = atoi(argv[i+1]);
354                         i += 1;
355                         continue;
356                 }
357                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
358                         vlmc->bportno = atoi(argv[i+1]);
359                         i += 1;
360                         continue;
361                 }
362         }
363
364         const struct sched_param param = { .sched_priority = 99 };
365         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
366
367         return 0;
368 }