Module wideq.core

A low-level, general abstraction for the LG SmartThinQ API.

Expand source code
"""A low-level, general abstraction for the LG SmartThinQ API.
"""
import base64
import uuid
from urllib.parse import urljoin, urlencode, urlparse, parse_qs
import hashlib
import hmac
import datetime
import requests
import logging
from typing import Any, Dict, List, Tuple
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Bootstrap endpoint that `Gateway.discover` posts to in order to learn the
# hosts to use for the rest of the API.
GATEWAY_URL = 'https://kic.lgthinq.com:46030/api/common/gatewayUriList'
# Sent by `lgedm_post` as the `x-thinq-application-key` and
# `x-thinq-security-key` request headers.
APP_KEY = 'wideq'
SECURITY_KEY = 'nuts_securitykey'
# JSON key under which `lgedm_post` wraps request data and unwraps replies.
DATA_ROOT = 'lgedmRoot'
# Query parameters placed in the browser login URL built by `oauth_url`.
SVC_CODE = 'SVC202'
CLIENT_ID = 'LGAO221A02'
# HMAC secret and app-key header value used by `refresh_auth` when signing
# token-refresh requests.
OAUTH_SECRET_KEY = 'c053c2a6ddeb7ad97cb0eed0dcb31cf8'
OAUTH_CLIENT_KEY = 'LGAO221A02'
# Layout of the timestamp that `refresh_auth` signs and sends. The `+0000`
# offset is literal text, so the time being formatted must be UTC.
DATE_FORMAT = '%a, %d %b %Y %H:%M:%S +0000'
# Fallbacks used by `Gateway.deserialize` when the saved data lacks them.
DEFAULT_COUNTRY = 'US'
DEFAULT_LANGUAGE = 'en-US'

# Retry policy settings consumed by `retry_session`.
RETRY_COUNT = 5  # Anecdotally this seems sufficient.
RETRY_FACTOR = 0.5
RETRY_STATUSES = (502, 503, 504)


def get_wideq_logger() -> logging.Logger:
    """Return the shared "wideq" logger, configuring it on first use.

    The first call gives the logger an INFO threshold and a stream handler
    (colored when the optional `colorlog` package is importable, plain
    otherwise). Later calls return the same logger untouched: no extra
    handler is attached and a level chosen in the meantime is kept.
    """
    logger = logging.getLogger("wideq")

    # Apply the default threshold only while no level has been chosen, so a
    # later call does not undo what `set_log_level` configured.
    if logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)

    # `logging.getLogger` hands back the same object for the same name, so
    # adding a handler on every call would emit each record once per call.
    if logger.handlers:
        return logger

    fmt = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"

    try:
        import colorlog  # type: ignore
        colorfmt = f"%(log_color)s{fmt}%(reset)s"
        handler = colorlog.StreamHandler()
        handler.setFormatter(
            colorlog.ColoredFormatter(
                colorfmt,
                datefmt=datefmt,
                reset=True,
                log_colors={
                    "DEBUG": "cyan",
                    "INFO": "green",
                    "WARNING": "yellow",
                    "ERROR": "red",
                    "CRITICAL": "red",
                },
            )
        )
    except ImportError:
        # `colorlog` is optional; fall back to uncolored output.
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))

    logger.addHandler(handler)
    return logger


# Module-level logger; creating it here also installs the default handler
# as a side effect of importing this module.
LOGGER = get_wideq_logger()


def retry_session():
    """Build a Requests session whose HTTP and HTTPS requests are retried.

    The retry budget, backoff factor and retryable status codes come from
    the module's `RETRY_*` constants.
    """
    # Adapted from:
    # https://www.peterbe.com/plog/best-practice-with-retries-with-requests
    #
    # NOTE(review): urllib3's `Retry` limits status-based retries to an
    # allow-list of methods that by default leaves out POST -- confirm that
    # the 502/503/504 retries really apply to the POST calls made here.
    policy = Retry(
        status_forcelist=RETRY_STATUSES,
        backoff_factor=RETRY_FACTOR,
        connect=RETRY_COUNT,
        read=RETRY_COUNT,
        total=RETRY_COUNT,
    )
    adapter = HTTPAdapter(max_retries=policy)

    session = requests.Session()
    for scheme in ('http://', 'https://'):
        session.mount(scheme, adapter)
    return session


def set_log_level(level: int):
    """Set the threshold of the shared wideq logger to `level`."""
    get_wideq_logger().setLevel(level)


def gen_uuid() -> str:
    """Return a newly generated random (version 4) UUID as a string."""
    fresh = uuid.uuid4()
    return str(fresh)


def oauth2_signature(message: str, secret: str) -> bytes:
    """Sign `message` with `secret` as done for OAuth2 request signatures.

    Both arguments are text strings and are encoded as UTF-8. The result
    is the base64 encoding of the raw HMAC-SHA1 digest.
    """
    key, payload = (text.encode('utf8') for text in (secret, message))
    mac = hmac.new(key, payload, hashlib.sha1)
    return base64.b64encode(mac.digest())


def get_list(obj, key: str) -> List[Dict[str, Any]]:
    """Fetch `obj[key]` and make sure the result is a list.

    A list value is returned as-is, any other value is wrapped in a
    one-element list, and a missing key yields an empty list.
    """
    try:
        found = obj[key]
    except KeyError:
        return []

    return found if isinstance(found, list) else [found]


class APIError(Exception):
    """An error reported by the API.

    `code` is the return code taken from the response and `message` the
    accompanying text; both are kept as attributes of the same name.
    """

    def __init__(self, code, message):
        # Forward both values to `Exception` so that `str(exc)`, `exc.args`
        # and tracebacks show them instead of an empty exception.
        super().__init__(code, message)
        self.code = code
        self.message = message


