-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.py
164 lines (119 loc) · 3.22 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import sys
import network
import time
import machine
import ubinascii
import ujson
import neopixel # for blinky flashy thingy
from umqtt.simple import MQTTClient
from machine import Timer
# Read the device configuration; later code uses its 'wifi' and 'mqtt' sections.
with open('farnsworth.json') as fp:
    raw_config = fp.read()
config = ujson.loads(raw_config)

# Bring up the station interface and join the configured network.
station = network.WLAN(network.STA_IF)
station.active(True)
wifi_settings = config['wifi']
station.connect(wifi_settings['ssid'], wifi_settings['psk'])

# Block here until the link is up, idling the CPU between polls.
while not station.isconnected():
    machine.idle()

# Turn the access-point interface off if it is currently active.
ap_if = network.WLAN(network.AP_IF)
if ap_if.active():
    ap_if.active(False)
# Pin 0 is D3 on the NodeMCU, 16 is the number of neopixels
np = neopixel.NeoPixel(machine.Pin(0), 16)

# Resting color of every pixel: the first half of the strip gets one color,
# the remainder the other. The split is derived from the strip length so the
# palette always has exactly one entry per pixel (8 + 8 for the 16 above).
# Color notes kept from the original author: #ecc486 (skin), 135,206,250.
_first_half = np.n // 2
default = (
    [(28, 251, 255)] * _first_half
    + [(255, 232, 150)] * (np.n - _first_half)
)

# Current step of the background fade; advanced by fade_timer().
fade_i = 0
def standard(np):
    """Paint the strip with the module-level ``default`` colors and latch them.

    np: the pixel strip to update (this parameter shadows the module-level
        ``np``); it must expose ``n``, item assignment and ``write()``.

    Raises IndexError if ``default`` has fewer entries than the strip has
    pixels.
    """
    pixel_count = np.n
    for pixel in range(pixel_count):
        np[pixel] = default[pixel]
    np.write()
# Show the default colors right away, then pause for one second before
# the rest of the start-up continues.
standard(np)
time.sleep(1)
# Identifier passed to MQTTClient as client_id: the hex-encoded
# machine.unique_id() of this board.
CLIENT_ID = ubinascii.hexlify(machine.unique_id())
def apply_colors(m):
    """Show the colors in *m* on the strip and remember them as the default.

    m: indexable sequence of color tuples, one per pixel. It may be shorter
       than the strip (remaining pixels keep their current color) or longer
       (extra entries are ignored). Passing ``default`` itself is allowed and
       simply re-displays the stored colors.

    The number of pixels written is bounded by the strip length ``np.n``
    rather than a hard-coded count, and the bound is computed up front
    instead of relying on IndexError to stop the loop.
    """
    count = min(len(m), np.n)
    for i in range(count):
        color = m[i]
        np[i] = color
        # Keep the stored palette in step with what is displayed; guard the
        # index in case the palette is shorter than the strip.
        if i < len(default):
            default[i] = color
    np.write()
def flash(c=(255, 255, 255), times=4):
    """Chase a single lit pixel around an otherwise dark strip.

    c:     color of the moving pixel (white by default).
    times: number of full laps around the strip.

    Each frame is held for 10 ms. When the chase is finished the stored
    default colors are re-applied.
    """
    pixel_count = np.n
    dark = (0, 0, 0)
    for step in range(times * pixel_count):
        # Blank everything, then light only the pixel for this step.
        for pixel in range(pixel_count):
            np[pixel] = dark
        np[step % pixel_count] = c
        np.write()
        time.sleep_ms(10)
    apply_colors(default)
