Statistics
| Branch: | Revision:

root / drivers / block-archipelago.c @ 99520dfc

History | View | Annotate | Download (21.9 kB)

1
/*
2
 * Copyright 2013 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <errno.h>
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <string.h>
39
#include <unistd.h>
40
#include <pthread.h>
41

    
42
#include <xseg/xseg.h>
43
#include <xseg/protocol.h>
44

    
45
#include "list.h"
46
#include "tapdisk.h"
47
#include "tapdisk-driver.h"
48
#include "tapdisk-interface.h"
49
#include "tapdisk-server.h"
50

    
51
#ifdef HACE_CONFIG_H
52
#include "config.h"
53
#endif
54

    
55
#define MAX_ARCHIPELAGO_REQS    TAPDISK_DATA_REQUESTS
56
#define MAX_ARCHIPELAGO_MERGED_REQS 32
57
#define NUM_XSEG_THREADS 2
58

    
59
struct tdarchipelago_request {
60
    td_request_t treq[MAX_ARCHIPELAGO_MERGED_REQS];
61
    int treq_count;
62
    int op;
63
    uint64_t offset;
64
    uint64_t size;
65
    void *buf;
66
    ssize_t result;
67
    struct list_head queue;
68
};
69

    
70
typedef struct AIORequestData {
71
    char *volname;
72
    off_t offset;
73
    ssize_t size;
74
    char *buf;
75
    int ret;
76
    int op;
77
    struct tdarchipelago_request *tdreq;
78
} AIORequestData;
79

    
80

    
81
struct xseg *xseg = NULL;
82
xport srcport = NoPort;
83
struct xseg_port *port;
84
xport mportno = NoPort;
85
xport vportno = NoPort;
86

    
87
struct posixfd_signal_desc {
88
    char signal_file[sizeof(void *)];
89
    int fd;
90
    int flag;
91
};
92

    
93
struct tdarchipelago_data {
94
    /* Archipelago Volume Name and Size */
95
    char *volname;
96
    ssize_t size;
97

    
98
    /* Requests Queue */
99
    struct list_head reqs_inflight;
100
    struct list_head reqs_free;
101
    struct tdarchipelago_request *req_deferred;
102
    struct tdarchipelago_request reqs[MAX_ARCHIPELAGO_REQS];
103
    int reqs_free_count;
104

    
105
    /* Flush event */
106
    int timeout_event_id;
107

    
108
    /* Inter-thread pipe */
109
    int pipe_fds[2];
110
    int pipe_event_id;
111

    
112
    /* Driver Stats */
113
    struct {
114
        int req_total;
115

    
116
        int req_issued;
117
        int req_issued_no_merge;
118
        int req_issued_forced;
119
        int req_issued_direct;
120
        int req_issued_timeout;
121

    
122
        int req_miss;
123
        int req_miss_op;
124
        int req_miss_ofs;
125
        int req_miss_buf;
126
    } stat;
