Pasar al contenido

Controlador python "mínimo" para relé USR-WM1h

Publicado el 4 minutos de lectura
en: Barebones pythonic USR-WM1h controller

El dispositivo USR-WM1h es un controlador inalámbrico (IEEE 802.11) de relé fabricado por USR IOT, una compañía asentada en Jinan, China. Este artículo cubre una implementación mínima (“en los huesos”, pero funcional!) sobre intérprete python del protocolo usado en el dispositivo USR-WM1h.

Desarrollo:

El desarrollo fue motivado por la necesidad de integrar este dispositivo con mi infraestructura XMPP/Jabber existente, de modo que pudiera hacer cosas como abrir una puerta mediante órden a un robot XMPP implementado a tal efecto. En este artículo muestro la implementación para comandar el dispositivo.

No fue necesario desensamblar la App oficial de Android, la ausencia de capa de cifrado en el protocolo permitió la observación de los comandos en bruto que estaban siendo enviados entre el dispositivo y su App oficial para Android.

Los paquetes enviados desde la App hacia el relé comenzaban siempre por la misma cabecera (0x55 0xaa), los paquetes de respuesta (enviados desde el relé hacia la App) invertían esa secuencia.

Cabecera Envío (snd hdr) 0x55 0xaa
Cabecera Respuesta (rcv hdr) 0xaa 0x55

A continuación seguían 6 bytes de parámetros en las que pude apreciar que sólo cambiaban dos octetos en función de si la App indicaba encender o apagar el relé.

La observación de varios mensajes adicionales (entre la App y el relé) empleando mismo protocolo evidenció que el último de los parámetros era la suma simple de todos los parámetros anteriores (5 Bytes), una especie de suma de verificación, por lo que la orden específica de activar o desactivar el relé incurría exclusivamente en el primero de los dos octetos alterados.

App —> Relé “Activar” snd hdr 0x00 0x03 0x00 0x02 0x01 chk
App —> Relé “Desactivar” snd hdr 0x00 0x03 0x00 0x01 0x01 chk

Notas:

  • Sólo estoy analizando los primeros 2 bytes (justo lo que necesito) de la respuesta en la etapa de inicio de sesión (signon), que sólo pueden resultar en NO ó OK, éste último sólo en caso de una autenticación satisfactoria.
  • El dispositivo escucha en el puerto TCP/8899 por defecto.
  • Las credenciales por defecto son admin/admin.

Consejos:

  • Se pueden cambiar las credenciales establecidas por defecto mediante el servidor HTTP embebido del dispositivo, sin embargo, recomiendo que no emplee caracteres no alfanuméricos…
  • Procedimiento para reinicialización a modo fábrica: mantener presionado el pulsador durante unos 20s.
  • El dispositivo puede ser alimentado tanto por AC como por 12VDC.
  • Consideraciones de seguridad: este dispositivo tratará de llamar a casa mediante UPNP y podría intentarlo probablemente por otros medios… Debería designar una WLAN específica para aislar estos dispositivos sospechosos, una WLAN carente de acceso WAN/Internet, por último añadiría una reserva DHCP que vincule la MAC del dispositivo con una IP concreta. Ya para nota sólo permitir NAT al puerto TCP/8899 desde otra intranet.
  • Si necesita verificar el nombre de usuario símplemente envíe el nombre de usuario + CR + LF, si el nombre de usuario proporcionado es correcto recibirá OK, de lo contrario recibirá NO. En este ejemplo envío un nombre de usuario válido “admin”+‘CR’+‘LF’ y recibo “OK”:
    send: 0x61 0x64 0x6d 0x69 0x6e 0x0d 0x0a
    recv: 0x4f 0x4b

Código: relayctl.py

 1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
#!/usr/bin/python

# Barebones pythonic USR-WM1h controller.
# 2016 Antornix - https://www.antornix.com
# License: MIT

import socket
import sys
import time

# Script parameters:
HOST = 'frontdoor.home.local'	# FQDN | IP Address.
PWRD = 'admin'					# Default password.
PORT = 8899						# Default TCP port.
RVBS = 2						# Receive Buffer Size.

# TCP Socket capability
try:
    TCP_SOCKET = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
    print 'FATAL: could not create socket ...'
    sys.exit()

# Resolve HOST in FQDN to IP Address:
try:
    IP_ADDRESS = socket.gethostbyname(HOST)

except socket.gaierror:
    print 'Hostname could not be resolved. Check your DNS ...'
    sys.exit()

def tcpsocksnd(data):
  try :
      TCP_SOCKET.sendall(data)
  except socket.error:
      print 'WARN: Socket error'
      sys.exit()
      
  return TCP_SOCKET.recv(RVBS)
  
def packetizer(p_0, p_1, p_2, p_3, p_4):

  hdr = chr(0x55) + chr(0xaa)
  chk = chr(p_0 + p_1 + p_2 + p_3 + p_4)
  
  bffr  = hdr			# send packet header
  bffr += chr(p_0)		# param 0
  bffr += chr(p_1)		# param 1
  bffr += chr(p_2)		# param 2
  bffr += chr(p_3)		# param 3 (command)
  bffr += chr(p_4)		# param 4
  bffr += chk			# params checksum
 
  return bffr

def signOn(password):
  return tcpsocksnd(password + chr(0xD) + chr(0xA)) == "OK"

def pktSeqOn():
  return packetizer(0x00, 0x03, 0x00, 0x02, 0x01)

def pktSeqOff():
  return packetizer(0x00, 0x03, 0x00, 0x01, 0x01)
  
def relayOn():
  TCP_SOCKET.connect((IP_ADDRESS, PORT))
  if signOn(PWRD):
    tcpsocksnd(pktSeqOn())
  TCP_SOCKET.close()

def relayOff():
  TCP_SOCKET.connect((IP_ADDRESS, PORT))
  if signOn(PWRD):
    tcpsocksnd(pktSeqOff())
  TCP_SOCKET.close()

def toggle():
  TCP_SOCKET.connect((IP_ADDRESS, PORT))
  if signOn(PWRD):
    tcpsocksnd(pktSeqOn())
    #time.sleep(2)
    tcpsocksnd(pktSeqOff())
  TCP_SOCKET.close()

toggle()