Statistics
| Branch: | Tag: | Revision:

root / xseg / tools / archipelago / archipelago / common.py @ 6b11e79a

History | View | Annotate | Download (23.9 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2012 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34
#
35

    
36

    
37
from xseg.xseg_api import *
38
from xseg.xprotocol import *
39
from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
40
    create_string_buffer, pointer, sizeof, POINTER, byref
41

    
42
cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
43

    
44
import os
45
import sys
46
import time
47
import psutil
48
import errno
49
from subprocess import check_call
50
from collections import namedtuple
51

    
52
#archipelago peer roles. Order matters!
53
roles = ['blockerb', 'blockerm', 'mapperd', 'vlmcd']
54
Peer = namedtuple('Peer', ['executable', 'opts', 'role'])
55

    
56
peers = dict()
57
xsegbd_args = []
58
modules = ['xseg', 'segdev', 'xseg_posix', 'xseg_pthread', 'xseg_segdev']
59
xsegbd = 'xsegbd'
60

    
61
DEFAULTS = '/etc/default/archipelago'
62

    
63
#system defaults
64
ARCHIP_PREFIX = 'archip_'
65
LOG_SUFFIX = '.log'
66
PID_SUFFIX = '.pid'
67
PIDFILE_PATH = "/var/run/archipelago"
68
VLMC_LOCK_FILE = 'vlmc.lock'
69
LOGS_PATH = "/var/log/archipelago"
70
LOCK_PATH = "/var/lock"
71
DEVICE_PREFIX = "/dev/xsegbd"
72
XSEGBD_SYSFS = "/sys/bus/xsegbd/"
73

    
74
CHARDEV_NAME = "/dev/segdev"
75
CHARDEV_MAJOR = 60
76
CHARDEV_MINOR = 0
77

    
78
REQS = 512
79

    
80
FILE_BLOCKER = 'archip-pfiled'
81
RADOS_BLOCKER = 'archip-sosd'
82
MAPPER = 'archip-mapperd'
83
VLMC = 'archip-vlmcd'
84
BLOCKER = ''
85

    
86
available_storage = {'files': FILE_BLOCKER, 'rados': RADOS_BLOCKER}
87

    
88

    
89
config = {
90
    'CEPH_CONF_FILE': '/etc/ceph/ceph.conf',
91
    'XSEGBD_START': 0,
92
    'XSEGBD_END': 499,
93
    'VPORT_START': 500,
94
    'VPORT_END': 999,
95
    'BPORT': 1000,
96
    'MPORT': 1001,
97
    'MBPORT': 1002,
98
    'VTOOL': 1003,
99
    #RESERVED 1023
100
    #default config
101
    'SPEC': "segdev:xsegbd:1024:5120:12",
102
    'NR_OPS_BLOCKERB': "",
103
    'NR_OPS_BLOCKERM': "",
104
    'NR_OPS_VLMC': "",
105
    'NR_OPS_MAPPER': "",
106
    #'VERBOSITY_BLOCKERB': "",
107
    #'VERBOSITY_BLOCKERM': "",
108
    #'VERBOSITY_MAPPER': "",
109
    #'VERBOSITY_VLMC': "",
110
    #mt-pfiled specific options,
111
    'FILED_IMAGES': "",
112
    'FILED_MAPS': "",
113
    'PITHOS': "",
114
    'PITHOSMAPS': "",
115
    #mt-sosd specific options,
116
    'RADOS_POOL_MAPS': "",
117
    'RADOS_POOL_BLOCKS': ""
118
}
119

    
120
FIRST_COLUMN_WIDTH = 23
121
SECOND_COLUMN_WIDTH = 23
122

    
123

    
124
def green(s):
125
    return '\x1b[32m' + str(s) + '\x1b[0m'
126

    
127

    
128
def red(s):
129
    return '\x1b[31m' + str(s) + '\x1b[0m'
130

    
131

    
132
def yellow(s):
133
    return '\x1b[33m' + str(s) + '\x1b[0m'
134

    
135

    
136
def pretty_print(cid, status):
137
    sys.stdout.write(cid.ljust(FIRST_COLUMN_WIDTH))
138
    sys.stdout.write(status.ljust(SECOND_COLUMN_WIDTH))
139
    sys.stdout.write('\n')
140
    return
141

    
142

    
143
class Error(Exception):
144
    def __init__(self, msg):
145
        self.msg = msg
146

    
147
    def __str__(self):
148
        return self.msg
149

    
150

    
151
def check_conf():
152
    def isExec(file_path):
153
        return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
154

    
155
    def validExec(program):
156
        for path in os.environ["PATH"].split(os.pathsep):
157
            exe_file = os.path.join(path, program)
158
            if isExec(exe_file):
159
                return True
160
        return False
161

    
162
    def validPort(port, limit, name):
163
        try:
164
            if int(port) >= limit:
165
                print red(str(port) + " >= " + limit)
166
                return False
167
        except:
168
            print red("Invalid port "+name+" : " + str(port))
169
            return False
170

    
171
        return True
172

    
173
    if not LOGS_PATH:
174
        print red("LOGS_PATH is not set")
175
        return False
176
    if not PIDFILE_PATH:
177
        print red("PIDFILE_PATH is not set")
178
        return False
179

    
180
    try:
181
        if not os.path.isdir(str(LOGS_PATH)):
182
            print red("LOGS_PATH "+str(LOGS_PATH)+" does not exist")
183
            return False
184
    except:
185
        print red("LOGS_PATH doesn't exist or is not a directory")
186
        return False
187

    
188
    try:
189
        os.makedirs(str(PIDFILE_PATH))
190
    except OSError as e:
191
        if e.errno == errno.EEXIST:
192
            if os.path.isdir(str(PIDFILE_PATH)):
193
                pass
194
            else:
195
                print red(str(PIDFILE_PATH) + " is not a directory")
196
                return False
197
        else:
198
            print red("Cannot create " + str(PIDFILE_PATH))
199
            return False
200
    except:
201
        print red("PIDFILE_PATH is not set")
202
        return False
203

    
204
    splitted_spec = str(config['SPEC']).split(':')
205
    if len(splitted_spec) < 5:
206
        print red("Invalid spec")
207
        return False
208

    
209
    xseg_type = splitted_spec[0]
210
    xseg_name = splitted_spec[1]
211
    xseg_ports = int(splitted_spec[2])
212
    xseg_heapsize = int(splitted_spec[3])
213
    xseg_align = int(splitted_spec[4])
214

    
215
    if xseg_type != "segdev":
216
        print red("Segment type not segdev")
217
        return False
218
    if xseg_name != "xsegbd":
219
        print red("Segment name not equal xsegbd")
220
        return False
221
    if xseg_align != 12:
222
        print red("Wrong alignemt")
223
        return False
224

    
225
    for v in [config['VERBOSITY_BLOCKERB'],
226
              config['VERBOSITY_BLOCKERM'],
227
              config['VERBOSITY_MAPPER'],
228
              config['VERBOSITY_VLMC']
229
              ]:
230
        if v is None:
231
            print red("Verbosity missing")
232
        try:
233
            if (int(v) > 3 or int(v) < 0):
234
                print red("Invalid verbosity " + str(v))
235
                return False
236
        except:
237
            print red("Invalid verbosity " + str(v))
238
            return False
239

    
240
    for n in [config['NR_OPS_BLOCKERB'],
241
              config['NR_OPS_BLOCKERM'],
242
              config['NR_OPS_VLMC'],
243
              config['NR_OPS_MAPPER']
244
              ]:
245
        if n is None:
246
            print red("Nr ops missing")
247
        try:
248
            if (int(n) <= 0):
249
                print red("Invalid nr_ops " + str(n))
250
                return False
251
        except:
252
            print red("Invalid nr_ops " + str(n))
253
            return False
254

    
255
    if not validPort(config['VTOOL'], xseg_ports, "VTOOL"):
256
        return False
257
    if not validPort(config['MPORT'], xseg_ports, "MPORT"):
258
        return False
259
    if not validPort(config['BPORT'], xseg_ports, "BPORT"):
260
        return False
261
    if not validPort(config['MBPORT'], xseg_ports, "MBPORT"):
262
        return False
263
    if not validPort(config['VPORT_START'], xseg_ports, "VPORT_START"):
264
        return False
265
    if not validPort(config['VPORT_END'], xseg_ports, "VPORT_END"):
266
        return False
267
    if not validPort(config['XSEGBD_START'], xseg_ports, "XSEGBD_START"):
268
        return False
269
    if not validPort(config['XSEGBD_END'], xseg_ports, "XSEGBD_END"):
270
        return False
271

    
272
    if not config['XSEGBD_START'] < config['XSEGBD_END']:
273
        print red("XSEGBD_START should be less than XSEGBD_END")
274
        return False
275
    if not config['VPORT_START'] < config['VPORT_END']:
276
        print red("VPORT_START should be less than VPORT_END")
277
        return False
278
#TODO check than no other port is set in the above ranges
279

    
280
    global BLOCKER
281
    try:
282
        BLOCKER = available_storage[str(config['STORAGE'])]
283
    except:
284
        print red("Invalid storage " + str(config['STORAGE']))
285
        print "Available storage: \"" + ', "'.join(available_storage) + "\""
286
        return False
287

    
288
    if config['STORAGE'] == "files":
289
        if config['FILED_IMAGES'] and not \
290
                os.path.isdir(str(config['FILED_IMAGES'])):
291
            print red("FILED_IMAGES invalid")
292
            return False
293
        if config['FILED_MAPS'] and not \
294
                os.path.isdir(str(config['FILED_MAPS'])):
295
            print red("FILED_PATH invalid")
296
            return False
297
        if config['PITHOS'] and not os.path.isdir(str(config['PITHOS'])):
298
            print red("PITHOS invalid ")
299
            return False
300
        if config['PITHOSMAPS'] and not \
301
                os.path.isdir(str(config['PITHOSMAPS'])):
302
            print red("PITHOSMAPS invalid")
303
            return False
304
    elif config['STORAGE'] == "RADOS":
305
        #TODO use rados.py to check for pool existance
306
        pass
307

    
308
    for p in [BLOCKER, MAPPER, VLMC]:
309
        if not validExec(p):
310
            print red(p + "is not a valid executable")
311
            return False
312

    
313
    return True
314

    
315

    
316
def construct_peers():
317
    #these must be in sync with roles
318
    executables = dict()
319
    config_opts = dict()
320
    executables['blockerb'] = BLOCKER
321
    executables['blockerm'] = BLOCKER
322
    executables['mapperd'] = MAPPER
323
    executables['vlmcd'] = VLMC
324

    
325
    if BLOCKER == "pfiled":
326
        config_opts['blockerb'] = [
327
            "-p", str(config['BPORT']), "-g",
328
            str(config['SPEC']).encode(), "-n",
329
            str(config['NR_OPS_BLOCKERB']),
330
            str(config['PITHOS']), str(config['FILED_IMAGES']), "-d",
331
            "-f", os.path.join(PIDFILE_PATH, "blockerb" + PID_SUFFIX)
332
        ]
333
        config_opts['blockerm'] = [
334
            "-p", str(config['MBPORT']), "-g",
335
            str(config['SPEC']).encode(), "-n",
336
            str(config['NR_OPS_BLOCKERM']),
337
            str(config['PITHOSMAPS']), str(config['FILED_MAPS']), "-d",
338
            "-f", os.path.join(PIDFILE_PATH, "blockerm" + PID_SUFFIX)
339
        ]
340
    elif BLOCKER == "archip-sosd":
341
        config_opts['blockerb'] = [
342
            "-p", str(config['BPORT']), "-g",
343
            str(config['SPEC']).encode(), "-n",
344
            str(config['NR_OPS_BLOCKERB']),
345
            "--pool", str(config['RADOS_POOL_BLOCKS']), "-v",
346
            str(config['VERBOSITY_BLOCKERB']),
347
            "-d",
348
            "--pidfile", os.path.join(PIDFILE_PATH, "blockerb" + PID_SUFFIX),
349
            "-l", os.path.join(str(LOGS_PATH), "blockerb" + LOG_SUFFIX),
350
            "-t", "3"
351
        ]
352
        config_opts['blockerm'] = [
353
            "-p", str(config['MBPORT']), "-g",
354
            str(config['SPEC']).encode(), "-n",
355
            str(config['NR_OPS_BLOCKERM']),
356
            "--pool", str(config['RADOS_POOL_MAPS']), "-v",
357
            str(config['VERBOSITY_BLOCKERM']),
358
            "-d",
359
            "--pidfile", os.path.join(PIDFILE_PATH, "blockerm" + PID_SUFFIX),
360
            "-l", os.path.join(str(LOGS_PATH), "blockerm" + LOG_SUFFIX),
361
            "-t", "3"
362
        ]
363
    elif BLOCKER == "archip-pfiled":
364
        config_opts['blockerb'] = [
365
            "-p", str(config['BPORT']), "-g",
366
            str(config['SPEC']).encode(), "-n",
367
            str(config['NR_OPS_BLOCKERB']),
368
            "--pithos", str(config['PITHOS']), "--archip",
369
            str(config['FILED_IMAGES']),
370
            "-v", str(config['VERBOSITY_BLOCKERB']),
371
            "-d",
372
            "--pidfile", os.path.join(PIDFILE_PATH, "blockerb" + PID_SUFFIX),
373
            "-l", os.path.join(str(LOGS_PATH), "blockerb" + LOG_SUFFIX),
374
            "-t", str(config['NR_OPS_BLOCKERB']), "--prefix", ARCHIP_PREFIX
375
        ]
376
        config_opts['blockerm'] = [
377
            "-p", str(config['MBPORT']), "-g",
378
            str(config['SPEC']).encode(), "-n",
379
            str(config['NR_OPS_BLOCKERM']),
380
            "--pithos", str(config['PITHOSMAPS']), "--archip",
381
            str(config['FILED_MAPS']),
382
            "-v", str(config['VERBOSITY_BLOCKERM']),
383
            "-d",
384
            "--pidfile", os.path.join(PIDFILE_PATH, "blockerm" + PID_SUFFIX),
385
            "-l", os.path.join(str(LOGS_PATH), "blockerm" + LOG_SUFFIX),
386
            "-t", str(config['NR_OPS_BLOCKERM']), "--prefix", ARCHIP_PREFIX
387
        ]
388
    else:
389
            sys.exit(-1)
390

    
391
    config_opts['mapperd'] = [
392
        "-t", "1", "-p",  str(config['MPORT']), "-mbp",
393
        str(config['MBPORT']),
394
        "-g", str(config['SPEC']).encode(), "-n",
395
        str(config['NR_OPS_MAPPER']), "-bp", str(config['BPORT']),
396
        "--pidfile", os.path.join(PIDFILE_PATH, "mapperd" + PID_SUFFIX),
397
        "-v", str(config['VERBOSITY_MAPPER']), "-d",
398
        "-l", os.path.join(str(LOGS_PATH), "mapperd" + LOG_SUFFIX)
399
    ]
400
    config_opts['vlmcd'] = [
401
        "-t", "1", "-sp",  str(config['VPORT_START']), "-ep",
402
        str(config['VPORT_END']),
403
        "-g", str(config['SPEC']).encode(), "-n",
404
        str(config['NR_OPS_VLMC']), "-bp", str(config['BPORT']),
405
        "-mp", str(config['MPORT']), "-d", "-v",
406
        str(config['VERBOSITY_VLMC']),
407
        "--pidfile", os.path.join(PIDFILE_PATH, "vlmcd" + PID_SUFFIX),
408
        "-l", os.path.join(str(LOGS_PATH), "vlmcd" + LOG_SUFFIX)
409
    ]
410

    
411
    for r in roles:
412
        peers[r] = Peer(executable=executables[r], opts=config_opts[r],
413
                        role=r)
414

    
415
    return peers
416

    
417

    
418
def exclusive(fn):
419
    def exclusive_args(**kwargs):
420
        if not os.path.exists(LOCK_PATH):
421
            try:
422
                os.mkdir(LOCK_PATH)
423
            except OSError, (err, reason):
424
                print >> sys.stderr, reason
425
        if not os.path.isdir(LOCK_PATH):
426
            sys.stderr.write("Locking error: ")
427
            print >> sys.stderr, LOCK_PATH + " is not a directory"
428
            return -1
429
        lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE)
