d6019af17f3cd263e462762b2b74df3c162b578c
[archipelago] / xseg / peers / user / mt-sosd.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <xseg/xseg.h>
5 #include <peer.h>
6 #include <rados/librados.h>
7 #include <xseg/protocol.h>
8 #include <pthread.h>
9
/* Longest pool name kept from the command line, excluding the trailing NUL. */
#define MAX_POOL_NAME 64
/* Object names are capped at the xseg target-length limit. */
#define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
12
/*
 * Per-request progress marker.  The enumerators are numbered 0..3 in
 * declaration order.
 */
enum rados_state {
	ACCEPTED = 0,	/* set by dispatch() when the request is first accepted */
	PENDING,	/* a delete/info operation has been submitted */
	READING,	/* an asynchronous read has been submitted */
	WRITING		/* an asynchronous write has been submitted */
};
19
20 struct radosd {
21         rados_t cluster;
22         rados_ioctx_t ioctx;
23         char pool[MAX_POOL_NAME + 1];
24 };
25
26 struct rados_io{
27         char obj_name[MAX_OBJ_NAME + 1];
28         enum rados_state state;
29         uint64_t size;
30         char *src_name, *buf;
31         uint64_t read;
32         uint64_t watch_handle;
33         pthread_t tid;
34         pthread_cond_t cond;
35         pthread_mutex_t m;
36 };
37
38 void rados_ack_cb(rados_completion_t c, void *arg)
39 {
40         struct peer_req *pr = (struct peer_req*) arg;
41         struct peerd *peer = pr->peer;
42         int ret = rados_aio_get_return_value(c);
43         pr->retval = ret;
44         rados_aio_release(c);
45         dispatch(peer, pr, pr->req, dispatch_internal);
46 }
47
48 void rados_commit_cb(rados_completion_t c, void *arg)
49 {
50         struct peer_req *pr = (struct peer_req*) arg;
51         struct peerd *peer = pr->peer;
52         int ret = rados_aio_get_return_value(c);
53         pr->retval = ret;
54         rados_aio_release(c);
55         dispatch(peer, pr, pr->req, dispatch_internal);
56 }
57
58 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
59                 char *target, char *buf, uint64_t size, uint64_t offset)
60 {
61         struct radosd *rados = (struct radosd *) peer->priv;
62         struct rados_io *rio = (struct rados_io *) pr->priv;
63         int r;
64
65         rados_completion_t rados_compl;
66         switch (op) {
67                 case X_READ:
68                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
69                         if (r < 0)
70                                 return -1;
71                         r = rados_aio_read(rados->ioctx, target, rados_compl,
72                                         buf, size, offset);
73                         break;
74                 case X_WRITE:
75                         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
76                         if (r < 0)
77                                 return -1;
78                         r = rados_aio_write(rados->ioctx, target, rados_compl,
79                                         buf, size, offset);
80                         break;
81                 case X_DELETE:
82                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
83                         if (r < 0)
84                                 return -1;
85                         r = rados_aio_remove(rados->ioctx, target, rados_compl);
86                         break;
87                 case X_INFO:
88                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
89                         if (r < 0)
90                                 return -1;
91                         r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
92                         break;
93                 default:
94                         return -1;
95                         break;
96         }
97         if (r < 0) {
98                 rados_aio_release(rados_compl);
99         }
100         return r;
101 }
102
103 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
104 {
105         struct xseg_request *req = pr->req;
106         struct rados_io *rio = (struct rados_io *) pr->priv;
107         char *data = xseg_get_data(peer->xseg, pr->req);
108
109         return do_aio_generic(peer, pr, X_READ, rio->obj_name,
110                         data + req->serviced,
111                         req->size - req->serviced,
112                         req->offset + req->serviced);
113 }
114
115 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
116 {
117         struct xseg_request *req = pr->req;
118         struct rados_io *rio = (struct rados_io *) pr->priv;
119         char *data = xseg_get_data(peer->xseg, pr->req);
120
121         return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
122                         data + req->serviced,
123                         req->size - req->serviced,
124                         req->offset + req->serviced);
125 }
126
127 int handle_delete(struct peerd *peer, struct peer_req *pr)
128 {
129         int r;
130         //struct radosd *rados = (struct radosd *) peer->priv;
131         struct rados_io *rio = (struct rados_io *) pr->priv;
132
133         if (rio->state == ACCEPTED) {
134                 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
135                 rio->state = PENDING;
136                 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
137                 if (r < 0) {
138                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
139                         fail(peer, pr);
140                 }
141         }
142         else {
143                 if (pr->retval < 0){
144                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
145                         fail(peer, pr);
146                 }
147                 else {
148                         XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
149                         complete(peer, pr);
150                 }
151         }
152         return 0;
153 }
154
155 int handle_info(struct peerd *peer, struct peer_req *pr)
156 {
157         int r;
158         struct xseg_request *req = pr->req;
159         //struct radosd *rados = (struct radosd *) peer->priv;
160         struct rados_io *rio = (struct rados_io *) pr->priv;
161         char *req_data = xseg_get_data(peer->xseg, req);
162         struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
163
164         if (rio->state == ACCEPTED) {
165                 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
166                 rio->state = PENDING;
167                 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
168                 if (r < 0) {
169                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
170                         fail(peer, pr);
171                 }
172         }
173         else {
174                 if (pr->retval < 0){
175                         xinfo->size = 0;
176                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
177                         fail(peer, pr);
178                 }
179                 else {
180                         xinfo->size = rio->size;
181                         pr->retval = sizeof(uint64_t);
182                         XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
183                         complete(peer, pr);
184                 }
185         }
186         return 0;
187 }
188
189 int handle_read(struct peerd *peer, struct peer_req *pr)
190 {
191         struct rados_io *rio = (struct rados_io *) (pr->priv);
192         struct xseg_request *req = pr->req;
193         char *data;
194         if (rio->state == ACCEPTED) {
195                 if (!req->size) {
196                         complete(peer, pr);
197                         return 0;
198                 }
199                 rio->state = READING;
200                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
201                 if (do_aio_read(peer, pr) < 0) {
202                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
203                                                 rio->obj_name);
204                         fail(peer, pr);
205                 }
206         }
207         else if (rio->state == READING) {
208                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
209                 data = xseg_get_data(peer->xseg, pr->req);
210                 if (pr->retval > 0)
211                         req->serviced += pr->retval;
212                 else if (pr->retval == 0) {
213                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
214                                 "%llu bytes. Zeroing out rest", rio->obj_name,
215                                 (unsigned long long) req->serviced);
216                         /* reached end of object. zero out rest of data
217                          * requested from this object
218                          */
219                         memset(data + req->serviced, 0, req->datalen - req->serviced);
220                         req->serviced = req->datalen;
221                 }
222                 else if (pr->retval == -2) {
223                         XSEGLOG2(&lc, I, "Reading of %s return -2. "
224                                         "Zeroing out data", rio->obj_name);
225                         /* object not found. return zeros instead */
226                         memset(data, 0, req->datalen);
227                         req->serviced = req->datalen;
228                 }
229                 else {
230                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
231                         /* pr->retval < 0 && pr->retval != -2 */
232                         fail(peer, pr);
233                         return 0;
234                 }
235                 if (req->serviced >= req->datalen) {
236                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
237                         complete(peer, pr);
238                         return 0;
239                 }
240
241                 if (!req->size) {
242                         /* should not happen */
243                         fail(peer, pr);
244                         return 0;
245                 }
246                 /* resubmit */
247                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
248                 if (do_aio_read(peer, pr) < 0) {
249                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
250                                         rio->obj_name);
251                         fail(peer, pr);
252                 }
253         }
254         else {
255                 /* should not reach this */
256                 printf("read request reached this\n");
257                 fail(peer, pr);
258         }
259         return 0;
260 }
261
262 int handle_write(struct peerd *peer, struct peer_req *pr)
263 {
264         struct rados_io *rio = (struct rados_io *) (pr->priv);
265         struct xseg_request *req = pr->req;
266         if (rio->state == ACCEPTED) {
267                 if (!req->size) {
268                         // for future use
269                         if (req->flags & XF_FLUSH) {
270                                 complete(peer, pr);
271                                 return 0;
272                         }
273                         else {
274                                 complete(peer, pr);
275                                 return 0;
276                         }
277                 }
278                 //should we ensure req->op = X_READ ?
279                 rio->state = WRITING;
280                 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
281                 if (do_aio_write(peer, pr) < 0) {
282                         XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
283                                         rio->obj_name);
284                         fail(peer, pr);
285                 }
286         }
287         else if (rio->state == WRITING) {
288                 /* rados writes return 0 if write succeeded or < 0 if failed
289                  * no resubmission occurs
290                  */
291                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
292                 if (pr->retval == 0) {
293                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
294                         req->serviced = req->datalen;
295                         complete(peer, pr);
296                         return 0;
297                 }
298                 else {
299                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
300                         fail(peer, pr);
301                         return 0;
302                 }
303         }
304         else {
305                 /* should not reach this */
306                 printf("write request reached this\n");
307                 fail(peer, pr);
308         }
309         return 0;
310 }
311
312 int handle_copy(struct peerd *peer, struct peer_req *pr)
313 {
314         //struct radosd *rados = (struct radosd *) peer->priv;
315         struct xseg_request *req = pr->req;
316         struct rados_io *rio = (struct rados_io *) pr->priv;
317         int r;
318         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
319
320         if (rio->state == ACCEPTED){
321                 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
322                                 rio->src_name, rio->obj_name);
323                 if (!req->size) {
324                         complete(peer, pr); //or fail?
325                         return 0;
326                 }
327
328                 rio->src_name = malloc(MAX_OBJ_NAME + 1);
329                 if (!rio->src_name){
330                         fail(peer, pr);
331                         return -1;
332                 }
333                 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
334                 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
335                 strncpy(rio->src_name, xcopy->target, end);
336                 rio->src_name[end] = 0;
337
338                 rio->buf = malloc(req->size);
339                 if (!rio->buf) {
340                         r = -1;
341                         goto out_src;
342                 }
343
344                 rio->state = READING;
345                 rio->read = 0;
346                 XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
347                 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
348                         req->size - rio->read, req->offset + rio->read) < 0) {
349                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
350                         fail(peer, pr);
351                         r = -1;
352                         goto out_buf;
353                 }
354         }
355         else if (rio->state == READING){
356                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
357                 if (pr->retval > 0)
358                         rio->read += pr->retval;
359                 else if (pr->retval == 0) {
360                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
361                                 "%llu bytes. Zeroing out rest", rio->obj_name,
362                                 (unsigned long long) req->serviced);
363                         memset(rio->buf + rio->read, 0, req->size - rio->read);
364                         rio->read = req->size ;
365                 }
366                 else {
367                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
368                         r = -1;
369                         goto out_buf;
370                 }
371
372                 if (rio->read >= req->size) {
373                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
374                         //do_aio_write
375                         rio->state = WRITING;
376                         XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
377                         if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
378                                         rio->buf, req->size, req->offset) < 0) {
379                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
380                                 r = -1;
381                                 goto out_buf;
382                         }
383                         return 0;
384                 }
385
386                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
387                 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
388                         req->size - rio->read, req->offset + rio->read) < 0) {
389                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
390                                         rio->obj_name);
391                         r = -1;
392                         goto out_buf;
393                 }
394         }
395         else if (rio->state == WRITING){
396                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
397                 if (pr->retval == 0) {
398                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
399                         XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
400                         req->serviced = req->size;
401                         r = 0;
402                         goto out_buf;
403                 }
404                 else {
405                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
406                         XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
407                         r = -1;
408                         goto out_buf;
409                 }
410         }
411         else {
412                 XSEGLOG2(&lc, E, "Unknown state");
413         }
414         return 0;
415
416
417 out_buf:
418         free(rio->buf);
419 out_src:
420         free(rio->src_name);
421
422         rio->buf = NULL;
423         rio->src_name = NULL;
424         rio->read = 0;
425
426         if (r < 0)
427                 fail(peer ,pr);
428         else
429                 complete(peer, pr);
430         return 0;
431 }
432
433 int spawnthread(struct peerd *peer, struct peer_req *pr,
434                         void *(*func)(void *arg))
435 {
436         //struct radosd *rados = (struct radosd *) peer->priv;
437         struct rados_io *rio = (struct rados_io *) (pr->priv);
438
439         pthread_attr_t attr;
440         pthread_attr_init(&attr);
441         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
442
443         return (pthread_create(&rio->tid, &attr, func, (void *) pr));
444 }
445
446 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
447 {
448         //assert pr valid
449         struct peer_req *pr = (struct peer_req *)arg;
450         //struct radosd *rados = (struct radosd *) pr->peer->priv;
451         struct rados_io *rio = (struct rados_io *) (pr->priv);
452
453         if (pr->req->op == X_OPEN){
454                 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
455                 pthread_cond_signal(&rio->cond);
456         }
457         else
458                 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
459 }
460
461 void * lock_op(void *arg)
462 {
463         struct peer_req *pr = (struct peer_req *)arg;
464         struct radosd *rados = (struct radosd *) pr->peer->priv;
465         struct rados_io *rio = (struct rados_io *) (pr->priv);
466
467         XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
468         if (!(pr->req->flags & XF_NOSYNC)){
469                 if (rados_watch(rados->ioctx, rio->obj_name, 0,
470                                 &rio->watch_handle, watch_cb, pr) < 0){
471                         XSEGLOG2(&lc, E, "Rados watch failed for %s",
472                                         rio->obj_name);
473                         fail(pr->peer, pr);
474                         return NULL;
475                 }
476         }
477
478         while(rados_lock(rados->ioctx, rio->obj_name) < 0){
479                 if (!(pr->req->flags & XF_NOSYNC)){
480                         XSEGLOG2(&lc, E, "Rados lock failed for %s",
481                                         rio->obj_name);
482                         fail(pr->peer, pr);
483                         return NULL;
484                 }
485                 else{
486                         XSEGLOG2(&lc, D, "rados lock for %s sleeping",
487                                         rio->obj_name);
488                         pthread_mutex_lock(&rio->m);
489                         pthread_cond_wait(&rio->cond, &rio->m);
490                         pthread_mutex_unlock(&rio->m);
491                         XSEGLOG2(&lc, D, "rados lock for %s woke up",
492                                         rio->obj_name);
493                 }
494         }
495         if (!(pr->req->flags & XF_NOSYNC)){
496                 if (rados_unwatch(rados->ioctx, rio->obj_name,
497                                         rio->watch_handle) < 0){
498                         XSEGLOG2(&lc, E, "Rados unwatch failed");
499                 }
500         }
501         XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
502         complete(pr->peer, pr);
503         return NULL;
504 }
505
506 void * unlock_op(void *arg)
507 {
508         struct peer_req *pr = (struct peer_req *)arg;
509         struct radosd *rados = (struct radosd *) pr->peer->priv;
510         struct rados_io *rio = (struct rados_io *) (pr->priv);
511         int r;
512         XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
513         r = rados_unlock(rados->ioctx, rio->obj_name);
514         if (r < 0){
515                 XSEGLOG2(&lc, E, "Rados unlock failed for %s", rio->obj_name);
516                 fail(pr->peer, pr);
517         }
518         else {
519                 if (rados_notify(rados->ioctx, rio->obj_name, 
520                                         0, NULL, 0) < 0) {
521                         XSEGLOG2(&lc, E, "rados notify failed");
522                 }
523                 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
524                 complete(pr->peer, pr);
525         }
526         return NULL;
527 }
528
529 int handle_open(struct peerd *peer, struct peer_req *pr)
530 {
531         int r = spawnthread(peer, pr, lock_op);
532         if (r < 0)
533                 fail(pr->peer, pr);
534         return 0;
535 }
536
537
538 int handle_close(struct peerd *peer, struct peer_req *pr)
539 {
540         int r = spawnthread(peer, pr, unlock_op);
541         if (r < 0)
542                 fail(pr->peer, pr);
543         return 0;
544 }
545
546 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
547 {
548         int i, j;
549         struct radosd *rados = malloc(sizeof(struct radosd));
550         struct rados_io *rio;
551         if (!rados) {
552                 perror("malloc");
553                 return -1;
554         }
555         rados->pool[0] = 0;
556         for (i = 0; i < argc; i++) {
557                 if (!strcmp(argv[i], "--pool") && (i+1) < argc){
558                         strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
559                         rados->pool[MAX_POOL_NAME] = 0;
560                         i += 1;
561                         continue;
562                 }
563         }
564         if (!rados->pool[0]){
565                 XSEGLOG2(&lc, E , "Pool must be provided");
566                 free(rados);
567                 return -1;
568         }
569
570         if (rados_create(&rados->cluster, NULL) < 0) {
571                 XSEGLOG2(&lc, E, "Rados create failed!");
572                 return -1;
573         }
574         if (rados_conf_read_file(rados->cluster, NULL) < 0){
575                 XSEGLOG2(&lc, E, "Error reading rados conf files!");
576                 return -1;
577         }
578         if (rados_connect(rados->cluster) < 0) {
579                 XSEGLOG2(&lc, E, "Rados connect failed!");
580                 rados_shutdown(rados->cluster);
581                 free(rados);
582                 return 0;
583         }
584         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
585                 XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
586                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
587                         XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
588                         rados_shutdown(rados->cluster);
589                         free(rados);
590                         return -1;
591                 }
592                 XSEGLOG2(&lc, I, "Pool created.");
593         }
594         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
595                 XSEGLOG2(&lc, E, "ioctx create problem.");
596                 rados_shutdown(rados->cluster);
597                 free(rados);
598                 return -1;
599         }
600         peer->priv = (void *) rados;
601         for (i = 0; i < peer->nr_ops; i++) {
602                 rio = malloc(sizeof(struct rados_io));
603                 if (!rio) {
604                         //ugly
605                         //is this really necessary?
606                         for (j = 0; j < i; j++) {
607                                 free(peer->peer_reqs[j].priv);
608                         }
609                         free(rados);
610                         perror("malloc");
611                         return -1;
612                 }
613                 rio->buf = 0;
614                 rio->read = 0;
615                 rio->size = 0;
616                 rio->src_name = 0;
617                 rio->watch_handle = 0;
618                 pthread_cond_init(&rio->cond, NULL);
619                 pthread_mutex_init(&rio->m, NULL);
620                 peer->peer_reqs[i].priv = (void *) rio;
621         }
622         return 0;
623 }
624
/*
 * Hook for peer-specific argument parsing.  This peer has nothing to
 * parse here for now, so both arguments are ignored and 0 is returned.
 */