class NotLoggedInError(APIError):
    """The session is not valid or expired.

    `API_ERRORS` maps the return codes "0102" and 9003 to this class.
    """


class NotConnectedError(APIError):
    """The service can't contact the specified device.

    `API_ERRORS` maps the return code "0106" to this class.
    """


class TokenError(APIError):
    """An authentication token was rejected.

    Takes no arguments. There is no API return code for this condition,
    so `code` is None.
    """

    def __init__(self):
        # Populate the base-class attributes so that handlers written for
        # `APIError` can read `.code` and `.message` on this subclass too.
        super().__init__(None, 'authentication token was rejected')


class FailedRequestError(APIError):
    """A failed request typically indicates an unsupported control on a
    device.

    `API_ERRORS` maps the return code "0100" to this class.
    """


class InvalidRequestError(APIError):
    """The server rejected a request as invalid.

    `API_ERRORS` maps the integer return code 9000 to this class.
    """


class MonitorError(APIError):
    """Monitoring a device failed, possibly because the monitoring
    session failed and needs to be restarted.

    `device_id` identifies the monitored device and `code` is the
    `returnCode` reported for the monitoring task.
    """

    def __init__(self, device_id, code):
        # Go through the base class so `.message` exists as well as
        # `.code`, keeping generic `APIError` handlers working.
        super().__init__(
            code, 'monitoring failed for device {}'.format(device_id))
        self.device_id = device_id


class MalformedResponseError(APIError):
    """The server produced malformed data, such as invalid JSON.

    `data` is the offending payload, kept on the exception for inspection.
    There is no API return code for this condition, so `code` is None.
    """

    def __init__(self, data):
        # Populate the base-class attributes so that handlers written for
        # `APIError` can read `.code` and `.message` on this subclass too.
        super().__init__(None, 'malformed response data')
        self.data = data


# Maps `returnCd` values from API responses to the exception class that
# `lgedm_post` raises for them; codes not listed here raise a plain
# `APIError`. Keys are compared as-is, so string and integer codes are
# distinct entries.
API_ERRORS = {
    "0102": NotLoggedInError,
    "0106": NotConnectedError,
    "0100": FailedRequestError,
    9000: InvalidRequestError,  # Surprisingly, an integer (not a string).
    9003: NotLoggedInError,  # Session Creation FailureError
}


def lgedm_post(url, data=None, access_token=None, session_id=None):
    """Make an HTTP request in the format used by the API servers.

    In this format, the request POST data is sent as JSON under a special
    key and authentication is sent in headers. Return the JSON data
    extracted from the response.

    The `access_token` and `session_id` are required for most normal,
    authenticated requests. They are not required, for example, to load
    the gateway server data or to start a session.

    Raises the `API_ERRORS` class registered for the response's `returnCd`
    (or a plain `APIError` for unregistered codes) whenever that code is
    present and is not '0000'.
    """
    headers = {
        'x-thinq-application-key': APP_KEY,
        'x-thinq-security-key': SECURITY_KEY,
        'Accept': 'application/json',
    }
    if access_token:
        headers['x-thinq-token'] = access_token
    if session_id:
        headers['x-thinq-jsessionId'] = session_id

    with retry_session() as session:
        res = session.post(url, json={DATA_ROOT: data}, headers=headers)
    out = res.json()[DATA_ROOT]

    # Check for API errors.
    if 'returnCd' in out:
        code = out['returnCd']
        if code != '0000':
            # Use `.get` so that a reply without `returnMsg` still raises
            # the API error for `code` rather than an unrelated KeyError.
            message = out.get('returnMsg')
            raise API_ERRORS.get(code, APIError)(code, message)

    return out


def oauth_url(auth_base, country, language):
    """Build the URL a user opens in a browser to log in and start an
    authenticated session.

    The sign-in page is resolved against `auth_base`; the country,
    language and fixed client parameters go into the query string.
    """
    params = {
        'country': country,
        'language': language,
        'svcCode': SVC_CODE,
        'authSvr': 'oauth2',
        'client_id': CLIENT_ID,
        'division': 'ha',
        'grant_type': 'password',
    }
    login_page = urljoin(auth_base, 'login/sign_in')
    encoded = urlencode(params)
    return f'{login_page}?{encoded}'


def parse_oauth_callback(url):
    """Extract the two tokens from the URL an OAuth login redirected to.

    Returns `(access_token, refresh_token)`: the first is the API
    credential, the second is used to obtain updated access tokens. When
    a parameter appears more than once, its first value is used.
    """
    query = urlparse(url).query
    tokens = parse_qs(query)
    access_token = tokens['access_token'][0]
    refresh_token = tokens['refresh_token'][0]
    return access_token, refresh_token


def login(api_root, access_token, country, language):
    """Log into the API with an access token.

    Posts the token to the `member/login` endpoint under `api_root` and
    returns the information about the resulting session.
    """
    endpoint = urljoin(api_root + '/', 'member/login')
    payload = dict(
        countryCode=country,
        langCode=language,
        loginType='EMP',
        token=access_token,
    )
    return lgedm_post(endpoint, payload)