127
};
128

    
129
typedef struct ArchipelagoThread {
130
    pthread_t request_th;
131
    pthread_cond_t request_cond;
132
    pthread_mutex_t request_mutex;
133
    int is_signaled;
134
    int is_running;
135
} ArchipelagoThread;
136

    
137
ArchipelagoThread archipelago_th[NUM_XSEG_THREADS];
138

    
139
static void tdarchipelago_finish_aiocb(void *arg, ssize_t c, AIORequestData *reqdata);
140
static int tdarchipelago_close(td_driver_t *driver);
141
static void tdarchipelago_pipe_read_cb(event_id_t eb, char mode, void *data);
142

    
143
static int wait_reply(struct xseg_request *expected_req)
144
{
145
    struct xseg_request *rec;
146
    xseg_prepare_wait(xseg, srcport);
147
    struct posixfd_signal_desc *psd = xseg_get_signal_desc(xseg, port);
148
    while(1) {
149
        rec = xseg_receive(xseg, srcport, 0);
150
        if(rec) {
151
            if( rec != expected_req) {
152
                DPRINTF("wait_reply(): Unknown request.\n");
153
                xseg_put_request(xseg, rec, srcport);
154
            } else if(!(rec->state & XS_SERVED)) {
155
                DPRINTF("wait_reply(): Failed request.\n");
156
                return -1;
157
            } else {
158
                break;
159
            }
160
        }
161
        xseg_wait_signal(xseg, psd, 1000000UL);
162
    }
163
    xseg_cancel_wait(xseg, srcport);
164
    return 0;
165
}
166

    
167
static void xseg_request_handler(void *arthd)
168
{
169
    struct posixfd_signal_desc *psd = xseg_get_signal_desc(xseg, port);
170
    ArchipelagoThread *th = (ArchipelagoThread *) arthd;
171
    while(th->is_running) {
172
        struct xseg_request *req;
173
        xseg_prepare_wait(xseg, srcport);
174
        req = xseg_receive(xseg, srcport, 0);
175
        if(req) {
176
            AIORequestData *reqdata;
177
            xseg_get_req_data(xseg, req, (void **)&reqdata);
178
            if(reqdata->op == TD_OP_READ)
179
            {
180
                char *data = xseg_get_data(xseg, req);
181
                memcpy(reqdata->buf, data, req->serviced);
182
                int serviced = req->serviced;
183
                tdarchipelago_finish_aiocb(reqdata->tdreq, serviced, reqdata);
184
                xseg_put_request(xseg, req, srcport);
185
            } else if(reqdata->op == TD_OP_WRITE) {
186
                int serviced = req->serviced;
187
                tdarchipelago_finish_aiocb(reqdata->tdreq, serviced, reqdata);
188
                xseg_put_request(xseg, req, srcport);
189
            }
190
        } else {
191
            xseg_wait_signal(xseg, psd, 1000000UL);
192
        }
193
        xseg_cancel_wait(xseg, srcport);
194
    }
195
    th->is_signaled = 1;
196
    pthread_cond_signal(&th->request_cond);
197
    pthread_exit(NULL);
198
}
199

    
200
static uint64_t get_image_info(char *volname)
201
{
202
    uint64_t size;
203
    int r;
204

    
205
    int targetlen = strlen(volname);
206
    struct xseg_request *req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
207
    r = xseg_prep_request(xseg, req, targetlen, sizeof(struct xseg_reply_info));
208
    if(r < 0) {
209
        xseg_put_request(xseg, req, srcport);
210
        DPRINTF("get_image_info(): Cannot prepare request. Aborting...");
211
        exit(-1);
212
    }
213

    
214
    char *target = xseg_get_target(xseg, req);
215
    strncpy(target, volname, targetlen);
216
    req->size = req->datalen;
217
    req->offset = 0;
218
    req->op = X_INFO;
219

    
220
    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
221
    if(p == NoPort) {
222
        xseg_put_request(xseg, req, srcport);
223
        DPRINTF("get_image_info(): Cannot submit request. Aborting...");
224
        exit(-1);
225
    }
226
    xseg_signal(xseg, p);
227
    r = wait_reply(req);
228
    if(r) {
229
        xseg_put_request(xseg, req, srcport);
230
        DPRINTF("get_image_info(): wait_reply() error. Aborting...");
231
        exit(-1);
232
    }
233
    struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(xseg, req);
234
    size = xinfo->size;
235
    xseg_put_request(xseg, req, srcport);
236
    return size;
237
}
238

    
239
static void xseg_find_port(char *pstr, const char *needle, xport *port)
240
{
241
    char *a;
242
    char *dpstr = strdup(pstr);
243
    a = strtok(dpstr, needle);
244
    *port = (xport) atoi(a);
245
    free(dpstr);
246
}
247

    
248
static void parse_uri(char **volname, const char *s)
249
{
250
    int n=0, nn, i;
251
    char *tokens[4];
252

    
253
    char *ds = strdup(s);
254
    tokens[n] = strtok(ds, ":");
255
    *volname = malloc(strlen(tokens[n]) + 1);
256
    strcpy(*volname, tokens[n]);
257

    
258
    for(i = 0, nn = 0; s[i]; i++)
259
        nn += (s[i] == ':');
260
    /* FIXME: Protect tokens array overflow */
261
    if( nn > 3)
262
        i = 3;
263
    else
264
        i = nn;
265

    
266
    while(tokens[n] && n <= i) tokens[++n] = strtok(NULL, ":");
267

    
268
    for(nn = 0; nn <= i; nn++) {
269
        if(strstr(tokens[nn], "mport="))
270
            xseg_find_port(tokens[nn], "mport=", &mportno);
271
        if(strstr(tokens[nn], "vport="))
272
            xseg_find_port(tokens[nn], "vport=", &vportno);
273
    }
274
}
275

    
276
static int tdarchipelago_open(td_driver_t *driver, const char *name, td_flag_t flags)
277
{
278
    struct tdarchipelago_data *prv = driver->data;
279
    uint64_t size; /*Archipelago Volume Size*/
280
    int i, retval;
281

    
282
    /* Init private structure */
283
    memset(prv, 0x00, sizeof(struct tdarchipelago_data));
284

    
285
    /*Default mapperd and vlmcd ports */
286
    vportno = 501;
287
    mportno = 1001;
288

    
289
    INIT_LIST_HEAD(&prv->reqs_inflight);
290
    INIT_LIST_HEAD(&prv->reqs_free);
291

    
292
    for(i=0; i< MAX_ARCHIPELAGO_REQS; i++){
293
        INIT_LIST_HEAD(&prv->reqs[i].queue);
294
        list_add(&prv->reqs[i].queue, &prv->reqs_free);
295
    }
296

    
297
    prv->reqs_free_count = MAX_ARCHIPELAGO_REQS;
298

    
299
    prv->pipe_fds[0] = prv->pipe_fds[1] = prv->pipe_event_id = -1;
300
    prv->timeout_event_id = -1;
301

    
302
    /* Parse Archipelago Volume Name and XSEG mportno, vportno */
303
    parse_uri(&prv->volname, name);
304

    
305
    /* Inter-thread pipe setup */
306
    retval = pipe(prv->pipe_fds);
307
    if(retval) {
308
        DPRINTF("tdarchipelago_open(): Failed to create inter-thread pipe (%d)\n", retval);
309
        goto err_exit;
310
    }
311
    prv->pipe_event_id = tapdisk_server_register_event(
312
            SCHEDULER_POLL_READ_FD,
313
            prv->pipe_fds[0],
314
            0,
315
            tdarchipelago_pipe_read_cb,
316
            prv);
317

    
318
    /* Archipelago context */
319
    if(!xseg) {
320
        if(xseg_initialize()) {
321
            DPRINTF("tdarchipelago_open(): Cannot initialize xseg.\n");
322
            goto err_exit;
323
        }
324
    }
325
    xseg = xseg_join((char *)"segdev", (char *)"xsegbd", (char *)"posixfd", NULL);
326
    if(!xseg) {
327
        DPRINTF("tdarchipelago_open(): Cannot join segment.\n");
328
        goto err_exit;
329
    }
330

    
331
    port = xseg_bind_dynport(xseg);
332
    if(!port) {
333
        DPRINTF("tdarchipelago_open(): Failed to bind port.\n");
334
        goto err_exit;
335
    }
336
    srcport = port->portno;
337
    xseg_init_local_signal(xseg, srcport);
338

    
339
    prv->size = get_image_info(prv->volname);
340
    size = prv->size;
341

    
342
    driver->info.sector_size = DEFAULT_SECTOR_SIZE;
343
    driver->info.size = size >> SECTOR_SHIFT;
344
    driver->info.info = 0;
345

    
346
    /* Start XSEG Request Handler Threads */
347
    pthread_attr_t attr;
348
    pthread_attr_init(&attr);
349
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
350
    for(i=0; i< NUM_XSEG_THREADS; i++) {
351
        pthread_cond_init(&archipelago_th[i].request_cond, NULL);
352
        pthread_mutex_init(&archipelago_th[i].request_mutex, NULL);
353
        archipelago_th[i].is_signaled = 0;
354
        archipelago_th[i].is_running = 1;
355
        pthread_create(&archipelago_th[i].request_th, &attr,
356
                (void *) xseg_request_handler,
357
                (void *)&archipelago_th[i]);
358

    
359
    }
360
    return 0;
361

    
362
err_exit:
363
    tdarchipelago_close(driver);
364
    return retval;
365
}
366

    
367
static int tdarchipelago_close(td_driver_t *driver)
368
{
369
    struct tdarchipelago_data *prv = driver->data;
370
    int i;
371

    
372
    for(i=0; i<NUM_XSEG_THREADS; i++) {
373
        if(archipelago_th[i].is_running) {
374
            archipelago_th[i].is_running = 0;
375
            pthread_mutex_lock(&archipelago_th[i].request_mutex);
376
            if(!archipelago_th[i].is_signaled)
377
                pthread_cond_wait(&archipelago_th[i].request_cond, &archipelago_th[i].request_mutex);
378
            pthread_mutex_unlock(&archipelago_th[i].request_mutex);
379
            pthread_cond_destroy(&archipelago_th[i].request_cond);
380
            pthread_mutex_destroy(&archipelago_th[i].request_mutex);
381
        }
382
    }
383
    xseg_leave_dynport(xseg, port);
384
    xseg_leave(xseg);
385

    
386
    if(prv->pipe_fds[0] >= 0) {
387
        close(prv->pipe_fds[0]);
388
        close(prv->pipe_fds[1]);
389
    }
390

    
391
    if(prv->pipe_event_id >= 0)
392
        tapdisk_server_unregister_event(prv->pipe_event_id);
393

    
394
    return 0;
395
}
396

    
397
static void tdarchipelago_finish_aiocb(void *arg, ssize_t c, AIORequestData *reqdata)
398
{
399
    struct tdarchipelago_request *req = arg;
400
    struct tdarchipelago_data *prv = req->treq[0].image->driver->data;
401
    int rv;
402

    
403
    req->result = c;
404

    
405
    while(1) {
406
        rv = write(prv->pipe_fds[1], (void *)&req, sizeof(req));
407
        if(rv >= 0)
408
            break;
409
        if((errno != EAGAIN) && (errno != EINTR))
410
            break;
411
    }
412
    free(reqdata);
413
    if(rv <= 0)
414
        DPRINTF("tdarchipelago_finish_aiocb(): Failed to write to completion pipe\n");
415
}
416

    
417
static void tdarchipelago_pipe_read_cb(event_id_t eb, char mode, void *data)
418
{
419
    struct tdarchipelago_data *prv = data;
420
    struct tdarchipelago_request *req;
421
    char *p = (void *)&req;
422
    int retval, tr, i;
423

    
424
    for(tr=0; tr<sizeof(req);) {
425
        retval = read(prv->pipe_fds[0], p + tr, sizeof(req) - tr);
426
        if(retval == 0) {
427
            DPRINTF("tdarchipelago_pipe_read_cb(): Short read on completion pipe\n");
428
            break;
429
        }
430
        if(retval < 0) {
431
            if( (errno == EAGAIN) || (errno == EINTR))
432
                continue;
433
            break;
434
        }
435
        tr += retval;
436
    }
437

    
438
    if(tr != sizeof(req)) {
439
        DPRINTF("tdarchipelago_pipe_read_cb(): Read aborted on completion pipe\n");
440
        return;
441
    }
442

    
443
    for(i=0; i < req->treq_count; i++)
444
    {
445
        int err = req->result < 0 ? -EIO : 0;
446
        if(err < 0)
447
            DPRINTF("tdarchipelago_pipe_read_cb(): Error in req->result: %d\n", err);
448
        td_complete_request(req->treq[i], err);
449
    }
450

    
451
    list_move(&req->queue, &prv->reqs_free);
452
    prv->reqs_free_count++;
453
}
454

    
455
static int archipelago_aio_read(char *volname, off_t offset, ssize_t size, char *buf,
456
        struct tdarchipelago_request *tdreq)
