Statistics
| Branch: | Revision:

root / drivers / block-valve.c @ abdb293f

History | View | Annotate | Download (14.2 kB)

1
/*
2
 * Copyright (c) 2010, Citrix Systems, Inc.
3
 * All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions are met:
7
 *     * Redistributions of source code must retain the above copyright
8
 *       notice, this list of conditions and the following disclaimer.
9
 *     * Redistributions in binary form must reproduce the above copyright
10
 *       notice, this list of conditions and the following disclaimer in the
11
 *       documentation and/or other materials provided with the distribution.
12
 *     * Neither the name of XenSource Inc. nor the names of its contributors
13
 *       may be used to endorse or promote products derived from this software
14
 *       without specific prior written permission.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
20
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
21
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
23
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
24
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
25
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
 */
28

    
29
#ifdef HAVE_CONFIG_H
30
#include "config.h"
31
#endif
32

    
33
#include <stdlib.h>
34
#include <stdio.h>
35
#include <string.h>
36
#include <unistd.h>
37
#include <errno.h>
38
#include <sys/socket.h>
39
#include <sys/un.h>
40

    
41
#include "tapdisk.h"
42
#include "tapdisk-driver.h"
43
#include "tapdisk-server.h"
44
#include "tapdisk-interface.h"
45

    
46
#include "block-valve.h"
47

    
48
typedef struct td_valve td_valve_t;
49
typedef struct td_valve_request td_valve_request_t;
50

    
51
struct td_valve_request {
52
        td_request_t            treq;
53
        int                     secs;
54

    
55
        struct list_head        entry;
56
        td_valve_t             *valve;
57
};
58

    
59
struct td_valve_stats {
60
        unsigned long long      stor;
61
        unsigned long long      forw;
62
};
63

    
64
struct td_valve {
65
        char                   *brname;
66
        unsigned long           flags;
67

    
68
        int                     sock;
69
        event_id_t              sock_id;
70

    
71
        event_id_t              sched_id;
72
        event_id_t              retry_id;
73

    
74
        unsigned int            cred;
75
        unsigned int            need;
76
        unsigned int            done;
77

    
78
        struct list_head        stor;
79
        struct list_head        forw;
80

    
81
        td_valve_request_t      reqv[MAX_REQUESTS];
82
        td_valve_request_t     *free[MAX_REQUESTS];
83
        int                     n_free;
84

    
85
        struct td_valve_stats   stats;
86
};
87

    
88
#define td_valve_for_each_stored_request(_req, _next, _valve)                \
89
        list_for_each_entry_safe(_req, _next, &(_valve)->stor, entry)
90

    
91
#define td_valve_for_each_forwarded_request(_req, _next, _valve)        \
92
        list_for_each_entry_safe(_req, _next, &(_valve)->forw, entry)
93

    
94
#define TD_VALVE_CONNECT_INTERVAL 2 /* s */
95

    
96
#define TD_VALVE_RDLIMIT  (1<<0)
97
#define TD_VALVE_WRLIMIT  (1<<1)
98
#define TD_VALVE_KILLED   (1<<31)
99

    
100
static void valve_schedule_retry(td_valve_t *);
101
static void valve_conn_receive(td_valve_t *);
102
static void valve_conn_request(td_valve_t *, unsigned long);
103
static void valve_forward_stored_requests(td_valve_t *);
104
static void valve_kill(td_valve_t *);
105

    
106
#define DBG(_f, _a...)    if (1) { tlog_syslog(TLOG_DBG, _f, ##_a); }
107
#define INFO(_f, _a...)   tlog_syslog(TLOG_INFO, "valve: " _f, ##_a)
108
#define WARN(_f, _a...)   tlog_syslog(TLOG_WARN, "WARNING: "_f " in %s:%d", \
109
                                      ##_a, __func__, __LINE__)
110
#define ERR(_f, _a...)    tlog_syslog(TLOG_WARN, "ERROR: " _f " in %s:%d", \
111
                                      ##_a, __func__, __LINE__)
112
#define VERR(_err, _f, _a...) tlog_syslog(TLOG_WARN,                         \
113
                                          "ERROR: err=%d (%s), " _f ".", \
114
                                          _err, strerror(-(_err)), ##_a)