def refresh_auth(oauth_root, refresh_token):
    """Get a new access_token using a refresh_token.

    Sends a signed POST to the `/oauth2/token` endpoint resolved against
    `oauth_root` and returns the `access_token` field of the JSON reply.

    May raise a `TokenError` (when the reply's `status` is not 1).
    """

    token_url = urljoin(oauth_root, '/oauth2/token')
    data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
    }

    # The timestamp for labeling OAuth requests can be obtained
    # through a request to the date/time endpoint:
    # https://us.lgeapi.com/datetime
    # But we can also just generate a timestamp. It is taken from an
    # explicitly UTC-aware "now" because `datetime.utcnow()` is deprecated;
    # DATE_FORMAT spells the `+0000` offset literally, so the formatted
    # text is the same.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    timestamp = now_utc.strftime(DATE_FORMAT)

    # The signature for the requests is on a string consisting of two
    # parts: (1) a fake request URL containing the refresh token, and (2)
    # the timestamp.
    req_url = ('/oauth2/token?grant_type=refresh_token&refresh_token=' +
               refresh_token)
    sig = oauth2_signature('{}\n{}'.format(req_url, timestamp),
                           OAUTH_SECRET_KEY)

    headers = {
        'lgemp-x-app-key': OAUTH_CLIENT_KEY,
        'lgemp-x-signature': sig,
        'lgemp-x-date': timestamp,
        'Accept': 'application/json',
    }

    with retry_session() as session:
        res = session.post(token_url, data=data, headers=headers)
    res_data = res.json()

    if res_data['status'] != 1:
        raise TokenError()
    return res_data['access_token']


class Gateway(object):
    """The set of hosts used to talk to the API, plus the user's locale.

    Holds the base URLs for browser login (`auth_base`), API calls
    (`api_root`) and token refresh (`oauth_root`), together with the
    `country` and `language` codes.
    """

    def __init__(self, auth_base, api_root, oauth_root, country, language):
        self.auth_base, self.api_root, self.oauth_root = (
            auth_base, api_root, oauth_root)
        self.country, self.language = country, language

    @classmethod
    def discover(cls, country, language) -> 'Gateway':
        """Ask the gateway service which hosts to use for API interaction.

        `country` and `language` are codes, like "US" and "en-US,"
        respectively.
        """
        query = {'countryCode': country, 'langCode': language}
        hosts = lgedm_post(GATEWAY_URL, query)
        return cls(
            hosts['empUri'],
            hosts['thinqUri'],
            hosts['oauthUri'],
            country,
            language,
        )

    def oauth_url(self):
        """Return the browser login URL for this gateway."""
        base, country, language = (
            self.auth_base, self.country, self.language)
        return oauth_url(base, country, language)

    def serialize(self) -> Dict[str, str]:
        """Return the gateway's fields as a plain dict."""
        names = ('auth_base', 'api_root', 'oauth_root', 'country', 'language')
        return {name: getattr(self, name) for name in names}

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'Gateway':
        """Rebuild a gateway from a dict produced by `serialize`.

        The three URLs are required; `country` and `language` fall back
        to the module defaults when absent.
        """
        auth_base = data['auth_base']
        api_root = data['api_root']
        oauth_root = data['oauth_root']
        country = data.get('country', DEFAULT_COUNTRY)
        language = data.get('language', DEFAULT_LANGUAGE)
        return cls(auth_base, api_root, oauth_root, country, language)


class Auth(object):
    """A pair of OAuth tokens bound to the gateway they are used with."""

    def __init__(self, gateway, access_token, refresh_token):
        self.gateway = gateway
        self.access_token, self.refresh_token = access_token, refresh_token

    @classmethod
    def from_url(cls, gateway, url):
        """Build an authentication from an OAuth callback URL.
        """
        access, refresh = parse_oauth_callback(url)
        return cls(gateway, access, refresh)

    def start_session(self) -> Tuple['Session', List[Dict[str, Any]]]:
        """Log in and open an API session for the user.

        Returns the new Session together with the list of the user's
        devices reported by the login response.
        """
        gateway = self.gateway
        info = login(gateway.api_root, self.access_token,
                     gateway.country, gateway.language)
        session = Session(self, info['jsessionId'])
        devices = get_list(info, 'item')
        return session, devices

    def refresh(self):
        """Obtain a new access token and return it in a new Auth object.

        The gateway and refresh token are carried over unchanged.
        """
        fresh_token = refresh_auth(self.gateway.oauth_root,
                                   self.refresh_token)
        return Auth(self.gateway, fresh_token, self.refresh_token)

    def serialize(self) -> Dict[str, str]:
        """Return both tokens as a plain dict."""
        return dict(
            access_token=self.access_token,
            refresh_token=self.refresh_token,
        )


