-
Notifications
You must be signed in to change notification settings - Fork 4
/
lanbox.py
1187 lines (1137 loc) · 59.7 KB
/
lanbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import socket
import ConfigParser
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor, defer
import json,os
commandDict = {'CommonGetAppID':'00050000','Common16BitMode':'65','CommonReboot':'B5','CommonSaveData':'A9','ChannelSetData':'C9','ChannelReadData':'CD','ChannelReadStatus':'CE','ChannelSetOutputEnable':'CA','ChannelSetActive':'CC','ChannelSetSolo':'CB','CommonGetLayers':'B1','LayerGetStatus':'0A','LayerSetID':'45','LayerSetOutput':'48','LayerSetFading':'46','LayerSetSolo':'4A','LayerSetAutoOutput':'64','LayerSetMixMode':'47','LayerSetTransparencyDepth':'63','LayerSetLocked':'43','LayerConfigure':'44','LayerGo':'56','LayerClear':'57','LayerPause':'58','LayerResume':'59','LayerNextStep':'5A','LayerPreviousStep':'5B','LayerNextCue':'73','LayerPreviousCue':'74','LayerSetChaseMode':'4B','LayerSetChaseSpeed':'4C','LayerSetFadeType':'4D','LayerSetFadeTime':'4E','LayerSetEditRunMode':'49','LayerUsesCueList':'0C','CueListCreate':'5F','LayerInsertStep':'5C','LayerReplaceStep':'67','LayerSetCueStepType':'4F','LayerSetCueStepParameters1':'50','LayerSetCueStepParameters2':'51','LayerSetCueStepParameters3':'52','LayerSetCueStepParameters4':'53','LayerSetCueStepParameters5':'54','LayerSetCueStepParameters6':'55','LayerSetDeviceID':'5E','LayerSetSustain':'40','LayerIgnoreNoteOff':'41','CueListGetDirectory':'A7','CueListRemove':'60','CueListRead':'AB','CueSceneRead':'AD','CueListWrite':'AA','CueSceneWrite':'AC','CueListRemoveStep':'62','CommonSetMIDIMode':'68','CommonMIDIBeat':'6B','CommonGetPatcher':'80','CommonSetPatcher':'81','CommonGetGain':'82','CommonSetGain':'83','CommonGetCurveTable':'84','CommonSetCurveTable':'85','CommonGetCurve1':'8C','CommonSetCurve1':'8D','CommonGetCurve2':'8E','CommonSetCurve2':'8F','CommonGetCurve3':'90','CommonSetCurve3':'91','CommonGetCurve4':'92','CommonSetCurve4':'93','CommonGetCurve5':'94','CommonSetCurve5':'95','CommonGetCurve6':'96','CommonSetCurve6':'97','CommonGetCurve7':'98','CommonSetCurve7':'99','CommonGetSlope':'86','CommonSetSlope':'87','CommonGetGlobalData':'0B','CommonSetBaudRate':'0006','CommonSetDMXOffset':'6A','CommonSetNumDMXChannels':'69','CommonSetName':'AE','CommonSetPassword':'AF','CommonSetIpConfig':'B0','CommonSetDmxIn':'B2','CommonSetUdpIn':'B8','CommonSetUdpOut':'B9','CommonSetTime':'BA','CommonGet16BitTable':'A0','CommonSet16BitTable':'A1','CommonStore16BitTable':'A2','CommonGetMIDIMapping':'A3','CommonSetMIDIMapping':'A4','CommonStoreMIDIMapping':'A5','CommonGetDigOutPatcher':'B3','CommonSetDigOutPatcher':'B4','CommonResetNonVolatile':'A8','DebugGetTotalUsage':'DD','DebugGetFreeList':'DE','DebugGetCuelistUsage':'DF'}
c = ConfigParser.ConfigParser()
c.read('/opt/LanBox-JSONRPC/config.ini')
LIGHTSERVER = (c.get('LanBox','name'), c.getint('LanBox','port'))
PASSWORD=c.get('LanBox','password')
class Scene():
'''Functions to recover/record light patterns by name outside the LanBox.'''
def __init__(self):
self.config='/opt/LanBox-JSONRPC/scenes.json'
try: self.scenes=json.load(open(self.config,'r'))
except: self.scenes={}
def get(self,scene):
try: return self.scenes[scene.lower()]
except: return {}
def set(self,scene,lights):
self.scenes[scene.lower()]=lights
json.dump(self.scenes, open(self.config,'w'),sort_keys=True,indent=4,separators=(',',':'))
class lanbox(Protocol):
'''Prototype async handler. Unfinished.'''
def __init__(self, methods):
self.methods = methods
self.methods._lanbox = self.sendLine
def connectionMade(self):
self.transport.write(PASSWORD+'\n')
@defer.inlineCallbacks
def sendLine(self, string):
self.transport.write(string+'\n')
defer.returnValue (self.dataReceived())
def dataReceived(self, data):
data.strip('\n')
data.strip()
if data is '?': return None
else: return data
class LanboxFactory(ReconnectingClientFactory):
'''Prototype async handler. Unfinished.'''
def __init__(self, methods):
self.host, self.port = LIGHTSERVER
self.methods = methods
def buildProtocol(self,addr):
return Lanbox(self.methods)
class LanboxMethods():
'''Lanbox methods from the manual, plus a few friendly extensions..'''
def __init__(self):
self.scene=Scene()
def _connectToLB(self, s=None):
'''Handler for connecting to the lanbox. Blocking.'''
if s is None:
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
data = ''
s.connect(LIGHTSERVER)
data=s.recv(4096)
while data != 'connected':
s.sendall(PASSWORD+'\n')
data=s.recv(4096)
s.sendall('*6501#') #16 bit mode on
data=s.recv(4096)
return s
def _lanbox(self, command,s=None):
'''Handler for connecting to the lanbox.'''
closeafter = False
if s is None:
closeafter = True
s = self._connectToLB()
print('*'+command+'#')
s.sendall('*'+command+'#')
ret = s.recv(4096)
print(ret)
if ret != '?':
ret = ret[1:-2]
if closeafter:
s.close()
return ret
def _chunk(self, seq, n ):
'''Iterable handler function - dismantles an iterable into iterables of length n'''
while seq:
yield seq[:n]
seq = seq[n:]
def _list_range(self, l):
'''Yields an iterable of tuples containing the initial number of a 'run' and the number in such a 'run' e.g.:
[1,2,3,4,5] -> ((1,5))
[1,2,4,5,6] -> ((1,2),(4,3))'''
tmp = l[:] #Take a copy of the list
tmp.sort()
start = tmp[0]
currentrange = [start, 1]
currentnext = start+1
for item in tmp[1:]:
if currentnext == item:
currentnext += 1
currentrange[1] += 1
else:
yield tuple(currentrange)
currentnext = item+1
currentrange = [item, 1]
yield tuple(currentrange)
def _to_hex(self, n,length=2):
'''Convert int to hex string of set length.'''
if isinstance(n,basestring):
try:
n = int(n)
except:
if n.lower().startswith('y'):
n = True
elif n.lower().startswith('n'):
n = False
else: raise ValueError #Trying to encode hex?
if isinstance(n,bool):
if n: return 'F'*length
else: return '0'*length
rstr = hex(n)[2:].zfill(length)
if len(rstr)>length: raise ValueError
return rstr
def _from_hex(self, n):
'''Convert hex string to int. Not designed to return bools.'''
if not isinstance(n,basestring): raise ValueError #Probably a typo!
return int(n,16)
def _Table1(self, response='',model = ''):
'''What model am I from response, or, what response would give this model?'''
T1 = {'LC+':'F8FB','LCX':'F8FD','LXM':'F8FF','LCE':'F901'}
rT1 = {value: key for key, value in T1.items()}
if response != '':
m = 'unknown'
for model in T1:
if T1[model] == response:
m = model
return m
if model != '':
r = ''
for response in rT1:
if rT1[response] == model:
r = response
return r
return T1.keys()
def showModels(self):
'''Show a list of LanBox model names.'''
return self._Table1()
def _Table2(self, model, device):
'''Does this model LanBox have this device, and if so what's the channel?'''
T2 = {'LCX':{'mixer':'FE','dmxout':'FF','dmxin':'FC','externalinputs':'FD'},
'LCE':{'mixer':'FE','dmxout':'FF','dmxin':'FC','externalinputs':'FD'},
'LCM':{'mixer':'FE','dmxout':'FF'},
'LC+':{'mixer':'09','dmxout':'0A'}}
ret = {}
try:
m_Table = T2[model]
ret = m_Table[device]
except:
return None
return ret
def _Table3(self, response = '', status = []):
'''What is the status list of this channel, or, what would give this status from a list?'''
T3 = {0:'mixstatus',1:'channeleditstatus',2:'solostatus',3:'fadestatus'}
rT3 = {value: key for key, value in T3.items()}
if response != '':
ret = []
bits = bin(int(response,16))[2:]
try:
for bit in T3:
if bits[-bit-1] == '1':
ret.append(T3[bit])
return ret
except:
return
if status != []:
ret = '0000'
for s in status:
if s.lower() in rT3:
ret[3-rT3[s.lower()]]='1'
return ret
return rT3.keys()
def showChannelStatusList(self):
'''Show a list of valid channel status names.'''
return self._Table3()
def _Table4(self, response='', flags = []):
'''What is the attribute flags list for this layer, or, what would give this flag list?'''
T4 = {0:'layeroutputenabled',1:'sequencemode',2:'fadestatus',3:'solostatus',4:'pausestatus',5:'auto',6:'sequencerwaiting',7:'locked'}
rT4 = {value: key for key, value in T4.items()}
if response != '':
ret = []
bits = bin(int(response,16))[2:]
try:
for bit in T4:
if bits[-bit-1] == '1':
ret.append(T4[bit])
return ret
except:
return
if flags != []:
ret = '00000000'
for f in flags:
if f.lower() in rT4:
ret[7-rT4[f.lower()]]='1'
return ret
return rT4.keys()
def showLayerAttributeList(self):
'''Show a list of valid layer attribute names.'''
return self._Table4()
def _Table5(self, response = '', mode = ''):
'''What is the mix mode of this layer, or, what would I need to set to have this mix mode?'''
T5 = {'0':'off','1':'copy','2':'htp','3':'ltp','4':'transparent','5':'add'}
rT5 = {value: key for key, value in T5.items()}
if response != '':
response = str(response).lstrip('0')
mode = 'unknown'
if response in T5:
mode = T5[response]
return mode
if mode != '':
ret = ''
if mode.lower() in rT5:
ret = rT5[mode.lower()].zfill(2)
return ret
return rT5.keys()
def showLayerMixModeList(self):
'''Show a list of valid layer mix modes.'''
return self._Table5()
def _Table6(self, response = '', mode = ''):
'''What is the chase mode of this layer, or, what would I need to set to have this chase mode?'''
T6={'0':'off','1':'chaseup','2':'loopup','3':'chasedown','4':'loopdown','5':'random',
'6':'looprandom','7':'bounce','8':'loopbounce'}
rT6 = {value: key for key, value in T6.items()}
if response != '':
response = str(response).lstrip('0')
mode = 'unknown'
if response in T6:
mode = T6[response]
if response == "": mode = 'off'
return mode
if mode != '':
ret = ''
if mode.lower() in rT6:
ret = rT6[mode.lower()].zfill(2)
return ret
return rT6.keys()
def showLayerChaseModeList(self):
'''Show a list of valid layer chase modes.'''
return self._Table6()
def _Table7(self, response = '', mode = ''):
'''What is the fade mode of this layer, or, what would I need to set to have this fade mode?'''
T7={'0':'off','1':'fadein','2':'fadeout','3':'crossfade','4':'off','5':'fadeincr',
'6':'fadeoutcr','7':'crossfadecr'}
rT7 = {value: key for key, value in T7.items()}
if response != '':
response = str(response).lstrip('0')
mode = 'unknown'
if response in T7:
mode = T7[response]
if response == "": mode = 'off'
return mode
if mode != '':
ret = ''
if mode.lower() in rT7:
ret = rT7[mode.lower()].zfill(2)
return ret
return rT7.keys()
def showLayerFadeModeList(self):
'''Show a list of valid layer fade modes.'''
return self._Table7()
def _Table8(self, response = '', speed = ''):
'''What baud rate am I, or, what baud rate would give this?'''
T8 = {'0':'38400','1':'19200','2':'9600','3':'31250','80':'31250','81':'31250','82':'31250','83':'31250'}
rT8 = {value: key for key, value in T8.items()}
if response != '':
response = str(response).lstrip('0')
speed = '38400'
if response in T8:
speed = T8[response]
return int(speed)
if speed != '':
ret = ''
if str(speed) in rT8:
ret = rT8[speed].zfill(2)
return ret
return rT8.keys()
def showBaudRateList(self):
'''Show a list of valid LanBox baud rates.'''
return self._Table8()
def _Table9(self, response = '', output = []):
'''What UDP output am I, or, what would I do for this?'''
T9 = {0:'broadcastdmxout',1:'broadcastmixerchannels',2:'broadcastexternalinputvalues',
3:'broadcastdmxin',4:'broadcastlayerlist',5:'synchronizelayers'}
rT9 = {value: key for key, value in T9.items()}
if response != '':
ret = []
bits = bin(int(response,16))[2:]
try:
for bit in T9:
if bits[-bit-1] == '1':
ret.append(T9[bit])
return ret
except:
return
if output != []:
ret = '00000000'
for f in output:
if f.lower() in rT9:
ret[7-rT9[f.lower()]]='1'
return ret
return rT9.keys()
def showUDPOutputList(self):
'''Show a list of valid LanBox UDP output modes.'''
return self._Table9()
def _Table10(self, secs='',offset=''):
'''Give me the right format to update the clock'''
if secs !='':
offset = int(secs) + 335361600
return self._to_hex(offset,8)
if offset !='':
offset = self._from_hex(offset)
secs = int(offset) - 335361600
return secs
def _AppendixA(self, response = '', secs = ''):
'''Convert a fade duration to a time, or, convert a duration to the closest time available.'''
ApA = [0,0.05,0.1,0.15,0.2,0.25,0.3,0.25,0.4,0.45,0.5,0.55,0.6,0.65,0.7,0.75,
0.8,0.85,0.9,0.95,1,1.1,1.2,1.3,1.5,1.6,1.8,2.0,2.2,2.4,2.7,3,3.3,3.6,3.9,
4.3,4.7,5.1,5.6,6.2,6.8,7.5,8.2,9.1,10,11,12,13,15,16,18,20,22,24,27,30,
33,36,39,43,47,51,56,60,66,72,78,90,96,108,120,132,144,162,180,198,222,234,
258,288,306,342,378,408,450,492,546,600,660,720,780,900,float('inf')]
if response is not '':
time = 0
try:
if int(response,16) in range(len(ApA)):
time = ApA[int(response,16)]
return time
except:
return
if secs is not '':
dur = float(secs)
if dur != float('inf'):
distance = abs(dur - ApA[0])
ret = 0
for n, v in enumerate(ApA[1:]):
if abs(dur - v) < distance:
distance = abs(dur - v)
ret = n+1
else:
continue
else: ret = 92 #Don't mess if you're infinite; closest won't work
return self._to_hex(ret,2)
def _commentTranslate(self, response = '', comment = ''):
'''Decode a comment line, or, encode one'''
commentchars = [' ','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O',
'P','Q','R','S','T','U','V','W','X','Y','Z','a','b','c','d','e','f','g','h','i',
'j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','0','1','2',
'3','4','5','6','7','8','9','-']
if response is not '':
try:
commentbytes = bin(self._from_hex(response))[2:]
out = ''
for i in range(0,len(commentbytes)-5,6): #six bit per letter
out = out + commentchars[int(commentbytes[i:i+5],2)]
return out
except:
return
if comment is not '':
comment = (str(comment)+' ')[:7]
response = ''
for letter in comment:
for n, c in enumerate(commentchars):
if c == letter:
response = response + bin(n)[2:].zfill(6) #ABCDEF ABCD...
return hex(int(response,2))[2:]
def _chaseSpeed(self, response = '', speed = ''):
'''Decode a chase speed response, or encode one.'''
if response is not '':
return 12800/(255-int(response,16))
if speed is not '':
s = int(speed)
if s <= 50.1: s = 50.1
if s > 1600: s = 1600
return hex(255-int(12800/s))[2:].zfill(2)
def _AppendixB(self, response = '', stepstocode = {}):
'''Decode a stepdata string, or, encode one.'''
stepdata={1:{'name':'showScene',1:'fadeType',2:'fadeTime',3:'holdTime'},
2:{'name':'showSceneOfCueList',1:'fadetype',2:'fadeTime',3:'holdTime',4:'cueList',6:'cueStep'},
10:{'name':'goCueStepInLayer',1:'layerId',2:'cueList',4:'cueStep'},
11:{'name':'clearLayer',1:'layerId'},
12:{'name':'pauseLayer',1:'layerId'},
13:{'name':'resumeLayer',1:'layerId'},
14:{'name':'startLayer',1:'layerId'},
15:{'name':'stopLayer',1:'layerId'},
16:{'name':'configureLayer',1:'sourceLayerId',2:'destLayerId',3:'newLayerId',4:'cueList',5:'cueStep'},
17:{'name':'stopLayer',1:'layerId'},
18:{'name':'goTrigger',1:'layerId',2:'cueList',3:'triggerid',4:'channel'},
20:{'name':'goCueStep',1:'cueStep'},
21:{'name':'goNextInLayer',1:'layerId'},
22:{'name':'goPreviousInLayer',1:'layerId'},
23:{'name':'loopToCueStep',1:'cueStep',2:'numberOfLoops'},
24:{'name':'hold',1:'holdTime'},
25:{'name':'holdUntil',1:'day',2:'hours',3:'minutes',4:'seconds',5:'frames'},
26:{'name':'goIfAnalogueChannel',1:'analogueData',6:'cueStep'},
27:{'name':'goIfChannel',1:'layerId',2:'channel',3:'goValues',5:'cueStep'},
30:{'name':'setLayerAttributes',1:'fadeEnable',2:'outputEnable',3:'soloEnable',4:'lock'},
31:{'name':'setLayerMixMode',1:'layerId',2:'mixMode',3:'transparencyDepth1',4:'transparencyDepth2',5:'fadeTime'},
32:{'name':'setLayerChaseMode',1:'layerId',2:'mixMode',3:'chaseSpeed1',4:'chaseSpeed2',5:'fadeTime'},
40:{'name':'writeMidiStream',1:'midiData'},
49:{'name':'writeSerialStream1',1:'serialData'},
50:{'name':'writeSerialStream2',1:'serialData'},
51:{'name':'writeSerialStream3',1:'serialData'},
52:{'name':'writeSerialStream4',1:'serialData'},
53:{'name':'writeSerialStream5',1:'serialData'},
54:{'name':'writeSerialStream6',1:'serialData'},
55:{'name':'writeSerialStream7',1:'serialData'},
56:{'name':'writeSerialStream8',1:'serialData'},
70:{'name':'comment',1:'comment'}}
if response != '':
ret = dict()
type = int(response[0:2],16)
if type in stepdata:
steptype = stepdata[type]
for value in steptype:
if value == 'name':
ret['name'] = steptype['name']
else:
datatype = steptype[value] #Where value = field number
if datatype == 'fadeEnable' or datatype == 'outputEnable' or datatype == 'soloEnable' or datatype == 'lock': #Booleans
if response[2*value+1:2*value+2] is '00': ret[datatype]=False
else: ret[datatype]=True
elif datatype == 'fadeTime' or datatype == 'holdTime' or datatype == 'fadeTime': #_Appendix A lookup
ret[datatype] = self._AppendixA(response[2*value+1:2*value+2])
elif datatype == 'channel' or datatype == 'goValues': #2-byte length
ret[datatype] = response[2*value+1:2*value+4]
elif datatype == 'midiData' or datatype =='analoguedata' or datatype == 'serialData': #6-byte length
ret[datatype] = response[2*value+1:2*value+12]
elif datatype == 'transparencyDepth1' or datatype == 'transparencyDepth2': #100% = 255
ret[datatype] = str(int(response[2*value+1:2*value+2],16)*100/255)
elif datatype == 'chaseSpeed1' or datatype =='chaseSpeed2': #_chaseSpeeds
ret[datatype] = self._chaseSpeed(response[2*value+1:2*value+2])
elif datatype == 'fadeType':
ret[datatype] = self._Table7(response[2*value+1:2*value+2])
elif datatype == 'mixMode':
ret[datatype] = self._Table5(response[2*value+1:2*value+2])
elif datatype == 'comment':
ret[datatype] = self._commentTranslate(response[2*value+1:2*value+12])
elif datatype == 'day':
d = response[2*value+1:2*value+2]
if d == '00': ret[datatype]='Mon'
if d == '01': ret[datatype]='Tue'
if d == '02': ret[datatype]='Wed'
if d == '03': ret[datatype]='Thu'
if d == '04': ret[datatype]='Fri'
if d == '05': ret[datatype]='Sat'
if d == '06': ret[datatype]='Sun'
if d == '80': ret[datatype]='ALL'
else:
ret[datatype] = response[2*value+1:2*value+2]
return ret
if stepstocode != {}:
returnlist = ['00','00','00','00','00','00','00']
for steptype in stepdata:
if stepstocode['name'].lower() == stepdata[steptype]['name'].lower():
returnlist[0]= hex(steptype)[2:].zfill(2)
del stepstocode['name']
for element in stepstocode:
for position in stepdata[steptype]:
if element.lower() == stepdata[steptype][position].lower():
payload = stepstocode[element]
if element.lower() == 'fadeenable' or element.lower() == 'outputenable' or element.lower() == 'soloenable' or element.lower() == 'lock': #Booleans
if payload is True:
returnlist[position] = 'FF'
else: returnlist[position]='00'
elif element.lower() == 'fadetime' or element.lower() == 'holdtime': #_Appendix A lookup
returnlist[position] = self._AppendixA('',payload)
elif element.lower() == 'channel' or element.lower() == 'govalues': #2-byte length
returnlist[position] = payload[0:2]
returnlist[position+1] = payload[2:4]
elif element.lower() == 'mididata' or element.lower() =='analoguedata' or element.lower() == 'serialdata': #6-byte length
returnlist[position] = payload[0:2]
returnlist[position+1] = payload[2:4]
returnlist[position+2] = payload[4:6]
returnlist[position+3] = payload[6:8]
returnlist[position+4] = payload[8:10]
returnlist[position+5] = payload[10:12]
elif element.lower() == 'transparencydepth1' or element.lower() == 'transparencydepth2': #100% = 255
try:
percent = int(payload)
if percent >100: percent = 100
if percent <0: percent = 0
except:
percent = 0
returnlist[position] = hex(int(payload)*255/100)[2:].zfill(2)
elif element.lower() == 'chasespeed1' or element.lower() =='chasespeed2': #_chaseSpeeds
returnlist[position] = self._chaseSpeed('',payload)
elif element.lower() == 'fadetype':
returnlist[position] = self._Table7('',payload)
elif element.lower() == 'mixmode':
returnlist[position] = self._Table5('',payload)
elif element.lower() == 'comment':
returnlist[position] = self._commentTranslate('',payload)
elif element.lower() == 'day':
if payload.lower()[:3] == 'mon': returnlist[position] = '00'
if payload.lower()[:3] == 'tue': returnlist[position] = '01'
if payload.lower()[:3] == 'wed': returnlist[position] = '02'
if payload.lower()[:3] == 'thu': returnlist[position] = '03'
if payload.lower()[:3] == 'fri': returnlist[position] = '04'
if payload.lower()[:3] == 'sat': returnlist[position] = '05'
if payload.lower()[:3] == 'sun': returnlist[position] = '06'
if payload.lower()[:3] == 'all': returnlist[position] = '80'
if payload.lower()[:5] == 'every': returnlist[position] = '80'
else:
retlist[position]= hex(element)[2:].zfill(2)
return ''.join(returnlist)
else: pass
ret = {}
for type in stepdata:
names = stepdata[type]
n = names['name']
del names['name']
ret[n] = names.values()
return ret
def showStepDataList(self):
'''Show a table of valid step data arguments for each type of step data.'''
return self._AppendixB()
def setChannels(self,lights,layer = 1):
'''Sets many channels as per a dictionary of levels. Returns a dict of set levels.'''
if not isinstance(lights,dict): raise ValueError('Give me a dict of levels.')
slights = {}
for light in lights:
if lights[light]<0: slights[str(light)] = 0
elif lights[light]>255: slights[str(light)] = 255
else: slights[str(light)] = lights[light]
blocks = self._chunk(slights.keys(),255) #Only set 255 at a time.
for block in blocks:
setting = {b:slights[b] for b in block}
self.channelSetData(setting,layer)
return slights
def getChannels(self,lights=None,layer = 1):
'''Gets many channel levels as per a list/dict of levels (None implies everything in layer).
Returns a dictionary of levels.'''
if lights is None:
lranges = [(1,500)]
elif isinstance(lights,int):
lranges = [(lights,1)]
elif isinstance(lights,str):
lranges = [(int(lights),1)]
elif isinstance(lights,list):
lranges = self._list_range([int(x) for x in lights]) #[(start,count)...]
elif isinstance(lights,dict):
lranges = self._list_range([int(x) for x in lights.keys()]) #[(start,count)...]
else:
return {}
retdict = {}
for rangepair in lranges:
rangestart = 0
while rangestart<rangepair[1]: #Can only do 255 at once!
currentrange = min(rangepair[1]-rangestart,255)
retdict.update(self.channelReadData(int(rangepair[0])+rangestart,currentrange,layer))
rangestart += currentrange
retdict = {str(x):retdict[x] for x in retdict.keys()}
return retdict
def fadeTo(self, todict, time = 0.5, cueList=2,layer = 1):
'''Sets up a crossfade between current light values and the recieved dictionary.'''
#get the current light values
fromdict = self.getChannels(todict)
#build a new cue.
step1={'name':'showscene','fadetype':'crossfade','fadetime':time,'holdtime':0}
step2={'name':'showscene','fadetype':'crossfade','fadetime':time,'holdtime':float('inf')}
self.cueListWrite(cueList,step1,step2)
#transmit the cue steps
self.cueSceneWrite(cueList,1,fromdict)
self.cueSceneWrite(cueList,2,todict)
#execute the cue
self.layerGo(cueList)
def toggleChannel(self,channel,layer = 1):
'''Toggles a channel from on(any value) to off, or off to max.'''
level = self.getChannels(channel,layer)[str(channel)]
if level > 0:
retval = self.setChannels({str(channel):0},layer)
else:
retval = self.setChannels({str(channel):255},layer)
return retval
def buildCue(self,cueList,*stepData):
'''A more complete constructor of a cue. Expects a list of dicts with params from Appendix B, plus a dict 'lights'
that contains an association of lights for each cue step.'''
lights = []
for step in stepData:
try:
lights.append(step['lights'])
del step['lights']
except:
lights.append({})
#build a new cue.
self.cueListWrite(cueList,*stepData)
for n,scene in enumerate(lights):
if scene is not {}:
self.cueSceneWrite(cueList,n,scene)
def commonGetAppID (self):
'''Return the device ID and version of this Lanbox.'''
ret = {}
response = self._lanbox(commandDict['CommonGetAppID'])
ret['deviceId'] = self._Table1(response[0:4])
ret['version'] = self._from_hex(response[4:8])
return ret
def _common16BitMode (self, sixteenBitMode):
'''Toggle 16-Bit mode. Warning - this will not do anything!'''
return self._lanbox(commandDict['Common16BitMode']+self._to_hex(sixteenBitMode,2))
def commonReboot (self):
'''Reboot the lanbox.'''
return self._lanbox(commandDict['CommonReboot'])
def commonSaveData (self):
'''Save data to the internal flash.'''
return self._lanbox(commandDict['CommonSaveData'])
def channelSetData (self,channelData,layer=1):
'''Set each channel to a value on each layer. Expects a dict.'''
cmd=commandDict['ChannelSetData']
cmd = cmd + self._to_hex(layer,2)
for d in channelData:
channel = d
value = channelData[d]
cmd = cmd + self._to_hex(channel,4)+self._to_hex(value,2)
return self._lanbox(cmd)
def channelReadData (self, startChannel, num=1, layer=1):
'''Returns a list of values of channels on layer.'''
response=self._lanbox(commandDict['ChannelReadData']+self._to_hex(layer,2)+self._to_hex(startChannel,4)+self._to_hex(num,2))
ret = {}
for n,c in enumerate(self._chunk(response,2)):
ret[startChannel+n] = self._from_hex(c)
return ret
def channelReadStatus (self, startChannel, num=1,layer=1):
'''Gives a dict of flags for channel in layer'''
response=self._lanbox(commandDict['ChannelReadData']+self._to_hex(layer,2)+self._to_hex(startChannel,4)+self._to_hex(num,2))
ret = {}
for n,c in enumerate(self._chunk(response,2)):
ret[startChannel+n] = self._Table3(c)
return ret
def channelSetOutputEnable (self,channelData,layer=1):
'''Sets the enable flag on channel. Expects a dict{channel:bool}'''
cmd=commandDict['ChannelSetOutputEnable']
cmd = cmd + self._to_hex(layer,2)
for d in channelData:
channel = d
value = channelData[d]
cmd = cmd + self._to_hex(channel,4)+self._to_hex(value,2)
return self._lanbox(cmd)
def channelSetActive (self,channelData,layer=1):
'''Sets the active flag on channel. Expects a dict{channel:bool}'''
cmd=commandDict['ChannelSetActive']
cmd = cmd + self._to_hex(layer,2)
for d in channelData:
channel = d
value = channelData[d]
cmd = cmd + self._to_hex(channel,4)+self._to_hex(value,2)
return self._lanbox(cmd)
def channelSetSolo (self,channelData,layer=1):
'''Sets the solo flag on channel. Expects a dict{channel:bool}'''
cmd=commandDict['ChannelSetSolo']
cmd = cmd + self._to_hex(layer,2)
for d in channelData:
channel = d
value = channelData[d]
cmd = cmd + self._to_hex(channel,4)+self._to_hex(value,2)
return self._lanbox(cmd)
def commonGetLayers (self):
'''Returns a dict of all layers with nested layer attributes'''
ret = {}
response = self._lanbox(commandDict['CommonGetLayers'])
for n, c in enumerate(self._chunk(response,24)):
layer = self._from_hex(c[:2])
ret[layer]={}
ret[layer]['mixOrder']=n
ret[layer]['layerID']=self._from_hex(c[2:4])
ret[layer]['layerAttr']=self._Table4(c[4:6])
ret[layer]['cueList']=self._from_hex(c[6:10])
ret[layer]['cueStep']=self._from_hex(c[10:12])
ret[layer]['fadeTime']=self._AppendixA(c[12:14])
ret[layer]['remainingFadeTime']=self._from_hex(c[14:18])*0.05
ret[layer]['holdTime']=self._AppendixA(c[18:20])
ret[layer]['remainingHoldTime']=self._from_hex(c[20:24])*0.05
return ret
def layerGetStatus (self, layer=1):
'''Returns a dict of layer with extended layer attributes'''
ret = {}
response = self._lanbox(commandDict['LayerGetStatus']+self._to_hex(layer,2))
ret['outputStatus'] = bool(self._from_hex(response[:2]))
ret['sequenceStatus'] = bool(self._from_hex(response[2:4]))
ret['fadeStatus'] = bool(self._from_hex(response[4:6]))
ret['soloStatus'] = bool(self._from_hex(response[6:8]))
ret['mixStatus'] = self._Table5(response[8:10])
ret['holdTime'] = self._AppendixA(response[12:14])
ret['remainingHoldTime'] = self._from_hex(response[14:18])*0.05
ret['activeCueList'] = self._from_hex(response[18:22])
ret['activeCueStep'] = self._from_hex(response[22:24])
ret['chaseMode'] = self._Table6(response[24:26])
ret['layerSpeed'] = self._chaseSpeed(response[26:28])
ret['fadeType'] = self._Table7(response[28:30])
ret['fadeTime'] = self._AppendixA(response[30:32])
ret['cueStepFadeTime'] = self._AppendixA(response[32:34])
ret['remainingFadeTime'] = self._from_hex(response[34:38])*0.05
ret['layerTransparency'] = self._from_hex(response[38:40])/2.55
ret['loadingIndication'] = self._from_hex(response[40:42])*0.05/255
ret['pauseStatus'] = bool(self._from_hex(response[42:44]))
ret['sysExDeviceId'] = self._from_hex(response[44:46])
ret['autoStatus'] = bool(self._from_hex(response[46:48]))
ret['currentCueStep'] = self._AppendixB(response[48:60])
return ret
def layerSetID (self, oldLayer, newLayer):
'''Sets a layer ID'''
return self._lanbox(commandDict['LayerSetID']+self._to_hex(oldLayer,2)+self._to_hex(newLayer,2))
def layerSetOutput (self, layer, output):
'''Sets a layer output flag'''
return self._lanbox(commandDict['LayerSetOutput']+self._to_hex(layer,2)+self._to_hex(output,2))
def layerSetFading (self, layer, fade):
'''Sets a layer fade flag'''
return self._lanbox(commandDict['LayerSetFading']+self._to_hex(layer,2)+self._to_hex(fade,2))
def layerSetSolo (self, layer, solo):
'''Sets a layer solo flag'''
return self._lanbox(commandDict['LayerSetSolo']+self._to_hex(layer,2)+self._to_hex(solo,2))
def layerSetAutoOutput (self, layer, auto):
'''Sets a layer automatic output flag'''
return self._lanbox(commandDict['LayerSetAutoOutput']+self._to_hex(layer,2)+self._to_hex(auto,2))
def layerSetMixMode (self, layer, mixMode):
'''Sets a layer mix mode flag from a dict of mixmodes supplied.'''
return self._lanbox(commandDict['LayerSetMixMode']+self._to_hex(layer,2)+self._Table5('',mixMode))
def layerSetTransparencyDepth (self, layer, transparencyDepth):
'''Sets a layer transparency depth from 0 to 100%'''
return self._lanbox(commandDict['LayerSetTransparencyDepth']+self._to_hex(layer,2)+self._to_hex(transparencyDepth*2.55,2))
def layerSetLocked (self, layer, locked):
'''Sets a layer automatic locked flag'''
return self._lanbox(commandDict['LayerSetLocked']+self._to_hex(layer,2)+self._to_hex(locked,2))
def layerConfigure (self, destLayer, sourceLayer, layerId=None, layerAttr=None, startCueList=None, startCueStep=None):
'''Configures a layer based on another layer.'''
cmd=commandDict['LayerConfigure']
cmd = cmd + self._to_hex(destLayer,2)
cmd = cmd + self._to_hex(sourceLayer,2)
if layerId is not None:
cmd = cmd+self._to_hex(layerId,2)
if layerAttr is None:
raise ValueError('need layerAttr')
cmd = cmd+self._Table4('',layerAttr)
if startCueList is None:
raise ValueError('need startCueList')
cmd = cmd+self._to_hex(startCueList,4)
if startCueStep is None:
raise ValueError
raise ValueError('need startCueStep')
return self._lanbox(cmd)
def layerGo (self, cueList=1, layer=1, cueStep=None):
'''Runs the cues set to a layer.'''
cmd=commandDict['LayerGo']+self._to_hex(layer,2)+self._to_hex(cueList,4)
if cueStep is not None:
cmd = cmd + self._to_hex(cueStep,2)
return self._lanbox(cmd)
def layerClear (self, layer=1):
'''Clears a layer.'''
return self._lanbox(commandDict['LayerClear']+self._to_hex(layer,2))
def layerPause (self, layer=1):
'''Pauses a layers cues.'''
return self._lanbox(commandDict['LayerPause']+self._to_hex(layer,2))
def layerResume (self, layer=1):
'''Resumes a layers cues.'''
return self._lanbox(commandDict['LayerResume']+self._to_hex(layer,2))
def layerNextStep (self, layer=1):
'''Go to the next step of a layers cues.'''
return self._lanbox(commandDict['LayerNextStep']+self._to_hex(layer,2))
def layerPreviousStep (self, layer=1):
'''Go to the previous step of a layers cues.'''
return self._lanbox(commandDict['LayerPreviousStep']+self._to_hex(layer,2))
def layerNextCue (self, layer=1):
'''Go to the next cue of a layers cues.'''
return self._lanbox(commandDict['LayerNextCue']+self._to_hex(layer,2))
def layerPreviousCue (self, layer=1):
'''Go to the previous cue of a layers cues.'''
return self._lanbox(commandDict['LayerPreviousCue']+self._to_hex(layer,2))
def layerSetChaseMode (self, layer,chaseMode):
'''Sets the chase mode flag of a layer.'''
return self._lanbox(commandDict['LayerSetChaseMode']+self._to_hex(layer,2)+self._Table6('',chaseMode))
def layerSetChaseSpeed (self, layer, chaseSpeed):
'''Sets the chase speed of a layer i.e. how quickly each cue gets executed.'''
return self._lanbox(commandDict['LayerSetChaseSpeed']+self._to_hex(layer,2)+self._chaseSpeed('',chaseSpeed))
def layerSetFadeType (self, layer, fadeType):
'''Sets the fade type of a layer. Expects a string.'''
return self._lanbox(commandDict['LayerSetFadeType']+self._to_hex(layer,2)+self._Table7('',fadeType))
def layerSetFadeTime (self, layer, fadeTime):
'''Sets the fade time of a layer.'''
return self._lanbox(commandDict['LayerSetFadeTime']+self._to_hex(layer,2)+self._AppendixA('',fadeTime))
def layerSetEditRunMode (self, layer, edit):
'''Puts a layer in Edit Run mode.'''
return self._lanbox(commandDict['LayerSetEditRunMode']+self._to_hex(layer,2)+self._to_hex(edit,2))
def layerUsesCueList (self, layer, cueList):
'''Assigns a cue list to a layer.'''
return self._lanbox(commandDict['LayerUsesCueList']+self._to_hex(layer,2)+self._to_hex(cueList,4))
def cueListCreate (self, cueList):
'''creates a new cue list.'''
return self._lanbox(commandDict['CueListCreate']+self._to_hex(cueList,4))
def layerInsertStep (self, layer, cueStep=None):
'''Adds a new step to a layer.'''
cmd = commandDict['LayerInsertStep']+self._to_hex(layer,2)
if cueStep is not None: cmd = cmd + self._to_hex(cueStep,2)
return self._lanbox(cmd)
def layerReplaceStep (self, layer, cueStep):
'''Exchanges a step on a layer with another.'''
return self._lanbox(commandDict['LayerReplaceStep']+self._to_hex(layer,2)+self._to_hex(cueStep,2))
def layerSetCueStepParameters (self, layer, stepData):
'''Sets a step Parameters.'''
data = self._AppendixB('',stepData)
self._lanbox(commandDict['LayerSetCueStepType']+self._to_hex(layer,2)+self._to_hex(data[:2],2))
self._lanbox(commandDict['LayerSetCueStepParameters1']+self._to_hex(layer,2)+self._to_hex(data[2:4],2))
self._lanbox(commandDict['LayerSetCueStepParameters2']+self._to_hex(layer,2)+self._to_hex(data[4:6],2))
self._lanbox(commandDict['LayerSetCueStepParameters3']+self._to_hex(layer,2)+self._to_hex(data[6:8],2))
self._lanbox(commandDict['LayerSetCueStepParameters4']+self._to_hex(layer,2)+self._to_hex(data[8:10],2))
self._lanbox(commandDict['LayerSetCueStepParameters5']+self._to_hex(layer,2)+self._to_hex(data[10:12],2))
self._lanbox(commandDict['LayerSetCueStepParameters6']+self._to_hex(layer,2)+self._to_hex(data[12:14],2))
return None
def layerSetDeviceID (self, layer, deviceId):
'''Sets the MIDI Device ID for a layer.'''
return self._lanbox(commandDict['LayerSetDeviceID']+self._to_hex(layer,2)+self._to_hex(deviceId,2))
def layerSetSustain (self, layer, sustain):
'''Sets the MIDI sustain flag on a layer. This affects how MIDI notes with velocity 0 interact with the layer.'''
return self._lanbox(commandDict['LayerSetSustain']+self._to_hex(layer,2)+self._to_hex(sustain,2))
def layerIgnoreNoteOff (self, layer, ignoreNote):
'''Toggles the effect of MIDI Note-Off signals'''
return self._lanbox(commandDict['LayerIgnoreNoteOff']+self._to_hex(layer,2)+self._to_hex(ignoreNote,2))
def cueListGetDirectory (self, index=1):
'''Lists 80 cue list lengths from index.'''
ret = {}
response = self._lanbox(commandDict['CueListGetDirectory']+self._to_hex(index,4))
for c in self._chunk(response,6):
ret[self._from_hex(c[:4])]=self._from_hex(c[4:6])
return ret
def cueListRemove (self, cueList):
'''Delete a cue list.'''
return self._lanbox(commandDict['CueListRemove']+self._to_hex(cueList,4))
def cueListRead (self, cueList, start, num=0):
'''Returns a cue list steps with flags.'''
ret = {}
response = self._lanbox(commandDict['CueListRead']+self._to_hex(cueList,4)+self._to_hex(start,2)+self._to_hex(num,2))
for n,c in enumerate(self._chunk(response,14)):
ret[start+n]=self._AppendixB(c)
return ret
def cueSceneRead (self, cueList, cueStep, start=1):
'''Returns the scene (channel data) info for cueStep of cueList.Start may be needed if length of return over 250.'''
ret = {}
cmd = commandDict['CueSceneRead']+self._to_hex(cueList,4)+self._to_hex(cueStep,2)
if start>1: cmd = cmd + self._to_hex(start,2)
response = self._lanbox(cmd)
ret['channelsInScene']=self._from_hex(response[2:6])
for c in self._chunk(response[6:],6):
ret[self._from_hex(c[:4])]=self._from_hex(c[4:6])
return ret
def cueListWrite (self, cueList, *stepData):
'''Write in the cueList step data. Expects a dicts of flags per step.'''
cmd = commandDict['CueListWrite']+self._to_hex(cueList,4)+self._to_hex(len(stepData),2)
for step in stepData:
cmd = cmd + self._AppendixB('',step)
return self._lanbox(cmd)
def cueSceneWrite (self, cueList, cueStep, channelData):
'''Write in the cueList scene (channel values) data. Expects a dict of channels and values.'''
cmd = commandDict['CueSceneWrite']+self._to_hex(cueList,4)+self._to_hex(cueStep,2)
cmd = cmd + '00'+self._to_hex(len(channelData),4)
for channel in channelData:
cmd = cmd +self._to_hex(channel,4)+self._to_hex(channelData[channel],2)
return self._lanbox(cmd)
def cueListRemoveStep (self, cueList, stepNum):
'''Deletes a step in the cueList.'''
return self._lanbox(commandDict['CueListRemoveStep']+self._to_hex(cueList,4)+self._to_hex(stepNum,2))
def commonSetMIDIMode (self, midiMode):
'''Sets the MIDI mode of the lanbox.'''
return self._lanbox(commandDict['CommonSetMIDIMode']+self._to_hex(midiMode,2))
def commonMIDIBeat (self):
'''Send a MIDI beat.'''
return self._lanbox(commandDict['CommonSetMIDIBeat'])
def commonGetPatcher (self, dmxChan=1, num=255):
'''Return the DMX Patch table.'''
response = self._lanbox(commandDict['CommonGetPatcher']+self._to_hex(dmxChan,4)+self._to_hex(num,2))
for n,c in enumerate(self._chunk(response,4)):
ret[dmxChan+n]=self._from_hex(c)
return ret
def commonSetPatcher (self, dmxData):
'''Set the DMX Patch table. Expects a dict.'''
cmd = commandDict['CommonSetPatcher']
for l in dmxData:
cmd = cmd + self._to_hex(l,4)
cmd = cmd + self._to_hex(dmxData[l],4)
return self._lanbox(cmd)
def commonGetGain (self, dmxChan=1, num=255):
'''Returns the gain factor for each channel.'''
ret = {}
cmd = commandDict['CommonGetGain']+self._to_hex(dmxChan,4)+self._to_hex(num,2)
response = self._lanbox(cmd)
for n,c in enumerate(self._chunk(response,2)):
ret[dmxChan+n]=self._from_hex(c)
return ret
def commonSetGain (self, dmxData):
'''Sets the gain factor for channels. Expects a dict.'''
cmd = commandDict['CommonSetGain']
for l in dmxData:
cmd = cmd + self._to_hex(l,4)
cmd = cmd + self._to_hex(dmxData[l],2)
return self._lanbox(cmd)
def commonGetCurveTable (self, dmxChan=1, num=255):
'''Returns which curve tables are assigned to what DMX channels.'''
ret = {}
cmd = commandDict['CommonGetCurveTable']+self._to_hex(dmxChan,4)+self._to_hex(num,2)
response = self._lanbox(cmd)
for n,c in enumerate(self._chunk(response,2)):
ret[dmxChan+n]=self._from_hex(c)
return ret
def commonSetCurveTable (self, dmxData):
'''Assigns a curve table to channels. Expects a dict, chan->table.'''
cmd = commandDict['CommonSetCurveTable']
for l in dmxData:
cmd = cmd + self._to_hex(l,4)
cmd = cmd + self._to_hex(dmxData[l],2)
return self._lanbox(cmd)
def commonGetCurve (self, curveNum, firstVal=0, num=255):
'''Returns a curve table from memory as a dict.'''
ret = {}
if curveNum<1 or curveNum>8: raise ValueError
cmd = commandDict['CommonGetCurve'+str(curveNum)]
cmd = cmd+self._to_hex(firstVal,2)+self._to_hex(num,2)
response = self._lanbox(cmd)
for n,c in enumerate(self._chunk(response,2)):
ret[firstVal+n]=self._from_hex(c)
return ret
def commonSetCurve (self, curveNum, curveData):
'''Sets a curve table. Expects a dict, input:output.'''
if curveNum<1 or curveNum>8: raise ValueError
cmd = commandDict['CommonSetCurve'+str(curveNum)]
for l in curveData:
cmd = cmd + self._to_hex(l,4)
cmd = cmd + self._to_hex(curveData[l],2)
return self._lanbox(cmd)
def commonGetSlope (self, dmxChan=1, num=255):
'''Returns the maximum rate of change (in frames) per channel.'''
ret = {}
cmd = commandDict['CommonGetSlope']+self._to_hex(dmxChan,4)+self._to_hex(num,2)
response = self._lanbox(cmd)
for n,c in enumerate(self._chunk(response,2)):
ret[dmxChan+n]=self._from_hex(c)
return ret
def commonSetSlope (self, dmxData):
'''Sets the maximum rate of change per channel. Expects a dict, channel:rate.'''
cmd = commandDict['CommonSetSlope']
for l in dmxData:
cmd = cmd + self._to_hex(l,4)
cmd = cmd + self._to_hex(dmxData[l],2)
return self._lanbox(cmd)
def commonGetGlobalData (self):
'''Returns a lot of global information, baudrate, name, dmx channel data etc.'''
ret = {}
response = self._lanbox(commandDict['CommonGetGlobalData'])
ret['baudRate']=self._Table8(response[:2])
ret['dmxOffset']=self._from_hex(response[2:6])
ret['dmxChannels']=self._from_hex(response[6:10])
namelength = self._from_hex(response[10:12])
ret['name']=''
namelist = list(self._chunk(response[12:38],2))[:namelength]
for letter in namelist:
ret['name']=ret['name']+chr(int(letter,16))