int custom_arg_parse(int argc, const char *argv[])
{
	(void) argc;
	(void) argv;

	return 0;
}
630
631 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
632                 enum dispatch_reason reason)
633 {
634         struct rados_io *rio = (struct rados_io *) (pr->priv);
635         char *target = xseg_get_target(peer->xseg, pr->req);
636         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
637         strncpy(rio->obj_name, target, end);
638         rio->obj_name[end] = 0;
639         //log_pr("dispatch", pr);
640         if (reason == dispatch_accept)
641                 rio->state = ACCEPTED;
642
643         switch (pr->req->op){
644                 case X_READ:
645                         handle_read(peer, pr); break;
646                 case X_WRITE: 
647                         handle_write(peer, pr); break;
648                 case X_DELETE:
649                         if (canDefer(peer))
650                                 defer_request(peer, pr);
651                         else
652                                 handle_delete(peer, pr);
653                         break;
654                 case X_INFO:
655                         if (canDefer(peer))
656                                 defer_request(peer, pr);
657                         else
658                                 handle_info(peer, pr);
659                         break;
660                 case X_COPY:
661                         if (canDefer(peer))
662                                 defer_request(peer, pr);
663                         else
664                                 handle_copy(peer, pr);
665                         break;
666                 case X_OPEN:
667                         handle_open(peer, pr); break;
668                 case X_CLOSE:
669                         handle_close(peer, pr); break;
670
671                 default:
672                         fail(peer, pr);
673         }
674         return 0;
675 }