Statistics
| Branch: | Tag: | Revision:

root / commissioning / api / specificator.py @ a2db0eb5

History | View | Annotate | Download (17.5 kB)

1
# -*- coding: utf8 -*- 
2
from random import random, choice, randint
3
from math import log
4

    
5
def shorts(s):
6
    if not isinstance(s, unicode):
7
        s = str(s)
8

    
9
    if len(s) <= 64:
10
        return s
11

    
12
    return s[:61] + '...'
13
        
14

    
15
class CanonifyException(Exception):
16
    pass
17

    
18
class SpecifyException(Exception):
19
    pass
20

    
21

    
22
class Canonical(object):
23

    
24
    random_choice = None
25

    
26
    def __init__(self, *args, **kw):
27
        self.args = args
28
        self.kw = kw
29
        self.name = kw.pop('classname', self.__class__.__name__)
30
        random_choice = kw.pop('random', None)
31
        if random_choice is not None:
32
            self.random_choice = random_choice
33
        opts = {}
34
        for k, v in kw.items():
35
            if not isinstance(v, Canonical):
36
                opts[k] = v
37
                del kw[k]
38

    
39
        self.opts = opts
40
        self.init()
41

    
42
        if 'default' in opts:
43
            item = opts['default']
44
            if item is None:
45
                opts['null'] = 1
46
            else:
47
                opts['default'] = self.check(item)
48

    
49
    def init(self):
50
        return
51

    
52
    def __call__(self, item):
53
        opts = self.opts
54
        if item is None and 'default' in opts:
55
            item = opts['default']
56

    
57
        can_be_null = opts.get('null', False)
58
        if item is None and can_be_null:
59
            return None
60

    
61
        return self.check(item)
62

    
63
    def check(self, item):
64
        return item
65

    
66
    def create(self):
67
        return None
68

    
69
    def random(self, **kw):
70
        random_choice = self.random_choice
71
        if random_choice is None:
72
            return None
73

    
74
        if callable(random_choice):
75
            return random_choice(kw)
76

    
77
        if isinstance(random_choice, str):
78
            return getattr(self, random_choice)(kw)
79

    
80
        return choice(random_choice)
81

    
82
    def tostring(self, depth=0, showopts=0, multiline=0):
83
        depth += 1
84
        if not multiline:
85
            argdepth = ''
86
            owndepth = ''
87
            joinchar = ','
88
            padchar = ''
89
        else:
90
            argdepth = '    ' * depth
91
            owndepth = '    ' * (depth - 1)
92
            joinchar = ',\n'
93
            padchar = '\n'
94

    
95
        args = [a.tostring( depth=depth,
96
                            showopts=showopts,
97
                            multiline=multiline) for a in self.args]
98
        args += [("%s=%s" %
99
                    (k, v.tostring( depth=depth,
100
                                    showopts=showopts,
101
                                    multiline=multiline)))
102
                                    
103
                                    for k, v in self.kw.items()]
104
        if showopts:
105
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
106

    
107
        if len(args) == 0:
108
            string = "%s(%s)" % (self.name, ','.join(args))
109
        else:
110
            string = "%s(%s" % (self.name, padchar)
111
            for arg in args:
112
                string += argdepth + arg + joinchar
113
            string = string[:-1] + padchar
114
            string += owndepth + ")"
115

    
116
        return string
117

    
118
    __str__ = tostring
119

    
120
    def __repr__(self):
121
        return self.tostring(multiline=0, showopts=1)
122

    
123
    def check(item):
124
        canonified = item
125
        return canonified
126

    
127

    
128
class Null(Canonical):
129

    
130
    def check(self, item):
131
        return None
132

    
133
Nothing = Null()
134

    
135

    
136
class Integer(Canonical):
137

    
138
    def check(self, item):
139
        try:
140
            num = long(item)
141
        except ValueError, e:
142
            try:
143
                num = long(item, 16)
144
            except Exception:
145
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
146
                raise CanonifyException(m)
147
        except TypeError, e:
148
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
149
            raise CanonifyException(m)
150

    
151
        optget = self.opts.get
152
        minimum = optget('minimum', None)
153
        maximum = optget('maximum', None)
154

    
155
        if minimum is not None and num < minimum:
156
            m = "%s: %d < minimum=%d" % (self, num, minimum)
157
            raise CanonifyException(m)
158

    
159
        if maximum is not None and num > maximum:
160
            m = "%s: %d > maximum=%d" % (self, num, maximum)
161
            raise CanonifyException(m)
162

    
163
        return num
164

    
165
    def random_integer(self, kw):
