Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for UBC1303BA00 and other modems with JSON-formatted lists #24

Merged
merged 8 commits into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ This library was written for and tested with:
* Ubee EVW32C-0N
* Ubee EVW3200-Wifi
* Ubee EVW3226 (UPC)
* Ubee UBC1303BA00
154 changes: 112 additions & 42 deletions pyubee/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
from abc import abstractmethod
from base64 import b64encode


jussihi marked this conversation as resolved.
Show resolved Hide resolved
import requests
from requests.auth import HTTPDigestAuth
from requests.exceptions import RequestException

import json


jussihi marked this conversation as resolved.
Show resolved Hide resolved

_LOGGER = logging.getLogger(__name__)
_LOGGER_TRAFFIC = logging.getLogger(__name__ + '.traffic')
Expand Down Expand Up @@ -136,6 +141,32 @@ def headers(self):

return headers

class DigestAuthAuthenticator(Authenticator):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem to do anything by itself, except holding a username/password. Later on, the flow is changed solely for this Authenticator. I am for using as much as possible from other libraries, but I am also for consistency in the software and flow. I see you "forgot" to change the _post() method (probably don't need it in your case.) When somebody else wants to add support for their router, s/he will have to figure out what is happening/why it is done this way.

One option is to make our own DigestAuthenticator, but with its own logic and not relying on requests. Sure, this defeats the re-use of the requests' HTTPDigestAuth, but simplifies the the software in other parts.

Or maybe the current Authenticators can be changed such that these can be passed to requests.get(auth=..)/requests.post(auth=..)

Or perhaps there is another option, one I haven't thought of.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I didn't need the POST method so I left it untouched. I can change that for sure.

I also would like the code flow to be consistent. However, reinventing the wheel here would just feel stupider. According to all the debug prints in the rest of the _get() method it seems like it is rather cumbersome. On top of that, I don't really see the point in not relying in requests; it is used later on in both _get() and _post() anyway. If I wrote the code from scratch, I would've maybe

  1. used the ready Authenticator classes from requests instead of creating our own (and if our own is needed, create a child class of an requests authenticator)
  2. saved that class as an object in the Ubee class, so that it can be dynamically used as for example inside the _get() as follows: response = requests.get(url, timeout=HTTP_REQUEST_TIMEOUT, auth=_self.authenticator(self.username, self.password)) where self.authenticator was a (derivative of) requests Authenticator class.

So yes, I would've preferred the Authenticators of pyubee to be compatible with requests. It would remove the reinvention of the wheel I was talking before. After the change, the whole GET or POST methods would be one-liners :)

I will take a closer look at the comments tomorrow. Thanks for the review. This is just my opinion on this one comment!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are entirely right that it is better to use the tools requests provides, thank you for showing this. I guess this was added at some point and others just continued building on the things that were already there.

Though some Authenticators do sneak in their own GETs/POSTs, necessary for some models. Not sure how to handle this, but probably possible.

"""Digest Auth authenticator."""

def __init__(self, base_url, model_info, http_get_handler, http_post_handler):
"""Create authenticator."""
super().__init__(base_url, model_info, http_get_handler, http_post_handler)

self._username = None
self._password = None

def _build_login_payload(self, login, password, csrf_token=None):
return None

def authenticate(self, url, username, password):
"""Store username/password for later use."""
# Store username/password.
self._username = username
self._password = password

@property
def headers(self):
"""Get authentication related headers, used for every request."""
headers = {}

return headers


MODEL_REGEX = re.compile(r'<modelName>(.*)</modelName>')

Expand Down Expand Up @@ -165,7 +196,8 @@ def headers(self):
r'[0-9a-fA-F]{2}:[0-9a-fA-F]{2})</td>' # mac address, cont'd
r'<td>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})</td>' # ip address
),
'authenticator': DefaultAuthenticator
'authenticator': DefaultAuthenticator,
'JSONList': False
},
'EVW320B': {
'url_session_active': '/BasicStatus.asp',
Expand All @@ -191,7 +223,8 @@ def headers(self):
r'<td>([0-9a-fA-F]{12})</td>' # mac address
r'<td>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})</td>' # ip address
),
'authenticator': DefaultAuthenticator
'authenticator': DefaultAuthenticator,
'JSONList': False
},
'EVW321B': {
'url_session_active': '/HomePageMR4.asp',
Expand All @@ -208,7 +241,8 @@ def headers(self):
r'[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})</td>' # mac address, cont'd
r'<td id="IPAddr">(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})</td>' # ip address
),
'authenticator': DefaultAuthenticator
'authenticator': DefaultAuthenticator,
'JSONList': False
},
'EVW3226@UPC': {
'url_session_active': '/cgi-bin/setup.cgi?gonext=login',
Expand All @@ -225,7 +259,8 @@ def headers(self):
r'<td>([0-9a-fA-F:]{17})</td>\n \t\t\t\t\t\t' # mac address
r'<td>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})</td>' # ip address
),
'authenticator': Evw3226Authenticator
'authenticator': Evw3226Authenticator,
'JSONList': False
},
'DVW32CB': {
'url_session_active': '/main.asp',
Expand All @@ -252,7 +287,8 @@ def headers(self):
r'<td>([0-9a-fA-F:]{17})</td>\n \t\t\t\t\t\t' # mac address
r'<td>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})</td>' # ip address
),
'authenticator': DefaultAuthenticator
'authenticator': DefaultAuthenticator,
'JSONList': False
},
'DDW36C': {
'url_session_active': '/RgSwInfo.asp',
Expand All @@ -273,7 +309,21 @@ def headers(self):
r'<td>([0-9a-fA-F:]{17})</td>' # mac address
r'<td>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})</td>' # ip address
),
'authenticator': BasicAccessAuthAuthenticator
'authenticator': BasicAccessAuthAuthenticator,
'JSONList': False
},
'UBC1303BA00': {
'url_session_active': '/htdocs/cm_info_status.php',
'url_login': '/htdocs/cm_info_status.php',
'url_logout': '/htdocs/unauth.php',
'url_connected_devices_lan': '/htdocs/rg_mgt_clientlist.php',
# no URL for WiFi
'url_connected_devices_wifi': None,
'regex_login': re.compile(r'name="loginUsername"'),
'regex_wifi_devices': None,
'regex_lan_devices': r'\'{\"mgt_cpestatus_table\".*\'',
'authenticator': DigestAuthAuthenticator,
'JSONList': True
},
}

