This commit is contained in:
Vitor Santos Costa
2018-01-05 16:57:38 +00:00
parent 814aa2bd4c
commit 9c862c21bc
271 changed files with 43711 additions and 6129 deletions

View File

@@ -0,0 +1,38 @@
"""Testing support (tools to test yap_ipython itself).
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2009-2011 The yap_ipython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------
# User-level entry point for testing
def test(**kwargs):
"""Run the entire yap_ipython test suite.
Any of the options for run_iptestall() may be passed as keyword arguments.
For example::
yap_ipython.test(testgroups=['lib', 'config', 'utils'], fast=2)
will run those three sections of the test suite, using two processes.
"""
# Do the import internally, so that this function doesn't increase total
# import time
from .iptestcontroller import run_iptestall, default_options
options = default_options()
for name, val in kwargs.items():
setattr(options, name, val)
run_iptestall(options)
# So nose doesn't try to run this as a test itself and we end up with an
# infinite test loop
test.__test__ = False

View File

@@ -0,0 +1,3 @@
if __name__ == '__main__':
from yap_ipython.testing import iptestcontroller
iptestcontroller.main()

View File

@@ -0,0 +1,376 @@
# -*- coding: utf-8 -*-
"""Decorators for labeling test objects.
Decorators that merely return a modified version of the original function
object are straightforward. Decorators that return a new function object need
to use nose.tools.make_decorator(original_function)(decorator) in returning the
decorator, in order to preserve metadata such as function name, setup and
teardown functions and so on - see nose.tools for more information.
This module provides a set of useful decorators meant to be ready to use in
your own tests. See the bottom of the file for the ready-made ones, and if you
find yourself writing a new one that may be of generic use, add it here.
Included decorators:
Lightweight testing that remains unittest-compatible.
- An @as_unittest decorator can be used to tag any normal parameter-less
function as a unittest TestCase. Then, both nose and normal unittest will
recognize it as such. This will make it easier to migrate away from Nose if
we ever need/want to while maintaining very lightweight tests.
NOTE: This file contains yap_ipython-specific decorators. Using the machinery in
yap_ipython.external.decorators, we import either numpy.testing.decorators if numpy is
available, OR use equivalent code in yap_ipython.external._decorators, which
we've copied verbatim from numpy.
"""
# Copyright (c) yap_ipython Development Team.
# Distributed under the terms of the Modified BSD License.
import os
import shutil
import sys
import tempfile
import unittest
import warnings
from importlib import import_module
from decorator import decorator
# Expose the unittest-driven decorators
from .ipunittest import ipdoctest, ipdocstring
# Grab the numpy-specific decorators which we keep in a file that we
# occasionally update from upstream: decorators.py is a copy of
# numpy.testing.decorators, we expose all of it here.
from yap_ipython.external.decorators import *
#-----------------------------------------------------------------------------
# Classes and functions
#-----------------------------------------------------------------------------
# Simple example of the basic idea
def as_unittest(func):
"""Decorator to make a simple function into a normal test via unittest."""
class Tester(unittest.TestCase):
def test(self):
func()
Tester.__name__ = func.__name__
return Tester
# Utility functions
def apply_wrapper(wrapper, func):
"""Apply a wrapper to a function for decoration.
This mixes Michele Simionato's decorator tool with nose's make_decorator,
to apply a wrapper in a decorator so that all nose attributes, as well as
function signature and other properties, survive the decoration cleanly.
This will ensure that wrapped functions can still be well introspected via
yap_ipython, for example.
"""
warnings.warn("The function `apply_wrapper` is deprecated since yap_ipython 4.0",
DeprecationWarning, stacklevel=2)
import nose.tools
return decorator(wrapper,nose.tools.make_decorator(func)(wrapper))
def make_label_dec(label, ds=None):
"""Factory function to create a decorator that applies one or more labels.
Parameters
----------
label : string or sequence
One or more labels that will be applied by the decorator to the functions
it decorates. Labels are attributes of the decorated function with their
value set to True.
ds : string
An optional docstring for the resulting decorator. If not given, a
default docstring is auto-generated.
Returns
-------
A decorator.
Examples
--------
A simple labeling decorator:
>>> slow = make_label_dec('slow')
>>> slow.__doc__
"Labels a test as 'slow'."
And one that uses multiple labels and a custom docstring:
>>> rare = make_label_dec(['slow','hard'],
... "Mix labels 'slow' and 'hard' for rare tests.")
>>> rare.__doc__
"Mix labels 'slow' and 'hard' for rare tests."
Now, let's test using this one:
>>> @rare
... def f(): pass
...
>>>
>>> f.slow
True
>>> f.hard
True
"""
warnings.warn("The function `make_label_dec` is deprecated since yap_ipython 4.0",
DeprecationWarning, stacklevel=2)
if isinstance(label, str):
labels = [label]
else:
labels = label
# Validate that the given label(s) are OK for use in setattr() by doing a
# dry run on a dummy function.
tmp = lambda : None
for label in labels:
setattr(tmp,label,True)
# This is the actual decorator we'll return
def decor(f):
for label in labels:
setattr(f,label,True)
return f
# Apply the user's docstring, or autogenerate a basic one
if ds is None:
ds = "Labels a test as %r." % label
decor.__doc__ = ds
return decor
# Inspired by numpy's skipif, but uses the full apply_wrapper utility to
# preserve function metadata better and allows the skip condition to be a
# callable.
def skipif(skip_condition, msg=None):
''' Make function raise SkipTest exception if skip_condition is true
Parameters
----------
skip_condition : bool or callable
Flag to determine whether to skip test. If the condition is a
callable, it is used at runtime to dynamically make the decision. This
is useful for tests that may require costly imports, to delay the cost
until the test suite is actually executed.
msg : string
Message to give on raising a SkipTest exception.
Returns
-------
decorator : function
Decorator, which, when applied to a function, causes SkipTest
to be raised when the skip_condition was True, and the function
to be called normally otherwise.
Notes
-----
You will see from the code that we had to further decorate the
decorator with the nose.tools.make_decorator function in order to
transmit function name, and various other metadata.
'''
def skip_decorator(f):
# Local import to avoid a hard nose dependency and only incur the
# import time overhead at actual test-time.
import nose
# Allow for both boolean or callable skip conditions.
if callable(skip_condition):
skip_val = skip_condition
else:
skip_val = lambda : skip_condition
def get_msg(func,msg=None):
"""Skip message with information about function being skipped."""
if msg is None: out = 'Test skipped due to test condition.'
else: out = msg
return "Skipping test: %s. %s" % (func.__name__,out)
# We need to define *two* skippers because Python doesn't allow both
# return with value and yield inside the same function.
def skipper_func(*args, **kwargs):
"""Skipper for normal test functions."""
if skip_val():
raise nose.SkipTest(get_msg(f,msg))
else:
return f(*args, **kwargs)
def skipper_gen(*args, **kwargs):
"""Skipper for test generators."""
if skip_val():
raise nose.SkipTest(get_msg(f,msg))
else:
for x in f(*args, **kwargs):
yield x
# Choose the right skipper to use when building the actual generator.
if nose.util.isgenerator(f):
skipper = skipper_gen
else:
skipper = skipper_func
return nose.tools.make_decorator(f)(skipper)
return skip_decorator
# A version with the condition set to true, common case just to attach a message
# to a skip decorator
def skip(msg=None):
"""Decorator factory - mark a test function for skipping from test suite.
Parameters
----------
msg : string
Optional message to be added.
Returns
-------
decorator : function
Decorator, which, when applied to a function, causes SkipTest
to be raised, with the optional message added.
"""
return skipif(True,msg)
def onlyif(condition, msg):
"""The reverse from skipif, see skipif for details."""
if callable(condition):
skip_condition = lambda : not condition()
else:
skip_condition = lambda : not condition
return skipif(skip_condition, msg)
#-----------------------------------------------------------------------------
# Utility functions for decorators
def module_not_available(module):
"""Can module be imported? Returns true if module does NOT import.
This is used to make a decorator to skip tests that require module to be
available, but delay the 'import numpy' to test execution time.
"""
try:
mod = import_module(module)
mod_not_avail = False
except ImportError:
mod_not_avail = True
return mod_not_avail
def decorated_dummy(dec, name):
"""Return a dummy function decorated with dec, with the given name.
Examples
--------
import yap_ipython.testing.decorators as dec
setup = dec.decorated_dummy(dec.skip_if_no_x11, __name__)
"""
warnings.warn("The function `decorated_dummy` is deprecated since yap_ipython 4.0",
DeprecationWarning, stacklevel=2)
dummy = lambda: None
dummy.__name__ = name
return dec(dummy)
#-----------------------------------------------------------------------------
# Decorators for public use
# Decorators to skip certain tests on specific platforms.
skip_win32 = skipif(sys.platform == 'win32',
"This test does not run under Windows")
skip_linux = skipif(sys.platform.startswith('linux'),
"This test does not run under Linux")
skip_osx = skipif(sys.platform == 'darwin',"This test does not run under OS X")
# Decorators to skip tests if not on specific platforms.
skip_if_not_win32 = skipif(sys.platform != 'win32',
"This test only runs under Windows")
skip_if_not_linux = skipif(not sys.platform.startswith('linux'),
"This test only runs under Linux")
skip_if_not_osx = skipif(sys.platform != 'darwin',
"This test only runs under OSX")
_x11_skip_cond = (sys.platform not in ('darwin', 'win32') and
os.environ.get('DISPLAY', '') == '')
_x11_skip_msg = "Skipped under *nix when X11/XOrg not available"
skip_if_no_x11 = skipif(_x11_skip_cond, _x11_skip_msg)
# not a decorator itself, returns a dummy function to be used as setup
def skip_file_no_x11(name):
warnings.warn("The function `skip_file_no_x11` is deprecated since yap_ipython 4.0",
DeprecationWarning, stacklevel=2)
return decorated_dummy(skip_if_no_x11, name) if _x11_skip_cond else None
# Other skip decorators
# generic skip without module
skip_without = lambda mod: skipif(module_not_available(mod), "This test requires %s" % mod)
skipif_not_numpy = skip_without('numpy')
skipif_not_matplotlib = skip_without('matplotlib')
skipif_not_sympy = skip_without('sympy')
skip_known_failure = knownfailureif(True,'This test is known to fail')
# A null 'decorator', useful to make more readable code that needs to pick
# between different decorators based on OS or other conditions
null_deco = lambda f: f
# Some tests only run where we can use unicode paths. Note that we can't just
# check os.path.supports_unicode_filenames, which is always False on Linux.
try:
f = tempfile.NamedTemporaryFile(prefix=u"tmp€")
except UnicodeEncodeError:
unicode_paths = False
else:
unicode_paths = True
f.close()
onlyif_unicode_paths = onlyif(unicode_paths, ("This test is only applicable "
"where we can use unicode in filenames."))
def onlyif_cmds_exist(*commands):
"""
Decorator to skip test when at least one of `commands` is not found.
"""
for cmd in commands:
if not shutil.which(cmd):
return skip("This test runs only if command '{0}' "
"is installed".format(cmd))
return null_deco
def onlyif_any_cmd_exists(*commands):
"""
Decorator to skip test unless at least one of `commands` is found.
"""
warnings.warn("The function `onlyif_any_cmd_exists` is deprecated since yap_ipython 4.0",
DeprecationWarning, stacklevel=2)
for cmd in commands:
if shutil.which(cmd):
return null_deco
return skip("This test runs only if one of the commands {0} "
"is installed".format(commands))

View File

@@ -0,0 +1,136 @@
"""Global yap_ipython app to support test running.
We must start our own ipython object and heavily muck with it so that all the
modifications yap_ipython makes to system behavior don't send the doctest machinery
into a fit. This code should be considered a gross hack, but it gets the job
done.
"""
# Copyright (c) yap_ipython Development Team.
# Distributed under the terms of the Modified BSD License.
import builtins as builtin_mod
import sys
import types
import warnings
from . import tools
from yap_ipython.core import page
from yap_ipython.utils import io
from yap_ipython.terminal.interactiveshell import TerminalInteractiveShell
class StreamProxy(io.IOStream):
"""Proxy for sys.stdout/err. This will request the stream *at call time*
allowing for nose's Capture plugin's redirection of sys.stdout/err.
Parameters
----------
name : str
The name of the stream. This will be requested anew at every call
"""
def __init__(self, name):
warnings.warn("StreamProxy is deprecated and unused as of yap_ipython 5", DeprecationWarning,
stacklevel=2,
)
self.name=name
@property
def stream(self):
return getattr(sys, self.name)
def flush(self):
self.stream.flush()
def get_ipython():
# This will get replaced by the real thing once we start yap_ipython below
return start_ipython()
# A couple of methods to override those in the running yap_ipython to interact
# better with doctest (doctest captures on raw stdout, so we need to direct
# various types of output there otherwise it will miss them).
def xsys(self, cmd):
"""Replace the default system call with a capturing one for doctest.
"""
# We use getoutput, but we need to strip it because pexpect captures
# the trailing newline differently from commands.getoutput
print(self.getoutput(cmd, split=False, depth=1).rstrip(), end='', file=sys.stdout)
sys.stdout.flush()
def _showtraceback(self, etype, evalue, stb):
"""Print the traceback purely on stdout for doctest to capture it.
"""
print(self.InteractiveTB.stb2text(stb), file=sys.stdout)
def start_ipython():
"""Start a global yap_ipython shell, which we need for yap_ipython-specific syntax.
"""
global get_ipython
# This function should only ever run once!
if hasattr(start_ipython, 'already_called'):
return
start_ipython.already_called = True
# Store certain global objects that yap_ipython modifies
_displayhook = sys.displayhook
_excepthook = sys.excepthook
_main = sys.modules.get('__main__')
# Create custom argv and namespaces for our yap_ipython to be test-friendly
config = tools.default_config()
config.TerminalInteractiveShell.simple_prompt = True
# Create and initialize our test-friendly yap_ipython instance.
shell = TerminalInteractiveShell.instance(config=config,
)
# A few more tweaks needed for playing nicely with doctests...
# remove history file
shell.tempfiles.append(config.HistoryManager.hist_file)
# These traps are normally only active for interactive use, set them
# permanently since we'll be mocking interactive sessions.
shell.builtin_trap.activate()
# Modify the yap_ipython system call with one that uses getoutput, so that we
# can capture subcommands and print them to Python's stdout, otherwise the
# doctest machinery would miss them.
shell.system = types.MethodType(xsys, shell)
shell._showtraceback = types.MethodType(_showtraceback, shell)
# yap_ipython is ready, now clean up some global state...
# Deactivate the various python system hooks added by ipython for
# interactive convenience so we don't confuse the doctest system
sys.modules['__main__'] = _main
sys.displayhook = _displayhook
sys.excepthook = _excepthook
# So that ipython magics and aliases can be doctested (they work by making
# a call into a global _ip object). Also make the top-level get_ipython
# now return this without recursively calling here again.
_ip = shell
get_ipython = _ip.get_ipython
builtin_mod._ip = _ip
builtin_mod.get_ipython = get_ipython
# Override paging, so we don't require user interaction during the tests.
def nopage(strng, start=0, screen_lines=0, pager_cmd=None):
if isinstance(strng, dict):
strng = strng.get('text/plain', '')
print(strng)
page.orig_page = page.pager_page
page.pager_page = nopage
return _ip

View File

@@ -0,0 +1,454 @@
# -*- coding: utf-8 -*-
"""yap_ipython Test Suite Runner.
This module provides a main entry point to a user script to test yap_ipython
itself from the command line. There are two ways of running this script:
1. With the syntax `iptest all`. This runs our entire test suite by
calling this script (with different arguments) recursively. This
causes modules and package to be tested in different processes, using nose
or trial where appropriate.
2. With the regular nose syntax, like `iptest -vvs yap_ipython`. In this form
the script simply calls nose, but with special command line flags and
plugins loaded.
"""
# Copyright (c) yap_ipython Development Team.
# Distributed under the terms of the Modified BSD License.
import glob
from io import BytesIO
import os
import os.path as path
import sys
from threading import Thread, Lock, Event
import warnings
import nose.plugins.builtin
from nose.plugins.xunit import Xunit
from nose import SkipTest
from nose.core import TestProgram
from nose.plugins import Plugin
from nose.util import safe_str
from yap_ipython import version_info
from yap_ipython.utils.py3compat import decode
from yap_ipython.utils.importstring import import_item
from yap_ipython.testing.plugin.ipdoctest import IPythonDoctest
from yap_ipython.external.decorators import KnownFailure, knownfailureif
pjoin = path.join
# Enable printing all warnings raise by yap_ipython's modules
warnings.filterwarnings('ignore', message='.*Matplotlib is building the font cache.*', category=UserWarning, module='.*')
warnings.filterwarnings('error', message='.*', category=ResourceWarning, module='.*')
warnings.filterwarnings('error', message=".*{'config': True}.*", category=DeprecationWarning, module='IPy.*')
warnings.filterwarnings('default', message='.*', category=Warning, module='IPy.*')
warnings.filterwarnings('error', message='.*apply_wrapper.*', category=DeprecationWarning, module='.*')
warnings.filterwarnings('error', message='.*make_label_dec', category=DeprecationWarning, module='.*')
warnings.filterwarnings('error', message='.*decorated_dummy.*', category=DeprecationWarning, module='.*')
warnings.filterwarnings('error', message='.*skip_file_no_x11.*', category=DeprecationWarning, module='.*')
warnings.filterwarnings('error', message='.*onlyif_any_cmd_exists.*', category=DeprecationWarning, module='.*')
warnings.filterwarnings('error', message='.*disable_gui.*', category=DeprecationWarning, module='.*')
warnings.filterwarnings('error', message='.*ExceptionColors global is deprecated.*', category=DeprecationWarning, module='.*')
# Jedi older versions
warnings.filterwarnings(
'error', message='.*elementwise != comparison failed and.*', category=FutureWarning, module='.*')
if version_info < (6,):
# nose.tools renames all things from `camelCase` to `snake_case` which raise an
# warning with the runner they also import from standard import library. (as of Dec 2015)
# Ignore, let's revisit that in a couple of years for yap_ipython 6.
warnings.filterwarnings(
'ignore', message='.*Please use assertEqual instead', category=Warning, module='yap_ipython.*')
if version_info < (7,):
warnings.filterwarnings('ignore', message='.*Completer.complete.*',
category=PendingDeprecationWarning, module='.*')
else:
warnings.warn(
'Completer.complete was pending deprecation and should be changed to Deprecated', FutureWarning)
# ------------------------------------------------------------------------------
# Monkeypatch Xunit to count known failures as skipped.
# ------------------------------------------------------------------------------
def monkeypatch_xunit():
try:
knownfailureif(True)(lambda: None)()
except Exception as e:
KnownFailureTest = type(e)
def addError(self, test, err, capt=None):
if issubclass(err[0], KnownFailureTest):
err = (SkipTest,) + err[1:]
return self.orig_addError(test, err, capt)
Xunit.orig_addError = Xunit.addError
Xunit.addError = addError
#-----------------------------------------------------------------------------
# Check which dependencies are installed and greater than minimum version.
#-----------------------------------------------------------------------------
def extract_version(mod):
return mod.__version__
def test_for(item, min_version=None, callback=extract_version):
"""Test to see if item is importable, and optionally check against a minimum
version.
If min_version is given, the default behavior is to check against the
`__version__` attribute of the item, but specifying `callback` allows you to
extract the value you are interested in. e.g::
In [1]: import sys
In [2]: from yap_ipython.testing.iptest import test_for
In [3]: test_for('sys', (2,6), callback=lambda sys: sys.version_info)
Out[3]: True
"""
try:
check = import_item(item)
except (ImportError, RuntimeError):
# GTK reports Runtime error if it can't be initialized even if it's
# importable.
return False
else:
if min_version:
if callback:
# extra processing step to get version to compare
check = callback(check)
return check >= min_version
else:
return True
# Global dict where we can store information on what we have and what we don't
# have available at test run time
have = {'matplotlib': test_for('matplotlib'),
'pygments': test_for('pygments'),
'sqlite3': test_for('sqlite3')}
#-----------------------------------------------------------------------------
# Test suite definitions
#-----------------------------------------------------------------------------
test_group_names = ['core',
'extensions', 'lib', 'terminal', 'testing', 'utils',
]
class TestSection(object):
def __init__(self, name, includes):
self.name = name
self.includes = includes
self.excludes = []
self.dependencies = []
self.enabled = True
def exclude(self, module):
if not module.startswith('yap_ipython'):
module = self.includes[0] + "." + module
self.excludes.append(module.replace('.', os.sep))
def requires(self, *packages):
self.dependencies.extend(packages)
@property
def will_run(self):
return self.enabled and all(have[p] for p in self.dependencies)
# Name -> (include, exclude, dependencies_met)
test_sections = {n:TestSection(n, ['yap_ipython.%s' % n]) for n in test_group_names}
# Exclusions and dependencies
# ---------------------------
# core:
sec = test_sections['core']
if not have['sqlite3']:
sec.exclude('tests.test_history')
sec.exclude('history')
if not have['matplotlib']:
sec.exclude('pylabtools'),
sec.exclude('tests.test_pylabtools')
# lib:
sec = test_sections['lib']
sec.exclude('kernel')
if not have['pygments']:
sec.exclude('tests.test_lexers')
# We do this unconditionally, so that the test suite doesn't import
# gtk, changing the default encoding and masking some unicode bugs.
sec.exclude('inputhookgtk')
# We also do this unconditionally, because wx can interfere with Unix signals.
# There are currently no tests for it anyway.
sec.exclude('inputhookwx')
# Testing inputhook will need a lot of thought, to figure out
# how to have tests that don't lock up with the gui event
# loops in the picture
sec.exclude('inputhook')
# testing:
sec = test_sections['testing']
# These have to be skipped on win32 because they use echo, rm, cd, etc.
# See ticket https://github.com/ipython/ipython/issues/87
if sys.platform == 'win32':
sec.exclude('plugin.test_exampleip')
sec.exclude('plugin.dtexample')
# don't run jupyter_console tests found via shim
test_sections['terminal'].exclude('console')
# extensions:
sec = test_sections['extensions']
# This is deprecated in favour of rpy2
sec.exclude('rmagic')
# autoreload does some strange stuff, so move it to its own test section
sec.exclude('autoreload')
sec.exclude('tests.test_autoreload')
test_sections['autoreload'] = TestSection('autoreload',
['yap_ipython.extensions.autoreload', 'yap_ipython.extensions.tests.test_autoreload'])
test_group_names.append('autoreload')
#-----------------------------------------------------------------------------
# Functions and classes
#-----------------------------------------------------------------------------
def check_exclusions_exist():
from yap_ipython.paths import get_ipython_package_dir
from warnings import warn
parent = os.path.dirname(get_ipython_package_dir())
for sec in test_sections:
for pattern in sec.exclusions:
fullpath = pjoin(parent, pattern)
if not os.path.exists(fullpath) and not glob.glob(fullpath + '.*'):
warn("Excluding nonexistent file: %r" % pattern)
class ExclusionPlugin(Plugin):
"""A nose plugin to effect our exclusions of files and directories.
"""
name = 'exclusions'
score = 3000 # Should come before any other plugins
def __init__(self, exclude_patterns=None):
"""
Parameters
----------
exclude_patterns : sequence of strings, optional
Filenames containing these patterns (as raw strings, not as regular
expressions) are excluded from the tests.
"""
self.exclude_patterns = exclude_patterns or []
super(ExclusionPlugin, self).__init__()
def options(self, parser, env=os.environ):
Plugin.options(self, parser, env)
def configure(self, options, config):
Plugin.configure(self, options, config)
# Override nose trying to disable plugin.
self.enabled = True
def wantFile(self, filename):
"""Return whether the given filename should be scanned for tests.
"""
if any(pat in filename for pat in self.exclude_patterns):
return False
return None
def wantDirectory(self, directory):
"""Return whether the given directory should be scanned for tests.
"""
if any(pat in directory for pat in self.exclude_patterns):
return False
return None
class StreamCapturer(Thread):
daemon = True # Don't hang if main thread crashes
started = False
def __init__(self, echo=False):
super(StreamCapturer, self).__init__()
self.echo = echo
self.streams = []
self.buffer = BytesIO()
self.readfd, self.writefd = os.pipe()
self.buffer_lock = Lock()
self.stop = Event()
def run(self):
self.started = True
while not self.stop.is_set():
chunk = os.read(self.readfd, 1024)
with self.buffer_lock:
self.buffer.write(chunk)
if self.echo:
sys.stdout.write(decode(chunk))
os.close(self.readfd)
os.close(self.writefd)
def reset_buffer(self):
with self.buffer_lock:
self.buffer.truncate(0)
self.buffer.seek(0)
def get_buffer(self):
with self.buffer_lock:
return self.buffer.getvalue()
def ensure_started(self):
if not self.started:
self.start()
def halt(self):
"""Safely stop the thread."""
if not self.started:
return
self.stop.set()
os.write(self.writefd, b'\0') # Ensure we're not locked in a read()
self.join()
class SubprocessStreamCapturePlugin(Plugin):
name='subprocstreams'
def __init__(self):
Plugin.__init__(self)
self.stream_capturer = StreamCapturer()
self.destination = os.environ.get('IPTEST_SUBPROC_STREAMS', 'capture')
# This is ugly, but distant parts of the test machinery need to be able
# to redirect streams, so we make the object globally accessible.
nose.iptest_stdstreams_fileno = self.get_write_fileno
def get_write_fileno(self):
if self.destination == 'capture':
self.stream_capturer.ensure_started()
return self.stream_capturer.writefd
elif self.destination == 'discard':
return os.open(os.devnull, os.O_WRONLY)
else:
return sys.__stdout__.fileno()
def configure(self, options, config):
Plugin.configure(self, options, config)
# Override nose trying to disable plugin.
if self.destination == 'capture':
self.enabled = True
def startTest(self, test):
# Reset log capture
self.stream_capturer.reset_buffer()
def formatFailure(self, test, err):
# Show output
ec, ev, tb = err
captured = self.stream_capturer.get_buffer().decode('utf-8', 'replace')
if captured.strip():
ev = safe_str(ev)
out = [ev, '>> begin captured subprocess output <<',
captured,
'>> end captured subprocess output <<']
return ec, '\n'.join(out), tb
return err
formatError = formatFailure
def finalize(self, result):
self.stream_capturer.halt()
def run_iptest():
"""Run the yap_ipython test suite using nose.
This function is called when this script is **not** called with the form
`iptest all`. It simply calls nose with appropriate command line flags
and accepts all of the standard nose arguments.
"""
# Apply our monkeypatch to Xunit
if '--with-xunit' in sys.argv and not hasattr(Xunit, 'orig_addError'):
monkeypatch_xunit()
arg1 = sys.argv[1]
if arg1 in test_sections:
section = test_sections[arg1]
sys.argv[1:2] = section.includes
elif arg1.startswith('yap_ipython.') and arg1[8:] in test_sections:
section = test_sections[arg1[8:]]
sys.argv[1:2] = section.includes
else:
section = TestSection(arg1, includes=[arg1])
argv = sys.argv + [ '--detailed-errors', # extra info in tracebacks
# We add --exe because of setuptools' imbecility (it
# blindly does chmod +x on ALL files). Nose does the
# right thing and it tries to avoid executables,
# setuptools unfortunately forces our hand here. This
# has been discussed on the distutils list and the
# setuptools devs refuse to fix this problem!
'--exe',
]
if '-a' not in argv and '-A' not in argv:
argv = argv + ['-a', '!crash']
if nose.__version__ >= '0.11':
# I don't fully understand why we need this one, but depending on what
# directory the test suite is run from, if we don't give it, 0 tests
# get run. Specifically, if the test suite is run from the source dir
# with an argument (like 'iptest.py yap_ipython.core', 0 tests are run,
# even if the same call done in this directory works fine). It appears
# that if the requested package is in the current dir, nose bails early
# by default. Since it's otherwise harmless, leave it in by default
# for nose >= 0.11, though unfortunately nose 0.10 doesn't support it.
argv.append('--traverse-namespace')
plugins = [ ExclusionPlugin(section.excludes), KnownFailure(),
SubprocessStreamCapturePlugin() ]
# we still have some vestigial doctests in core
if (section.name.startswith(('core', 'yap_ipython.core', 'yap_ipython.utils'))):
plugins.append(IPythonDoctest())
argv.extend([
'--with-ipdoctest',
'--ipdoctest-tests',
'--ipdoctest-extension=txt',
])
# Use working directory set by parent process (see iptestcontroller)
if 'IPTEST_WORKING_DIR' in os.environ:
os.chdir(os.environ['IPTEST_WORKING_DIR'])
# We need a global ipython running in this process, but the special
# in-process group spawns its own yap_ipython kernels, so for *that* group we
# must avoid also opening the global one (otherwise there's a conflict of
# singletons). Ultimately the solution to this problem is to refactor our
# assumptions about what needs to be a singleton and what doesn't (app
# objects should, individual shells shouldn't). But for now, this
# workaround allows the test suite for the inprocess module to complete.
if 'kernel.inprocess' not in section.name:
from yap_ipython.testing import globalipapp
globalipapp.start_ipython()
# Now nose can run
TestProgram(argv=argv, addplugins=plugins)
if __name__ == '__main__':
run_iptest()

View File

@@ -0,0 +1,510 @@
# -*- coding: utf-8 -*-
"""yap_ipython Test Process Controller
This module runs one or more subprocesses which will actually run the yap_ipython
test suite.
"""
# Copyright (c) yap_ipython Development Team.
# Distributed under the terms of the Modified BSD License.
import argparse
import multiprocessing.pool
import os
import stat
import shutil
import signal
import sys
import subprocess
import time
from .iptest import (
have, test_group_names as py_test_group_names, test_sections, StreamCapturer,
)
from yap_ipython.utils.path import compress_user
from yap_ipython.utils.py3compat import decode
from yap_ipython.utils.sysinfo import get_sys_info
from yap_ipython.utils.tempdir import TemporaryDirectory
def popen_wait(p, timeout):
return p.wait(timeout)
class TestController(object):
"""Run tests in a subprocess
"""
#: str, yap_ipython test suite to be executed.
section = None
#: list, command line arguments to be executed
cmd = None
#: dict, extra environment variables to set for the subprocess
env = None
#: list, TemporaryDirectory instances to clear up when the process finishes
dirs = None
#: subprocess.Popen instance
process = None
#: str, process stdout+stderr
stdout = None
def __init__(self):
self.cmd = []
self.env = {}
self.dirs = []
def setup(self):
"""Create temporary directories etc.
This is only called when we know the test group will be run. Things
created here may be cleaned up by self.cleanup().
"""
pass
def launch(self, buffer_output=False, capture_output=False):
# print('*** ENV:', self.env) # dbg
# print('*** CMD:', self.cmd) # dbg
env = os.environ.copy()
env.update(self.env)
if buffer_output:
capture_output = True
self.stdout_capturer = c = StreamCapturer(echo=not buffer_output)
c.start()
stdout = c.writefd if capture_output else None
stderr = subprocess.STDOUT if capture_output else None
self.process = subprocess.Popen(self.cmd, stdout=stdout,
stderr=stderr, env=env)
def wait(self):
self.process.wait()
self.stdout_capturer.halt()
self.stdout = self.stdout_capturer.get_buffer()
return self.process.returncode
def print_extra_info(self):
"""Print extra information about this test run.
If we're running in parallel and showing the concise view, this is only
called if the test group fails. Otherwise, it's called before the test
group is started.
The base implementation does nothing, but it can be overridden by
subclasses.
"""
return
def cleanup_process(self):
"""Cleanup on exit by killing any leftover processes."""
subp = self.process
if subp is None or (subp.poll() is not None):
return # Process doesn't exist, or is already dead.
try:
print('Cleaning up stale PID: %d' % subp.pid)
subp.kill()
except: # (OSError, WindowsError) ?
# This is just a best effort, if we fail or the process was
# really gone, ignore it.
pass
else:
for i in range(10):
if subp.poll() is None:
time.sleep(0.1)
else:
break
if subp.poll() is None:
# The process did not die...
print('... failed. Manual cleanup may be required.')
def cleanup(self):
"Kill process if it's still alive, and clean up temporary directories"
self.cleanup_process()
for td in self.dirs:
td.cleanup()
__del__ = cleanup
class PyTestController(TestController):
"""Run Python tests using yap_ipython.testing.iptest"""
#: str, Python command to execute in subprocess
pycmd = None
def __init__(self, section, options):
"""Create new test runner."""
TestController.__init__(self)
self.section = section
# pycmd is put into cmd[2] in PyTestController.launch()
self.cmd = [sys.executable, '-c', None, section]
self.pycmd = "from yap_ipython.testing.iptest import run_iptest; run_iptest()"
self.options = options
def setup(self):
ipydir = TemporaryDirectory()
self.dirs.append(ipydir)
self.env['IPYTHONDIR'] = ipydir.name
self.workingdir = workingdir = TemporaryDirectory()
self.dirs.append(workingdir)
self.env['IPTEST_WORKING_DIR'] = workingdir.name
# This means we won't get odd effects from our own matplotlib config
self.env['MPLCONFIGDIR'] = workingdir.name
# For security reasons (http://bugs.python.org/issue16202), use
# a temporary directory to which other users have no access.
self.env['TMPDIR'] = workingdir.name
# Add a non-accessible directory to PATH (see gh-7053)
noaccess = os.path.join(self.workingdir.name, "_no_access_")
self.noaccess = noaccess
os.mkdir(noaccess, 0)
PATH = os.environ.get('PATH', '')
if PATH:
PATH = noaccess + os.pathsep + PATH
else:
PATH = noaccess
self.env['PATH'] = PATH
# From options:
if self.options.xunit:
self.add_xunit()
if self.options.coverage:
self.add_coverage()
self.env['IPTEST_SUBPROC_STREAMS'] = self.options.subproc_streams
self.cmd.extend(self.options.extra_args)
def cleanup(self):
"""
Make the non-accessible directory created in setup() accessible
again, otherwise deleting the workingdir will fail.
"""
os.chmod(self.noaccess, stat.S_IRWXU)
TestController.cleanup(self)
@property
def will_run(self):
try:
return test_sections[self.section].will_run
except KeyError:
return True
def add_xunit(self):
xunit_file = os.path.abspath(self.section + '.xunit.xml')
self.cmd.extend(['--with-xunit', '--xunit-file', xunit_file])
def add_coverage(self):
try:
sources = test_sections[self.section].includes
except KeyError:
sources = ['yap_ipython']
coverage_rc = ("[run]\n"
"data_file = {data_file}\n"
"source =\n"
" {source}\n"
).format(data_file=os.path.abspath('.coverage.'+self.section),
source="\n ".join(sources))
config_file = os.path.join(self.workingdir.name, '.coveragerc')
with open(config_file, 'w') as f:
f.write(coverage_rc)
self.env['COVERAGE_PROCESS_START'] = config_file
self.pycmd = "import coverage; coverage.process_startup(); " + self.pycmd
def launch(self, buffer_output=False):
self.cmd[2] = self.pycmd
super(PyTestController, self).launch(buffer_output=buffer_output)
def prepare_controllers(options):
"""Returns two lists of TestController instances, those to run, and those
not to run."""
testgroups = options.testgroups
if not testgroups:
testgroups = py_test_group_names
controllers = [PyTestController(name, options) for name in testgroups]
to_run = [c for c in controllers if c.will_run]
not_run = [c for c in controllers if not c.will_run]
return to_run, not_run
def do_run(controller, buffer_output=True):
"""Setup and run a test controller.
If buffer_output is True, no output is displayed, to avoid it appearing
interleaved. In this case, the caller is responsible for displaying test
output on failure.
Returns
-------
controller : TestController
The same controller as passed in, as a convenience for using map() type
APIs.
exitcode : int
The exit code of the test subprocess. Non-zero indicates failure.
"""
try:
try:
controller.setup()
if not buffer_output:
controller.print_extra_info()
controller.launch(buffer_output=buffer_output)
except Exception:
import traceback
traceback.print_exc()
return controller, 1 # signal failure
exitcode = controller.wait()
return controller, exitcode
except KeyboardInterrupt:
return controller, -signal.SIGINT
finally:
controller.cleanup()
def report():
"""Return a string with a summary report of test-related variables."""
inf = get_sys_info()
out = []
def _add(name, value):
out.append((name, value))
_add('yap_ipython version', inf['ipython_version'])
_add('yap_ipython commit', "{} ({})".format(inf['commit_hash'], inf['commit_source']))
_add('yap_ipython package', compress_user(inf['ipython_path']))
_add('Python version', inf['sys_version'].replace('\n',''))
_add('sys.executable', compress_user(inf['sys_executable']))
_add('Platform', inf['platform'])
width = max(len(n) for (n,v) in out)
out = ["{:<{width}}: {}\n".format(n, v, width=width) for (n,v) in out]
avail = []
not_avail = []
for k, is_avail in have.items():
if is_avail:
avail.append(k)
else:
not_avail.append(k)
if avail:
out.append('\nTools and libraries available at test time:\n')
avail.sort()
out.append(' ' + ' '.join(avail)+'\n')
if not_avail:
out.append('\nTools and libraries NOT available at test time:\n')
not_avail.sort()
out.append(' ' + ' '.join(not_avail)+'\n')
return ''.join(out)
def run_iptestall(options):
"""Run the entire yap_ipython test suite by calling nose and trial.
This function constructs :class:`IPTester` instances for all yap_ipython
modules and package and then runs each of them. This causes the modules
and packages of yap_ipython to be tested each in their own subprocess using
nose.
Parameters
----------
All parameters are passed as attributes of the options object.
testgroups : list of str
Run only these sections of the test suite. If empty, run all the available
sections.
fast : int or None
Run the test suite in parallel, using n simultaneous processes. If None
is passed, one process is used per CPU core. Default 1 (i.e. sequential)
inc_slow : bool
Include slow tests. By default, these tests aren't run.
url : unicode
Address:port to use when running the JS tests.
xunit : bool
Produce Xunit XML output. This is written to multiple foo.xunit.xml files.
coverage : bool or str
Measure code coverage from tests. True will store the raw coverage data,
or pass 'html' or 'xml' to get reports.
extra_args : list
Extra arguments to pass to the test subprocesses, e.g. '-v'
"""
to_run, not_run = prepare_controllers(options)
def justify(ltext, rtext, width=70, fill='-'):
ltext += ' '
rtext = (' ' + rtext).rjust(width - len(ltext), fill)
return ltext + rtext
# Run all test runners, tracking execution time
failed = []
t_start = time.time()
print()
if options.fast == 1:
# This actually means sequential, i.e. with 1 job
for controller in to_run:
print('Test group:', controller.section)
sys.stdout.flush() # Show in correct order when output is piped
controller, res = do_run(controller, buffer_output=False)
if res:
failed.append(controller)
if res == -signal.SIGINT:
print("Interrupted")
break
print()
else:
# Run tests concurrently
try:
pool = multiprocessing.pool.ThreadPool(options.fast)
for (controller, res) in pool.imap_unordered(do_run, to_run):
res_string = 'OK' if res == 0 else 'FAILED'
print(justify('Test group: ' + controller.section, res_string))
if res:
controller.print_extra_info()
print(decode(controller.stdout))
failed.append(controller)
if res == -signal.SIGINT:
print("Interrupted")
break
except KeyboardInterrupt:
return
for controller in not_run:
print(justify('Test group: ' + controller.section, 'NOT RUN'))
t_end = time.time()
t_tests = t_end - t_start
nrunners = len(to_run)
nfail = len(failed)
# summarize results
print('_'*70)
print('Test suite completed for system with the following information:')
print(report())
took = "Took %.3fs." % t_tests
print('Status: ', end='')
if not failed:
print('OK (%d test groups).' % nrunners, took)
else:
# If anything went wrong, point out what command to rerun manually to
# see the actual errors and individual summary
failed_sections = [c.section for c in failed]
print('ERROR - {} out of {} test groups failed ({}).'.format(nfail,
nrunners, ', '.join(failed_sections)), took)
print()
print('You may wish to rerun these, with:')
print(' iptest', *failed_sections)
print()
if options.coverage:
from coverage import coverage, CoverageException
cov = coverage(data_file='.coverage')
cov.combine()
cov.save()
# Coverage HTML report
if options.coverage == 'html':
html_dir = 'ipy_htmlcov'
shutil.rmtree(html_dir, ignore_errors=True)
print("Writing HTML coverage report to %s/ ... " % html_dir, end="")
sys.stdout.flush()
# Custom HTML reporter to clean up module names.
from coverage.html import HtmlReporter
class CustomHtmlReporter(HtmlReporter):
def find_code_units(self, morfs):
super(CustomHtmlReporter, self).find_code_units(morfs)
for cu in self.code_units:
nameparts = cu.name.split(os.sep)
if 'yap_ipython' not in nameparts:
continue
ix = nameparts.index('yap_ipython')
cu.name = '.'.join(nameparts[ix:])
# Reimplement the html_report method with our custom reporter
cov.get_data()
cov.config.from_args(omit='*{0}tests{0}*'.format(os.sep), html_dir=html_dir,
html_title='yap_ipython test coverage',
)
reporter = CustomHtmlReporter(cov, cov.config)
reporter.report(None)
print('done.')
# Coverage XML report
elif options.coverage == 'xml':
try:
cov.xml_report(outfile='ipy_coverage.xml')
except CoverageException as e:
print('Generating coverage report failed. Are you running javascript tests only?')
import traceback
traceback.print_exc()
if failed:
# Ensure that our exit code indicates failure
sys.exit(1)
argparser = argparse.ArgumentParser(description='Run yap_ipython test suite')
argparser.add_argument('testgroups', nargs='*',
help='Run specified groups of tests. If omitted, run '
'all tests.')
argparser.add_argument('--all', action='store_true',
help='Include slow tests not run by default.')
argparser.add_argument('--url', help="URL to use for the JS tests.")
argparser.add_argument('-j', '--fast', nargs='?', const=None, default=1, type=int,
help='Run test sections in parallel. This starts as many '
'processes as you have cores, or you can specify a number.')
argparser.add_argument('--xunit', action='store_true',
help='Produce Xunit XML results')
argparser.add_argument('--coverage', nargs='?', const=True, default=False,
help="Measure test coverage. Specify 'html' or "
"'xml' to get reports.")
argparser.add_argument('--subproc-streams', default='capture',
help="What to do with stdout/stderr from subprocesses. "
"'capture' (default), 'show' and 'discard' are the options.")
def default_options():
"""Get an argparse Namespace object with the default arguments, to pass to
:func:`run_iptestall`.
"""
options = argparser.parse_args([])
options.extra_args = []
return options
def main():
# iptest doesn't work correctly if the working directory is the
# root of the yap_ipython source tree. Tell the user to avoid
# frustration.
if os.path.exists(os.path.join(os.getcwd(),
'yap_ipython', 'testing', '__main__.py')):
print("Don't run iptest from the yap_ipython source directory",
file=sys.stderr)
sys.exit(1)
# Arguments after -- should be passed through to nose. Argparse treats
# everything after -- as regular positional arguments, so we separate them
# first.
try:
ix = sys.argv.index('--')
except ValueError:
to_parse = sys.argv[1:]
extra_args = []
else:
to_parse = sys.argv[1:ix]
extra_args = sys.argv[ix+1:]
options = argparser.parse_args(to_parse)
options.extra_args = extra_args
run_iptestall(options)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,176 @@
"""Experimental code for cleaner support of yap_ipython syntax with unittest.
In yap_ipython up until 0.10, we've used very hacked up nose machinery for running
tests with yap_ipython special syntax, and this has proved to be extremely slow.
This module provides decorators to try a different approach, stemming from a
conversation Brian and I (FP) had about this problem Sept/09.
The goal is to be able to easily write simple functions that can be seen by
unittest as tests, and ultimately for these to support doctests with full
yap_ipython syntax. Nose already offers this based on naming conventions and our
hackish plugins, but we are seeking to move away from nose dependencies if
possible.
This module follows a different approach, based on decorators.
- A decorator called @ipdoctest can mark any function as having a docstring
that should be viewed as a doctest, but after syntax conversion.
Authors
-------
- Fernando Perez <Fernando.Perez@berkeley.edu>
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2009-2011 The yap_ipython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Stdlib
import re
import unittest
from doctest import DocTestFinder, DocTestRunner, TestResults
#-----------------------------------------------------------------------------
# Classes and functions
#-----------------------------------------------------------------------------
def count_failures(runner):
"""Count number of failures in a doctest runner.
Code modeled after the summarize() method in doctest.
"""
return [TestResults(f, t) for f, t in runner._name2ft.values() if f > 0 ]
class IPython2PythonConverter(object):
"""Convert yap_ipython 'syntax' to valid Python.
Eventually this code may grow to be the full yap_ipython syntax conversion
implementation, but for now it only does prompt conversion."""
def __init__(self):
self.rps1 = re.compile(r'In\ \[\d+\]: ')
self.rps2 = re.compile(r'\ \ \ \.\.\.+: ')
self.rout = re.compile(r'Out\[\d+\]: \s*?\n?')
self.pyps1 = '>>> '
self.pyps2 = '... '
self.rpyps1 = re.compile ('(\s*%s)(.*)$' % self.pyps1)
self.rpyps2 = re.compile ('(\s*%s)(.*)$' % self.pyps2)
def __call__(self, ds):
"""Convert yap_ipython prompts to python ones in a string."""
from . import globalipapp
pyps1 = '>>> '
pyps2 = '... '
pyout = ''
dnew = ds
dnew = self.rps1.sub(pyps1, dnew)
dnew = self.rps2.sub(pyps2, dnew)
dnew = self.rout.sub(pyout, dnew)
ip = globalipapp.get_ipython()
# Convert input yap_ipython source into valid Python.
out = []
newline = out.append
for line in dnew.splitlines():
mps1 = self.rpyps1.match(line)
if mps1 is not None:
prompt, text = mps1.groups()
newline(prompt+ip.prefilter(text, False))
continue
mps2 = self.rpyps2.match(line)
if mps2 is not None:
prompt, text = mps2.groups()
newline(prompt+ip.prefilter(text, True))
continue
newline(line)
newline('') # ensure a closing newline, needed by doctest
#print "PYSRC:", '\n'.join(out) # dbg
return '\n'.join(out)
#return dnew
class Doc2UnitTester(object):
"""Class whose instances act as a decorator for docstring testing.
In practice we're only likely to need one instance ever, made below (though
no attempt is made at turning it into a singleton, there is no need for
that).
"""
def __init__(self, verbose=False):
"""New decorator.
Parameters
----------
verbose : boolean, optional (False)
Passed to the doctest finder and runner to control verbosity.
"""
self.verbose = verbose
# We can reuse the same finder for all instances
self.finder = DocTestFinder(verbose=verbose, recurse=False)
def __call__(self, func):
"""Use as a decorator: doctest a function's docstring as a unittest.
This version runs normal doctests, but the idea is to make it later run
ipython syntax instead."""
# Capture the enclosing instance with a different name, so the new
# class below can see it without confusion regarding its own 'self'
# that will point to the test instance at runtime
d2u = self
# Rewrite the function's docstring to have python syntax
if func.__doc__ is not None:
func.__doc__ = ip2py(func.__doc__)
# Now, create a tester object that is a real unittest instance, so
# normal unittest machinery (or Nose, or Trial) can find it.
class Tester(unittest.TestCase):
def test(self):
# Make a new runner per function to be tested
runner = DocTestRunner(verbose=d2u.verbose)
map(runner.run, d2u.finder.find(func, func.__name__))
failed = count_failures(runner)
if failed:
# Since we only looked at a single function's docstring,
# failed should contain at most one item. More than that
# is a case we can't handle and should error out on
if len(failed) > 1:
err = "Invalid number of test results:" % failed
raise ValueError(err)
# Report a normal failure.
self.fail('failed doctests: %s' % str(failed[0]))
# Rename it so test reports have the original signature.
Tester.__name__ = func.__name__
return Tester
def ipdocstring(func):
"""Change the function docstring via ip2py.
"""
if func.__doc__ is not None:
func.__doc__ = ip2py(func.__doc__)
return func
# Make an instance of the classes for public use
ipdoctest = Doc2UnitTester()
ip2py = IPython2PythonConverter()

