/
mhz19.py
60 lines (51 loc) · 1.88 KB
/
mhz19.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from wemos import D8
from machine import Pin
import time
class MHZ19:
def __init__(self):
self.pin = Pin(D8, Pin.IN)
self.lastTimestamp = None
self.lowDuration = None
self.highDuration = None
self.latestMeasurements = []
def start(self):
self.pin.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.callback)
def callback(self, p):
now = time.ticks_ms()
if (self.lastTimestamp):
duration = time.ticks_diff(time.ticks_ms(), self.lastTimestamp)
else:
duration = 0
value = p.value()
print('MH-Z19 interrupt handler', value, duration)
self.lastTimestamp = now
if value:
self.lowDuration = duration
if self.lowDuration and self.highDuration:
co2 = self.calculateCo2Ppm(self.highDuration, self.lowDuration)
print("instant CO2", co2)
self.latestMeasurements.append(co2)
if len(self.latestMeasurements) > 100:
self.latestMeasurements.pop(0)
else:
self.highDuration = duration
self.lastTimestamp = now
def calculateCo2Ppm(self, highDuration, lowDuration):
return int(5000.0 * (1002.0 * highDuration - 2.0 * lowDuration) / 1000.0 / (highDuration + lowDuration))
def getCo2(self):
# get a median of the latest CO2 readings
measurements = self.latestMeasurements
self.latestMeasurements = []
if len(measurements):
measurements.sort()
medianCo2 = measurements[len(measurements) // 2]
print("CO2 median", medianCo2)
return medianCo2
else:
print("WARN: no CO2 measurements found")
return None
if __name__ == '__main__':
mhz19 = MHZ19()
mhz19.start()
time.sleep(10)
print('Median CO2', mhz19.getCo2())