mirror of
https://github.com/littlefs-project/littlefs.git
synced 2025-10-17 07:41:29 +08:00

Instead of trying to align to block-boundaries tracebd.py now just aliases to whatever dimensions are provided. Also reworked how scripts handle default sizing. Now using reasonable defaults with 0 being a placeholder for automatic sizing. The addition of -z/--cat makes it possible to pipe directly to stdout. Also added support for dots/braille output which can capture more detail, though care needs to be taken to not rely on accurate coloring.
952 lines
28 KiB
Python
Executable File
952 lines
28 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#
|
|
# Display operations on block devices based on trace output
|
|
#
|
|
# Example:
|
|
# ./scripts/tracebd.py trace
|
|
#
|
|
# Copyright (c) 2022, The littlefs authors.
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
#
|
|
|
|
import collections as co
|
|
import functools as ft
|
|
import io
|
|
import itertools as it
|
|
import math as m
|
|
import os
|
|
import re
|
|
import shutil
|
|
import time
|
|
|
|
|
|
|
|
# characters/colors used when rendering a cell, indexed as
# 0=read, 1=prog, 2=erase, 3=noop
#
# colors are SGR parameter strings, they end up in '\x1b[%sm', an empty
# string selects no color
CHARS = 'rpe.'
COLORS = ['42', '45', '44', '']

# characters/colors used when rendering wear, indexed by the wear ratio
# scaled to the length of the table
WEAR_CHARS = '0123456789'
WEAR_CHARS_SUBSCRIPTS = '.₁₂₃₄₅₆789'
WEAR_COLORS = ['', '', '', '', '', '', '', '35', '35', '1;31']

# 1x2 dot characters, indexed by a 2-bit mask of which cells are lit,
# and the colors used for both dots and braille rendering
CHARS_DOTS = " .':"
COLORS_DOTS = ['32', '35', '34', '']
# 2x4 braille characters, indexed by an 8-bit mask of which cells are lit
CHARS_BRAILLE = (
    '⠀⢀⡀⣀⠠⢠⡠⣠⠄⢄⡄⣄⠤⢤⡤⣤' '⠐⢐⡐⣐⠰⢰⡰⣰⠔⢔⡔⣔⠴⢴⡴⣴'
    '⠂⢂⡂⣂⠢⢢⡢⣢⠆⢆⡆⣆⠦⢦⡦⣦' '⠒⢒⡒⣒⠲⢲⡲⣲⠖⢖⡖⣖⠶⢶⡶⣶'
    '⠈⢈⡈⣈⠨⢨⡨⣨⠌⢌⡌⣌⠬⢬⡬⣬' '⠘⢘⡘⣘⠸⢸⡸⣸⠜⢜⡜⣜⠼⢼⡼⣼'
    '⠊⢊⡊⣊⠪⢪⡪⣪⠎⢎⡎⣎⠮⢮⡮⣮' '⠚⢚⡚⣚⠺⢺⡺⣺⠞⢞⡞⣞⠾⢾⡾⣾'
    '⠁⢁⡁⣁⠡⢡⡡⣡⠅⢅⡅⣅⠥⢥⡥⣥' '⠑⢑⡑⣑⠱⢱⡱⣱⠕⢕⡕⣕⠵⢵⡵⣵'
    '⠃⢃⡃⣃⠣⢣⡣⣣⠇⢇⡇⣇⠧⢧⡧⣧' '⠓⢓⡓⣓⠳⢳⡳⣳⠗⢗⡗⣗⠷⢷⡷⣷'
    '⠉⢉⡉⣉⠩⢩⡩⣩⠍⢍⡍⣍⠭⢭⡭⣭' '⠙⢙⡙⣙⠹⢹⡹⣹⠝⢝⡝⣝⠽⢽⡽⣽'
    '⠋⢋⡋⣋⠫⢫⡫⣫⠏⢏⡏⣏⠯⢯⡯⣯' '⠛⢛⡛⣛⠻⢻⡻⣻⠟⢟⡟⣟⠿⢿⡿⣿')
