9 changed files with 52 additions and 1695 deletions
@ -1,16 +0,0 @@ |
|||
""" |
|||
livereload |
|||
~~~~~~~~~~ |
|||
|
|||
A python version of livereload. |
|||
|
|||
:copyright: (c) 2013 by Hsiaoming Yang |
|||
""" |
|||
|
|||
__version__ = '2.2.0' |
|||
__author__ = 'Hsiaoming Yang <me@lepture.com>' |
|||
__homepage__ = 'https://github.com/lepture/python-livereload' |
|||
|
|||
from .server import Server, shell |
|||
|
|||
__all__ = ('Server', 'shell') |
@ -1,43 +0,0 @@ |
|||
# coding: utf-8 |
|||
""" |
|||
livereload._compat |
|||
~~~~~~~~~~~~~~~~~~ |
|||
|
|||
Compatible module for python2 and python3. |
|||
|
|||
:copyright: (c) 2013 by Hsiaoming Yang |
|||
""" |
|||
|
|||
|
|||
import sys |
|||
PY3 = sys.version_info[0] == 3 |
|||
|
|||
if PY3: |
|||
unicode_type = str |
|||
bytes_type = bytes |
|||
text_types = (str,) |
|||
else: |
|||
unicode_type = unicode |
|||
bytes_type = str |
|||
text_types = (str, unicode) |
|||
|
|||
|
|||
def to_unicode(value, encoding='utf-8'): |
|||
"""Convert different types of objects to unicode.""" |
|||
if isinstance(value, unicode_type): |
|||
return value |
|||
|
|||
if isinstance(value, bytes_type): |
|||
return unicode_type(value, encoding=encoding) |
|||
|
|||
if isinstance(value, int): |
|||
return unicode_type(str(value)) |
|||
|
|||
return value |
|||
|
|||
|
|||
def to_bytes(value, encoding='utf-8'): |
|||
"""Convert different types of objects to bytes.""" |
|||
if isinstance(value, bytes_type): |
|||
return value |
|||
return value.encode(encoding) |
@ -1,209 +0,0 @@ |
|||
# -*- coding: utf-8 -*- |
|||
""" |
|||
livereload.handlers |
|||
~~~~~~~~~~~~~~~~~~~ |
|||
|
|||
HTTP and WebSocket handlers for livereload. |
|||
|
|||
:copyright: (c) 2013 by Hsiaoming Yang |
|||
""" |
|||
|
|||
import os |
|||
import time |
|||
import hashlib |
|||
import logging |
|||
import mimetypes |
|||
from tornado import ioloop |
|||
from tornado import escape |
|||
from tornado.websocket import WebSocketHandler |
|||
from tornado.web import RequestHandler |
|||
from tornado.util import ObjectDict |
|||
from ._compat import to_bytes |
|||
|
|||
|
|||
class LiveReloadHandler(WebSocketHandler): |
|||
waiters = set() |
|||
watcher = None |
|||
_last_reload_time = None |
|||
|
|||
def allow_draft76(self): |
|||
return True |
|||
|
|||
def on_close(self): |
|||
if self in LiveReloadHandler.waiters: |
|||
LiveReloadHandler.waiters.remove(self) |
|||
|
|||
def send_message(self, message): |
|||
if isinstance(message, dict): |
|||
message = escape.json_encode(message) |
|||
|
|||
try: |
|||
self.write_message(message) |
|||
except: |
|||
logging.error('Error sending message', exc_info=True) |
|||
|
|||
def poll_tasks(self): |
|||
filepath = self.watcher.examine() |
|||
if not filepath: |
|||
return |
|||
logging.info('File %s changed', filepath) |
|||
self.watch_tasks() |
|||
|
|||
def watch_tasks(self): |
|||
if time.time() - self._last_reload_time < 3: |
|||
# if you changed lot of files in one time |
|||
# it will refresh too many times |
|||
logging.info('ignore this reload action') |
|||
return |
|||
|
|||
logging.info('Reload %s waiters', len(self.waiters)) |
|||
|
|||
msg = { |
|||
'command': 'reload', |
|||
'path': self.watcher.filepath or '*', |
|||
'liveCSS': True |
|||
} |
|||
|
|||
self._last_reload_time = time.time() |
|||
for waiter in LiveReloadHandler.waiters: |
|||
try: |
|||
waiter.write_message(msg) |
|||
except: |
|||
logging.error('Error sending message', exc_info=True) |
|||
LiveReloadHandler.waiters.remove(waiter) |
|||
|
|||
def on_message(self, message): |
|||
"""Handshake with livereload.js |
|||
|
|||
1. client send 'hello' |
|||
2. server reply 'hello' |
|||
3. client send 'info' |
|||
|
|||
http://feedback.livereload.com/knowledgebase/articles/86174-livereload-protocol |
|||
""" |
|||
message = ObjectDict(escape.json_decode(message)) |
|||
if message.command == 'hello': |
|||
handshake = {} |
|||
handshake['command'] = 'hello' |
|||
handshake['protocols'] = [ |
|||
'http://livereload.com/protocols/official-7', |
|||
'http://livereload.com/protocols/official-8', |
|||
'http://livereload.com/protocols/official-9', |
|||
'http://livereload.com/protocols/2.x-origin-version-negotiation', |
|||
'http://livereload.com/protocols/2.x-remote-control' |
|||
] |
|||
handshake['serverName'] = 'livereload-tornado' |
|||
self.send_message(handshake) |
|||
|
|||
if message.command == 'info' and 'url' in message: |
|||
logging.info('Browser Connected: %s' % message.url) |
|||
LiveReloadHandler.waiters.add(self) |
|||
|
|||
if not LiveReloadHandler._last_reload_time: |
|||
if not self.watcher._tasks: |
|||
logging.info('Watch current working directory') |
|||
self.watcher.watch(os.getcwd()) |
|||
|
|||
LiveReloadHandler._last_reload_time = time.time() |
|||
logging.info('Start watching changes') |
|||
if not self.watcher.start(self.poll_tasks): |
|||
ioloop.PeriodicCallback(self.poll_tasks, 800).start() |
|||
|
|||
|
|||
class LiveReloadJSHandler(RequestHandler): |
|||
def initialize(self, port): |
|||
self._port = port |
|||
|
|||
def get(self): |
|||
js = os.path.join( |
|||
os.path.abspath(os.path.dirname(__file__)), 'livereload.js', |
|||
) |
|||
self.set_header('Content-Type', 'application/javascript') |
|||
with open(js, 'r') as f: |
|||
content = f.read() |
|||
content = content.replace('{{port}}', str(self._port)) |
|||
self.write(content) |
|||
|
|||
|
|||
class ForceReloadHandler(RequestHandler): |
|||
def get(self): |
|||
msg = { |
|||
'command': 'reload', |
|||
'path': self.get_argument('path', default=None) or '*', |
|||
'liveCSS': True, |
|||
'liveImg': True |
|||
} |
|||
for waiter in LiveReloadHandler.waiters: |
|||
try: |
|||
waiter.write_message(msg) |
|||
except: |
|||
logging.error('Error sending message', exc_info=True) |
|||
LiveReloadHandler.waiters.remove(waiter) |
|||
self.write('ok') |
|||
|
|||
|
|||
class StaticHandler(RequestHandler): |
|||
def initialize(self, root, fallback=None): |
|||
self._root = os.path.abspath(root) |
|||
self._fallback = fallback |
|||
|
|||
def filepath(self, url): |
|||
url = url.lstrip('/') |
|||
url = os.path.join(self._root, url) |
|||
|
|||
if url.endswith('/'): |
|||
url += 'index.html' |
|||
elif not os.path.exists(url) and not url.endswith('.html'): |
|||
url += '.html' |
|||
|
|||
if not os.path.isfile(url): |
|||
return None |
|||
return url |
|||
|
|||
def get(self, path='/'): |
|||
filepath = self.filepath(path) |
|||
if not filepath and path.endswith('/'): |
|||
rootdir = os.path.join(self._root, path.lstrip('/')) |
|||
return self.create_index(rootdir) |
|||
|
|||
if not filepath: |
|||
if self._fallback: |
|||
self._fallback(self.request) |
|||
self._finished = True |
|||
return |
|||
return self.send_error(404) |
|||
|
|||
mime_type, encoding = mimetypes.guess_type(filepath) |
|||
if not mime_type: |
|||
mime_type = 'text/html' |
|||
|
|||
self.mime_type = mime_type |
|||
self.set_header('Content-Type', mime_type) |
|||
|
|||
with open(filepath, 'r') as f: |
|||
data = f.read() |
|||
|
|||
hasher = hashlib.sha1() |
|||
hasher.update(to_bytes(data)) |
|||
self.set_header('Etag', '"%s"' % hasher.hexdigest()) |
|||
|
|||
ua = self.request.headers.get('User-Agent', 'bot').lower() |
|||
if mime_type == 'text/html' and 'msie' not in ua: |
|||
data = data.replace( |
|||
'</head>', |
|||
'<script src="/livereload.js"></script></head>' |
|||
) |
|||
self.write(data) |
|||
|
|||
def create_index(self, root): |
|||
files = os.listdir(root) |
|||
self.write('<ul>') |
|||
for f in files: |
|||
path = os.path.join(root, f) |
|||
self.write('<li>') |
|||
if os.path.isdir(path): |
|||
self.write('<a href="%s/">%s</a>' % (f, f)) |
|||
else: |
|||
self.write('<a href="%s">%s</a>' % (f, f)) |
|||
self.write('</li>') |
|||
self.write('</ul>') |
File diff suppressed because it is too large
@ -1,224 +0,0 @@ |
|||
# -*- coding: utf-8 -*- |
|||
""" |
|||
livereload.server |
|||
~~~~~~~~~~~~~~~~~ |
|||
|
|||
WSGI app server for livereload. |
|||
|
|||
:copyright: (c) 2013 by Hsiaoming Yang |
|||
""" |
|||
|
|||
import os |
|||
import logging |
|||
from subprocess import Popen, PIPE |
|||
import time |
|||
import threading |
|||
import webbrowser |
|||
|
|||
from tornado import escape |
|||
from tornado.wsgi import WSGIContainer |
|||
from tornado.ioloop import IOLoop |
|||
from tornado.web import Application, FallbackHandler |
|||
from .handlers import LiveReloadHandler, LiveReloadJSHandler |
|||
from .handlers import ForceReloadHandler, StaticHandler |
|||
from .watcher import Watcher |
|||
from ._compat import text_types |
|||
from tornado.log import enable_pretty_logging |
|||
enable_pretty_logging() |
|||
|
|||
|
|||
def shell(command, output=None, mode='w'): |
|||
"""Command shell command. |
|||
|
|||
You can add a shell command:: |
|||
|
|||
server.watch( |
|||
'style.less', shell('lessc style.less', output='style.css') |
|||
) |
|||
|
|||
:param command: a shell command, string or list |
|||
:param output: output stdout to the given file |
|||
:param mode: only works with output, mode ``w`` means write, |
|||
mode ``a`` means append |
|||
""" |
|||
if not output: |
|||
output = os.devnull |
|||
else: |
|||
folder = os.path.dirname(output) |
|||
if folder and not os.path.isdir(folder): |
|||
os.makedirs(folder) |
|||
|
|||
if isinstance(command, (list, tuple)): |
|||
cmd = command |
|||
else: |
|||
cmd = command.split() |
|||
|
|||
def run_shell(): |
|||
try: |
|||
p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) |
|||
except OSError as e: |
|||
logging.error(e) |
|||
if e.errno == os.errno.ENOENT: # file (command) not found |
|||
logging.error("maybe you haven't installed %s", cmd[0]) |
|||
return e |
|||
stdout, stderr = p.communicate() |
|||
if stderr: |
|||
logging.error(stderr) |
|||
return stderr |
|||
#: stdout is bytes, decode for python3 |
|||
code = stdout.decode() |
|||
with open(output, mode) as f: |
|||
f.write(code) |
|||
|
|||
return run_shell |
|||
|
|||
|
|||
class WSGIWrapper(WSGIContainer): |
|||
"""Insert livereload scripts into response body.""" |
|||
|
|||
def __call__(self, request): |
|||
data = {} |
|||
response = [] |
|||
|
|||
def start_response(status, response_headers, exc_info=None): |
|||
data["status"] = status |
|||
data["headers"] = response_headers |
|||
return response.append |
|||
app_response = self.wsgi_application( |
|||
WSGIContainer.environ(request), start_response) |
|||
try: |
|||
response.extend(app_response) |
|||
body = b"".join(response) |
|||
finally: |
|||
if hasattr(app_response, "close"): |
|||
app_response.close() |
|||
if not data: |
|||
raise Exception("WSGI app did not call start_response") |
|||
|
|||
status_code = int(data["status"].split()[0]) |
|||
headers = data["headers"] |
|||
header_set = set(k.lower() for (k, v) in headers) |
|||
body = escape.utf8(body) |
|||
body = body.replace( |
|||
b'</head>', |
|||
b'<script src="/livereload.js"></script></head>' |
|||
) |
|||
|
|||
if status_code != 304: |
|||
if "content-length" not in header_set: |
|||
headers.append(("Content-Length", str(len(body)))) |
|||
if "content-type" not in header_set: |
|||
headers.append(("Content-Type", "text/html; charset=UTF-8")) |
|||
if "server" not in header_set: |
|||
headers.append(("Server", "livereload-tornado")) |
|||
|
|||
parts = [escape.utf8("HTTP/1.1 " + data["status"] + "\r\n")] |
|||
for key, value in headers: |
|||
if key.lower() == 'content-length': |
|||
value = str(len(body)) |
|||
parts.append( |
|||
escape.utf8(key) + b": " + escape.utf8(value) + b"\r\n" |
|||
) |
|||
parts.append(b"\r\n") |
|||
parts.append(body) |
|||
request.write(b"".join(parts)) |
|||
request.finish() |
|||
self._log(status_code, request) |
|||
|
|||
|
|||
class Server(object): |
|||
"""Livereload server interface. |
|||
|
|||
Initialize a server and watch file changes:: |
|||
|
|||
server = Server(wsgi_app) |
|||
server.serve() |
|||
|
|||
:param app: a wsgi application instance |
|||
:param watcher: A Watcher instance, you don't have to initialize |
|||
it by yourself. Under Linux, you will want to install |
|||
pyinotify and use INotifyWatcher() to avoid wasted |
|||
CPU usage. |
|||
""" |
|||
def __init__(self, app=None, watcher=None): |
|||
self.app = app |
|||
self.port = 5500 |
|||
self.root = None |
|||
if not watcher: |
|||
watcher = Watcher() |
|||
self.watcher = watcher |
|||
|
|||
def watch(self, filepath, func=None): |
|||
"""Add the given filepath for watcher list. |
|||
|
|||
Once you have intialized a server, watch file changes before |
|||
serve the server:: |
|||
|
|||
server.watch('static/*.stylus', 'make static') |
|||
def alert(): |
|||
print('foo') |
|||
server.watch('foo.txt', alert) |
|||
server.serve() |
|||
|
|||
:param filepath: files to be watched, it can be a filepath, |
|||
a directory, or a glob pattern |
|||
:param func: the function to be called, it can be a string of |
|||
shell command, or any callable object without |
|||
parameters |
|||
""" |
|||
if isinstance(func, text_types): |
|||
func = shell(func) |
|||
|
|||
self.watcher.watch(filepath, func) |
|||
|
|||
def application(self, debug=True): |
|||
LiveReloadHandler.watcher = self.watcher |
|||
handlers = [ |
|||
(r'/livereload', LiveReloadHandler), |
|||
(r'/forcereload', ForceReloadHandler), |
|||
(r'/livereload.js', LiveReloadJSHandler, dict(port=self.port)), |
|||
] |
|||
|
|||
if self.app: |
|||
self.app = WSGIWrapper(self.app) |
|||
handlers.append( |
|||
(r'.*', FallbackHandler, dict(fallback=self.app)) |
|||
) |
|||
else: |
|||
handlers.append( |
|||
(r'(.*)', StaticHandler, dict(root=self.root or '.')), |
|||
) |
|||
return Application(handlers=handlers, debug=debug) |
|||
|
|||
def serve(self, port=None, host=None, root=None, debug=True, open_url=False): |
|||
"""Start serve the server with the given port. |
|||
|
|||
:param port: serve on this port, default is 5500 |
|||
:param host: serve on this hostname, default is 0.0.0.0 |
|||
:param root: serve static on this root directory |
|||
:param open_url: open system browser |
|||
""" |
|||
if root: |
|||
self.root = root |
|||
if port: |
|||
self.port = port |
|||
if host is None: |
|||
host = '' |
|||
|
|||
self.application(debug=debug).listen(self.port, address=host) |
|||
logging.getLogger().setLevel(logging.INFO) |
|||
|
|||
host = host or '127.0.0.1' |
|||
print('Serving on %s:%s' % (host, self.port)) |
|||
|
|||
# Async open web browser after 5 sec timeout |
|||
if open_url: |
|||
def opener(): |
|||
time.sleep(5) |
|||
webbrowser.open('http://%s:%s' % (host, self.port)) |
|||
threading.Thread(target=opener).start() |
|||
|
|||
try: |
|||
IOLoop.instance().start() |
|||
except KeyboardInterrupt: |
|||
print('Shutting down...') |
@ -1,132 +0,0 @@ |
|||
# -*- coding: utf-8 -*- |
|||
""" |
|||
livereload.watcher |
|||
~~~~~~~~~~~~~~~~~~ |
|||
|
|||
A file watch management for LiveReload Server. |
|||
|
|||
:copyright: (c) 2013 by Hsiaoming Yang |
|||
""" |
|||
|
|||
import os |
|||
import glob |
|||
import time |
|||
|
|||
|
|||
class Watcher(object): |
|||
"""A file watcher registery.""" |
|||
def __init__(self): |
|||
self._tasks = {} |
|||
self._mtimes = {} |
|||
|
|||
# filepath that is changed |
|||
self.filepath = None |
|||
self._start = time.time() |
|||
|
|||
def ignore(self, filename): |
|||
"""Ignore a given filename or not.""" |
|||
_, ext = os.path.splitext(filename) |
|||
return ext in ['.pyc', '.pyo', '.o', '.swp'] |
|||
|
|||
def watch(self, path, func=None): |
|||
"""Add a task to watcher.""" |
|||
self._tasks[path] = func |
|||
|
|||
def start(self, callback): |
|||
"""Start the watcher running, calling callback when changes are observed. If this returns False, |
|||
regular polling will be used.""" |
|||
return False |
|||
|
|||
def examine(self): |
|||
"""Check if there are changes, if true, run the given task.""" |
|||
# clean filepath |
|||
self.filepath = None |
|||
for path in self._tasks: |
|||
if self.is_changed(path): |
|||
func = self._tasks[path] |
|||
# run function |
|||
func and func() |
|||
return self.filepath |
|||
|
|||
def is_changed(self, path): |
|||
if os.path.isfile(path): |
|||
return self.is_file_changed(path) |
|||
elif os.path.isdir(path): |
|||
return self.is_folder_changed(path) |
|||
return self.is_glob_changed(path) |
|||
|
|||
def is_file_changed(self, path): |
|||
if not os.path.isfile(path): |
|||
return False |
|||
|
|||
if self.ignore(path): |
|||
return False |
|||
|
|||
mtime = os.path.getmtime(path) |
|||
|
|||
if path not in self._mtimes: |
|||
self._mtimes[path] = mtime |
|||
self.filepath = path |
|||
return mtime > self._start |
|||
|
|||
if self._mtimes[path] != mtime: |
|||
self._mtimes[path] = mtime |
|||
self.filepath = path |
|||
return True |
|||
|
|||
self._mtimes[path] = mtime |
|||
return False |
|||
|
|||
def is_folder_changed(self, path): |
|||
for root, dirs, files in os.walk(path, followlinks=True): |
|||
if '.git' in dirs: |
|||
dirs.remove('.git') |
|||
if '.hg' in dirs: |
|||
dirs.remove('.hg') |
|||
if '.svn' in dirs: |
|||
dirs.remove('.svn') |
|||
if '.cvs' in dirs: |
|||
dirs.remove('.cvs') |
|||
|
|||
for f in files: |
|||
if self.is_file_changed(os.path.join(root, f)): |
|||
return True |
|||
return False |
|||
|
|||
def is_glob_changed(self, path): |
|||
for f in glob.glob(path): |
|||
if self.is_file_changed(f): |
|||
return True |
|||
return False |
|||
|
|||
|
|||
class INotifyWatcher(Watcher): |
|||
def __init__(self): |
|||
Watcher.__init__(self) |
|||
|
|||
import pyinotify |
|||
self.wm = pyinotify.WatchManager() |
|||
self.notifier = None |
|||
self.callback = None |
|||
|
|||
def watch(self, path, func=None): |
|||
import pyinotify |
|||
flag = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY |
|||
self.wm.add_watch(path, flag, rec=True, do_glob=True, auto_add=True) |
|||
Watcher.watch(self, path, func) |
|||
|
|||
def inotify_event(self, event): |
|||
self.callback() |
|||
|
|||
def start(self, callback): |
|||
if not self.notifier: |
|||
self.callback = callback |
|||
|
|||
import pyinotify |
|||
from tornado import ioloop |
|||
self.notifier = pyinotify.TornadoAsyncNotifier( |
|||
self.wm, ioloop.IOLoop.instance(), |
|||
default_proc_fun=self.inotify_event |
|||
) |
|||
callback() |
|||
return True |
Loading…
Reference in new issue