Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / commissioning / specificator.py @ d13a447b

History | View | Annotate | Download (21.8 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2012 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
from random import random, choice, randint
36
from math import log
37
from inspect import isclass
38
from .utils.argmap import (argmap_decode, argmap_check, argmap_dict_to_list,
39
                           argmap_list_to_dict)
40

    
41
try:
42
    from collections import OrderedDict
43
except ImportError:
44
    from .utils.ordereddict import OrderedDict
45

    
46
def shorts(s):
47
    if not isinstance(s, unicode):
48
        s = str(s)
49

    
50
    if len(s) <= 64:
51
        return s
52

    
53
    return s[:61] + '...'
54

    
55

    
56
class CanonifyException(Exception):
57
    pass
58

    
59
class SpecifyException(Exception):
60
    pass
61

    
62

    
63
class Canonical(object):
64

    
65
    random_choice = None
66

    
67
    def __init__(self, *args, **kw):
68
        self.args = []
69
        named_args = []
70
        for a in args:
71
            if isinstance(a, tuple) and len(a) == 2:
72
                named_args.append(a)
73
            else:
74
                self.args.append(a)
75
        ordered_dict = OrderedDict(named_args)
76

    
77
        self.name = kw.pop('classname', self.__class__.__name__)
78
        random_choice = kw.pop('random', None)
79
        if random_choice is not None:
80
            self.random_choice = random_choice
81
        opts = {}
82
        for k, v in kw.items():
83
            if not isinstance(v, Canonical):
84
                if isclass(v) and issubclass(v, Canonical):
85
                    m = ("argument '%s': value '%s' is a Canonical _class_. "
86
                         "Perhaps you meant to specify a Canonical _instance_"
87
                         % (k, v))
88
                    raise SpecifyException(m)
89
                opts[k] = v
90
                del kw[k]
91

    
92
        self.opts = opts
93
        ordered_dict.update(kw)
94
        self.kw = ordered_dict
95
        self.init()
96

    
97
        if 'default' in opts:
98
            item = opts['default']
99
            if item is None:
100
                opts['null'] = 1
101
            else:
102
                opts['default'] = self._check(item)
103

    
104
    def init(self):
105
        return
106

    
107
    def __call__(self, item):
108
        return self.check(item)
109

    
110
    def check(self, item):
111
        opts = self.opts
112
        if item is None and 'default' in opts:
113
            item = opts['default']
114

    
115
        can_be_null = opts.get('null', False)
116
        if item is None and can_be_null:
117
            return None
118

    
119
        if argmap_check(item):
120
            item = self._unpack(item)
121

    
122
        return self._check(item)
123

    
124
    def _check(self, item):
125
        return item
126

    
127
    def _unpack(self, item):
128
        argmap = argmap_list_to_dict(item)
129
        if len(argmap) == 2:
130
            return argmap[None]
131
        return argmap
132

    
133
    def create(self):
134
        return None
135

    
136
    def random(self, **kw):
137
        random_choice = self.random_choice
138
        if random_choice is None:
139
            return None
140

    
141
        if callable(random_choice):
142
            return random_choice(kw)
143

    
144
        if isinstance(random_choice, str):
145
            return getattr(self, random_choice)(kw)
146

    
147
        return choice(random_choice)
148

    
149
    def tostring(self, depth=0, showopts=0, multiline=0):
150
        depth += 1
151
        if not multiline:
152
            argdepth = ''
153
            owndepth = ''
154
            joinchar = ','
155
            padchar = ''
156
        else:
157
            argdepth = '    ' * depth
158
            owndepth = '    ' * (depth - 1)
159
            joinchar = ',\n'
160
            padchar = '\n'
161

    
162
        args = [a.tostring( depth=depth,
163
                            showopts=showopts,
164
                            multiline=multiline) for a in self.args]
165
        args += [("%s=%s" %
166
                    (k, v.tostring( depth=depth,
167
                                    showopts=showopts,
168
                                    multiline=multiline)))
169

    
170
                                    for k, v in self.kw.items()]
171
        if showopts:
172
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
173

    
174
        if len(args) == 0:
175
            string = "%s(%s)" % (self.name, ','.join(args))
176
        else:
177
            string = "%s(%s" % (self.name, padchar)
178
            for arg in args:
179
                string += argdepth + arg + joinchar
180
            string = string[:-1] + padchar
181
            string += owndepth + ")"
182

    
183
        return string
184

    
185
    __str__ = tostring
186

    
187
    def __repr__(self):
188
        return self.tostring(multiline=0, showopts=1)
189

    
190
    def show(self):
191
        showable = self.opts.get('show', True)
192
        return self._show() if showable else ''
193

    
194
    def _show(self):
195
        return self.name
196

    
197
class Null(Canonical):
198

    
199
    def _check(self, item):
200
        return None
201

    
202
Nothing = Null()
203

    
204

    
205
class Integer(Canonical):
206

    
207
    def _check(self, item):
208
        try:
209
            num = long(item)
210
        except ValueError, e:
211
            try:
212
                num = long(item, 16)
213
            except Exception:
214
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
215
                raise CanonifyException(m)
216
        except TypeError, e:
217
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
218
            raise CanonifyException(m)
219

    
220
        optget = self.opts.get
221
        minimum = optget('minimum', None)
222
        maximum = optget('maximum', None)
223

    
224
        if minimum is not None and num < minimum:
225
            m = "%s: %d < minimum=%d" % (self, num, minimum)
226
            raise CanonifyException(m)
227

    
228
        if maximum is not None and num > maximum:
229
            m = "%s: %d > maximum=%d" % (self, num, maximum)
230
            raise CanonifyException(m)
231

    
232
        return num
233

    
234
    def random_integer(self, kw):
235
        optget = self.opts.get
236
        kwget = kw.get
237
        minimum = kwget('minimum', optget('minimum', -4294967296L))
238
        maximum = kwget('maximum', optget('maximum', 4294967295L))
239
        r = random()
240
        if r < 0.1:
241
            return minimum
242
        if r < 0.2:
243
            return maximum
244
        if minimum <= 0 and maximum >= 0 and r < 0.3:
245
            return 0L
246
        return long(minimum + r * (maximum - minimum))
247

    
248
    random_choice = random_integer
249

    
250

    
251

    
252
Serial = Integer(
253
            classname   =   'Serial',
254
            null        =   True,
255
)
256

    
257

    
258
class Text(Canonical):
259

    
260
    re = None
261
    matcher = None
262
    choices = None
263

    
264
    def init(self):
265
        opts = self.opts
266
        if 'regex' in opts:
267
            pat = opts['regex']
268
            re = self.re
269
            if re is None:
270
                import re
271
                self.re = re
272

    
273
            self.matcher = re.compile(pat, re.UNICODE)
274
            self.pat = pat
275

    
276
        if 'choices' in opts:
277
            opts['choices'] = dict((unicode(x), unicode(x))
278
                                    for x in opts['choices'])
279

    
280
    def _check(self, item):
281
        if not isinstance(item, unicode):
282
            # require non-unicode items to be utf8
283
            item = str(item)
284
            try:
285
                item = item.decode('utf8')
286
            except UnicodeDecodeError, e:
287
                item = item.decode('latin1')
288
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
289
                raise CanonifyException(m)
290

    
291
        opts = self.opts
292
        if 'choices' in opts:
293
            choices = opts['choices']
294
            try:
295
                unknown = item not in choices
296
            except TypeError, e:
297
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
298
                raise CanonifyException(m, e)
299

    
300
            if unknown:
301
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
302
                raise CanonifyException(m)
303

    
304
            return choices[item]
305

    
306
        optget = opts.get
307
        itemlen = len(item)
308
        maxlen = optget('maxlen', None)
309
        if maxlen is not None and itemlen > maxlen:
310
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
311
            raise CanonifyException(m)
312

    
313
        minlen = optget('minlen', None)
314
        if minlen is not None and itemlen < minlen:
315
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
316
            raise CanonifyException(m)
317

    
318
        matcher = self.matcher
319
        if matcher is not None:
320
            match = matcher.match(item)
321
            if  (       match is None
322
                    or  (match.start(), match.end()) != (0, itemlen)    ):
323

    
324
                    m = ("%s: '%s' does not match '%s'"
325
                            % (self, shorts(item), self.pat))
326
                    raise CanonifyException(m)
327

    
328
        return item
329

    
330
    default_alphabet = '0123456789αβγδεζ'.decode('utf8')
331

    
332
    def random_string(self, kw):
333
        opts = self.opts
334
        if 'regex' in opts:
335
            m = 'Unfortunately, random for regex strings not supported'
336
            raise ValueError(m)
337

    
338
        optget = opts.get
339
        kwget = kw.get
340
        minlen = kwget('minlen', optget('minlen', 0))
341
        maxlen = kwget('maxlen', optget('maxlen', 32))
342
        alphabet = kwget('alphabet', self.default_alphabet)
343
        z = maxlen - minlen
344
        if z < 1:
345
            z = 1
346

    
347
        g = log(z, 2)
348
        r = random() * g
349
        z = minlen + int(2**r)
350

    
351
        s = u''
352
        for _ in xrange(z):
353
            s += choice(alphabet)
354

    
355
        return s
356

    
357
    random_choice = random_string
358

    
359

    
360
class Bytes(Canonical):
361

    
362
    re = None
363
    matcher = None
364
    choices = None
365

    
366
    def init(self):
367
        opts = self.opts
368
        if 'regex' in opts:
369
            pat = opts['regex']
370
            re = self.re
371
            if re is None:
372
                import re
373
                self.re = re
374

    
375
            self.matcher = re.compile(pat)
376
            self.pat = pat
377

    
378
        if 'choices' in opts:
379
            opts['choices'] = dict((str(x), str(x))
380
                                    for x in opts['choices'])
381

    
382
    def _check(self, item):
383
        if isinstance(item, unicode):
384
            # convert unicode to utf8
385
            item = item.encode('utf8')
386

    
387
        opts = self.opts
388
        if 'choices' in opts:
389
            choices = opts['choices']
390
            try:
391
                unknown = item not in choices
392
            except TypeError, e:
393
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
394
                raise CanonifyException(m, e)
395

    
396
            if unknown:
397
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
398
                raise CanonifyException(m)
399

    
400
            return choices[item]
401

    
402
        optget = opts.get
403
        itemlen = len(item)
404
        maxlen = optget('maxlen', None)
405
        if maxlen is not None and itemlen > maxlen:
406
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
407
            raise CanonifyException(m)
408

    
409
        minlen = optget('minlen', None)
410
        if minlen is not None and itemlen < minlen:
411
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
412
            raise CanonifyException(m)
413

    
414
        matcher = self.matcher
415
        if matcher is not None:
416
            match = matcher.match(item)
417
            if  (       match is None
418
                    or  (match.start(), match.end()) != (0, itemlen)    ):
419

    
420
                    m = ("%s: '%s' does not match '%s'"
421
                            % (self, shorts(item), self.pat))
422
                    raise CanonifyException(m)
423

    
424
        return item
425

    
426
    default_alphabet = '0123456789abcdef'
427

    
428
    def random_bytes(self, kw):
429
        opts = self.opts
430
        if 'regex' in opts:
431
            m = 'Unfortunately, random for regex strings not supported'
432
            raise ValueError(m)
433

    
434
        optget = opts.get
435
        kwget = kw.get
436
        minlen = kwget('minlen', optget('minlen', 0))
437
        maxlen = kwget('maxlen', optget('maxlen', 32))
438
        alphabet = kwget('alphabet', self.default_alphabet)
439
        z = maxlen - minlen
440
        if z < 1:
441
            z = 1
442

    
443
        g = log(z, 2)
444
        r = random() * g
445
        z = minlen + int(2**r)
446

    
447
        s = u''
448
        for _ in xrange(z):
449
            s += choice(alphabet)
450

    
451
        return s
452

    
453
    random_choice = random_bytes
454

    
455

    
456
class ListOf(Canonical):
457

    
458
    def init(self):
459
        args = self.args
460
        kw = self.kw
461

    
462
        if not (args or kw):
463
            raise SpecifyException("ListOf requires one or more arguments")
464

    
465
        if args and kw:
466
            m = ("ListOf requires either positional "
467
                 "or keyword arguments, but not both")
468
            raise SpecifyException(m)
469

    
470
        if args:
471
            if len(args) > 1:
472
                self.canonical = Tuple(*args)
473
            else:
474
                self.canonical = args[0]
475
        else:
476
            self.canonical = Args(**kw)
477

    
478
    def _check(self, item):
479
        if item is None:
480
            item = ()
481

    
482
        try:
483
            items = iter(item)
484
        except TypeError, e:
485
            m = "%s: %s is not iterable" % (self, shorts(item))
486
            raise CanonifyException(m)
487

    
488
        canonical = self.canonical
489
        canonified = []
490
        append = canonified.append
491

    
492
        for item in items:
493
            item = canonical(item)
494
            append(item)
495

    
496
        if not canonified and self.opts.get('nonempty', False):
497
            m = "%s: must be nonempty" % (self,)
498
            raise CanonifyException(m)
499

    
500
        return canonified
501

    
502
    def random_listof(self, kw):
503
        z = randint(1, 4)
504
        get_random = self.canonical.random
505

    
506
        return [get_random() for _ in xrange(z)]
507

    
508
    random_choice = random_listof
509

    
510
    def _show(self):
511
        return '[ ' + self.canonical.show() + ' ... ]'
512

    
513
class Args(Canonical):
514

    
515
    def _unpack(self, arglist):
516
        return arglist
517

    
518
    def _check(self, item):
519
        if argmap_check(item):
520
            if hasattr(item, 'keys') and callable(item.keys):
521
                arglist = argmap_dict_to_list(item)[:-1]
522
            else:
523
                arglist = item[:-1]
524
        else:
525
            try:
526
                arglist = OrderedDict(item).items()
527
            except (TypeError, ValueError), e:
528
                m = "%s: %s is not dict-able" % (self, shorts(item))
529
                raise CanonifyException(m)
530

    
531
        keys = self.kw.keys()
532
        kw = self.kw
533
        arglen = len(arglist)
534
        if arglen != len(keys):
535
            m = "inconsistent number of parameters: %s != %s" % (
536
                arglen, len(keys))
537
            raise CanonifyException(m)
538

    
539
        canonified = OrderedDict()
540
        position = 0
541

    
542
        for k, v in arglist:
543
            if k is not None:
544
                canonified[k] = kw[k].check(v)
545
            else:
546
                # find the right position
547
                for i in range(position, arglen):
548
                    key = keys[i]
549
                    if not key in canonified.keys():
550
                        position = i + 1
551
                        break
552
                else: # exhausted
553
                    raise Exception("shouldn't happen")
554
                canonified[key] = kw[key].check(v)
555

    
556
        return canonified
557

    
558
    def _show(self):
559
        strings = [x for x in [c.show() for n, c in self.kw.items()] if x]
560
        return ' '.join(strings)
561

    
562
    def random_args(self, kw):
563
        args = {}
564
        for n, c in self.kw.items():
565
            args[n] = c.random()
566
        return args
567

    
568
    random_choice = random_args
569

    
570

    
571
class Tuple(Canonical):
572

    
573
    def _check(self, item):
574
        try:
575
            items = list(item)
576
        except TypeError, e:
577
            m = "%s: %s is not iterable" % (self, shorts(item))
578
            raise CanonifyException(m)
579

    
580
        canonicals = self.args
581
        zi = len(items)
582
        zc = len(canonicals)
583

    
584
        if zi != zc:
585
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
586
            raise CanonifyException(m)
587

    
588
        g = (canonical(element) for canonical, element in zip(self.args, item))
589

    
590
        return tuple(g)
591

    
592
    def __add__(self, other):
593
        oargs = other.args if isinstance(other, Tuple) else (other,)
594
        args = self.args + oargs
595
        return self.__class__(*args)
596

    
597
    def random_tuple(self, kw):
598
        return tuple(c.random() for c in self.args)
599

    
600
    random_choice = random_tuple
601

    
602
    def _show(self):
603
        canonicals = self.args
604
        strings = [x for x in [c.show() for c in canonicals] if x]
605
        return '[ ' + ' '.join(strings) + ' ]'
606

    
607
class Dict(Canonical):
608

    
609
    def _check(self, item):
610

    
611
        try:
612
            item = dict(item)
613
        except TypeError:
614
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
615
            raise CanonifyException(m)
616

    
617
        canonified = {}
618
        canonical = self.kw
619

    
620
        for n, c in canonical.items():
621
            if n not in item:
622
                m = "%s: key '%s' not found" % (self, shorts(n))
623
                raise CanonifyException(m)
624
            canonified[n] = c(item[n])
625

    
626
        strict = self.opts.get('strict', True)
627
        if strict and len(item) != len(canonical):
628
            for k in sorted(item.keys()):
629
                if k not in canonical:
630
                    break
631

    
632
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
633
            raise CanonifyException(m)
634

    
635
        return canonified
636

    
637
    def random_dict(self, kw):
638
        item = {}
639
        for n, c in self.canonical.items():
640
            item[n] = c.random()
641

    
642
        return item
643

    
644
    random_choice = random_dict
645

    
646

    
647
class Canonifier(object):
648
    def __init__(self, name, input_canonicals, output_canonicals, doc_strings):
649
        self.name = name
650
        self.input_canonicals = dict(input_canonicals)
651
        self.output_canonicals = dict(output_canonicals)
652
        self.doc_strings = dict(doc_strings)
653

    
654
    def call_names(self):
655
        return self.input_canonicals.keys()
656

    
657
    def call_docs(self):
658
        for call_name, call_doc in self.doc_strings.iteritems():
659
            yield call_name, call_doc
660

    
661
    def get_doc(self, name):
662
        if name not in self.doc_strings:
663
            m = "%s: Invalid method name '%s'" % (self.name, name)
664
            raise CanonifyException(m)
665

    
666
        return self.doc_strings[name]
667

    
668
    def call_attrs(self):
669
        for call_name, canonical in self.input_canonicals.iteritems():
670
            yield call_name, canonical.tostring(showopts=1, multiline=1)
671

    
672
    def input_canonical(self, name):
673
        input_canonicals = self.input_canonicals
674
        if name not in input_canonicals:
675
            m = "%s: Invalid input call '%s'" % (self.name, name)
676
            raise CanonifyException(m)
677

    
678
        return input_canonicals[name]
679

    
680
    def canonify_input(self, name, the_input):
681
        return self.input_canonical(name)(the_input)
682

    
683
    def output_canonical(self, name):
684
        output_canonicals = self.output_canonicals
685
        if name not in output_canonicals:
686
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
687
            raise CanonifyException(m)
688

    
689
        return output_canonicals[name]
690

    
691
    def canonify_output(self, name, the_output):
692
        return self.output_canonical(name)(the_output)
693

    
694
    def show_input_canonical(self, name):
695
        return self.input_canonical(name).show()
696

    
697
    def parse(self, method, arglist):
698
        args, rest = argmap_decode(arglist)
699
        return self.input_canonical(method).check(args)
700

    
701

    
702
class Specificator(object):
703

    
704
    def __new__(cls):
705
        if cls is Specificator:
706
            m = "Specificator classes must be subclassed"
707
            raise SpecifyException(m)
708

    
709
        import inspect
710

    
711
        canonical_inputs = {}
712
        canonical_outputs = {}
713
        doc_strings = {}
714

    
715
        for name in dir(cls):
716
            f = getattr(cls, name)
717
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
718
                continue
719

    
720
            doc_strings[name] = f.__doc__
721
            argspec = inspect.getargspec(f)
722
            defaults = argspec.defaults
723
            args = argspec.args
724
            if args and args[0] == 'self':
725
                args = args[1:]
726

    
727
            if not defaults:
728
                defaults = ()
729

    
730
            arglen = len(args)
731
            deflen = len(defaults)
732

    
733
            if arglen != deflen:
734
                a = (f.__name__, args[:arglen-deflen])
735
                m = "Unspecified arguments in '%s': %s" % a
736
                raise SpecifyException(m)
737

    
738
            args = zip(args, defaults)
739
            for a, c in args:
740
                if not isinstance(c, Canonical):
741
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
742
                         % (a, repr(c)))
743
                    raise SpecifyException(m)
744

    
745
            canonical = Null() if len(args) == 0 else Args(*args)
746
            canonical_inputs[name] = canonical
747

    
748
            self = object.__new__(cls)
749
            canonical = f(self)
750
            if not isinstance(canonical, Canonical):
751
                m = ("method '%s' does not return a Canonical, but a(n) %s "
752
                                                    % (name, type(canonical)))
753
                raise SpecifyException(m)
754
            canonical_outputs[name] = canonical
755

    
756
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs,
757
                          doc_strings)
758

    
759
    def __call__(self):
760
        return self
761