class Session(object):
    """An active API session: an `Auth` plus the server-issued session ID."""

    def __init__(self, auth, session_id) -> None:
        self.auth, self.session_id = auth, session_id

    def post(self, path, data=None):
        """Make a POST request to the API server.

        Works like `lgedm_post`, except that the API root, access token
        and session ID are taken from this Session.
        """
        api_root = self.auth.gateway.api_root
        target = urljoin(api_root + '/', path)
        return lgedm_post(target, data, self.auth.access_token,
                          self.session_id)

    def get_devices(self) -> List[Dict[str, Any]]:
        """Get the devices associated with the user's account.

        Returns a list of dicts with information about the devices.
        """
        reply = self.post('device/deviceList')
        return get_list(reply, 'item')

    def monitor_start(self, device_id):
        """Begin monitoring a device's status.

        Returns the "work ID" from the response, which is used to
        retrieve the monitoring result later.
        """
        request = {
            'cmd': 'Mon',
            'cmdOpt': 'Start',
            'deviceId': device_id,
            'workId': gen_uuid(),
        }
        return self.post('rti/rtiMon', request)['workId']

    def monitor_poll(self, device_id, work_id):
        """Get the result of a monitoring task.

        `work_id` is a string ID retrieved from `monitor_start`. Returns
        the status result as a bytestring, or None if the monitoring is
        not yet ready.

        May raise a `MonitorError`, in which case the right course of
        action is probably to restart the monitoring task.
        """
        request = {'workList': [{'deviceId': device_id, 'workId': work_id}]}
        res = self.post('rti/rtiResult', request)['workList']

        # Right after monitoring starts there is a "warmup" phase of a few
        # polls in which the response carries no `returnCode` yet.
        if 'returnCode' not in res:
            return None

        # Any code other than success means the task failed.
        code = res.get('returnCode')
        if code != '0000':
            raise MonitorError(device_id, code)

        # Depending on the task status there may be no payload at all.
        if 'returnData' not in res:
            return None

        # The payload is base64-encoded binary data; sometimes it holds
        # JSON and sometimes other binary data.
        return base64.b64decode(res['returnData'])

    def monitor_stop(self, device_id, work_id):
        """Stop monitoring a device."""
        request = {
            'cmd': 'Mon',
            'cmdOpt': 'Stop',
            'deviceId': device_id,
            'workId': work_id,
        }
        self.post('rti/rtiMon', request)

    def set_device_controls(self, device_id, values):
        """Control a device's settings.

        `values` is a key/value map containing the settings to update.
        """
        request = {
            'cmd': 'Control',
            'cmdOpt': 'Set',
            'value': values,
            'deviceId': device_id,
            'workId': gen_uuid(),
            'data': '',
        }
        return self.post('rti/rtiControl', request)

    def get_device_config(self, device_id, key, category='Config'):
        """Get a device configuration option.

        The `category` string should probably either be "Config" or
        "Control"; the right choice appears to depend on the key.
        """
        request = {
            'cmd': category,
            'cmdOpt': 'Get',
            'value': key,
            'deviceId': device_id,
            'workId': gen_uuid(),
            'data': '',
        }
        return self.post('rti/rtiControl', request)['returnData']

Functions

def gen_uuid() ‑> str
Expand source code
def gen_uuid() -> str:
    """Return a newly generated random (version 4) UUID as a string."""
    fresh = uuid.uuid4()
    return str(fresh)
def get_list(obj, key: str) ‑> List[Dict[str, Any]]

Look up a list using a key from an object.

If obj[key] is a list, return it unchanged. If it is something else, return a single-element list containing it. If the key does not exist, return an empty list.

Expand source code
def get_list(obj, key: str) -> List[Dict[str, Any]]:
    """Fetch `obj[key]` and make sure the result is a list.

    A list value is returned as-is, any other value is wrapped in a
    one-element list, and a missing key yields an empty list.
    """
    try:
        found = obj[key]
    except KeyError:
        return []

    return found if isinstance(found, list) else [found]
def get_wideq_logger() ‑> logging.Logger
Expand source code
def get_wideq_logger() -> logging.Logger:
    """Return the shared "wideq" logger, configuring it on first use.

    The first call gives the logger an INFO threshold and a stream handler
    (colored when the optional `colorlog` package is importable, plain
    otherwise). Later calls return the same logger untouched: no extra
    handler is attached and a level chosen in the meantime is kept.
    """
    logger = logging.getLogger("wideq")

    # Apply the default threshold only while no level has been chosen, so a
    # later call does not undo what `set_log_level` configured.
    if logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)

    # `logging.getLogger` hands back the same object for the same name, so
    # adding a handler on every call would emit each record once per call.
    if logger.handlers:
        return logger

    fmt = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"

    try:
        import colorlog  # type: ignore
        colorfmt = f"%(log_color)s{fmt}%(reset)s"
        handler = colorlog.StreamHandler()
        handler.setFormatter(
            colorlog.ColoredFormatter(
                colorfmt,
                datefmt=datefmt,
                reset=True,
                log_colors={
                    "DEBUG": "cyan",
                    "INFO": "green",
                    "WARNING": "yellow",
                    "ERROR": "red",
                    "CRITICAL": "red",
                },
            )
        )
    except ImportError:
        # `colorlog` is optional; fall back to uncolored output.
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))

    logger.addHandler(handler)
    return logger
def lgedm_post(url, data=None, access_token=None, session_id=None)

Make an HTTP request in the format used by the API servers.

In this format, the request POST data is sent as JSON under a special key and authentication is sent in headers. Return the JSON data extracted from the response.

The access_token and session_id are required for most normal, authenticated requests. They are not required, for example, to load the gateway server data or to start a session.

Expand source code
def lgedm_post(url, data=None, access_token=None, session_id=None):
    """Make an HTTP request in the format used by the API servers.

    In this format, the request POST data is sent as JSON under a special
    key and authentication is sent in headers. Return the JSON data
    extracted from the response.

    The `access_token` and `session_id` are required for most normal,
    authenticated requests. They are not required, for example, to load
    the gateway server data or to start a session.

    Raises the `API_ERRORS` class registered for the response's `returnCd`
    (or a plain `APIError` for unregistered codes) whenever that code is
    present and is not '0000'.
    """
    headers = {
        'x-thinq-application-key': APP_KEY,
        'x-thinq-security-key': SECURITY_KEY,
        'Accept': 'application/json',
    }
    if access_token:
        headers['x-thinq-token'] = access_token
    if session_id:
        headers['x-thinq-jsessionId'] = session_id

    with retry_session() as session:
        res = session.post(url, json={DATA_ROOT: data}, headers=headers)
    out = res.json()[DATA_ROOT]

    # Check for API errors.
    if 'returnCd' in out:
        code = out['returnCd']
        if code != '0000':
            # Use `.get` so that a reply without `returnMsg` still raises
            # the API error for `code` rather than an unrelated KeyError.
            message = out.get('returnMsg')
            raise API_ERRORS.get(code, APIError)(code, message)

    return out
def login(api_root, access_token, country, language)

Use an access token to log into the API and obtain a session and return information about the session.

