Merge branch 'xseg-refactor' into debian
[archipelago] / xseg / peers / user / mt-sosd.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <xseg/xseg.h>
5 #include <peer.h>
6 #include <rados/librados.h>
7 #include <xseg/protocol.h>
8 #include <pthread.h>
9
10 #define MAX_POOL_NAME 64
11 #define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
12 #define RADOS_LOCK_NAME "RadosLock"
13 //#define RADOS_LOCK_COOKIE "Cookie"
14 #define RADOS_LOCK_COOKIE "foo"
15
/*
 * Print the command-line options that are specific to this peer.
 * The text goes to stderr.
 */
void custom_peer_usage()
{
	static const char usage_text[] =
		"Custom peer options:\n"
		"--pool: Rados pool to connect\n"
		"\n";

	fputs(usage_text, stderr);
}
22
/*
 * Progress marker of a request, stored in struct rados_io.
 * Enumerators take the values 0..3 in declaration order.
 */
enum rados_state {
	ACCEPTED,	/* 0: set by dispatch() when a request is first accepted */
	PENDING,	/* 1: set by handle_delete()/handle_info() before submitting */
	READING,	/* 2: a read is in flight (handle_read, handle_copy) */
	WRITING		/* 3: a write is in flight (handle_write, handle_copy) */
};
29
30 struct radosd {
31         rados_t cluster;
32         rados_ioctx_t ioctx;
33         char pool[MAX_POOL_NAME + 1];
34 };
35
36 struct rados_io{
37         char obj_name[MAX_OBJ_NAME + 1];
38         enum rados_state state;
39         uint64_t size;
40         char *src_name, *buf;
41         uint64_t read;
42         uint64_t watch_handle;
43         pthread_t tid;
44         pthread_cond_t cond;
45         pthread_mutex_t m;
46 };
47
48 void rados_ack_cb(rados_completion_t c, void *arg)
49 {
50         struct peer_req *pr = (struct peer_req*) arg;
51         struct peerd *peer = pr->peer;
52         int ret = rados_aio_get_return_value(c);
53         pr->retval = ret;
54         rados_aio_release(c);
55         dispatch(peer, pr, pr->req, dispatch_internal);
56 }
57
58 void rados_commit_cb(rados_completion_t c, void *arg)
59 {
60         struct peer_req *pr = (struct peer_req*) arg;
61         struct peerd *peer = pr->peer;
62         int ret = rados_aio_get_return_value(c);
63         pr->retval = ret;
64         rados_aio_release(c);
65         dispatch(peer, pr, pr->req, dispatch_internal);
66 }
67
68 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
69                 char *target, char *buf, uint64_t size, uint64_t offset)
70 {
71         struct radosd *rados = (struct radosd *) peer->priv;
72         struct rados_io *rio = (struct rados_io *) pr->priv;
73         int r;
74
75         rados_completion_t rados_compl;
76         switch (op) {
77                 case X_READ:
78                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
79                         if (r < 0)
80                                 return -1;
81                         r = rados_aio_read(rados->ioctx, target, rados_compl,
82                                         buf, size, offset);
83                         break;
84                 case X_WRITE:
85                         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
86                         if (r < 0)
87                                 return -1;
88                         r = rados_aio_write(rados->ioctx, target, rados_compl,
89                                         buf, size, offset);
90                         break;
91                 case X_DELETE:
92                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
93                         if (r < 0)
94                                 return -1;
95                         r = rados_aio_remove(rados->ioctx, target, rados_compl);
96                         break;
97                 case X_INFO:
98                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
99                         if (r < 0)
100                                 return -1;
101                         r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
102                         break;
103                 default:
104                         return -1;
105                         break;
106         }
107         if (r < 0) {
108                 rados_aio_release(rados_compl);
109         }
110         return r;
111 }
112
113 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
114 {
115         struct xseg_request *req = pr->req;
116         struct rados_io *rio = (struct rados_io *) pr->priv;
117         char *data = xseg_get_data(peer->xseg, pr->req);
118
119         return do_aio_generic(peer, pr, X_READ, rio->obj_name,
120                         data + req->serviced,
121                         req->size - req->serviced,
122                         req->offset + req->serviced);
123 }
124
125 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
126 {
127         struct xseg_request *req = pr->req;
128         struct rados_io *rio = (struct rados_io *) pr->priv;
129         char *data = xseg_get_data(peer->xseg, pr->req);
130
131         return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
132                         data + req->serviced,
133                         req->size - req->serviced,
134                         req->offset + req->serviced);
135 }
136
137 int handle_delete(struct peerd *peer, struct peer_req *pr)
138 {
139         int r;
140         //struct radosd *rados = (struct radosd *) peer->priv;
141         struct rados_io *rio = (struct rados_io *) pr->priv;
142
143         if (rio->state == ACCEPTED) {
144                 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
145                 rio->state = PENDING;
146                 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
147                 if (r < 0) {
148                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
149                         fail(peer, pr);
150                 }
151         }
152         else {
153                 if (pr->retval < 0){
154                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
155                         fail(peer, pr);
156                 }
157                 else {
158                         XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
159                         complete(peer, pr);
160                 }
161         }
162         return 0;
163 }
164
165 int handle_info(struct peerd *peer, struct peer_req *pr)
166 {
167         int r;
168         struct xseg_request *req = pr->req;
169         //struct radosd *rados = (struct radosd *) peer->priv;
170         struct rados_io *rio = (struct rados_io *) pr->priv;
171         char *req_data = xseg_get_data(peer->xseg, req);
172         struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
173
174         if (rio->state == ACCEPTED) {
175                 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
176                 rio->state = PENDING;
177                 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
178                 if (r < 0) {
179                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
180                         fail(peer, pr);
181                 }
182         }
183         else {
184                 if (pr->retval < 0){
185                         xinfo->size = 0;
186                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
187                         fail(peer, pr);
188                 }
189                 else {
190                         xinfo->size = rio->size;
191                         pr->retval = sizeof(uint64_t);
192                         XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
193                         complete(peer, pr);
194                 }
195         }
196         return 0;
197 }
198
199 int handle_read(struct peerd *peer, struct peer_req *pr)
200 {
201         struct rados_io *rio = (struct rados_io *) (pr->priv);
202         struct xseg_request *req = pr->req;
203         char *data;
204         if (rio->state == ACCEPTED) {
205                 if (!req->size) {
206                         complete(peer, pr);
207                         return 0;
208                 }
209                 rio->state = READING;
210                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
211                 if (do_aio_read(peer, pr) < 0) {
212                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
213                                                 rio->obj_name);
214                         fail(peer, pr);
215                 }
216         }
217         else if (rio->state == READING) {
218                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
219                 data = xseg_get_data(peer->xseg, pr->req);
220                 if (pr->retval > 0)
221                         req->serviced += pr->retval;
222                 else if (pr->retval == 0) {
223                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
224                                 "%llu bytes. Zeroing out rest", rio->obj_name,
225                                 (unsigned long long) req->serviced);
226                         /* reached end of object. zero out rest of data
227                          * requested from this object
228                          */
229                         memset(data + req->serviced, 0, req->datalen - req->serviced);
230                         req->serviced = req->datalen;
231                 }
232                 else if (pr->retval == -2) {
233                         XSEGLOG2(&lc, I, "Reading of %s return -2. "
234                                         "Zeroing out data", rio->obj_name);
235                         /* object not found. return zeros instead */
236                         memset(data, 0, req->datalen);
237                         req->serviced = req->datalen;
238                 }
239                 else {
240                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
241                         /* pr->retval < 0 && pr->retval != -2 */
242                         fail(peer, pr);
243                         return 0;
244                 }
245                 if (req->serviced >= req->datalen) {
246                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
247                         complete(peer, pr);
248                         return 0;
249                 }
250
251                 if (!req->size) {
252                         /* should not happen */
253                         fail(peer, pr);
254                         return 0;
255                 }
256                 /* resubmit */
257                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
258                 if (do_aio_read(peer, pr) < 0) {
259                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
260                                         rio->obj_name);
261                         fail(peer, pr);
262                 }
263         }
264         else {
265                 /* should not reach this */
266                 printf("read request reached this\n");
267                 fail(peer, pr);
268         }
269         return 0;
270 }
271
272 int handle_write(struct peerd *peer, struct peer_req *pr)
273 {
274         struct rados_io *rio = (struct rados_io *) (pr->priv);
275         struct xseg_request *req = pr->req;
276         if (rio->state == ACCEPTED) {
277                 if (!req->size) {
278                         // for future use
279                         if (req->flags & XF_FLUSH) {
280                                 complete(peer, pr);
281                                 return 0;
282                         }
283                         else {
284                                 complete(peer, pr);
285                                 return 0;
286                         }
287                 }
288                 //should we ensure req->op = X_READ ?
289                 rio->state = WRITING;
290                 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
291                 if (do_aio_write(peer, pr) < 0) {
292                         XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
293                                         rio->obj_name);
294                         fail(peer, pr);
295                 }
296         }
297         else if (rio->state == WRITING) {
298                 /* rados writes return 0 if write succeeded or < 0 if failed
299                  * no resubmission occurs
300                  */
301                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
302                 if (pr->retval == 0) {
303                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
304                         req->serviced = req->datalen;
305                         complete(peer, pr);
306                         return 0;
307                 }
308                 else {
309                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
310                         fail(peer, pr);
311                         return 0;
312                 }
313         }
314         else {
315                 /* should not reach this */
316                 printf("write request reached this\n");
317                 fail(peer, pr);
318         }
319         return 0;
320 }
321
322 int handle_copy(struct peerd *peer, struct peer_req *pr)
323 {
324         //struct radosd *rados = (struct radosd *) peer->priv;
325         struct xseg_request *req = pr->req;
326         struct rados_io *rio = (struct rados_io *) pr->priv;
327         int r;
328         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
329
330         if (rio->state == ACCEPTED){
331                 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
332                                 rio->src_name, rio->obj_name);
333                 if (!req->size) {
334                         complete(peer, pr); //or fail?
335                         return 0;
336                 }
337
338                 rio->src_name = malloc(MAX_OBJ_NAME + 1);
339                 if (!rio->src_name){
340                         fail(peer, pr);
341                         return -1;
342                 }
343                 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
344                 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
345                 strncpy(rio->src_name, xcopy->target, end);
346                 rio->src_name[end] = 0;
347
348                 rio->buf = malloc(req->size);
349                 if (!rio->buf) {
350                         r = -1;
351                         goto out_src;
352                 }
353
354                 rio->state = READING;
355                 rio->read = 0;
356                 XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
357                 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
358                         req->size - rio->read, req->offset + rio->read) < 0) {
359                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
360                         fail(peer, pr);
361                         r = -1;
362                         goto out_buf;
363                 }
364         }
365         else if (rio->state == READING){
366                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
367                 if (pr->retval > 0)
368                         rio->read += pr->retval;
369                 else if (pr->retval == 0) {
370                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
371                                 "%llu bytes. Zeroing out rest", rio->obj_name,
372                                 (unsigned long long) req->serviced);
373                         memset(rio->buf + rio->read, 0, req->size - rio->read);
374                         rio->read = req->size ;
375                 }
376                 else {
377                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
378                         r = -1;
379                         goto out_buf;
380                 }
381
382                 if (rio->read >= req->size) {
383                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
384                         //do_aio_write
385                         rio->state = WRITING;
386                         XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
387                         if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
388                                         rio->buf, req->size, req->offset) < 0) {
389                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
390                                 r = -1;
391                                 goto out_buf;
392                         }
393                         return 0;
394                 }
395
396                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
397                 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
398                         req->size - rio->read, req->offset + rio->read) < 0) {
399                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
400                                         rio->obj_name);
401                         r = -1;
402                         goto out_buf;
403                 }
404         }
405         else if (rio->state == WRITING){
406                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
407                 if (pr->retval == 0) {
408                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
409                         XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
410                         req->serviced = req->size;
411                         r = 0;
412                         goto out_buf;
413                 }
414                 else {
415                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
416                         XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
417                         r = -1;
418                         goto out_buf;
419                 }
420         }
421         else {
422                 XSEGLOG2(&lc, E, "Unknown state");
423         }
424         return 0;
425
426
427 out_buf:
428         free(rio->buf);
429 out_src:
430         free(rio->src_name);
431
432         rio->buf = NULL;
433         rio->src_name = NULL;
434         rio->read = 0;
435
436         if (r < 0)
437                 fail(peer ,pr);
438         else
439                 complete(peer, pr);
440         return 0;
441 }
442
443 int spawnthread(struct peerd *peer, struct peer_req *pr,
444                         void *(*func)(void *arg))
445 {
446         //struct radosd *rados = (struct radosd *) peer->priv;
447         struct rados_io *rio = (struct rados_io *) (pr->priv);
448
449         pthread_attr_t attr;
450         pthread_attr_init(&attr);
451         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
452
453         return (pthread_create(&rio->tid, &attr, func, (void *) pr));
454 }
455
456 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
457 {
458         //assert pr valid
459         struct peer_req *pr = (struct peer_req *)arg;
460         //struct radosd *rados = (struct radosd *) pr->peer->priv;
461         struct rados_io *rio = (struct rados_io *) (pr->priv);
462
463         if (pr->req->op == X_OPEN){
464                 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
465                 pthread_cond_signal(&rio->cond);
466         }
467         else
468                 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
469 }
470
471 void * lock_op(void *arg)
472 {
473         struct peer_req *pr = (struct peer_req *)arg;
474         struct radosd *rados = (struct radosd *) pr->peer->priv;
475         struct rados_io *rio = (struct rados_io *) (pr->priv);
476
477         XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
478         if (!(pr->req->flags & XF_NOSYNC)){
479                 if (rados_watch(rados->ioctx, rio->obj_name, 0,
480                                 &rio->watch_handle, watch_cb, pr) < 0){
481                         XSEGLOG2(&lc, E, "Rados watch failed for %s",
482                                         rio->obj_name);
483                         fail(pr->peer, pr);
484                         return NULL;
485                 }
486         }
487
488         while(rados_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
489                 C_LOCK_EXCLUSIVE, RADOS_LOCK_COOKIE, "", "", NULL, 0) < 0){
490                 if (pr->req->flags & XF_NOSYNC){
491                         XSEGLOG2(&lc, E, "Rados lock failed for %s",
492                                         rio->obj_name);
493                         fail(pr->peer, pr);
494                         return NULL;
495                 }
496                 else{
497                         XSEGLOG2(&lc, D, "rados lock for %s sleeping",
498                                         rio->obj_name);
499                         pthread_mutex_lock(&rio->m);
500                         pthread_cond_wait(&rio->cond, &rio->m);
501                         pthread_mutex_unlock(&rio->m);
502                         XSEGLOG2(&lc, D, "rados lock for %s woke up",
503                                         rio->obj_name);
504                 }
505         }
506         if (!(pr->req->flags & XF_NOSYNC)){
507                 if (rados_unwatch(rados->ioctx, rio->obj_name,
508                                         rio->watch_handle) < 0){
509                         XSEGLOG2(&lc, E, "Rados unwatch failed");
510                 }
511         }
512         XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
513         complete(pr->peer, pr);
514         return NULL;
515 }
516
517 void * unlock_op(void *arg)
518 {
519         struct peer_req *pr = (struct peer_req *)arg;
520         struct radosd *rados = (struct radosd *) pr->peer->priv;
521         struct rados_io *rio = (struct rados_io *) (pr->priv);
522         int r;
523         XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
524         if (pr->req->flags & XF_FORCE)
525                 r = rados_break_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
526                         RADOS_LOCK_COOKIE);
527         else
528                 r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
529                         RADOS_LOCK_COOKIE);
530         if (r < 0){
531                 XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
532                 fail(pr->peer, pr);
533         }
534         else {
535                 if (rados_notify(rados->ioctx, rio->obj_name, 
536                                         0, NULL, 0) < 0) {
537                         XSEGLOG2(&lc, E, "rados notify failed");
538                 }
539                 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
540                 complete(pr->peer, pr);
541         }
542         return NULL;
543 }
544
545 int handle_open(struct peerd *peer, struct peer_req *pr)
546 {
547         int r = spawnthread(peer, pr, lock_op);
548         if (r < 0)
549                 fail(pr->peer, pr);
550         return 0;
551 }
552
553
554 int handle_close(struct peerd *peer, struct peer_req *pr)
555 {
556         int r = spawnthread(peer, pr, unlock_op);
557         if (r < 0)
558                 fail(pr->peer, pr);
559         return 0;
560 }
561
562 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
563 {
564         int i, j;
565         struct radosd *rados = malloc(sizeof(struct radosd));
566         struct rados_io *rio;
567         if (!rados) {
568                 perror("malloc");
569                 return -1;
570         }
571         rados->pool[0] = 0;
572         for (i = 0; i < argc; i++) {
573                 if (!strcmp(argv[i], "--pool") && (i+1) < argc){
574                         strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
575                         rados->pool[MAX_POOL_NAME] = 0;
576                         i += 1;
577                         continue;
578                 }
579         }
580         if (!rados->pool[0]){
581                 XSEGLOG2(&lc, E , "Pool must be provided");
582                 free(rados);
583                 return -1;
584         }
585
586         if (rados_create(&rados->cluster, NULL) < 0) {
587                 XSEGLOG2(&lc, E, "Rados create failed!");
588                 return -1;
589         }
590         if (rados_conf_read_file(rados->cluster, NULL) < 0){
591                 XSEGLOG2(&lc, E, "Error reading rados conf files!");
592                 return -1;
593         }
594         if (rados_connect(rados->cluster) < 0) {
595                 XSEGLOG2(&lc, E, "Rados connect failed!");
596                 rados_shutdown(rados->cluster);
597                 free(rados);
598                 return 0;
599         }
600         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
601                 XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
602                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
603                         XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
604                         rados_shutdown(rados->cluster);
605                         free(rados);
606                         return -1;
607                 }
608                 XSEGLOG2(&lc, I, "Pool created.");
609         }
610         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
611                 XSEGLOG2(&lc, E, "ioctx create problem.");
612                 rados_shutdown(rados->cluster);
613                 free(rados);
614                 return -1;
615         }
616         peer->priv = (void *) rados;
617         for (i = 0; i < peer->nr_ops; i++) {
618                 rio = malloc(sizeof(struct rados_io));
619                 if (!rio) {
620                         //ugly
621                         //is this really necessary?
622                         for (j = 0; j < i; j++) {
623                                 free(peer->peer_reqs[j].priv);
624                         }
625                         free(rados);
626                         perror("malloc");
627                         return -1;
628                 }
629                 rio->buf = 0;
630                 rio->read = 0;
631                 rio->size = 0;
632                 rio->src_name = 0;
633                 rio->watch_handle = 0;
634                 pthread_cond_init(&rio->cond, NULL);
635                 pthread_mutex_init(&rio->m, NULL);
636                 peer->peer_reqs[i].priv = (void *) rio;
637         }
638         return 0;
639 }
640
/*
 * Hook for peer-specific argument parsing. There is nothing to parse for
 * now, so both arguments are ignored and 0 is returned.
 */