430
        while True:
431
            try:
432
                fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
433
                break
434
            except OSError, (err, reason):
435
                print >> sys.stderr, reason
436
                if err == errno.EEXIST:
437
                    time.sleep(0.2)
438
                else:
439
                    raise OSError(err, lock_file + ' ' + reason)
440
        try:
441
            r = fn(**kwargs)
442
        finally:
443
            os.close(fd)
444
            os.unlink(lock_file)
445
        return r
446

    
447
    return exclusive_args
448

    
449

    
450
def loadrc(rc):
451
    try:
452
        if rc is None:
453
            execfile(os.path.expanduser(DEFAULTS), config)
454
        else:
455
            execfile(rc, config)
456
    except:
457
        raise Error("Cannot read config file")
458

    
459
    if not check_conf():
460
        raise Error("Invalid conf file")
461

    
462

    
463
def loaded_modules():
464
    lines = open("/proc/modules").read().split("\n")
465
    modules = [f.split(" ")[0] for f in lines]
466
    return modules
467

    
468

    
469
def loaded_module(name):
470
    return name in loaded_modules()
471

    
472

    
473
def load_module(name, args):
474
    s = "Loading %s " % name
475
    sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
476
    modules = loaded_modules()
477
    if name in modules:
478
        sys.stdout.write(yellow("Already loaded".ljust(SECOND_COLUMN_WIDTH)))