|
|
|
|
|
|
def openio(path, mode='r'):
    """Open path for reading or writing, treating '-' as stdin/stdout.

    When path is '-', a duplicate of the stdin file descriptor (mode 'r')
    or of the stdout file descriptor (any other mode) is wrapped in a new
    file object, so closing the returned file leaves the real
    stdin/stdout open. Any other path is passed straight to open().

    Returns a file object usable as a context manager.
    """
    if path != '-':
        return open(path, mode)

    # sys is otherwise only imported by the __main__ entry point, import
    # it here so this also works when the script is used as a module
    import sys

    if mode == 'r':
        return os.fdopen(os.dup(sys.stdin.fileno()), 'r')
    else:
        return os.fdopen(os.dup(sys.stdout.fileno()), 'w')
|
|
|
|
class LinesIO:
    """A write-only, file-like ring buffer of the most recent lines.

    Text passed to write() is split on newlines, complete lines are kept
    in a bounded deque (self.lines), and any trailing partial line is
    held in self.tail until its newline arrives. draw() repaints the
    buffered lines in place on the terminal using ANSI escape sequences.

    maxlen is the number of lines to keep, None keeps every line and 0
    tracks the current terminal height.
    """
    # number of terminal rows already reserved as our canvas, this is
    # shared by all instances
    last_lines = 1

    def __init__(self, maxlen=None):
        self.maxlen = maxlen
        self.lines = co.deque(maxlen=maxlen)
        self.tail = io.StringIO()

        # trigger automatic sizing
        if maxlen == 0:
            self.resize(0)

    def write(self, s):
        """Append text, moving any completed lines into the ring."""
        # note using split here ensures the trailing string has no newline
        lines = s.split('\n')

        # complete the pending partial line if this write finishes it
        if len(lines) > 1 and self.tail.getvalue():
            self.tail.write(lines[0])
            lines[0] = self.tail.getvalue()
            self.tail = io.StringIO()

        self.lines.extend(lines[:-1])

        # stash whatever follows the last newline
        if lines[-1]:
            self.tail.write(lines[-1])

    def resize(self, maxlen):
        """Change the number of lines kept, 0 means the terminal height."""
        self.maxlen = maxlen
        if maxlen == 0:
            maxlen = shutil.get_terminal_size((80, 5))[1]
        if maxlen != self.lines.maxlen:
            # rebuilding the deque keeps the most recent lines
            self.lines = co.deque(self.lines, maxlen=maxlen)

    def draw(self):
        """Repaint the buffered lines in place on stdout."""
        # sys is otherwise only imported by the __main__ entry point,
        # import it here so this also works when used as a module
        import sys

        # did terminal size change?
        if self.maxlen == 0:
            self.resize(0)

        # first thing first, give ourself a canvas
        while LinesIO.last_lines < len(self.lines):
            sys.stdout.write('\n')
            LinesIO.last_lines += 1

        for j, line in enumerate(self.lines):
            # how many rows this line sits above the cursor
            up = len(self.lines)-1-j

            # move cursor, clear line, disable/reenable line wrapping
            sys.stdout.write('\r')
            if up > 0:
                sys.stdout.write('\x1b[%dA' % up)
            sys.stdout.write('\x1b[K')
            sys.stdout.write('\x1b[?7l')
            sys.stdout.write(line)
            sys.stdout.write('\x1b[?7h')
            if up > 0:
                sys.stdout.write('\x1b[%dB' % up)
        sys.stdout.flush()
|
|
|
|
|
|
|
|
# space filling Hilbert-curve
#
# note we memoize the last curve since this is a bit expensive
#
@ft.lru_cache(1)
def hilbert_curve(width, height):
    """Return the (x, y) points of a generalized Hilbert curve as a list.

    The curve starts at (0, 0) and visits every cell of the width x height
    grid once. Only the most recently requested curve is cached.
    """
    # based on generalized Hilbert curves:
    # https://github.com/jakubcerveny/gilbert
    #
    def sign(v):
        return (v > 0) - (v < 0)

    # walk the rectangle at (x, y) spanned by the major axis vector
    # (ax, ay) and the minor axis vector (bx, by)
    def walk(x, y, ax, ay, bx, by):
        w = abs(ax+ay)
        h = abs(bx+by)
        dax, day = sign(ax), sign(ay)
        dbx, dby = sign(bx), sign(by)

        # trivial row or trivial column, note the row takes priority
        if h == 1 or w == 1:
            n, dx, dy = (w, dax, day) if h == 1 else (h, dbx, dby)
            for i in range(n):
                yield (x + i*dx, y + i*dy)
            return

        ax2, ay2 = ax//2, ay//2
        bx2, by2 = bx//2, by//2

        if 2*w > 3*h:
            # prefer even steps
            if abs(ax2+ay2) % 2 != 0 and w > 2:
                ax2, ay2 = ax2+dax, ay2+day

            # split in two
            parts = [
                (x, y, ax2, ay2, bx, by),
                (x+ax2, y+ay2, ax-ax2, ay-ay2, bx, by)]
        else:
            # prefer even steps
            if abs(bx2+by2) % 2 != 0 and h > 2:
                bx2, by2 = bx2+dbx, by2+dby

            # split in three
            parts = [
                (x, y, bx2, by2, ax2, ay2),
                (x+bx2, y+by2, ax, ay, bx-bx2, by-by2),
                (x+(ax-dax)+(bx2-dbx), y+(ay-day)+(by2-dby),
                    -bx2, -by2, -(ax-ax2), -(ay-ay2))]

        for part in parts:
            yield from walk(*part)

    # the longer side is always the major axis
    if width >= height:
        return list(walk(0, 0, +width, 0, 0, +height))
    else:
        return list(walk(0, 0, 0, +height, +width, 0))
