Browse Source

Add unittests for secured_expose

tags/3.3.0Beta1
Safihre 4 years ago
parent
commit
a482bb7acc
  1. 33
      sabnzbd/interface.py
  2. 2
      sabnzbd/lang.py
  3. 66
      tests/test_api.py
  4. 237
      tests/test_api_and_interface.py
  5. 15
      tests/test_functional_api.py

33
sabnzbd/interface.py

@ -51,6 +51,7 @@ from sabnzbd.misc import (
opts_to_pp,
get_server_addrinfo,
is_lan_addr,
is_loopback_addr,
)
from sabnzbd.filesystem import real_path, long_path, globber, globber_full, remove_all, clip_path, same_file
from sabnzbd.encoding import xml_name, utob
@ -83,6 +84,12 @@ from sabnzbd.api import (
##############################################################################
# Security functions
##############################################################################
_MSG_ACCESS_DENIED = "Access denied"
_MSG_ACCESS_DENIED_CONFIG_LOCK = "Access denied - Configuration locked"
_MSG_ACCESS_DENIED_HOSTNAME = "Access denied - Hostname verification failed: https://sabnzbd.org/hostname-check"
_MSG_MISSING_AUTH = "Missing authentication"
_MSG_APIKEY_REQUIRED = "API Key Required"
_MSG_APIKEY_INCORRECT = "API Key Incorrect"
def secured_expose(
@ -138,12 +145,12 @@ def secured_expose(
# Check if config is locked
if check_configlock and cfg.configlock():
cherrypy.response.status = 403
return "Access denied - Configuration locked"
return _MSG_ACCESS_DENIED_CONFIG_LOCK
# Check if external access and if it's allowed
if not check_access(access_type=access_type, warn_user=True):
cherrypy.response.status = 403
return "Access denied"
return _MSG_ACCESS_DENIED
# Verify login status, only for non-key pages
if check_for_login and not check_api_key and not check_login():
@ -152,7 +159,7 @@ def secured_expose(
# Verify host used for the visit
if not check_hostname():
cherrypy.response.status = 403
return "Access denied - Hostname verification failed: https://sabnzbd.org/hostname-check"
return _MSG_ACCESS_DENIED_HOSTNAME
# Some pages need correct API key
if check_api_key:
@ -181,23 +188,23 @@ def check_access(access_type: int = 4, warn_user: bool = False) -> bool:
# CherryPy will report ::ffff:192.168.0.10 on dual-stack situation
# It will always contain that ::ffff: prefix, the ipaddress module can handle that
referrer = cherrypy.request.remote.ip
remote_ip = cherrypy.request.remote.ip
# Check for localhost
if referrer in ("127.0.0.1", "::ffff:127.0.0.1", "::1"):
if is_loopback_addr(remote_ip):
return True
# No special ranged defined
is_allowed = False
if not cfg.local_ranges():
try:
is_allowed = ipaddress.ip_address(referrer).is_private
is_allowed = ipaddress.ip_address(remote_ip).is_private
except ValueError:
# Something malformed, reject
pass
else:
is_allowed = bool(
[1 for r in cfg.local_ranges() if (referrer.startswith(r) or referrer.replace("::ffff:", "").startswith(r))]
is_allowed = any(
remote_ip.startswith(r) or remote_ip.replace("::ffff:", "").startswith(r) for r in cfg.local_ranges()
)
# Reject
@ -329,10 +336,10 @@ def check_apikey(kwargs):
mode = kwargs.get("mode", "")
name = kwargs.get("name", "")
# Lookup required access level for the specific api-call, returns 4 for config-things
# Lookup required access level for the specific api-call
req_access = sabnzbd.api.api_level(mode, name)
if not check_access(req_access, warn_user=True):
return "Access denied"
return _MSG_ACCESS_DENIED
# Skip for auth and version calls
if mode in ("version", "auth"):
@ -345,14 +352,14 @@ def check_apikey(kwargs):
log_warning_and_ip(
T("API Key missing, please enter the api key from Config->General into your 3rd party program:")
)
return "API Key Required"
return _MSG_APIKEY_REQUIRED
elif req_access == 1 and key == cfg.nzb_key():
return None
elif key == cfg.api_key():
return None
else:
log_warning_and_ip(T("API Key incorrect, Use the api key from Config->General in your 3rd party program:"))
return "API Key Incorrect"
return _MSG_APIKEY_INCORRECT
# No active API-key, check web credentials instead
if cfg.username() and cfg.password():
@ -366,7 +373,7 @@ def check_apikey(kwargs):
"Authentication missing, please enter username/password from Config->General into your 3rd party program:"
)
)
return "Missing authentication"
return _MSG_MISSING_AUTH
return None

2
sabnzbd/lang.py

@ -38,7 +38,7 @@ import glob
import os
import locale
__all__ = ["set_locale_info", "set_language", "list_languages"]
__all__ = ["set_locale_info", "set_language", "list_languages", "is_rtl"]
_DOMAIN = "" # Holds translation domain
_LOCALEDIR = "" # Holds path to the translation base folder

66
tests/test_api.py

@ -1,66 +0,0 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
tests.test_api - Tests for API functions
"""
from tests.testhelper import *
import sabnzbd.api as api
class TestApiInternals:
""" Test internal functions of the API """
def test_empty(self):
with pytest.raises(TypeError):
api.api_handler(None)
with pytest.raises(AttributeError):
api.api_handler("")
@set_config({"disable_key": False})
def test_mode_invalid(self):
expected_error = "error: not implemented"
assert api.api_handler({"mode": "invalid"}).strip() == expected_error
with pytest.raises(IndexError):
assert api.api_handler({"mode": []}).strip() == expected_error
assert api.api_handler({"mode": ""}).strip() == expected_error
assert api.api_handler({"mode": None}).strip() == expected_error
def test_version(self):
assert api.api_handler({"mode": "version"}).strip() == sabnzbd.__version__
@set_config({"disable_key": False})
def test_auth(self):
assert api.api_handler({"mode": "auth"}).strip() == "apikey"
@set_config({"disable_key": True, "username": "foo", "password": "bar"})
def test_auth_apikey_disabled(self):
assert api.api_handler({"mode": "auth"}).strip() == "login"
@set_config({"disable_key": True, "username": "", "password": ""})
def test_auth_unavailable(self):
assert api.api_handler({"mode": "auth"}).strip() == "None"
@set_config({"disable_key": True, "username": "foo", "password": ""})
def test_auth_unavailable_username_set(self):
assert api.api_handler({"mode": "auth"}).strip() == "None"
@set_config({"disable_key": True, "username": "", "password": "bar"})
def test_auth_unavailable_password_set(self):
assert api.api_handler({"mode": "auth"}).strip() == "None"

237
tests/test_api_and_interface.py

@ -0,0 +1,237 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
tests.test_api - Tests for API functions
"""
import cherrypy
import pytest
from tests.testhelper import *
import sabnzbd.api as api
import sabnzbd.interface as interface
class TestApiInternals:
""" Test internal functions of the API """
def test_empty(self):
with pytest.raises(TypeError):
api.api_handler(None)
with pytest.raises(AttributeError):
api.api_handler("")
def test_mode_invalid(self):
expected_error = "error: not implemented"
assert api.api_handler({"mode": "invalid"}).strip() == expected_error
with pytest.raises(IndexError):
assert api.api_handler({"mode": []}).strip() == expected_error
assert api.api_handler({"mode": ""}).strip() == expected_error
assert api.api_handler({"mode": None}).strip() == expected_error
def test_version(self):
assert api.api_handler({"mode": "version"}).strip() == sabnzbd.__version__
def test_auth(self):
assert api.api_handler({"mode": "auth"}).strip() == "apikey"
@set_config({"disable_key": True, "username": "foo", "password": "bar"})
def test_auth_apikey_disabled(self):
assert api.api_handler({"mode": "auth"}).strip() == "login"
@set_config({"disable_key": True})
def test_auth_unavailable(self):
assert api.api_handler({"mode": "auth"}).strip() == "None"
@set_config({"disable_key": True, "username": "foo", "password": ""})
def test_auth_unavailable_username_set(self):
assert api.api_handler({"mode": "auth"}).strip() == "None"
@set_config({"disable_key": True, "username": "", "password": "bar"})
def test_auth_unavailable_password_set(self):
assert api.api_handler({"mode": "auth"}).strip() == "None"
def set_remote_host_or_ip(hostname: str = "localhost", remote_ip: str = "127.0.0.1"):
""" Change CherryPy's "Host" and "remote.ip"-values """
cherrypy.request.headers["Host"] = hostname
cherrypy.request.remote.ip = remote_ip
class TestSecuredExpose:
""" Test the security handling """
main_page = sabnzbd.interface.MainPage()
def check_full_access(self, redirect_match: str = r".*wizard.*"):
"""Basic test if we have full access to API and interface"""
assert sabnzbd.__version__ in self.main_page.api(mode="version")
# Passed authentication
assert api._MSG_NOT_IMPLEMENTED in self.main_page.api(apikey=sabnzbd.cfg.api_key())
# Raises a redirect to the wizard
with pytest.raises(cherrypy._cperror.HTTPRedirect, match=redirect_match):
self.main_page.index()
def test_basic(self):
set_remote_host_or_ip()
self.check_full_access()
def test_api_no_or_wrong_api_key(self):
set_remote_host_or_ip()
# Get blocked
assert interface._MSG_APIKEY_REQUIRED in self.main_page.api()
assert interface._MSG_APIKEY_REQUIRED in self.main_page.api(mode="queue")
# Allowed to access "auth" and "version" without key
assert "apikey" in self.main_page.api(mode="auth")
assert sabnzbd.__version__ in self.main_page.api(mode="version")
# Blocked when you do something wrong
assert interface._MSG_APIKEY_INCORRECT in self.main_page.api(mode="queue", apikey="wrong")
@set_config({"disable_key": True})
def test_api_disabled_key(self):
set_remote_host_or_ip()
assert api._MSG_NOT_IMPLEMENTED in self.main_page.api()
@set_config({"disable_key": True, "username": "foo", "password": "bar"})
def test_api_disabled_key_with_auth(self):
set_remote_host_or_ip()
assert interface._MSG_MISSING_AUTH in self.main_page.api()
assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_username="foo")
assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_password="bar")
assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_username="wrong")
assert interface._MSG_MISSING_AUTH in self.main_page.api(ma_password="wrong")
assert api._MSG_NOT_IMPLEMENTED in self.main_page.api(ma_username="foo", ma_password="bar")
def test_api_nzb_key(self):
set_remote_host_or_ip()
# It should only access the nzb-functions, nothing else
assert api._MSG_NO_VALUE in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.nzb_key())
assert interface._MSG_APIKEY_INCORRECT in self.main_page.api(mode="set_config", apikey=sabnzbd.cfg.nzb_key())
assert interface._MSG_APIKEY_INCORRECT in self.main_page.shutdown(apikey=sabnzbd.cfg.nzb_key())
def test_check_hostname_basic(self):
# Block bad host
set_remote_host_or_ip(hostname="not_me")
assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.api()
assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.index()
# Block empty value
set_remote_host_or_ip(hostname="")
assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.api()
assert interface._MSG_ACCESS_DENIED_HOSTNAME in self.main_page.index()
# Fine if ip-address
for test_hostname in (
"100.100.100.100",
"100.100.100.100:8080",
"[2001:db8:3333:4444:5555:6666:7777:8888]",
"[2001:db8:3333:4444:5555:6666:7777:8888]:8080",
"test.local",
"test.local:8080",
"test.local.",
):
set_remote_host_or_ip(hostname=test_hostname)
self.check_full_access()
@set_config({"username": "foo", "password": "bar"})
def test_check_hostname_not_user_password(self):
set_remote_host_or_ip(hostname="not_me")
self.check_full_access(redirect_match=r".*login.*")
@set_config({"host_whitelist": "test.com, not_evil"})
def test_check_hostname_whitelist(self):
set_remote_host_or_ip(hostname="test.com")
self.check_full_access()
set_remote_host_or_ip(hostname="not_evil")
self.check_full_access()
def check_inet_allows(self, inet_exposure: int):
"""Each should allow all previous ones and the current one"""
# Level 1: nzb
if inet_exposure >= 1:
assert api._MSG_NO_VALUE in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.nzb_key())
assert api._MSG_NO_VALUE in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.api_key())
# Level 2: basic API
if inet_exposure >= 2:
assert api._MSG_NO_VALUE in self.main_page.api(mode="get_files", apikey=sabnzbd.cfg.api_key())
assert api._MSG_NO_VALUE in self.main_page.api(mode="change_script", apikey=sabnzbd.cfg.api_key())
# Sub-function
assert "status" in self.main_page.api(mode="queue", name="resume", apikey=sabnzbd.cfg.api_key())
# Level 3: full API
if inet_exposure >= 3:
assert "misc" in self.main_page.api(mode="get_config", apikey=sabnzbd.cfg.api_key())
# Sub-function
assert api._MSG_NO_VALUE in self.main_page.api(
mode="config", name="set_colorscheme", apikey=sabnzbd.cfg.api_key()
)
# Level 4: full interface
if inet_exposure >= 4:
self.check_full_access()
def check_inet_blocks(self, inet_exposure: int):
"""We count from the most exposure down"""
# Level 4: full interface, no blocking
# Level 3: full API
if inet_exposure <= 3:
assert interface._MSG_ACCESS_DENIED in self.main_page.index()
# Level 2: basic API
if inet_exposure <= 2:
assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="get_config", apikey=sabnzbd.cfg.api_key())
assert interface._MSG_ACCESS_DENIED in self.main_page.api(
mode="config", name="set_colorscheme", apikey=sabnzbd.cfg.api_key()
)
# Level 1: nzb
if inet_exposure <= 1:
assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="rescan", apikey=sabnzbd.cfg.api_key())
assert interface._MSG_ACCESS_DENIED in self.main_page.api(
mode="queue", name="resume", apikey=sabnzbd.cfg.api_key()
)
# Level 0: nothing, already checked above, but just to be sure
if inet_exposure <= 0:
assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="addfile", apikey=sabnzbd.cfg.api_key())
# Check with or without API-key
assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="auth", apikey=sabnzbd.cfg.api_key())
assert interface._MSG_ACCESS_DENIED in self.main_page.api(mode="auth")
def test_inet_exposure(self):
# Run all tests as external user
set_remote_host_or_ip(hostname="100.100.100.100", remote_ip="11.11.11.11")
# We don't use the wrapper, it would require creating many extra functions
# Option 5 is special, so it also gets it's own special test
for inet_exposure in range(6):
sabnzbd.cfg.inet_exposure.set(inet_exposure)
self.check_inet_allows(inet_exposure=inet_exposure)
self.check_inet_blocks(inet_exposure=inet_exposure)
# Reset it
sabnzbd.cfg.inet_exposure.set(sabnzbd.cfg.inet_exposure.default())
@set_config({"inet_exposure": 5, "username": "foo", "password": "bar"})
def test_inet_exposure_login_for_external(self):
# Local user: full access
set_remote_host_or_ip()
self.check_full_access()
# Remote user: redirect to login
set_remote_host_or_ip(hostname="100.100.100.100", remote_ip="11.11.11.11")
self.check_full_access(redirect_match=r".*login.*")

