From cce7b65398f355ebee43df699d88547f4d202548 Mon Sep 17 00:00:00 2001 From: Jon Shallow Date: Sat, 15 Mar 2025 11:17:44 +0000 Subject: [PATCH] Threads: Support multiple threads doing coap_io_process() Set up a coap_io_process() loop spread across multiple threads, each thread repeatably calling coap_io_process() and so able to handle any input packets. Note that the application handlers need to be thread-safe for application level activity. Add 2 new main functions coap_io_process_loop() coap_io_process_terminate_loop() Add in 2 new sub functions coap_io_process_configure_threads() coap_io_process_remove_threads() to set up the parallel threads. Update example servers to include using coap_io_process_loop(). Support optional thread number logging by configuration to maintain backwards logging format output if required. --- CMakeLists.txt | 11 + cmake_coap_defines.h.in | 3 + coap_config.h.contiki | 3 + coap_config.h.riot | 7 + coap_config.h.windows | 5 + configure.ac | 18 +- examples/coap-rd.c | 23 ++- examples/coap-server.c | 66 ++++-- include/coap3/coap_mutex_internal.h | 8 + include/coap3/coap_net.h | 67 ++++++ include/coap3/coap_net_internal.h | 30 +++ include/coap3/libcoap.h | 10 + libcoap-3.map | 4 + libcoap-3.sym | 4 + man/Makefile.am | 1 + man/coap_handler.txt.in | 4 + man/coap_io.txt.in | 3 + man/coap_io_loop.txt.in | 307 ++++++++++++++++++++++++++++ man/coap_observe.txt.in | 2 +- src/coap_debug.c | 34 ++- src/coap_io.c | 236 +++++++++++++++++++-- src/coap_net.c | 17 ++ src/coap_netif.c | 10 +- src/coap_resource.c | 2 +- src/coap_session.c | 2 +- src/coap_tcp.c | 10 +- src/coap_threadsafe.c | 106 ++++++++++ 27 files changed, 947 insertions(+), 46 deletions(-) create mode 100644 man/coap_io_loop.txt.in diff --git a/CMakeLists.txt b/CMakeLists.txt index ddf6473d..42f0b42e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -156,6 +156,10 @@ option( ENABLE_THREAD_RECURSIVE_LOCK_CHECK "enable building with thread recursive lock detection" OFF) +option( + ENABLE_THREAD_NUM_LOGGING + "enable building with thread number logging support" + ON) option( ENABLE_SMALL_STACK "enable if the system has small stack size" @@ -274,6 +278,7 @@ check_include_file(sys/epoll.h HAVE_EPOLL_H) check_include_file(sys/timerfd.h HAVE_TIMERFD_H) check_include_file(arpa/inet.h HAVE_ARPA_INET_H) check_include_file(stdbool.h HAVE_STDBOOL_H) +check_include_file(signal.h HAVE_SIGNAL_H) check_include_file(netdb.h HAVE_NETDB_H) check_include_file(pthread.h HAVE_PTHREAD_H) check_include_file(stdlib.h HAVE_STDINT_H) @@ -428,6 +433,11 @@ if(${ENABLE_THREAD_RECURSIVE_LOCK_CHECK}) message(STATUS "compiling with thread recursive lock detection support") endif() +if(${ENABLE_THREAD_NUM_LOGGING}) + set(COAP_THREAD_SAFE "${ENABLE_THREAD_NUM_LOGGING}") + message(STATUS "compiling with thread number logging support") +endif() + if(${ENABLE_SMALL_STACK}) set(COAP_CONSTRAINED_STACK "${ENABLE_SMALL_STACK}") message(STATUS "compiling with small stack support") @@ -694,6 +704,7 @@ message(STATUS "ENABLE_OSCORE:...................${ENABLE_OSCORE}") message(STATUS "ENABLE_ASYNC:....................${ENABLE_ASYNC}") message(STATUS "ENABLE_THREAD_SAFE:..............${ENABLE_THREAD_SAFE}") message(STATUS "ENABLE_THREAD_RECURSIVE_CHECK....${ENABLE_THREAD_RECURSIVE_LOCK_CHECK}") +message(STATUS "ENABLE_THREAD_NUM_LOGGING........${ENABLE_THREAD_NUM_LOGGING}") message(STATUS "ENABLE_DOCS:.....................${ENABLE_DOCS}") message(STATUS "ENABLE_EXAMPLES:.................${ENABLE_EXAMPLES}") message(STATUS "DTLS_BACKEND:....................${DTLS_BACKEND}") diff --git a/cmake_coap_defines.h.in b/cmake_coap_defines.h.in index 95da3e9a..a8f9d540 100644 --- a/cmake_coap_defines.h.in +++ b/cmake_coap_defines.h.in @@ -62,6 +62,9 @@ /* Define to 1 if the library has thread safe support. */ #cmakedefine COAP_THREAD_SAFE @COAP_THREAD_SAFE@ +/* Define to 1 if the library has thread number logging support. */ +#cmakedefine COAP_THREAD_NUM_LOGGING @COAP_THREAD_NUM_LOGGING@ + /* Define to 1 if the system has libgnutls28 */ #cmakedefine COAP_WITH_LIBGNUTLS @COAP_WITH_LIBGNUTLS@ diff --git a/coap_config.h.contiki b/coap_config.h.contiki index 9033672c..74342d9a 100644 --- a/coap_config.h.contiki +++ b/coap_config.h.contiki @@ -56,6 +56,9 @@ /* Define to 1 if libcoap has thread safe support. */ /* #undef COAP_THREAD_SAFE 1 */ +/* Define to 1 if libcoap has thread number logging support. */ +/* #undef COAP_THREAD_NUM_LOGGING 1 */ + /* Define to 1 if you have the header file. */ #define HAVE_ASSERT_H 1 diff --git a/coap_config.h.riot b/coap_config.h.riot index d38e824f..c5241ca9 100644 --- a/coap_config.h.riot +++ b/coap_config.h.riot @@ -144,6 +144,13 @@ #endif /* COAP_THREAD_SAFE */ #endif /* CONFIG_LIBCOAP_THREAD_SAFE */ +#ifdef CONFIG_LIBCOAP_THREAD_NUM_LOGGING +#ifndef COAP_THREAD_NUM_LOGGING +/* Define to 1 if libcoap has thread number logging support. */ +#define COAP_THREAD_NUM_LOGGING 1 +#endif /* COAP_THREAD_SAFE */ +#endif /* CONFIG_LIBCOAP_THREAD_NUM_LOGGING */ + #ifdef CONFIG_LIBCOAP_THREAD_RECURSIVE_CHECK #ifndef COAP_THREAD_RECURSIVE_CHECK /* Define to 1 to build with thread recursive lock detection support. */ diff --git a/coap_config.h.windows b/coap_config.h.windows index 350822a4..234152f7 100644 --- a/coap_config.h.windows +++ b/coap_config.h.windows @@ -155,6 +155,11 @@ #define COAP_THREAD_RECURSIVE_CHECK 0 #endif +#ifndef COAP_THREAD_NUM_LOGGING +/* Define to 1 if libcoap has thread number logging support. */ +#define COAP_THREAD_NUM_LOGGING 0 +#endif + #ifndef PACKAGE_BUGREPORT /* Define to the address where bug reports for this package should be sent. */ #define PACKAGE_BUGREPORT "libcoap-developers@lists.sourceforge.net" diff --git a/configure.ac b/configure.ac index 6fae7f21..6360442e 100644 --- a/configure.ac +++ b/configure.ac @@ -1102,7 +1102,7 @@ fi # Checks for header files. AC_CHECK_HEADERS([assert.h arpa/inet.h limits.h netdb.h netinet/in.h \ - pthread.h errno.h winsock2.h ws2tcpip.h \ + pthread.h errno.h winsock2.h ws2tcpip.h signal.h \ stdlib.h string.h strings.h sys/socket.h sys/time.h \ time.h unistd.h sys/unistd.h sys/ioctl.h net/if.h ifaddrs.h]) @@ -1148,6 +1148,16 @@ if test "x$enable_recursive_detection" = "xyes"; then AC_DEFINE(COAP_THREAD_RECURSIVE_CHECK, 1, [Define to 1 detect recursive locking detection support]) fi +AC_ARG_ENABLE([thread-num-logging], + [AS_HELP_STRING([--enable-thread-num-logging], + [Enable building with logging the thread number [default=yes]])], + [enable_thread_num_logging="$enableval"], + [enable_thread_num_logging="yes"]) + +if test "x$enable_thread_num_logging" = "xyes"; then + AC_DEFINE(COAP_THREAD_NUM_LOGGING, 1, [Define to 1 if libcoap has thread number logging support]) +fi + AC_ARG_ENABLE([small-stack], [AS_HELP_STRING([--enable-small-stack], [Use small-stack if the available stack space is restricted [default=no]])], @@ -1320,8 +1330,8 @@ AC_SUBST(PREDEFINED_CFLAGS) # And finally combining the CFLAGS together ... CFLAGS="$CFLAGS $ADDITIONAL_CFLAGS" -# Override the various template files, currently just makefiles and the -# pkgconfig *.pc file. +# Override the various template files, currently just makefiles, man pages +# and the pkgconfig *.pc file. # Later if the API version is changing don't forget to change the # libcoap-$LIBCOAP_API_VERSION.pc.in file too!! You will have to change # the 'Cflags' variable to something like @@ -1345,6 +1355,7 @@ man/coap_endpoint_server.txt man/coap_handler.txt man/coap_init.txt man/coap_io.txt +man/coap_io_loop.txt man/coap_keepalive.txt man/coap_locking.txt man/coap_logging.txt @@ -1505,6 +1516,7 @@ else fi AC_MSG_RESULT([ enable thread safe code : "$enable_thread_safe"]) AC_MSG_RESULT([ enable recursive lock check : "$enable_recursive_detection"]) +AC_MSG_RESULT([ enable thread number logging : "$enable_thread_num_logging"]) if test "x$build_doxygen" = "xyes"; then AC_MSG_RESULT([ build doxygen pages : "yes"]) AC_MSG_RESULT([ --> Doxygen around : "yes" ($DOXYGEN $doxygen_version)]) diff --git a/examples/coap-rd.c b/examples/coap-rd.c index 2989a1ce..0924d7b5 100644 --- a/examples/coap-rd.c +++ b/examples/coap-rd.c @@ -44,6 +44,14 @@ #endif #include +#include + +#if COAP_THREAD_SAFE +/* Define the number of coap_io_process() threads required */ +#ifndef NUM_SERVER_THREADS +#define NUM_SERVER_THREADS 3 +#endif /* NUM_SERVER_THREADS */ +#endif /* COAP_THREAD_SAFE */ #define COAP_RESOURCE_CHECK_TIME 2 @@ -126,13 +134,16 @@ resource_rd_delete(void *ptr) { rd_delete(ptr); } -static int quit = 0; +static volatile int quit = 0; /* SIGINT handler: set quit to 1 for graceful termination */ static void handle_sigint(int signum COAP_UNUSED) { quit = 1; coap_send_recv_terminate(); +#if NUM_SERVER_THREADS + coap_io_process_terminate_loop(); +#endif /* NUM_SERVER_THREADS */ } static void @@ -754,7 +765,6 @@ cmdline_read_extended_token_size(char *arg) { int main(int argc, char **argv) { coap_context_t *ctx; - int result; char addr_str[NI_MAXHOST] = "::"; char port_str[NI_MAXSERV] = "5683"; char *group = NULL; @@ -864,12 +874,21 @@ main(int argc, char **argv) { sigaction(SIGPIPE, &sa, NULL); #endif +#if NUM_SERVER_THREADS + if (!coap_io_process_loop(ctx, NULL, NULL, COAP_RESOURCE_CHECK_TIME * 1000, + NUM_SERVER_THREADS)) { + coap_log_err("coap_io_process_loop: Failed\n"); + } +#else while (!quit) { + int result; + result = coap_io_process(ctx, COAP_RESOURCE_CHECK_TIME * 1000); if (result >= 0) { /* coap_check_resource_list( ctx ); */ } } +#endif coap_free_context(ctx); coap_cleanup(); diff --git a/examples/coap-server.c b/examples/coap-server.c index 0b4edbf7..35db0324 100644 --- a/examples/coap-server.c +++ b/examples/coap-server.c @@ -60,6 +60,13 @@ strndup(const char *s1, size_t n) { #include #include +#if COAP_THREAD_SAFE +/* Define the number of coap_io_process() threads required */ +#ifndef NUM_SERVER_THREADS +#define NUM_SERVER_THREADS 3 +#endif /* NUM_SERVER_THREADS */ +#endif /* COAP_THREAD_SAFE */ + #ifndef min #define min(a,b) ((a) < (b) ? (a) : (b)) #endif @@ -71,7 +78,7 @@ static char *tls_engine_conf = NULL; static int ec_jpake = 0; /* set to 1 to request clean server shutdown */ -static int quit = 0; +static volatile int quit = 0; /* set to 1 if persist information is to be kept on server shutdown */ static int keep_persist = 0; @@ -182,6 +189,9 @@ static void handle_sigint(int signum COAP_UNUSED) { quit = 1; coap_send_recv_terminate(); +#if NUM_SERVER_THREADS + coap_io_process_terminate_loop(); +#endif /* NUM_SERVER_THREADS */ } #ifndef _WIN32 @@ -194,6 +204,9 @@ static void handle_sigusr2(int signum COAP_UNUSED) { quit = 1; keep_persist = 1; +#if NUM_SERVER_THREADS + coap_io_process_terminate_loop(); +#endif /* NUM_SERVER_THREADS */ } #endif /* ! _WIN32 */ @@ -2299,12 +2312,30 @@ syslog_handler(coap_log_t level, const char *message) { } #endif /* ! _WIN32 */ +/* + * This function only initiates an Observe unsolicited response when the time + * (in seconds) changes. + */ +static void +do_time_observe_code(void *arg) { + static coap_time_t t_last = 0; + coap_time_t t_now; + coap_tick_t now; + + (void)arg; + coap_ticks(&now); + t_now = coap_ticks_to_rt(now); + if (t_now != t_last) { + t_last = t_now; + coap_resource_notify_observers(time_resource, NULL); + } +} + int main(int argc, char **argv) { coap_context_t *ctx = NULL; char *group = NULL; char *group_if = NULL; - coap_tick_t now; char addr_str[NI_MAXHOST] = "::"; char *port_str = NULL; int opt; @@ -2312,10 +2343,6 @@ main(int argc, char **argv) { coap_log_t log_level = COAP_LOG_WARN; coap_log_t dtls_log_level = COAP_LOG_ERR; unsigned wait_ms; - coap_time_t t_last = 0; - int coap_fd; - fd_set m_readfds; - int nfds = 0; size_t i; int exit_code = 0; uint32_t max_block_size = 0; @@ -2610,6 +2637,18 @@ main(int argc, char **argv) { } } + wait_ms = COAP_RESOURCE_CHECK_TIME * 1000; + +#if NUM_SERVER_THREADS + if (!coap_io_process_loop(ctx, time_resource ? do_time_observe_code : NULL, + NULL, wait_ms, NUM_SERVER_THREADS)) { + coap_log_err("coap_io_process_loop: Failed\n"); + } +#else + int nfds = 0; + int coap_fd; + fd_set m_readfds; + coap_fd = coap_context_get_coap_fd(ctx); if (coap_fd != -1) { /* if coap_fd is -1, then epoll is not supported within libcoap */ @@ -2618,10 +2657,9 @@ main(int argc, char **argv) { nfds = coap_fd + 1; } - wait_ms = COAP_RESOURCE_CHECK_TIME * 1000; - while (!quit) { int result; + coap_tick_t now; if (coap_fd != -1) { /* @@ -2679,23 +2717,19 @@ main(int argc, char **argv) { wait_ms = COAP_RESOURCE_CHECK_TIME * 1000; } if (time_resource) { - coap_time_t t_now; unsigned int next_sec_ms; - coap_ticks(&now); - t_now = coap_ticks_to_rt(now); - if (t_last != t_now) { - /* Happens once per second */ - t_last = t_now; - coap_resource_notify_observers(time_resource, NULL); - } + do_time_observe_code(NULL); + /* need to wait until next second starts if wait_ms is too large */ + coap_ticks(&now); next_sec_ms = 1000 - (now % COAP_TICKS_PER_SECOND) * 1000 / COAP_TICKS_PER_SECOND; if (next_sec_ms && next_sec_ms < wait_ms) wait_ms = next_sec_ms; } } +#endif /* NUM_SERVER_THREADS */ exit_code = 0; finish: diff --git a/include/coap3/coap_mutex_internal.h b/include/coap3/coap_mutex_internal.h index bdbd0c93..c42508d0 100644 --- a/include/coap3/coap_mutex_internal.h +++ b/include/coap3/coap_mutex_internal.h @@ -150,4 +150,12 @@ typedef int coap_mutex_t; #endif /* !WITH_CONTIKI && !WITH_LWIP && !RIOT_VERSION && !HAVE_PTHREAD_H && !HAVE_PTHREAD_MUTEX_LOCK */ +#if COAP_THREAD_SAFE + +extern coap_mutex_t m_show_pdu; +extern coap_mutex_t m_log_impl; +extern coap_mutex_t m_io_threads; + +#endif /* COAP_THREAD_SAFE */ + #endif /* COAP_MUTEX_INTERNAL_H_ */ diff --git a/include/coap3/coap_net.h b/include/coap3/coap_net.h index 07b117d3..e979c279 100644 --- a/include/coap3/coap_net.h +++ b/include/coap3/coap_net.h @@ -855,6 +855,73 @@ struct epoll_event; COAP_API void coap_io_do_epoll(coap_context_t *ctx, struct epoll_event *events, size_t nevents); +/** + * Main thread coap_io_process_loop activity. + * + * This function should not do any blocking. + * + * @param arg The value of main_loop_code_arg passed into coap_io_process_loop(). + * + */ +typedef void (*coap_io_process_thread_t)(void *arg); + +/** + * Do the coap_io_process() across @p thread_count threads. + * The main thread will invoke @p main_loop_code (if defined) at least + * every @p timeout_ms. + * + * Note: If multi-threading protection is not in place, then @p thread_count + * is ignored and only a single thread runs (but still executes + * @p main_loop_code) + * + * Note: To stop the threads and continual looping, + * coap_io_process_terminate_loop() should be called. + * + * @param context The current CoAP context. + * @param main_loop_code The name of the function to execute in the main + * thread or NULL if not required. This function should + * not do any blocking. + * @param main_loop_code_arg The argument to pass to @p main_loop_code. + * @param timeout_ms The maximum amount of time the main thread should delay up + * to (i.e. timeout parameter for coap_io_process()) before + * the loop starts again. + * @param thread_count The number of threads to run. + * + */ +COAP_API int coap_io_process_loop(coap_context_t *context, + coap_io_process_thread_t main_loop_code, + void *main_loop_code_arg, uint32_t timeout_ms, + uint32_t thread_count); + +/** + * Terminate all the additional threads created by coap_io_process_loop() + * and break out of the main thread loop to return from coap_io_process_loop(). + * + * Typically this would be called from within a SIGQUIT handler. + * + */ +void coap_io_process_terminate_loop(void); + +/** + * Configure a defined number of threads to do the alternate coap_io_process() + * work with traffic load balanced across the threads based on inactive + * threads. + * + * @param context Context. + * @param thread_count The number of threads to configure. + * + * @return 1 success or 0 on failure. + */ +int coap_io_process_configure_threads(coap_context_t *context, + uint32_t thread_count); + +/** + * Release the coap_io_process() worker threads. + * + * @param context Context. + */ +void coap_io_process_remove_threads(coap_context_t *context); + /** * Get the libcoap internal file descriptor for a socket. This can be used to * integrate libcoap in an external event loop instead of using one of its diff --git a/include/coap3/coap_net_internal.h b/include/coap3/coap_net_internal.h index 3ad54893..fb2ccef7 100644 --- a/include/coap3/coap_net_internal.h +++ b/include/coap3/coap_net_internal.h @@ -769,6 +769,36 @@ unsigned int coap_io_prepare_io_lkd(coap_context_t *ctx, */ int coap_io_process_lkd(coap_context_t *ctx, uint32_t timeout_ms); +/** + * Do the coap_io_process() across @p thread_count threads. + * The main thread will invoke @p main_loop_code (if defined) at least + * every @p timeout_ms. + * + * Note: This function must be called in the locked state. + * + * Note: If multi-threading protection is not in place, then @p thread_count + * is ignored and only a single thread runs (but still executes + * @p main_loop_code) + * + * Note: To stop the threads and continual looping, + * coap_io_process_terminate_loop() should be called. + * + * @param context The current CoAP context. + * @param main_loop_code The name of the function to execute in the main + * thread or NULL if not required. This function should + * not do any blocking. + * @param main_loop_code_arg The argument to pass to @p main_loop_code. + * @param timeout_ms The maximum amount of time the main thread should delay up + * to (i.e. timeout parameter for coap_io_process()) before + * the loop starts again. + * @param thread_count The number of threads to run. + * + */ +int coap_io_process_loop_lkd(coap_context_t *context, + coap_io_process_thread_t main_loop_code, + void *main_loop_code_arg, uint32_t timeout_ms, + uint32_t thread_count); + #if !defined(RIOT_VERSION) && !defined(WITH_CONTIKI) /** * The main message processing loop with additional fds for internal select. diff --git a/include/coap3/libcoap.h b/include/coap3/libcoap.h index fd7fefa3..c434cfb3 100644 --- a/include/coap3/libcoap.h +++ b/include/coap3/libcoap.h @@ -71,6 +71,16 @@ typedef USHORT in_port_t; # endif /* __GNUC__ */ #endif /* COAP_UNUSED */ +#ifndef COAP_THREAD_LOCAL_VAR +# ifdef __GNUC__ +# define COAP_THREAD_LOCAL_VAR __thread +# elif defined(_MSC_VER) +# define COAP_THREAD_LOCAL_VAR __declspec(thread) +# else /* ! __GNUC__ && ! _MSC_VER */ +# define COAP_THREAD_LOCAL_VAR +# endif +#endif + #ifndef COAP_API #define COAP_API #endif diff --git a/libcoap-3.map b/libcoap-3.map index 495aa40c..ecb979fd 100644 --- a/libcoap-3.map +++ b/libcoap-3.map @@ -131,6 +131,10 @@ global: coap_io_prepare_epoll; coap_io_prepare_io; coap_io_process; + coap_io_process_configure_threads; + coap_io_process_loop; + coap_io_process_remove_threads; + coap_io_process_terminate_loop; coap_io_process_with_fds; coap_ipv4_is_supported; coap_ipv6_is_supported; diff --git a/libcoap-3.sym b/libcoap-3.sym index ccec4340..f7904a74 100644 --- a/libcoap-3.sym +++ b/libcoap-3.sym @@ -129,6 +129,10 @@ coap_io_pending coap_io_prepare_epoll coap_io_prepare_io coap_io_process +coap_io_process_configure_threads +coap_io_process_loop +coap_io_process_remove_threads +coap_io_process_terminate_loop coap_io_process_with_fds coap_ipv4_is_supported coap_ipv6_is_supported diff --git a/man/Makefile.am b/man/Makefile.am index 7e8dffea..9dca0727 100644 --- a/man/Makefile.am +++ b/man/Makefile.am @@ -31,6 +31,7 @@ TXT3 = coap_address.txt \ coap_handler.txt \ coap_init.txt \ coap_io.txt \ + coap_io_loop.txt \ coap_keepalive.txt \ coap_locking.txt \ coap_logging.txt \ diff --git a/man/coap_handler.txt.in b/man/coap_handler.txt.in index 59e95da0..d3c2d25f 100644 --- a/man/coap_handler.txt.in +++ b/man/coap_handler.txt.in @@ -54,6 +54,10 @@ DESCRIPTION This documents the different callback handlers that can optionally be invoked on receipt of a packet or when a timeout occurs. +*NOTE:* If multi-thread activity is supported, these callback handlers need to +be thread-safe at the application level as multiple threads could be executing +the same handler code. + FUNCTIONS --------- diff --git a/man/coap_io.txt.in b/man/coap_io.txt.in index 1623289d..9a332c9f 100644 --- a/man/coap_io.txt.in +++ b/man/coap_io.txt.in @@ -137,6 +137,9 @@ _timeout_ms_ set to COAP_IO_NO_WAIT. + will then be using *epoll* internally to process all the file descriptors of the different sessions. +:NOTE:* With multi-threading protection enabled, it is possible to +call *coap_io_process*() from multiple threads to do some load balancing. + See EXAMPLES below. *Function: coap_io_prepare_epoll()* diff --git a/man/coap_io_loop.txt.in b/man/coap_io_loop.txt.in new file mode 100644 index 00000000..7f5b9458 --- /dev/null +++ b/man/coap_io_loop.txt.in @@ -0,0 +1,307 @@ +// -*- mode:doc; -*- +// vim: set syntax=asciidoc tw=0 + +coap_io_loop(3) +=============== +:doctype: manpage +:man source: coap_io_loop +:man version: @PACKAGE_VERSION@ +:man manual: libcoap Manual + +NAME +---- +coap_io_loop, +coap_io_process_loop, +coap_io_process_terminate_loop, +coap_io_process_configure_threads, +coap_io_process_remove_threads +- Work with CoAP threads doing coap_io_process + +SYNOPSIS +-------- +*#include * + +*int coap_io_process_loop(coap_context_t *_context_, +coap_io_process_thread_t _main_loop_code_, void *_main_loop_code_arg_, +uint32_t _timeout_ms_, uint32_t _thread_count_);* + +*void coap_io_process_terminate_loop(void);* + +*int coap_io_process_configure_threads(coap_context_t *_context_, +uint32_t _thread_count_);* + +*void coap_io_process_remove_threads(coap_context_t *_context_);* + +For specific (D)TLS library support, link with +*-lcoap-@LIBCOAP_API_VERSION@-notls*, *-lcoap-@LIBCOAP_API_VERSION@-gnutls*, +*-lcoap-@LIBCOAP_API_VERSION@-openssl*, *-lcoap-@LIBCOAP_API_VERSION@-mbedtls*, +*-lcoap-@LIBCOAP_API_VERSION@-wolfssl* +or *-lcoap-@LIBCOAP_API_VERSION@-tinydtls*. Otherwise, link with +*-lcoap-@LIBCOAP_API_VERSION@* to get the default (D)TLS library support. + +DESCRIPTION +----------- +This man page focuses on setting up and supporting multiple threads, each one +invoking *coap_io_process*(3) in a loop. + +Each thread can receive a packet (a different packet for each thread), call the +appropriate application call-back handler and potentially be spending time of +consequence in that handler without blocking the reciept of other input traffic. + +These functions should be called from the main thread. It is assumed that +thread-safe code has been enabled. + +FUNCTIONS +--------- + +*Function: coap_io_process_loop()* + +The *coap_io_process_loop*() function is used to set up additional threads that +are in a loop just calling *coap_io_process*(3) with COAP_IO_WAIT. These +threads are terminated when *coap_io_process_terminate_loop*() is called. +The thread calling *coap_io_process_loop*() will also be in a separate loop +that is calling *coap_io_process*(3), optionally calling _main_loop_code_ +(if not NULL) with argument _main_loop_code_arg_. For the thread calling +*coap_io_process_loop*(), it will call _main_loop_code_ at least every +_timeout_ms_ milli-secs, the call to _main_loop_code_ start time aligned to +the nearest second. + +_context_ defines the context to associate the threads with. _thread_count_ +is the number of threads to be running *coap_io_process*(3) which includes the +*coap_io_process_loop*() calling thread in the count. + +*Function: coap_io_process_terminate_loop()* + +The *coap_io_process_terminate_loop*() function is used to terminate any added +threads running under the control of *coap_io_process_loop*() and causing the +thread that called *coap_io_process_loop*() to return. + +*Function: coap_io_process_configure_threads()* + +The *coap_io_process_configure_threads*() function is used to set up +an additional _thread_count_ threads for _context_. Usually +*coap_io_process_loop*() would be called, but can be used to wrap with +*coap_io_process_remove_threads*() a complex version of _main_loop_code_. + +*coap_io_process_loop*() uses *coap_io_process_configure_threads*() and +*coap_io_process_remove_threads*() to wrap the call to _main_loop_code_. + +*Function: coap_io_process_remove_threads()* + +The *coap_io_process_remove_threads*() function is used stop and remove +threads created by *coap_io_process_configure_threads*() for _context_. + +RETURN VALUES +------------- +*coap_io_process_loop*(), *coap_io_process_configure_threads*() return 1 on success +else 0 on failure. + +EXAMPLES +-------- +*coap_io_process_loop()* + +[source, c] +---- +#include +#include +#include +#include +#include + +#if COAP_THREAD_SAFE +/* Define the number of coap_io_process() threads required */ +#ifndef NUM_SERVER_THREADS +#define NUM_SERVER_THREADS 3 +#endif /* NUM_SERVER_THREADS */ +#endif /* COAP_THREAD_SAFE */ + +static volatile int quit = 0; +coap_resource_t *time_resource; +static time_t my_clock_base = 0; + +/* SIGINT handler: set quit to 1 for graceful termination */ +static void +handle_sigint(int signum COAP_UNUSED) { + quit = 1; +#if NUM_SERVER_THREADS + coap_io_process_terminate_loop(); +#endif /* NUM_SERVER_THREADS */ +} + +static void +hnd_get_fetch_time(coap_resource_t *resource, + coap_session_t *session, + const coap_pdu_t *request, + const coap_string_t *query, + coap_pdu_t *response) { + unsigned char buf[40]; + size_t len; + time_t now; + coap_tick_t t; + (void)request; + coap_pdu_code_t code = coap_pdu_get_code(request); + size_t size; + const uint8_t *data; + coap_str_const_t *ticks = coap_make_str_const("ticks"); + + if (my_clock_base) { + + /* calculate current time */ + coap_ticks(&t); + now = my_clock_base + (t / COAP_TICKS_PER_SECOND); + + /* coap_get_data() sets size to 0 on error */ + (void)coap_get_data(request, &size, &data); + + if (code == COAP_REQUEST_CODE_GET && query != NULL && + coap_string_equal(query, ticks)) { + /* parameter is in query, output ticks */ + len = snprintf((char *)buf, sizeof(buf), "%" PRIi64, (int64_t)now); + } else if (code == COAP_REQUEST_CODE_FETCH && size == ticks->length && + memcmp(data, ticks->s, ticks->length) == 0) { + /* parameter is in data, output ticks */ + len = snprintf((char *)buf, sizeof(buf), "%" PRIi64, (int64_t)now); + } else { /* output human-readable time */ + struct tm *tmp; + tmp = gmtime(&now); + if (!tmp) { + /* If 'now' is not valid */ + coap_pdu_set_code(response, COAP_RESPONSE_CODE_NOT_FOUND); + return; + } else { + len = strftime((char *)buf, sizeof(buf), "%b %d %H:%M:%S", tmp); + } + } + coap_pdu_set_code(response, COAP_RESPONSE_CODE_CONTENT); + coap_add_data_large_response(resource, session, request, response, + query, COAP_MEDIATYPE_TEXT_PLAIN, 1, 0, + len, + buf, NULL, NULL); + } else { + /* if my_clock_base was deleted, we pretend to have no such resource */ + coap_pdu_set_code(response, COAP_RESPONSE_CODE_NOT_FOUND); + } +} + +/* + * This function sends off an Observe unsolicited response when the time + * (based on seconds) changes. + */ +static void +do_time_observe_code(void *arg) { + static coap_time_t t_last = 0; + coap_time_t t_now; + coap_tick_t now; + + (void)arg; + coap_ticks(&now); + t_now = coap_ticks_to_rt(now); + if (t_now != t_last) { + t_last = t_now; + coap_resource_notify_observers(time_resource, NULL); + } +} + +static void +init_resources(coap_context_t *ctx) { + coap_resource_t *r; + + my_clock_base = time(NULL); + r = coap_resource_init(coap_make_str_const("time"), 0); + coap_register_request_handler(r, COAP_REQUEST_GET, hnd_get_fetch_time); + coap_register_request_handler(r, COAP_REQUEST_FETCH, hnd_get_fetch_time); + coap_resource_set_get_observable(r, 1); + + coap_add_attr(r, coap_make_str_const("ct"), coap_make_str_const("0"), 0); + coap_add_attr(r, coap_make_str_const("title"), coap_make_str_const("\"Internal Clock\""), 0); + coap_add_attr(r, coap_make_str_const("rt"), coap_make_str_const("\"ticks\""), 0); + coap_add_attr(r, coap_make_str_const("if"), coap_make_str_const("\"clock\""), 0); + + coap_add_resource(ctx, r); + time_resource = r; +} + +int +main(int argc, char **argv) { + unsigned wait_ms; + coap_context_t *ctx; + + (void)argc; + (void)argv; + + ctx = coap_new_context(NULL); + signal(SIGINT, handle_sigint); + + init_resources(ctx); + + /* Other general start up code */ + + wait_ms = 1000; +#if NUM_SERVER_THREADS + if (!coap_io_process_loop(ctx, do_time_observe_code, NULL, wait_ms, + NUM_SERVER_THREADS)) { + coap_log_err("coap_io_process_loop: Startup failed\n"); + } +#else /* ! NUM_SERVER_THREADS */ + while (!quit) { + unsigned int next_sec_ms; + int result; + coap_tick_t now; + + /* + * result is time spent in coap_io_process() + */ + result = coap_io_process(ctx, wait_ms); + if (result < 0) { + break; + } else if (result && (unsigned)result < wait_ms) { + /* decrement if there is a result wait time returned */ + wait_ms -= result; + } else { + /* + * result == 0, or result >= wait_ms + * (wait_ms could have decremented to a small value, below + * the granularity of the timer in coap_io_process() and hence + * result == 0) + */ + wait_ms = 1000; + } + + do_time_observe_code(NULL); + + /* need to wait until next second starts if wait_ms is too large */ + coap_ticks(&now); + next_sec_ms = 1000 - (now % COAP_TICKS_PER_SECOND) * + 1000 / COAP_TICKS_PER_SECOND; + if (next_sec_ms && next_sec_ms < wait_ms) + wait_ms = next_sec_ms; + } +#endif /* ! NUM_SERVER_THREADS */ + + /* General close down code */ +} +---- + +SEE ALSO +-------- +*coap_io_process*(3) + +FURTHER INFORMATION +------------------- +See + +"https://rfc-editor.org/rfc/rfc7252[RFC7252: The Constrained Application Protocol (CoAP)]" + +for further information. + +BUGS +---- +Please raise an issue on GitHub at +https://github.com/obgm/libcoap/issues to report any bugs. + +Please raise a Pull Request at https://github.com/obgm/libcoap/pulls +for any fixes. + +AUTHORS +------- +The libcoap project diff --git a/man/coap_observe.txt.in b/man/coap_observe.txt.in index 42711057..06228048 100644 --- a/man/coap_observe.txt.in +++ b/man/coap_observe.txt.in @@ -75,7 +75,7 @@ appropriate GET/FETCH handler within the server application is called to fill in the response packet with the appropriate information. This "fake GET/FETCH request" is triggered by a call to *coap_resource_notify_observers*(). -The call to *coap_io_process*() in the main server application i/o loop will do +Any call to *coap_io_process*() in the server application i/o loop will do all the necessary processing of sending any outstanding "fake GET/FETCH requests". diff --git a/src/coap_debug.c b/src/coap_debug.c index 795b9a13..e2cbec9a 100644 --- a/src/coap_debug.c +++ b/src/coap_debug.c @@ -784,7 +784,7 @@ void coap_show_pdu(coap_log_t level, const coap_pdu_t *pdu) { #if COAP_CONSTRAINED_STACK /* Proxy-Uri: can be 1034 bytes long */ - /* buf and outbuf can be protected by global_lock if needed */ + /* buf and outbuf can be protected by m_show_pdu if needed */ static unsigned char buf[min(COAP_DEBUG_BUF_SIZE, 1035)]; static char outbuf[COAP_DEBUG_BUF_SIZE]; #else /* ! COAP_CONSTRAINED_STACK */ @@ -809,6 +809,9 @@ coap_show_pdu(coap_log_t level, const coap_pdu_t *pdu) { if (level > coap_get_log_level()) return; +#if COAP_THREAD_SAFE + coap_mutex_lock(&m_show_pdu); +#endif /* COAP_THREAD_SAFE */ if (!pdu->session || COAP_PROTO_NOT_RELIABLE(pdu->session->proto)) { snprintf(outbuf, sizeof(outbuf), "v:%d t:%s c:%s i:%04x {", COAP_DEFAULT_VERSION, msg_type_string(pdu->type), @@ -1055,6 +1058,9 @@ no_more: snprintf(&outbuf[outbuflen], sizeof(outbuf)-outbuflen, "data length %zu (data suppressed)\n", data_len); COAP_DO_SHOW_OUTPUT_LINE; +#if COAP_THREAD_SAFE + coap_mutex_unlock(&m_show_pdu); +#endif /* COAP_THREAD_SAFE */ return; } @@ -1130,6 +1136,10 @@ no_more: outbuflen--; snprintf(&outbuf[outbuflen], sizeof(outbuf)-outbuflen, "\n"); COAP_DO_SHOW_OUTPUT_LINE; + +#if COAP_THREAD_SAFE + coap_mutex_unlock(&m_show_pdu); +#endif /* COAP_THREAD_SAFE */ } void @@ -1278,12 +1288,21 @@ coap_set_log_handler(coap_log_handler_t handler) { log_handler = handler; } +/* Visible to only this thread */ +extern COAP_THREAD_LOCAL_VAR uint32_t thread_no; +/* Visible across all threads */ +extern uint32_t max_thread_no; + void coap_log_impl(coap_log_t level, const char *format, ...) { +#if COAP_THREAD_SAFE + coap_mutex_lock(&m_log_impl); +#endif /* COAP_THREAD_SAFE */ + if (log_handler) { #if COAP_CONSTRAINED_STACK - /* message can be protected by global_lock if needed */ + /* message can be protected by m_log_impl if needed */ static char message[COAP_DEBUG_BUF_SIZE]; #else /* ! COAP_CONSTRAINED_STACK */ char message[COAP_DEBUG_BUF_SIZE]; @@ -1312,6 +1331,13 @@ coap_log_impl(coap_log_t level, const char *format, ...) { if (len) fprintf(log_fd, "%.*s ", (int)len, timebuf); +#if COAP_THREAD_NUM_LOGGING + if (thread_no == 0) { + thread_no = ++max_thread_no; + } + fprintf(log_fd, "%2d ", thread_no); +#endif /* COAP_THREAD_NUM_LOGGING */ + fprintf(log_fd, "%s ", coap_log_level_desc(level)); va_start(ap, format); @@ -1323,6 +1349,10 @@ coap_log_impl(coap_log_t level, const char *format, ...) { va_end(ap); fflush(log_fd); } + +#if COAP_THREAD_SAFE + coap_mutex_unlock(&m_log_impl); +#endif /* COAP_THREAD_SAFE */ } static struct packet_num_interval { diff --git a/src/coap_io.c b/src/coap_io.c index 1e158dce..80223aac 100644 --- a/src/coap_io.c +++ b/src/coap_io.c @@ -1777,6 +1777,81 @@ coap_io_process_with_fds(coap_context_t *ctx, uint32_t timeout_ms, return ret; } +#if !defined(COAP_EPOLL_SUPPORT) && COAP_THREAD_SAFE +static unsigned int +coap_io_prepare_fds(coap_context_t *ctx, + int enfds, fd_set *ereadfds, fd_set *ewritefds, + fd_set *eexceptfds) { + coap_session_t *s, *stmp; + unsigned int max_sockets = sizeof(ctx->sockets) / sizeof(ctx->sockets[0]); + coap_fd_t nfds = 0; + unsigned int i; + + ctx->num_sockets = 0; +#if COAP_SERVER_SUPPORT + coap_endpoint_t *ep; + + LL_FOREACH(ctx->endpoint, ep) { + if (ep->sock.flags & (COAP_SOCKET_WANT_READ | COAP_SOCKET_WANT_WRITE | COAP_SOCKET_WANT_ACCEPT)) { + if (ctx->num_sockets < max_sockets) + ctx->sockets[ctx->num_sockets++] = &ep->sock; + } + SESSIONS_ITER(ep->sessions, s, stmp) { + if (s->sock.flags & (COAP_SOCKET_WANT_READ|COAP_SOCKET_WANT_WRITE)) { + if (ctx->num_sockets < max_sockets) + ctx->sockets[ctx->num_sockets++] = &s->sock; + } + } + } +#endif /* COAP_SERVER_SUPPORT */ +#if COAP_CLIENT_SUPPORT + SESSIONS_ITER(ctx->sessions, s, stmp) { + if (s->sock.flags & (COAP_SOCKET_WANT_READ | + COAP_SOCKET_WANT_WRITE | + COAP_SOCKET_WANT_CONNECT)) { + if (ctx->num_sockets < max_sockets) + ctx->sockets[ctx->num_sockets++] = &s->sock; + } + } +#endif /* COAP_CLIENT_SUPPORT */ + if (ereadfds) { + ctx->readfds = *ereadfds; + nfds = enfds; + } else { + FD_ZERO(&ctx->readfds); + } + if (ewritefds) { + ctx->writefds = *ewritefds; + nfds = enfds; + } else { + FD_ZERO(&ctx->writefds); + } + if (eexceptfds) { + ctx->exceptfds = *eexceptfds; + nfds = enfds; + } else { + FD_ZERO(&ctx->exceptfds); + } + for (i = 0; i < ctx->num_sockets; i++) { + if (ctx->sockets[i]->fd + 1 > nfds) + nfds = ctx->sockets[i]->fd + 1; + if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_READ) + FD_SET(ctx->sockets[i]->fd, &ctx->readfds); + if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_WRITE) + FD_SET(ctx->sockets[i]->fd, &ctx->writefds); +#if !COAP_DISABLE_TCP + if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_ACCEPT) + FD_SET(ctx->sockets[i]->fd, &ctx->readfds); + if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_CONNECT) { + FD_SET(ctx->sockets[i]->fd, &ctx->writefds); + FD_SET(ctx->sockets[i]->fd, &ctx->exceptfds); + } +#endif /* !COAP_DISABLE_TCP */ + } + return nfds; +} +#endif /* ! COAP_EPOLL_SUPPORT && COAP_THREAD_SAFE */ + int coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms, int enfds, fd_set *ereadfds, fd_set *ewritefds, @@ -1858,7 +1933,7 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms, coap_lock_lock(ctx, return -1); } else { - result = 0; + goto all_over; } if (result < 0) { /* error */ @@ -1866,10 +1941,38 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms, coap_win_error_to_errno(); #endif if (errno != EINTR) { - coap_log_err("select: %s", coap_socket_strerror()); +#if COAP_THREAD_SAFE + if (errno == EBADF) { + coap_log_debug("select: %s\n", coap_socket_strerror()); + goto all_over; + } +#endif /* COAP_THREAD_SAFE */ + coap_log_err("select: %s\n", coap_socket_strerror()); return -1; } + goto all_over; } +#if COAP_THREAD_SAFE + /* Need to refresh what is available to read / write etc. */ + nfds = coap_io_prepare_fds(ctx, enfds, ereadfds, ewritefds, eexceptfds); + tv.tv_usec = 0; + tv.tv_sec = 0; + result = select((int)nfds, &ctx->readfds, &ctx->writefds, &ctx->exceptfds, &tv); + if (result < 0) { /* error */ +#ifdef _WIN32 + coap_win_error_to_errno(); +#endif + if (errno != EINTR) { + if (errno == EBADF) { + coap_log_debug("select: %s\n", coap_socket_strerror()); + goto all_over; + } + coap_log_err("select: %s\n", coap_socket_strerror()); + return -1; + } + goto all_over; + } +#endif /* COAP_THREAD_SAFE */ if (ereadfds) { *ereadfds = ctx->readfds; } @@ -1881,12 +1984,6 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms, } if (result > 0) { -#if COAP_THREAD_SAFE - /* Need to refresh what is available to read / write etc. */ - tv.tv_usec = 0; - tv.tv_sec = 0; - select((int)nfds, &ctx->readfds, &ctx->writefds, &ctx->exceptfds, &tv); -#endif /* COAP_THREAD_SAFE */ for (i = 0; i < ctx->num_sockets; i++) { if ((ctx->sockets[i]->flags & COAP_SOCKET_WANT_READ) && FD_ISSET(ctx->sockets[i]->fd, &ctx->readfds)) @@ -1962,6 +2059,7 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms, break; } + coap_lock_lock(ctx, return -1); #if COAP_THREAD_SAFE /* Need to refresh what is available to read / write etc. */ nfds = epoll_wait(ctx->epfd, events, COAP_MAX_EPOLL_EVENTS, 0); @@ -1970,11 +2068,9 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms, coap_log_err("epoll_wait: unexpected error: %s (%d)\n", coap_socket_strerror(), nfds); } - coap_lock_lock(ctx, return -1); break; } #endif /* COAP_THREAD_SAFE */ - coap_lock_lock(ctx, return -1); coap_io_do_epoll_lkd(ctx, events, nfds); @@ -1992,16 +2088,132 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms, #if COAP_SERVER_SUPPORT coap_expire_cache_entries(ctx); #endif /* COAP_SERVER_SUPPORT */ - coap_ticks(&now); #if COAP_ASYNC_SUPPORT /* Check to see if we need to send off any Async requests as delay might have been updated */ - coap_check_async(ctx, now); coap_ticks(&now); + coap_check_async(ctx, now); #endif /* COAP_ASYNC_SUPPORT */ +#ifndef COAP_EPOLL_SUPPORT +all_over: +#endif /* COAP_EPOLL_SUPPORT */ + coap_ticks(&now); return (int)(((now - before) * 1000) / COAP_TICKS_PER_SECOND); } + +volatile int coap_thread_quit = 0; + +void +coap_io_process_terminate_loop(void) { + coap_send_recv_terminate(); + coap_thread_quit = 1; +} + +COAP_API int +coap_io_process_loop(coap_context_t *context, + coap_io_process_thread_t main_loop_code, + void *main_loop_code_arg, uint32_t timeout_ms, + uint32_t thread_count) { + int ret; + + if (!context) + return 0; + coap_lock_lock(context, return 0); + ret = coap_io_process_loop_lkd(context, main_loop_code, + main_loop_code_arg, timeout_ms, + thread_count); + coap_lock_unlock(context); + return ret; +} + +int +coap_io_process_loop_lkd(coap_context_t *context, + coap_io_process_thread_t main_loop_code, + void *main_loop_code_arg, uint32_t timeout_ms, + uint32_t thread_count) { + int ret = 0;; + +#if COAP_THREAD_SAFE + if (thread_count > 1) { + if (!coap_io_process_configure_threads(context, thread_count - 1)) + return 0; + } +#else /* COAP_THREAD_SAFE */ + thread_count = 1; +#endif /* COAP_THREAD_SAFE */ + while (!coap_thread_quit) { + if (main_loop_code) { + coap_tick_t begin, end; + uint32_t used_ms; + + coap_ticks(&begin); + /* + * main_loop_codecode should not be blocking for any time, and not calling + * coap_io_process(). + */ + coap_lock_callback_release(context, main_loop_code(main_loop_code_arg), + /* On re-lock failure */ + ret = 0; break); + /* + * Need to delay for the remainder of timeout_ms. In case main_loop_code() + * is time sensitive (e.g Observe subscription to /time), delay to the + * start of the a second boundary + */ + coap_ticks(&end); + used_ms = (uint32_t)(end - begin) * 1000 / COAP_TICKS_PER_SECOND; + if (timeout_ms == COAP_IO_NO_WAIT || timeout_ms == COAP_IO_WAIT) { + ret = coap_io_process_lkd(context, timeout_ms); + } else if (timeout_ms > used_ms) { + /* Wait for remaining time rounded up to next second start */ + coap_tick_t next_time = end + (timeout_ms - used_ms) * COAP_TICKS_PER_SECOND / 1000; + unsigned int next_sec_us; + unsigned int next_sec_ms; + + next_sec_us = (timeout_ms - used_ms) * 1000000 / COAP_TICKS_PER_SECOND + 1000000 - + (coap_ticks_to_rt_us(next_time) % 1000000); + next_sec_ms = (next_sec_us + 999) / 1000; + if (next_sec_ms > timeout_ms && next_sec_ms > 1000) + next_sec_ms -= 1000; + ret = coap_io_process_lkd(context, next_sec_ms ? next_sec_ms : 1); + } else { + /* timeout_ms has expired */ + ret = coap_io_process_lkd(context, COAP_IO_NO_WAIT); + } + + if (thread_count == 1) { + /* + * Need to delay if only one thread until the remainder of + * timeout_ms is used up. Otherwise, another thread will be + * waiting on coap_io_process() to do any input / timeout work. + */ + coap_ticks(&end); + used_ms = (uint32_t)(end - begin) * 1000 / COAP_TICKS_PER_SECOND; + if (timeout_ms > 0 && timeout_ms < used_ms) { + ret = coap_io_process_lkd(context, used_ms - timeout_ms); + } else { + ret = coap_io_process_lkd(context, COAP_IO_NO_WAIT); + } + } + } else { + ret = coap_io_process_lkd(context, timeout_ms); + } + /* coap_io_process_lkd() can return 0 */ + if (ret >= 0) + ret = 1; + + if (ret < 0) { + ret = 0; + break; + } + } +#if COAP_THREAD_SAFE + coap_io_process_remove_threads(context); +#endif /* COAP_THREAD_SAFE */ + coap_thread_quit = 0; + return ret; +} + #endif /* ! WITH_LWIP && ! WITH_CONTIKI && ! RIOT_VERSION*/ COAP_API int diff --git a/src/coap_net.c b/src/coap_net.c index 9db7fab5..35de7e1c 100644 --- a/src/coap_net.c +++ b/src/coap_net.c @@ -2500,11 +2500,13 @@ coap_read_endpoint(coap_context_t *ctx, coap_endpoint_t *endpoint, coap_tick_t n } else if (bytes_read > 0) { coap_session_t *session = coap_endpoint_get_session(endpoint, packet, now); if (session) { + coap_session_reference_lkd(session); coap_log_debug("* %s: netif: recv %4zd bytes\n", coap_session_str(session), bytes_read); result = coap_handle_dgram_for_proto(ctx, session, packet); if (endpoint->proto == COAP_PROTO_DTLS && session->type == COAP_SESSION_TYPE_HELLO && result == 1) coap_session_new_dtls_session(session, now); + coap_session_release_lkd(session); } } return result; @@ -4793,6 +4795,12 @@ int coap_started = 0; * Global lock for multi-thread support */ coap_lock_t global_lock; +/* + * low level protection mutex + */ +coap_mutex_t m_show_pdu; +coap_mutex_t m_log_impl; +coap_mutex_t m_io_threads; #endif /* COAP_THREAD_SAFE */ void @@ -4808,6 +4816,9 @@ coap_startup(void) { #if COAP_THREAD_SAFE coap_lock_init(); + coap_mutex_init(&m_show_pdu); + coap_mutex_init(&m_log_impl); + coap_mutex_init(&m_io_threads); #endif /* COAP_THREAD_SAFE */ #if defined(HAVE_WINSOCK2_H) @@ -4856,6 +4867,12 @@ coap_cleanup(void) { #endif /* WITH_LWIP */ coap_dtls_shutdown(); +#if COAP_THREAD_SAFE + coap_mutex_destroy(&m_show_pdu); + coap_mutex_destroy(&m_log_impl); + coap_mutex_destroy(&m_io_threads); +#endif /* COAP_THREAD_SAFE */ + coap_debug_reset(); } diff --git a/src/coap_netif.c b/src/coap_netif.c index 3c53c4b6..1d24bbf6 100644 --- a/src/coap_netif.c +++ b/src/coap_netif.c @@ -105,10 +105,12 @@ coap_netif_dgrm_read_ep(coap_endpoint_t *endpoint, coap_packet_t *packet) { bytes_read = coap_socket_recv(&endpoint->sock, packet); keep_errno = errno; if (bytes_read == -1) { - coap_log_debug("* %s: netif: failed to read %zd bytes (%s)\n", - coap_endpoint_str(endpoint), packet->length, - coap_socket_strerror()); - errno = keep_errno; + if (errno != EAGAIN) { + coap_log_debug("* %s: netif: failed to read %zd bytes (%s)\n", + coap_endpoint_str(endpoint), packet->length, + coap_socket_strerror()); + errno = keep_errno; + } } else if (bytes_read > 0) { /* Let the caller do the logging as session available by then */ } diff --git a/src/coap_resource.c b/src/coap_resource.c index 3d2cbd00..2dd9b882 100644 --- a/src/coap_resource.c +++ b/src/coap_resource.c @@ -1343,7 +1343,7 @@ coap_resource_notify_observers_lkd(coap_resource_t *r, } r->context->observe_pending = 1; - coap_update_io_timer(r->context, 0); + coap_check_notify_lkd(r->context); return 1; } diff --git a/src/coap_session.c b/src/coap_session.c index 9a2d6a15..034c9d5e 100644 --- a/src/coap_session.c +++ b/src/coap_session.c @@ -1433,8 +1433,8 @@ coap_session_create_client(coap_context_t *ctx, #endif /* !COAP_DISABLE_TCP */ } -#ifdef COAP_EPOLL_SUPPORT session->sock.session = session; +#ifdef COAP_EPOLL_SUPPORT coap_epoll_ctl_add(&session->sock, EPOLLIN | ((session->sock.flags & COAP_SOCKET_WANT_CONNECT) ? diff --git a/src/coap_tcp.c b/src/coap_tcp.c index de222a68..91effae6 100644 --- a/src/coap_tcp.c +++ b/src/coap_tcp.c @@ -300,16 +300,18 @@ coap_socket_accept_tcp(coap_socket_t *server, #endif (void)extra; - server->flags &= ~COAP_SOCKET_CAN_ACCEPT; - new_client->fd = accept(server->fd, &remote_addr->addr.sa, &remote_addr->size); if (new_client->fd == COAP_INVALID_SOCKET) { - coap_log_warn("coap_socket_accept_tcp: accept: %s\n", - coap_socket_strerror()); + if (errno != EAGAIN) { + coap_log_warn("coap_socket_accept_tcp: accept: %s\n", + coap_socket_strerror()); + } return 0; } + server->flags &= ~COAP_SOCKET_CAN_ACCEPT; + if (getsockname(new_client->fd, &local_addr->addr.sa, &local_addr->size) < 0) coap_log_warn("coap_socket_accept_tcp: getsockname: %s\n", coap_socket_strerror()); diff --git a/src/coap_threadsafe.c b/src/coap_threadsafe.c index d9e1cdcc..6ae9d175 100644 --- a/src/coap_threadsafe.c +++ b/src/coap_threadsafe.c @@ -99,6 +99,112 @@ coap_lock_lock_func(void) { } #endif /* ! COAP_THREAD_RECURSIVE_CHECK */ +#if !WITH_LWIP +extern volatile int coap_thread_quit; +static pthread_t *thread_id = NULL; +static uint32_t thread_id_count = 0; + +/* Visible to only this thread */ +COAP_THREAD_LOCAL_VAR uint32_t thread_no = 0; +/* Visible across all threads */ +uint32_t max_thread_no = 0; + +typedef struct { + coap_context_t *context; + uint32_t thread_no; +} coap_thread_param_t; + +static void * +coap_io_process_worker_thread(void *arg) { + coap_thread_param_t *thread_param = (coap_thread_param_t *)arg; + coap_context_t *context = thread_param->context; + + thread_no = thread_param->thread_no; + coap_free_type(COAP_STRING, thread_param); + + coap_log_debug("Thread %lx start\n", pthread_self()); + + while (!coap_thread_quit) { + int result; + + coap_lock_lock(context, return 0); + result = coap_io_process_lkd(context, COAP_IO_WAIT); + coap_lock_unlock(context); + if (result < 0) + break; + } + coap_log_debug("Thread %lx exit\n", pthread_self()); + return 0; +} + +int +coap_io_process_configure_threads(coap_context_t *context, uint32_t thread_count) { + uint32_t i; + + coap_mutex_lock(&m_io_threads); + + thread_no = 1; + max_thread_no = 1 + thread_count; + coap_free_type(COAP_STRING, thread_id); + thread_id = coap_malloc_type(COAP_STRING, thread_count * sizeof(pthread_t)); + if (!thread_id) { + coap_log_err("thread start up memory allocate failure\n"); + coap_mutex_unlock(&m_io_threads); + return 0; + } + for (i = 0; i < thread_count ; i++) { + coap_thread_param_t *thread_param = coap_malloc_type(COAP_STRING, sizeof(coap_thread_param_t)); + int s; + + thread_param->context = context; + thread_param->thread_no = i + 2; + s = pthread_create(&thread_id[i], NULL, + &coap_io_process_worker_thread, thread_param); + if (s != 0) { + coap_log_err("thread start up failure (%s)\n", coap_socket_strerror()); + coap_mutex_unlock(&m_io_threads); + return 0; + } + thread_id_count++; + } + coap_mutex_unlock(&m_io_threads); + return 1; +} + +#ifdef HAVE_SIGNAL_H +#include +#endif /* HAVE_SIGNAL_H */ +void +coap_io_process_remove_threads(coap_context_t *context) { + uint32_t i; + + (void)context; + + coap_lock_unlock(context); + coap_mutex_lock(&m_io_threads); + + for (i = 0; i < thread_id_count ; i++) { + int s = pthread_kill(thread_id[i], SIGINT); + if (s != 0) { + coap_log_err("thread kill failure\n"); + } + } + for (i = 0; i < thread_id_count ; i++) { + void *retval; + int s = pthread_join(thread_id[i], &retval); + if (s != 0) { + coap_log_err("thread join failure\n"); + } + } + coap_free_type(COAP_STRING, thread_id); + thread_id = NULL; + thread_id_count = 0; + + coap_mutex_unlock(&m_io_threads); + coap_lock_lock(context, return); +} +#endif /* !WITH_LWIP */ + #else /* ! COAP_THREAD_SAFE */ #ifdef __clang__