479
        sys.stdout.write("\n")
480
        return
481
    cmd = ["modprobe", "%s" % name]
482
    if args:
483
        for arg in args:
484
            cmd.extend(["%s=%s" % (arg)])
485
    try:
486
        check_call(cmd, shell=False)
487
    except Exception:
488
        sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
489
        sys.stdout.write("\n")
490
        raise Error("Cannot load module %s. Check system logs" % name)
491
    sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
492
    sys.stdout.write("\n")
493

    
494

    
495
def unload_module(name):
496
    s = "Unloading %s " % name
497
    sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
498
    modules = loaded_modules()
499
    if name not in modules:
500
        sys.stdout.write(yellow("Not loaded".ljust(SECOND_COLUMN_WIDTH)))
501
        sys.stdout.write("\n")
502
        return
503
    cmd = ["modprobe -r %s" % name]
504
    try:
505
        check_call(cmd, shell=True)
506
    except Exception:
507
        sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
508
        sys.stdout.write("\n")
509
        raise Error("Cannot unload module %s. Check system logs" % name)
510
    sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
511
    sys.stdout.write("\n")
512

    
513
xseg_initialized = False
514

    
515

    
516
def initialize_xseg():
517
    global xseg_initialized
518
    if not xseg_initialized:
519
        xseg_initialize()