457
{
458
    AIORequestData *reqdata = malloc(sizeof(AIORequestData));
459
    int retval;
460
    int targetlen = strlen(volname);
461
    struct xseg_request *req = xseg_get_request(xseg, srcport, vportno, X_ALLOC);
462
    if(!req) {
463
        DPRINTF("archipelago_aio_read(): Cannot get xseg request.\n");
464
        retval = -1;
465
        goto err_exit;
466
    }
467

    
468
    retval = xseg_prep_request(xseg, req, targetlen, size);
469
    if(retval < 0) {
470
        DPRINTF("archipelago_aio_read(): Cannot prepare xseg request.\n");
471
        retval = -1;
472
        goto err_exit;
473
    }
474
    char *target = xseg_get_target(xseg, req);
475
    if(!target) {
476
        DPRINTF("archipelago_aio_read(): Cannot get xseg target.\n");
477
        retval = -1;
478
        goto err_exit;
479
    }
480
    strncpy(target, volname, targetlen);
481
    req->size = size;
482
    req->offset = offset;
483
    req->op = X_READ;
484
    req->flags |= XF_FLUSH;
485

    
486
    reqdata->volname = volname;
487
    reqdata->offset = offset;
488
    reqdata->size = size;
489
    reqdata->buf = buf;
490
    reqdata->op = TD_OP_READ;
491
    reqdata->tdreq = tdreq;
492

    
493
    xseg_set_req_data(xseg, req, reqdata);
494
    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
495
    if(p == NoPort) {
496
        DPRINTF("archipelago_aio_read(): Could not submit xseg request.\n");
497
        retval = -1;
498
        goto err_exit;
499
    }
500
    xseg_signal(xseg, p);
501
    return 0;
502
err_exit:
503
    DPRINTF("archipelago_aio_read(): Read error: %d\n", retval);
504
    xseg_put_request(xseg, req, srcport);
505
    return retval;
506
}
507

    
508
static int archipelago_aio_write(char *volname, off_t offset, ssize_t size, char *buf,
509
        struct tdarchipelago_request *tdreq)
