Source code for distarray.apps.dacluster
#!/usr/bin/env python
# encoding: utf-8
# ---------------------------------------------------------------------------
# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc.
# Distributed under the terms of the BSD License. See COPYING.rst.
# ---------------------------------------------------------------------------
"""
Start, stop and manage a IPython.parallel cluster. `dacluster` can take
all the commands IPython's `ipcluster` can, and a few extras that are
distarray specific.
"""
from __future__ import print_function
import argparse
import sys
from time import sleep
from subprocess import Popen, PIPE
from distarray.externals import six
from distarray.globalapi.ipython_cleanup import clear_all
is_anaconda = "Anaconda" in sys.version or "Continuum" in sys.version
if six.PY2 or is_anaconda:
ipcluster_cmd = 'ipcluster'
elif six.PY3:
ipcluster_cmd = 'ipcluster3'
else:
raise NotImplementedError("Not run with Python 2 *or* 3?")
[docs]def start(n=4, engines=None, **kwargs):
"""Convenient way to start an ipcluster for testing.
Doesn't exit until the ipcluster prints a success message.
"""
if engines is None:
engines = "--engines=MPIEngineSetLauncher"
cluster = Popen([ipcluster_cmd, 'start', '-n', str(n), engines],
stdout=PIPE, stderr=PIPE)
started = "Engines appear to have started successfully"
running = "CRITICAL | Cluster is already running with"
while True:
line = cluster.stderr.readline().decode()
if not line:
break
print(line, end='')
if (started in line):
break
elif (running in line):
raise RuntimeError("ipcluster is already running.")
[docs]def stop(**kwargs):
"""Convenient way to stop an ipcluster."""
stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE)
stopped = "Stopping cluster"
not_running = ("CRITICAL | Could not read pid file, cluster "
"is probably not running.")
while True:
line = stopping.stderr.readline().decode()
if not line:
break
print(line, end='')
if (stopped in line) or (not_running in line):
break
[docs]def restart(n=4, engines=None, **kwargs):
"""Convenient way to restart an ipcluster."""
stop()
started = False
while not started:
sleep(2)
try:
start(n=n, engines=engines)
except RuntimeError:
pass
else:
started = True
[docs]def clear(**kwargs):
""" Removes all distarray-related modules from engines' sys.modules."""
mods = clear_all()
msg = "*** Removing %d distarray modules from engines' namespace. ***"
print(msg % len(list(mods.values())[0]))
[docs]def main():
""" Main function for dacluster utility.
Either start, stop, restart, or clear is called depending on the
command line arguments.
"""
main_description = """
Start, stop and manage a IPython.parallel cluster. `dacluster` can take
all the commands IPython's `ipcluster` can, and a few extras that are
distarray specific. For details on a subcommand, try `dacluster
<subcommand> --help`.
"""
parser = argparse.ArgumentParser(description=main_description)
# Print help if no command line args are supplied
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
subparsers = parser.add_subparsers()
start_description = """
Start a new IPython.parallel cluster.
"""
stop_description = """
Stop a IPython.parallel cluster.
"""
restart_description = """
Restart a IPython.parallel cluster.
"""
clear_description = """
Clear the namespace and imports on the cluster. This should be the
same as restarting the engines, but faster.
"""
# subparses for all our commands
parser_start = subparsers.add_parser('start',
description=start_description)
parser_stop = subparsers.add_parser('stop', description=stop_description)
parser_restart = subparsers.add_parser('restart',
description=restart_description)
parser_clear = subparsers.add_parser('clear',
description=clear_description)
engine_help = """
Number of engines to start.
"""
# Add some optional arguments for `start` and `restart`
parser_start.add_argument('-n', '--n', type=int, nargs='?', default=4,
help=engine_help)
parser_restart.add_argument('-n', '--n', type=int, nargs='?', default=4,
help=engine_help)
# set the functions each command should use
parser_start.set_defaults(func=start)
parser_stop.set_defaults(func=stop)
parser_restart.set_defaults(func=restart)
parser_clear.set_defaults(func=clear)
# run it
args = parser.parse_args()
args.func(**vars(args))
if __name__ == '__main__':
main()