Expand Down Expand Up @@ -314,36 +364,43 @@ def _base_url(self):
return 'http://{}'.format(self.host)

def _get(self, url, **headers):
"""Do a HTTP GET."""
# pylint: disable=no-self-use
_LOGGER.debug('HTTP GET: %s', url)
req_headers = {'Host': self.host}
if hasattr(self, 'authenticator') and isinstance(self.authenticator, DigestAuthAuthenticator):
# We are using digest auth:
response = requests.get(url, timeout=HTTP_REQUEST_TIMEOUT, auth=HTTPDigestAuth(self.username, self.password))
return response
else:
jussihi marked this conversation as resolved.
Show resolved Hide resolved
# Use the rudimentary auth
"""Do a HTTP GET."""
# pylint: disable=no-self-use
_LOGGER.debug('HTTP GET: %s', url)
req_headers = {'Host': self.host}

# Add custom headers.
for key, value in headers.items():
key_title = key.title()
req_headers[key_title] = value

# Add headers from authenticator.
for key, value in self._authenticator_headers.items():
key_title = key.title()
req_headers[key_title] = value

_LOGGER_TRAFFIC.debug('Sending request:')
_LOGGER_TRAFFIC.debug(' HTTP GET %s', url)
for key, value in req_headers.items():
_LOGGER_TRAFFIC.debug(' Header: %s: %s', key, value)

