From a482bb7acc7feeef54044b17ed33166f0dce5b4d Mon Sep 17 00:00:00 2001 From: Safihre Date: Sun, 18 Apr 2021 22:59:54 +0200 Subject: [PATCH] Add unittests for secured_expose --- sabnzbd/interface.py | 33 +++--- sabnzbd/lang.py | 2 +- tests/test_api.py | 66 ----------- tests/test_api_and_interface.py | 237 ++++++++++++++++++++++++++++++++++++++++ tests/test_functional_api.py | 15 +-- 5 files changed, 264 insertions(+), 89 deletions(-) delete mode 100644 tests/test_api.py create mode 100644 tests/test_api_and_interface.py diff --git a/sabnzbd/interface.py b/sabnzbd/interface.py index 44b016f..c5de28b 100644 --- a/sabnzbd/interface.py +++ b/sabnzbd/interface.py @@ -51,6 +51,7 @@ from sabnzbd.misc import ( opts_to_pp, get_server_addrinfo, is_lan_addr, + is_loopback_addr, ) from sabnzbd.filesystem import real_path, long_path, globber, globber_full, remove_all, clip_path, same_file from sabnzbd.encoding import xml_name, utob @@ -83,6 +84,12 @@ from sabnzbd.api import ( ############################################################################## # Security functions ############################################################################## +_MSG_ACCESS_DENIED = "Access denied" +_MSG_ACCESS_DENIED_CONFIG_LOCK = "Access denied - Configuration locked" +_MSG_ACCESS_DENIED_HOSTNAME = "Access denied - Hostname verification failed: https://sabnzbd.org/hostname-check" +_MSG_MISSING_AUTH = "Missing authentication" +_MSG_APIKEY_REQUIRED = "API Key Required" +_MSG_APIKEY_INCORRECT = "API Key Incorrect" def secured_expose( @@ -138,12 +145,12 @@ def secured_expose( # Check if config is locked if check_configlock and cfg.configlock(): cherrypy.response.status = 403 - return "Access denied - Configuration locked" + return _MSG_ACCESS_DENIED_CONFIG_LOCK # Check if external access and if it's allowed if not check_access(access_type=access_type, warn_user=True): cherrypy.response.status = 403 - return "Access denied" + return _MSG_ACCESS_DENIED # Verify login status, only for non-key pages if check_for_login and not check_api_key and not check_login(): @@ -152,7 +159,7 @@ def secured_expose( # Verify host used for the visit if not check_hostname(): cherrypy.response.status = 403 - return "Access denied - Hostname verification failed: https://sabnzbd.org/hostname-check" + return _MSG_ACCESS_DENIED_HOSTNAME # Some pages need correct API key if check_api_key: @@ -181,23 +188,23 @@ def check_access(access_type: int = 4, warn_user: bool = False) -> bool: # CherryPy will report ::ffff:192.168.0.10 on dual-stack situation # It will always contain that ::ffff: prefix, the ipaddress module can handle that - referrer = cherrypy.request.remote.ip + remote_ip = cherrypy.request.remote.ip # Check for localhost - if referrer in ("127.0.0.1", "::ffff:127.0.0.1", "::1"): + if is_loopback_addr(remote_ip): return True # No special ranged defined is_allowed = False if not cfg.local_ranges(): try: - is_allowed = ipaddress.ip_address(referrer).is_private + is_allowed = ipaddress.ip_address(remote_ip).is_private except ValueError: # Something malformed, reject pass else: - is_allowed = bool( - [1 for r in cfg.local_ranges() if (referrer.startswith(r) or referrer.replace("::ffff:", "").startswith(r))] + is_allowed = any( + remote_ip.startswith(r) or remote_ip.replace("::ffff:", "").startswith(r) for r in cfg.local_ranges() ) # Reject @@ -329,10 +336,10 @@ def check_apikey(kwargs): mode = kwargs.get("mode", "") name = kwargs.get("name", "") - # Lookup required access level for the specific api-call, returns 4 for config-things + # Lookup required access level for the specific api-call req_access = sabnzbd.api.api_level(mode, name) if not check_access(req_access, warn_user=True): - return "Access denied" + return _MSG_ACCESS_DENIED # Skip for auth and version calls if mode in ("version", "auth"): @@ -345,14 +352,14 @@ def check_apikey(kwargs): log_warning_and_ip( T("API Key missing, please enter the api key from Config->General into your 3rd party program:") ) - return "API Key Required" + return _MSG_APIKEY_REQUIRED elif req_access == 1 and key == cfg.nzb_key(): return None elif key == cfg.api_key(): return None else: log_warning_and_ip(T("API Key incorrect, Use the api key from Config->General in your 3rd party program:")) - return "API Key Incorrect" + return _MSG_APIKEY_INCORRECT # No active API-key, check web credentials instead if cfg.username() and cfg.password(): @@ -366,7 +373,7 @@ def check_apikey(kwargs): "Authentication missing, please enter username/password from Config->General into your 3rd party program:" ) ) - return "Missing authentication" + return _MSG_MISSING_AUTH return None diff --git a/sabnzbd/lang.py b/sabnzbd/lang.py index 4352180..7400285 100644 --- a/sabnzbd/lang.py +++ b/sabnzbd/lang.py @@ -38,7 +38,7 @@ import glob import os import locale -__all__ = ["set_locale_info", "set_language", "list_languages"] +__all__ = ["set_locale_info", "set_language", "list_languages", "is_rtl"] _DOMAIN = "" # Holds translation domain _LOCALEDIR = "" # Holds path to the translation base folder diff --git a/tests/test_api.py b/tests/test_api.py deleted file mode 100644 index b1e8d43..0000000 --- a/tests/test_api.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/python3 -OO -# Copyright 2007-2021 The SABnzbd-Team -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - -""" -tests.test_api - Tests for API functions -""" - -from tests.testhelper import * - -import sabnzbd.api as api - - -class TestApiInternals: - """ Test internal functions of the API """ - - def test_empty(self): - with pytest.raises(TypeError): - api.api_handler(None) - with pytest.raises(AttributeError): - api.api_handler("") - - @set_config({"disable_key": False}) - def test_mode_invalid(self): - expected_error = "error: not implemented" - assert api.api_handler({"mode": "invalid"}).strip() == expected_error - with pytest.raises(IndexError): - assert api.api_handler({"mode": []}).strip() == expected_error - assert api.api_handler({"mode": ""}).strip() == expected_error - assert api.api_handler({"mode": None}).strip() == expected_error - - def test_version(self): - assert api.api_handler({"mode": "version"}).strip() == sabnzbd.__version__ - - @set_config({"disable_key": False}) - def test_auth(self): - assert api.api_handler({"mode": "auth"}).strip() == "apikey" - - @set_config({"disable_key": True, "username": "foo", "password": "bar"}) - def test_auth_apikey_disabled(self): - assert api.api_handler({"mode": "auth"}).strip() == "login" - - @set_config({"disable_key": True, "username": "", "password": ""}) - def test_auth_unavailable(self): - assert api.api_handler({"mode": "auth"}).strip() == "None" - - @set_config({"disable_key": True, "username": "foo", "password": ""}) - def test_auth_unavailable_username_set(self): - assert api.api_handler({"mode": "auth"}).strip() == "None" - - @set_config({"disable_key": True, "username": "", "password": "bar"}) - def test_auth_unavailable_password_set(self): - assert api.api_handler({"mode": "auth"}).strip() == "None" diff --git a/tests/test_api_and_interface.py b/tests/test_api_and_interface.py new file mode 100644 index 0000000..2dfefe9 --- /dev/null +++ b/tests/test_api_and_interface.py @@ -0,0 +1,237 @@ +#!/usr/bin/python3 -OO +# Copyright 2007-2021 The SABnzbd-Team +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +""" +tests.test_api - Tests for API functions +""" +import cherrypy +import pytest + +from tests.testhelper import * + +import sabnzbd.api as api +import sabnzbd.interface as interface + + +class TestApiInternals: + """ Test internal functions of the API """ + + def test_empty(self): + with pytest.raises(TypeError): + api.api_handler(None) + with pytest.raises(AttributeError): + api.api_handler("") + + def test_mode_invalid(self): + expected_error = "error: not implemented" + assert api.api_handler({"mode": "invalid"}).strip() == expected_error + with pytest.raises(IndexError): + assert api.api_handler({"mode": []}).strip() == expected_error + assert api.api_handler({"mode": ""}).strip() == expected_error + assert api.api_handler({"mode": None}).strip() == expected_error + + def test_version(self): + assert api.api_handler({"mode": "version"}).strip() == sabnzbd.__version__ + + def test_auth(self): + assert api.api_handler({"mode": "auth"}).strip() == "apikey" + + @set_config({"disable_key": True, "username": "foo", "password": "bar"}) + def test_auth_apikey_disabled(self): + assert api.api_handler({"mode": "auth"}).strip() == "login" + + @set_config({"disable_key": True}) + def test_auth_unavailable(self): + assert api.api_handler({"mode": "auth"}).strip() == "None" + + @set_config({"disable_key": True, "username": "foo", "password": ""}) + def test_auth_unavailable_username_set(self): + assert api.api_handler({"mode": "auth"}).strip() == "None" + + @set_config({"disable_key": True, "username": "", "password": "bar"}) + def test_auth_unavailable_password_set(self): + assert api.api_handler({"mode": "auth"}).strip() == "None" + + +def set_remote_host_or_ip(hostname: str = "localhost", remote_ip: str = "127.0.0.1"): + """ Change CherryPy's "Host" and "remote.ip"-values """ + cherrypy.request.headers["Host"] = hostname + cherrypy.request.remote.ip = remote_ip + + +class TestSecuredExpose: + """ Test the security handling """ + + main_page = sabnzbd.interface.MainPage() + + def check_full_access(self, redirect_match: str = r".*wizard.*"): + """Basic test if we have full access to API and interface""" + assert sabnzbd.__version__ in self.main_page.api(mode="version") + # Passed authentication + assert api._MSG_NOT_IMPLEMENTED in self.main_page.api(apikey=sabnzbd.cfg.api_key()) + # Raises a redirect to the wizard + with pytest.raises(cherrypy._cperror.HTTPRedirect, match=redirect_match): + self.main_page.index() + + def test_basic(self): + set_remote_host_or_ip() + self.check_full_access() + + def test_api_no_or_wrong_api_key(self): + set_remote_host_or_ip() + # Get blocked + assert interface._MSG_APIKEY_REQUIRED in self.main_page.api() + assert interface._MSG_APIKEY_REQUIRED in self.main_page.api(mode="queue") + # Allowed to access "auth" and "version" without key + assert "apikey" in self.main_page.api(mode="auth") + assert sabnzbd.__version__ in self.main_page.api(mode="version") + # Blocked when you do something wrong + assert interface._MSG_APIKEY_INCORRECT in self.main_page.api(mode="queue", apikey="wrong") + + @set_config({"disable_key": True}) + def test_api_disabled_key(self): + set_remote_host_or_ip() + assert api._MSG_NOT_IMPLEMENTED in self.main_page.api() + + @set_config({"disable_key": True, "username": "foo", "password": "bar"}) + def test_api_disabled_key_with_auth(self): + set_remote_host_or_ip() + assert interface._MSG_MISSING_AUTH in self.main_page.api() + assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_username="foo") + assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_password="bar") + assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_username="wrong") + assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_password="wrong") + assert api._MSG_NOT_IMPLEMENTED in self.main_page.api(ma_username="foo", ma_password="bar") + + def test_api_nzb_key(self): + set_remote_host_or_ip() + # It should only access the nzb-functions, nothing else + assert api._MSG_NO_VALUE in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.nzb_key()) + assert interface._MSG_APIKEY_INCORRECT in self.main_page.api(mode="set_config", apikey=sabnzbd.cfg.nzb_key()) + assert interface._MSG_APIKEY_INCORRECT in self.main_page.shutdown(apikey=sabnzbd.cfg.nzb_key()) + + def test_check_hostname_basic(self): + # Block bad host + set_remote_host_or_ip(hostname="not_me") + assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.api() + assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.index() + # Block empty value + set_remote_host_or_ip(hostname="") + assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.api() + assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.index() + + # Fine if ip-address + for test_hostname in ( + "100.100.100.100", + "100.100.100.100:8080", + "[2001:db8:3333:4444:5555:6666:7777:8888]", + "[2001:db8:3333:4444:5555:6666:7777:8888]:8080", + "test.local", + "test.local:8080", + "test.local.", + ): + set_remote_host_or_ip(hostname=test_hostname) + self.check_full_access() + + @set_config({"username": "foo", "password": "bar"}) + def test_check_hostname_not_user_password(self): + set_remote_host_or_ip(hostname="not_me") + self.check_full_access(redirect_match=r".*login.*") + + @set_config({"host_whitelist": "test.com, not_evil"}) + def test_check_hostname_whitelist(self): + set_remote_host_or_ip(hostname="test.com") + self.check_full_access() + set_remote_host_or_ip(hostname="not_evil") + self.check_full_access() + + def check_inet_allows(self, inet_exposure: int): + """Each should allow all previous ones and the current one""" + # Level 1: nzb + if inet_exposure >= 1: + assert api._MSG_NO_VALUE in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.nzb_key()) + assert api._MSG_NO_VALUE in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.api_key()) + + # Level 2: basic API + if inet_exposure >= 2: + assert api._MSG_NO_VALUE in self.main_page.api(mode="get_files", apikey=sabnzbd.cfg.api_key()) + assert api._MSG_NO_VALUE in self.main_page.api(mode="change_script", apikey=sabnzbd.cfg.api_key()) + # Sub-function + assert "status" in self.main_page.api(mode="queue", name="resume", apikey=sabnzbd.cfg.api_key()) + + # Level 3: full API + if inet_exposure >= 3: + assert "misc" in self.main_page.api(mode="get_config", apikey=sabnzbd.cfg.api_key()) + # Sub-function + assert api._MSG_NO_VALUE in self.main_page.api( + mode="config", name="set_colorscheme", apikey=sabnzbd.cfg.api_key() + ) + + # Level 4: full interface + if inet_exposure >= 4: + self.check_full_access() + + def check_inet_blocks(self, inet_exposure: int): + """We count from the most exposure down""" + # Level 4: full interface, no blocking + # Level 3: full API + if inet_exposure <= 3: + assert interface._MSG_ACCESS_DENIED in self.main_page.index() + + # Level 2: basic API + if inet_exposure <= 2: + assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="get_config", apikey=sabnzbd.cfg.api_key()) + assert interface._MSG_ACCESS_DENIED in self.main_page.api( + mode="config", name="set_colorscheme", apikey=sabnzbd.cfg.api_key() + ) + # Level 1: nzb + if inet_exposure <= 1: + assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="rescan", apikey=sabnzbd.cfg.api_key()) + assert interface._MSG_ACCESS_DENIED in self.main_page.api( + mode="queue", name="resume", apikey=sabnzbd.cfg.api_key() + ) + + # Level 0: nothing, already checked above, but just to be sure + if inet_exposure <= 0: + assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.api_key()) + # Check with or without API-key + assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="auth", apikey=sabnzbd.cfg.api_key()) + assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="auth") + + def test_inet_exposure(self): + # Run all tests as external user + set_remote_host_or_ip(hostname="100.100.100.100", remote_ip="11.11.11.11") + + # We don't use the wrapper, it would require creating many extra functions + # Option 5 is special, so it also gets it's own special test + for inet_exposure in range(6): + sabnzbd.cfg.inet_exposure.set(inet_exposure) + self.check_inet_allows(inet_exposure=inet_exposure) + self.check_inet_blocks(inet_exposure=inet_exposure) + + # Reset it + sabnzbd.cfg.inet_exposure.set(sabnzbd.cfg.inet_exposure.default()) + + @set_config({"inet_exposure": 5, "username": "foo", "password": "bar"}) + def test_inet_exposure_login_for_external(self): + # Local user: full access + set_remote_host_or_ip() + self.check_full_access() + + # Remote user: redirect to login + set_remote_host_or_ip(hostname="100.100.100.100", remote_ip="11.11.11.11") + self.check_full_access(redirect_match=r".*login.*") diff --git a/tests/test_functional_api.py b/tests/test_functional_api.py index 9cc169b..bed2056 100644 --- a/tests/test_functional_api.py +++ b/tests/test_functional_api.py @@ -19,20 +19,16 @@ tests.test_functional_api - Functional tests for the API """ -import json -import os import shutil import stat -import subprocess import sys -import time from math import ceil -from random import choice, randint, sample +from random import sample from tavern.core import run from warnings import warn -import sabnzbd.api as api +import sabnzbd.interface as interface from sabnzbd.misc import from_units from tests.testhelper import * @@ -314,10 +310,11 @@ class TestOtherApi(ApiTestFunctions): assert self._get_api_json("set_config_default", extra_args={"keyword": "language"})["status"] is True def test_api_get_clear_warnings(self): - apikey_error = "API Key Incorrect" # Trigger warnings by sending requests with a truncated apikey for _ in range(0, 2): - assert apikey_error in self._get_api_text("shutdown", extra_args={"apikey": SAB_APIKEY[:-1]}) + assert interface._MSG_APIKEY_INCORRECT in self._get_api_text( + "shutdown", extra_args={"apikey": SAB_APIKEY[:-1]} + ) # Take delivery of our freshly baked warnings json = self._get_api_json("warnings") @@ -326,7 +323,7 @@ class TestOtherApi(ApiTestFunctions): for warning in json["warnings"]: for key in ("type", "text", "time"): assert key in warning.keys() - assert apikey_error.lower() in json["warnings"][-1]["text"].lower() + assert interface._MSG_APIKEY_INCORRECT.lower() in json["warnings"][-1]["text"].lower() # Clear all warnings assert self._get_api_json("warnings", extra_args={"name": "clear"})["status"] is True