@ -1,18 +1,13 @@
from requests . adapters import HTTPAdapter
from requests . exceptions import RequestException
from requests . models import Response
from requests . sessions import Session
from urllib3 . util . ssl_ import create_urllib3_context
import logging
import random
import re
import ssl
import time
import js2py
from _23 import b64decodestring , b64encodestring , urlparse , urlunparse
from _23 import b64encodestring , urlparse
DEFAULT_USER_AGENTS = [
@ -46,25 +41,27 @@ class CloudflareScraper(Session):
self . delay = kwargs . pop ( ' delay ' , self . default_delay )
self . start_time = None
self . cipher_suite = self . load_cipher_suite ( )
self . mount ( ' https:// ' , CloudflareAdapter ( self . cipher_suite ) )
self . trust_env = False
def request ( self , method , url , * args , * * kwargs ) :
resp = super ( CloudflareScraper , self ) . request ( method , url , * args , * * kwargs )
url_solver = kwargs . pop ( ' url_solver ' , None )
if not kwargs . pop ( ' proxy_browser ' , None ) :
resp = super ( CloudflareScraper , self ) . request ( method , url , * args , * * kwargs )
else :
resp = self . get_content ( method , url , url_solver ,
user_agent = self . headers . get ( ' User-Agent ' ) , proxy_browser = True , * * kwargs )
# Check if anti-bot is on
if ( isinstance ( resp , type ( Response ( ) ) )
and resp . status_code in ( 503 , 429 , 403 ) ) :
self . start_time = time . time ( )
if ( re . search ( ' (?i)cloudflare ' , resp . headers . get ( ' Server ' , ' ' ) )
and b ' jschl_vc ' in resp . content
and b ' jschl_answer ' in resp . content ) :
resp = self . solve_cf_challenge ( resp , * * kwargs )
resp = self . solve_cf_challenge ( resp , url_solver , * * kwargs )
elif b ' ddgu ' in resp . content :
resp = self . solve_ddg_challenge ( resp , * * kwargs )
# Otherwise, no anti-bot detected
return resp
def wait ( self ) :
@ -87,142 +84,74 @@ class CloudflareScraper(Session):
pass
return resp
def solve_cf_challenge ( self , resp , * * original_kwargs ) :
def test_flaresolverr ( self , url_solver ) :
# test if FlareSolverr software is running
response_test = super ( CloudflareScraper , self ) . request ( ' GET ' , url_solver )
fs_ver = None
if 200 == response_test . status_code and response_test . ok :
json_data = response_test . json ( )
if ' ok ' == json_data . get ( ' status ' ) :
fs_ver = json_data . get ( ' version ' )
if None is fs_ver :
raise ValueError ( ' FlareSolverr software not found (is it running?) ' )
return fs_ver
def get_content ( self , method , url , url_solver , user_agent , proxy_browser = False , * * kwargs ) :
url_solver = url_solver and re . sub ( r ' (/|v1)*$ ' , ' ' , url_solver ) or ' http://localhost:8191 '
if not self . test_flaresolverr ( url_solver ) :
raise ValueError ( ' No FlareSolverr software running %s at %s ' % ( ( ' to solve Cloudflare challenge ' ,
' ' ) [ proxy_browser ] , url_solver ) )
try :
response = super ( CloudflareScraper , self ) . request ( ' POST ' , ' %s /v1 ' % url_solver , json = dict (
cmd = ' request. %s ' % method . lower ( ) , userAgent = user_agent , url = url ,
cookies = [ { ' name ' : cur_ckee . name , ' value ' : cur_ckee . value ,
' domain ' : cur_ckee . domain , ' path ' : cur_ckee . path } for cur_ckee in self . cookies ] ) )
except ( BaseException , Exception ) as e :
raise ValueError ( ' FlareSolverr software unable to %s : %r ' % ( ( ' solve Cloudflare anti-bot IUAM challenge ' ,
' fetch content ' ) [ proxy_browser ] , e ) )
if None is not response :
data_json = response . json ( )
result = ( { } , data_json ) [ isinstance ( data_json , ( dict , list ) ) ]
if response . ok :
if ' ok ' == result . get ( ' status ' ) :
self . cookies . clear ( )
for cur_ckee in result . get ( ' solution ' , { } ) . get ( ' cookies ' , [ ] ) :
if cur_ckee . get ( ' value ' ) and cur_ckee . get ( ' name ' ) not in ( ' ' , None , ' _gid ' , ' _ga ' , ' _gat ' ) :
self . cookies . set (
cur_ckee [ ' name ' ] , cur_ckee [ ' value ' ] ,
rest = { ' httpOnly ' : cur_ckee . get ( ' httpOnly ' ) , ' session ' : cur_ckee . get ( ' session ' ) } ,
* * dict ( [ ( k , cur_ckee . get ( k ) ) for k in ( ' expires ' , ' domain ' , ' path ' , ' secure ' ) ] ) )
else :
response = None
elif ' error ' == result . get ( ' status ' ) :
raise ValueError ( ' Failure with FlareSolverr: %s ' % result . get ( ' message ' , ' See the FlareSolver output ' ) )
return response
def solve_cf_challenge ( self , resp , url_solver , * * original_kwargs ) :
body = resp . text
parsed_url = urlparse ( resp . url )
domain = parsed_url . netloc
if ' /cdn-cgi/l/chk_captcha ' in body or ' cf_chl_captcha ' in body :
raise CloudflareError (
' Cloudflare captcha presented for %s , please notify SickGear for an update, ua: %s ' %
' Cloudflare captcha presented for %s , safe to ignore as this shouldn \' t happen every tim e, ua: %s ' %
( domain , self . cf_ua ) , response = resp )
try :
action , method = re . findall ( r ' (?sim)<form.*?id= " challenge.*?action= " /?([^? " ]+).*?method= " ([^ " ]+) ' , body ) [ 0 ]
except ( Exception , BaseException ) :
action , method = ' cdn-cgi/l/chk_jschl ' , resp . request . method
submit_url = ' %s :// %s / %s ' % ( parsed_url . scheme , domain , action )
cloudflare_kwargs = { k : v for k , v in original_kwargs . items ( ) if k not in [ ' hooks ' ] }
params = cloudflare_kwargs . setdefault ( ( ' data ' , ' params ' ) [ ' GET ' == method . upper ( ) ] , { } )
headers = cloudflare_kwargs . setdefault ( ' headers ' , { } )
headers [ ' Referer ' ] = resp . url
try :
token = re . findall ( r ' (?sim)__cf_chl_jschl_tk__=([^ " ]+) ' , body ) [ 0 ]
cloudflare_kwargs [ ' params ' ] = dict ( __cf_chl_jschl_tk__ = token )
except ( Exception , BaseException ) :
pass
if None is self . get_content (
' GET ' , ( resp . request . url , ' %s :// %s / ' % ( parsed_url . scheme , domain ) ) [ ' POST ' == resp . request . method ] ,
url_solver , user_agent = resp . request . headers . get ( ' User-Agent ' ) ) :
raise ValueError ( ' Failed to validate Cloudflare anti-bot IUAM challenge ' )
if self . delay == self . default_delay :
try :
# no instantiated delay, therefore check js for hard coded CF delay
self . delay = float ( re . search ( r ' submit \ ( \ );[^0-9]*?([0-9]+) ' , body ) . group ( 1 ) ) / float ( 1000 )
except ( BaseException , Exception ) :
pass
for i in re . findall ( r ' (<input[^>]+?hidden[^>]+?>) ' , re . sub ( r ' (?sim)<!-- \ s+<input.*?(?=<) ' , ' ' , body ) ) :
value = re . findall ( r ' value= " ([^ " \' ]+?)[ " \' ] ' , i )
name = re . findall ( r ' name= " ([^ " \' ]+?)[ " \' ] ' , i )
if all ( [ name , value ] ) :
params [ name [ 0 ] ] = value [ 0 ]
js = self . extract_js ( body , domain )
atob = ( lambda s : b64decodestring ( ' %s ' % s ) )
try :
# Eval the challenge algorithm
params [ ' jschl_answer ' ] = str ( js2py . EvalJs ( { ' atob ' : atob } ) . eval ( js ) )
except ( BaseException , Exception ) :
try :
params [ ' jschl_answer ' ] = str ( js2py . EvalJs ( { ' atob ' : atob } ) . eval ( js ) )
except ( BaseException , Exception ) as e :
# Something is wrong with the page. This may indicate Cloudflare has changed their anti-bot technique.
raise ValueError ( ' Unable to parse Cloudflare anti-bot IUAM page: %r ' % e )
# Requests transforms any request into a GET after a redirect,
# so the redirect has to be handled manually here to allow for
# performing other types of requests even as the first request.
cloudflare_kwargs [ ' allow_redirects ' ] = False
self . wait ( )
response = self . request ( method , submit_url , * * cloudflare_kwargs )
if response :
if 200 == getattr ( response , ' status_code ' ) :
return response
# legacy redirection handler (pre 2019.11.xx)
location = response . headers . get ( ' Location ' )
try :
r = urlparse ( location )
except ( Exception , BaseException ) :
# Something is wrong with the page, perhaps CF changed their anti-bot technique
raise ValueError ( ' Unable to find a new location from Cloudflare anti-bot IUAM page ' )
if not r . netloc or location . startswith ( ' / ' ) :
location = urlunparse ( ( parsed_url . scheme , domain , r . path , r . params , r . query , r . fragment ) )
return self . request ( resp . request . method , location , * * original_kwargs )
@staticmethod
def extract_js ( body , domain ) :
try :
js = re . search (
r ''' (?x)
setTimeout \( function \( \) { \s * ( [ \w \W ] + ) [ \r \n ] * [ \w \W ] \. action
''' , body).group(1)
except ( BaseException , Exception ) :
raise RuntimeError ( ' Error #1 Cloudflare anti-bots changed, please notify SickGear for an update ' )
if not re . search ( r ' (?i)(toFixed|t \ .length) ' , js ) :
raise RuntimeError ( ' Error #2 Cloudflare anti-bots changed, please notify SickGear for an update ' )
js = re . sub ( r ' (; \ s+); ' , r ' \ 1 ' , js . strip ( ) )
js = re . sub ( r ' ([) \ ]];)( \ w) ' , r ' \ 1 \ n \ n \ 2 ' , js )
js = re . sub ( r ' \ s* \' ; \ s* \ d+ \' \ s*$ ' , ' ' , js )
innerHTML = re . search ( r ' (?sim)<div(?: [^<>]*)? id= " ([^<>]*?) " >([^<>]*?)</div> ' , body )
innerHTML = ' ' if not innerHTML else innerHTML . group ( 2 ) . strip ( )
# Prefix the challenge with a fake document object.
# Interpolate the domain, div contents, and JS challenge.
# The `a.value` to be returned is tacked onto the end.
return r '''
var document = {
createElement : function ( ) {
return { firstChild : { href : ' https:// %s / ' } }
} ,
getElementById : function ( ) {
return { innerHTML : ' %s ' } ;
}
} ;
String . prototype . italics = function ( ) { return ' <i> ' + this + ' </i> ' ; } ;
setInterval = function ( ) { } ;
% s ; a . value
''' % (domain, innerHTML, js)
@staticmethod
def load_cipher_suite ( ) :
suite = [ ]
if hasattr ( ssl , ' PROTOCOL_TLS ' ) :
ctx = ssl . SSLContext ( getattr ( ssl , ' PROTOCOL_TLSv1_3 ' , ssl . PROTOCOL_TLSv1_2 ) )
for cipher in ( [
' ECDHE-ECDSA-AES128-GCM-SHA256 ' ,
' ECDHE-RSA-AES128-GCM-SHA256 ' ,
' ECDHE-ECDSA-AES128-SHA256 ' ,
' ECDHE-ECDSA-AES256-GCM-SHA384 ' ,
' ECDHE-ECDSA-AES256-SHA384 ' ,
' ECDHE-ECDSA-AES256-SHA ' ,
' ECDHE-ECDSA-CHACHA20-POLY1305 ' ,
' ECDHE-RSA-AES256-GCM-SHA384 ' ,
' ECDHE-RSA-AES256-SHA384 ' ,
] ) :
try :
ctx . set_ciphers ( cipher )
suite + = [ cipher ]
except ssl . SSLError :
pass
return ' : ' . join ( suite )
time . sleep ( 1.2 )
final_response = super ( CloudflareScraper , self ) . request ( resp . request . method , resp . request . url , headers = {
' User-Agent ' : resp . request . headers . get ( ' User-Agent ' ) } , * * original_kwargs )
# if final_response and 200 == getattr(final_response, 'status_code'):
# return final_response
return final_response
@classmethod
def create_scraper ( cls , sess = None , * * kwargs ) :
@ -279,29 +208,6 @@ class CloudflareScraper(Session):
return ' ; ' . join ( [ ' = ' . join ( pair ) for pair in tokens . items ( ) ] ) , user_agent
class CloudflareAdapter ( HTTPAdapter ) :
"""
HTTPS adapter that creates a SSL context with custom ciphers
"""
def __init__ ( self , cipher_suite = None , * * kwargs ) :
self . cipher_suite = cipher_suite
params = dict ( ssl_version = ssl . PROTOCOL_TLSv1 )
if hasattr ( ssl , ' PROTOCOL_TLS ' ) :
params = dict ( ssl_version = getattr ( ssl , ' PROTOCOL_TLSv1_3 ' , ssl . PROTOCOL_TLSv1_2 ) , ciphers = cipher_suite )
self . ssl_context = create_urllib3_context ( * * params )
super ( CloudflareAdapter , self ) . __init__ ( * * kwargs )
def init_poolmanager ( self , * args , * * kwargs ) :
kwargs [ ' ssl_context ' ] = self . ssl_context
return super ( CloudflareAdapter , self ) . init_poolmanager ( * args , * * kwargs )
def proxy_manager_for ( self , * args , * * kwargs ) :
kwargs [ ' ssl_context ' ] = self . ssl_context
return super ( CloudflareAdapter , self ) . proxy_manager_for ( * args , * * kwargs )
create_scraper = CloudflareScraper . create_scraper
get_tokens = CloudflareScraper . get_tokens
get_cookie_string = CloudflareScraper . get_cookie_string