Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / commissioning / specificator.py @ f54beeea

History | View | Annotate | Download (22 kB)

1
# -*- coding: utf-8 -*-
2
from random import random, choice, randint
3
from math import log
4
from inspect import isclass
5
from .utils.betteron import betteron_decode
6

    
7
try:
8
    from collections import OrderedDict
9
except ImportError:
10
    from .utils.ordereddict import OrderedDict
11

    
12
def shorts(s):
13
    if not isinstance(s, unicode):
14
        s = str(s)
15

    
16
    if len(s) <= 64:
17
        return s
18

    
19
    return s[:61] + '...'
20
        
21

    
22
class CanonifyException(Exception):
23
    pass
24

    
25
class SpecifyException(Exception):
26
    pass
27

    
28

    
29
class Canonical(object):
30

    
31
    random_choice = None
32

    
33
    def __init__(self, *args, **kw):
34
        self.args = []
35
        named_args = []
36
        for a in args:
37
            if isinstance(a, tuple) and len(a) == 2:
38
                named_args.append(a)
39
            else:
40
                self.args.append(a)
41
        ordered_dict = OrderedDict(named_args)
42

    
43
        self.name = kw.pop('classname', self.__class__.__name__)
44
        random_choice = kw.pop('random', None)
45
        if random_choice is not None:
46
            self.random_choice = random_choice
47
        opts = {}
48
        for k, v in kw.items():
49
            if not isinstance(v, Canonical):
50
                if isclass(v) and issubclass(v, Canonical):
51
                    m = ("argument '%s': value '%s' is a Canonical _class_. "
52
                         "Perhaps you meant to specify a Canonical _instance_"
53
                         % (k, v))
54
                    raise SpecifyException(m)
55
                opts[k] = v
56
                del kw[k]
57

    
58
        self.opts = opts
59
        ordered_dict.update(kw)
60
        self.kw = ordered_dict
61
        self.init()
62

    
63
        if 'default' in opts:
64
            item = opts['default']
65
            if item is None:
66
                opts['null'] = 1
67
            else:
68
                opts['default'] = self._check(item)
69

    
70
    def init(self):
71
        return
72

    
73
    def __call__(self, item):
74
        return self.check(item)
75

    
76
    def check(self, item):
77
        opts = self.opts
78
        if item is None and 'default' in opts:
79
            item = opts['default']
80

    
81
        can_be_null = opts.get('null', False)
82
        if item is None and can_be_null:
83
            return None
84

    
85
        return self._check(item)
86

    
87
    def _check(self, item):
88
        return item
89

    
90
    def parse(self, item):
91
        opts = self.opts
92
        if item is None and 'default' in opts:
93
            item = opts['default']
94

    
95
        can_be_null = opts.get('null', False)
96
        if item is None and can_be_null:
97
            return None
98

    
99
        return self._parse(item)
100

    
101
    def _parse(self, item):
102
        raise NotImplementedError
103
    
104
    def create(self):
105
        return None
106

    
107
    def random(self, **kw):
108
        random_choice = self.random_choice
109
        if random_choice is None:
110
            return None
111

    
112
        if callable(random_choice):
113
            return random_choice(kw)
114

    
115
        if isinstance(random_choice, str):
116
            return getattr(self, random_choice)(kw)
117

    
118
        return choice(random_choice)
119

    
120
    def tostring(self, depth=0, showopts=0, multiline=0):
121
        depth += 1
122
        if not multiline:
123
            argdepth = ''
124
            owndepth = ''
125
            joinchar = ','
126
            padchar = ''
127
        else:
128
            argdepth = '    ' * depth
129
            owndepth = '    ' * (depth - 1)
130
            joinchar = ',\n'
131
            padchar = '\n'
132

    
133
        args = [a.tostring( depth=depth,
134
                            showopts=showopts,
135
                            multiline=multiline) for a in self.args]
136
        args += [("%s=%s" %
137
                    (k, v.tostring( depth=depth,
138
                                    showopts=showopts,
139
                                    multiline=multiline)))
140
                                    
141
                                    for k, v in self.kw.items()]
142
        if showopts:
143
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
144

    
145
        if len(args) == 0:
146
            string = "%s(%s)" % (self.name, ','.join(args))
147
        else:
148
            string = "%s(%s" % (self.name, padchar)
149
            for arg in args:
150
                string += argdepth + arg + joinchar
151
            string = string[:-1] + padchar
152
            string += owndepth + ")"
153

    
154
        return string
155

    
156
    __str__ = tostring
157

    
158
    def __repr__(self):
