record: Improve overflow handling

Signal the accumulated item overflow count with the time of the first
new item.
This commit is contained in:
Sebastian Huber 2019-08-25 12:14:11 +02:00
parent 5fcaf843bd
commit e8a709ac8e
2 changed files with 63 additions and 25 deletions

View File

@ -98,14 +98,13 @@ static rtems_record_client_status call_handler(
}
static void signal_overflow(
const rtems_record_client_context *ctx,
rtems_record_client_per_cpu *per_cpu,
uint32_t data
rtems_record_client_per_cpu *per_cpu,
uint32_t data
)
{
per_cpu->hold_back = true;
per_cpu->item_index = 0;
call_handler( ctx, 0, RTEMS_RECORD_PER_CPU_OVERFLOW, data );
per_cpu->overflow += data;
}
static void resolve_hold_back(
@ -114,35 +113,44 @@ static void resolve_hold_back(
)
{
if ( per_cpu->hold_back ) {
uint32_t last_tail;
uint32_t last_head;
uint32_t last_capacity;
uint32_t new_head;
uint32_t overwritten;
uint32_t new_content;
uint32_t begin_index;
uint32_t index;
uint32_t first;
uint32_t last;
uint32_t delta;
uint64_t uptime;
per_cpu->hold_back = false;
last_head = per_cpu->head[ per_cpu->tail_head_index ];
last_tail = per_cpu->tail[ per_cpu->tail_head_index ];
new_head = per_cpu->head[ per_cpu->tail_head_index ^ 1 ];
overwritten = new_head - last_head;
last_capacity = ( last_tail - last_head - 1 ) & ( ctx->count - 1 );
new_content = new_head - last_head;
if ( overwritten >= per_cpu->item_index ) {
if ( new_content > last_capacity ) {
begin_index = new_content - last_capacity;
per_cpu->overflow += begin_index;
} else {
begin_index = 0;
}
if ( begin_index >= per_cpu->item_index ) {
per_cpu->item_index = 0;
return;
}
if ( overwritten > 0 ) {
call_handler( ctx, 0, RTEMS_RECORD_PER_CPU_OVERFLOW, overwritten );
}
per_cpu->hold_back = false;
first = RTEMS_RECORD_GET_TIME( per_cpu->items[ overwritten ].event );
first = RTEMS_RECORD_GET_TIME( per_cpu->items[ begin_index ].event );
last = first;
delta = 0;
uptime = 0;
for ( index = overwritten; index < per_cpu->item_index; ++index ) {
for ( index = begin_index; index < per_cpu->item_index; ++index ) {
const rtems_record_item_64 *item;
rtems_record_event event;
uint32_t time;
@ -170,7 +178,17 @@ static void resolve_hold_back(
per_cpu->uptime.time_last = first;
per_cpu->uptime.time_accumulated = 0;
for ( index = overwritten; index < per_cpu->item_index; ++index ) {
if ( per_cpu->overflow > 0 ) {
call_handler(
ctx,
per_cpu->uptime.bt,
RTEMS_RECORD_PER_CPU_OVERFLOW,
per_cpu->overflow
);
per_cpu->overflow = 0;
}
for ( index = begin_index; index < per_cpu->item_index; ++index ) {
const rtems_record_item_64 *item;
item = &per_cpu->items[ index ];
@ -185,20 +203,31 @@ static void process_per_cpu_head(
uint64_t data
)
{
uint32_t new_head;
uint32_t last_tail;
uint32_t last_head;
uint32_t capacity;
uint32_t last_capacity;
uint32_t new_tail;
uint32_t new_head;
uint32_t new_content;
uint32_t produced;
uint32_t content;
new_tail = per_cpu->tail[ per_cpu->tail_head_index ];
new_head = (uint32_t) data;
produced = new_head - per_cpu->tail[ per_cpu->tail_head_index ];
per_cpu->head[ per_cpu->tail_head_index ]= new_head;
content = new_head - new_tail;
per_cpu->head[ per_cpu->tail_head_index ] = new_head;
per_cpu->tail_head_index ^= 1;
if ( produced >= ctx->count ) {
signal_overflow( ctx, per_cpu, produced - ctx->count + 1 );
if ( content >= ctx->count ) {
/*
* This is a complete ring buffer overflow, the server will detect this
* also. It sets the tail to the head plus one and sends us all the
* content. This reduces the ring buffer capacity to zero. So, during
* transfer, new events will overwrite items in transfer. This is handled
* by resolve_hold_back().
*/
per_cpu->tail[ per_cpu->tail_head_index ^ 1 ] = new_head + 1;
signal_overflow( per_cpu, content - ctx->count + 1 );
return;
}
@ -207,6 +236,10 @@ static void process_per_cpu_head(
if ( last_tail == last_head ) {
if ( per_cpu->uptime.bt == 0 ) {
/*
* This is a special case during initial ramp up. We hold back the items
* to deduce the uptime of the first item via resolve_hold_back().
*/
per_cpu->hold_back = true;
} else {
resolve_hold_back( ctx, per_cpu );
@ -215,15 +248,15 @@ static void process_per_cpu_head(
return;
}
capacity = ( last_tail - last_head - 1 ) & ( ctx->count - 1 );
last_capacity = ( last_tail - last_head - 1 ) & ( ctx->count - 1 );
new_content = new_head - last_head;
if ( new_content <= capacity ) {
if ( new_content <= last_capacity || per_cpu->hold_back ) {
resolve_hold_back( ctx, per_cpu );
return;
}
signal_overflow( ctx, per_cpu, new_content - capacity );
signal_overflow( per_cpu, new_content - last_capacity );
}
static rtems_record_client_status process_per_cpu_count(

View File

@ -105,6 +105,11 @@ typedef struct {
*/
size_t tail_head_index;
/**
* @brief Count of lost items due to ring buffer overflows.
*/
uint32_t overflow;
/**
* @brief If true, then hold back items for overflow or initial ramp up
* processing.