View File

@@ -0,0 +1,19 @@
"""Decorators marks that a doctest should be skipped.
The yap_ipython.testing.decorators module triggers various extra imports, including
numpy and sympy if they're present. Since this decorator is used in core parts
of yap_ipython, it's in a separate module so that running yap_ipython doesn't trigger
those imports."""
# Copyright (C) yap_ipython Development Team
# Distributed under the terms of the Modified BSD License.
def skip_doctest(f):
"""Decorator - mark a function or method for skipping its doctest.
This decorator allows you to mark a function whose docstring you wish to
omit from testing, while preserving the docstring for introspection, help,
etc."""
f.skip_doctest = True
return f

View File

@@ -0,0 +1,467 @@
"""Generic testing tools.
Authors
-------
- Fernando Perez <Fernando.Perez@berkeley.edu>
"""
# Copyright (c) yap_ipython Development Team.
# Distributed under the terms of the Modified BSD License.
import os
import re
import sys
import tempfile
from contextlib import contextmanager
from io import StringIO
from subprocess import Popen, PIPE
from unittest.mock import patch
try:
# These tools are used by parts of the runtime, so we make the nose
# dependency optional at this point. Nose is a hard dependency to run the
# test suite, but NOT to use ipython itself.
import nose.tools as nt
has_nose = True
except ImportError:
has_nose = False
from traitlets.config.loader import Config
from yap_ipython.utils.process import get_output_error_code
from yap_ipython.utils.text import list_strings
from yap_ipython.utils.io import temp_pyfile, Tee
from yap_ipython.utils import py3compat
from . import decorators as dec
from . import skipdoctest
# The docstring for full_path doctests differently on win32 (different path
# separator) so just skip the doctest there. The example remains informative.
doctest_deco = skipdoctest.skip_doctest if sys.platform == 'win32' else dec.null_deco
@doctest_deco
def full_path(startPath,files):
"""Make full paths for all the listed files, based on startPath.
Only the base part of startPath is kept, since this routine is typically
used with a script's ``__file__`` variable as startPath. The base of startPath
is then prepended to all the listed files, forming the output list.
Parameters
----------
startPath : string
Initial path to use as the base for the results. This path is split
using os.path.split() and only its first component is kept.
files : string or list
One or more files.
Examples
--------
>>> full_path('/foo/bar.py',['a.txt','b.txt'])
['/foo/a.txt', '/foo/b.txt']
>>> full_path('/foo',['a.txt','b.txt'])
['/a.txt', '/b.txt']
If a single file is given, the output is still a list::
>>> full_path('/foo','a.txt')
['/a.txt']
"""
files = list_strings(files)
base = os.path.split(startPath)[0]
return [ os.path.join(base,f) for f in files ]
def parse_test_output(txt):
"""Parse the output of a test run and return errors, failures.
Parameters
----------
txt : str
Text output of a test run, assumed to contain a line of one of the
following forms::
'FAILED (errors=1)'
'FAILED (failures=1)'
'FAILED (errors=1, failures=1)'
Returns
-------
nerr, nfail
number of errors and failures.
"""
err_m = re.search(r'^FAILED \(errors=(\d+)\)', txt, re.MULTILINE)
if err_m:
nerr = int(err_m.group(1))
nfail = 0
return nerr, nfail
fail_m = re.search(r'^FAILED \(failures=(\d+)\)', txt, re.MULTILINE)
if fail_m:
nerr = 0
nfail = int(fail_m.group(1))
return nerr, nfail
both_m = re.search(r'^FAILED \(errors=(\d+), failures=(\d+)\)', txt,
re.MULTILINE)
if both_m:
nerr = int(both_m.group(1))
nfail = int(both_m.group(2))
return nerr, nfail
# If the input didn't match any of these forms, assume no error/failures
return 0, 0
# So nose doesn't think this is a test
parse_test_output.__test__ = False
def default_argv():
"""Return a valid default argv for creating testing instances of ipython"""
return ['--quick', # so no config file is loaded
# Other defaults to minimize side effects on stdout
'--colors=NoColor', '--no-term-title','--no-banner',
'--autocall=0']
def default_config():
"""Return a config object with good defaults for testing."""
config = Config()
config.TerminalInteractiveShell.colors = 'NoColor'
config.TerminalTerminalInteractiveShell.term_title = False,
config.TerminalInteractiveShell.autocall = 0
f = tempfile.NamedTemporaryFile(suffix=u'test_hist.sqlite', delete=False)
config.HistoryManager.hist_file = f.name
f.close()
config.HistoryManager.db_cache_size = 10000
return config
def get_ipython_cmd(as_string=False):
"""
Return appropriate yap_ipython command line name. By default, this will return
a list that can be used with subprocess.Popen, for example, but passing
`as_string=True` allows for returning the yap_ipython command as a string.
Parameters
----------
as_string: bool
Flag to allow to return the command as a string.
"""
ipython_cmd = [sys.executable, "-m", "yap_ipython"]
if as_string:
ipython_cmd = " ".join(ipython_cmd)
return ipython_cmd
def ipexec(fname, options=None, commands=()):
"""Utility to call 'ipython filename'.
Starts yap_ipython with a minimal and safe configuration to make startup as fast
as possible.
Note that this starts yap_ipython in a subprocess!
Parameters
----------
fname : str
Name of file to be executed (should have .py or .ipy extension).
options : optional, list
Extra command-line flags to be passed to yap_ipython.
commands : optional, list
Commands to send in on stdin
Returns
-------
``(stdout, stderr)`` of ipython subprocess.
"""
if options is None: options = []
cmdargs = default_argv() + options
test_dir = os.path.dirname(__file__)
ipython_cmd = get_ipython_cmd()
# Absolute path for filename
full_fname = os.path.join(test_dir, fname)
full_cmd = ipython_cmd + cmdargs + [full_fname]
env = os.environ.copy()
# FIXME: ignore all warnings in ipexec while we have shims
# should we keep suppressing warnings here, even after removing shims?
env['PYTHONWARNINGS'] = 'ignore'
# env.pop('PYTHONWARNINGS', None) # Avoid extraneous warnings appearing on stderr
for k, v in env.items():
# Debug a bizarre failure we've seen on Windows:
# TypeError: environment can only contain strings
if not isinstance(v, str):
print(k, v)
p = Popen(full_cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env)
out, err = p.communicate(input=py3compat.encode('\n'.join(commands)) or None)
out, err = py3compat.decode(out), py3compat.decode(err)
# `import readline` causes 'ESC[?1034h' to be output sometimes,
# so strip that out before doing comparisons
if out:
out = re.sub(r'\x1b\[[^h]+h', '', out)
return out, err
def ipexec_validate(fname, expected_out, expected_err='',
options=None, commands=()):
"""Utility to call 'ipython filename' and validate output/error.
This function raises an AssertionError if the validation fails.
Note that this starts yap_ipython in a subprocess!
Parameters
----------
fname : str
Name of the file to be executed (should have .py or .ipy extension).
expected_out : str
Expected stdout of the process.
expected_err : optional, str
Expected stderr of the process.
options : optional, list
Extra command-line flags to be passed to yap_ipython.
Returns
-------
None
"""
import nose.tools as nt
out, err = ipexec(fname, options, commands)
#print 'OUT', out # dbg
#print 'ERR', err # dbg
# If there are any errors, we must check those befor stdout, as they may be
# more informative than simply having an empty stdout.
if err:
if expected_err:
nt.assert_equal("\n".join(err.strip().splitlines()), "\n".join(expected_err.strip().splitlines()))
else:
raise ValueError('Running file %r produced error: %r' %
(fname, err))
# If no errors or output on stderr was expected, match stdout
nt.assert_equal("\n".join(out.strip().splitlines()), "\n".join(expected_out.strip().splitlines()))
class TempFileMixin(object):
"""Utility class to create temporary Python/yap_ipython files.
Meant as a mixin class for test cases."""
def mktmp(self, src, ext='.py'):
"""Make a valid python temp file."""
fname, f = temp_pyfile(src, ext)
self.tmpfile = f
self.fname = fname
def tearDown(self):
if hasattr(self, 'tmpfile'):
# If the tmpfile wasn't made because of skipped tests, like in
# win32, there's nothing to cleanup.
self.tmpfile.close()
try:
os.unlink(self.fname)
except:
# On Windows, even though we close the file, we still can't
# delete it. I have no clue why
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.tearDown()
pair_fail_msg = ("Testing {0}\n\n"
"In:\n"
" {1!r}\n"
"Expected:\n"
" {2!r}\n"
"Got:\n"
" {3!r}\n")
def check_pairs(func, pairs):
"""Utility function for the common case of checking a function with a
sequence of input/output pairs.
Parameters
----------
func : callable
The function to be tested. Should accept a single argument.
pairs : iterable
A list of (input, expected_output) tuples.
Returns
-------
None. Raises an AssertionError if any output does not match the expected
value.
"""
name = getattr(func, "func_name", getattr(func, "__name__", "<unknown>"))
for inp, expected in pairs:
out = func(inp)
assert out == expected, pair_fail_msg.format(name, inp, expected, out)
MyStringIO = StringIO
_re_type = type(re.compile(r''))
notprinted_msg = """Did not find {0!r} in printed output (on {1}):
-------
{2!s}
-------
"""
class AssertPrints(object):
"""Context manager for testing that code prints certain text.
Examples
--------
>>> with AssertPrints("abc", suppress=False):
... print("abcd")
... print("def")
...
abcd
def
"""
def __init__(self, s, channel='stdout', suppress=True):
self.s = s
if isinstance(self.s, (str, _re_type)):
self.s = [self.s]
self.channel = channel
self.suppress = suppress
def __enter__(self):
self.orig_stream = getattr(sys, self.channel)
self.buffer = MyStringIO()
self.tee = Tee(self.buffer, channel=self.channel)
setattr(sys, self.channel, self.buffer if self.suppress else self.tee)
def __exit__(self, etype, value, traceback):
try:
if value is not None:
# If an error was raised, don't check anything else
return False
self.tee.flush()
setattr(sys, self.channel, self.orig_stream)
printed = self.buffer.getvalue()
for s in self.s:
if isinstance(s, _re_type):
assert s.search(printed), notprinted_msg.format(s.pattern, self.channel, printed)
else:
assert s in printed, notprinted_msg.format(s, self.channel, printed)
return False
finally:
self.tee.close()
printed_msg = """Found {0!r} in printed output (on {1}):
-------
{2!s}
-------
"""
class AssertNotPrints(AssertPrints):
"""Context manager for checking that certain output *isn't* produced.
Counterpart of AssertPrints"""
def __exit__(self, etype, value, traceback):
try:
if value is not None:
# If an error was raised, don't check anything else
self.tee.close()
return False
self.tee.flush()
setattr(sys, self.channel, self.orig_stream)
printed = self.buffer.getvalue()
for s in self.s:
if isinstance(s, _re_type):
assert not s.search(printed),printed_msg.format(
s.pattern, self.channel, printed)
else:
assert s not in printed, printed_msg.format(
s, self.channel, printed)
return False
finally:
self.tee.close()
@contextmanager
def mute_warn():
from yap_ipython.utils import warn
save_warn = warn.warn
warn.warn = lambda *a, **kw: None
try:
yield
finally:
warn.warn = save_warn
@contextmanager
def make_tempfile(name):
""" Create an empty, named, temporary file for the duration of the context.
"""
f = open(name, 'w')
f.close()
try:
yield
finally:
os.unlink(name)
def fake_input(inputs):
"""Temporarily replace the input() function to return the given values
Use as a context manager:
with fake_input(['result1', 'result2']):
...
Values are returned in order. If input() is called again after the last value
was used, EOFError is raised.
"""
it = iter(inputs)
def mock_input(prompt=''):
try:
return next(it)
except StopIteration:
raise EOFError('No more inputs given')
return patch('builtins.input', mock_input)
def help_output_test(subcommand=''):
"""test that `ipython [subcommand] -h` works"""
cmd = get_ipython_cmd() + [subcommand, '-h']
out, err, rc = get_output_error_code(cmd)
nt.assert_equal(rc, 0, err)
nt.assert_not_in("Traceback", err)
nt.assert_in("Options", out)
nt.assert_in("--help-all", out)
return out, err
def help_all_output_test(subcommand=''):
"""test that `ipython [subcommand] --help-all` works"""
cmd = get_ipython_cmd() + [subcommand, '--help-all']
out, err, rc = get_output_error_code(cmd)
nt.assert_equal(rc, 0, err)
nt.assert_not_in("Traceback", err)
nt.assert_in("Options", out)
nt.assert_in("Class", out)
return out, err