|
|
|
#!/usr/bin/env python
|
|
|
|
#
|
|
|
|
# Copyright 2011 Facebook
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
|
|
# not use this file except in compliance with the License. You may obtain
|
|
|
|
# a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
# License for the specific language governing permissions and limitations
|
|
|
|
# under the License.
|
|
|
|
|
|
|
|
"""Utilities for working with multiple processes."""
|
|
|
|
|
|
|
|
from __future__ import absolute_import, division, with_statement
|
|
|
|
|
|
|
|
import errno
|
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import time
|
|
|
|
|
|
|
|
from binascii import hexlify
|
|
|
|
|
|
|
|
from tornado import ioloop
|
|
|
|
|
|
|
|
try:
|
|
|
|
import multiprocessing # Python 2.6+
|
|
|
|
except ImportError:
|
|
|
|
multiprocessing = None
|
|
|
|
|
|
|
|
|
|
|
|
def cpu_count():
    """Returns the number of processors on this machine.

    ``multiprocessing.cpu_count()`` is tried first (when the module could
    be imported), then ``os.sysconf("SC_NPROCESSORS_CONF")``.  If neither
    yields a positive count, an error is logged and 1 is returned.
    """
    if multiprocessing is not None:
        try:
            return multiprocessing.cpu_count()
        except NotImplementedError:
            pass
    try:
        count = os.sysconf("SC_NPROCESSORS_CONF")
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform.
        # ValueError: the configuration name is not known.
        # OSError: the name is known but not supported by the host system.
        pass
    else:
        # os.sysconf reports -1 when the value is not defined; treat any
        # non-positive result as "could not detect".
        if count > 0:
            return count
    logging.error("Could not detect number of processors; assuming 1")
    return 1
|
|
|
|
|
|
|
|
|
|
|
|
def _reseed_random():
    """Re-seeds the ``random`` module if it has already been imported.

    ``fork_processes`` calls this in each newly forked child so that the
    child does not keep using the generator state it inherited from the
    parent.  Does nothing when ``random`` has not been imported yet.
    """
    if 'random' not in sys.modules:
        return
    import random
    # If os.urandom is available, this method does the same thing as
    # random.seed (at least as of python 2.6).  If os.urandom is not
    # available, we mix in the pid in addition to a timestamp.
    try:
        # int() rather than long(): long does not exist on Python 3, and
        # on Python 2 int() promotes to a long automatically when needed.
        seed = int(hexlify(os.urandom(16)), 16)
    except NotImplementedError:
        seed = int(time.time() * 1000) ^ os.getpid()
    random.seed(seed)
|
|
|
|
|
|
|
|
|
|
|
|
# Task id of the current process.  Stays None until fork_processes assigns
# an id in a forked child; read back through task_id().
_task_id = None
|
|
|
|
|
|
|
|
|
|
|
|
def fork_processes(num_processes, max_restarts=100):
    """Starts multiple worker processes.

    If ``num_processes`` is None or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If ``num_processes`` is given and > 0, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the debug=True option to `tornado.web.Application`).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``fork_processes``.

    In each child process, ``fork_processes`` returns its *task id*, a
    number in ``range(num_processes)``.  Processes that exit
    abnormally (due to a signal or non-zero exit status) are restarted
    with the same id (up to ``max_restarts`` times in total).

    The parent process never returns from this function: once all child
    processes have exited normally it calls ``sys.exit(0)``, and
    otherwise it only leaves by throwing an exception.

    Raises ``RuntimeError`` if an IOLoop has already been initialized, or
    if more than ``max_restarts`` restarts were needed.
    """
    global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling fork_processes()")
    logging.info("Starting %d processes", num_processes)
    # Maps the pid of each live child to its task id (parent side only).
    children = {}

    def start_child(i):
        """Forks one child; returns ``i`` in the child, None in the parent."""
        global _task_id
        pid = os.fork()
        if pid == 0:
            # child process
            _reseed_random()
            _task_id = i
            return i
        children[pid] = i
        return None

    for i in range(num_processes):
        child_id = start_child(i)
        if child_id is not None:
            # We are inside a freshly forked child: hand back its task id.
            return child_id

    num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except OSError:
            # sys.exc_info() instead of "except OSError, e" / "as e" so the
            # same source parses on every Python version this file targets.
            e = sys.exc_info()[1]
            if e.errno == errno.EINTR:
                # Interrupted by a signal; just wait again.
                continue
            raise
        if pid not in children:
            # Not one of the workers started here; ignore it.
            continue
        child_id = children.pop(pid)
        if os.WIFSIGNALED(status):
            logging.warning("child %d (pid %d) killed by signal %d, restarting",
                            child_id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            logging.warning("child %d (pid %d) exited with status %d, restarting",
                            child_id, pid, os.WEXITSTATUS(status))
        else:
            logging.info("child %d (pid %d) exited normally", child_id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(child_id)
        if new_id is not None:
            # We are inside the restarted child.
            return new_id
    # All child processes exited cleanly, so exit the master process
    # instead of just returning to right after the call to
    # fork_processes (which will probably just start up another IOLoop
    # unless the caller checks the return value).
    sys.exit(0)
|
|
|
|
|
|
|
|
|
|
|
|
def task_id():
    """Returns the task id of the current process, if it has one.

    The result is None when this process was not created by
    `fork_processes`.
    """
    # Read-only access to the module-level value; no ``global`` needed.
    return _task_id
|