
Barebones pythonic USR-WM1h controller


The USR-WM1h is a wireless (IEEE 802.11) control relay manufactured by USR IOT, a company based in Jinan, China. This post covers a minimal (barebones, but working!) Python implementation of the USR-WM1h's control protocol.

Development:

The main driving force was the need to integrate this device with my existing XMPP/Jabber infrastructure, so that I could do things like open a door by chatting with an XMPP chatbot built for that purpose. This post only covers the code that talks to the device.

There was no need to disassemble the official Android app: the protocol has no encryption layer, so the raw commands exchanged between the device and the app could be observed black-box style.

Packets sent from the app to the relay always started with the same two-byte header (0x55 0xaa); response packets (sent from the relay to the app) reversed that sequence.

Send Header (snd hdr) 0x55 0xaa
Receive Header (rcv hdr) 0xaa 0x55

The header was followed by six bytes, of which only two changed depending on whether the app was turning the relay on or off.

Observing several more messages between the app and the relay showed that the last of those six bytes was the sum of the five preceding parameters, a kind of checksum. The command to switch the relay on or off is therefore carried entirely by the first of the two changing bytes; the second one changes only because it is the checksum.

App —> Relay “On” snd hdr 0x00 0x03 0x00 0x02 0x01 chk
App —> Relay “Off” snd hdr 0x00 0x03 0x00 0x01 0x01 chk
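
The layout described above can be sketched in a few lines. This is only an illustration, written for Python 3 (so packets are `bytes`, unlike the Python 2 script further down). The names `checksum`, `build_packet` and `differing_offsets` are hypothetical, and wrapping the sum modulo 256 is an assumption, since every sum observed here fits in a single byte.

```python
SEND_HEADER = bytes([0x55, 0xAA])  # app -> relay header from the capture

ON_PARAMS = (0x00, 0x03, 0x00, 0x02, 0x01)
OFF_PARAMS = (0x00, 0x03, 0x00, 0x01, 0x01)


def checksum(params):
    """Sum of the parameter bytes (assumed to wrap modulo 256)."""
    return sum(params) & 0xFF


def build_packet(params):
    """Header + five parameter bytes + one checksum byte."""
    if len(params) != 5:
        raise ValueError("expected exactly five parameter bytes")
    return SEND_HEADER + bytes(params) + bytes([checksum(params)])


def differing_offsets(a, b):
    """Byte offsets at which two equally long packets differ."""
    return [i for i, (x, y) in enumerate(zip(a, b)) if x != y]


on_packet = build_packet(ON_PARAMS)
off_packet = build_packet(OFF_PARAMS)

print(" ".join("%02x" % byte for byte in on_packet))   # 55 aa 00 03 00 02 01 06
print(" ".join("%02x" % byte for byte in off_packet))  # 55 aa 00 03 00 01 01 05
print(differing_offsets(on_packet, off_packet))         # [5, 7]
```

Offsets 5 and 7 are the command byte and the checksum, which matches the two changing bytes seen in the capture.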

Notes:

  • At the sign-on stage I only parse the first 2 bytes of the response (all I need). They are either NO or OK, the latter meaning that authentication succeeded.
  • The device listens on port TCP/8899 by default.
  • The default credentials are admin/admin.
Send Packet Header chr(0x55) + chr(0xaa)
Recv Packet Header chr(0xaa) + chr(0x55)
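
Since only the first 2 bytes of a reply are read, it is worth remembering that TCP is a byte stream: a single `recv(2)` is allowed to return just 1 byte. A possible way to read an exact number of bytes is sketched below for Python 3. The helper name `recv_exact` is hypothetical, and the demo uses a local `socket.socketpair()` instead of a real relay.

```python
import socket


def recv_exact(sock, size):
    """Keep calling recv() until exactly `size` bytes have arrived."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:  # empty read: the peer closed the connection
            raise ConnectionError("connection closed before %d bytes arrived" % size)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


# Local demonstration with a connected socket pair (no relay involved).
left, right = socket.socketpair()
left.sendall(b"OK")
print(recv_exact(right, 2))  # b'OK'
left.close()
right.close()
```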

Tips:

  • You can change the default credentials through the device's embedded HTTP server; however, I advise against using non-alphanumeric characters…
  • Factory reset procedure: reboot and hold the physical button for more than 20 s.
  • The device can be powered either by AC or 12VDC.
  • Security considerations: this device will try to call home via UPnP, and probably by other means as well… You should create a dedicated WLAN without Internet/WAN access to isolate such suspicious devices, and give the device a static DHCP lease tied to its MAC address. You can NAT its TCP/8899 port to an intranet outside that WLAN.
  • If you need to verify the username, just send the username followed by CR and LF: if it is correct you will receive OK, otherwise you will get NO. In this example I send the valid user “admin” + CR + LF and receive “OK”:
    send: 0x61 0x64 0x6d 0x69 0x6e 0x0d 0x0a
    recv: 0x4f 0x4b
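
The exchange in the last tip can be reproduced byte for byte. A minimal Python 3 sketch follows; `signon_line` and `signon_ok` are hypothetical helper names, and ASCII encoding is assumed because the tips recommend alphanumeric credentials.

```python
def signon_line(credential):
    """Credential followed by CR LF, as sent at the sign-on stage."""
    return credential.encode("ascii") + b"\r\n"


def signon_ok(reply):
    """True when the first two bytes of the reply are 'OK'."""
    return reply[:2] == b"OK"


line = signon_line("admin")
print(" ".join("0x%02x" % byte for byte in line))
# 0x61 0x64 0x6d 0x69 0x6e 0x0d 0x0a
print(signon_ok(bytes([0x4F, 0x4B])))  # True
print(signon_ok(b"NO"))                # False
```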

Code: relayctl.py

#!/usr/bin/env python2

# Barebones pythonic USR-WM1h controller.
# 2016 Antornix - https://www.antornix.com
# License: MIT

import socket
import sys
import time

# Script parameters:
HOST = 'frontdoor.home.local'	# FQDN | IP Address.
PWRD = 'admin'					# Default password.
PORT = 8899						# Default TCP port.
RVBS = 2						# Receive Buffer Size.

# TCP Socket capability
try:
    TCP_SOCKET = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
    print 'FATAL: could not create socket ...'
    sys.exit(1)  # non-zero exit status signals the failure

# Resolve HOST in FQDN to IP Address:
try:
    IP_ADDRESS = socket.gethostbyname(HOST)
except socket.gaierror:
    print 'FATAL: hostname could not be resolved. Check your DNS ...'
    sys.exit(1)

def tcpsocksnd(data):
  # Send data, then return the first RVBS bytes of the reply.
  try:
    TCP_SOCKET.sendall(data)
  except socket.error:
    print 'FATAL: socket error'
    sys.exit(1)

  return TCP_SOCKET.recv(RVBS)
  
def packetizer(p_0, p_1, p_2, p_3, p_4):

  hdr = chr(0x55) + chr(0xaa)
  chk = chr((p_0 + p_1 + p_2 + p_3 + p_4) & 0xFF)  # keep the sum within one byte (modulo 256 is an assumption)
  
  bffr  = hdr			# send packet header
  bffr += chr(p_0)		# param 0
  bffr += chr(p_1)		# param 1
  bffr += chr(p_2)		# param 2
  bffr += chr(p_3)		# param 3 (command)
  bffr += chr(p_4)		# param 4
  bffr += chk			# params checksum
 
  return bffr

def signOn(password):
  return tcpsocksnd(password + chr(0xD) + chr(0xA)) == "OK"

def pktSeqOn():
  return packetizer(0x00, 0x03, 0x00, 0x02, 0x01)

def pktSeqOff():
  return packetizer(0x00, 0x03, 0x00, 0x01, 0x01)
  
def relayOn():
  TCP_SOCKET.connect((IP_ADDRESS, PORT))
  if signOn(PWRD):
    tcpsocksnd(pktSeqOn())
  TCP_SOCKET.close()

def relayOff():
  TCP_SOCKET.connect((IP_ADDRESS, PORT))
  if signOn(PWRD):
    tcpsocksnd(pktSeqOff())
  TCP_SOCKET.close()

def toggle():
  TCP_SOCKET.connect((IP_ADDRESS, PORT))
  if signOn(PWRD):
    tcpsocksnd(pktSeqOn())
    #time.sleep(2)
    tcpsocksnd(pktSeqOff())
  TCP_SOCKET.close()

toggle()
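
For readers on Python 3, where `chr()` strings can no longer be passed to `sendall()`, the same flow could look roughly like the sketch below. It is not a drop-in replacement for the script above: `build_packet` and `switch_relay` are hypothetical names, a new connection is opened per call instead of reusing one global socket, the checksum is assumed to wrap modulo 256, and, like the original, it only reads the first 2 bytes of each reply.

```python
import socket

SEND_HEADER = bytes([0x55, 0xAA])
CMD_ON = 0x02
CMD_OFF = 0x01


def build_packet(command):
    """Header + five parameter bytes + checksum (sum of the parameters)."""
    params = bytes([0x00, 0x03, 0x00, command, 0x01])
    return SEND_HEADER + params + bytes([sum(params) & 0xFF])


def switch_relay(host, command, password="admin", port=8899, timeout=5.0):
    """Sign on, send one command, and return True if sign-on succeeded."""
    with socket.create_connection((host, port), timeout=timeout) as sock, \
            sock.makefile("rb") as reader:
        sock.sendall(password.encode("ascii") + b"\r\n")
        if reader.read(2) != b"OK":  # "NO" means the sign-on was rejected
            return False
        sock.sendall(build_packet(command))
        reader.read(2)  # first two bytes of the relay's reply, ignored here
        return True
```

A call such as `switch_relay('frontdoor.home.local', CMD_ON)` would then correspond to `relayOn()` above, and a momentary door-opener pulse would be an on call followed by an off call.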