115
#undef  PERROR
116
#define PERROR(_f, _a...) VERR(-errno, _f, ##_a)
117

    
118
#define BUG() do {                                                \
119
                ERR("Aborting");                                \
120
                td_panic();                                        \
121
        } while (0)
122

    
123
#define BUG_ON(_cond)                                                \
124
        if (unlikely(_cond)) {                                        \
125
                ERR("(%s) = %ld", #_cond, (long)(_cond));        \
126
                BUG();                                                \
127
        }
128

    
129
#define WARN_ON(_cond) ({                                        \
130
        int __cond = _cond;                                        \
131
        if (unlikely(__cond))                                        \
132
                WARN("(%s) = %ld", #_cond, (long)(_cond));        \
133
        __cond;                                                \
134
})
135

    
136
#define ARRAY_SIZE(_a)   (sizeof(_a)/sizeof((_a)[0]))
137
#define TREQ_SIZE(_treq) ((unsigned int)(_treq.secs) << 9)
138

    
139
static td_valve_request_t *
140
valve_alloc_request(td_valve_t *valve)
141
{
142
        td_valve_request_t *req = NULL;
143

    
144
        if (valve->n_free)
145
                req = valve->free[--valve->n_free];
146

    
147
        return req;
148
}
149

    
150
static void
151
valve_free_request(td_valve_t *valve, td_valve_request_t *req)
152
{
153
        BUG_ON(valve->n_free >= ARRAY_SIZE(valve->free));
154
        list_del_init(&req->entry);
155
        valve->free[valve->n_free++] = req;
156
}
157

    
158
static void
159
__valve_sock_event(event_id_t id, char mode, void *private)
160
{
161
        td_valve_t *valve = private;
162

    
163
        valve_conn_receive(valve);
164

    
165
        valve_forward_stored_requests(valve);
166
}
167

    
168
static void
169
valve_set_done_pending(td_valve_t *valve)
170
{
171
        WARN_ON(valve->done == 0);
172
        tapdisk_server_mask_event(valve->sched_id, 0);
173
}
174

    
175
static void
176
valve_clear_done_pending(td_valve_t *valve)
177
{
178
        WARN_ON(valve->done != 0);
179
        tapdisk_server_mask_event(valve->sched_id, 1);
180
}
181

    
182
static void
183
__valve_sched_event(event_id_t id, char mode, void *private)
184
{
185
        td_valve_t *valve = private;
186

    
187
        if (likely(valve->done > 0))
188
                /* flush valve->done */
189
                valve_conn_request(valve, 0);
190
}
191

    
192
static void
193
valve_sock_close(td_valve_t *valve)
194
{
195
        if (valve->sock >= 0) {
196
                close(valve->sock);
197
                valve->sock = -1;
198
        }
199

    
200
        if (valve->sock_id >= 0) {
201
                tapdisk_server_unregister_event(valve->sock_id);
202
                valve->sock_id = -1;
203
        }
204

    
205
        if (valve->sched_id >= 0) {
206
                tapdisk_server_unregister_event(valve->sched_id);
207
                valve->sched_id = -1;
208
        }
209
}
210

    
211
static int
212
valve_sock_open(td_valve_t *valve)
213
{
214
        struct sockaddr_un addr = { .sun_family = AF_UNIX };
215
        int s, id, err;
216

    
217
        s = socket(AF_UNIX, SOCK_STREAM, 0);
218
        if (s < 0) {
219
                PERROR("socket");
220
                err = -errno;
221
                goto fail;
222
        }
223

    
224
        valve->sock = s;
225

    
226
        if (valve->brname[0] == '/')
227
                strncpy(addr.sun_path, valve->brname,
228
                        sizeof(addr.sun_path));
229
        else
230
                snprintf(addr.sun_path, sizeof(addr.sun_path),
231
                         "%s/%s", TD_VALVE_SOCKDIR, valve->brname);
232

    
233
        err = connect(valve->sock, &addr, sizeof(addr));
234
        if (err) {
235
                err = -errno;
236
                goto fail;
237
        }
238

    
239
        id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
240
                                           valve->sock, 0,
241
                                           __valve_sock_event,
242
                                           valve);
243
        if (id < 0) {
244
                err = id;
245
                goto fail;
246
        }
247

    
248
        valve->sock_id = id;
249

    
250
        id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