520
        xseg_initialized = True
521

    
522

    
523
def create_segment():
524
    #fixme blocking....
525
    initialize_xseg()
526
    xconf = xseg_config()
527
    xseg_parse_spec(str(config['SPEC']), xconf)
528
    r = xseg_create(xconf)
529
    if r < 0:
530
        raise Error("Cannot create segment")
531

    
532

    
533
def destroy_segment():
534
    #fixme blocking....
535
    try:
536
        initialize_xseg()
537
        xconf = xseg_config()
538
        xseg_parse_spec(str(config['SPEC']), xconf)
539
        xseg = xseg_join(xconf.type, xconf.name, "posix",
540
                         cast(0, cb_null_ptrtype))
541
        if not xseg:
542
            raise Error("Cannot join segment")
543
        xseg_leave(xseg)
544
        xseg_destroy(xseg)
545
    except Exception:
546
        raise Error("Cannot destroy segment")
547

    
548

    
549
def check_running(name, pid=None):
550
    for p in psutil.process_iter():
551
        if p.name[0:len(name)] == name:
552
            if pid:
553
                if pid == p.pid:
554
                    return pid
555
            else:
556
                return pid
557
    return None
558

    
559

    
560
def check_pidfile(name):
561
    pidfile = os.path.join(PIDFILE_PATH, name + PID_SUFFIX)
