Conectando o sensor BMP180 ao lego mindstorms 51515
- Eduardo Brennand Paranhos
- 9 de out. de 2024
- 7 min de leitura
Neste post, veremos como conectar o sensor de temperatura e pressão atmosférica BMP180 ao LEGO Mindstorms Robot Inventor. Embora estejamos falando especificamente desse sensor, o método utilizado aqui pode ser aplicado a qualquer sensor que você deseje conectar ao hub LEGO.
Para fazer isso, será necessária uma placa ESP32, que conectaremos ao hub por meio de um cabo, ligando o fio RX do conector ao pino 19 do ESP32 e o fio TX ao pino 18.

Instale o MicroPython em seu ESP32, além da biblioteca uartremote.py a seguir:
# this library supports the following micropython based platforms
# - pybricks on EV3
# - micropython on ESP8266
# - micropython on ESP32
# - micropython on OpenMV H7 plus
# - SPIKE hub
# python3 on any other platform. pyserial is required in that case.
import struct
import sys
# Numeric identifiers for the runtimes this library knows how to drive.
EV3 = 1
ESP32 = 2
ESP32_S2 = 3
ESP8266 = 4
SPIKE = 5
H7 = 7

# Maps the value reported by sys.platform to one of the identifiers above.
platforms = {
    'linux': EV3,  # EV3. TODO This might not be precise enough for python3 running on Linux laptops
    'esp32': ESP32,
    'Espressif ESP32-S2': ESP32_S2,
    'esp8266': ESP8266,
    'OpenMV4P-H7': H7,
    'LEGO Learning System Hub': SPIKE,
}

# Identifier of the runtime we are executing on, or None when sys.platform is
# not listed. A plain platforms[sys.platform] lookup raised KeyError on e.g.
# 'darwin' or 'win32', which made the generic "any other platform" (pyserial)
# code paths unreachable. None compares unequal to every identifier, so those
# fallbacks are now selected.
platform = platforms.get(sys.platform)
class UartRemoteError(Exception):
    """Error raised by UartRemote when talking to the remote side fails."""

    def __init__(self, message="An error occured with remote uart"):
        # Hand the (possibly default) text to Exception so str(err) shows it.
        Exception.__init__(self, message)
# Module-wide flag: set to 1 to request that UartRemote.loop() stops.
interrupt_pressed = 0


def esp_interrupt(p):
    """IRQ handler for gpio0: restore the REPL on UART0 and raise the stop flag.

    `p` is the argument supplied by the IRQ machinery; it is not used here.
    """
    global interrupt_pressed
    print("Interrupt Pressed")
    # Put the REPL back on UART0 at 115200 baud.
    repl_uart = machine.UART(0, 115200)
    dupterm(repl_uart, 1)
    interrupt_pressed = 1
# Platform-specific imports. Every branch must leave a UART-like class (or
# module) and a sleep_ms(ms) function in the module namespace, because the
# UartRemote class below uses them without further checks.
if platform==ESP8266:
    from machine import UART
    from machine import Pin
    import machine
    from utime import sleep_ms
    from uos import dupterm
    gpio0=Pin(0,Pin.IN)# define pin0 as input = BOOT button on board
    # A falling edge on gpio0 calls esp_interrupt (defined above).
    gpio0.irq(trigger=Pin.IRQ_FALLING, handler=esp_interrupt)
elif platform==ESP32:
    from machine import UART
    from machine import Pin
    import machine
    from utime import sleep_ms
    from uos import dupterm
    # The BOOT-button interrupt is disabled on ESP32:
    #gpio0=Pin(0,Pin.IN)# define pin0 as input = BOOT button on board
    #gpio0.irq(trigger=Pin.IRQ_FALLING, handler=esp_interrupt)
elif platform==ESP32_S2: # CircuitPython
    from busio import UART
    import board
    from time import sleep
    # No sleep_ms is imported on this platform, so build one from sleep().
    def sleep_ms(ms):
        sleep(ms/1000)
