4 changed files with 726 additions and 70 deletions
@ -0,0 +1 @@ |
|||
from .torrent_parser import * |
@ -0,0 +1,645 @@ |
|||
#!/usr/bin/env python |
|||
# coding: utf-8 |
|||
|
|||
""" |
|||
A .torrent file parser for both Python 2 and 3 |
|||
|
|||
Usage: |
|||
|
|||
data = parse_torrent_file(filename) |
|||
|
|||
# or |
|||
|
|||
with open(filename, 'rb') as f: # the binary mode 'b' is necessary |
|||
data = TorrentFileParser(f).parse() |
|||
|
|||
# then you can edit the data |
|||
|
|||
data['announce-list'].append(['http://127.0.0.1:8080']) |
|||
|
|||
# and create a new torrent file from data |
|||
|
|||
create_torrent_file('new.torrent', data) |
|||
|
|||
# or |
|||
|
|||
with open('new.torrent', 'wb') as f: |
|||
f.write(TorrentFileCreator(data).encode()) |
|||
|
|||
# or you don't deal with file, just object in memory |
|||
|
|||
data = decode(b'i12345e') # data = 12345 |
|||
content = encode(data) # content = b'i12345e' |
|||
|
|||
""" |
|||
|
|||
from __future__ import print_function, unicode_literals |
|||
|
|||
import argparse |
|||
import binascii |
|||
import collections |
|||
import io |
|||
import json |
|||
import sys |
|||
import warnings |
|||
|
|||
try: |
|||
FileNotFoundError |
|||
except NameError: |
|||
# Python 2 do not have FileNotFoundError, use IOError instead |
|||
# noinspection PyShadowingBuiltins |
|||
FileNotFoundError = IOError |
|||
|
|||
try: |
|||
# noinspection PyPackageRequirements |
|||
from chardet import detect as _detect |
|||
except ImportError: |
|||
def _detect(_): |
|||
warnings.warn("No chardet module installed, encoding will be utf-8") |
|||
return {'encoding': 'utf-8', 'confidence': 1} |
|||
|
|||
try: |
|||
# noinspection PyUnresolvedReferences |
|||
# For Python 2 |
|||
str_type = unicode |
|||
except NameError: |
|||
# For Python 3 |
|||
str_type = str |
|||
|
|||
__all__ = [ |
|||
'InvalidTorrentDataException', |
|||
'BEncoder', |
|||
'BDecoder', |
|||
'encode', |
|||
'decode', |
|||
'TorrentFileParser', |
|||
'create_torrent_file', |
|||
'parse_torrent_file', |
|||
] |
|||
|
|||
__version__ = '0.3.0' |
|||
|
|||
|
|||
def detect(content): |
|||
return _detect(content)['encoding'] |
|||
|
|||
|
|||
class InvalidTorrentDataException(Exception): |
|||
def __init__(self, pos, msg=None): |
|||
msg = msg or "Invalid torrent format when read at pos {pos}" |
|||
msg = msg.format(pos=pos) |
|||
super(InvalidTorrentDataException, self).__init__(msg) |
|||
|
|||
|
|||
class __EndCls(object): |
|||
pass |
|||
|
|||
|
|||
_END = __EndCls() |
|||
|
|||
|
|||
def _check_hash_field_params(name, value): |
|||
return isinstance(name, str_type) \ |
|||
and isinstance(value, tuple) and len(value) == 2 \ |
|||
and isinstance(value[0], int) and isinstance(value[1], bool) |
|||
|
|||
|
|||
class TorrentFileParser(object): |
|||
|
|||
TYPE_LIST = 'list' |
|||
TYPE_DICT = 'dict' |
|||
TYPE_INT = 'int' |
|||
TYPE_STRING = 'string' |
|||
TYPE_END = 'end' |
|||
|
|||
LIST_INDICATOR = b'l' |
|||
DICT_INDICATOR = b'd' |
|||
INT_INDICATOR = b'i' |
|||
END_INDICATOR = b'e' |
|||
STRING_INDICATOR = b'' |
|||
STRING_DELIMITER = b':' |
|||
|
|||
HASH_FIELD_PARAMS = { |
|||
# field length need_list |
|||
'pieces': (20, True), |
|||
'ed2k': (16, False), |
|||
'filehash': (20, False), |
|||
} |
|||
|
|||
TYPES = [ |
|||
(TYPE_LIST, LIST_INDICATOR), |
|||
(TYPE_DICT, DICT_INDICATOR), |
|||
(TYPE_INT, INT_INDICATOR), |
|||
(TYPE_END, END_INDICATOR), |
|||
(TYPE_STRING, STRING_INDICATOR), |
|||
] |
|||
|
|||
def __init__( |
|||
self, fp, use_ordered_dict=False, encoding='utf-8', errors='strict', |
|||
hash_fields=None, hash_raw=False, |
|||
): |
|||
""" |
|||
:param fp: a **binary** file-like object to parse, |
|||
which means need 'b' mode when use built-in open function |
|||
:param bool use_ordered_dict: Use collections.OrderedDict as dict |
|||
container default False, which mean use built-in dict |
|||
:param str encoding: file content encoding, default utf-8, use 'auto' |
|||
to enable charset auto detection (need 'chardet' package installed) |
|||
:param str errors: how to deal with encoding error when try to parse |
|||
string from content with ``encoding`` |
|||
:param Dict[str, Tuple[int, bool]] hash_fields: extra fields should |
|||
be treated as hash value. dict key is the field name, value is a |
|||
two-element tuple of (hash_block_length, as_a_list). |
|||
See :any:`hash_field` for detail |
|||
""" |
|||
if getattr(fp, 'read', ) is None \ |
|||
or getattr(fp, 'seek') is None: |
|||
raise ValueError('Parameter fp needs a file like object') |
|||
|
|||
self._pos = 0 |
|||
self._encoding = encoding |
|||
self._content = fp |
|||
self._use_ordered_dict = use_ordered_dict |
|||
self._error_handler = errors |
|||
self._hash_fields = dict(TorrentFileParser.HASH_FIELD_PARAMS) |
|||
if hash_fields is not None: |
|||
for k, v in hash_fields.items(): |
|||
if _check_hash_field_params(k, v): |
|||
self._hash_fields[k] = v |
|||
else: |
|||
raise ValueError( |
|||
"Invalid hash field parameter, it should be type of " |
|||
"Dict[str, Tuple[int, bool]]" |
|||
) |
|||
self._hash_raw = bool(hash_raw) |
|||
|
|||
def hash_field(self, name, block_length=20, need_list=False): |
|||
""" |
|||
Let field with the `name` to be treated as hash value, don't decode it |
|||
as a string. |
|||
|
|||
:param str name: field name |
|||
:param int block_length: hash block length for split |
|||
:param bool need_list: if True, when the field only has one block( |
|||
or even empty) its parse result will be a one-element list( |
|||
or empty list); If False, will be a string in 0 or 1 block condition |
|||
:return: return self, so you can chained call |
|||
""" |
|||
v = (block_length, need_list) |
|||
if _check_hash_field_params(name, v): |
|||
self._hash_fields[name] = v |
|||
else: |
|||
raise ValueError("Invalid hash field parameter") |
|||
return self |
|||
|
|||
def parse(self): |
|||
""" |
|||
:rtype: dict|list|int|str|bytes |
|||
:raise: :any:`InvalidTorrentDataException` when parse failed or error |
|||
happened when decode string using specified encoding |
|||
""" |
|||
self._restart() |
|||
data = self._next_element() |
|||
|
|||
try: |
|||
c = self._read_byte(1, True) |
|||
raise InvalidTorrentDataException( |
|||
0, 'Expect EOF, but get [{}] at pos {}'.format(c, self._pos) |
|||
) |
|||
except EOFError: # expect EOF |
|||
pass |
|||
|
|||
return data |
|||
|
|||
def _read_byte(self, count=1, raise_eof=False): |
|||
assert count >= 0 |
|||
gotten = self._content.read(count) |
|||
if count != 0 and len(gotten) == 0: |
|||
if raise_eof: |
|||
raise EOFError() |
|||
raise InvalidTorrentDataException( |
|||
self._pos, |
|||
'Unexpected EOF when reading torrent file' |
|||
) |
|||
self._pos += count |
|||
return gotten |
|||
|
|||
def _seek_back(self, count): |
|||
self._content.seek(-count, 1) |
|||
self._pos = self._pos - count |
|||
|
|||
def _restart(self): |
|||
self._content.seek(0, 0) |
|||
self._pos = 0 |
|||
|
|||
def _dict_items_generator(self): |
|||
while True: |
|||
k = self._next_element() |
|||
if k is _END: |
|||
return |
|||
if not isinstance(k, str_type): |
|||
raise InvalidTorrentDataException( |
|||
self._pos, "Type of dict key can't be " + type(k).__name__ |
|||
) |
|||
if k in self._hash_fields: |
|||
v = self._next_hash(*self._hash_fields[k]) |
|||
else: |
|||
v = self._next_element(k) |
|||
if k == 'encoding': |
|||
self._encoding = v |
|||
yield k, v |
|||
|
|||
def _next_dict(self): |
|||
data = collections.OrderedDict() if self._use_ordered_dict else dict() |
|||
for key, element in self._dict_items_generator(): |
|||
data[key] = element |
|||
return data |
|||
|
|||
def _list_items_generator(self): |
|||
while True: |
|||
element = self._next_element() |
|||
if element is _END: |
|||
return |
|||
yield element |
|||
|
|||
def _next_list(self): |
|||
return [element for element in self._list_items_generator()] |
|||
|
|||
def _next_int(self, end=END_INDICATOR): |
|||
value = 0 |
|||
char = self._read_byte(1) |
|||
neg = False |
|||
while char != end: |
|||
if not neg and char == b'-': |
|||
neg = True |
|||
elif not b'0' <= char <= b'9': |
|||
raise InvalidTorrentDataException(self._pos - 1) |
|||
else: |
|||
value = value * 10 + int(char) - int(b'0') |
|||
char = self._read_byte(1) |
|||
return -value if neg else value |
|||
|
|||
def _next_string(self, need_decode=True, field=None): |
|||
length = self._next_int(self.STRING_DELIMITER) |
|||
raw = self._read_byte(length) |
|||
if need_decode: |
|||
encoding = self._encoding |
|||
if encoding == 'auto': |
|||
self.encoding = encoding = detect(raw) |
|||
try: |
|||
string = raw.decode(encoding, self._error_handler) |
|||
except UnicodeDecodeError as e: |
|||
msg = [ |
|||
"Fail to decode string at pos {pos} using encoding ", |
|||
e.encoding |
|||
] |
|||
if field: |
|||
msg.extend([ |
|||
' when parser field "', field, '"' |
|||
', maybe it is an hash field. ', |
|||
'You can use self.hash_field("', field, '") ', |
|||
'to let it be treated as hash value, ', |
|||
'so this error may disappear' |
|||
]) |
|||
raise InvalidTorrentDataException( |
|||
self._pos - length + e.start, |
|||
''.join(msg) |
|||
) |
|||
return string |
|||
return raw |
|||
|
|||
def _next_hash(self, p_len, need_list): |
|||
raw = self._next_string(need_decode=False) |
|||
if len(raw) % p_len != 0: |
|||
raise InvalidTorrentDataException( |
|||
self._pos - len(raw), "Hash bit length not match at pos {pos}" |
|||
) |
|||
if self._hash_raw: |
|||
return raw |
|||
res = [ |
|||
binascii.hexlify(chunk).decode('ascii') |
|||
for chunk in (raw[x:x+p_len] for x in range(0, len(raw), p_len)) |
|||
] |
|||
if len(res) == 0 and not need_list: |
|||
return '' |
|||
if len(res) == 1 and not need_list: |
|||
return res[0] |
|||
return res |
|||
|
|||
@staticmethod |
|||
def _next_end(): |
|||
return _END |
|||
|
|||
def _next_type(self): |
|||
for (element_type, indicator) in self.TYPES: |
|||
indicator_length = len(indicator) |
|||
char = self._read_byte(indicator_length) |
|||
if indicator == char: |
|||
return element_type |
|||
self._seek_back(indicator_length) |
|||
raise InvalidTorrentDataException(self._pos) |
|||
|
|||
def _type_to_func(self, t): |
|||
return getattr(self, '_next_' + t) |
|||
|
|||
def _next_element(self, field=None): |
|||
element_type = self._next_type() |
|||
if element_type is TorrentFileParser.TYPE_STRING and field is not None: |
|||
element = self._type_to_func(element_type)(field=field) |
|||
else: |
|||
element = self._type_to_func(element_type)() |
|||
return element |
|||
|
|||
|
|||
class BEncoder(object): |
|||
|
|||
TYPES = { |
|||
(dict,): TorrentFileParser.TYPE_DICT, |
|||
(list,): TorrentFileParser.TYPE_LIST, |
|||
(int,): TorrentFileParser.TYPE_INT, |
|||
(str_type, bytes): TorrentFileParser.TYPE_STRING, |
|||
} |
|||
|
|||
def __init__(self, data, encoding='utf-8', hash_fields=None): |
|||
""" |
|||
:param dict|list|int|str data: data will be encoded |
|||
:param str encoding: string field output encoding |
|||
:param List[str] hash_fields: see |
|||
:any:`TorrentFileParser.__init__` |
|||
""" |
|||
self._data = data |
|||
self._encoding = encoding |
|||
self._hash_fields = list(TorrentFileParser.HASH_FIELD_PARAMS.keys()) |
|||
if hash_fields is not None: |
|||
self._hash_fields.extend(str_type(hash_fields)) |
|||
|
|||
def hash_field(self, name): |
|||
""" |
|||
see :any:`TorrentFileParser.hash_field` |
|||
|
|||
:param str name: |
|||
:return: return self, so you can chained call |
|||
""" |
|||
return self._hash_fields.append(str_type(name)) |
|||
|
|||
def encode(self): |
|||
""" |
|||
Encode to bytes |
|||
|
|||
:rtype: bytes |
|||
""" |
|||
return b''.join(self._output_element(self._data)) |
|||
|
|||
def encode_to_filelike(self): |
|||
""" |
|||
Encode to a file-like(BytesIO) object |
|||
|
|||
:rtype: BytesIO |
|||
""" |
|||
return io.BytesIO(self.encode()) |
|||
|
|||
def _output_string(self, data): |
|||
if isinstance(data, str_type): |
|||
data = data.encode(self._encoding) |
|||
yield str(len(data)).encode('ascii') |
|||
yield TorrentFileParser.STRING_DELIMITER |
|||
yield data |
|||
|
|||
@staticmethod |
|||
def _output_int(data): |
|||
yield TorrentFileParser.INT_INDICATOR |
|||
yield str(data).encode('ascii') |
|||
yield TorrentFileParser.END_INDICATOR |
|||
|
|||
def _output_decode_hash(self, data): |
|||
if isinstance(data, str_type): |
|||
data = [data] |
|||
result = [] |
|||
for hash_line in data: |
|||
if not isinstance(hash_line, str_type): |
|||
raise InvalidTorrentDataException( |
|||
None, |
|||
"Hash must be " + str_type.__name__ + " not " + |
|||
type(hash_line).__name__, |
|||
) |
|||
if len(hash_line) % 2 != 0: |
|||
raise InvalidTorrentDataException( |
|||
None, |
|||
"Hash(" + hash_line + ") length(" + str(len(hash_line)) + |
|||
") is a not even number", |
|||
) |
|||
try: |
|||
raw = binascii.unhexlify(hash_line) |
|||
except binascii.Error as e: |
|||
raise InvalidTorrentDataException( |
|||
None, str(e), |
|||
) |
|||
result.append(raw) |
|||
for x in self._output_string(b''.join(result)): |
|||
yield x |
|||
|
|||
def _output_dict(self, data): |
|||
yield TorrentFileParser.DICT_INDICATOR |
|||
for k, v in data.items(): |
|||
if not isinstance(k, str_type): |
|||
raise InvalidTorrentDataException( |
|||
None, "Dict key must be " + str_type.__name__, |
|||
) |
|||
for x in self._output_element(k): |
|||
yield x |
|||
if k in self._hash_fields: |
|||
for x in self._output_decode_hash(v): |
|||
yield x |
|||
else: |
|||
for x in self._output_element(v): |
|||
yield x |
|||
yield TorrentFileParser.END_INDICATOR |
|||
|
|||
def _output_list(self, data): |
|||
yield TorrentFileParser.LIST_INDICATOR |
|||
for v in data: |
|||
for x in self._output_element(v): |
|||
yield x |
|||
yield TorrentFileParser.END_INDICATOR |
|||
|
|||
def _type_to_func(self, t): |
|||
return getattr(self, '_output_' + t) |
|||
|
|||
def _output_element(self, data): |
|||
for types, t in self.TYPES.items(): |
|||
if isinstance(data, types): |
|||
# noinspection PyCallingNonCallable |
|||
return self._type_to_func(t)(data) |
|||
raise InvalidTorrentDataException( |
|||
None, |
|||
"Invalid type for torrent file: " + type(data).__name__, |
|||
) |
|||
|
|||
|
|||
class BDecoder(object): |
|||
def __init__( |
|||
self, data, use_ordered_dict=False, encoding='utf-8', errors='strict', |
|||
hash_fields=None, hash_raw=False, |
|||
): |
|||
""" |
|||
See :any:`TorrentFileParser.__init__` for parameter description. |
|||
|
|||
:param bytes data: raw data to be decoded |
|||
:param bool use_ordered_dict: |
|||
:param str encoding: |
|||
:param str errors: |
|||
:param Dict[str, Tuple[int, bool]] hash_fields: |
|||
:param bool hash_raw: |
|||
""" |
|||
self._parser = TorrentFileParser( |
|||
io.BytesIO(bytes(data)), |
|||
use_ordered_dict, |
|||
encoding, |
|||
errors, |
|||
hash_fields, |
|||
hash_raw, |
|||
) |
|||
|
|||
def hash_field(self, name, block_length=20, need_dict=False): |
|||
""" |
|||
See :any:`TorrentFileParser.hash_field` for parameter description |
|||
|
|||
:param name: |
|||
:param block_length: |
|||
:param need_dict: |
|||
:return: return self, so you can chained call |
|||
""" |
|||
self._parser.hash_field(name, block_length, need_dict) |
|||
return self |
|||
|
|||
def decode(self): |
|||
return self._parser.parse() |
|||
|
|||
|
|||
def encode(data, encoding='utf-8', hash_fields=None): |
|||
""" |
|||
Shortcut function for encode python object to torrent file format(bencode) |
|||
|
|||
See :any:`BEncoder.__init__` for parameter description |
|||
|
|||
:param dict|list|int|str|bytes data: data to be encoded |
|||
:param str encoding: |
|||
:param List[str] hash_fields: |
|||
:rtype: bytes |
|||
""" |
|||
return BEncoder(data, encoding, hash_fields).encode() |
|||
|
|||
|
|||
def decode( |
|||
data, use_ordered_dict=False, encoding='utf-8', errors='strict', |
|||
hash_fields=None, hash_raw=False, |
|||
): |
|||
""" |
|||
Shortcut function for decode bytes as torrent file format(bencode) to python |
|||
object |
|||
|
|||
See :any:`BDecoder.__init__` for parameter description |
|||
|
|||
:param bytes data: raw data to be decoded |
|||
:param bool use_ordered_dict: |
|||
:param str encoding: |
|||
:param str errors: |
|||
:param Dict[str, Tuple[int, bool]] hash_fields: |
|||
:param bool hash_raw: |
|||
:rtype: dict|list|int|str|bytes|bytes |
|||
""" |
|||
return BDecoder( |
|||
data, use_ordered_dict, encoding, errors, hash_fields, hash_raw, |
|||
).decode() |
|||
|
|||
|
|||
def parse_torrent_file( |
|||
filename, use_ordered_dict=False, encoding='utf-8', errors='strict', |
|||
hash_fields=None, hash_raw=False, |
|||
): |
|||
""" |
|||
Shortcut function for parse torrent object using TorrentFileParser |
|||
|
|||
See :any:`TorrentFileParser.__init__` for parameter description |
|||
|
|||
:param str filename: torrent filename |
|||
:param bool use_ordered_dict: |
|||
:param str encoding: |
|||
:param str errors: |
|||
:param Dict[str, Tuple[int, bool]] hash_fields: |
|||
:param bool hash_raw: |
|||
:rtype: dict|list|int|str|bytes |
|||
""" |
|||
with open(filename, 'rb') as f: |
|||
return TorrentFileParser( |
|||
f, use_ordered_dict, encoding, errors, hash_fields, hash_raw, |
|||
).parse() |
|||
|
|||
|
|||
def create_torrent_file(filename, data, encoding='utf-8', hash_fields=None): |
|||
""" |
|||
Shortcut function for create a torrent file using BEncoder |
|||
|
|||
see :any:`BDecoder.__init__` for parameter description |
|||
|
|||
:param str filename: output torrent filename |
|||
:param dict|list|int|str|bytes data: |
|||
:param str encoding: |
|||
:param List[str] hash_fields: |
|||
""" |
|||
with open(filename, 'wb') as f: |
|||
f.write(BEncoder(data, encoding, hash_fields).encode()) |
|||
|
|||
|
|||
def __main(): |
|||
parser = argparse.ArgumentParser() |
|||
parser.add_argument('file', nargs='?', default='', |
|||
help='input file, will read form stdin if empty') |
|||
parser.add_argument('--dict', '-d', action='store_true', default=False, |
|||
help='use built-in dict, default will be OrderedDict') |
|||
parser.add_argument('--sort', '-s', action='store_true', default=False, |
|||
help='sort output json item by key') |
|||
parser.add_argument('--indent', '-i', type=int, default=None, |
|||
help='json output indent for every inner level') |
|||
parser.add_argument('--ascii', '-a', action='store_true', default=False, |
|||
help='ensure output json use ascii char, ' |
|||
'escape other char use \\u') |
|||
parser.add_argument('--coding', '-c', default='utf-8', |
|||
help='string encoding, default "utf-8"') |
|||
parser.add_argument('--errors', '-e', default='strict', |
|||
help='decoding error handler, default "strict", you can' |
|||
' use "ignore" or "replace" to avoid exception') |
|||
parser.add_argument('--version', '-v', action='store_true', default=False, |
|||
help='print version and exit') |
|||
args = parser.parse_args() |
|||
|
|||
if args.version: |
|||
print(__version__) |
|||
exit(0) |
|||
|
|||
try: |
|||
if args.file == '': |
|||
target_file = io.BytesIO( |
|||
getattr(sys.stdin, 'buffer', sys.stdin).read() |
|||
) |
|||
else: |
|||
target_file = open(args.file, 'rb') |
|||
except FileNotFoundError: |
|||
sys.stderr.write('File "{}" not exist\n'.format(args.file)) |
|||
exit(1) |
|||
|
|||
# noinspection PyUnboundLocalVariable |
|||
data = TorrentFileParser( |
|||
target_file, not args.dict, args.coding, args.errors |
|||
).parse() |
|||
|
|||
data = json.dumps( |
|||
data, ensure_ascii=args.ascii, |
|||
sort_keys=args.sort, indent=args.indent |
|||
) |
|||
|
|||
print(data) |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
__main() |
Loading…
Reference in new issue