Skip to content

Commit

Permalink
Improve parsing of broken XML on responses by ignoring namespaces (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
StevenLooman committed Mar 29, 2024
1 parent 55121ed commit e6d464f
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 50 deletions.
171 changes: 125 additions & 46 deletions async_upnp_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@

# pylint: disable=too-many-lines

import io
import logging
import urllib.parse
from abc import ABC
from datetime import datetime, timezone
from types import TracebackType
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
)
from xml.etree import ElementTree as ET
from xml.parsers import expat
from xml.sax.saxutils import escape

import defusedxml.ElementTree as DET
Expand Down Expand Up @@ -50,6 +55,34 @@
EventCallbackType = Callable[["UpnpService", Sequence["UpnpStateVariable"]], None]


class DisableXmlNamespaces:
"""Context manager to disable XML namespace handling."""

def __enter__(self) -> None:
"""Enter context manager."""
# pylint: disable=attribute-defined-outside-init
self._old_parser_create = expat.ParserCreate

def expat_parser_create(
encoding: Optional[str] = None,
namespace_separator: Optional[str] = None,
intern: Optional[Dict[str, Any]] = None,
) -> expat.XMLParserType:
# pylint: disable=unused-argument
return self._old_parser_create(encoding, None, intern)

expat.ParserCreate = expat_parser_create

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
"""Exit context manager."""
expat.ParserCreate = self._old_parser_create