159
        return self.tostring(multiline=0, showopts=1)
160

    
161

    
162
class Null(Canonical):
163

    
164
    def _check(self, item):
165
        return None
166

    
167
Nothing = Null()
168

    
169

    
170
class Integer(Canonical):
171

    
172
    def _check(self, item):
173
        try:
174
            num = long(item)
175
        except ValueError, e:
176
            try:
177
                num = long(item, 16)
178
            except Exception:
179
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
180
                raise CanonifyException(m)
181
        except TypeError, e:
182
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
183
            raise CanonifyException(m)
184

    
185
        optget = self.opts.get
186
        minimum = optget('minimum', None)
187
        maximum = optget('maximum', None)
188

    
189
        if minimum is not None and num < minimum:
190
            m = "%s: %d < minimum=%d" % (self, num, minimum)
191
            raise CanonifyException(m)
192

    
193
        if maximum is not None and num > maximum:
194
            m = "%s: %d > maximum=%d" % (self, num, maximum)
195
            raise CanonifyException(m)
196

    
197
        return num
198

    
199
    def _parse(self, item):
200
        return self.check(item)
201

    
202
    def random_integer(self, kw):
203
        optget = self.opts.get
204
        kwget = kw.get
205
        minimum = kwget('minimum', optget('minimum', -4294967296L))
206
        maximum = kwget('maximum', optget('maximum', 4294967295L))
207
        r = random()
208
        if r < 0.1:
209
            return minimum
210
        if r < 0.2:
211
            return maximum
212
        if minimum <= 0 and maximum >= 0 and r < 0.3:
213
            return 0L
214
        return long(minimum + r * (maximum - minimum))
215

    
216
    random_choice = random_integer
217

    
218

    
219

    
220
Serial = Integer(
221
            classname   =   'Serial',
222
            null        =   True,
223
)
224

    
225

    
226
class Text(Canonical):
227

    
228
    re = None
229
    matcher = None
230
    choices = None
231

    
232
    def init(self):
233
        opts = self.opts
234
        if 'regex' in opts:
235
            pat = opts['regex']
236
            re = self.re
237
            if re is None:
238
                import re
239
                self.re = re
240

    
241
            self.matcher = re.compile(pat, re.UNICODE)
242
            self.pat = pat
243

    
244
        if 'choices' in opts:
245
            opts['choices'] = dict((unicode(x), unicode(x))
246
                                    for x in opts['choices'])
247

    
248
    def _check(self, item):
249
        if not isinstance(item, unicode):
250
            # require non-unicode items to be utf8
251
            item = str(item)
252
            try:
253
                item = item.decode('utf8')
254
            except UnicodeDecodeError, e:
255
                item = item.decode('latin1')
256
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
257
                raise CanonifyException(m)
258

    
259
        opts = self.opts
260
        if 'choices' in opts:
261
            choices = opts['choices']
262
            try:
263
                unknown = item not in choices
264
            except TypeError, e:
265
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
266
                raise CanonifyException(m, e)
267

    
268
            if unknown:
269
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
270
                raise CanonifyException(m)
271

    
272
            return choices[item]
273

    
274
        optget = opts.get
275
        itemlen = len(item)
276
        maxlen = optget('maxlen', None)
277
        if maxlen is not None and itemlen > maxlen:
278
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
279
            raise CanonifyException(m)
280

    
281
        minlen = optget('minlen', None)
282
        if minlen is not None and itemlen < minlen:
283
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
284
            raise CanonifyException(m)
285
            
286
        matcher = self.matcher
287
        if matcher is not None:
288
            match = matcher.match(item)
289
            if  (       match is None
290
                    or  (match.start(), match.end()) != (0, itemlen)    ):
291

    
292
                    m = ("%s: '%s' does not match '%s'"
293
                            % (self, shorts(item), self.pat))
294
                    raise CanonifyException(m)
295

    
296
        return item
297

    
298
    def _parse(self, item):
299
        return self.check(item)
300

    
301
    default_alphabet = '0123456789αβγδεζ'.decode('utf8')
302

    
303
    def random_string(self, kw):
304
        opts = self.opts
305
        if 'regex' in opts:
306
            m = 'Unfortunately, random for regex strings not supported'
307
            raise ValueError(m)
308

    
309
        optget = opts.get
310
        kwget = kw.get
311
        minlen = kwget('minlen', optget('minlen', 0))
312
        maxlen = kwget('maxlen', optget('maxlen', 32))
313
        alphabet = kwget('alphabet', self.default_alphabet)
314
        z = maxlen - minlen
315
        if z < 1:
316
            z = 1