510
{
511
    char *data = NULL;
512
    int retval;
513
    AIORequestData *reqdata = malloc(sizeof(AIORequestData));
514
    int targetlen = strlen(volname);
515

    
516
    struct xseg_request *req = xseg_get_request(xseg, srcport, vportno, X_ALLOC);
517
    if(!req) {
518
        DPRINTF("archipelago_aio_write(): Cannot get xseg request.\n");
519
        retval = -1;
520
        goto err_exit;
521
    }
522
    retval = xseg_prep_request(xseg, req, targetlen, size);
523
    if( retval < 0) {
524
        DPRINTF("archipelago_aio_write(): Cannot prepare xseg request.\n");
525
        retval = -1;
526
        goto err_exit;
527
    }
528
    char *target = xseg_get_target(xseg, req);
529
    if(!target) {
530
        DPRINTF("archipelago_aio_write(): Cannot get xseg target.\n");
531
        retval = -1;
532
        goto err_exit;
533
    }
534
    strncpy(target, volname, targetlen);
535
    req->size = size;
536
    req->offset = offset;
537
    req->op = X_WRITE;
538
    req->flags |= XF_FLUSH;
539

    
540
    reqdata->volname= volname;
541
    reqdata->offset = offset;
542
    reqdata->size = size;
543
    reqdata->buf = buf;
544
    reqdata->op = TD_OP_WRITE;
545
    reqdata->tdreq = tdreq;
546

    
547
    xseg_set_req_data(xseg, req, reqdata);
548
    data = xseg_get_data(xseg, req);
549
    if(!data) {
550
        DPRINTF("archipelago_aio_write(): Cannot get xseg data.\n");
551
        retval = -1;
552
        goto err_exit;
553
    }
554

    
555
    memcpy(data, buf, size);
556
    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
557
    if(p == NoPort) {
558
        DPRINTF("archipelago_aio_write(): Cannot submit xseg req.\n");
559
        retval = -1;
560
        goto err_exit;
561
    }
562
    xseg_signal(xseg, p);
563
    return 0;
564
err_exit:
565
    DPRINTF("archipelago_aio_write(): Write error: %d\n", retval);
566
    xseg_put_request(xseg, req, srcport);
567
    return retval;
568
}
569

    
570
static int tdarchipelago_submit_request(struct tdarchipelago_data *prv,
571
        struct tdarchipelago_request *req)