Expand source code
def login(api_root, access_token, country, language):
    """Log into the API with an access token.

    Posts the token to the `member/login` endpoint under `api_root` and
    returns the information about the resulting session.
    """
    endpoint = urljoin(api_root + '/', 'member/login')
    payload = dict(
        countryCode=country,
        langCode=language,
        loginType='EMP',
        token=access_token,
    )
    return lgedm_post(endpoint, payload)
def oauth2_signature(message: str, secret: str) ‑> bytes

Get the base64-encoded SHA-1 HMAC digest of a string, as used in OAuth2 request signatures.

Both the secret and message are given as text strings. We use their UTF-8 equivalents.

Expand source code
def oauth2_signature(message: str, secret: str) -> bytes:
    """Sign `message` with `secret` as done for OAuth2 request signatures.

    Both arguments are text strings and are encoded as UTF-8. The result
    is the base64 encoding of the raw HMAC-SHA1 digest.
    """
    key, payload = (text.encode('utf8') for text in (secret, message))
    mac = hmac.new(key, payload, hashlib.sha1)
    return base64.b64encode(mac.digest())
def oauth_url(auth_base, country, language)

Construct the URL for users to log in (in a browser) to start an authenticated session.

Expand source code
def oauth_url(auth_base, country, language):
    """Build the URL a user opens in a browser to log in and start an
    authenticated session.

    The sign-in page is resolved against `auth_base`; the country,
    language and fixed client parameters go into the query string.
    """
    params = {
        'country': country,
        'language': language,
        'svcCode': SVC_CODE,
        'authSvr': 'oauth2',
        'client_id': CLIENT_ID,
        'division': 'ha',
        'grant_type': 'password',
    }
    login_page = urljoin(auth_base, 'login/sign_in')
    encoded = urlencode(params)
    return f'{login_page}?{encoded}'
def parse_oauth_callback(url)

Parse the URL to which an OAuth login redirected to obtain two tokens: an access token for API credentials, and a refresh token for getting updated access tokens.

Expand source code
def parse_oauth_callback(url):
    """Extract the two tokens from the URL an OAuth login redirected to.

    Returns `(access_token, refresh_token)`: the first is the API
    credential, the second is used to obtain updated access tokens. When
    a parameter appears more than once, its first value is used.
    """
    query = urlparse(url).query
    tokens = parse_qs(query)
    access_token = tokens['access_token'][0]
    refresh_token = tokens['refresh_token'][0]
    return access_token, refresh_token
def refresh_auth(oauth_root, refresh_token)

Get a new access_token using a refresh_token.

May raise a TokenError.

Expand source code
def refresh_auth(oauth_root, refresh_token):
    """Get a new access_token using a refresh_token.

    Sends a signed POST to the `/oauth2/token` endpoint resolved against
    `oauth_root` and returns the `access_token` field of the JSON reply.

    May raise a `TokenError` (when the reply's `status` is not 1).
    """

    token_url = urljoin(oauth_root, '/oauth2/token')
    data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
    }

    # The timestamp for labeling OAuth requests can be obtained
    # through a request to the date/time endpoint:
    # https://us.lgeapi.com/datetime
    # But we can also just generate a timestamp. It is taken from an
    # explicitly UTC-aware "now" because `datetime.utcnow()` is deprecated;
    # DATE_FORMAT spells the `+0000` offset literally, so the formatted
    # text is the same.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    timestamp = now_utc.strftime(DATE_FORMAT)

    # The signature for the requests is on a string consisting of two
    # parts: (1) a fake request URL containing the refresh token, and (2)
    # the timestamp.
    req_url = ('/oauth2/token?grant_type=refresh_token&refresh_token=' +
               refresh_token)
    sig = oauth2_signature('{}\n{}'.format(req_url, timestamp),
                           OAUTH_SECRET_KEY)

    headers = {
        'lgemp-x-app-key': OAUTH_CLIENT_KEY,
        'lgemp-x-signature': sig,
        'lgemp-x-date': timestamp,
        'Accept': 'application/json',
    }

    with retry_session() as session:
        res = session.post(token_url, data=data, headers=headers)
    res_data = res.json()

    if res_data['status'] != 1:
        raise TokenError()
    return res_data['access_token']
def retry_session()

Get a Requests session that retries HTTP and HTTPS requests.

Expand source code
def retry_session():
    """Build a Requests session whose HTTP and HTTPS requests are retried.

    The retry budget, backoff factor and retryable status codes come from
    the module's `RETRY_*` constants.
    """
    # Adapted from:
    # https://www.peterbe.com/plog/best-practice-with-retries-with-requests
    #
    # NOTE(review): urllib3's `Retry` limits status-based retries to an
    # allow-list of methods that by default leaves out POST -- confirm that
    # the 502/503/504 retries really apply to the POST calls made here.
    policy = Retry(
        status_forcelist=RETRY_STATUSES,
        backoff_factor=RETRY_FACTOR,
        connect=RETRY_COUNT,
        read=RETRY_COUNT,
        total=RETRY_COUNT,
    )
    adapter = HTTPAdapter(max_retries=policy)

    session = requests.Session()
    for scheme in ('http://', 'https://'):
        session.mount(scheme, adapter)
    return session
def set_log_level(level: int)
Expand source code
def set_log_level(level: int):
    """Set the threshold of the shared wideq logger to `level`."""
    get_wideq_logger().setLevel(level)

Classes

class APIError (code, message)

An error reported by the API.