562
    pf = None
563
    try:
564
        pf = open(pidfile, "r")
565
        pid = int(pf.read())
566
        pf.close()
567
    except:
568
        if pf:
569
            pf.close()
570
        return -1
571

    
572
    return pid
573

    
574

    
575
class Xseg_ctx(object):
576
    ctx = None
577
    port = None
578
    portno = None
579

    
580
    def __init__(self, spec, portno):
581
        initialize_xseg()
582
        xconf = xseg_config()
583
        xseg_parse_spec(create_string_buffer(spec), xconf)
584
        ctx = xseg_join(xconf.type, xconf.name, "posix",
585
                        cast(0, cb_null_ptrtype))
586
        if not ctx:
587
            raise Error("Cannot join segment")
588
        port = xseg_bind_port(ctx, portno, c_void_p(0))
589
        if not port:
590
            raise Error("Cannot bind to port")
591
        xseg_init_local_signal(ctx, portno)
592
        self.ctx = ctx
593
        self.port = port
594
        self.portno = portno
595

    
596
    def __del__(self):
597
        return
598

    
599
    def __enter__(self):
600
        if not self.ctx:
601
            raise Error("No segment")
602
        return self
603

    
604
    def __exit__(self, type_, value, traceback):
605
        self.shutdown()
606
        return False