572
{
573
    int retval, i;
574
    prv->stat.req_issued++;
575
    list_add_tail(&req->queue, &prv->reqs_inflight);
576

    
577
    switch(req->op) {
578
        case TD_OP_READ:
579
            retval = archipelago_aio_read(prv->volname, req->offset, req->size, req->buf, req);
580
            break;
581
        case TD_OP_WRITE:
582
            retval = archipelago_aio_write(prv->volname, req->offset, req->size, req->buf, req);
583
            break;
584
        default:
585
            retval = - EINVAL;
586
    }
587

    
588
    if( retval < 0) {
589
        retval = -EIO;
590
        goto err;
591
    }
592

    
593
    return 0;
594

    
595
err:
596
    for(i=0; i < req->treq_count; i++)
597
        td_complete_request(req->treq[i], retval);
598
    return retval;
599
}
600

    
601
static void tdarchipelago_timeout_cb(event_id_t eb, char mode, void *data)
602
{
603
    struct tdarchipelago_data *prv = data;
604

    
605
    if(prv->req_deferred) {
606
        tdarchipelago_submit_request(prv, prv->req_deferred);
607
        prv->req_deferred = NULL;
608
        prv->stat.req_issued_timeout++;
609
    }
610

    
611
    tapdisk_server_unregister_event(eb);
612
    prv->timeout_event_id = -1;
613
}
614

    
615
static void tdarchipelago_queue_request(td_driver_t *driver, td_request_t treq)
616
{
617
    struct tdarchipelago_data *prv= driver->data;
618
    size_t size = treq.secs * driver->info.sector_size;
619
    uint64_t offset = treq.sec * (uint64_t)driver->info.sector_size;
620
    struct tdarchipelago_request *req;
621
    int merged = 0;
622

    
623
    /* Update stats */
624
    prv->stat.req_total++;
625

    
626
    if(prv->req_deferred) {
627
        struct tdarchipelago_request *dr = prv->req_deferred;
628

    
629
        if((dr->op == treq.op) &&
630
            ((dr->offset + dr->size) == offset) &&
631
                (((unsigned long)dr->buf + dr->size)==(unsigned long)treq.buf))
632
        {
633
                    dr->treq[dr->treq_count++] = treq;
634
                    dr->size += size;
635
                    merged = 1;
636
        } else {
637
                    prv->stat.req_miss++;
638
                    if(dr->op != treq.op)
639
                        prv->stat.req_miss_op++;
640
                    if((dr->offset + dr->size) != offset)
641
                        prv->stat.req_miss_ofs++;
642
                    if(((unsigned long)dr->buf + dr->size) != (unsigned long)treq.buf)
643
                        prv->stat.req_miss_buf++;
644
        }
645

    
646
        if(!merged || (size != (11 * 4096)) || //44k request
647
                (dr->size >= 1024 * 1024) ||
648
                (dr->treq_count == MAX_ARCHIPELAGO_MERGED_REQS))
649
        {
650
            tdarchipelago_submit_request(prv, dr);
651
            prv->req_deferred = NULL;
652

    
653
            if(!merged)
654
                prv->stat.req_issued_no_merge++;
655
            else
656
                prv->stat.req_issued_forced++;
657
        }
658
    }
659

    
660

    
661
    if(!merged) {
662
        if(prv->reqs_free_count == 0) {
663
            td_complete_request(treq, -EBUSY);
664
            goto no_free;
665
        }
666
        req = list_entry(prv->reqs_free.next, struct tdarchipelago_request, queue);
667

    
668
        list_del(&req->queue);
669
        prv->reqs_free_count--;
670

    
671
        /* Fill request */
672
        req->treq_count = 1;
673
        req->treq[0] = treq;
674

    
675
        req->op = treq.op;
676
        req->offset = offset;
677
        req->size = size;
678
        req->buf = treq.buf;
679

    
680
        if ((size == (11 * 4096)) && (size < 1024 * 1024)) {
681
            prv->req_deferred = req;
682
        } else {
683
            tdarchipelago_submit_request(prv, req);
684
            prv->stat.req_issued_direct++;
685
        }
686
    }
687
no_free:
688
    if(prv->req_deferred && (prv->timeout_event_id == -1)) {
689
        prv->timeout_event_id = tapdisk_server_register_event(
690
            SCHEDULER_POLL_TIMEOUT,
691
            -1,
692
            0,
693
            tdarchipelago_timeout_cb,
694
            prv
695
            );
696
    } else if(!prv->req_deferred && (prv->timeout_event_id != -1)) {
697
        tapdisk_server_unregister_event(prv->timeout_event_id);
698
        prv->timeout_event_id = -1;
699
    }
700
}
701

    
702
static int tdarchipelago_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
703
{
704
    return TD_NO_PARENT;
705
}
706

    
707
static int tdarchipelago_validate_parent(td_driver_t *driver, td_driver_t *parent,
708
        td_flag_t flags)