317

    
318
        g = log(z, 2)
319
        r = random() * g
320
        z = minlen + int(2**r)
321

    
322
        s = u''
323
        for _ in xrange(z):
324
            s += choice(alphabet)
325

    
326
        return s
327

    
328
    random_choice = random_string
329

    
330

    
331
class Bytes(Canonical):
332

    
333
    re = None
334
    matcher = None
335
    choices = None
336

    
337
    def init(self):
338
        opts = self.opts
339
        if 'regex' in opts:
340
            pat = opts['regex']
341
            re = self.re
342
            if re is None:
343
                import re
344
                self.re = re
345

    
346
            self.matcher = re.compile(pat)
347
            self.pat = pat
348

    
349
        if 'choices' in opts:
350
            opts['choices'] = dict((str(x), str(x))
351
                                    for x in opts['choices'])
352

    
353
    def _check(self, item):
354
        if isinstance(item, unicode):
355
            # convert unicode to utf8
356
            item = item.encode('utf8')
357

    
358
        opts = self.opts
359
        if 'choices' in opts:
360
            choices = opts['choices']
361
            try:
362
                unknown = item not in choices
363
            except TypeError, e:
364
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
365
                raise CanonifyException(m, e)
366

    
367
            if unknown:
368
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
369
                raise CanonifyException(m)
370

    
371
            return choices[item]
372

    
373
        optget = opts.get
374
        itemlen = len(item)
375
        maxlen = optget('maxlen', None)
376
        if maxlen is not None and itemlen > maxlen:
377
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
378
            raise CanonifyException(m)
379

    
380
        minlen = optget('minlen', None)
381
        if minlen is not None and itemlen < minlen:
382
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
383
            raise CanonifyException(m)
384
            
385
        matcher = self.matcher
386
        if matcher is not None:
387
            match = matcher.match(item)
388
            if  (       match is None
389
                    or  (match.start(), match.end()) != (0, itemlen)    ):
390

    
391
                    m = ("%s: '%s' does not match '%s'"
392
                            % (self, shorts(item), self.pat))
393
                    raise CanonifyException(m)
394

    
395
        return item
396

    
397
    default_alphabet = '0123456789abcdef'
398

    
399
    def random_bytes(self, kw):
400
        opts = self.opts
401
        if 'regex' in opts:
402
            m = 'Unfortunately, random for regex strings not supported'
403
            raise ValueError(m)
404

    
405
        optget = opts.get
406
        kwget = kw.get
407
        minlen = kwget('minlen', optget('minlen', 0))
408
        maxlen = kwget('maxlen', optget('maxlen', 32))
409
        alphabet = kwget('alphabet', self.default_alphabet)
410
        z = maxlen - minlen
411
        if z < 1:
412
            z = 1
413

    
414
        g = log(z, 2)
415
        r = random() * g
416
        z = minlen + int(2**r)
417

    
418
        s = u''
419
        for _ in xrange(z):
420
            s += choice(alphabet)
421

    
422
        return s
423

    
424
    random_choice = random_bytes
425

    
426

    
427
class ListOf(Canonical):
428

    
429
    def init(self):
430
        args = self.args
431
        kw = self.kw
432

    
433
        if not (args or kw):
434
            raise SpecifyException("ListOf requires one or more arguments")
435

    
436
        if args and kw:
437
            m = ("ListOf requires either positional "
438
                 "or keyword arguments, but not both")
439
            raise SpecifyException(m)
440

    
441
        if args:
442
            if len(args) > 1:
443
                self.canonical = Tuple(*args)
444
            else:
445
                self.canonical = args[0]
446
        else:
447
            self.canonical = Args(**kw)
448

    
449
    def _check(self, item):
450
        if item is None:
451
            item = ()
452

    
453
        try:
454
            items = iter(item)
455
        except TypeError, e:
456
            m = "%s: %s is not iterable" % (self, shorts(item))
457
            raise CanonifyException(m)
458

    
459
        canonical = self.canonical
460
        canonified = []
461
        append = canonified.append
462

    
463
        for item in items:
464
            item = canonical(item)
465
            append(item)
466

    
467
        if not canonified and self.opts.get('nonempty', False):
468
            m = "%s: must be nonempty" % (self,)
469
            raise CanonifyException(m)
470

    
471
        return canonified
472

    
473
    def _parse(self, item):
474
        if item is None:
475
            item = ()
476

    
477
        try:
478
            items = iter(item)
479
        except TypeError, e:
480
            m = "%s: %s is not iterable" % (self, shorts(item))
481
            raise CanonifyException(m)
