2019-06-11 11:00:30 +10:00

251 lines
7.4 KiB
Python
Executable File

#
# RTEMS Tools Project (http://www.rtems.org/)
# Copyright 2010-2016 Chris Johns (chrisj@rtems.org)
# All rights reserved.
#
# This file is part of the RTEMS Tools package in 'rtems-testing'.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
# Log output to stdout and/or a file.
#
from __future__ import print_function
import os
import sys
import threading
from rtemstoolkit import error
#
# A global log. When this is not None the module level output functions use
# it whenever the caller does not pass a log of its own.
#
default = None
#
# A global capture handler. When this is not None _output() calls it with
# the text of the message.
#
capture = None
#
# Global parameters.
#
#  tracing: trace() only emits its text when this is true.
#  quiet:   when true output() emits nothing and notice() skips its direct
#           print to stdout.
#
tracing = False
quiet = False
#
# Global output lock to keep output together when working with threads
#
lock = threading.Lock()
def info(args):
    """Return a list of lines describing the command line, host and Python.

    args is the sequence of command line strings; they are joined with
    single spaces. The host line comes from os.uname() when the platform
    has it, otherwise a win32-mingw host is reported together with the
    HOSTTYPE environment variable (or i686 when that is not set).
    """
    command_line = ' Command Line: %s' % (' '.join(args))
    if hasattr(os, 'uname'):
        host = ' Host: %s' % (' '.join(os.uname()))
    else:
        # No uname() available, report a MinGW hosted Windows build.
        if 'HOSTTYPE' in os.environ:
            arch = ' ' + os.environ['HOSTTYPE']
        else:
            arch = ' i686'
        host = ' Host: win32-mingw' + arch
    python = ' Python: %s' % (sys.version.replace('\n', ''))
    return [command_line, host, python]
def set_default_once(log):
    """Install log as the global default log if no default is set yet.

    The first call made while the module level 'default' is None stores
    log there. Later calls leave the existing default untouched.
    """
    # The global declaration is required. Without it the assignment below
    # makes 'default' a local name and the test raises UnboundLocalError, so
    # the module level default would never be set.
    global default
    if default is None:
        default = log
def _output(text = os.linesep, log = None):
    """Output the text to a log if provided else send it to stdout.

    text may be None (treated as a single line separator), a list of
    strings (joined with the line separator and terminated with one),
    bytes (decoded as UTF-8 with undecodable bytes ignored) or a string.

    The text goes to log when one is given, else to the global default log
    when one is set, else it is printed line by line on stdout with any
    carriage returns removed. When a global capture handler is set it is
    then called with the text.
    """
    if text is None:
        text = os.linesep
    if isinstance(text, list):
        text = os.linesep.join(text) + os.linesep
    if isinstance(text, bytes):
        text = text.decode('utf-8', 'ignore')
    if log:
        log.output(text)
    elif default is not None:
        default.output(text)
    else:
        # Use 'with' so the global lock is released even if print raises,
        # otherwise every later caller would block on the lock forever.
        with lock:
            for line in text.replace(chr(13), '').splitlines():
                print(line)
    if capture is not None:
        # The same applies if the capture handler raises.
        with lock:
            capture(text)
def stderr(text = os.linesep, log = None):
    """Print the text line by line on sys.stderr.

    Carriage returns are removed before the text is split into lines. The
    text is used as a string as is; it is not converted the way _output()
    converts None, lists and bytes. The log argument is accepted but not
    used.
    """
    # Use 'with' so the global lock is released even if the write to stderr
    # raises, otherwise every later caller would block on the lock forever.
    with lock:
        for line in text.replace(chr(13), '').splitlines():
            print(line, file = sys.stderr)
def output(text = os.linesep, log = None):
    """Send the text to _output() unless the global quiet flag is set."""
    if quiet:
        return
    _output(text, log)
def notice(text = os.linesep, log = None, stdout_only = False):
    """Show a notice on stdout and, unless stdout_only, also log it.

    The text is printed directly on stdout, line by line with carriage
    returns removed, when the global quiet flag is clear and either
    stdout_only is true or the global default log is set and has no stdout
    stream. Unless stdout_only is true the text is then passed to _output()
    as well; this happens even when quiet is set.
    """
    direct = (default is not None and not default.has_stdout()) or stdout_only
    if not quiet and direct:
        # Use 'with' so the global lock is released even if print raises,
        # otherwise every later caller would block on the lock forever.
        with lock:
            for line in text.replace(chr(13), '').splitlines():
                print(line)
    if not stdout_only:
        _output(text, log)
def trace(text = os.linesep, log = None):
    """Send the text to _output() only when the global tracing flag is set."""
    if not tracing:
        return
    _output(text, log)
def warning(text = os.linesep, log = None):
    """Send each line of the text to _output() with a 'warning: ' prefix.

    Carriage returns are removed before the text is split into lines.
    """
    cleaned = text.replace(chr(13), '')
    for line in cleaned.splitlines():
        _output('warning: %s' % (line), log)
def flush(log = None):
    """Flush the given log, or the global default log when none is given.

    Nothing happens when no log is given and no default log is set.
    """
    target = log if log else default
    if target is not None:
        target.flush()