elif platform==EV3:
    from utime import sleep_ms
    from pybricks.iodevices import UARTDevice
    from pybricks.parameters import Port
elif platform==H7:
    from machine import UART
    from utime import sleep_ms
    from uos import dupterm
elif platform==SPIKE:
    from utime import sleep_ms
    import hub
else:
    # Any other platform: regular python3 with pyserial.
    from time import sleep
    import serial
    def sleep_ms(ms):
        sleep(ms/1000)
class UartRemote:
    """
    UartRemote
    Use to communicate via REPL or some kind of RPC command loop with other devices.

    A command travels over the wire as one frame:

        b'<' + total_len(1 byte) + cmd_len(1 byte) + cmd + payload + b'>'

    where the payload produced by pack() is

        fmt_len(1 byte) + format string + struct-packed data

    Both length fields are single bytes, so a command name, a format string
    and a complete frame body are each limited to 255 bytes.
    """
    # Class-level defaults. __init__ gives every instance its own copy so that
    # commands registered on one UartRemote do not leak into another one.
    commands={}
    command_formats={}

    @staticmethod
    def digitformat(f):
        """Split the leading decimal digits off format string `f`.

        Returns (count, rest). count is 0 when `f` has no leading digits,
        e.g. "12ab" -> (12, "ab") and "f" -> (0, "f").
        """
        i=0
        while i<len(f) and '0'<=f[i]<='9':
            i+=1
        # The '0' prefix makes int() work when there are no digits at all.
        return (int('0'+f[:i]),f[i:])

    def __init__(self,port=0,baudrate=115200,timeout=1000,debug=False,esp32_rx=0,esp32_tx=26):
        """Open the UART for the current platform and register built-in commands.

        port      -- platform specific port; on SPIKE either a port letter
                     (str) or an already opened port object
        baudrate  -- UART speed
        timeout   -- passed on to the platform's UART constructor
        debug     -- print diagnostics when True
        esp32_rx, esp32_tx -- GPIO numbers, used on ESP32 only
        """
        # Baud rates of up to 230400 work. 115200 is the default for REPL.
        self.reads_per_ms = 10
        if platform==EV3:
            if not port: port=Port.S1
            self.uart = UARTDevice(port,baudrate=baudrate,timeout=timeout)
        elif platform==H7:
            self.reads_per_ms = 20
            if not port: port=3
            self.uart = UART(port, baudrate, timeout_char=timeout)
        elif platform==ESP8266:
            self.baudrate=baudrate # store baudrate for repl init
            self.uart = UART(port,baudrate=baudrate,timeout=timeout,timeout_char=timeout,rxbuf=100)
        elif platform==ESP32:
            if not port: port = 1
            self.uart = UART(port,rx=esp32_rx,tx=esp32_tx,baudrate=baudrate,timeout=timeout)
        elif platform==ESP32_S2:
            self.uart = UART(board.TX,board.RX,baudrate=baudrate,timeout=0.5)
        elif platform==SPIKE:
            if isinstance(port, str):
                # Attribute lookup instead of eval(): same result for a port
                # letter, without executing an arbitrary string.
                self.uart = getattr(hub.port, port)
            else:
                self.uart = port
            self.uart.mode(1)
            sleep_ms(300)# wait for all duplex methods to appear
            self.uart.baud(baudrate) # set baud rate
        else:
            # Try regular python3 pyserial
            # NOTE(review): pyserial takes its timeout in seconds while the
            # default of 1000 looks like milliseconds -- confirm intent.
            self.uart = serial.Serial(port, baudrate, timeout=timeout)
        self.DEBUG=debug
        self.unprocessed_data=b''
        self.baudrate = baudrate
        # Per-instance registries (copied from the class-level defaults).
        self.commands = dict(self.commands)
        self.command_formats = dict(self.command_formats)
        self.local_repl_enabled = True
        self.add_command(self.enable_repl_locally, name='enable repl')
        self.add_command(self.disable_repl_locally, name='disable repl')
        self.add_command(self.echo, 's', name='echo')
        self.add_command(self.raw_echo, name='raw_echo')

    def echo(self, s):
        """Built-in command: return str(s), printing it first in debug mode."""
        if self.DEBUG: print(s)
        return str(s)

    @staticmethod
    def raw_echo(s):
        """Built-in command: return the argument unchanged."""
        return s

    def enable_repl_locally(self):
        """Built-in command: re-enable the local REPL and stop loop()."""
        global interrupt_pressed
        self.local_repl_enabled = True
        interrupt_pressed = 1

    def disable_repl_locally(self):
        """Built-in command: detach the local REPL from the UART."""
        self.local_repl_enabled = False

    @property
    def local_repl_enabled(self):
        """True while the local REPL is considered attached to the UART."""
        return self._local_repl_enabled

    @local_repl_enabled.setter
    def local_repl_enabled(self, enabled):
        # Only ESP8266 (slot 1) and H7 (slot 2) touch dupterm; on every other
        # platform this merely records the flag.
        if enabled:
            if platform==ESP8266:
                dupterm(self.uart, 1)
            elif platform==H7:
                dupterm(self.uart, 2)
        else:
            if platform==ESP8266:
                dupterm(None, 1)
            elif platform==H7:
                dupterm(None, 2)
        self._local_repl_enabled = enabled

    def add_command(self,command_function, format="", name=None):
        """Register `command_function` so the remote side can call it by name.

        format -- pack() format string used to encode the return value; an
                  empty string sends the return value without a format
        name   -- command name; when omitted it is taken from the second
                  word of repr(command_function)
        """
        if not name:
            name=repr(command_function).split(" ")[1]
        self.commands[name]=command_function
        self.command_formats[name]=format

    def pack(self,*argv):
        """Encode values into a payload: pack(format, value, value, ...).

        The format is a sequence of struct characters, each optionally
        preceded by a repeat count. Without a count, a list, tuple, str or
        bytes value is encoded with its own length ('a<n>x' for lists,
        't<n>x' for tuples, '<n>x' for str/bytes). The format 'raw' sends
        argv[1] as-is.

        If anything goes wrong while encoding, the first argument itself is
        returned as bytes (best effort), or the dummy payload b'\\x01z'.
        """
        try:
            f=argv[0] # formatstring
            i=0
            ff=''
            s=b''
            if f=='raw':
                # No encoding, raw bytes
                s=b'\x03raw'+argv[1]
            else:
                while len(f)>0: # keep parsing formatstring
                    nf,f=self.digitformat(f) # split preceding digits and format character
                    if nf==0:
                        nf=1
                        fo=f[0]
                        data=argv[1+i] # get data that needs to be encoded
                        td=type(data) # check type of data
                        if td is list: # lists get an 'a' marker and their length before the format character
                            ff+="a%d"%len(data)+fo
                            for d in data:
                                s+=struct.pack(fo,d) # encode each element with format character fo
                        elif td is tuple: # tuples get a 't' marker and their length before the format character
                            ff+="t%d"%len(data)+fo
                            for d in data:
                                s+=struct.pack(fo,d) # encode each element with format character fo
                        elif td is str:
                            # Count encoded bytes, not characters: unpack()
                            # slices the payload by this number, so a non-ASCII
                            # string would otherwise be cut short.
                            encoded=data.encode('utf-8')
                            ff+="%d"%len(encoded)+fo
                            s+=encoded
                        elif td is bytes:
                            ff+="%d"%len(data)+fo
                            s+=data
                        else:
                            ff+=fo
                            s+=struct.pack(fo,data)
                    else:
                        # Explicit repeat count: consume nf positional values.
                        fo="%d"%nf+f[0]
                        data=argv[1+i:1+i+nf]
                        ff+=fo
                        s+=struct.pack(fo,*data)
                    i+=nf
                    f=f[1:] # continue parsing with remainder of f
                s=struct.pack('B',len(ff))+ff.encode('utf-8')+s
            return s
        except Exception:
            # Deliberate best effort: fall back to sending the first argument.
            if not argv:
                return b'\x01z'
            t=type(argv[0])
            if t is bytes:
                return argv[0]
            elif t is str:
                return bytes(argv[0],"utf-8")
            elif t is int:
                return bytes((argv[0],))
            elif t is list:
                return bytes(argv[0])
            else:
                return b'\x01z'

    def unpack(self,s):
        """Decode a payload produced by pack().

        Returns None for the dummy format 'z', the raw bytes for 'raw', a
        single value when exactly one item was encoded, otherwise a tuple.
        If `s` cannot be decoded it is returned unchanged.
        """
        # Size in bytes of one item for each supported format character.
        sizes={'b':1,'B':1,'i':4,'I':4,'f':4,'s':1,'r':1}
        try:
            p=0
            nf=s[p]
            p+=1
            f=s[p:p+nf].decode('utf-8')
            p+=nf
            data=()
            if f=="z": # dummy format 'z' for empty data
                return None
            if f=="raw": # Raw bytes, no decoding needed
                return s[p:]
            while len(f)>0:
                nf,f=self.digitformat(f)
                fo=f[0]
                if fo=='a' or fo=='t': # list ('a') or tuple ('t')
                    extra=fo
                    nf,f=self.digitformat(f[1:])
                    fo=f[0]
                    nr_bytes=nf*sizes[fo]
                    values=struct.unpack("%d"%nf+fo,s[p:p+nr_bytes])
                    if extra=='a':
                        data=data+(list(values),) # make list from tuple returned by unpack
                    else:
                        data=data+(tuple(values),)
                else:
                    ff=fo if nf==0 else "%d"%nf+fo
                    if nf==0: nf=1
                    nr_bytes=nf*sizes[fo]
                    if ff[-1]=='r': ff=ff[:-1]+'s' # 'r' is unpacked like 's' but stays bytes
                    decoded=struct.unpack(ff,s[p:p+nr_bytes])
                    if fo=='s':
                        decoded=(decoded[0].decode('utf-8'),) # transform bytes in string
                    data=data+decoded
                p+=nr_bytes
                f=f[1:]
            if len(data)==1: # convert from tuple size 1 to single value
                data=data[0]
            return data
        except Exception:
            # Deliberate best effort: hand back what we could not decode.
            return s

    def encode(self,cmd,*argv,encoder=-1):
        """ Encodes command with specified encoder

        encoder -- -1 (default) uses self.pack; a callable is used instead of
                   pack; a falsy value sends argv[0] unencoded.
        Without arguments the dummy payload b'\\x01z' is sent.
        """
        if argv:
            if encoder:
                if encoder==-1:
                    encoder=self.pack
                s=encoder(*argv)
            else:
                s=argv[0]
        else: # no formatstring
            s=b'\x01z' # dummy format 'z' for no arguments
        # Length prefix counts encoded bytes, matching how decode() slices.
        name=cmd.encode('utf-8')
        s=struct.pack("B",len(name))+name+s
        s=struct.pack("B",len(s))+s
        return s

    def decode(self,s,decoder=-1):
        """ Decodes command + encoded bytes with specified decoder

        Returns (cmd, data). decoder -- -1 (default) uses self.unpack; a
        callable is used instead; a falsy value leaves the payload as bytes
        (None for the dummy payload).
        """
        # s[0] is the total message length; it is not needed for decoding.
        nc=s[1] # number of bytes in command
        cmd=s[2:2+nc].decode('utf-8')
        data=s[2+nc:]
        if decoder:
            if decoder==-1:
                decoder=self.unpack
            data=decoder(data)
        else:
            if data==b'\x01z': data=None
        return cmd,data

    def available(self):
        """ Platform independent check for available characters in receive queue of UART """
        if platform==SPIKE:
            # There is no "bytes waiting" call here: read one byte and keep it
            # in unprocessed_data for the next reader.
            self.unprocessed_data=self.force_read(1, timeout=1)
            if self.unprocessed_data is None:
                self.unprocessed_data=b''
            return len(self.unprocessed_data)
        if platform==EV3:
            return self.uart.waiting()
        if platform==ESP32 or platform==ESP8266 or platform==H7:
            return self.uart.any()
        # CircuitPython busio.UART and pyserial both expose in_waiting as a
        # property; calling it (as the pyserial branch used to) fails with
        # "'int' object is not callable".
        return self.uart.in_waiting

    def read_all(self):
        """Return everything currently in the receive buffer (b'' if empty)."""
        available = self.available()
        data = self.unprocessed_data
        if platform == SPIKE:
            self.unprocessed_data = b''
            while True:
                r=self.uart.read(1)
                if r==b'': break
                data += r
        else:
            if available:
                # read() may give None on a timeout; callers concatenate the
                # result, so normalise to bytes.
                data = self.uart.read(available) or b''
        return data

    def force_read(self, size=1, timeout=50):
        """Read byte by byte until `size` bytes arrived or the tries run out.

        The number of tries is timeout*reads_per_ms. Returns the bytes
        collected so far, which may be fewer than `size`.
        """
        # SPIKE and OpenMV reads too fast and sometimes returns None
        # check: on SPIKE b'' is returned, on OpenMV None
        data = b''
        r=self.uart.read(1)
        for i in range(timeout*self.reads_per_ms):
            if r is None:
                r=b''
            data += r
            if len(data) == size:
                return data
            else:
                r=self.uart.read(1)
            if i > 3 and self.DEBUG:
                print("Waiting for data in force read...")
        return data

    def receive_command(self,timeout=1000,**kwargs):
        """Wait for one '<...>' frame and decode it.

        Returns (command, data) from decode(), or ("err", reason) when no
        complete frame was received. Extra keyword arguments are passed on to
        decode(). The number of polling attempts is timeout*reads_per_ms.
        """
        delim=b''
        payload=b''
        if platform==SPIKE:
            if self.unprocessed_data:
                delim = self.unprocessed_data
                self.unprocessed_data=b''
            for i in range(timeout*self.reads_per_ms):
                if delim==b'<':
                    break
                else:
                    delim=self.uart.read(1)
            if delim!=b'<':
                return ("err","< delim not found")
            payload=self.force_read(1)
            if not payload:
                # No length byte arrived; report it instead of crashing.
                return ("err","No data")
            l=struct.unpack('B',payload)[0]
            for i in range(l):
                r = self.force_read(1)
                payload+=r
            delim=self.uart.read(1)
        else: # other platforms
            data=b''
            for i in range(timeout*self.reads_per_ms):
                data = self.read_all()
                if data: break
            if not data:
                if self.DEBUG: print("No data after timeout")
                return ("err","No data")
            start=data.find(b'<')
            if start>=0:
                if len(data) < start+2:
                    # The length byte is still on its way. Keep it in `data`
                    # so the offsets below stay valid (it used to be read
                    # but not stored, shifting the payload by one byte).
                    data += self.force_read(1,timeout=10)
                if len(data) < start+2:
                    return ("err","No data")
                l=data[start+1]
                max_reads = timeout*self.reads_per_ms
                reads=0
                # A full frame is '<' + length byte + l bytes + '>'.
                while len(data) <= start+l+2: # not enough data, read some more
                    data += self.read_all()
                    reads+=1
                    if reads > 2 and self.DEBUG:
                        print("Waiting for data in rcv command...")
                    if reads > max_reads:
                        break
                payload = data[start+1:start+2+l]
                delim = data[start+2+l:start+3+l]
        if delim!=b'>':
            return ("err","> delim not found")
        return self.decode(payload,**kwargs)

    def send_command(self,command,*argv,**kwargs):
        """Encode a command and write it as a '<...>' frame to the UART.

        Keyword arguments are passed on to encode().
        """
        s=self.encode(command,*argv,**kwargs)
        msg=b'<'+s+b'>'
        if platform==SPIKE: # on spike send 32-bytes at a time
            window=32
            while len(msg) > window:
                self.uart.write(msg[:window])
                sleep_ms(5)
                msg = msg[window:]
            self.uart.write(msg)
        else:
            self.uart.write(msg)
        # NOTE(review): the listing this was taken from had lost its
        # indentation; the flush is assumed to belong to the method body (all
        # platforms). It drops whatever is in the receive buffer right after
        # sending -- confirm this cannot discard a fast reply.
        self.flush()

    def call(self,command,*args,**kwargs):
        """Send a command and return the reply from receive_command().

        `encoder` is forwarded to send_command(); any other keyword (such as
        `timeout` or `decoder`) goes to receive_command(). Passing all
        keywords to both, as before, made every keyword raise TypeError.
        """
        send_kwargs={}
        if 'encoder' in kwargs:
            send_kwargs['encoder']=kwargs.pop('encoder')
        self.send_command(command,*args,**send_kwargs)
        return self.receive_command(**kwargs)

    def execute_command(self, command, value):
        """Run a registered command and send its result back.

        On success '<command>ack' is sent with the packed return value, or
        with 'ok' when the command returned None. Failures and unknown
        commands are answered with an 'err' command carrying a message.
        """
        if command in self.commands:
            command_ack=command+"ack"
            try:
                if value is not None:
                    if type(value) is tuple:
                        resp=self.commands[command](*value)
                    else:
                        resp=self.commands[command](value)
                else:
                    resp=self.commands[command]()
            except Exception as e:
                self.send_command('err','s', "Command failed: {}".format(e))
                return
            if resp is not None:
                try:
                    f=self.command_formats[command]
                    if f: # There is a (smart)pack format
                        if type(resp) is not tuple:
                            resp=(resp,) # make a tuple
                        self.send_command(command_ack,f,*resp)
                    else: # user probably wants raw response.
                        self.send_command(command_ack,resp)
                except Exception as e:
                    self.send_command('err','s', "Response packing failed: {}".format(e))
                    return
            else:
                self.send_command(command_ack,'s','ok')
        else:
            self.send_command('err','s','Command not found: {}'.format(command))

    def process_uart(self):
        """Handle one incoming command if there is one, otherwise sleep briefly."""
        if self.local_repl_enabled:
            self.local_repl_enabled=False
        if self.available():
            self.execute_command(*self.receive_command())
        else:
            if self.DEBUG:
                print("Nothing available. Sleeping 100ms")
                sleep_ms(100)
            else:
                if platform==H7:
                    sleep_ms(13)
                else:
                    sleep_ms(1)

    def loop(self):
        """Serve commands until the module flag interrupt_pressed becomes 1."""
        global interrupt_pressed
        while True:
            if interrupt_pressed==1:
                interrupt_pressed=0
                break
            self.process_uart()

    def flush(self):
        """Read and discard everything in the receive buffer."""
        discarded = self.read_all()
        if self.DEBUG: print("Flushed: %r" % discarded)

    def repl_activate(self):
        """Ask the remote side to enable its REPL and switch it to raw mode.

        Raises UartRemoteError when the raw REPL banner is not received.
        """
        self.flush()
        self.send_command('enable repl')
        sleep_ms(300)
        self.uart.write(b"r\x03\x03\x01") # Ctrl-c, Ctrl-c, Ctrl-a
        sleep_ms(300)
        self.flush()
        self.uart.write(b"r\x03\x03\x01") # Ctrl-c, Ctrl-c, Ctrl-a
        sleep_ms(10)
        data = self.read_all()
        if not data[-14:] == b'L-B to exit\r\n>':
            raise UartRemoteError("Raw REPL failed (response: %r)" % data)

    def repl_run(self, command, reply=True, raw_paste=True):
        """Execute `command` (source text) on the remote raw REPL.

        With reply=True the stripped output is returned, or the stripped
        error text if the remote reported one, or None when both are empty.
        Raises UartRemoteError when the remote does not accept the command or
        answers in an unexpected shape.
        """
        command_bytes_left = bytes(command, "utf-8")
        window = 128
        if raw_paste:
            self.uart.write(b"\x05A\x01") # Try raw paste
            result = self.force_read(2)
            if self.DEBUG: print(result)
            if result == b'R\x01':
                raw_paste = True
                result = self.uart.read(3) # Should be b'\x80\x00\x01' where \x80 is the window size
                window = result[0]
            else:
                raw_paste = False
                self.flush()
        if platform==SPIKE:
            window = 32
        while len(command_bytes_left) > window:
            self.uart.write(command_bytes_left[:window]) # Write our MicroPython command in window-sized chunks
            sleep_ms(4)
            self.uart.read(1) # take one reply byte off the line; its value is not used
            command_bytes_left = command_bytes_left[window:]
        self.uart.write(command_bytes_left+b'\x04') # last chunk plus ctrl-D to execute
        if raw_paste:
            data = self.force_read(1)
            if data != b'\x04':
                raise UartRemoteError("could not exec command (response: %r)" % data)
        else:
            sleep_ms(10)
            # check if we could exec command
            data = self.uart.read(2)
            if data != b"OK":
                raise UartRemoteError("could not exec command (response: %r)" % data)
        if reply:
            result = b""
            decoded = []
            # Keeps reading until the answer holds at least two '\x04'
            # separators; there is no timeout on this wait.
            while not len(decoded) >= 3: # We need at least 3x'\x04'
                result += self.read_all()
                decoded = result.decode("utf-8").split("\x04")
            try:
                value, error, _ = decoded # The last 5 bytes are b'\r\n\x04\x04>' Between the \x04's there can be an exception.
            except ValueError:
                raise UartRemoteError("Unexpected answer from repl: {}".format(result))
            if error:
                if self.DEBUG: print(error)
                return error.strip() # using strip() to remove \r\n at the end.
            elif value:
                return value.strip()
            else:
                return

    def repl_exit(self):
        """Leave the raw REPL on the remote side."""
        self.uart.write(b"\x02") # Ctrl-B
