Chris Johns 59880c0933 tester/wait: Add a wait directive to the tester
- Lets you test with stand alone TFTP or other services
2021-09-21 17:41:15 +10:00

155 lines
4.8 KiB
Python

# SPDX-License-Identifier: BSD-2-Clause
'''Wait test target.'''
# Copyright (C) 2013-2021 Chris Johns (chrisj@rtems.org)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import print_function
import datetime
import os
import threading
import time
class wait(object):
'''RTEMS Testing WAIT base.'''
# pylint: disable=useless-object-inheritance
# pylint: disable=too-many-instance-attributes
def __init__(self, bsp_arch, bsp, trace=False):
self.trace = trace
self.lock_trace = False
self.lock_locked = None
self.lock = threading.RLock()
self.bsp = bsp
self.bsp_arch = bsp_arch
self._init()
def _init(self):
self.output_length = None
self.console = None
self.server = None
self.timeout = None
self.test_too_long = None
self.timer = None
self.waiting = False
self.seconds = 0
def _lock(self, msg):
if self.lock_trace:
print('|[ LOCK:%s ]|' % (msg))
self.lock_locked = datetime.datetime.now()
self.lock.acquire()
def _unlock(self, msg):
if self.lock_trace:
period = datetime.datetime.now() - self.lock_locked
print('|] UNLOCK:%s [| : %s' % (msg, period))
self.lock.release()
def _trace(self, text):
if log.tracing:
log.trace('] wait: ' + text)
def _console(self, text):
if self.console:
self.console(text)
def _kill(self):
self._lock('_kill')
self._console('wait: kill')
self.waiting = False
self._unlock('_kill')
def _timeout(self):
self._kill()
if self.timeout is not None:
self.timeout()
def _test_too_long(self):
self._kill()
if self.test_too_long is not None:
self.test_too_long()
def _monitor(self, timeout):
output_length = self.output_length
period = timeout[0]
seconds = timeout[1]
self._console('wait: start: period=%d seconds=%d' % (period, seconds))
step = 0.5
period *= step
seconds *= step
self.waiting = True
while self.waiting and period > 0 and seconds > 0:
self._unlock('_monitor')
current_length = self.output_length()
if output_length != current_length:
period = timeout[0] * step
output_length = current_length
if seconds < step:
seconds = 0
else:
seconds -= step
if period < step:
step = period
period = 0
else:
period -= step
time.sleep(step)
self._lock('_monitor')
if self.waiting:
if period == 0:
self._timeout()
elif seconds == 0:
self._test_too_long()
self._console('wait: finished: period=%d seconds=%d' % (period, seconds))
def open(self, output_length, console, timeout):
'''Start to wait'''
# pylint: disable=too-many-arguments
self._lock('_open')
self._init()
self.output_length = output_length
self.console = console
self.timeout = timeout[2]
self.test_too_long = timeout[3]
self._monitor(timeout)
def kill(self):
'''Kill the test run.'''
self._kill()
def target_restart(self, started):
pass
def target_reset(self, started):
pass
def target_start(self):
pass
def target_end(self):
pass