166
        optget = self.opts.get
167
        kwget = kw.get
168
        minimum = kwget('minimum', optget('minimum', -4294967296L))
169
        maximum = kwget('maximum', optget('maximum', 4294967295L))
170
        r = random()
171
        if r < 0.1:
172
            return minimum
173
        if r < 0.2:
174
            return maximum
175
        if minimum <= 0 and maximum >= 0 and r < 0.3:
176
            return 0L
177
        return long(minimum + r * (maximum - minimum))
178

    
179
    random_choice = random_integer
180

    
181

    
182

    
183
Serial = Integer(
184
            classname   =   'Serial',
185
            null        =   True,
186
)
187

    
188

    
189
class Text(Canonical):
190

    
191
    re = None
192
    matcher = None
193
    choices = None
194

    
195
    def init(self):
196
        opts = self.opts
197
        if 'regex' in opts:
198
            pat = opts['regex']
199
            re = self.re
200
            if re is None:
201
                import re
202
                self.re = re
203

    
204
            self.matcher = re.compile(pat, re.UNICODE)
205
            self.pat = pat
206

    
207
        if 'choices' in opts:
208
            opts['choices'] = dict((unicode(x), unicode(x))
209
                                    for x in opts['choices'])
210

    
211
    def check(self, item):
212
        if not isinstance(item, unicode):
213
            # require non-unicode items to be utf8
214
            item = str(item)
215
            try:
216
                item = item.decode('utf8')
217
            except UnicodeDecodeError, e:
218
                item = item.decode('latin1')
219
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
220
                raise CanonifyException(m)
221

    
222
        opts = self.opts
223
        if 'choices' in opts:
224
            choices = opts['choices']
225
            try:
226
                unknown = item not in choices
227
            except TypeError, e:
228
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
229
                raise CanonifyException(m, e)
230

    
231
            if unknown:
232
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
233
                raise CanonifyException(m)
234

    
235
            return choices[item]
236

    
237
        optget = opts.get
238
        itemlen = len(item)
239
        maxlen = optget('maxlen', None)
240
        if maxlen is not None and itemlen > maxlen:
241
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
242
            raise CanonifyException(m)
243

    
244
        minlen = optget('minlen', None)
245
        if minlen is not None and itemlen < minlen:
246
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
247
            raise CanonifyException(m)
248
            
249
        matcher = self.matcher
250
        if matcher is not None:
251
            match = matcher.match(item)
252
            if  (       match is None
253
                    or  (match.start(), match.end()) != (0, itemlen)    ):
254

    
255
                    m = ("%s: '%s' does not match '%s'"
256
                            % (self, shorts(item), self.pat))
257
                    raise CanonifyException(m)
258

    
259
        return item
260

    
261
    default_alphabet = '0123456789αβγδεζ'.decode('utf8')
262

    
263
    def random_string(self, kw):
264
        opts = self.opts
265
        if 'regex' in opts:
266
            m = 'Unfortunately, random for regex strings not supported'
267
            raise ValueError(m)
268

    
269
        optget = opts.get
270
        kwget = kw.get
271
        minlen = kwget('minlen', optget('minlen', 0))
272
        maxlen = kwget('maxlen', optget('maxlen', 32))
273
        alphabet = kwget('alphabet', self.default_alphabet)
274
        z = maxlen - minlen
275
        if z < 1:
276
            z = 1
277

    
278
        g = log(z, 2)
279
        r = random() * g
280
        z = minlen + int(2**r)
281

    
282
        s = u''
283
        for _ in xrange(z):
284
            s += choice(alphabet)
285

    
286
        return s
287

    
288
    random_choice = random_string
289

    
290

    
291
class Bytes(Canonical):
292

    
293
    re = None
294
    matcher = None
295
    choices = None
296

    
297
    def init(self):
298
        opts = self.opts
299
        if 'regex' in opts:
300
            pat = opts['regex']
301
            re = self.re
302
            if re is None:
303
                import re
304
                self.re = re
305

    
306
            self.matcher = re.compile(pat)
307
            self.pat = pat
308

    
309
        if 'choices' in opts:
310
            opts['choices'] = dict((str(x), str(x))
311
                                    for x in opts['choices'])
312

    
313
    def check(self, item):
314
        if isinstance(item, unicode):
315
            # convert unicode to utf8
316
            item = item.encode('utf8')
317

    
318
        opts = self.opts
319
        if 'choices' in opts:
320
            choices = opts['choices']
321
            try:
322
                unknown = item not in choices
323
            except TypeError, e:
324
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
325
                raise CanonifyException(m, e)
