blob: ea0a6c15f14bd264fbf13499286b991d7d6ca14f [file] [log] [blame]
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
import re
import sys
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style
from .winterm import WinTerm, WinColor, WinStyle
from .win32 import windll
if windll is not None:
winterm = WinTerm()
def is_a_tty(stream):
return hasattr(stream, 'isatty') and stream.isatty()
class StreamWrapper(object):
'''
Wraps a stream (such as stdout), acting as a transparent proxy for all
attribute access apart from method 'write()', which is delegated to our
Converter instance.
'''
def __init__(self, wrapped, converter):
# double-underscore everything to prevent clashes with names of
# attributes on the wrapped stream object.
self.__wrapped = wrapped
self.__convertor = converter
def __getattr__(self, name):
return getattr(self.__wrapped, name)
def write(self, text):
self.__convertor.write(text)
class AnsiToWin32(object):
'''
Implements a 'write()' method which, on Windows, will strip ANSI character
sequences from the text, and if outputting to a tty, will convert them into
win32 function calls.
'''
ANSI_RE = re.compile('\033\[((?:\d|;)*)([a-zA-Z])')
def __init__(self, wrapped, convert=None, strip=None, autoreset=False):
# The wrapped stream (normally sys.stdout or sys.stderr)
self.wrapped = wrapped
# should we reset colors to defaults after every .write()
self.autoreset = autoreset
# create the proxy wrapping our output stream
self.stream = StreamWrapper(wrapped, self)
on_windows = sys.platform.startswith('win')
# should we strip ANSI sequences from our output?
if strip is None:
strip = on_windows
self.strip = strip
# should we should convert ANSI sequences into win32 calls?
if convert is None:
convert = on_windows and is_a_tty(wrapped)
self.convert = convert
# dict of ansi codes to win32 functions and parameters
self.win32_calls = self.get_win32_calls()
# are we wrapping stderr?
self.on_stderr = self.wrapped is sys.stderr
def should_wrap(self):
'''
True if this class is actually needed. If false, then the output
stream will not be affected, nor will win32 calls be issued, so
wrapping stdout is not actually required. This will generally be
False on non-Windows platforms, unless optional functionality like
autoreset has been requested using kwargs to init()
'''
return self.convert or self.strip or self.autoreset
def get_win32_calls(self):
if self.convert and winterm:
return {
AnsiStyle.RESET_ALL: (winterm.reset_all, ),
AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT),
AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL),
AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL),
AnsiFore.BLACK: (winterm.fore, WinColor.BLACK),
AnsiFore.RED: (winterm.fore, WinColor.RED),
AnsiFore.GREEN: (winterm.fore, WinColor.GREEN),
AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW),
AnsiFore.BLUE: (winterm.fore, WinColor.BLUE),
AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA),
AnsiFore.CYAN: (winterm.fore, WinColor.CYAN),
AnsiFore.WHITE: (winterm.fore, WinColor.GREY),
AnsiFore.RESET: (winterm.fore, ),
AnsiBack.BLACK: (winterm.back, WinColor.BLACK),
AnsiBack.RED: (winterm.back, WinColor.RED),
AnsiBack.GREEN: (winterm.back, WinColor.GREEN),
AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW),
AnsiBack.BLUE: (winterm.back, WinColor.BLUE),
AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA),
AnsiBack.CYAN: (winterm.back, WinColor.CYAN),
AnsiBack.WHITE: (winterm.back, WinColor.GREY),
AnsiBack.RESET: (winterm.back, ),
}
def write(self, text):
if self.strip or self.convert:
self.write_and_convert(text)
else:
self.wrapped.write(text)
self.wrapped.flush()
if self.autoreset:
self.reset_all()
def reset_all(self):
if self.convert:
self.call_win32('m', (0,))
elif is_a_tty(self.wrapped):
self.wrapped.write(Style.RESET_ALL)
def write_and_convert(self, text):
'''
Write the given text to our wrapped stream, stripping any ANSI
sequences from the text, and optionally converting them into win32
calls.
'''
cursor = 0
for match in self.ANSI_RE.finditer(text):
start, end = match.span()
self.write_plain_text(text, cursor, start)
self.convert_ansi(*match.groups())
cursor = end
self.write_plain_text(text, cursor, len(text))
def write_plain_text(self, text, start, end):
if start < end:
self.wrapped.write(text[start:end])
self.wrapped.flush()
def convert_ansi(self, paramstring, command):
if self.convert:
params = self.extract_params(paramstring)
self.call_win32(command, params)
def extract_params(self, paramstring):
def split(paramstring):
for p in paramstring.split(';'):
if p != '':
yield int(p)
return tuple(split(paramstring))
def call_win32(self, command, params):
if params == []:
params = [0]
if command == 'm':
for param in params:
if param in self.win32_calls:
func_args = self.win32_calls[param]
func = func_args[0]
args = func_args[1:]
kwargs = dict(on_stderr=self.on_stderr)
func(*args, **kwargs)
elif command in ('H', 'f'): # set cursor position
func = winterm.set_cursor_position
func(params, on_stderr=self.on_stderr)
elif command in ('J'):
func = winterm.erase_data
func(params, on_stderr=self.on_stderr)
elif command == 'A':
if params == () or params == None:
num_rows = 1
else:
num_rows = params[0]
func = winterm.cursor_up
func(num_rows, on_stderr=self.on_stderr)