class UpnpRequester(ABC):
"""
Abstract base class used for performing async HTTP requests.
Expand Down Expand Up @@ -646,7 +679,7 @@ async def async_call(self, **kwargs: Any) -> Mapping[str, Any]:
except ET.ParseError:
pass
else:
_parse_fault(self, xml, status_code, response_headers)
self._parse_fault(xml, status_code, response_headers)

# Couldn't parse body for fault details, raise generic response error
_LOGGER.debug(
Expand Down Expand Up @@ -719,15 +752,34 @@ def parse_response(
) -> Mapping[str, Any]:
"""Parse response from called Action."""
# pylint: disable=unused-argument
stripped_response_body = response_body.rstrip(" \t\r\n\0")
try:
xml = DET.fromstring(response_body.rstrip(" \t\r\n\0"))
xml = DET.fromstring(stripped_response_body)
except ET.ParseError as err:
_LOGGER.debug("Unable to parse XML: %s\nXML:\n%s", err, response_body)
raise UpnpXmlParseError(err) from err
if self._non_strict:
# Try again ignoring namespaces.
try:
with DisableXmlNamespaces():
parser = DET.XMLParser()

source = io.StringIO(stripped_response_body)
it = DET.iterparse(source, parser=parser)
for _, el in it:
_, _, el.tag = el.tag.rpartition(":") # Strip namespace.
it_root = it.root # type: ET.Element
xml = it_root
except ET.ParseError as err2:
_LOGGER.debug(
"Unable to parse XML: %s\nXML:\n%s", err2, response_body
)
raise UpnpXmlParseError(err2) from err2
else:
_LOGGER.debug("Unable to parse XML: %s\nXML:\n%s", err, response_body)
raise UpnpXmlParseError(err) from err

# Check if a SOAP fault occurred. It should have been caught earlier, by
# the device sending an HTTP 500 status, but not all devices do.
_parse_fault(self, xml)
self._parse_fault(xml)

try:
return self._parse_response_args(service_type, xml)
Expand All @@ -744,9 +796,15 @@ def _parse_response_args(
response = xml.find(query, NS)

# If no response was found, do a search ignoring namespaces when in non-strict mode.
if response is None and self._non_strict:
query = f".//{{*}}{self.name}Response"
response = xml.find(query, NS)
if self._non_strict:
if response is None:
query = f".//{{*}}{self.name}Response"
response = xml.find(query, NS)

# Perhaps namespaces were removed/ignored, try searching again.
if response is None:
query = ".//*Response"
response = xml.find(query)

if response is None:
xml_str = ET.tostring(xml, encoding="unicode")
Expand All @@ -770,51 +828,72 @@ def _parse_response_args(

return args

def _parse_fault(
self,
xml: ET.Element,
status_code: Optional[int] = None,
response_headers: Optional[Mapping] = None,
) -> None:
"""Parse SOAP fault and raise appropriate exception."""
# pylint: disable=too-many-branches
fault = xml.find(".//soap_envelope:Body/soap_envelope:Fault", NS)
if self._non_strict:
if fault is None:
fault = xml.find(".//{{*}}Body/{{*}}Fault", NS)

if fault is None:
fault = xml.find(".//{{*}}Body/{{*}}Fault")

if fault is None:
return

error_code_str = fault.findtext(".//control:errorCode", None, NS)
if self._non_strict:
if not error_code_str:
error_code_str = fault.findtext(".//{{*}}:errorCode", None, NS)

if not error_code_str:
error_code_str = fault.findtext(".//errorCode")

if error_code_str:
error_code: Optional[int] = int(error_code_str)
else:
error_code = None

error_desc = fault.findtext(".//control:errorDescription", None, NS)
if self._non_strict:
if not error_desc:
error_desc = fault.findtext(".//{{*}}:errorDescription", None, NS)

if not error_desc:
error_desc = fault.findtext(".//errorDescription")
_LOGGER.debug(
"Error calling action: %s, error code: %s, error desc: %s",
self.name,
error_code,
error_desc,
)

def _parse_fault(
action: UpnpAction,
xml: ET.Element,
status_code: Optional[int] = None,
response_headers: Optional[Mapping] = None,
) -> None:
"""Parse SOAP fault and raise appropriate exception."""
fault = xml.find(".//soap_envelope:Body/soap_envelope:Fault", NS)
if not fault:
return

error_code_str = fault.findtext(".//control:errorCode", None, NS)
if error_code_str:
error_code: Optional[int] = int(error_code_str)
else:
error_code = None
error_desc = fault.findtext(".//control:errorDescription", None, NS)
_LOGGER.debug(
"Error calling action: %s, error code: %s, error desc: %s",
action.name,
error_code,
error_desc,
)

if status_code is not None:
raise UpnpActionResponseError(
if status_code is not None:
raise UpnpActionResponseError(
error_code=error_code,
error_desc=error_desc,
status=status_code,
headers=response_headers,
message=f"Error during async_call(), "
f"action: {self.name}, "
f"status: {status_code}, "
f"upnp error: {error_code} ({error_desc})",
)

raise UpnpActionError(
error_code=error_code,
error_desc=error_desc,
status=status_code,
headers=response_headers,
message=f"Error during async_call(), "
f"action: {action.name}, "
f"status: {status_code}, "
f"action: {self.name}, "
f"upnp error: {error_code} ({error_desc})",
)

raise UpnpActionError(
error_code=error_code,
error_desc=error_desc,
message=f"Error during async_call(), "
f"action: {action.name}, "
f"upnp error: {error_code} ({error_desc})",
)


T = TypeVar("T") # pylint: disable=invalid-name

Expand Down
1 change: 1 addition & 0 deletions changes/224.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Try discarding namespaces when in non-strict mode to improve handling of broken devices
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<s:Envelope s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"
xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
<s:Body>
<u:DeletePortMappingResponse xmlns:u0="urn:schemas-upnp-org:service:WANIPConnection:1">
</u:DeletePortMappingResponse>
</s:Body>
</s:Envelope>
42 changes: 38 additions & 4 deletions tests/test_upnp_client.py → tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,14 +531,14 @@ async def test_parse_response_no_service_type_version_2(self) -> None:

@pytest.mark.asyncio
async def test_unknown_out_argument(self) -> None:
"""Test calling an actino and handling an unknown out-argument."""
"""Test calling an action and handling an unknown out-argument."""
requester = UpnpTestRequester(RESPONSE_MAP)
link_service = "http://dlna_dmr:1234/device.xml"
device_url = "http://dlna_dmr:1234/device.xml"
service_type = "urn:schemas-upnp-org:service:RenderingControl:1"
test_action = "GetVolume"

factory = UpnpFactory(requester)
device = await factory.async_create_device(link_service)
device = await factory.async_create_device(device_url)
service = device.service(service_type)
action = service.action(test_action)

Expand All @@ -550,7 +550,7 @@ async def test_unknown_out_argument(self) -> None:
pass

factory = UpnpFactory(requester, non_strict=True)
device = await factory.async_create_device(link_service)
device = await factory.async_create_device(device_url)
service = device.service(service_type)
action = service.action(test_action)

Expand All @@ -559,6 +559,40 @@ async def test_unknown_out_argument(self) -> None:
except UpnpError:
assert False

@pytest.mark.asyncio
async def test_response_invalid_xml_namespaces(self) -> None:
"""Test parsing response with invalid XML namespaces."""
requester = UpnpTestRequester(RESPONSE_MAP)
device_url = "http://igd:1234/device.xml"
service_type = "urn:schemas-upnp-org:service:WANIPConnection:1"
test_action = "DeletePortMapping"

# Test strict mode.
factory = UpnpFactory(requester)
device = await factory.async_create_device(device_url)
service = device.find_service(service_type)
assert service is not None
action = service.action(test_action)

response = read_file("igd/action_WANPIPConnection_DeletePortMapping.xml")
try:
action.parse_response(service_type, {}, response)
assert False
except UpnpError:
pass

# Test non-strict mode.
factory = UpnpFactory(requester, non_strict=True)
device = await factory.async_create_device(device_url)
service = device.find_service(service_type)
assert service is not None
action = service.action(test_action)

try:
action.parse_response(service_type, {}, response)
except UpnpError:
assert False


class TestUpnpService:
"""Tests for UpnpService."""
Expand Down

0 comments on commit e6d464f

Please sign in to comment.