251
                                           -1, 0,
252
                                           __valve_sched_event,
253
                                           valve);
254
        if (id < 0) {
255
                err = id;
256
                goto fail;
257
        }
258

    
259
        valve->sched_id = id;
260

    
261
        INFO("Connected to %s", addr.sun_path);
262

    
263
        valve->cred = 0;
264
        valve->need = 0;
265
        valve->done = 0;
266

    
267
        valve_clear_done_pending(valve);
268

    
269
        return 0;
270

    
271
fail:
272
        valve_sock_close(valve);
273
        return err;
274
}
275

    
276
static int
277
valve_sock_send(td_valve_t *valve, const void *msg, size_t size)
278
{
279
        ssize_t n;
280

    
281
        n = send(valve->sock, msg, size, MSG_DONTWAIT);
282
        if (n < 0)
283
                return -errno;
284
        if (n != size)
285
                return -EPROTO;
286

    
287
        return 0;
288
}
289

    
290
static int
291
valve_sock_recv(td_valve_t *valve, void *msg, size_t size)
292
{
293
        ssize_t n;
294

    
295
        n = recv(valve->sock, msg, size, MSG_DONTWAIT);
296
        if (n < 0)
297
                return -errno;
298

    
299
        return n;
300
}
301

    
302
static void
303
__valve_retry_timeout(event_id_t id, char mode, void *private)
304
{
305
        td_valve_t *valve = private;
306
        int err;
307

    
308
        err = valve_sock_open(valve);
309
        if (!err)
310
                tapdisk_server_unregister_event(valve->retry_id);
311
}
312

    
313
static void
314
valve_schedule_retry(td_valve_t *valve)
315
{
316
        int id;
317

    
318
        BUG_ON(valve->sock_id >= 0);
319

    
320
        id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
321
                                           -1, TD_VALVE_CONNECT_INTERVAL,
322
                                           __valve_retry_timeout,
323
                                           valve);
324
        BUG_ON(id < 0);
325

    
326
        valve->retry_id = id;
