Skip to content

Commit

Permalink
Try discarding namespaces when in non-strict mode to improving handli…
Browse files Browse the repository at this point in the history
…ng broken devices
  • Loading branch information
StevenLooman committed Mar 25, 2024
1 parent bebb5c9 commit f6273bf
Showing 1 changed file with 124 additions and 46 deletions.
170 changes: 124 additions & 46 deletions async_upnp_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

# pylint: disable=too-many-lines

import io
import logging
import urllib.parse
from abc import ABC
from datetime import datetime, timezone
from types import TracebackType
from typing import (
Any,
Callable,
Expand All @@ -17,9 +19,11 @@
Sequence,
Set,
Tuple,
Type,
TypeVar,
)
from xml.etree import ElementTree as ET
from xml.parsers import expat
from xml.sax.saxutils import escape

import defusedxml.ElementTree as DET
Expand Down Expand Up @@ -50,6 +54,34 @@
EventCallbackType = Callable[["UpnpService", Sequence["UpnpStateVariable"]], None]


class DisableXmlNamespaces:
"""Context manager to disable XML namespace handling."""

def __enter__(self) -> None:
"""Enter context manager."""
# pylint: disable=attribute-defined-outside-init
self._old_parser_create = expat.ParserCreate

def expat_parser_create(
encoding: Optional[str] = None,
namespace_separator: Optional[str] = None,
intern: Optional[dict[str, Any]] = None,
) -> expat.XMLParserType:
# pylint: disable=unused-argument
return self._old_parser_create(encoding, None, intern)

expat.ParserCreate = expat_parser_create

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
"""Exit context manager."""
expat.ParserCreate = self._old_parser_create


class UpnpRequester(ABC):
"""
Abstract base class used for performing async HTTP requests.
Expand Down Expand Up @@ -646,7 +678,7 @@ async def async_call(self, **kwargs: Any) -> Mapping[str, Any]:
except ET.ParseError:
pass
else:
_parse_fault(self, xml, status_code, response_headers)
self._parse_fault(xml, status_code, response_headers)

# Couldn't parse body for fault details, raise generic response error
_LOGGER.debug(
Expand Down Expand Up @@ -719,15 +751,34 @@ def parse_response(
) -> Mapping[str, Any]:
"""Parse response from called Action."""
# pylint: disable=unused-argument
stripped_response_body = response_body.rstrip(" \t\r\n\0")
try:
xml = DET.fromstring(response_body.rstrip(" \t\r\n\0"))
xml = DET.fromstring(stripped_response_body)
except ET.ParseError as err:
_LOGGER.debug("Unable to parse XML: %s\nXML:\n%s", err, response_body)
raise UpnpXmlParseError(err) from err
if self._non_strict:
# Try again ignoring namespaces.
try:
with DisableXmlNamespaces():
parser = DET.XMLParser()

source = io.StringIO(stripped_response_body)
it = DET.iterparse(source, parser=parser)
for _, el in it:
_, _, el.tag = el.tag.rpartition(":") # Strip namespace.
it_root = it.root # type: ET.Element
xml = it_root
except ET.ParseError as err2:
_LOGGER.debug(
"Unable to parse XML: %s\nXML:\n%s", err2, response_body
)
raise UpnpXmlParseError(err2) from err2
else:
_LOGGER.debug("Unable to parse XML: %s\nXML:\n%s", err, response_body)
raise UpnpXmlParseError(err) from err

# Check if a SOAP fault occurred. It should have been caught earlier, by
# the device sending an HTTP 500 status, but not all devices do.
_parse_fault(self, xml)
self._parse_fault(xml)

try:
return self._parse_response_args(service_type, xml)
Expand All @@ -744,9 +795,15 @@ def _parse_response_args(
response = xml.find(query, NS)

# If no response was found, do a search ignoring namespaces when in non-strict mode.
if response is None and self._non_strict:
query = f".//{{*}}{self.name}Response"
response = xml.find(query, NS)
if self._non_strict:
if response is None:
query = f".//{{*}}{self.name}Response"
response = xml.find(query, NS)

# Perhaps namespaces were removed/ignored, try searching again.
if response is None:
query = ".//*Response"
response = xml.find(query)

if response is None:
xml_str = ET.tostring(xml, encoding="unicode")
Expand All @@ -770,51 +827,72 @@ def _parse_response_args(

return args

def _parse_fault(
self,
xml: ET.Element,
status_code: Optional[int] = None,
response_headers: Optional[Mapping] = None,
) -> None:
"""Parse SOAP fault and raise appropriate exception."""
# pylint: disable=too-many-branches
fault = xml.find(".//soap_envelope:Body/soap_envelope:Fault", NS)
if self._non_strict:
if fault is None:
fault = xml.find(".//{{*}}Body/{{*}}Fault", NS)

if fault is None:
fault = xml.find(".//{{*}}Body/{{*}}Fault")

if fault is None:
return

error_code_str = fault.findtext(".//control:errorCode", None, NS)
if self._non_strict:
if not error_code_str:
error_code_str = fault.findtext(".//{{*}}:errorCode", None, NS)

if not error_code_str:
error_code_str = fault.findtext(".//errorCode")

if error_code_str:
error_code: Optional[int] = int(error_code_str)
else:
error_code = None

error_desc = fault.findtext(".//control:errorDescription", None, NS)
if self._non_strict:
if not error_desc:
error_desc = fault.findtext(".//{{*}}:errorDescription", None, NS)

if not error_desc:
error_desc = fault.findtext(".//errorDescription")
_LOGGER.debug(
"Error calling action: %s, error code: %s, error desc: %s",
self.name,
error_code,
error_desc,
)

def _parse_fault(
action: UpnpAction,
xml: ET.Element,
status_code: Optional[int] = None,
response_headers: Optional[Mapping] = None,
) -> None:
"""Parse SOAP fault and raise appropriate exception."""
fault = xml.find(".//soap_envelope:Body/soap_envelope:Fault", NS)
if not fault:
return

error_code_str = fault.findtext(".//control:errorCode", None, NS)
if error_code_str:
error_code: Optional[int] = int(error_code_str)
else:
error_code = None
error_desc = fault.findtext(".//control:errorDescription", None, NS)
_LOGGER.debug(
"Error calling action: %s, error code: %s, error desc: %s",
action.name,
error_code,
error_desc,
)

if status_code is not None:
raise UpnpActionResponseError(
if status_code is not None:
raise UpnpActionResponseError(
error_code=error_code,
error_desc=error_desc,
status=status_code,
headers=response_headers,
message=f"Error during async_call(), "
f"action: {self.name}, "
f"status: {status_code}, "
f"upnp error: {error_code} ({error_desc})",
)

raise UpnpActionError(
error_code=error_code,
error_desc=error_desc,
status=status_code,
headers=response_headers,
message=f"Error during async_call(), "
f"action: {action.name}, "
f"status: {status_code}, "
f"action: {self.name}, "
f"upnp error: {error_code} ({error_desc})",
)

raise UpnpActionError(
error_code=error_code,
error_desc=error_desc,
message=f"Error during async_call(), "
f"action: {action.name}, "
f"upnp error: {error_code} ({error_desc})",
)


T = TypeVar("T") # pylint: disable=invalid-name

Expand Down

0 comments on commit f6273bf

Please sign in to comment.