Expand source code
class APIError(Exception):
    """An error reported by the API.

    `code` is the return code taken from the response and `message` the
    accompanying text; both are kept as attributes of the same name.
    """

    def __init__(self, code, message):
        # Forward both values to `Exception` so that `str(exc)`, `exc.args`
        # and tracebacks show them instead of an empty exception.
        super().__init__(code, message)
        self.code = code
        self.message = message

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class Auth (gateway, access_token, refresh_token)
Expand source code
class Auth(object):
    """A pair of OAuth tokens bound to the gateway they are used with."""

    def __init__(self, gateway, access_token, refresh_token):
        self.gateway = gateway
        self.access_token, self.refresh_token = access_token, refresh_token

    @classmethod
    def from_url(cls, gateway, url):
        """Build an authentication from an OAuth callback URL.
        """
        access, refresh = parse_oauth_callback(url)
        return cls(gateway, access, refresh)

    def start_session(self) -> Tuple['Session', List[Dict[str, Any]]]:
        """Log in and open an API session for the user.

        Returns the new Session together with the list of the user's
        devices reported by the login response.
        """
        gateway = self.gateway
        info = login(gateway.api_root, self.access_token,
                     gateway.country, gateway.language)
        session = Session(self, info['jsessionId'])
        devices = get_list(info, 'item')
        return session, devices

    def refresh(self):
        """Obtain a new access token and return it in a new Auth object.

        The gateway and refresh token are carried over unchanged.
        """
        fresh_token = refresh_auth(self.gateway.oauth_root,
                                   self.refresh_token)
        return Auth(self.gateway, fresh_token, self.refresh_token)

    def serialize(self) -> Dict[str, str]:
        """Return both tokens as a plain dict."""
        return dict(
            access_token=self.access_token,
            refresh_token=self.refresh_token,
        )

Static methods

def from_url(gateway, url)

Create an authentication using an OAuth callback URL.

Expand source code
@classmethod
def from_url(cls, gateway, url):
    """Build an authentication from an OAuth callback URL.
    """
    access, refresh = parse_oauth_callback(url)
    return cls(gateway, access, refresh)

Methods

def refresh(self)

Refresh the authentication, returning a new Auth object.

Expand source code
def refresh(self):
    """Obtain a new access token and return it in a new Auth object.

    The gateway and refresh token are carried over unchanged.
    """
    fresh_token = refresh_auth(self.gateway.oauth_root,
                               self.refresh_token)
    return Auth(self.gateway, fresh_token, self.refresh_token)
def serialize(self) ‑> Dict[str, str]
Expand source code
def serialize(self) -> Dict[str, str]:
    """Return both tokens as a plain dict."""
    return dict(
        access_token=self.access_token,
        refresh_token=self.refresh_token,
    )
def start_session(self) ‑> Tuple[Session, List[Dict[str, Any]]]

Start an API session for the logged-in user. Return the Session object and a list of the user's devices.

Expand source code
def start_session(self) -> Tuple['Session', List[Dict[str, Any]]]:
    """Log in and open an API session for the user.

    Returns the new Session together with the list of the user's
    devices reported by the login response.
    """
    gateway = self.gateway
    info = login(gateway.api_root, self.access_token,
                 gateway.country, gateway.language)
    session = Session(self, info['jsessionId'])
    devices = get_list(info, 'item')
    return session, devices
class FailedRequestError (code, message)

A failed request typically indicates an unsupported control on a device.

Expand source code
class FailedRequestError(APIError):
    """A failed request typically indicates an unsupported control on a
    device.

    `API_ERRORS` maps the return code "0100" to this class.
    """

Ancestors

  • APIError
  • builtins.Exception
  • builtins.BaseException
class Gateway (auth_base, api_root, oauth_root, country, language)
Expand source code
class Gateway(object):
    """The set of hosts used to talk to the API, plus the user's locale.

    Holds the base URLs for browser login (`auth_base`), API calls
    (`api_root`) and token refresh (`oauth_root`), together with the
    `country` and `language` codes.
    """

    def __init__(self, auth_base, api_root, oauth_root, country, language):
        self.auth_base, self.api_root, self.oauth_root = (
            auth_base, api_root, oauth_root)
        self.country, self.language = country, language

    @classmethod
    def discover(cls, country, language) -> 'Gateway':
        """Ask the gateway service which hosts to use for API interaction.

        `country` and `language` are codes, like "US" and "en-US,"
        respectively.
        """
        query = {'countryCode': country, 'langCode': language}
        hosts = lgedm_post(GATEWAY_URL, query)
        return cls(
            hosts['empUri'],
            hosts['thinqUri'],
            hosts['oauthUri'],
            country,
            language,
        )

    def oauth_url(self):
        """Return the browser login URL for this gateway."""
        base, country, language = (
            self.auth_base, self.country, self.language)
        return oauth_url(base, country, language)

    def serialize(self) -> Dict[str, str]:
        """Return the gateway's fields as a plain dict."""
        names = ('auth_base', 'api_root', 'oauth_root', 'country', 'language')
        return {name: getattr(self, name) for name in names}

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'Gateway':
        """Rebuild a gateway from a dict produced by `serialize`.

        The three URLs are required; `country` and `language` fall back
        to the module defaults when absent.
        """
        auth_base = data['auth_base']
        api_root = data['api_root']
        oauth_root = data['oauth_root']
        country = data.get('country', DEFAULT_COUNTRY)
        language = data.get('language', DEFAULT_LANGUAGE)
        return cls(auth_base, api_root, oauth_root, country, language)

Static methods

def deserialize(data: Dict[str, Any]) ‑> Gateway
Expand source code
@classmethod
def deserialize(cls, data: Dict[str, Any]) -> 'Gateway':
    """Rebuild a gateway from a dict produced by `serialize`.

    The three URLs are required; `country` and `language` fall back
    to the module defaults when absent.
    """
    auth_base = data['auth_base']
    api_root = data['api_root']
    oauth_root = data['oauth_root']
    country = data.get('country', DEFAULT_COUNTRY)
    language = data.get('language', DEFAULT_LANGUAGE)
    return cls(auth_base, api_root, oauth_root, country, language)