327
}
328

    
329
static void
330
valve_conn_open(td_valve_t *valve)
331
{
332
        int err;
333

    
334
        BUG_ON(valve->flags & TD_VALVE_KILLED);
335

    
336
        err = valve_sock_open(valve);
337
        if (err) {
338
                WARN("%s: %s", valve->brname, strerror(-err));
339
                valve_schedule_retry(valve);
340
        }
341
}
342

    
343
static void
344
valve_conn_close(td_valve_t *valve, int reset)
345
{
346
        td_valve_request_t *req, *next;
347

    
348
        valve_sock_close(valve);
349

    
350
        if (reset)
351
                td_valve_for_each_stored_request(req, next, valve) {
352
                        td_forward_request(req->treq);
353
                        valve->stats.forw++;
354
                        valve_free_request(valve, req);
355
                }
356

    
357
        WARN_ON(!list_empty(&valve->stor));
358
}
359

    
360
static void
361
valve_conn_reset(td_valve_t *valve)
362
{
363
        valve_conn_close(valve, 1);
364
        valve_conn_open(valve);
365
}
366

    
367
void
368
valve_conn_receive(td_valve_t *valve)
369
{
370
        unsigned long buf[32], cred = 0;
371
        ssize_t n;
372
        int i, err;
373

    
374
        n = valve_sock_recv(valve, buf, sizeof(buf));
375
        if (!n) {
376
                err = -ECONNRESET;
377
                goto reset;
378
        }
379

    
380
        if (n < 0) {
381
                err = n;
382
                if (err != -EAGAIN)
383
                        goto reset;
384
        }
385

    
386
        for (i = 0; i < n / sizeof(buf[0]); i++) {
387
                err = WARN_ON(buf[i] >= TD_RLB_REQUEST_MAX);
388
                if (err)
389
                        goto kill;
390

    
391
                cred += buf[i];
392
        }
393

    
394
        if (cred > valve->need) {
395
                err = -EINVAL;
396
                goto reset;
397
        }
398

    
399
        valve->cred += cred;
400
        valve->need -= cred;
401

    
402
        return;
403

    
404
reset:
405
        VERR(err, "resetting connection");
406
        valve_conn_reset(valve);
407
        return;
408

    
409
kill:
410
        ERR("Killing valve.");
411
        valve_kill(valve);
412
}
413

    
414
static void
415
valve_conn_request(td_valve_t *valve, unsigned long size)
416
{
417
        struct td_valve_req _req;
418
        int err;
419

    
420
        _req.need    = size;
421
        _req.done    = valve->done;
422

    
423
        valve->need += size;
424
        valve->done  = 0;
425

    
426
        valve_clear_done_pending(valve);
427

    
428
        err = valve_sock_send(valve, &_req, sizeof(_req));
429
        if (!err)
430
                return;
431

    
432
        VERR(err, "resetting connection");
433
        valve_conn_reset(valve);
434
}
435

    
436
static int
437
valve_expend_request(td_valve_t *valve, const td_request_t treq)
438
{
439
        if (valve->flags & TD_VALVE_KILLED)
440
                return 0;
441

    
442
        if (valve->sock < 0)
443
                return 0;
444

    
445
        if (valve->cred < TREQ_SIZE(treq))
446
                return -EAGAIN;
447

    
448
        valve->cred -= TREQ_SIZE(treq);
449

    
450
        return 0;
451
}
452

    
453
static void
454
__valve_complete_treq(td_request_t treq, int error)
455
{
456
        td_valve_request_t *req = treq.cb_data;
457
        td_valve_t *valve = req->valve;
458

    
459
        BUG_ON(req->secs < treq.secs);
460
        req->secs -= treq.secs;
461

    
462
        valve->done += TREQ_SIZE(treq);
463
        valve_set_done_pending(valve);
464

    
465
        if (!req->secs) {
466
                td_complete_request(req->treq, error);
467
                valve_free_request(valve, req);
468
        }
469
}
470

    
471
static void
472
valve_forward_stored_requests(td_valve_t *valve)
473
{
474
        td_valve_request_t *req, *next;
475
        td_request_t clone;
476
        int err;
477

    
478
        td_valve_for_each_stored_request(req, next, valve) {
479

    
480
                err = valve_expend_request(valve, req->treq);
481
                if (err)
482
                        break;
483

    
484
                clone         = req->treq;
485
                clone.cb      = __valve_complete_treq;
486
                clone.cb_data = req;
487

    
488
                td_forward_request(clone);
489
                valve->stats.forw++;
490

    
491
                list_move(&req->entry, &valve->forw);
492
        }
493
}
494

    
495
static int
496
valve_store_request(td_valve_t *valve, td_request_t treq)
497
{
498
        td_valve_request_t *req;
499

    
500
        req = valve_alloc_request(valve);
501
        if (!req)
502
                return -EBUSY;
503

    
504
        valve_conn_request(valve, TREQ_SIZE(treq));
505

    
506
        req->treq = treq;
507
        req->secs = treq.secs;
508

    
509
        list_add_tail(&req->entry, &valve->stor);
510
        valve->stats.stor++;
511

    
512
        return 0;
513
}
514

    
515
static void
516
valve_kill(td_valve_t *valve)
517
{
518
        valve->flags |= TD_VALVE_KILLED;
519
        valve_conn_close(valve, 1);
520
}
521

    
522
static void
523
valve_init(td_valve_t *valve, unsigned long flags)
524
{
525
        int i;
526

    
527
        memset(valve, 0, sizeof(*valve));
528

    
529
        INIT_LIST_HEAD(&valve->stor);
530
        INIT_LIST_HEAD(&valve->forw);
531

    
532
        valve->sock     = -1;
533
        valve->sock_id  = -1;
534

    
535
        valve->retry_id = -1;
536
        valve->sched_id = -1;
537

    
538
        valve->flags    = flags;
539

    
540
        for (i = ARRAY_SIZE(valve->reqv) - 1; i >= 0; i--) {
541
                td_valve_request_t *req = &valve->reqv[i];
542

    
543
                req->valve = valve;
544
                INIT_LIST_HEAD(&req->entry);
545

    
546
                valve_free_request(valve, req);
547
        }
548
}
549

    
550
static int
551
td_valve_close(td_driver_t *driver)
552
{
553
        td_valve_t *valve = driver->data;
554

    
555
        WARN_ON(!list_empty(&valve->stor));
556
        WARN_ON(!list_empty(&valve->forw));
557

    
558
        valve_conn_close(valve, 0);
559

    
560
        if (valve->brname) {
561
                free(valve->brname);
562
                valve->brname = NULL;
563
        }
564

    
565
        return 0;
566
}
567

    
568
static int
569
td_valve_open(td_driver_t *driver,
570
              const char *name, td_flag_t flags)
