add action_queue/waitable_action_queue

This commit is contained in:
xianjimli 2020-02-07 11:19:46 +08:00
parent 4193539598
commit 267d09f775
14 changed files with 2070 additions and 1187 deletions

View File

@ -476,7 +476,7 @@ static ret_t on_snapshot(void* ctx, event_t* e) {
bitmap_t* bitmap = widget_take_snapshot(window_manager());
bitmap_save_png(bitmap, "test.png");
bitmap_destroy(bitmap);
#endif/*AWTK_WEB*/
#endif /*AWTK_WEB*/
return RET_OK;
}

View File

@ -1,5 +1,11 @@
# 最新动态
* 2020/02/07
* 增加waitable\_action\_queue
* 2020/02/06
* 增加action\_queue
* 2020/02/05
* 增加拼音输入数据更新工具。
* 完善联想词库工具生成文档。

View File

@ -305,7 +305,7 @@ input_method_t* input_method_default_create(void) {
#ifndef WITHOUT_SUGGEST_WORDS
im->suggest_words = suggest_words_create((const asset_info_t*)(data_words_bin));
#endif/*WITHOUT_SUGGEST_WORDS*/
#endif /*WITHOUT_SUGGEST_WORDS*/
return im;
}

File diff suppressed because it is too large Load Diff

View File