def discover(country, language) ‑> Gateway

Load information about the hosts to use for API interaction.

country and language are codes, like "US" and "en-US," respectively.

Expand source code
@classmethod
def discover(cls, country, language) -> 'Gateway':
    """Ask the gateway service which hosts to use for API interaction.

    `country` and `language` are codes, like "US" and "en-US,"
    respectively.
    """
    query = {'countryCode': country, 'langCode': language}
    hosts = lgedm_post(GATEWAY_URL, query)
    return cls(
        hosts['empUri'],
        hosts['thinqUri'],
        hosts['oauthUri'],
        country,
        language,
    )

Methods

def oauth_url(self)
Expand source code
def oauth_url(self):
    """Return the browser login URL for this gateway."""
    base, country, language = (
        self.auth_base, self.country, self.language)
    return oauth_url(base, country, language)
def serialize(self) ‑> Dict[str, str]
Expand source code
def serialize(self) -> Dict[str, str]:
    """Return the gateway's fields as a plain dict."""
    names = ('auth_base', 'api_root', 'oauth_root', 'country', 'language')
    return {name: getattr(self, name) for name in names}
class InvalidRequestError (code, message)

The server rejected a request as invalid.

Expand source code
class InvalidRequestError(APIError):
    """The server rejected a request as invalid.

    `API_ERRORS` maps the integer return code 9000 to this class.
    """

Ancestors

  • APIError
  • builtins.Exception
  • builtins.BaseException
class MalformedResponseError (data)

The server produced malformed data, such as invalid JSON.

Expand source code
class MalformedResponseError(APIError):
    """The server produced malformed data, such as invalid JSON.

    `data` is the offending payload, kept on the exception for inspection.
    There is no API return code for this condition, so `code` is None.
    """

    def __init__(self, data):
        # Populate the base-class attributes so that handlers written for
        # `APIError` can read `.code` and `.message` on this subclass too.
        super().__init__(None, 'malformed response data')
        self.data = data

Ancestors

  • APIError
  • builtins.Exception
  • builtins.BaseException
class MonitorError (device_id, code)

Monitoring a device failed, possibly because the monitoring session failed and needs to be restarted.

Expand source code
class MonitorError(APIError):
    """Monitoring a device failed, possibly because the monitoring
    session failed and needs to be restarted.

    `device_id` identifies the monitored device and `code` is the
    `returnCode` reported for the monitoring task.
    """

    def __init__(self, device_id, code):
        # Go through the base class so `.message` exists as well as
        # `.code`, keeping generic `APIError` handlers working.
        super().__init__(
            code, 'monitoring failed for device {}'.format(device_id))
        self.device_id = device_id

Ancestors

  • APIError
  • builtins.Exception
  • builtins.BaseException
class NotConnectedError (code, message)

The service can't contact the specified device.

Expand source code
class NotConnectedError(APIError):
    """The service can't contact the specified device.

    `API_ERRORS` maps the return code "0106" to this class.
    """

Ancestors

  • APIError
  • builtins.Exception
  • builtins.BaseException
class NotLoggedInError (code, message)

The session is not valid or expired.

Expand source code
class NotLoggedInError(APIError):
    """The session is not valid or expired.

    `API_ERRORS` maps the return codes "0102" and 9003 to this class.
    """

Ancestors

  • APIError
  • builtins.Exception
  • builtins.BaseException
class Session (auth, session_id)
Expand source code
class Session:
    """An API session bound to an auth object and a session ID.

    Every request made through this object is sent via `post`, which
    supplies the API root, access token and session ID automatically.
    """

    def __init__(self, auth, session_id) -> None:
        self.auth = auth
        self.session_id = session_id

    def post(self, path, data=None):
        """Send a POST request to `path` under the gateway's API root.

        Works like `lgedm_post`, except that the URL base, the access
        token and the session ID are taken from this Session.
        """

        api_base = self.auth.gateway.api_root + '/'
        return lgedm_post(
            urljoin(api_base, path),
            data,
            self.auth.access_token,
            self.session_id,
        )

    def get_devices(self) -> List[Dict[str, Any]]:
        """List the devices associated with the user's account.

        Each entry of the returned list is a dict describing one
        device.
        """

        response = self.post('device/deviceList')
        return get_list(response, 'item')

    def monitor_start(self, device_id):
        """Start monitoring the status of a device.

        The returned "work ID" identifies the monitoring task and is
        what `monitor_poll` and `monitor_stop` expect.
        """

        request = {
            'cmd': 'Mon',
            'cmdOpt': 'Start',
            'deviceId': device_id,
            'workId': gen_uuid(),
        }
        return self.post('rti/rtiMon', request)['workId']

    def monitor_poll(self, device_id, work_id):
        """Fetch the latest result of a monitoring task.

        `work_id` is the string ID obtained from `monitor_start`. The
        return value is the status as a bytestring, or None when no
        data is available yet.

        A `MonitorError` may be raised; restarting the monitoring task
        is then probably the right reaction.
        """

        request = {'workList': [{'deviceId': device_id, 'workId': work_id}]}
        result = self.post('rti/rtiResult', request)['workList']

        # Right after monitoring starts, a few polls usually come back
        # without `returnCode`: the task is still warming up.
        if 'returnCode' not in result:
            return None

        # Any code other than '0000' is reported as an error.
        status = result.get('returnCode')
        if status != '0000':
            raise MonitorError(device_id, status)

        # Depending on the task status, a successful response may
        # still lack a payload.
        if 'returnData' not in result:
            return None

        # `returnData` holds base64-encoded binary data, which is
        # sometimes JSON and sometimes another binary format.
        return base64.b64decode(result['returnData'])

    def monitor_stop(self, device_id, work_id):
        """End the monitoring task `work_id` for a device."""

        request = {
            'cmd': 'Mon',
            'cmdOpt': 'Stop',
            'deviceId': device_id,
            'workId': work_id,
        }
        self.post('rti/rtiMon', request)

    def set_device_controls(self, device_id, values):
        """Change settings on a device.

        `values` is a key/value map holding the settings to update.
        The response from the server is returned.
        """

        request = {
            'cmd': 'Control',
            'cmdOpt': 'Set',
            'value': values,
            'deviceId': device_id,
            'workId': gen_uuid(),
            'data': '',
        }
        return self.post('rti/rtiControl', request)

    def get_device_config(self, device_id, key, category='Config'):
        """Read one configuration option from a device.

        `category` should probably be either "Config" or "Control";
        which one is right appears to depend on the key.
        """

        request = {
            'cmd': category,
            'cmdOpt': 'Get',
            'value': key,
            'deviceId': device_id,
            'workId': gen_uuid(),
            'data': '',
        }
        response = self.post('rti/rtiControl', request)
        return response['returnData']