class log:
    """Log output to stdout or a file.

    Every message is written to each configured stream and its lines are
    also remembered in 'tail', which holds at most 'tail_size' of the most
    recent lines. str() of a log returns that tail.
    """

    def __init__(self, streams = None, tail_size = 100):
        """Create the log.

        streams is an optional list of stream names: 'stdout' and 'stderr'
        select the matching sys stream, any other name is a path of a file
        that is created for writing. tail_size is the number of recent
        lines to keep.

        Raises error.general if a log file cannot be created.
        """
        self.lock = threading.Lock()
        self.tail = []
        self.tail_size = tail_size
        # Slots 0 and 1 are reserved for stdout and stderr, any files opened
        # by this log follow them.
        self.fhs = [None, None]
        if streams:
            for s in streams:
                if s == 'stdout':
                    self.fhs[0] = sys.stdout
                elif s == 'stderr':
                    self.fhs[1] = sys.stderr
                else:
                    try:
                        self.fhs.append(open(s, 'w'))
                    except IOError as ioe:
                        raise error.general("creating log file '" + s + \
                                            "': " + str(ioe))

    def __del__(self):
        # Close only the files this log opened, never stdout or stderr. The
        # getattr guards against an instance whose __init__ did not get as
        # far as creating the handle list.
        for fh in getattr(self, 'fhs', [])[2:]:
            fh.close()

    def __str__(self):
        """Return the remembered tail lines joined with the line separator."""
        return os.linesep.join(self.tail)

    def _tail(self, text):
        """Append the lines of text to the tail and trim it to tail_size."""
        if type(text) is not list:
            text = text.splitlines()
        self.tail += text
        # Trim from the front by the number of excess lines. Slicing with
        # [-tail_size:] keeps the whole list when tail_size is 0, so the
        # tail would never be trimmed.
        excess = len(self.tail) - self.tail_size
        if excess > 0:
            self.tail = self.tail[excess:]

    def has_stdout(self):
        """Return True if this log writes to stdout."""
        return self.fhs[0] is not None

    def has_stderr(self):
        """Return True if this log writes to stderr."""
        return self.fhs[1] is not None

    def output(self, text):
        """Output the text message to all the logs."""
        # Reformat the text to have local line types.
        text = text.replace(chr(13), '').splitlines()
        self._tail(text)
        out = os.linesep.join(text) + os.linesep
        if isinstance(out, bytes):
            out = out.decode('utf-8', 'ignore')
        # The lock keeps a message together across all the streams and is
        # released even if a write or the flush raises.
        with self.lock:
            for fh in self.fhs:
                if fh is not None:
                    fh.write(out)
            self.flush()

    def flush(self):
        """Flush the output."""
        for fh in self.fhs:
            if fh is not None:
                fh.flush()
# Self test, run only when the module is executed as a script. It writes to
# stdout and creates 'log.txt' in the current directory.
if __name__ == "__main__":
    # A log on stdout and a file that remembers the last 20 lines.
    l = log(['stdout', 'log.txt'], tail_size = 20)
    # Exercise LF terminated, CRLF terminated and unterminated messages.
    for i in range(0, 10):
        l.output('log: hello world: %d\n' % (i))
    l.output('log: hello world CRLF\r\n')
    l.output('log: hello world NONE')
    l.flush()
    print('=-' * 40)
    print('tail: %d' % (len(l.tail)))
    print(l)
    print('=-' * 40)
    # Ten more lines take the total past tail_size so the tail is trimmed.
    for i in range(0, 10):
        l.output('log: hello world 2: %d\n' % (i))
    l.flush()
    print('=-' * 40)
    print('tail: %d' % (len(l.tail)))
    print(l)
    print('=-' * 40)
    # Run trace() and notice() under each combination of the module level
    # quiet and tracing flags. This code is at module scope so the
    # assignments below change the flags the functions read.
    for i in [0, 1]:
        quiet = False
        tracing = False
        print('- quiet:%s - trace:%s %s' % (str(quiet), str(tracing), '-' * 30))
        trace('trace with quiet and trace off')
        notice('notice with quiet and trace off')
        quiet = True
        tracing = False
        print('- quiet:%s - trace:%s %s' % (str(quiet), str(tracing), '-' * 30))
        trace('trace with quiet on and trace off')
        notice('notice with quiet on and trace off')
        quiet = False
        tracing = True
        print('- quiet:%s - trace:%s %s' % (str(quiet), str(tracing), '-' * 30))
        trace('trace with quiet off and trace on')
        notice('notice with quiet off and trace on')
        quiet = True
        tracing = True
        print('- quiet:%s - trace:%s %s' % (str(quiet), str(tracing), '-' * 30))
        trace('trace with quiet on and trace on')
        notice('notice with quiet on and trace on')
        # NOTE(review): presumably this belongs inside the loop so the
        # second pass runs with a default log set - confirm against the
        # original indentation.
        default = l
    print('=-' * 40)
    print('tail: %d' % (len(l.tail)))
    print(l)
    print('=-' * 40)
    # Drop the last reference to the log.
    del l