@ -42,14 +42,22 @@ struct _tk_semaphore_t {
#endif
#include "tkc/mem.h"
#include "tkc/utils.h"
#include "tkc/time_now.h"
#include "tkc/platform.h"
#include "tkc/semaphore.h"
static uint32_t s_ano_index = 0;
tk_semaphore_t* tk_semaphore_create(uint32_t value, const char* name) {
tk_semaphore_t* semaphore = TKMEM_ZALLOC(tk_semaphore_t);
return_value_if_fail(semaphore != NULL, NULL);
if (name == NULL) {
char tname[32];
tk_snprintf(tname, sizeof(tname), "sema%u", s_ano_index++);
name = tname;
}
#ifdef HAS_PTHREAD
semaphore->sem = sem_open(name, O_CREAT, S_IRUSR | S_IWUSR, value);
if (semaphore->sem == NULL) {

101
src/tkc/action_queue.c Normal file
View File

@ -0,0 +1,101 @@
/**
* File: action_queue.c
* Author: AWTK Develop Team
* Brief: action_queue
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-06 Li XianJing <xianjimli@hotmail.com> created
*
*/
#include "tkc/mem.h"
#include "tkc/action_queue.h"
ret_t qaction_exec(qaction_t* action) {
return_value_if_fail(action != NULL && action->vt != NULL, RET_BAD_PARAMS);
if (action->vt->exec != NULL) {
return action->vt->exec(action);
}
return RET_NOT_IMPL;
}
ret_t qaction_destroy(qaction_t* action) {
return_value_if_fail(action != NULL && action->vt != NULL, RET_BAD_PARAMS);
if (action->vt->destroy != NULL) {
return action->vt->destroy(action);
}
return RET_NOT_IMPL;
}
action_queue_t* action_queue_create(uint16_t capacity) {
uint16_t size = 0;
action_queue_t* q = NULL;
return_value_if_fail(capacity > 1, NULL);
size = sizeof(action_queue_t) + (capacity - 1) * sizeof(qaction_t);
q = (action_queue_t*)TKMEM_ALLOC(size);
return_value_if_fail(q != NULL, NULL);
memset(q, 0x00, size);
q->capacity = capacity;
return q;
}
ret_t action_queue_recv(action_queue_t* q, qaction_t* action) {
return_value_if_fail(q != NULL && action != NULL, RET_BAD_PARAMS);
if (q->r != q->w || q->full) {
memcpy(action, q->actions + q->r, sizeof(*action));
if ((q->r + 1) < q->capacity) {
q->r++;
} else {
q->r = 0;
}
q->full = FALSE;
return RET_OK;
}
return RET_FAIL;
}
ret_t action_queue_send(action_queue_t* q, const qaction_t* action) {
return_value_if_fail(q != NULL && action != NULL, RET_BAD_PARAMS);
if (q->r != q->w || !q->full) {
memcpy(q->actions + q->w, action, sizeof(*action));
if ((q->w + 1) < q->capacity) {
q->w++;
} else {
q->w = 0;
}
if (q->r == q->w) {
q->full = TRUE;
}
return RET_OK;
}
return RET_FAIL;
}
ret_t action_queue_destroy(action_queue_t* q) {
return_value_if_fail(q != NULL, RET_BAD_PARAMS);
TKMEM_FREE(q);
return RET_OK;
}

129
src/tkc/action_queue.h Normal file
View File

@ -0,0 +1,129 @@
/**
* File: action_queue.h
* Author: AWTK Develop Team
* Brief: action_queue
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-06 Li XianJing <xianjimli@hotmail.com> created
*
*/
#ifndef TK_ACTION_QUEUE_H
#define TK_ACTION_QUEUE_H
#include "tkc/types_def.h"
BEGIN_C_DECLS
struct _qaction_t;
typedef struct _qaction_t qaction_t;
typedef ret_t (*qaction_exec_t)(qaction_t* action);
typedef ret_t (*qaction_destroy_t)(qaction_t* action);
typedef struct _qaction_vtable_t {
qaction_exec_t exec;
qaction_destroy_t destroy;
} qaction_vtable_t;
/**
* @class qaction_t
* actionaction_queue中
*/
struct _qaction_t {
/**
* @property {uint32_t*} extra
* @annotation ["readable"]
* (action而不同)
*/
uint32_t extra[8];
/*private*/
qaction_vtable_t* vt;
};
/**
* @method qaction_exec
*
*
* @param {qaction_t*} action action对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t qaction_exec(qaction_t* action);
/**
* @method qaction_destroy
*
*
* @param {qaction_t*} action action对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t qaction_destroy(qaction_t* action);
typedef struct _action_queue_t {
uint16_t r;
uint16_t w;
uint16_t full;
uint16_t capacity;
qaction_t actions[1];
} action_queue_t;
/**
* @method action_queue_create
* @annotation ["constructor"]
* action_queue对象
*
* @param {uint32_t} capacity action的容量
*
* @return {action_queue_t*} action_queue对象
*/
action_queue_t* action_queue_create(uint16_t capacity);
/**
* @method action_queue_recv
*
*
* @param {action_queue_t*} q action_queue对象
* @param {qaction_t*} action action对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_queue_recv(action_queue_t* q, qaction_t* action);
/**
* @method action_queue_send
*
*
* @param {action_queue_t*} q action_queue对象
* @param {const qaction_t*} action action对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_queue_send(action_queue_t* q, const qaction_t* action);
/**
* @method action_queue_destroy
*
*
* @param {action_queue_t*} q action_queue对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_queue_destroy(action_queue_t* q);
END_C_DECLS
#endif /*TK_ACTION_QUEUE_H*/

View File

@ -387,4 +387,3 @@ ret_t emitter_dispatch_simple_event(emitter_t* emitter, uint32_t type) {
ret_t emitter_forward(void* ctx, event_t* e) {
return emitter_dispatch(EMITTER(ctx), e);
}

View File

@ -0,0 +1,109 @@
/**
* File: waitable_action_queue.c
* Author: AWTK Develop Team
* Brief: waitable_action_queue
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-07 Li XianJing <xianjimli@hotmail.com> created
*
*/
#include "tkc/mem.h"
#include "tkc/waitable_action_queue.h"
waitable_action_queue_t* waitable_action_queue_create(uint16_t capacity) {
waitable_action_queue_t* q = TKMEM_ZALLOC(waitable_action_queue_t);
return_value_if_fail(q != NULL, NULL);
q->queue = action_queue_create(capacity);
goto_error_if_fail(q->queue != NULL);
q->mutex = tk_mutex_create();
goto_error_if_fail(q->mutex != NULL);
q->sema_recv = tk_semaphore_create(0, NULL);
goto_error_if_fail(q->sema_recv != NULL);
q->sema_send = tk_semaphore_create(capacity, NULL);
goto_error_if_fail(q->sema_send != NULL);
return q;
error:
if (q != NULL) {
if (q->queue != NULL) {
action_queue_destroy(q->queue);
}
if (q->mutex != NULL) {
tk_mutex_destroy(q->mutex);
}
if (q->sema_recv != NULL) {
tk_semaphore_destroy(q->sema_recv);
}
if (q->sema_send != NULL) {
tk_semaphore_destroy(q->sema_send);
}
TKMEM_FREE(q);
}
return NULL;
}
ret_t waitable_action_queue_recv(waitable_action_queue_t* q, qaction_t* action,
uint32_t timeout_ms) {
ret_t ret = RET_FAIL;
return_value_if_fail(q != NULL && action != NULL, RET_BAD_PARAMS);
if (tk_semaphore_wait(q->sema_recv, timeout_ms) == RET_OK) {
if (tk_mutex_lock(q->mutex) == RET_OK) {
ret = action_queue_recv(q->queue, action);
assert(ret == RET_OK);
ENSURE(tk_semaphore_post(q->sema_send) == RET_OK);
ENSURE(tk_mutex_unlock(q->mutex) == RET_OK);
}
}
return ret;
}
ret_t waitable_action_queue_send(waitable_action_queue_t* q, const qaction_t* action,
uint32_t timeout_ms) {
ret_t ret = RET_FAIL;
return_value_if_fail(q != NULL && action != NULL, RET_BAD_PARAMS);
if (tk_semaphore_wait(q->sema_send, timeout_ms) == RET_OK) {
if (tk_mutex_lock(q->mutex) == RET_OK) {
ret = action_queue_send(q->queue, action);
assert(ret == RET_OK);
ENSURE(tk_semaphore_post(q->sema_recv) == RET_OK);
ENSURE(tk_mutex_unlock(q->mutex) == RET_OK);
}
}
return ret;
}
ret_t waitable_action_queue_destroy(waitable_action_queue_t* q) {
return_value_if_fail(q != NULL, RET_BAD_PARAMS);
ENSURE(action_queue_destroy(q->queue) == RET_OK);
ENSURE(tk_mutex_destroy(q->mutex) == RET_OK);
ENSURE(tk_semaphore_destroy(q->sema_recv) == RET_OK);
ENSURE(tk_semaphore_destroy(q->sema_send) == RET_OK);
TKMEM_FREE(q);
return RET_OK;
}

View File

@ -0,0 +1,87 @@
/**
* File: waitable_action_queue.h
* Author: AWTK Develop Team
* Brief: waitable_action_queue
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-07 Li XianJing <xianjimli@hotmail.com> created
*
*/
#ifndef TK_WAITABLE_ACTION_QUEUE_H
#define TK_WAITABLE_ACTION_QUEUE_H
#include "tkc/mutex.h"
#include "tkc/semaphore.h"
#include "tkc/action_queue.h"
BEGIN_C_DECLS
typedef struct _waitable_action_queue_t {
action_queue_t* queue;
tk_mutex_t* mutex;
tk_semaphore_t* sema_recv;
tk_semaphore_t* sema_send;
} waitable_action_queue_t;
/**
* @method waitable_action_queue_create
* @annotation ["constructor"]
* waitable_action_queue对象
*
* @param {uint32_t} capacity action的容量
*
* @return {waitable_action_queue_t*} waitable_action_queue对象
*/
waitable_action_queue_t* waitable_action_queue_create(uint16_t capacity);
/**
* @method waitable_action_queue_recv
*
*
* @param {waitable_action_queue_t*} q waitable_action_queue对象
* @param {qaction_t*} action action对象
* @param {uint32_t} timeout_ms (ms)
*
* @return {ret_t} RET_OK表示成功
*/
ret_t waitable_action_queue_recv(waitable_action_queue_t* q, qaction_t* action,
uint32_t timeout_ms);
/**
* @method waitable_action_queue_send
*
*
* @param {waitable_action_queue_t*} q waitable_action_queue对象
* @param {const qaction_t*} action action对象
* @param {uint32_t} timeout_ms (ms)
*
* @return {ret_t} RET_OK表示成功
*/
ret_t waitable_action_queue_send(waitable_action_queue_t* q, const qaction_t* action,
uint32_t timeout_ms);
/**
* @method waitable_action_queue_destroy
*
*
* @param {waitable_action_queue_t*} q waitable_action_queue对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t waitable_action_queue_destroy(waitable_action_queue_t* q);
END_C_DECLS
#endif /*TK_WAITABLE_ACTION_QUEUE_H*/

View File

@ -30,5 +30,6 @@ SOURCES = [
env.Program(os.path.join(BIN_DIR, 'runTest'), SOURCES);
env.Program(os.path.join(BIN_DIR, 'mem_test'), ["mem_test.cpp"])
env.Program(os.path.join(BIN_DIR, 'recycle_test'), ["recycle_test.cpp"])
env.Program(os.path.join(BIN_DIR, 'waitable_action_queue_test'), ["waitable_action_queue_test.cpp"])

View File

@ -0,0 +1,48 @@
#include "gtest/gtest.h"
#include "tkc/action_queue.h"
#define NR 10
TEST(ActionQueue, basic) {
uint16_t i = 0;
qaction_t r;
qaction_t w;
action_queue_t* q = action_queue_create(NR);
memset(&r, 0x00, sizeof(r));
memset(&w, 0x00, sizeof(w));
ASSERT_EQ(q != NULL, true);
ASSERT_EQ(q->r, 0);
ASSERT_EQ(q->w, 0);
ASSERT_EQ(q->full, FALSE);
ASSERT_EQ(q->capacity, 10);
w.extra[0] = 1234;
ASSERT_EQ(action_queue_recv(q, &r), RET_FAIL);
ASSERT_EQ(action_queue_send(q, &w), RET_OK);
ASSERT_EQ(action_queue_recv(q, &r), RET_OK);
ASSERT_EQ(memcmp(&r, &w, sizeof(r)), 0);
ASSERT_EQ(action_queue_recv(q, &r), RET_FAIL);
for (i = 0; i < NR; i++) {
w.extra[0] = i;
ASSERT_EQ(action_queue_send(q, &w), RET_OK);
}
ASSERT_EQ(q->full, TRUE);
ASSERT_EQ(action_queue_send(q, &w), RET_FAIL);
for (i = 0; i < NR; i++) {
ASSERT_EQ(action_queue_recv(q, &r), RET_OK);
ASSERT_EQ(r.extra[0], i);
}
ASSERT_EQ(action_queue_recv(q, &r), RET_FAIL);
ASSERT_EQ(q->full, FALSE);
ASSERT_EQ(action_queue_send(q, &w), RET_OK);
ASSERT_EQ(action_queue_send(q, &w), RET_OK);
ASSERT_EQ(action_queue_recv(q, &r), RET_OK);
action_queue_destroy(q);
}

View File

@ -17,9 +17,6 @@ TEST(EventQueue, basic) {
ASSERT_EQ(q->full, FALSE);
ASSERT_EQ(q->capacity, 10);
memset(&r, 0x00, sizeof(r));
memset(&r, 0x00, sizeof(w));
w.pointer_event.e.type = EVT_POINTER_DOWN;
w.pointer_event.x = 100;
w.pointer_event.y = 200;

View File

@ -0,0 +1,102 @@
#include "tkc/utils.h"
#include "tkc/thread.h"
#include "tkc/waitable_action_queue.h"
#define NR 100000
static uint32_t exec_times = 0;
static uint32_t destroy_times = 0;
static waitable_action_queue_t* q;
static ret_t qaction_exec_dummy(qaction_t* req) {
exec_times++;
return RET_OK;
}
static ret_t qaction_destroy_dummy(qaction_t* req) {
destroy_times++;
return RET_OK;
}
static qaction_vtable_t qvt = {qaction_exec_dummy, qaction_destroy_dummy};
static qaction_t* qaction_init(qaction_t* a) {
memset(a, 0x00, sizeof(qaction_t));
a->vt = &qvt;
return a;
}
static void* consumer(void* args) {
uint32_t n = 0;
qaction_t action;
log_debug("consumer start\n");
while (waitable_action_queue_recv(q, &action, 3000) == RET_OK) {
n++;
qaction_exec(&action);
qaction_destroy(&action);
}
log_debug("consumer done\n");
return NULL;
}
static void* producer(void* args) {
uint32_t i = 0;
qaction_t action;
qaction_t* a = qaction_init(&action);
uint32_t id = tk_pointer_to_int(args);
log_debug("p=%u start\n", id);
for (i = 0; i < NR; i++) {
if (waitable_action_queue_send(q, a, 3000) != RET_OK) {
log_debug("send timeout\n");
break;
}
}
log_debug("p=%u done\n", id);
return NULL;
}
void test() {
tk_thread_t* c = tk_thread_create(consumer, NULL);
tk_thread_t* p1 = tk_thread_create(producer, tk_pointer_from_int(1));
tk_thread_t* p2 = tk_thread_create(producer, tk_pointer_from_int(2));
tk_thread_t* p3 = tk_thread_create(producer, tk_pointer_from_int(3));
tk_thread_t* p4 = tk_thread_create(producer, tk_pointer_from_int(4));
q = waitable_action_queue_create(9);
tk_thread_start(c);
tk_thread_start(p1);
tk_thread_start(p2);
tk_thread_start(p3);
tk_thread_start(p4);
tk_thread_join(c);
tk_thread_join(p1);
tk_thread_join(p2);
tk_thread_join(p3);
tk_thread_join(p4);
tk_thread_destroy(c);
tk_thread_destroy(p1);
tk_thread_destroy(p2);
tk_thread_destroy(p3);
tk_thread_destroy(p4);
waitable_action_queue_destroy(q);
log_debug("exec_times=%u destroy_times=%u\n", exec_times, destroy_times);
}
#include "tkc/platform.h"
int main(int argc, char* argv[]) {
platform_prepare();
test();
return 0;
}