Methods

def get_device_config(self, device_id, key, category='Config')

Get a device configuration option.

The category string should probably either be "Config" or "Control"; the right choice appears to depend on the key.

Expand source code
def get_device_config(self, device_id, key, category='Config'):
    """Read one configuration option from a device.

    `category` should probably be either "Config" or "Control";
    which one is right appears to depend on the key.
    """

    request = {
        'cmd': category,
        'cmdOpt': 'Get',
        'value': key,
        'deviceId': device_id,
        'workId': gen_uuid(),
        'data': '',
    }
    response = self.post('rti/rtiControl', request)
    return response['returnData']
def get_devices(self) -> List[Dict[str, Any]]

Get a list of devices associated with the user's account.

Return a list of dicts with information about the devices.

Expand source code
def get_devices(self) -> List[Dict[str, Any]]:
    """List the devices associated with the user's account.

    Each entry of the returned list is a dict describing one device.
    """

    response = self.post('device/deviceList')
    return get_list(response, 'item')
def monitor_poll(self, device_id, work_id)

Get the result of a monitoring task.

work_id is a string ID retrieved from monitor_start. Return a status result, which is a bytestring, or None if the monitoring is not yet ready.

May raise a MonitorError, in which case the right course of action is probably to restart the monitoring task.

Expand source code
def monitor_poll(self, device_id, work_id):
    """Fetch the latest result of a monitoring task.

    `work_id` is the string ID obtained from `monitor_start`. The
    return value is the status as a bytestring, or None when no data
    is available yet.

    A `MonitorError` may be raised; restarting the monitoring task is
    then probably the right reaction.
    """

    request = {'workList': [{'deviceId': device_id, 'workId': work_id}]}
    result = self.post('rti/rtiResult', request)['workList']

    # Right after monitoring starts, a few polls usually come back
    # without `returnCode`: the task is still warming up.
    if 'returnCode' not in result:
        return None

    # Any code other than '0000' is reported as an error.
    status = result.get('returnCode')
    if status != '0000':
        raise MonitorError(device_id, status)

    # Depending on the task status, a successful response may still
    # lack a payload.
    if 'returnData' not in result:
        return None

    # `returnData` holds base64-encoded binary data, which is sometimes
    # JSON and sometimes another binary format.
    return base64.b64decode(result['returnData'])
def monitor_start(self, device_id)

Begin monitoring a device's status.

Return a "work ID" that can be used to retrieve the result of monitoring.

Expand source code
def monitor_start(self, device_id):
    """Start monitoring the status of a device.

    The returned "work ID" identifies the monitoring task and can be
    used to retrieve its result.
    """

    request = {
        'cmd': 'Mon',
        'cmdOpt': 'Start',
        'deviceId': device_id,
        'workId': gen_uuid(),
    }
    return self.post('rti/rtiMon', request)['workId']
def monitor_stop(self, device_id, work_id)

Stop monitoring a device.

Expand source code
def monitor_stop(self, device_id, work_id):
    """End the monitoring task `work_id` for a device."""

    request = {
        'cmd': 'Mon',
        'cmdOpt': 'Stop',
        'deviceId': device_id,
        'workId': work_id,
    }
    self.post('rti/rtiMon', request)
def post(self, path, data=None)

Make a POST request to the API server.

This is like lgedm_post(), but it pulls the context for the request from an active Session.

Expand source code
def post(self, path, data=None):
    """Send a POST request to `path` under the gateway's API root.

    Works like `lgedm_post`, except that the URL base, the access
    token and the session ID are taken from this Session.
    """

    api_base = self.auth.gateway.api_root + '/'
    return lgedm_post(
        urljoin(api_base, path),
        data,
        self.auth.access_token,
        self.session_id,
    )
def set_device_controls(self, device_id, values)

Control a device's settings.

values is a key/value map containing the settings to update.

Expand source code
def set_device_controls(self, device_id, values):
    """Change settings on a device.

    `values` is a key/value map holding the settings to update. The
    response from the server is returned.
    """

    request = {
        'cmd': 'Control',
        'cmdOpt': 'Set',
        'value': values,
        'deviceId': device_id,
        'workId': gen_uuid(),
        'data': '',
    }
    return self.post('rti/rtiControl', request)
class TokenError

An authentication token was rejected.

Expand source code
class TokenError(APIError):
    """An authentication token was rejected."""

    def __init__(self):
        """Create the error; takes no arguments and stores nothing."""

Ancestors

  • APIError
  • builtins.Exception
  • builtins.BaseException