int custom_arg_parse(int argc, const char *argv[])
{
	(void) argc;
	(void) argv;

	return 0;
}
646
/*
 * Peer-specific finalisation hook. Currently does nothing; `peer` is
 * ignored.
 */
void custom_peer_finalize(struct peerd *peer)
{
	(void) peer;
}
651
652 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
653                 enum dispatch_reason reason)
654 {
655         struct rados_io *rio = (struct rados_io *) (pr->priv);
656         char *target = xseg_get_target(peer->xseg, pr->req);
657         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
658         strncpy(rio->obj_name, target, end);
659         rio->obj_name[end] = 0;
660         //log_pr("dispatch", pr);
661         if (reason == dispatch_accept)
662                 rio->state = ACCEPTED;
663
664         switch (pr->req->op){
665                 case X_READ:
666                         handle_read(peer, pr); break;
667                 case X_WRITE: 
668                         handle_write(peer, pr); break;
669                 case X_DELETE:
670                         if (canDefer(peer))
671                                 defer_request(peer, pr);
672                         else
673                                 handle_delete(peer, pr);
674                         break;
675                 case X_INFO:
676                         if (canDefer(peer))
677                                 defer_request(peer, pr);
678                         else
679                                 handle_info(peer, pr);
680                         break;
681                 case X_COPY:
682                         if (canDefer(peer))
683                                 defer_request(peer, pr);
684                         else
685                                 handle_copy(peer, pr);
686                         break;
687                 case X_OPEN:
688                         handle_open(peer, pr); break;
689                 case X_CLOSE:
690                         handle_close(peer, pr); break;
691
692                 default:
693                         fail(peer, pr);
694         }
695         return 0;
696 }