709
{
710
    return -EINVAL;
711
}
712

    
713

    
714
static void tdarchipelago_stats(td_driver_t *driver, td_stats_t *st)
715
{
716
    struct tdarchipelago_data *prv = driver->data;
717
    tapdisk_stats_field(st, "req_free_count", "d", prv->reqs_free_count);
718
    tapdisk_stats_field(st, "req_total", "d", prv->stat.req_total);
719
    tapdisk_stats_field(st, "req_issued", "d", prv->stat.req_issued);
720
    tapdisk_stats_field(st, "req_issued_no_merge", "d", prv->stat.req_issued_no_merge);
721
    tapdisk_stats_field(st, "req_issued_forced", "d", prv->stat.req_issued_forced);
722
    tapdisk_stats_field(st, "req_issued_direct", "d", prv->stat.req_issued_direct);
723
    tapdisk_stats_field(st, "req_issued_timeout", "d", prv->stat.req_issued_timeout);
724
    tapdisk_stats_field(st, "req_miss", "d", prv->stat.req_miss);
725
    tapdisk_stats_field(st, "req_miss_op", "d", prv->stat.req_miss_op);
726
    tapdisk_stats_field(st, "req_miss_ofs", "d", prv->stat.req_miss_ofs);
727
    tapdisk_stats_field(st, "req_miss_buf", "d", prv->stat.req_miss_buf);
728
    tapdisk_stats_field(st, "max_merge_size", "d", 1024 * 1024);
729
}
730

    
731
struct tap_disk tapdisk_archipelago = {
732
    .disk_type = "tapdisk_archipelago",
733
    .private_data_size = sizeof(struct tdarchipelago_data),
734
    .flags = 0,
735
    .td_open = tdarchipelago_open,
736
    .td_close= tdarchipelago_close,
737
    .td_queue_read = tdarchipelago_queue_request,
738
    .td_queue_write = tdarchipelago_queue_request,
739
    .td_get_parent_id = tdarchipelago_get_parent_id,
740
    .td_validate_parent = tdarchipelago_validate_parent,
741
    .td_debug = NULL,
742
    .td_stats = tdarchipelago_stats,
743
};