def bounce():
    """Sweep a dark gap back and forth across a blue strip.

    Runs four passes over the strip, alternating direction on each pass,
    with every frame held for 60 ms. The default colors are NOT restored
    afterwards; the strip is left showing the last frame.
    """
    pixel_count = np.n
    for step in range(4 * pixel_count):
        for pixel in range(pixel_count):
            np[pixel] = (0, 0, 128)
        offset = step % pixel_count
        # Even passes run from the start of the strip, odd passes from the end.
        forward = (step // pixel_count) % 2 == 0
        gap = offset if forward else pixel_count - 1 - offset
        np[gap] = (0, 0, 0)
        np.write()
        time.sleep_ms(60)
def blink():
    """Pulse the whole strip red twice, then restore the default colors.

    The counter runs from 0 to 1023 in steps of 8. Its low byte is the
    brightness; on every odd block of 256 the brightness is inverted, giving
    up / down / up / down ramps. There is no explicit delay: the pace is set
    by how long each frame takes to compute and write.
    """
    for i in range(0, 4 * 256, 8):
        # The brightness is the same for every pixel in a frame, so work it
        # out once per frame instead of once per pixel.
        level = i & 0xff
        if (i // 256) % 2:
            level = 255 - level
        color = (level, 0, 0)
        for j in range(np.n):
            np[j] = color
        np.write()
    apply_colors(default)
def fade_one(i):
    """Display the default colors dimmed to brightness level *i*.

    i: brightness from 0 (off) to 255 (full default color). Values outside
       that range are clamped, so an overshoot such as 256 can no longer
       scale a 255 channel past what fits in one byte.

    Each channel of each default color is multiplied by ``i / 255`` and
    truncated to an integer before being written to the strip.
    """
    level = min(max(i, 0), 255)
    # The scale factor is identical for every pixel and channel: compute once.
    scale = level / 255
    for j in range(np.n):
        d = default[j]
        np[j] = (int(d[0] * scale), int(d[1] * scale), int(d[2] * scale))
    np.write()
def fade_timer(t=False):
    """Advance the background fade by one step; meant as a timer callback.

    t: the argument supplied by the timer; it is not used.

    Returns the updated ``fade_i``. Does nothing (and returns the current
    ``fade_i``) while ``fade_going`` is false.

    ``fade_i`` walks -255 .. 255 in steps of 2 and the brightness shown is
    its absolute value, so negative values are the dimming half of the cycle
    and positive values the brightening half. On reaching 255 it wraps to
    -255.
    """
    global fade_i
    if not fade_going:
        return fade_i
    fade_one(abs(fade_i))
    if fade_i >= 255:
        fade_i = -255
    else:
        # Clamp the step: starting from 0 the sequence would otherwise go
        # 254 -> 256 and request more than full brightness for one frame.
        fade_i = min(fade_i + 2, 255)
    return fade_i
# Leftover debugging example (prints 2 on a period of 2000):
#timer.init(period=2000, mode=Timer.PERIODIC, callback=lambda t:print(2))
# Timer that drives the background fade. NOTE(review): id -1 is presumably
# a software/virtual timer on this port -- confirm for the target board.
timer = Timer(-1)
# Start the fade from step 0 with fading enabled; fade_timer() is a no-op
# whenever fade_going is False.
fade_i=0
fade_going=True
# Call fade_timer() periodically with period=50 (milliseconds per the
# MicroPython Timer API).
timer.init(period=50, mode=Timer.PERIODIC, callback=lambda t: fade_timer(t))
def on_receive(t, m):
    """MQTT message callback: play the notification animation.

    t, m: arguments supplied by the MQTT client (presumably topic and
          message payload -- confirm against umqtt); neither is inspected,
          any message triggers the same animation.

    The background fade is paused for the duration of the animation so the
    timer callback does not overwrite the frames, and is always resumed
    afterwards, even if an animation step raises.
    """
    global fade_going
    fade_going = False
    try:
        flash(times=1)
        blink()
        flash(times=3)
        blink()
        flash(times=1)
    finally:
        # Without this, an exception above would leave the fade disabled
        # for good.
        fade_going = True
# Main loop: connect to the MQTT broker, subscribe to the configured topic
# and wait for messages forever. Any OSError (connection failure or a
# dropped link) restarts the whole sequence with a fresh client.
RECONNECT_DELAY_S = 2  # pause between attempts so a dead link is not hammered

while True:
    try:
        mqtt_settings = config['mqtt']
        c = MQTTClient(client_id=CLIENT_ID,
                       server=mqtt_settings['server'],
                       user=mqtt_settings['user'],
                       password=mqtt_settings['password'],
                       port=mqtt_settings['port'],
                       ssl=mqtt_settings['ssl'])
        c.set_callback(on_receive)
        c.connect()
        c.subscribe(mqtt_settings['topic'])
        while True:
            # Blocks until a message arrives; on_receive() handles it.
            c.wait_msg()
    except OSError as e:
        print("Woops, trying to do stuff again\n")
        # Report what actually went wrong instead of discarding it.
        print(repr(e))
        time.sleep(RECONNECT_DELAY_S)