Add partial support for verification
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48 #include <bench-lfsr.h>
49 #include <limits.h>
50
51 char global_id[IDLEN];
52 /*
53  * This macro checks two things:
54  * a) If in-flight requests are less than given iodepth
55  * b) If we have submitted all of the requests
56  */
57 #define CAN_SEND_REQUEST(prefs)                                                                                           \
58         ((prefs->status->submitted - prefs->status->received < prefs->iodepth) && \
59         (prefs->status->submitted < prefs->status->max))
60
61 #define CAN_VERIFY(prefs)                                                                                                         \
62         ((GET_FLAG(VERIFY, prefs->flags) != VERIFY_NO) && prefs->op == X_READ)
63
64 void custom_peer_usage()
65 {
66         fprintf(stderr, "Custom peer options: \n"
67                         "  --------------------------------------------\n"
68                         "    -op       | None    | XSEG operation [read|write|info|delete]\n"
69                         "    --pattern | None    | I/O pattern [seq|rand]\n"
70                         "    --verify  | no      | Verify written requests [no|meta|full]\n"
71                         "    -to       | None    | Total objects (not for read/write)\n"
72                         "    -ts       | None    | Total I/O size\n"
73                         "    -os       | 4M      | Object size\n"
74                         "    -bs       | 4k      | Block size\n"
75                         "    -tp       | None    | Target port\n"
76                         "    --iodepth | 1       | Number of in-flight I/O requests\n"
77                         "    --seed    | None    | Initialize LFSR and target names\n"
78                         "    --insanity| sane    | Adjust insanity level of benchmark:\n"
79                         "              |         |     [sane|eccentric|manic|paranoid]\n"
80                         "\n");
81 }
82
83 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
84 {
85         struct bench *prefs;
86         char total_objects[MAX_ARG_LEN + 1];
87         char total_size[MAX_ARG_LEN + 1];
88         char object_size[MAX_ARG_LEN + 1];
89         char block_size[MAX_ARG_LEN + 1];
90         char op[MAX_ARG_LEN + 1];
91         char pattern[MAX_ARG_LEN + 1];
92         char insanity[MAX_ARG_LEN + 1];
93         char verify[MAX_ARG_LEN + 1];
94         struct xseg *xseg = peer->xseg;
95         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
96         long iodepth = -1;
97         long dst_port = -1;
98         unsigned long seed = -1;
99         struct timespec timer_seed;
100         int set_by_hand = 0;
101         int r;
102
103         op[0] = 0;
104         pattern[0] = 0;
105         total_objects[0] = 0;
106         total_size[0] = 0;
107         block_size[0] = 0;
108         object_size[0] = 0;
109         insanity[0] = 0;
110         verify[0] = 0;
111
112 #ifdef MT
113         for (i = 0; i < nr_threads; i++) {
114                 prefs = peer->thread[i]->priv;
115                 prefs = malloc(sizeof(struct bench));
116                 if (!prefs) {
117                         perror("malloc");
118                         return -1;
119                 }
120         }
121 #endif
122         prefs = malloc(sizeof(struct bench));
123         if (!prefs) {
124                 perror("malloc");
125                 return -1;
126         }
127         prefs->flags = 0;
128
129         prefs->status = malloc(sizeof(struct req_status));
130         if (!prefs->status) {
131                 perror("malloc");
132                 return -1;
133         }
134         memset(prefs->status, 0, sizeof(struct req_status));
135
136         //Begin reading the benchmark-specific arguments
137         BEGIN_READ_ARGS(argc, argv);
138         READ_ARG_STRING("-op", op, MAX_ARG_LEN);
139         READ_ARG_STRING("--pattern", pattern, MAX_ARG_LEN);
140         READ_ARG_STRING("-to", total_objects, MAX_ARG_LEN);
141         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
142         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
143         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
144         READ_ARG_ULONG("--iodepth", iodepth);
145         READ_ARG_ULONG("-tp", dst_port);
146         READ_ARG_ULONG("--seed", seed);
147         READ_ARG_STRING("--insanity", insanity, MAX_ARG_LEN);
148         READ_ARG_STRING("--verify", verify, MAX_ARG_LEN);
149         END_READ_ARGS();
150
151         /*****************************\
152          * Check I/O type parameters *
153         \*****************************/
154
155         //We support 4 xseg operations: X_READ, X_WRITE, X_DELETE, X_INFO
156         //The I/O pattern of these operations can be either sequential (seq) or
157         //random (rand)
158         if (!op[0]) {
159                 XSEGLOG2(&lc, E, "xseg operation needs to be supplied\n");
160                 goto arg_fail;
161         }
162         r = read_op(op);
163         if (r < 0) {
164                 XSEGLOG2(&lc, E, "Invalid syntax: -op %s\n", op);
165                 goto arg_fail;
166         }
167         prefs->op = r;
168
169         if (!pattern[0]) {
170                 XSEGLOG2(&lc, E, "I/O pattern needs to be supplied\n");
171                 goto arg_fail;
172         }
173         r = read_pattern(pattern);
174         if (r < 0) {
175                 XSEGLOG2(&lc, E, "Invalid syntax: --pattern %s\n", pattern);
176                 goto arg_fail;
177         }
178         SET_FLAG(PATTERN, prefs->flags, r);
179
180         if (!verify[0])
181                 strcpy(verify, "no");
182         r = read_verify(verify);
183         if (r < 0) {
184                 XSEGLOG2(&lc, E, "Invalid syntax: --verify %s\n", verify);
185                 goto arg_fail;
186         }
187         SET_FLAG(VERIFY, prefs->flags, r);
188
189         //Default iodepth value is 1
190         if (iodepth < 0)
191                 prefs->iodepth = 1;
192         else
193                 prefs->iodepth = iodepth;
194
195         /**************************\
196          * Check timer parameters *
197         \**************************/
198
199         //Most of the times, not all timers need to be used.
200         //We can choose which timers will be used by adjusting the "insanity"
201         //level of the benchmark i.e. the obscurity of code paths (get request,
202         //submit request) that will be timed.
203         if (!insanity[0])
204                 strcpy(insanity, "sane");
205
206         r = read_insanity(insanity);
207         if (r < 0) {
208                 XSEGLOG2(&lc, E, "Invalid syntax: --insanity %s\n", insanity);
209                 goto arg_fail;
210         }
211         SET_FLAG(INSANITY, prefs->flags, r);
212
213         /*
214          * If we have a request other than read/write, we don't need to check
215          * about size parameters, but only how many objects we want to affect
216          */
217         if (prefs->op != X_READ && prefs->op != X_WRITE) {
218
219                 /***************************\
220                  * Check object parameters *
221                 \***************************/
222
223                 if (!total_objects[0]) {
224                         XSEGLOG2(&lc, E,
225                                         "Total number of objects needs to be supplied\n");
226                         goto arg_fail;
227                 }
228                 prefs->to = str2num(total_objects);
229                 if (!prefs->to) {
230                         XSEGLOG2(&lc, E, "Invalid syntax: -to %s\n", total_objects);
231                         goto arg_fail;
232                 }
233
234                 //In this case, the maximum number of requests is the total number of
235                 //objects we will handle
236                 prefs->status->max = prefs->to;
237         } else {
238
239                 /*************************\
240                  * Check size parameters *
241                 \*************************/
242
243                 //Block size (bs): Defaults to 4K.
244                 //It must be a number followed by one of these characters:
245                 //                                              [k|K|m|M|g|G]
246                 //If not, it will be considered as size in bytes.
247                 //Must be integer multiple of segment's page size (typically 4k).
248                 if (!block_size[0])
249                         strcpy(block_size,"4k");
250
251                 prefs->bs = str2num(block_size);
252                 if (!prefs->bs) {
253                         XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", block_size);
254                         goto arg_fail;
255                 } else if (prefs->bs % xseg_page_size) {
256                         XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
257                         goto arg_fail;
258                 }
259
260                 //Total I/O size (ts): Must be supplied by user.
261                 //Must have the same format as "total size"
262                 //Must be integer multiple of "block size"
263                 if (!total_size[0]) {
264                         XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
265                         goto arg_fail;
266                 }
267
268                 prefs->ts = str2num(total_size);
269                 if (!prefs->ts) {
270                         XSEGLOG2(&lc, E, "Invalid syntax: -ts %s\n", total_size);
271                         goto arg_fail;
272                 } else if (prefs->ts % prefs->bs) {
273                         XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
274                         goto arg_fail;
275                 } else if (prefs->ts > xseg->segment_size) {
276                         XSEGLOG2(&lc, E,
277                                         "Total I/O size exceeds segment size\n", total_size);
278                         goto arg_fail;
279                 }
280
281                 //Object size (os): Defaults to 4M.
282                 //Must have the same format as "total size"
283                 //Must be integer multiple of "block size"
284                 if (!object_size[0])
285                         strcpy(object_size,"4M");
286
287                 prefs->os = str2num(object_size);
288                 if (!prefs->os) {
289                         XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
290                         goto arg_fail;
291                 } else if (prefs->os % prefs->bs) {
292                         XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
293                         goto arg_fail;
294                 }
295
296                 //In this case, the maximum number of requests is the number of blocks
297                 //we need to cover the total I/O size
298                 prefs->status->max = prefs->ts / prefs->bs;
299         }
300
301         /*************************\
302          * Check port parameters *
303         \*************************/
304
305         if (dst_port < 0){
306                 XSEGLOG2(&lc, E, "Target port needs to be supplied\n");
307                 goto arg_fail;
308         }
309
310         prefs->src_port = peer->portno_start; //TODO: allow user to change this
311         prefs->dst_port = (xport) dst_port;
312
313         /*********************************\
314          * Create timers for all metrics *
315         \*********************************/
316
317         if (init_timer(&prefs->total_tm, INSANITY_SANE))
318                 goto tm_fail;
319         if (init_timer(&prefs->sub_tm, INSANITY_MANIC))
320                 goto tm_fail;
321         if (init_timer(&prefs->get_tm, INSANITY_PARANOID))
322                 goto tm_fail;
323         if (init_timer(&prefs->rec_tm, INSANITY_ECCENTRIC))
324                 goto tm_fail;
325
326         /********************************\
327          * Customize struct peerd/prefs *
328         \********************************/
329
330         prefs->peer = peer;
331
332 reseed:
333         //We proceed to initialise the global_id, and seed variables.
334         if (seed == -1) {
335                 clock_gettime(CLOCK_MONOTONIC_RAW, &timer_seed);
336                 seed = timer_seed.tv_nsec;
337         } else {
338                 set_by_hand = 1;
339         }
340         create_id(seed);
341
342         if (GET_FLAG(PATTERN, prefs->flags) == PATTERN_RAND) {
343                 prefs->lfsr = malloc(sizeof(struct bench_lfsr));
344                 if (!prefs->lfsr) {
345                         perror("malloc");
346                         goto lfsr_fail;
347                 }
348
349                 r = lfsr_init(prefs->lfsr, prefs->status->max, seed, seed & 0xF);
350                 if (r && set_by_hand) {
351                         XSEGLOG2(&lc, E, "LFSR could not be initialized\n");
352                         goto lfsr_fail;
353                 } else if (r) {
354                         seed = -1;
355                         goto reseed;
356                 }
357         }
358         XSEGLOG2(&lc, I, "Global ID is %s\n", global_id);
359
360         peer->peerd_loop = bench_peerd_loop;
361         peer->priv = (void *) prefs;
362         return 0;
363
364 arg_fail:
365         custom_peer_usage();
366 lfsr_fail:
367         free(prefs->lfsr);
368 tm_fail:
369         free(prefs->total_tm);
370         free(prefs->sub_tm);
371         free(prefs->get_tm);
372         free(prefs->rec_tm);
373         free(prefs);
374         return -1;
375 }
376
377
378 static int send_request(struct peerd *peer, struct bench *prefs)
379 {
380         struct xseg_request *req;
381         struct xseg *xseg = peer->xseg;
382         struct peer_req *pr;
383         xport srcport = prefs->src_port;
384         xport dstport = prefs->dst_port;
385         xport p;
386
387         int r;
388         uint64_t new;
389         uint64_t size = prefs->bs;
390
391         //srcport and dstport must already be provided by the user.
392         //returns struct xseg_request with basic initializations
393         XSEGLOG2(&lc, D, "Get new request\n");
394         timer_start(prefs, prefs->get_tm);
395         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
396         if (!req) {
397                 XSEGLOG2(&lc, W, "Cannot get request\n");
398                 return -1;
399         }
400         timer_stop(prefs, prefs->get_tm, NULL);
401
402         //Allocate enough space for the data and the target's name
403         XSEGLOG2(&lc, D, "Prepare new request\n");
404         r = xseg_prep_request(xseg, req, TARGETLEN, size);
405         if (r < 0) {
406                 XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
407                                 TARGETLEN, (unsigned long long)size);
408                 goto put_xseg_request;
409         }
410
411         //Determine what the next target/chunk will be, based on I/O pattern
412         new = determine_next(prefs);
413         req->op = prefs->op;
414         XSEGLOG2(&lc, I, "Our new request is %lu\n", new);
415         //Create a target of this format: "bench-<global_id>-<obj_no>"
416         create_target(prefs, req, new);
417
418         if (prefs->op == X_WRITE || prefs->op == X_READ) {
419                 req->size = size;
420                 //Calculate the chunk offset inside the object
421                 req->offset = (new * prefs->bs) % prefs->os;
422                 XSEGLOG2(&lc, D, "Offset of request %lu is %lu\n", new, req->offset);
423
424                 if (prefs->op == X_WRITE)
425                         create_chunk(prefs, req, new);
426         }
427
428
429         //Measure this?
430         XSEGLOG2(&lc, D, "Allocate peer request\n");
431         pr = alloc_peer_req(peer);
432         if (!pr) {
433                 XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
434                                 peer->nr_ops - xq_count(&peer->free_reqs));
435                 goto put_xseg_request;
436         }
437         pr->peer = peer;
438         pr->portno = srcport;
439         pr->req = req;
440         pr->priv = malloc(sizeof(struct timespec));
441         if (!pr->priv) {
442                 perror("malloc");
443                 goto put_peer_request;
444         }
445
446         //XSEGLOG2(&lc, D, "Set request data\n");
447         r = xseg_set_req_data(xseg, req, pr);
448         if (r < 0) {
449                 XSEGLOG2(&lc, W, "Cannot set request data\n");
450                 goto put_peer_request;
451         }
452
453         /*
454          * Start measuring receive time.
455          * When we receive a request, we need to have its submission time to
456          * measure elapsed time. Thus, we memcpy its submission time to pr->priv.
457          * QUESTION: Is this the fastest way?
458          */
459         timer_start(prefs, prefs->rec_tm);
460         if (prefs->rec_tm->insanity <= GET_FLAG(INSANITY, prefs->flags))
461                 memcpy(pr->priv, &prefs->rec_tm->start_time, sizeof(struct timespec));
462
463         //Submit the request from the source port to the target port
464         XSEGLOG2(&lc, D, "Submit request %lu\n", new);
465         timer_start(prefs, prefs->sub_tm);
466         p = xseg_submit(xseg, req, srcport, X_ALLOC);
467         if (p == NoPort) {
468                 XSEGLOG2(&lc, W, "Cannot submit request\n");
469                 goto put_peer_request;
470         }
471         prefs->status->submitted++;
472         timer_stop(prefs, prefs->sub_tm, NULL);
473
474         //Send SIGIO to the process that has bound this port to inform that
475         //IO is possible
476         r = xseg_signal(xseg, p);
477         //if (r < 0)
478         //      XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
479
480         return 0;
481
482 put_peer_request:
483         free(pr->priv);
484         free_peer_req(peer, pr);
485 put_xseg_request:
486         if (xseg_put_request(xseg, req, srcport))
487                 XSEGLOG2(&lc, W, "Cannot put request\n");
488         return -1;
489 }
490
491 /*
492  * This function substitutes the default generic_peerd_loop of peer.c.
493  * It's plugged to struct peerd at custom peer's initialisation
494  */
495 int bench_peerd_loop(void *arg)
496 {
497 #ifdef MT
498         struct thread *t = (struct thread *) arg;
499         struct peerd *peer = t->peer;
500         char *id = t->arg;
501 #else
502         struct peerd *peer = (struct peerd *) arg;
503         char id[4] = {'P','e','e','r'};
504 #endif
505         struct xseg *xseg = peer->xseg;
506         struct bench *prefs = peer->priv;
507         xport portno_start = peer->portno_start;
508         xport portno_end = peer->portno_end;
509         uint64_t threshold=1000/(1 + portno_end - portno_start);
510         pid_t pid = syscall(SYS_gettid);
511         int r;
512         uint64_t loops;
513
514         XSEGLOG2(&lc, I, "%s has tid %u.\n",id, pid);
515         xseg_init_local_signal(xseg, peer->portno_start);
516
517         timer_start(prefs, prefs->total_tm);
518 send_request:
519         while (!isTerminate()) {
520 #ifdef MT
521                 if (t->func) {
522                         XSEGLOG2(&lc, D, "%s executes function\n", id);
523                         xseg_cancel_wait(xseg, peer->portno_start);
524                         t->func(t->arg);
525                         t->func = NULL;
526                         t->arg = NULL;
527                         continue;
528                 }
529 #endif
530                 while (CAN_SEND_REQUEST(prefs)) {
531                         xseg_cancel_wait(xseg, peer->portno_start);
532                         XSEGLOG2(&lc, D, "...because %lu < %lu && %lu < %lu\n",
533                                         prefs->status->submitted - prefs->status->received,
534                                         prefs->iodepth, prefs->status->received,
535                                         prefs->status->max);
536                         XSEGLOG2(&lc, D, "Start sending new request\n");
537                         r = send_request(peer, prefs);
538                         if (r < 0)
539                                 break;
540                 }
541                 //Heart of peerd_loop. This loop is common for everyone.
542                 for (loops = threshold; loops > 0; loops--) {
543                         if (loops == 1)
544                                 xseg_prepare_wait(xseg, peer->portno_start);
545
546                         if (check_ports(peer)) {
547                                 //If an old request has just been acked, the most sensible
548                                 //thing to do is to immediately send a new one
549                                 if (prefs->status->received < prefs->status->max)
550                                         goto send_request;
551                                 else
552                                         return 0;
553                         }
554                 }
555                 //struct xseg_port *port = xseg_get_port(xseg, portno_start);
556                 //struct xq *q;
557                 //q = XPTR_TAKE(port->request_queue, xseg->segment);
558                 //XSEGLOG2(&lc, I, "%s goes to sleep with %u requests pending\n",
559                 //              id, xq_count(q));
560                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
561 #ifdef ST_THREADS
562                 if (ta){
563                         st_sleep(0);
564                         continue;
565                 }
566 #endif
567                 xseg_wait_signal(xseg, 10000000UL);
568                 xseg_cancel_wait(xseg, peer->portno_start);
569                 XSEGLOG2(&lc, I, "%s woke up\n", id);
570         }
571
572         XSEGLOG2(&lc, I, "peer->free_reqs = %d, peer->nr_ops = %d\n",
573                         xq_count(&peer->free_reqs), peer->nr_ops);
574         return 0;
575 }
576
577 void custom_peer_finalize(struct peerd *peer)
578 {
579         struct bench *prefs = peer->priv;
580         //TODO: Measure mean time, standard variation
581
582         if (!prefs->total_tm->completed)
583                 timer_stop(prefs, prefs->total_tm, NULL);
584
585         print_stats(prefs);
586         print_res(prefs, prefs->total_tm, "Total Requests");
587         return;
588 }
589
590 /*
591  * handle_received: +1 to our received requests.
592  * Do some sanity checks and then check if request is failed.
593  * If not try to verify the request if asked.
594  */
595 static void handle_received(struct peerd *peer, struct peer_req *pr)
596 {
597         //FIXME: handle null pointer
598         struct bench *prefs = peer->priv;
599         struct timer *rec = prefs->rec_tm;
600
601         prefs->status->received++;
602         if (!pr->req) {
603                 //This is a serious error, so we must stop
604                 XSEGLOG2(&lc, E, "Received peer request with no xseg request");
605                 terminated++;
606                 return;
607         }
608
609         if ((GET_FLAG(INSANITY, prefs->flags) < rec->insanity) && !pr->priv) {
610                 XSEGLOG2(&lc, W, "Cannot find submission time of request");
611                 return;
612         }
613
614         timer_stop(prefs, rec, pr->priv);
615
616         if (!(pr->req->state & XS_SERVED))
617                 prefs->status->failed++;
618         else if (CAN_VERIFY(prefs) && read_chunk(prefs, pr->req))
619                 prefs->status->corrupted++;
620
621         if (xseg_put_request(peer->xseg, pr->req, pr->portno))
622                 XSEGLOG2(&lc, W, "Cannot put xseg request\n");
623
624         //QUESTION, can't we just keep the malloced memory for future use?
625         free(pr->priv);
626         free_peer_req(peer, pr);
627 }
628
629 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
630                 enum dispatch_reason reason)
631 {
632         switch (reason) {
633                 case dispatch_accept:
634                         //This is wrong, benchmarking peer should not accept requests,
635                         //only receive them.
636                         XSEGLOG2(&lc, W, "Bench peer should not accept requests\n");
637                         complete(peer, pr);
638                         break;
639                 case dispatch_receive:
640                         handle_received(peer, pr);
641                         break;
642                 default:
643                         fail(peer, pr);
644         }
645         return 0;
646 }