|
|
|
|
# space filling Z-curve/Lebesgue-curve
#
# note we memoize the last curve since this is a bit expensive
#
@ft.lru_cache(1)
def lebesgue_curve(width, height):
    """Return the (x, y) points of a truncated Z-curve as a list.

    The points of the Z-curve over the smallest enclosing power-of-two
    square are generated in order, and points that fall outside the
    width x height region are filtered out. An empty region (either
    dimension <= 0) yields an empty list.
    """
    # nothing to visit, this also avoids taking the log of zero
    if width <= 0 or height <= 0:
        return []

    # number of bits needed per coordinate, this is ceil(log2(n)) for
    # n >= 1 without going through floats
    bits = (max(width, height)-1).bit_length()

    curve = []
    for i in range(1 << (2*bits)):
        # de-interleave the index, even bits form x, odd bits form y
        x = 0
        y = 0
        for k in range(bits):
            x |= ((i >> (2*k)) & 1) << k
            y |= ((i >> (2*k+1)) & 1) << k

        # truncate to our region
        if x < width and y < height:
            curve.append((x, y))

    return curve
|
|
|
|
|
|
class Block(int):
    """State of one rendered cell, packed into a single int.

    The low three bits are flags (1 = read, 2 = programmed, 4 = erased),
    the remaining bits hold the wear count. Instances are immutable, the
    read/prog/erase/clear methods return new Blocks.
    """
    __slots__ = ()

    def __new__(cls, state=0, *,
            wear=0,
            readed=False,
            proged=False,
            erased=False):
        flags = (
            (1 if readed else 0)
            | (2 if proged else 0)
            | (4 if erased else 0))
        return super().__new__(cls, state | (wear << 3) | flags)

    @property
    def wear(self):
        """Wear count, stored above the three flag bits."""
        return self >> 3

    @property
    def readed(self):
        """True if the read flag is set."""
        return bool(self & 1)

    @property
    def proged(self):
        """True if the prog flag is set."""
        return bool(self & 2)

    @property
    def erased(self):
        """True if the erase flag is set."""
        return bool(self & 4)

    def read(self):
        """Return a copy with the read flag set."""
        return Block(int(self) | 1)

    def prog(self):
        """Return a copy with the prog flag set."""
        return Block(int(self) | 2)

    def erase(self):
        """Return a copy with the erase flag set and wear incremented."""
        # adding 8 bumps the wear count stored above the flag bits
        return Block((int(self) | 4) + 8)

    def clear(self):
        """Return a copy with all flags cleared, wear is kept."""
        return Block(int(self) & ~7)

    def __or__(self, other):
        """Merge two Blocks, flags are unioned and the larger wear wins."""
        merged_flags = (int(self) | int(other)) & 7
        return Block(merged_flags, wear=max(self.wear, other.wear))

    def worn(self, max_wear, *,
            block_cycles=None,
            wear_chars=None,
            **_):
        """Return wear as a ratio.

        The wear count is divided by block_cycles if given (and non-zero),
        otherwise by the larger of max_wear and the number of wear
        characters.
        """
        if wear_chars is None:
            wear_chars = WEAR_CHARS

        scale = (block_cycles if block_cycles
            else max(max_wear, len(wear_chars)))
        return self.wear / scale

    def draw(self, max_wear, char=None, *,
            read=True,
            prog=True,
            erase=True,
            wear=False,
            block_cycles=None,
            color=True,
            subscripts=False,
            dots=False,
            braille=False,
            chars=None,
            wear_chars=None,
            colors=None,
            wear_colors=None,
            **_):
        """Render this Block as a string, optionally wrapped in colors.

        read/prog/erase/wear select which state is shown, with erase
        taking priority over prog, prog over read, and all of them over
        wear. char, if given, replaces the rendered character but not the
        colors. chars/colors/wear_chars/wear_colors override the default
        tables. Unknown keyword arguments are ignored.
        """
        # fall back to default chars/colors, padding short tables
        if chars is None:
            chars = CHARS
        if len(chars) < len(CHARS):
            chars = chars + CHARS[len(chars):]

        if colors is None:
            colors = COLORS_DOTS if (braille or dots) else COLORS
        if len(colors) < len(COLORS):
            colors = colors + COLORS[len(colors):]

        if wear_chars is None:
            wear_chars = WEAR_CHARS_SUBSCRIPTS if subscripts else WEAR_CHARS

        if wear_colors is None:
            wear_colors = WEAR_COLORS

        # start with noop and layer on anything more interesting
        glyph = chars[3]
        codes = [colors[3]]

        if wear:
            ratio = min(
                self.worn(
                    max_wear,
                    block_cycles=block_cycles,
                    wear_chars=wear_chars),
                1)
            glyph = wear_chars[int(ratio * (len(wear_chars)-1))]
            codes.append(wear_colors[int(ratio * (len(wear_colors)-1))])

        # the first enabled and set operation wins
        for enabled, flagged, i in (
                (erase, self.erased, 2),
                (prog, self.proged, 1),
                (read, self.readed, 0)):
            if enabled and flagged:
                glyph = chars[i]
                codes.append(colors[i])
                break

        # override char?
        if char:
            glyph = char

        # apply colors
        if codes and color:
            prefix = ''.join('\x1b[%sm' % code for code in codes)
            glyph = '%s%s\x1b[m' % (prefix, glyph)

        return glyph