response = requests.get(url, timeout=HTTP_REQUEST_TIMEOUT, headers=req_headers)
_LOGGER.debug('Response status code: %s', response.status_code)

_LOGGER_TRAFFIC.debug('Received response:')
_LOGGER_TRAFFIC.debug(' Status: %s, Reason: %s', response.status_code, response.reason)
for key, value in response.headers.items():
_LOGGER_TRAFFIC.debug(' Header: %s: %s', key, value)
_LOGGER_TRAFFIC.debug(' Data: %s', repr(response.text))

return response

# Add custom headers.
for key, value in headers.items():
key_title = key.title()
req_headers[key_title] = value

# Add headers from authenticator.
for key, value in self._authenticator_headers.items():
key_title = key.title()
req_headers[key_title] = value

_LOGGER_TRAFFIC.debug('Sending request:')
_LOGGER_TRAFFIC.debug(' HTTP GET %s', url)
for key, value in req_headers.items():
_LOGGER_TRAFFIC.debug(' Header: %s: %s', key, value)

response = requests.get(url, timeout=HTTP_REQUEST_TIMEOUT, headers=req_headers)
_LOGGER.debug('Response status code: %s', response.status_code)

_LOGGER_TRAFFIC.debug('Received response:')
_LOGGER_TRAFFIC.debug(' Status: %s, Reason: %s', response.status_code, response.reason)
for key, value in response.headers.items():
_LOGGER_TRAFFIC.debug(' Header: %s: %s', key, value)
_LOGGER_TRAFFIC.debug(' Data: %s', repr(response.text))

return response

def _post(self, url, data, **headers):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this need the alternate flow for the new Authenticator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik this is handled by the one-liner requests.get(...). I'll leave this open though, not 100% sure if some manual work is needed here.

"""Do a HTTP POST."""
Expand Down Expand Up @@ -387,7 +444,7 @@ def detect_model(self):
response = self._get(url)
except RequestException as ex:
_LOGGER.error("Connection to the router failed: %s", ex)
return "Unknown"
return "Unknown. Some models cannot be automatically detected at the moment."

data = response.text
entries = MODEL_REGEX.findall(data)
Expand All @@ -397,7 +454,7 @@ def detect_model(self):
return entries[1]

_LOGGER.debug('Could not detect model')
return "Unknown"
return "Unknown. Some models cannot be automatically detected at the moment."

def session_active(self):
"""Check if session is active."""
Expand Down Expand Up @@ -462,6 +519,8 @@ def get_connected_devices(self):
_LOGGER.debug('WIFI devices: %s', wifi_devices)
devices = lan_devices.copy()
devices.update(wifi_devices)
if self._model_info['JSONList']:
devices = {key:val for key, val in devices.items() if val != "Unknown"}
jussihi marked this conversation as resolved.
Show resolved Hide resolved
return devices

def get_connected_devices_lan(self):
Expand All @@ -476,11 +535,22 @@ def get_connected_devices_lan(self):
return {}

data = response.text
entries = self._model_info['regex_lan_devices'].findall(data)
return {
self._format_mac_address(address): ip
for address, ip in entries
}
if self._model_info['JSONList']:
lan_regexp = self._model_info['regex_lan_devices']
#data = data[1:-1]
matches = re.search(lan_regexp, data, re.MULTILINE)
match = matches.group()[1:-1]
entries = json.loads(match)
return {
self._format_mac_address(entry["lan_dhcpinfo_mac_address"]): entry["lan_dhcpinfo_hostname"]
for entry in entries["lan_dhcpinfo_table"]
}
else:
jussihi marked this conversation as resolved.
Show resolved Hide resolved
entries = self._model_info['regex_lan_devices'].findall(data)
return {
self._format_mac_address(address): ip
for address, ip in entries
}

def get_connected_devices_wifi(self):
"""Get list of connected devices via wifi."""
Expand Down