add improved argument parsing. also add helper messages
[archipelago] / xseg / peers / user / mt-vlmcd.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <xseg/xseg.h>
#include <xseg/protocol.h>
#include <speer.h>
#include <sched.h>
#include <sys/syscall.h>
9
/*
 * Per-request state, stored in struct vlmc_io and examined by dispatch()
 * to pick the handler that should process an incoming request.
 */
enum io_state_enum {
        ACCEPTED  = 0,  /* dispatched to handle_accepted() */
        MAPPING   = 1,  /* dispatched to handle_mapping() */
        SERVING   = 2,  /* dispatched to handle_serving() */
        CONCLUDED = 3   /* request completed or failed; no handler runs */
};
16
17 struct vlmcd {
18         xport mportno;
19         xport bportno;
20 };
21
22 struct vlmc_io {
23         int err;
24         struct xlock lock;
25         volatile enum io_state_enum state;
26         struct xseg_request *mreq;
27         struct xseg_request **breqs;
28         unsigned long breq_len, breq_cnt;
29 };
30
/* Print the command line options specific to this peer on stderr. */
void custom_peer_usage()
{
        static const char usage_text[] =
                "Custom peer options: \n"
                "-mp : mapper port\n"
                "-bp : blocker port for blocks\n"
                "\n";

        fputs(usage_text, stderr);
}
38
39 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
40 {
41 //      xlock_acquire(&vio->lock, 1);
42         vio->state = state;
43 //      xlock_release(&vio->lock);
44 }
45
46 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
47 {
48         enum io_state_enum state;
49 //      xlock_acquire(&vio->lock, 1);
50         state = vio->state;
51 //      xlock_release(&vio->lock);
52         return state;
53 }
54
55 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
56 {
57         return (struct vlmc_io *) pr->priv;
58 }
59
60 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
61 {
62         return (struct vlmcd *) peer->priv;
63 }
64
65 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
66                                 struct xseg_request *req)
67 {
68         struct vlmc_io *vio = __get_vlmcio(pr);
69         struct vlmcd *vlmc = __get_vlmcd(peer);
70         int r;
71         xport p;
72         char *target, *mtarget;
73         void *dummy;
74
75         if (pr->req->op == X_WRITE && !req->size && (pr->req->flags & (XF_FLUSH|XF_FUA))){
76                 //hanlde flush requests here, so we don't mess with mapper
77                 //because of the -1 offset
78                 fprintf(stderr, "completing flush request\n");
79                 pr->req->serviced = pr->req->size;
80                 __set_vio_state(vio, CONCLUDED);
81                 complete(peer, pr);
82                 return 0;               
83         }
84         vio->err = 0; //reset error state
85         vio->mreq = xseg_get_request(peer->xseg, pr->portno, 
86                                         vlmc->mportno, X_ALLOC);
87         if (!vio->mreq)
88                 goto out_err;
89
90         /* use dalalen 0. let mapper allocate buffer space as needed */
91         r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0); 
92         if (r < 0) {
93                 goto out_put;
94         }
95         target = xseg_get_target(peer->xseg, pr->req);
96         if (!target)
97                 goto out_put;
98         mtarget = xseg_get_target(peer->xseg, vio->mreq);
99         if (!mtarget)
100                 goto out_put;
101
102         strncpy(mtarget, target, pr->req->targetlen);
103         vio->mreq->size = pr->req->size;
104         vio->mreq->offset = pr->req->offset;
105         vio->mreq->flags = 0;
106         switch (pr->req->op) {
107                 case X_READ: vio->mreq->op = X_MAPR; break;
108                 case X_WRITE: vio->mreq->op = X_MAPW; break;
109                 case X_INFO: vio->mreq->op = X_INFO; break;
110                 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
111                 default: goto out_put;
112         }
113         xseg_set_req_data(peer->xseg, vio->mreq, pr);
114         __set_vio_state(vio, MAPPING);
115         p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
116         if (p == NoPort)
117                 goto out_unset;
118         r = xseg_signal(peer->xseg, p);
119         if (r < 0) {
120                 /* since submission is successful, just print a warning message */
121                 fprintf(stderr, "couldnt signal port %u", p);
122         }
123
124         return 0;
125
126 out_unset:
127         xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
128 out_put:
129         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
130 out_err:
131         __set_vio_state(vio, CONCLUDED);
132         fail(peer, pr);
133         return -1;
134 }
135
136 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
137                                 struct xseg_request *req)
138 {
139         struct vlmc_io *vio = __get_vlmcio(pr);
140         struct vlmcd *vlmc = __get_vlmcd(peer);
141         uint64_t pos, datalen, offset;
142         uint32_t targetlen;
143         struct xseg_request *breq;
144         char *target;
145         int i,r;
146         xport p;
147         
148         //assert vio>mreq == req 
149         if (vio->mreq != req){
150                 printf("vio->mreq %lx, req: %lx state: %d breq[0]: %lx\n", vio->mreq, req, vio->state, vio->breqs[0]);
151                 r = *(volatile int *)0;
152                 return -1;
153         }
154         /* FIXME shouldn's XS_FAILED be sufficient ?? */
155         if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
156                 fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
157                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
158                 vio->mreq = NULL;
159                 __set_vio_state(vio, CONCLUDED);
160                 fail(peer, pr);
161         } else if (vio->mreq->op == X_INFO) {
162                 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
163                 char *data = xseg_get_data(peer->xseg, pr->req);
164                 *(uint64_t *)data = xinfo->size;
165                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
166                 vio->mreq = NULL;
167                 __set_vio_state(vio, CONCLUDED);
168                 complete(peer, pr);
169         } else if (vio->mreq->op == X_CLOSE) {
170                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
171                 vio->mreq = NULL;
172                 __set_vio_state(vio, CONCLUDED);
173                 complete(peer, pr);
174         } else {
175                 struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
176                 if (!mreply->cnt){
177                         printf("foo2\n");
178                         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
179                         vio->mreq = NULL;
180                         __set_vio_state(vio, CONCLUDED);
181                         fail(peer, pr);
182                         goto out;
183                 }
184                 vio->breq_len = mreply->cnt;
185                 vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
186                 if (!vio->breqs) {
187                         printf("foo3\n");
188                         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
189                         vio->mreq = NULL;
190                         __set_vio_state(vio, CONCLUDED);
191                         fail(peer, pr);
192                         goto out_err;
193                 }
194                 pos = 0;
195                 __set_vio_state(vio, SERVING); 
196                 for (i = 0; i < vio->breq_len; i++) {
197                         datalen = mreply->segs[i].size;
198                         offset = mreply->segs[i].offset;
199                         targetlen = mreply->segs[i].targetlen;
200                         breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
201                         if (!breq) {
202                                 vio->err = 1;
203                                 break;
204                         }
205                         r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
206                         if (r < 0) {
207                                 vio->err = 1;
208                                 xseg_put_request(peer->xseg, breq, pr->portno);
209                                 break;
210                         }
211                         breq->offset = offset;
212                         breq->size = datalen;
213                         breq->op = pr->req->op;
214                         target = xseg_get_target(peer->xseg, breq);
215                         if (!target) {
216                                 vio->err = 1;
217                                 xseg_put_request(peer->xseg, breq, pr->portno);
218                                 break;
219                         }
220                         strncpy(target, mreply->segs[i].target, targetlen);
221                         r = xseg_set_req_data(peer->xseg, breq, pr);
222                         if (r<0) {
223                                 vio->err = 1;
224                                 xseg_put_request(peer->xseg, breq, pr->portno);
225                                 break;
226                         }
227
228                         // this should work, right ?
229                         breq->data = pr->req->data + pos;
230                         pos += datalen;
231                         p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
232                         if (p == NoPort){
233                                 void *dummy;
234                                 vio->err = 1;
235                                 xseg_get_req_data(peer->xseg, breq, &dummy);
236                                 xseg_put_request(peer->xseg, breq, pr->portno);
237                                 break;
238                         }
239                         r = xseg_signal(peer->xseg, p);
240                         if (r < 0){
241                                 //XSEGLOG("couldn't signal port %u", p);
242                         }
243                         vio->breqs[i] = breq;
244                 }
245                 vio->breq_cnt = i;
246                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
247                 vio->mreq = NULL;
248                 if (i == 0) {
249                         printf("foo4\n");
250                         __set_vio_state(vio, CONCLUDED);
251                         free(vio->breqs);
252                         vio->breqs = NULL;
253                         fail(peer, pr);
254                         goto out_err;
255                 }
256         }
257
258 out:
259         return 0;
260
261 out_err:
262         return -1;
263 }
264
265 static int handle_serving(struct peerd *peer, struct peer_req *pr, 
266                                 struct xseg_request *req)
267 {
268         struct vlmc_io *vio = __get_vlmcio(pr);
269         struct vlmcd *vlmc = __get_vlmcd(peer);
270         struct xseg_request *breq = req;
271
272         if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
273                 fprintf(stderr, "req %lx (op: %d) failed at offset \n", req, req->op, req->offset);
274                 vio->err = 1;
275         } else {
276                 //assert breq->serviced == breq->size
277                 __sync_fetch_and_add(&pr->req->serviced, breq->serviced);
278         }
279         xseg_put_request(peer->xseg, breq, pr->portno);
280
281         if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
282                 __set_vio_state(vio, CONCLUDED);
283                 free(vio->breqs);
284                 vio->breqs = NULL;
285                 vio->breq_len = 0;
286                 if (vio->err)
287                         fail(peer, pr);
288                 else
289                         complete(peer, pr);
290         }
291
292         return 0;
293 }
294
295 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
296                 enum dispatch_reason reason)
297 {
298         struct vlmc_io *vio = __get_vlmcio(pr);
299         struct vlmcd *vlmc = __get_vlmcd(peer);
300
301         xlock_acquire(&vio->lock,1);
302         if (pr->req == req)
303                 __set_vio_state(vio, ACCEPTED);
304
305         enum io_state_enum state = __get_vio_state(vio);
306         switch (state) {
307                 case ACCEPTED:
308                         handle_accepted(peer, pr, req);
309                         break;
310                 case MAPPING:
311                         handle_mapping(peer, pr, req);
312                         break;
313                 case SERVING:
314                         handle_serving(peer, pr, req);
315                         break;
316                 case CONCLUDED:
317                         fprintf(stderr, "invalid state. dispatch called for concluded\n");
318                         break;
319                 default:
320                         fprintf(stderr, "wtf dude? invalid state\n");
321                         break;
322         }
323         xlock_release(&vio->lock);
324         return 0;
325 }
326
327
328 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
329 {
330         struct vlmc_io *vio;
331         struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
332         int i, j;
333
334         if (!vlmc) {
335                 perror("malloc");
336                 return -1;
337         }
338         peer->priv = (void *) vlmc;
339
340         for (i = 0; i < peer->nr_ops; i++) {
341                 vio = malloc(sizeof(struct vlmc_io));
342                 if (!vio) {
343                         break;
344                 }
345                 vio->mreq = NULL;
346                 vio->breqs = NULL;
347                 vio->breq_cnt = 0;
348                 vio->breq_len = 0;
349                 xlock_release(&vio->lock);
350                 peer->peer_reqs[i].priv = (void *) vio;
351         }
352         if (i < peer->nr_ops) {
353                 for (j = 0; j < i; j++) {
354                         free(peer->peer_reqs[i].priv);
355                 }
356                 return -1;
357         }
358
359         for (i = 0; i < argc; i++) {
360                 if (!strcmp(argv[i], "-mp") && (i+1) < argc){
361                         vlmc->mportno = atoi(argv[i+1]);
362                         i += 1;
363                         continue;
364                 }
365                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
366                         vlmc->bportno = atoi(argv[i+1]);
367                         i += 1;
368                         continue;
369                 }
370         }
371
372         const struct sched_param param = { .sched_priority = 99 };
373         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
374
375         return 0;
376 }