482

    
483
        canonical = self.canonical
484
        canonified = []
485
        append = canonified.append
486

    
487
        for k, v in items:
488
            item = canonical.parse(v)
489
            append(item)
490

    
491
        if not canonified and self.opts.get('nonempty', False):
492
            m = "%s: must be nonempty" % (self,)
493
            raise CanonifyException(m)
494

    
495
        return canonified
496
        
497
    def random_listof(self, kw):
498
        z = randint(1, 4)
499
        get_random = self.canonical.random
500

    
501
        return [get_random() for _ in xrange(z)]
502

    
503
    random_choice = random_listof
504

    
505

    
506
class Args(Canonical):
507

    
508
    def _parse(self, arglist):
509
        formalslen = len(self.kw)
510
        arglen = len(arglist)
511
        if arglen != formalslen:
512
            raise Exception('param inconsistent')
513

    
514
        parsed = OrderedDict()
515
        keys = self.kw.keys()
516
        position = 0
517

    
518
        for k, v in arglist:
519
            if k:
520
                parsed[k] = self.kw[k].parse(v)
521
            else:
522
                # find the right position
523
                for i in range(position, arglen):
524
                    key = keys[i]
525
                    if not key in parsed.keys():
526
                        position = i + 1
527
                        break
528
                else: # exhausted
529
                    raise Exception("shouldn't happen")
530
                parsed[key] = self.kw[key].parse(v)
531

    
532
        return parsed
533
        
534
    def _check(self, item):
535
        try:
536
            item = OrderedDict(item)
537
        except TypeError, e:
538
            m = "%s: %s is not dict-able" % (self, shorts(item))
539
            raise CanonifyException(m)
540

    
541
        canonified = OrderedDict()
542
        
543
        try:
544
            for n, c in self.kw.items():
545
                t = item[n] if n in item else None
546
                canonified[n] = c(t)
547
        except KeyError:
548
            m = ("%s: Argument '%s' not found in '%s'" 
549
                        % (self, shorts(n), shorts(item)))
550
            raise CanonifyException(m)
551

    
552
        return canonified
553

    
554
    def random_args(self, kw):
555
        args = {}
556
        for n, c in self.kw.items():
557
            args[n] = c.random()
558
        return args
559

    
560
    random_choice = random_args
561

    
562

    
563
class Tuple(Canonical):
564

    
565
    def _check(self, item):
566
        try:
567
            items = list(item)
568
        except TypeError, e:
569
            m = "%s: %s is not iterable" % (self, shorts(item))
570
            raise CanonifyException(m)
571

    
572
        canonicals = self.args
573
        zi = len(items)
574
        zc = len(canonicals)
575

    
576
        if zi != zc:
577
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
578
            raise CanonifyException(m)
579

    
580
        g = (canonical(element) for canonical, element in zip(self.args, item))
581

    
582
        return tuple(g)
583

    
584
    def _parse(self, item):
585
        try:
586
            items = list(item)
587
        except TypeError, e:
588
            m = "%s: %s is not iterable" % (self, shorts(item))
589
            raise CanonifyException(m)
590

    
591
        canonicals = self.args
592
        zi = len(items)
593
        zc = len(canonicals)
594

    
595
        if zi != zc:
596
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
597
            raise CanonifyException(m)
598

    
599
        g = (canonical.parse(element)
600
             for canonical, (k, element) in zip(self.args, item))
601
        return tuple(g)
602
        
603
    def __add__(self, other):
604
        oargs = other.args if isinstance(other, Tuple) else (other,)
605
        args = self.args + oargs
606
        return self.__class__(*args)
607

    
608
    def random_tuple(self, kw):
609
        return tuple(c.random() for c in self.args)
610

    
611
    random_choice = random_tuple
612

    
613

    
614
class Dict(Canonical):
615

    
616
    def _check(self, item):
617

    
618
        try:
619
            item = dict(item)
620
        except TypeError:
621
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
622
            raise CanonifyException(m)
623

    
624
        canonified = {}
625
        canonical = self.kw
626

    
627
        for n, c in canonical.items():
628
            if n not in item:
629
                m = "%s: key '%s' not found" % (self, shorts(n))
630
                raise CanonifyException(m)
631
            canonified[n] = c(item[n])   
632

    
633
        strict = self.opts.get('strict', True)
634
        if strict and len(item) != len(canonical):
635
            for k in sorted(item.keys()):
636
                if k not in canonical:
637
                    break
638

    
639
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
640
            raise CanonifyException(m)
641

    
642
        return canonified
643

    
644
    def _parse(self, item):
645

    
646
        try:
647
            item = dict(item)
648
        except TypeError:
649
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
650
            raise CanonifyException(m)
651

    
652
        canonified = {}
653
        canonical = self.kw
654

    
655
        for n, c in canonical.items():
656
            if n not in item:
657
                m = "%s: key '%s' not found" % (self, shorts(n))
658
                raise CanonifyException(m)
659
            canonified[n] = c(item[n])
660

    
661
        strict = self.opts.get('strict', True)
662
        if strict and len(item) != len(canonical):
663
            for k in sorted(item.keys()):
664
                if k not in canonical:
665
                    break
666

    
667
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
668
            raise CanonifyException(m)
669

    
670
        return canonified
671
        
672
    def random_dict(self, kw):
673
        item = {}
674
        for n, c in self.canonical.items():
675
            item[n] = c.random()
676

    
677
        return item
678

    
679
    random_choice = random_dict
680

    
681

    
682
class Canonifier(object):
683
    def __init__(self, name, input_canonicals, output_canonicals, doc_strings):
684
        self.name = name
685
        self.input_canonicals = dict(input_canonicals)
686
        self.output_canonicals = dict(output_canonicals)
687
        self.doc_strings = dict(doc_strings)
688

    
689
    def call_names(self):
690
        return self.input_canonicals.keys()
691

    
692
    def call_docs(self):
693
        for call_name, call_doc in self.doc_strings.iteritems():
694
            yield call_name, call_doc
695

    
696
    def get_doc(self, name):
697
        if name not in self.doc_strings:
698
            m = "%s: Invalid method name '%s'" % (self.name, name)
699
            raise CanonifyException(m)
700

    
701
        return self.doc_strings[name]
702

    
703
    def call_attrs(self):
704
        for call_name, canonical in self.input_canonicals.iteritems():
705
            yield call_name, canonical.tostring(showopts=1, multiline=1)
706

    
707
    def input_canonical(self, name):
708
        input_canonicals = self.input_canonicals
709
        if name not in input_canonicals:
710
            m = "%s: Invalid input call '%s'" % (self.name, name)
711
            raise CanonifyException(m)
712

    
713
        return input_canonicals[name]
714

    
715
    def canonify_input(self, name, the_input):
716
        return self.input_canonical(name)(the_input)
717

    
718
    def output_canonical(self, name):
719
        output_canonicals = self.output_canonicals
720
        if name not in output_canonicals:
721
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
722
            raise CanonifyException(m)
723

    
724
        return output_canonicals[name]
725

    
726
    def canonify_output(self, name, the_output):
727
        return self.output_canonical(name)(the_output)
728

    
729
    def parse(self, method, arglist):
730
        args, rest = betteron_decode(arglist)
731
        argdict = self.input_canonical(method).parse(args)
732
        return argdict
733

    
734

    
735
class Specificator(object):
736

    
737
    def __new__(cls):
738
        if cls is Specificator:
739
            m = "Specificator classes must be subclassed"
740
            raise SpecifyException(m)
741

    
742
        import inspect
743

    
744
        canonical_inputs = {}
745
        canonical_outputs = {}
746
        doc_strings = {}
747

    
748
        for name in dir(cls):
749
            f = getattr(cls, name)
750
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
751
                continue
752

    
753
            doc_strings[name] = f.__doc__
754
            argspec = inspect.getargspec(f)
755
            defaults = argspec.defaults
756
            args = argspec.args
757
            if args and args[0] == 'self':
758
                args = args[1:]
759

    
760
            if not defaults:
761
                defaults = ()
762

    
763
            arglen = len(args)
764
            deflen = len(defaults)
765

    
766
            if arglen != deflen:
767
                a = (f.__name__, args[:arglen-deflen])
768
                m = "Unspecified arguments in '%s': %s" % a
769
                raise SpecifyException(m)
770

    
771
            args = zip(args, defaults)
772
            for a, c in args:
773
                if not isinstance(c, Canonical):
774
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
775
                         % (a, repr(c)))
776
                    raise SpecifyException(m)
777

    
778
            canonical = Null() if len(args) == 0 else Args(*args)
779
            canonical_inputs[name] = canonical
780

    
781
            self = object.__new__(cls)
782
            canonical = f(self)
783
            if not isinstance(canonical, Canonical):
784
                m = ("method '%s' does not return a Canonical, but a(n) %s "
785
                                                    % (name, type(canonical)))
786
                raise SpecifyException(m)
787
            canonical_outputs[name] = canonical
788

    
789
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs,
790
                          doc_strings)
791

    
792
    def __call__(self):
793
        return self
794