Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / commissioning / specificator.py @ 2005b18e

History | View | Annotate | Download (22 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2012 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
from random import random, choice, randint
36
from math import log
37
from inspect import isclass
38
from kamaki.clients.commissioning.utils.argmap import (
39
    argmap_decode,
40
    argmap_check,
41
    argmap_unpack_dict,
42
    argmap_unpack_list)
43

    
44
try:
45
    from collections import OrderedDict
46
except ImportError:
47
    from kamaki.clients.commissioning.utils.ordereddict import OrderedDict
48

    
49

    
50
def shorts(s):
51
    if not isinstance(s, unicode):
52
        s = str(s)
53

    
54
    if len(s) <= 64:
55
        return s
56

    
57
    return s[:61] + '...'
58

    
59

    
60
class CanonifyException(Exception):
61
    pass
62

    
63

    
64
class SpecifyException(Exception):
65
    pass
66

    
67

    
68
class Canonical(object):
69

    
70
    _random_choice = None
71

    
72
    def __init__(self, *args, **kw):
73
        self.args = []
74
        named_args = []
75
        for a in args:
76
            if isinstance(a, tuple) and len(a) == 2:
77
                named_args.append(a)
78
            else:
79
                self.args.append(a)
80
        ordered_dict = OrderedDict(named_args)
81

    
82
        self.name = kw.pop('classname', self.__class__.__name__)
83
        random_choice = kw.pop('random', None)
84
        if random_choice is not None:
85
            self.random_choice = random_choice
86
        opts = {}
87
        for k, v in kw.items():
88
            if not isinstance(v, Canonical):
89
                if isclass(v) and issubclass(v, Canonical):
90
                    m = ("argument '%s': value '%s' is a Canonical _class_. "
91
                         "Perhaps you meant to specify a Canonical _instance_"
92
                         % (k, v))
93
                    raise SpecifyException(m)
94
                opts[k] = v
95
                del kw[k]
96

    
97
        self.opts = opts
98
        ordered_dict.update(kw)
99
        self.kw = ordered_dict
100
        self.init()
101

    
102
        if 'default' in opts:
103
            item = opts['default']
104
            if item is None:
105
                opts['null'] = 1
106
            else:
107
                opts['default'] = self._check(item)
108

    
109
    def init(self):
110
        return
111

    
112
    def __call__(self, item):
113
        return self.check(item)
114

    
115
    def check(self, item):
116
        if argmap_check(item):
117
            item = self._unpack(item)
118

    
119
        opts = self.opts
120
        if item is None and 'default' in opts:
121
            item = opts['default']
122

    
123
        can_be_null = opts.get('null', False)
124
        if item is None and can_be_null:
125
            return None
126

    
127
        return self._check(item)
128

    
129
    def _check(self, item):
130
        return item
131

    
132
    def _unpack(self, item):
133
        return argmap_unpack_list(item)
134

    
135
    def create(self):
136
        return None
137

    
138
    def random(self, **kw):
139
        random_choice = self._random_choice
140
        if random_choice is None:
141
            return None
142

    
143
        if callable(random_choice):
144
            return random_choice(kw)
145

    
146
        if isinstance(random_choice, str):
147
            return getattr(self, random_choice)(kw)
148

    
149
        return choice(random_choice)
150

    
151
    def tostring(self, depth=0, showopts=0, multiline=0):
152
        depth += 1
153
        if not multiline:
154
            argdepth = ''
155
            owndepth = ''
156
            joinchar = ','
157
            padchar = ''
158
        else:
159
            argdepth = '    ' * depth
160
            owndepth = '    ' * (depth - 1)
161
            joinchar = ',\n'
162
            padchar = '\n'
163

    
164
        args = [a.tostring(
165
            depth=depth,
166
            showopts=showopts,
167
            multiline=multiline) for a in self.args]
168
        args += [("%s=%s" % (k, v.tostring(
169
            depth=depth,
170
            showopts=showopts,
171
            multiline=multiline))) for k, v in self.kw.items()]
172
        if showopts:
173
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
174

    
175
        if len(args) == 0:
176
            string = "%s(%s)" % (self.name, ','.join(args))
177
        else:
178
            string = "%s(%s" % (self.name, padchar)
179
            for arg in args:
180
                string += argdepth + arg + joinchar
181
            string = string[:-1] + padchar
182
            string += owndepth + ")"
183

    
184
        return string
185

    
186
    __str__ = tostring
187

    
188
    def __repr__(self):
189
        return self.tostring(multiline=0, showopts=1)
190

    
191
    def show(self):
192
        showable = self.opts.get('show', True)
193
        return self._show() if showable else ''
194

    
195
    def _show(self):
196
        return self.name
197

    
198

    
199
class Null(Canonical):
200

    
201
    def _check(self, item):
202
        return None
203

    
204
Nothing = Null()
205

    
206

    
207
class Integer(Canonical):
208

    
209
    def _check(self, item):
210
        try:
211
            num = long(item)
212
        except ValueError:
213
            try:
214
                num = long(item, 16)
215
            except Exception:
216
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
217
                raise CanonifyException(m)
218
        except TypeError:
219
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
220
            raise CanonifyException(m)
221

    
222
        optget = self.opts.get
223
        minimum = optget('minimum', None)
224
        maximum = optget('maximum', None)
225

    
226
        if minimum is not None and num < minimum:
227
            m = "%s: %d < minimum=%d" % (self, num, minimum)
228
            raise CanonifyException(m)
229

    
230
        if maximum is not None and num > maximum:
231
            m = "%s: %d > maximum=%d" % (self, num, maximum)
232
            raise CanonifyException(m)
233

    
234
        return num
235

    
236
    def _random_choice(self, kw):
237
        optget = self.opts.get
238
        kwget = kw.get
239
        minimum = kwget('minimum', optget('minimum', -4294967296L))
240
        maximum = kwget('maximum', optget('maximum', 4294967295L))
241
        r = random()
242
        if r < 0.1:
243
            return minimum
244
        if r < 0.2:
245
            return maximum
246
        if minimum <= 0 and maximum >= 0 and r < 0.3:
247
            return 0L
248
        return long(minimum + r * (maximum - minimum))
249

    
250

    
251
Serial = Integer(classname='Serial', null=True)
252

    
253

    
254
class Text(Canonical):
255

    
256
    re = None
257
    matcher = None
258
    choices = None
259

    
260
    def init(self):
261
        opts = self.opts
262
        if 'regex' in opts:
263
            pat = opts['regex']
264
            re = self.re
265
            if re is None:
266
                import re
267
                self.re = re
268

    
269
            self.matcher = re.compile(pat, re.UNICODE)
270
            self.pat = pat
271

    
272
        if 'choices' in opts:
273
            opts['choices'] = dict((
274
                unicode(x),
275
                unicode(x)) for x in opts['choices'])
276

    
277
    def _check(self, item):
278
        if not isinstance(item, unicode):
279
            # require non-unicode items to be utf8
280
            item = str(item)
281
            try:
282
                item = item.decode('utf8')
283
            except UnicodeDecodeError, e:
284
                item = item.decode('latin1')
285
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
286
                raise CanonifyException(m)
287

    
288
        opts = self.opts
289
        if 'choices' in opts:
290
            choices = opts['choices']
291
            try:
292
                unknown = item not in choices
293
            except TypeError, e:
294
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
295
                raise CanonifyException(m, e)
296

    
297
            if unknown:
298
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
299
                raise CanonifyException(m)
300

    
301
            return choices[item]
302

    
303
        optget = opts.get
304
        itemlen = len(item)
305
        maxlen = optget('maxlen', None)
306
        if maxlen is not None and itemlen > maxlen:
307
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
308
            raise CanonifyException(m)
309

    
310
        minlen = optget('minlen', None)
311
        if minlen is not None and itemlen < minlen:
312
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
313
            raise CanonifyException(m)
314

    
315
        matcher = self.matcher
316
        if matcher is not None:
317
            match = matcher.match(item)
318
            if ((not match) or (match.start(), match.end()) != (0, itemlen)):
319
                    m = ("%s: '%s' does not match '%s'" % (
320
                        self,
321
                        shorts(item),
322
                        self.pat))
323
                    raise CanonifyException(m)
324

    
325
        return item
326

    
327
    default_alphabet = '0123456789ฮฑฮฒฮณฮดฮตฮถ'.decode('utf8')
328

    
329
    def _random_choice(self, kw):
330
        opts = self.opts
331
        if 'regex' in opts:
332
            m = 'Unfortunately, random for regex strings not supported'
333
            raise ValueError(m)
334

    
335
        optget = opts.get
336
        kwget = kw.get
337
        minlen = kwget('minlen', optget('minlen', 0))
338
        maxlen = kwget('maxlen', optget('maxlen', 32))
339
        alphabet = kwget('alphabet', self.default_alphabet)
340
        z = maxlen - minlen
341
        if z < 1:
342
            z = 1
343

    
344
        g = log(z, 2)
345
        r = random() * g
346
        z = minlen + int(2 ** r)
347

    
348
        s = u''
349
        for _ in xrange(z):
350
            s += choice(alphabet)
351

    
352
        return s
353

    
354

    
355
class Bytes(Canonical):
356

    
357
    re = None
358
    matcher = None
359
    choices = None
360

    
361
    def init(self):
362
        opts = self.opts
363
        if 'regex' in opts:
364
            pat = opts['regex']
365
            re = self.re
366
            if re is None:
367
                import re
368
                self.re = re
369

    
370
            self.matcher = re.compile(pat)
371
            self.pat = pat
372

    
373
        if 'choices' in opts:
374
            opts['choices'] = dict((str(x), str(x)) for x in opts['choices'])
375

    
376
    def _check(self, item):
377
        if isinstance(item, unicode):
378
            # convert unicode to utf8
379
            item = item.encode('utf8')
380

    
381
        opts = self.opts
382
        if 'choices' in opts:
383
            choices = opts['choices']
384
            try:
385
                unknown = item not in choices
386
            except TypeError, e:
387
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
388
                raise CanonifyException(m, e)
389

    
390
            if unknown:
391
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
392
                raise CanonifyException(m)
393

    
394
            return choices[item]
395

    
396
        optget = opts.get
397
        itemlen = len(item)
398
        maxlen = optget('maxlen', None)
399
        if maxlen is not None and itemlen > maxlen:
400
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
401
            raise CanonifyException(m)
402

    
403
        minlen = optget('minlen', None)
404
        if minlen is not None and itemlen < minlen:
405
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
406
            raise CanonifyException(m)
407

    
408
        matcher = self.matcher
409
        if matcher is not None:
410
            match = matcher.match(item)
411
            if ((not match) or (match.start(), match.end()) != (0, itemlen)):
412
                    m = ("%s: '%s' does not match '%s'" % (
413
                        self,
414
                        shorts(item),
415
                        self.pat))
416
                    raise CanonifyException(m)
417

    
418
        return item
419

    
420
    default_alphabet = '0123456789abcdef'
421

    
422
    def _random_choice(self, kw):
423
        opts = self.opts
424
        if 'regex' in opts:
425
            m = 'Unfortunately, random for regex strings not supported'
426
            raise ValueError(m)
427

    
428
        optget = opts.get
429
        kwget = kw.get
430
        minlen = kwget('minlen', optget('minlen', 0))
431
        maxlen = kwget('maxlen', optget('maxlen', 32))
432
        alphabet = kwget('alphabet', self.default_alphabet)
433
        z = maxlen - minlen
434
        if z < 1:
435
            z = 1
436

    
437
        g = log(z, 2)
438
        r = random() * g
439
        z = minlen + int(2 ** r)
440

    
441
        s = u''
442
        for _ in xrange(z):
443
            s += choice(alphabet)
444

    
445
        return s
446

    
447

    
448
class ListOf(Canonical):
449

    
450
    def init(self):
451
        args = self.args
452
        kw = self.kw
453

    
454
        if not (args or kw):
455
            raise SpecifyException("ListOf requires one or more arguments")
456

    
457
        if args and kw:
458
            m = ("ListOf requires either positional "
459
                 "or keyword arguments, but not both")
460
            raise SpecifyException(m)
461

    
462
        if args:
463
            if len(args) > 1:
464
                self.canonical = Tuple(*args)
465
            else:
466
                self.canonical = args[0]
467
        else:
468
            self.canonical = Args(**kw)
469

    
470
    def _check(self, item):
471
        if item is None:
472
            item = ()
473

    
474
        try:
475
            items = iter(item)
476
        except TypeError:
477
            m = "%s: %s is not iterable" % (self, shorts(item))
478
            raise CanonifyException(m)
479

    
480
        canonical = self.canonical
481
        canonified = []
482
        append = canonified.append
483

    
484
        for item in items:
485
            item = canonical(item)
486
            append(item)
487

    
488
        if not canonified and self.opts.get('nonempty', False):
489
            m = "%s: must be nonempty" % (self,)
490
            raise CanonifyException(m)
491

    
492
        return canonified
493

    
494
    def _random_choice(self, kw):
495
        z = randint(1, 4)
496
        get_random = self.canonical.random
497

    
498
        return [get_random() for _ in xrange(z)]
499

    
500
    def _show(self):
501
        return '[ ' + self.canonical.show() + ' ... ]'
502

    
503

    
504
class Args(Canonical):
505

    
506
    def _unpack(self, item):
507
        arglist = argmap_unpack_dict(item)
508
        keys = self.kw.keys()
509
        arglen = len(arglist)
510
        if arglen != len(keys):
511
            m = "inconsistent number of parameters: %s != %s" % (
512
                arglen,
513
                len(keys))
514
            raise CanonifyException(m)
515

    
516
        position = 0
517
        named_args = OrderedDict()
518

    
519
        for k, v in arglist:
520
            if k is not None:
521
                named_args[k] = v
522
            else:
523
                # find the right position
524
                for i in range(position, arglen):
525
                    key = keys[i]
526
                    if not key in named_args.keys():
527
                        position = i + 1
528
                        break
529
                else:
530
                    m = "Formal arguments exhausted"
531
                    raise AssertionError(m)
532
                named_args[key] = v
533

    
534
        return named_args
535

    
536
    def _check(self, item):
537
        try:
538
            OrderedDict(item).items()
539
        except (TypeError, ValueError):
540
            m = "%s: %s is not dict-able" % (self, shorts(item))
541
            raise CanonifyException(m)
542

    
543
        canonified = OrderedDict()
544

    
545
        try:
546
            for n, c in self.kw.items():
547
                t = item[n] if n in item else None
548
                canonified[n] = c.check(t)
549
        except KeyError:
550
            m = ("%s: Argument '%s' not found in '%s'"
551
                 % (self, shorts(n), shorts(item)))
552
            raise CanonifyException(m)
553

    
554
        return canonified
555

    
556
    def _show(self):
557
        strings = [x for x in [c.show() for n, c in self.kw.items()] if x]
558
        return ' '.join(strings)
559

    
560
    def _random_choice(self, kw):
561
        args = {}
562
        for n, c in self.kw.items():
563
            args[n] = c.random()
564
        return args
565

    
566

    
567
class Tuple(Canonical):
568

    
569
    def _check(self, item):
570
        try:
571
            items = list(item)
572
        except TypeError:
573
            m = "%s: %s is not iterable" % (self, shorts(item))
574
            raise CanonifyException(m)
575

    
576
        canonicals = self.args
577
        zi = len(items)
578
        zc = len(canonicals)
579

    
580
        if zi != zc:
581
            m = "%s: expecting %d elements, not %d (%s)" % (
582
                self,
583
                zc,
584
                zi,
585
                str(items))
586
            raise CanonifyException(m)
587

    
588
        g = (canonical(element) for canonical, element in zip(self.args, item))
589

    
590
        return tuple(g)
591

    
592
    def __add__(self, other):
593
        oargs = other.args if isinstance(other, Tuple) else (other,)
594
        args = self.args + oargs
595
        return self.__class__(*args)
596

    
597
    def _random_choice(self, kw):
598
        return tuple(c.random() for c in self.args)
599

    
600
    def _show(self):
601
        canonicals = self.args
602
        strings = [x for x in [c.show() for c in canonicals] if x]
603
        return '[ ' + ' '.join(strings) + ' ]'
604

    
605

    
606
class Dict(Canonical):
607

    
608
    def _check(self, item):
609

    
610
        try:
611
            item = dict(item)
612
        except TypeError:
613
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
614
            raise CanonifyException(m)
615

    
616
        canonified = {}
617
        canonical = self.kw
618

    
619
        for n, c in canonical.items():
620
            if n not in item:
621
                m = "%s: key '%s' not found" % (self, shorts(n))
622
                raise CanonifyException(m)
623
            canonified[n] = c(item[n])
624

    
625
        strict = self.opts.get('strict', True)
626
        if strict and len(item) != len(canonical):
627
            for k in sorted(item.keys()):
628
                if k not in canonical:
629
                    break
630

    
631
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
632
            raise CanonifyException(m)
633

    
634
        return canonified
635

    
636
    def _random_choice(self, kw):
637
        item = {}
638
        for n, c in self.kw.items():
639
            item[n] = c.random()
640

    
641
        return item
642

    
643

    
644
class Canonifier(object):
645
    def __init__(self, name, input_canonicals, output_canonicals, doc_strings):
646
        self.name = name
647
        self.input_canonicals = dict(input_canonicals)
648
        self.output_canonicals = dict(output_canonicals)
649
        self.doc_strings = dict(doc_strings)
650

    
651
    def call_names(self):
652
        return self.input_canonicals.keys()
653

    
654
    def call_docs(self):
655
        get_input_canonical = self.input_canonical
656
        for call_name, call_doc in self.doc_strings.iteritems():
657
            if not call_doc:
658
                canonical = get_input_canonical(call_name)
659
                call_doc = canonical.tostring(showopts=1, multiline=1)
660
            yield call_name, call_doc
661

    
662
    def get_doc(self, name):
663
        doc_strings = self.doc_strings
664
        if name not in doc_strings:
665
            m = "%s: Invalid method name '%s'" % (self.name, name)
666
            raise CanonifyException(m)
667

    
668
        docstring = doc_strings[name]
669
        if not docstring:
670
            docstring = self.input_canonical(name).tostring()
671
        return docstring
672

    
673
    def call_attrs(self):
674
        for call_name, canonical in self.input_canonicals.iteritems():
675
            yield call_name, canonical.tostring(showopts=1, multiline=1)
676

    
677
    def input_canonical(self, name):
678
        input_canonicals = self.input_canonicals
679
        if name not in input_canonicals:
680
            m = "%s: Invalid input call '%s'" % (self.name, name)
681
            raise CanonifyException(m)
682

    
683
        return input_canonicals[name]
684

    
685
    def canonify_input(self, name, the_input):
686
        return self.input_canonical(name)(the_input)
687

    
688
    def output_canonical(self, name):
689
        output_canonicals = self.output_canonicals
690
        if name not in output_canonicals:
691
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
692
            raise CanonifyException(m)
693

    
694
        return output_canonicals[name]
695

    
696
    def canonify_output(self, name, the_output):
697
        return self.output_canonical(name)(the_output)
698

    
699
    def show_input_canonical(self, name):
700
        return self.input_canonical(name).show()
701

    
702
    def parse(self, method, arglist):
703
        args, rest = argmap_decode(arglist)
704
        return self.input_canonical(method).check(args)
705

    
706

    
707
class Specificator(object):
708

    
709
    def __new__(cls):
710
        if cls is Specificator:
711
            m = "Specificator classes must be subclassed"
712
            raise SpecifyException(m)
713

    
714
        import inspect
715

    
716
        canonical_inputs = {}
717
        canonical_outputs = {}
718
        doc_strings = {}
719

    
720
        for name in dir(cls):
721
            f = getattr(cls, name)
722
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
723
                continue
724

    
725
            doc_strings[name] = f.__doc__
726
            argspec = inspect.getargspec(f)
727
            defaults = argspec.defaults
728
            args = argspec.args
729
            if args and args[0] == 'self':
730
                args = args[1:]
731

    
732
            if not defaults:
733
                defaults = ()
734

    
735
            arglen = len(args)
736
            deflen = len(defaults)
737

    
738
            if arglen != deflen:
739
                a = (f.__name__, args[:arglen - deflen])
740
                m = "Unspecified arguments in '%s': %s" % a
741
                raise SpecifyException(m)
742

    
743
            args = zip(args, defaults)
744
            for a, c in args:
745
                if not isinstance(c, Canonical):
746
                    m = ("argument '%s=%s' not an instance of 'Canonical'" % (
747
                        a,
748
                        repr(c)))
749
                    raise SpecifyException(m)
750

    
751
            canonical = Null() if len(args) == 0 else Args(*args)
752
            canonical_inputs[name] = canonical
753

    
754
            self = object.__new__(cls)
755
            canonical = f(self)
756
            if not isinstance(canonical, Canonical):
757
                raise SpecifyException(', '.join([
758
                    "method %s does not return a Canonical" % name,
759
                    "but a (n) %s" % type(canonical)]))
760
            canonical_outputs[name] = canonical
761

    
762
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs,
763
                          doc_strings)
764

    
765
    def __call__(self):
766
        return self