326

    
327
            if unknown:
328
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
329
                raise CanonifyException(m)
330

    
331
            return choices[item]
332

    
333
        optget = opts.get
334
        itemlen = len(item)
335
        maxlen = optget('maxlen', None)
336
        if maxlen is not None and itemlen > maxlen:
337
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
338
            raise CanonifyException(m)
339

    
340
        minlen = optget('minlen', None)
341
        if minlen is not None and itemlen < minlen:
342
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
343
            raise CanonifyException(m)
344
            
345
        matcher = self.matcher
346
        if matcher is not None:
347
            match = matcher.match(item)
348
            if  (       match is None
349
                    or  (match.start(), match.end()) != (0, itemlen)    ):
350

    
351
                    m = ("%s: '%s' does not match '%s'"
352
                            % (self, shorts(item), self.pat))
353
                    raise CanonifyException(m)
354

    
355
        return item
356

    
357
    default_alphabet = '0123456789abcdef'
358

    
359
    def random_bytes(self, kw):
360
        opts = self.opts
361
        if 'regex' in opts:
362
            m = 'Unfortunately, random for regex strings not supported'
363
            raise ValueError(m)
364

    
365
        optget = opts.get
366
        kwget = kw.get
367
        minlen = kwget('minlen', optget('minlen', 0))
368
        maxlen = kwget('maxlen', optget('maxlen', 32))
369
        alphabet = kwget('alphabet', self.default_alphabet)
370
        z = maxlen - minlen
371
        if z < 1:
372
            z = 1
373

    
374
        g = log(z, 2)
375
        r = random() * g
376
        z = minlen + int(2**r)
377

    
378
        s = u''
379
        for _ in xrange(z):
380
            s += choice(alphabet)
381

    
382
        return s
383

    
384
    random_choice = random_bytes
385

    
386

    
387
class ListOf(Canonical):
388

    
389
    def init(self):
390
        args = self.args
391
        kw = self.kw
392

    
393
        if not (args or kw):
394
            raise SpecifyException("ListOf requires one or more arguments")
395

    
396
        if args and kw:
397
            m = ("ListOf requires either positional "
398
                 "or keyword arguments, but not both")
399
            raise SpecifyException(m)
400

    
401
        if args:
402
            if len(args) > 1:
403
                self.canonical = Tuple(*args)
404
            else:
405
                self.canonical = args[0]
406
        else:
407
            self.canonical = Args(**kw)
408

    
409
    def check(self, item):
410
        if item is None:
411
            item = ()
412

    
413
        try:
414
            items = iter(item)
415
        except TypeError, e:
416
            m = "%s: %s is not iterable" % (self, shorts(item))
417
            raise CanonifyException(m)
418

    
419
        canonical = self.canonical
420
        canonified = []
421
        append = canonified.append
422

    
423
        for item in items:
424
            item = canonical(item)
425
            append(item)
426

    
427
        if not canonified and self.opts.get('nonempty', False):
428
            m = "%s: must be nonempty" % (self,)
429
            raise CanonifyException(m)
430

    
431
        return canonified
432

    
433
    def random_listof(self, kw):
434
        z = randint(1, 4)
435
        get_random = self.canonical.random
436

    
437
        return [get_random() for _ in xrange(z)]
438

    
439
    random_choice = random_listof
440

    
441

    
442
class Args(Canonical):
443

    
444
    def init(self):
445
        if self.args:
446
            raise ValueError("Args accepts only keyword arguments")
447

    
448
    def check(self, item):
449
        try:
450
            item = dict(item)
451
        except TypeError, e:
452
            m = "%s: %s is not dict-able" % (self, shorts(item))
453
            raise CanonifyException(m)
454

    
455
        canonified = {}
456

    
457
        try:
458
            for n, c in self.kw.items():
459
                t = item[n] if n in item else None
460
                canonified[n] = c(t)
461
        except KeyError:
462
            m = ("%s: Argument '%s' not found in '%s'" 
463
                        % (self, shorts(n), shorts(item)))
464
            raise CanonifyException(m)
465

    
466
        return canonified
467

    
468
    def random_args(self, kw):
469
        args = {}
470
        for n, c in self.kw.items():
471
            args[n] = c.random()
472
        return args
473

    
474
    random_choice = random_args
475

    
476

    
477
class Tuple(Canonical):
478

    
479
    def check(self, item):
480
        try:
481
            items = list(item)
482
        except TypeError, e:
483
            m = "%s: %s is not iterable" % (self, shorts(item))
484
            raise CanonifyException(m)
485

    
486
        canonicals = self.args