15
tests/test_functional_api.py

@ -19,20 +19,16 @@
tests.test_functional_api - Functional tests for the API
"""
import json
import os
import shutil
import stat
import subprocess
import sys
import time
from math import ceil
from random import choice, randint, sample
from random import sample
from tavern.core import run
from warnings import warn
import sabnzbd.api as api
import sabnzbd.interface as interface
from sabnzbd.misc import from_units
from tests.testhelper import *
@ -314,10 +310,11 @@ class TestOtherApi(ApiTestFunctions):
assert self._get_api_json("set_config_default", extra_args={"keyword": "language"})["status"] is True
def test_api_get_clear_warnings(self):
apikey_error = "API Key Incorrect"
# Trigger warnings by sending requests with a truncated apikey
for _ in range(0, 2):
assert apikey_error in self._get_api_text("shutdown", extra_args={"apikey": SAB_APIKEY[:-1]})
assert interface._MSG_APIKEY_INCORRECT in self._get_api_text(
"shutdown", extra_args={"apikey": SAB_APIKEY[:-1]}
)
# Take delivery of our freshly baked warnings
json = self._get_api_json("warnings")
@ -326,7 +323,7 @@ class TestOtherApi(ApiTestFunctions):
for warning in json["warnings"]:
for key in ("type", "text", "time"):
assert key in warning.keys()
assert apikey_error.lower() in json["warnings"][-1]["text"].lower()
assert interface._MSG_APIKEY_INCORRECT.lower() in json["warnings"][-1]["text"].lower()
# Clear all warnings
assert self._get_api_json("warnings", extra_args={"name": "clear"})["status"] is True

Loading…
Cancel
Save