Depois, conecte seu sensor BMP180 aos pinos 4 (SCL) e 5 (SDA) do ESP32 e rode o código a seguir:
from machine import Pin, SoftI2C
from time import sleep
from bmp180 import BMP180
from uartremote import *
import gc

gc.collect()

# UART link to the hub, opened with the library defaults.
u = UartRemote()

# BMP180 on a software I2C bus: SDA on GPIO 5, SCL on GPIO 4.
sda_pin = Pin(5)
scl_pin = Pin(4)
i2c = SoftI2C(sda=sda_pin, scl=scl_pin)
bmp = BMP180(i2c)
bmp.oversample_sett = 2
bmp.baseline = 101325  # presumably the reference pressure in Pa -- confirm against the bmp180 driver

# One reading taken at start-up; the value is not used further below.
temperatura = bmp.temperature


def vl53():
    """Command handler: return the current temperature reported by the sensor."""
    return bmp.temperature


# Registered under the function's own name ('vl53'), which is the name the hub
# script passes to call(). 'f' packs the returned value as a 4-byte float.
u.add_command(vl53, 'f')

# Serve incoming commands until the loop is interrupted.
u.loop()
Rode o código a seguir no hub:
from mindstorms import MSHub
import time
from projects.uartremote import *
from mindstorms import Motor

# UART link to the ESP32 on the hub port named in the constructor ("A").
u = UartRemote("A")
hub = MSHub()

# Ask the ESP32 for a reading once per second and scroll it on the matrix.
while True:
    reply = u.call('vl53')
    r, value = reply  # r is the reply's command name, value its decoded data
    text = str(value) + 'C'
    hub.light_matrix.write(text)
    time.sleep(1)
O resultado deverá ser este:
Comments