487
        zi = len(items)
488
        zc = len(canonicals)
489

    
490
        if zi != zc:
491
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
492
            raise CanonifyException(m)
493

    
494
        g = (canonical(element) for canonical, element in zip(self.args, item))
495

    
496
        return tuple(g)
497

    
498
    def __add__(self, other):
499
        oargs = other.args if isinstance(other, Tuple) else (other,)
500
        args = self.args + oargs
501
        return self.__class__(*args)
502

    
503
    def random_tuple(self, kw):
504
        return tuple(c.random() for c in self.args)
505

    
506
    random_choice = random_tuple
507

    
508

    
509
class Dict(Canonical):
510

    
511
    def check(self, item):
512

    
513
        try:
514
            item = dict(item)
515
        except TypeError:
516
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
517
            raise CanonifyException(m)
518

    
519
        canonified = {}
520
        canonical = self.kw
521

    
522
        for n, c in canonical.items():
523
            if n not in item:
524
                m = "%s: key '%s' not found" % (self, shorts(n))
525
                raise CanonifyException(m)
526
            canonified[n] = c(item[n])   
527

    
528
        strict = self.opts.get('strict', False)
529
        if strict and len(item) != len(canonical):
530
            for k in sorted(item.keys()):
531
                if k not in canonical:
532
                    break
533

    
534
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
535
            raise CanonifyException(m)
536

    
537
        return canonified
538

    
539
    def random_dict(self, kw):
540
        item = {}
541
        for n, c in self.canonical.items():
542
            item[n] = c.random()
543

    
544
        return item
545

    
546
    random_choice = random_dict
547

    
548

    
549
class Canonifier(object):
550
    def __init__(self, name, input_canonicals, output_canonicals):
551
        self.name = name
552
        self.input_canonicals = dict(input_canonicals)
553
        self.output_canonicals = dict(output_canonicals)
554

    
555
    def call_names(self):
556
        return self.input_canonicals.keys()
557

    
558
    def call_attrs(self):
559
        for call_name, canonical in self.input_canonicals.iteritems():
560
            yield call_name, canonical.tostring(showopts=1, multiline=1)
561

    
562
    def input_canonical(self, name):
563
        input_canonicals = self.input_canonicals
564
        if name not in input_canonicals:
565
            m = "%s: Invalid input call '%s'" % (self.name, name)
566
            raise CanonifyException(m)
567

    
568
        return input_canonicals[name]
569

    
570
    def canonify_input(self, name, the_input):
571
        return self.input_canonical(name)(the_input)
572

    
573
    def output_canonical(self, name):
574
        output_canonicals = self.output_canonicals
575
        if name not in output_canonicals:
576
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
577
            raise CanonifyException(m)
578

    
579
        return output_canonicals[name]
580

    
581
    def canonify_output(self, name, the_output):
582
        return self.output_canonical(name)(the_output)
583

    
584

    
585
class Specificator(object):
586

    
587
    def __new__(cls):
588
        if cls is Specificator:
589
            m = "Specificator classes must be subclassed"
590
            raise SpecifyException(m)
591

    
592
        import inspect
593

    
594
        canonical_inputs = {}
595
        canonical_outputs = {}
596

    
597
        for name in dir(cls):
598
            f = getattr(cls, name)
599
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
600
                continue
601

    
602
            argspec = inspect.getargspec(f)
603
            defaults = argspec.defaults
604
            args = argspec.args
605
            if args and args[0] == 'self':
606
                args = args[1:]
607

    
608
            if not defaults:
609
                defaults = ()
610

    
611
            arglen = len(args)
612
            deflen = len(defaults)
613

    
614
            if arglen != deflen:
615
                a = (f.__name__, args[:arglen-deflen])
616
                m = "Unspecified arguments in '%s': %s" % a
617
                raise SpecifyException(m)
618

    
619
            args = dict(zip(args, defaults))
620
            for a, c in args.items():
621
                if not isinstance(c, Canonical):
622
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
623
                         % (a, repr(c)))
624
                    raise SpecifyException(m)
625

    
626
            canonical = Null() if len(args) == 0 else Args(**args)
627
            canonical_inputs[name] = canonical
628

    
629
            self = object.__new__(cls)
630
            canonical = f(self)
631
            if not isinstance(canonical, Canonical):
632
                m = ("method '%s' does not return a Canonical, but a(n) %s "
633
                                                    % (name, type(canonical)))
634
                raise SpecifyException(m)
635
            canonical_outputs[name] = canonical
636

    
637
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs)
638

    
639
    def __call__(self):
640
        return self
641