607

    
608
    def shutdown(self):
609
        if self.ctx:
610
            xseg_quit_local_signal(self.ctx, self.portno)
611
            xseg_leave(self.ctx)
612
        self.ctx = None
613

    
614

    
615
class Request(object):
616
    xseg_ctx = None
617
    req = None
618

    
619
    def __init__(self, xseg_ctx, dst_portno, targetlen, datalen):
620
        ctx = xseg_ctx.ctx
621
        if not ctx:
622
            raise Error("No context")
623
        req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC)
624
        if not req:
625
            raise Error("Cannot get request")
626
        r = xseg_prep_request(ctx, req, targetlen, datalen)
627
        if r < 0:
628
            xseg_put_request(ctx, req, xseg_ctx.portno)
629
            raise Error("Cannot prepare request")
630
#        print hex(addressof(req.contents))
631
        self.req = req
632
        self.xseg_ctx = xseg_ctx
633
        return
634

    
635
    def __del__(self):
636
        if self.req:
637
            if xq_count(byref(self.req.contents.path)) == 0:
638
                xseg_put_request(self.xseg_ctx.ctx, self.req,
639
                                 self.xseg_ctx.portno)
640
        self.req = None
641
        return False
642

    
643
    def __enter__(self):
644
        if not self.req:
645
            raise Error("xseg request not set")
646
        return self
647

    
648
    def __exit__(self, type_, value, traceback):
649
        if self.req:
650
            if xq_count(byref(self.req.contents.path)) == 0:
651
                xseg_put_request(self.xseg_ctx.ctx, self.req,
652
                                 self.xseg_ctx.portno)
653
        self.req = None
654
        return False
655

    
656
    def set_op(self, op):
657
        self.req.contents.op = op
658

    
659
    def get_op(self):
660
        return self.req.contents.op
661

    
662
    def set_offset(self, offset):
663
        self.req.contents.offset = offset
664

    
665
    def get_offset(self):
666
        return self.req.contents.offset
667

    
668
    def get_size(self):
669
        return self.req.contents.size
670

    
671
    def set_size(self, size):
672
        self.req.contents.size = size
673

    
674
    def set_flags(self, flags):
675
        self.req.contents.flags = flags
676

    
677
    def get_flags(self):
678
        return self.req.contents.flags
679

    
680
    def set_target(self, target):
681
        """Sets the target of the request, respecting request's targetlen"""
682
        if len(target) != self.req.contents.targetlen:
683
            return False
684
        c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
685
        p_target = create_string_buffer(target)
686
#        print hex(addressof(c_target.contents))
687
        memmove(c_target, p_target, len(target))
688
        return True
689

    
690
    def get_target(self):
691
        """Return a string to the target of the request"""
692
        c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
693
#        print "target_addr " + str(addressof(c_target.contents))
694
        return string_at(c_target, self.req.contents.targetlen)
695

    
696
    def set_data(self, data):
697
        """Sets requests data. Data should be a xseg protocol structure"""
698
        if sizeof(data) != self.req.contents.datalen:
699
            return False
700
        c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req)
701
        p_data = pointer(data)
702
        memmove(c_data, p_data, self.req.contents.datalen)
703

    
704
        return True
705

    
706
    def get_data(self, _type):
707
        """return a pointer to the data buffer of the request, casted to the
708
        selected type"""
709
#        print "data addr " + str(addressof(xseg_get_data_nonstatic(\
710
#            self.xseg_ctx.ctx, self.req).contents))
711
#        ret = cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
712
#                   _type)
713
#        print addressof(ret.contents)
714
#        return ret
715
        if _type:
716
            return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
717
                        POINTER(_type))
718
        else:
719
            return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
720
                        c_void_p)
721

    
722
    def submit(self):
723
        """Submit the associated xseg_request"""
724
        p = xseg_submit(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno,
725
                        X_ALLOC)
726
        if p == NoPort:
727
            raise Exception
728
        xseg_signal(self.xseg_ctx.ctx, p)
729

    
730
    def wait(self):
731
        """Wait until the associated xseg_request is responded, discarding any
732
        other requests that may be received in the meantime"""
733
        while True:
734
            received = xseg_receive(self.xseg_ctx.ctx, self.xseg_ctx.portno, 0)
735
            if received:
736
#                print addressof(cast(self.req, c_void_p))
737
#                print addressof(cast(received, c_void_p))
738
#                print addressof(self.req.contents)
739
#                print addressof(received.contents)
740
                if addressof(received.contents) == \
741
                        addressof(self.req.contents):
742
#                if addressof(cast(received, c_void_p)) == \
743
#                        addressof(cast(self.req, c_void_p)):
744
                    break
745
                else:
746
                    p = xseg_respond(self.xseg_ctx.ctx, received,
747
                                     self.xseg_ctx.portno, X_ALLOC)
748
                    if p == NoPort:
749
                        xseg_put_request(self.xseg_ctx.ctx, received,
750
                                         self.xseg_ctx.portno)
751
                    else:
752
                        xseg_signal(self.xseg_ctx.ctx, p)
753
            else:
754
                xseg_prepare_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
755
                xseg_wait_signal(self.xseg_ctx.ctx, 10000000)
756
                xseg_cancel_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
757
        return True
758

    
759
    def success(self):
760
        return bool((self.req.contents.state & XS_SERVED) and not
761
                   (self.req.contents.state & XS_FAILED))