From 7096a785db6be6be7dbc0f91c2f830d6445224b1 Mon Sep 17 00:00:00 2001 From: jcfp Date: Fri, 5 Feb 2021 18:48:14 +0100 Subject: [PATCH] fix bonjour with localhost, retire LOCALHOSTS constant (#1782) * fix bonjour with localhost, retire LOCALHOSTS constant * rename probablyipv[46] functions to is_ipv[46]_addr * refuse to send ssdp description_xml to outsiders --- SABnzbd.py | 65 +++++++++++--------- sabnzbd/cfg.py | 2 +- sabnzbd/constants.py | 2 - sabnzbd/interface.py | 16 +++-- sabnzbd/misc.py | 50 ++++++++++----- sabnzbd/newswrapper.py | 6 +- sabnzbd/zconfig.py | 8 +-- tests/test_getipaddress.py | 8 +-- tests/test_misc.py | 148 +++++++++++++++++++++++++++++++++++++++++++++ tests/test_probablyip.py | 46 -------------- 10 files changed, 240 insertions(+), 111 deletions(-) delete mode 100644 tests/test_probablyip.py diff --git a/SABnzbd.py b/SABnzbd.py index 7e53453..bccf5b5 100755 --- a/SABnzbd.py +++ b/SABnzbd.py @@ -68,7 +68,9 @@ from sabnzbd.misc import ( get_serv_parms, get_from_url, upload_file_to_sabnzbd, - probablyipv4, + is_ipv4_addr, + is_localhost, + is_lan_addr, ) from sabnzbd.filesystem import get_ext, real_path, long_path, globber_full, remove_file from sabnzbd.panic import panic_tmpl, panic_port, panic_host, panic, launch_a_browser @@ -533,7 +535,7 @@ def get_webhost(cherryhost, cherryport, https_port): # Valid user defined name? info = socket.getaddrinfo(cherryhost, None) except socket.error: - if cherryhost not in LOCALHOSTS: + if not is_localhost(cherryhost): cherryhost = "0.0.0.0" try: info = socket.getaddrinfo(localhost, None) @@ -600,7 +602,7 @@ def get_webhost(cherryhost, cherryport, https_port): except socket.error: cherryhost = cherryhost.strip("[]") - if ipv6 and ipv4 and browserhost not in LOCALHOSTS: + if ipv6 and ipv4 and not is_localhost(browserhost): sabnzbd.AMBI_LOCALHOST = True logging.info("IPV6 has priority on this system, potential Firefox issue") @@ -1489,34 +1491,37 @@ def main(): check_latest_version() autorestarted = False - # bonjour/zeroconf needs an ip. Lets try to find it. - external_host = localipv4() # IPv4 address of the LAN interface. This is the normal use case - if not external_host: - # None, so no network / default route, so let's set to ... - external_host = "127.0.0.1" - elif probablyipv4(cherryhost) and cherryhost not in LOCALHOSTS + ("0.0.0.0", "::"): - # a hard-configured cherryhost other than the usual, so let's take that (good or wrong) - external_host = cherryhost - logging.debug("bonjour/zeroconf/SSDP using host: %s", external_host) - sabnzbd.zconfig.set_bonjour(external_host, cherryport) - - # Start SSDP if SABnzbd is running exposed - if cherryhost not in LOCALHOSTS: - # Set URL for browser for external hosts - if enable_https: - ssdp_url = "https://%s:%s%s" % (external_host, cherryport, sabnzbd.cfg.url_base()) + # Start SSDP and Bonjour if SABnzbd isn't listening on localhost only + if sabnzbd.cfg.enable_broadcast() and not is_localhost(cherryhost): + # Try to find a LAN IP address for SSDP/Bonjour + if is_lan_addr(cherryhost): + # A specific listening address was configured, use that + external_host = cherryhost else: - ssdp_url = "http://%s:%s%s" % (external_host, cherryport, sabnzbd.cfg.url_base()) - ssdp.start_ssdp( - external_host, - "SABnzbd", - ssdp_url, - "SABnzbd %s" % sabnzbd.__version__, - "SABnzbd Team", - "https://sabnzbd.org/", - "SABnzbd %s" % sabnzbd.__version__, - ssdp_broadcast_interval=sabnzbd.cfg.ssdp_broadcast_interval(), - ) + # Fall back to the IPv4 address of the LAN interface + external_host = localipv4() + logging.debug("Using %s as host address for Bonjour and SSDP", external_host) + + if is_lan_addr(external_host): + sabnzbd.zconfig.set_bonjour(external_host, cherryport) + + # Set URL for browser for external hosts + ssdp_url = "%s://%s:%s%s" % ( + ("https" if enable_https else "http"), + external_host, + cherryport, + sabnzbd.cfg.url_base(), + ) + ssdp.start_ssdp( + external_host, + "SABnzbd", + ssdp_url, + "SABnzbd %s" % sabnzbd.__version__, + "SABnzbd Team", + "https://sabnzbd.org/", + "SABnzbd %s" % sabnzbd.__version__, + ssdp_broadcast_interval=sabnzbd.cfg.ssdp_broadcast_interval(), + ) # Have to keep this running, otherwise logging will terminate timer = 0 diff --git a/sabnzbd/cfg.py b/sabnzbd/cfg.py index 50ff62c..ea33ff2 100644 --- a/sabnzbd/cfg.py +++ b/sabnzbd/cfg.py @@ -282,7 +282,7 @@ helpfull_warnings = OptionBool("misc", "helpfull_warnings", True) keep_awake = OptionBool("misc", "keep_awake", True) win_menu = OptionBool("misc", "win_menu", True) allow_incomplete_nzb = OptionBool("misc", "allow_incomplete_nzb", False) -enable_bonjour = OptionBool("misc", "enable_bonjour", True) +enable_broadcast = OptionBool("misc", "enable_broadcast", True) max_art_opt = OptionBool("misc", "max_art_opt", False) ipv6_hosting = OptionBool("misc", "ipv6_hosting", False) fixed_ports = OptionBool("misc", "fixed_ports", False) diff --git a/sabnzbd/constants.py b/sabnzbd/constants.py index 44f0317..eab736a 100644 --- a/sabnzbd/constants.py +++ b/sabnzbd/constants.py @@ -124,8 +124,6 @@ CHEETAH_DIRECTIVES = {"directiveStartToken": " IGNORED_FOLDERS = ("@eaDir", ".appleDouble") -LOCALHOSTS = ("localhost", "127.0.0.1", "[::1]", "::1") - # (MATCHER, [EXTRA, MATCHERS]) series_match = [ (compile(r"( [sS]|[\d]+)x(\d+)"), [compile(r"^[-\.]+([sS]|[\d])+x(\d+)"), compile(r"^[-\.](\d+)")]), # 1x01 diff --git a/sabnzbd/interface.py b/sabnzbd/interface.py index a446dd7..f60ca70 100644 --- a/sabnzbd/interface.py +++ b/sabnzbd/interface.py @@ -44,10 +44,11 @@ from sabnzbd.misc import ( calc_age, int_conv, get_base_url, - probablyipv4, - probablyipv6, + is_ipv4_addr, + is_ipv6_addr, opts_to_pp, get_server_addrinfo, + is_lan_addr, ) from sabnzbd.filesystem import real_path, long_path, globber, globber_full, remove_all, clip_path, same_file from sabnzbd.encoding import xml_name, utob @@ -165,7 +166,7 @@ def check_hostname(): host = re.sub(":[0123456789]+$", "", host).lower() # Fine if localhost or IP - if host == "localhost" or probablyipv4(host) or probablyipv6(host): + if host == "localhost" or is_ipv4_addr(host) or is_ipv6_addr(host): return True # Check on the whitelist @@ -477,8 +478,11 @@ class MainPage: cherrypy.request.remote.ip, cherrypy.request.headers.get("User-Agent", "??"), ) - cherrypy.response.headers["Content-Type"] = "application/xml" - return utob(sabnzbd.utils.ssdp.server_ssdp_xml()) + if is_lan_addr(cherrypy.request.remote.ip): + cherrypy.response.headers["Content-Type"] = "application/xml" + return utob(sabnzbd.utils.ssdp.server_ssdp_xml()) + else: + return None ############################################################################## @@ -1327,7 +1331,7 @@ SPECIAL_BOOL_LIST = ( "html_login", "wait_for_dfolder", "max_art_opt", - "enable_bonjour", + "enable_broadcast", "warn_dupl_jobs", "replace_illegal", "backup_for_duplicates", diff --git a/sabnzbd/misc.py b/sabnzbd/misc.py index 942860e..52f31cc 100644 --- a/sabnzbd/misc.py +++ b/sabnzbd/misc.py @@ -837,27 +837,47 @@ def find_on_path(targets): return None -def probablyipv4(ip): +def is_ipv4_addr(ip: str) -> bool: + """ Determine if the ip is an IPv4 address """ try: return ipaddress.ip_address(ip).version == 4 - except: + except ValueError: return False -def probablyipv6(ip): - # Returns True if the given input is probably an IPv6 address - # Square Brackets like '[2001::1]' are OK +def is_ipv6_addr(ip: str) -> bool: + """ Determine if the ip is an IPv6 address; square brackets ([2001::1]) are OK """ try: - # Check for plain IPv6 address - return ipaddress.ip_address(ip).version == 6 - except: - try: - # Remove '[' and ']' and test again: - ip = re.search(r"^\[(.*)\]$", ip).group(1) - return ipaddress.ip_address(ip).version == 6 - except: - # No, not an IPv6 address - return False + return ipaddress.ip_address(ip.strip("[]")).version == 6 + except (ValueError, AttributeError): + return False + + +def is_loopback_addr(ip: str) -> bool: + """ Determine if the ip is an IPv4 or IPv6 local loopback address """ + try: + if ip.find(".") < 0: + ip = ip.strip("[]") + return ipaddress.ip_address(ip).is_loopback + except (ValueError, AttributeError): + return False + + +def is_localhost(value: str) -> bool: + """ Determine if the input is some variety of 'localhost' """ + return (value == "localhost") or is_loopback_addr(value) + + +def is_lan_addr(ip: str) -> bool: + """ Determine if the ip is a local area network address """ + try: + return ( + ip not in ("0.0.0.0", "255.255.255.255", "::") + and ipaddress.ip_address(ip).is_private + and not is_loopback_addr(ip) + ) + except ValueError: + return False def ip_extract() -> List[str]: diff --git a/sabnzbd/newswrapper.py b/sabnzbd/newswrapper.py index 048450d..d563db1 100644 --- a/sabnzbd/newswrapper.py +++ b/sabnzbd/newswrapper.py @@ -32,7 +32,7 @@ import sabnzbd import sabnzbd.cfg from sabnzbd.constants import DEF_TIMEOUT from sabnzbd.encoding import utob -from sabnzbd.misc import nntp_to_msg, probablyipv4, probablyipv6, get_server_addrinfo +from sabnzbd.misc import nntp_to_msg, is_ipv4_addr, is_ipv6_addr, get_server_addrinfo # Set pre-defined socket timeout socket.setdefaulttimeout(DEF_TIMEOUT) @@ -269,9 +269,9 @@ class NNTP: af, socktype, proto, canonname, sa = self.nw.server.info[0] # there will be a connect to host (or self.host, so let's force set 'af' to the correct value - if probablyipv4(self.host): + if is_ipv4_addr(self.host): af = socket.AF_INET - if probablyipv6(self.host): + if is_ipv6_addr(self.host): af = socket.AF_INET6 # Secured or unsecured? diff --git a/sabnzbd/zconfig.py b/sabnzbd/zconfig.py index db88dd7..2580ced 100644 --- a/sabnzbd/zconfig.py +++ b/sabnzbd/zconfig.py @@ -35,7 +35,7 @@ except: import sabnzbd import sabnzbd.cfg as cfg -from sabnzbd.constants import LOCALHOSTS +from sabnzbd.misc import is_localhost _BONJOUR_OBJECT = None @@ -58,7 +58,7 @@ def set_bonjour(host=None, port=None): """ Publish host/port combo through Bonjour """ global _HOST_PORT, _BONJOUR_OBJECT - if not _HAVE_BONJOUR or not cfg.enable_bonjour(): + if not _HAVE_BONJOUR or not cfg.enable_broadcast(): logging.info("No bonjour/zeroconf support installed") return @@ -71,8 +71,8 @@ def set_bonjour(host=None, port=None): zhost = None domain = None - if host in LOCALHOSTS: - logging.info("bonjour/zeroconf cannot be one of %s", LOCALHOSTS) + if is_localhost(host): + logging.info("Cannot setup bonjour/zeroconf for localhost (%s)", host) # All implementations fail to implement "localhost" properly # A false address is published even when scope==kDNSServiceInterfaceIndexLocalOnly return diff --git a/tests/test_getipaddress.py b/tests/test_getipaddress.py index c2e5ed1..1ddf6b3 100644 --- a/tests/test_getipaddress.py +++ b/tests/test_getipaddress.py @@ -21,7 +21,7 @@ tests.test_utils.test_check_dir - Testing SABnzbd checkdir util from sabnzbd.cfg import selftest_host from sabnzbd.getipaddress import * -from sabnzbd.misc import probablyipv4, probablyipv6 +from sabnzbd.misc import is_ipv4_addr, is_ipv6_addr class TestGetIpAddress: @@ -33,14 +33,14 @@ class TestGetIpAddress: def test_publicipv4(self): public_ipv4 = publicipv4() - assert probablyipv4(public_ipv4) + assert is_ipv4_addr(public_ipv4) def test_localipv4(self): local_ipv4 = localipv4() - assert probablyipv4(local_ipv4) + assert is_ipv4_addr(local_ipv4) def test_ipv6(self): test_ipv6 = ipv6() # Not all systems have IPv6 if test_ipv6: - assert probablyipv6(test_ipv6) + assert is_ipv6_addr(test_ipv6) diff --git a/tests/test_misc.py b/tests/test_misc.py index 87a5e4a..05452cc 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -230,6 +230,154 @@ class TestMisc: # Make sure the output is cmd.exe-compatible assert res == expected_output + @pytest.mark.parametrize( + "value, result", + [ + ("1.2.3.4", True), + ("255.255.255.255", True), + ("0.0.0.0", True), + ("10.11.12.13", True), + ("127.0.0.1", True), + ("400.500.600.700", False), + ("blabla", False), + ("2001::1", False), + ("::1", False), + ("::", False), + ("example.org", False), + (None, False), + ("", False), + ("3.2.0", False), + (-42, False), + ], + ) + def test_is_ipv4_addr(self, value, result): + assert misc.is_ipv4_addr(value) is result + + @pytest.mark.parametrize( + "value, result", + [ + ("2001::1", True), + ("::1", True), + ("[2001::1]", True), + ("fdd6:5a2d:3f20:0:14b0:d8f4:ccb9:fab6", True), + ("::", True), + ("a::b", True), + ("1.2.3.4", False), + ("255.255.255.255", False), + ("0.0.0.0", False), + ("10.11.12.13", False), + ("127.0.0.1", False), + ("400.500.600.700", False), + ("blabla", False), + (666, False), + ("example.org", False), + (None, False), + ("", False), + ("[1.2.3.4]", False), + ("2001:1", False), + ("2001::[2001::1]", False), + ], + ) + def test_is_ipv6_addr(self, value, result): + assert misc.is_ipv6_addr(value) is result + + @pytest.mark.parametrize( + "value, result", + [ + ("::1", True), + ("[::1]", True), + ("127.0.0.1", True), + ("127.255.0.0", True), + ("127.1.2.7", True), + ("fdd6:5a2d:3f20:0:14b0:d8f4:ccb9:fab6", False), + ("::", False), + ("a::b", False), + ("1.2.3.4", False), + ("255.255.255.255", False), + ("0.0.0.0", False), + ("10.11.12.13", False), + ("400.500.600.700", False), + ("localhost", False), + (-666, False), + ("example.org", False), + (None, False), + ("", False), + ("[127.6.6.6]", False), + ("2001:1", False), + ("2001::[2001::1]", False), + ], + ) + def test_is_loopback_addr(self, value, result): + assert misc.is_loopback_addr(value) is result + + @pytest.mark.parametrize( + "value, result", + [ + ("localhost", True), + ("::1", True), + ("[::1]", True), + ("localhost", True), + ("127.0.0.1", True), + ("127.255.0.0", True), + ("127.1.2.7", True), + (".local", False), + ("test.local", False), + ("fdd6:5a2d:3f20:0:14b0:d8f4:ccb9:fab6", False), + ("::", False), + ("a::b", False), + ("1.2.3.4", False), + ("255.255.255.255", False), + ("0.0.0.0", False), + ("10.11.12.13", False), + ("400.500.600.700", False), + (-1984, False), + ("example.org", False), + (None, False), + ("", False), + ("[127.6.6.6]", False), + ("2001:1", False), + ("2001::[2001::1]", False), + ], + ) + def test_is_localhost(self, value, result): + assert misc.is_localhost(value) is result + + @pytest.mark.parametrize( + "value, result", + [ + ("10.11.12.13", True), + ("172.16.2.81", True), + ("192.168.255.255", True), + ("169.254.42.42", True), # Link-local + ("fd00::ffff", True), # Part of fc00::/7, IPv6 "Unique Local Addresses" + ("fe80::a1", True), # IPv6 Link-local + ("::1", False), + ("localhost", False), + ("127.0.0.1", False), + ("2001:1337:babe::", False), + ("172.32.32.32", False), # Near but not part of 172.16.0.0/12 + ("100.64.0.1", False), # Test net + ("[2001::1]", False), + ("::", False), + ("::a:b:c", False), + ("1.2.3.4", False), + ("255.255.255.255", False), + ("0.0.0.0", False), + ("127.0.0.1", False), + ("400.500.600.700", False), + ("blabla", False), + (-666, False), + ("example.org", False), + (None, False), + ("", False), + ("[1.2.3.4]", False), + ("2001:1", False), + ("2001::[2001::1]", False), + ], + ) + def test_is_lan_addr(self, value, result): + assert misc.is_lan_addr(value) is result + class TestBuildAndRunCommand: # Path should exist diff --git a/tests/test_probablyip.py b/tests/test_probablyip.py deleted file mode 100644 index da6b3e9..0000000 --- a/tests/test_probablyip.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/python3 -OO -# Copyright 2007-2021 The SABnzbd-Team -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - -""" -tests.test_utils.test_probablyip - Testing SABnzbd's probablyipX() functions -""" - -from sabnzbd.misc import * - - -class TestProbablyIP: - def test_probablyipv4(self): - # Positive testing - assert probablyipv4("1.2.3.4") - assert probablyipv4("255.255.255.255") - assert probablyipv4("0.0.0.0") - # Negative testing - assert not probablyipv4("400.500.600.700") - assert not probablyipv4("blabla") - assert not probablyipv4("2001::1") - - def test_probablyipv6(self): - # Positive testing - assert probablyipv6("2001::1") - assert probablyipv6("[2001::1]") - assert probablyipv6("fdd6:5a2d:3f20:0:14b0:d8f4:ccb9:fab6") - # Negative testing - assert not probablyipv6("blabla") - assert not probablyipv6("1.2.3.4") - assert not probablyipv6("[1.2.3.4]") - assert not probablyipv6("2001:1") - assert not probablyipv6("2001::[2001::1]")