571
{
572
        td_valve_t *valve = driver->data;
573
        int err;
574

    
575
        valve_init(valve, TD_VALVE_WRLIMIT);
576

    
577
        valve->brname = strdup(name);
578
        if (!valve->brname) {
579
                err = -errno;
580
                goto fail;
581
        }
582

    
583
        valve_conn_open(valve);
584

    
585
        return 0;
586

    
587
fail:
588
        td_valve_close(driver);
589
        return err;
590
}
591

    
592
static void
593
td_valve_queue_request(td_driver_t *driver, td_request_t treq)
594
{
595
        td_valve_t *valve = driver->data;
596
        int err;
597

    
598
        switch (treq.op) {
599

    
600
        case TD_OP_READ:
601
                if (valve->flags & TD_VALVE_RDLIMIT)
602
                        break;
603

    
604
                goto forward;
605

    
606
        case TD_OP_WRITE:
607
                if (valve->flags & TD_VALVE_WRLIMIT)
608
                        break;
609

    
610
                goto forward;
611

    
612
        default:
613
                BUG();
614
        }
615

    
616
        err = valve_expend_request(valve, treq);
617
        if (!err)
618
                goto forward;
619

    
620
        err = valve_store_request(valve, treq);
621
        if (err)
622
                td_complete_request(treq, -EBUSY);
623

    
624
        return;
625

    
626
forward:
627
        td_forward_request(treq);
628
        valve->stats.forw++;
629
}
630

    
631
static int
632
td_valve_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
633
{
634
        return -EINVAL;
635
}
636

    
637
static int
638
td_valve_validate_parent(td_driver_t *driver,
639
                         td_driver_t *parent_driver, td_flag_t flags)
640
{
641
        return -EINVAL;
642
}
643

    
644
static void
645
td_valve_stats(td_driver_t *driver, td_stats_t *st)
646
{
647
        td_valve_t *valve = driver->data;
648
        td_valve_request_t *req, *next;
649
        int n_reqs;
650

    
651
        tapdisk_stats_field(st, "bridge", "d", valve->brname);
652
        tapdisk_stats_field(st, "flags", "#x", valve->flags);
653

    
654
        tapdisk_stats_field(st, "cred", "d", valve->cred);
655
        tapdisk_stats_field(st, "need", "d", valve->need);
656
        tapdisk_stats_field(st, "done", "d", valve->done);
657

    
658
        /*
659
         * stored is [ waiting, total-waits ]
660
         */
661

    
662
        n_reqs = 0;
663
        td_valve_for_each_stored_request(req, next, valve)
664
                n_reqs++;
665

    
666
        tapdisk_stats_field(st, "stor", "[");
667
        tapdisk_stats_val(st, "d", n_reqs);
668
        tapdisk_stats_val(st, "llu", valve->stats.stor);
669
        tapdisk_stats_leave(st, ']');
670

    
671
        /*
672
         * forwarded is [ in-flight, total-requests ]
673
         */
674

    
675
        n_reqs = 0;
676
        td_valve_for_each_forwarded_request(req, next, valve)
677
                n_reqs++;
678

    
679
        tapdisk_stats_field(st, "forw", "[");
680
        tapdisk_stats_val(st, "d", n_reqs);
681
        tapdisk_stats_val(st, "llu", valve->stats.forw);
682
        tapdisk_stats_leave(st, ']');
683
}
684

    
685
struct tap_disk tapdisk_valve = {
686
        .disk_type                  = "tapdisk_valve",
687
        .flags                      = 0,
688
        .private_data_size          = sizeof(td_valve_t),
689
        .td_open                    = td_valve_open,
690
        .td_close                   = td_valve_close,
691
        .td_queue_read              = td_valve_queue_request,
692
        .td_queue_write             = td_valve_queue_request,
693
        .td_get_parent_id           = td_valve_get_parent_id,
694
        .td_validate_parent         = td_valve_validate_parent,
695
        .td_stats                   = td_valve_stats,
696
};