|
|
|
|
|
|
class Bd:
    """A scaled-down picture of a block device.

    The device is size*count units large (size units per block, count
    blocks) and is represented by width*height Blocks, each covering a
    proportional slice of the device. Operations are mapped onto every
    cell they touch.
    """
    def __init__(self, *,
            size=1,
            count=1,
            width=None,
            height=1,
            blocks=None):
        if width is None:
            width = count

        if blocks is None:
            self.blocks = [Block() for _ in range(width*height)]
        else:
            self.blocks = blocks
        self.size = size
        self.count = count
        self.width = width
        self.height = height

    def _op(self, f, block=None, off=None, size=None):
        """Apply f to every cell touched by an operation.

        With no block, f is applied to every cell. With only block, the
        whole block is affected. With a single extra argument, it is
        treated as a size starting at offset 0. The device geometry grows
        if the operation extends past it.
        """
        if block is None:
            range_ = range(len(self.blocks))
        else:
            if off is None:
                off, size = 0, self.size
            elif size is None:
                off, size = 0, off

            # update our geometry? this will do nothing if we haven't changed
            self.resize(
                size=max(self.size, off+size),
                count=max(self.count, block+1))

            # map to our block space
            start = (block*self.size + off) / (self.size*self.count)
            stop = (block*self.size + off+size) / (self.size*self.count)

            # clamp to the cells we actually have, otherwise an operation
            # before the start of the device would turn into negative
            # indices and wrap around to the end of the device
            range_ = range(
                max(0, m.floor(start*len(self.blocks))),
                min(len(self.blocks), m.ceil(stop*len(self.blocks))))

        # apply the op
        for i in range_:
            self.blocks[i] = f(self.blocks[i])

    def read(self, block=None, off=None, size=None):
        """Mark a range as read, see _op for the argument forms."""
        self._op(Block.read, block, off, size)

    def prog(self, block=None, off=None, size=None):
        """Mark a range as programmed, see _op for the argument forms."""
        self._op(Block.prog, block, off, size)

    def erase(self, block=None, off=None, size=None):
        """Mark a range as erased, see _op for the argument forms."""
        self._op(Block.erase, block, off, size)

    def clear(self, block=None, off=None, size=None):
        """Clear the operation flags of a range, wear is kept."""
        self._op(Block.clear, block, off, size)

    def copy(self):
        """Return a new Bd with the same geometry and a copied cell list."""
        return Bd(
            blocks=self.blocks.copy(),
            size=self.size,
            count=self.count,
            width=self.width,
            height=self.height)

    def resize(self, *,
            size=None,
            count=None,
            width=None,
            height=None):
        """Change the device and/or rendering geometry.

        Arguments left as None keep their current value. Existing cells
        are resampled into the new geometry by merging every old cell
        that overlaps each new cell.
        """
        size = size if size is not None else self.size
        count = count if count is not None else self.count
        width = width if width is not None else self.width
        height = height if height is not None else self.height

        if (size == self.size
                and count == self.count
                and width == self.width
                and height == self.height):
            return

        # transform our blocks
        blocks = []
        for x in range(width*height):
            # map from new bd space
            start = m.floor(x * (size*count)/(width*height))
            stop = m.ceil((x+1) * (size*count)/(width*height))
            start_block = start // size
            start_off = start % size
            stop_block = stop // size
            stop_off = stop % size
            # map to old bd space
            start = start_block*self.size + start_off
            stop = stop_block*self.size + stop_off
            start = m.floor(start * len(self.blocks)/(self.size*self.count))
            stop = m.ceil(stop * len(self.blocks)/(self.size*self.count))

            # aggregate state
            blocks.append(ft.reduce(
                Block.__or__,
                self.blocks[start:stop],
                Block()))

        self.size = size
        self.count = count
        self.width = width
        self.height = height
        self.blocks = blocks

    def draw(self, row, *,
            read=False,
            prog=False,
            erase=False,
            wear=False,
            hilbert=False,
            lebesgue=False,
            dots=False,
            braille=False,
            **args):
        """Render one row of output as a string.

        row is in output rows, so with braille each row covers 4 rows of
        cells and 2 columns per character, and with dots each row covers
        2 rows of cells. Remaining keyword arguments are forwarded to
        Block.draw/Block.worn.

        Returns None when braille/dots rendering needs more frames before
        a full character row is available.
        """
        # find max wear?
        max_wear = None
        if wear:
            max_wear = max((b.wear for b in self.blocks), default=0)

        # fold via a curve?
        if hilbert:
            grid = [None]*(self.width*self.height)
            for (x,y), b in zip(
                    hilbert_curve(self.width, self.height),
                    self.blocks):
                grid[x + y*self.width] = b
        elif lebesgue:
            grid = [None]*(self.width*self.height)
            for (x,y), b in zip(
                    lebesgue_curve(self.width, self.height),
                    self.blocks):
                grid[x + y*self.width] = b
        else:
            grid = self.blocks

        # need to wait for more trace output before rendering
        #
        # this is sort of a hack that knows the output is going to a terminal
        if (braille and self.height < 4) or (dots and self.height < 2):
            needed_height = 4 if braille else 2

            self.history = getattr(self, 'history', [])
            self.history.append(grid)

            if len(self.history)*self.height < needed_height:
                # skip for now
                return None

            grid = list(it.chain.from_iterable(
                # did we resize?
                it.islice(it.chain(h, it.repeat(Block())),
                    self.width*self.height)
                for h in self.history))
            self.history = []

        # should this cell light up a dot?
        def lit(b):
            return ((read and b.readed)
                or (prog and b.proged)
                or (erase and b.erased)
                or (not read and not prog and not erase
                    and wear and b.worn(max_wear, **args) >= 0.7))

        line = []
        if braille:
            # encode into a byte
            for x in range(0, self.width, 2):
                byte_b = 0
                best_b = Block()
                for i in range(2*4):
                    b = grid[x+(2-1-(i%2)) + ((row*4)+(4-1-(i//2)))*self.width]
                    best_b |= b
                    if lit(b):
                        byte_b |= 1 << i

                line.append(best_b.draw(
                    max_wear,
                    CHARS_BRAILLE[byte_b],
                    braille=True,
                    read=read,
                    prog=prog,
                    erase=erase,
                    wear=wear,
                    **args))
        elif dots:
            # encode into a byte
            for x in range(self.width):
                byte_b = 0
                best_b = Block()
                for i in range(2):
                    b = grid[x + ((row*2)+(2-1-i))*self.width]
                    best_b |= b
                    if lit(b):
                        byte_b |= 1 << i

                line.append(best_b.draw(
                    max_wear,
                    CHARS_DOTS[byte_b],
                    dots=True,
                    read=read,
                    prog=prog,
                    erase=erase,
                    wear=wear,
                    **args))
        else:
            for x in range(self.width):
                line.append(grid[x + row*self.width].draw(
                    max_wear,
                    read=read,
                    prog=prog,
                    erase=erase,
                    wear=wear,
                    **args))

        return ''.join(line)
|
|
|
|
|
|
|
|
def main(path='-', *,
        read=False,
        prog=False,
        erase=False,
        wear=False,
        block=(None,None),
        off=(None,None),
        block_size=None,
        block_count=None,
        block_cycles=None,
        reset=False,
        color='auto',
        dots=False,
        braille=False,
        width=None,
        height=None,
        lines=None,
        cat=False,
        hilbert=False,
        lebesgue=False,
        coalesce=None,
        sleep=None,
        keep_open=False,
        **args):
    """Render block device operations found in trace output.

    Lines are read from path ('-' for stdin), matched against the bd
    create/read/prog/erase/sync trace pattern, applied to a Bd, and the Bd
    is redrawn whenever enough operations (coalesce) and enough time
    (sleep) have accumulated.

    read/prog/erase/wear select what is rendered, with none given
    read+prog+erase are rendered. block/off are (start,) or (start, stop)
    tuples limiting which blocks/offsets are shown. block_size/block_count
    override the geometry found in the trace. width/height/lines of 0 use
    the terminal size. cat writes frames straight to stdout instead of
    redrawing in place. Remaining keyword arguments are forwarded to
    Bd.draw.
    """
    # sys is otherwise only imported by the __main__ entry point, import
    # it here so main also works when the script is used as a module
    import sys

    # figure out what color should be
    if color == 'auto':
        color = sys.stdout.isatty()
    elif color == 'always':
        color = True
    else:
        color = False

    # exclusive wear or read/prog/erase by default
    if not read and not prog and not erase and not wear:
        read = True
        prog = True
        erase = True

    # assume a reasonable lines/height if not specified
    #
    # note that we let height = None if neither hilbert or lebesgue
    # are specified, this is a bit special as the default may be less
    # than one character in height.
    if height is None and (hilbert or lebesgue):
        if lines is not None:
            height = lines
        else:
            height = 5

    if lines is None:
        if height is not None:
            lines = height
        else:
            lines = 5

    # allow ranges for blocks/offs, a single value selects just that one
    block_start = block[0]
    if len(block) > 1:
        block_stop = block[1]
    else:
        block_stop = block[0]+1 if block[0] is not None else None
    off_start = off[0]
    if len(off) > 1:
        off_stop = off[1]
    else:
        off_stop = off[0]+1 if off[0] is not None else None

    if block_start is None:
        block_start = 0
    if block_stop is None and block_count is not None:
        block_stop = block_count
    if off_start is None:
        off_start = 0
    if off_stop is None and block_size is not None:
        off_stop = block_size

    # create a block device representation
    bd = Bd()

    def resize(*, size=None, count=None):
        nonlocal bd

        # size may be overriden by cli args
        if block_size is not None:
            size = block_size
        elif off_stop is not None:
            size = off_stop-off_start

        if block_count is not None:
            count = block_count
        elif block_stop is not None:
            count = block_stop-block_start

        # figure out best width/height
        if width is None:
            width_ = min(80, shutil.get_terminal_size((80, 5))[0])
        elif width:
            width_ = width
        else:
            width_ = shutil.get_terminal_size((80, 5))[0]

        if height is None:
            height_ = 0
        elif height:
            height_ = height
        else:
            height_ = shutil.get_terminal_size((80, 5))[1]

        bd.resize(
            size=size,
            count=count,
            # scale if we're printing with dots or braille
            width=2*width_ if braille else width_,
            height=max(1,
                4*height_ if braille
                else 2*height_ if dots
                else height_))
    resize()

    # parse a line of trace output
    #
    # note these need to be raw strings, \w, \s and \( are not valid
    # string escapes
    pattern = re.compile(
        r'trace.*?bd_(?:'
            r'(?P<create>create\w*)\('
                r'(?:'
                    r'block_size=(?P<block_size>\w+)'
                    r'|' r'block_count=(?P<block_count>\w+)'
                    r'|' r'.*?' r')*' r'\)'
            r'|' r'(?P<read>read)\('
                r'\s*(?P<read_ctx>\w+)\s*' r','
                r'\s*(?P<read_block>\w+)\s*' r','
                r'\s*(?P<read_off>\w+)\s*' r','
                r'\s*(?P<read_buffer>\w+)\s*' r','
                r'\s*(?P<read_size>\w+)\s*' r'\)'
            r'|' r'(?P<prog>prog)\('
                r'\s*(?P<prog_ctx>\w+)\s*' r','
                r'\s*(?P<prog_block>\w+)\s*' r','
                r'\s*(?P<prog_off>\w+)\s*' r','
                r'\s*(?P<prog_buffer>\w+)\s*' r','
                r'\s*(?P<prog_size>\w+)\s*' r'\)'
            r'|' r'(?P<erase>erase)\('
                r'\s*(?P<erase_ctx>\w+)\s*' r','
                r'\s*(?P<erase_block>\w+)\s*' r'\)'
            r'|' r'(?P<sync>sync)\('
                r'\s*(?P<sync_ctx>\w+)\s*' r'\)' r')')

    # clip an operation to the requested block/off window
    #
    # returns (block, off, size) relative to the start of the window, or
    # None if the operation is entirely outside of it
    def clip(block, off=None, size=None):
        # without the lower bound check, blocks before the window would
        # become negative and end up marking unrelated cells
        if block < block_start:
            return None
        if block_stop is not None and block >= block_stop:
            return None
        block -= block_start

        if off is None:
            return block, None, None

        if off_stop is not None:
            if off >= off_stop:
                return None
            size = min(size, off_stop-off)
        # drop the part of the operation before the window
        if off < off_start:
            size -= off_start-off
            off = off_start
            if size <= 0:
                return None
        off -= off_start

        return block, off, size

    def parse(line):
        nonlocal bd

        # string searching is much faster than the regex here, and this
        # actually has a big impact given how much trace output comes
        # through here
        if 'trace' not in line or 'bd' not in line:
            return False
        mo = pattern.search(line)
        if not mo:
            return False

        if mo.group('create'):
            # update our block size/count, either may be missing from
            # the trace in which case we keep what we have
            size = mo.group('block_size')
            count = mo.group('block_count')

            resize(
                size=int(size, 0) if size is not None else None,
                count=int(count, 0) if count is not None else None)
            if reset:
                bd = Bd(
                    size=bd.size,
                    count=bd.count,
                    width=bd.width,
                    height=bd.height)
            return True

        elif mo.group('read') and read:
            clipped = clip(
                int(mo.group('read_block'), 0),
                int(mo.group('read_off'), 0),
                int(mo.group('read_size'), 0))
            if clipped is None:
                return False

            bd.read(*clipped)
            return True

        elif mo.group('prog') and prog:
            clipped = clip(
                int(mo.group('prog_block'), 0),
                int(mo.group('prog_off'), 0),
                int(mo.group('prog_size'), 0))
            if clipped is None:
                return False

            bd.prog(*clipped)
            return True

        elif mo.group('erase') and (erase or wear):
            clipped = clip(int(mo.group('erase_block'), 0))
            if clipped is None:
                return False

            bd.erase(clipped[0])
            return True

        else:
            return False

    # print trace output
    def draw(f):
        def writeln(s=''):
            f.write(s)
            f.write('\n')
        f.writeln = writeln

        # don't forget we've scaled this for braille/dots!
        for row in range(
                m.ceil(bd.height/4) if braille
                else m.ceil(bd.height/2) if dots
                else bd.height):
            line = bd.draw(row,
                read=read,
                prog=prog,
                erase=erase,
                wear=wear,
                block_cycles=block_cycles,
                color=color,
                dots=dots,
                braille=braille,
                hilbert=hilbert,
                lebesgue=lebesgue,
                **args)
            if line:
                f.writeln(line)

        # each frame only shows operations since the last frame
        bd.clear()
        resize()

    # read/parse/coalesce operations
    if cat:
        ring = sys.stdout
    else:
        ring = LinesIO(lines)

    ptime = time.time()
    try:
        while True:
            with openio(path) as f:
                changed = 0
                for line in f:
                    changed += parse(line)

                    # need to redraw?
                    if (changed
                            and (not coalesce or changed >= coalesce)
                            and (not sleep or time.time()-ptime >= sleep)):
                        draw(ring)
                        if not cat:
                            ring.draw()
                        changed = 0
                        ptime = time.time()

            if not keep_open:
                break
            # don't just flood open calls
            time.sleep(sleep or 0.1)
    except KeyboardInterrupt:
        pass

    if not cat:
        sys.stdout.write('\n')
|
|
|
|
|
|
if __name__ == "__main__":
    # note the rest of this script relies on sys being imported here
    import sys
    import argparse

    # shared argument parsers, ints accept any base prefix (0x, 0o, 0b),
    # ranges are "start" or "start,stop" where either side may be empty
    parse_int = lambda x: int(x, 0)
    parse_range = lambda x: tuple(
        int(x,0) if x else None for x in x.split(',',1))
    parse_list = lambda x: x.split(',')

    parser = argparse.ArgumentParser(
        description="Display operations on block devices based on "
            "trace output.")
    parser.add_argument(
        'path',
        nargs='?',
        help="Path to read from.")

    # what to render
    parser.add_argument(
        '-r', '--read',
        action='store_true',
        help="Render reads.")
    parser.add_argument(
        '-p', '--prog',
        action='store_true',
        help="Render progs.")
    parser.add_argument(
        '-e', '--erase',
        action='store_true',
        help="Render erases.")
    parser.add_argument(
        '-w', '--wear',
        action='store_true',
        help="Render wear.")

    # what part of the device to render
    parser.add_argument(
        '-b', '--block',
        type=parse_range,
        help="Show a specific block or range of blocks.")
    parser.add_argument(
        '-i', '--off',
        type=parse_range,
        help="Show a specific offset or range of offsets.")
    parser.add_argument(
        '-B', '--block-size',
        type=parse_int,
        help="Assume a specific block size.")
    parser.add_argument(
        '--block-count',
        type=parse_int,
        help="Assume a specific block count.")
    parser.add_argument(
        '-C', '--block-cycles',
        type=parse_int,
        help="Assumed maximum number of erase cycles when measuring wear.")
    parser.add_argument(
        '-R', '--reset',
        action='store_true',
        help="Reset wear on block device initialization.")

    # how cells are rendered
    parser.add_argument(
        '--color',
        choices=['never', 'always', 'auto'],
        default='auto',
        help="When to use terminal colors. Defaults to 'auto'.")
    parser.add_argument(
        '--subscripts',
        action='store_true',
        help="Use unicode subscripts for showing wear.")
    parser.add_argument(
        '-:', '--dots',
        action='store_true',
        help="Use 1x2 ascii dot characters.")
    parser.add_argument(
        '-⣿', '--braille',
        action='store_true',
        help="Use 2x4 unicode braille characters. Note that braille characters "
            "sometimes suffer from inconsistent widths.")
    parser.add_argument(
        '--chars',
        help="Characters to use for read, prog, erase, noop operations.")
    parser.add_argument(
        '--wear-chars',
        help="Characters to use for showing wear.")
    parser.add_argument(
        '--colors',
        type=parse_list,
        help="Colors to use for read, prog, erase, noop operations.")
    parser.add_argument(
        '--wear-colors',
        type=parse_list,
        help="Colors to use for showing wear.")

    # output dimensions and layout
    parser.add_argument(
        '-W', '--width',
        type=parse_int,
        help="Width in columns. 0 uses the terminal width. Defaults to "
            "min(terminal, 80).")
    parser.add_argument(
        '-H', '--height',
        type=parse_int,
        help="Height in rows. 0 uses the terminal height. Defaults to 1.")
    parser.add_argument(
        '-n', '--lines',
        type=parse_int,
        help="Show this many lines of history. 0 uses the terminal height. "
            "Defaults to 5.")
    parser.add_argument(
        '-z', '--cat',
        action='store_true',
        help="Pipe directly to stdout.")
    parser.add_argument(
        '-U', '--hilbert',
        action='store_true',
        help="Render as a space-filling Hilbert curve.")
    parser.add_argument(
        '-Z', '--lebesgue',
        action='store_true',
        help="Render as a space-filling Z-curve.")

    # pacing
    parser.add_argument(
        '-c', '--coalesce',
        type=parse_int,
        help="Number of operations to coalesce together.")
    parser.add_argument(
        '-s', '--sleep',
        type=float,
        help="Time in seconds to sleep between reads, coalescing operations.")
    parser.add_argument(
        '-k', '--keep-open',
        action='store_true',
        help="Reopen the pipe on EOF, useful when multiple "
            "processes are writing.")

    # only forward options that were actually given, so main's own
    # defaults apply to everything else
    cli_args = vars(parser.parse_intermixed_args())
    sys.exit(main(**{k: v
        for k, v in cli_args.items()
        if v is not None}))
|