1
0
mirror of https://github.com/obgm/libcoap.git synced 2025-10-14 02:19:34 +08:00

Threads: Support multiple threads doing coap_io_process()

Set up a coap_io_process() loop spread across multiple threads, each thread
repeatably calling coap_io_process() and so able to handle any input packets.

Note that the application handlers need to be thread-safe for application
level activity.

Add 2 new main functions
  coap_io_process_loop()
  coap_io_process_terminate_loop()

Add in 2 new sub functions
  coap_io_process_configure_threads()
  coap_io_process_remove_threads()
to set up the parallel threads.

Update example servers to include using coap_io_process_loop().

Support optional thread number logging by configuration to maintain
backwards logging format output if required.
This commit is contained in:
Jon Shallow
2025-03-15 11:17:44 +00:00
committed by Jon Shallow
parent 0ba446d788
commit cce7b65398
27 changed files with 947 additions and 46 deletions

View File

@@ -156,6 +156,10 @@ option(
ENABLE_THREAD_RECURSIVE_LOCK_CHECK
"enable building with thread recursive lock detection"
OFF)
option(
ENABLE_THREAD_NUM_LOGGING
"enable building with thread number logging support"
ON)
option(
ENABLE_SMALL_STACK
"enable if the system has small stack size"
@@ -274,6 +278,7 @@ check_include_file(sys/epoll.h HAVE_EPOLL_H)
check_include_file(sys/timerfd.h HAVE_TIMERFD_H)
check_include_file(arpa/inet.h HAVE_ARPA_INET_H)
check_include_file(stdbool.h HAVE_STDBOOL_H)
check_include_file(signal.h HAVE_SIGNAL_H)
check_include_file(netdb.h HAVE_NETDB_H)
check_include_file(pthread.h HAVE_PTHREAD_H)
check_include_file(stdlib.h HAVE_STDINT_H)
@@ -428,6 +433,11 @@ if(${ENABLE_THREAD_RECURSIVE_LOCK_CHECK})
message(STATUS "compiling with thread recursive lock detection support")
endif()
if(${ENABLE_THREAD_NUM_LOGGING})
set(COAP_THREAD_SAFE "${ENABLE_THREAD_NUM_LOGGING}")
message(STATUS "compiling with thread number logging support")
endif()
if(${ENABLE_SMALL_STACK})
set(COAP_CONSTRAINED_STACK "${ENABLE_SMALL_STACK}")
message(STATUS "compiling with small stack support")
@@ -694,6 +704,7 @@ message(STATUS "ENABLE_OSCORE:...................${ENABLE_OSCORE}")
message(STATUS "ENABLE_ASYNC:....................${ENABLE_ASYNC}")
message(STATUS "ENABLE_THREAD_SAFE:..............${ENABLE_THREAD_SAFE}")
message(STATUS "ENABLE_THREAD_RECURSIVE_CHECK....${ENABLE_THREAD_RECURSIVE_LOCK_CHECK}")
message(STATUS "ENABLE_THREAD_NUM_LOGGING........${ENABLE_THREAD_NUM_LOGGING}")
message(STATUS "ENABLE_DOCS:.....................${ENABLE_DOCS}")
message(STATUS "ENABLE_EXAMPLES:.................${ENABLE_EXAMPLES}")
message(STATUS "DTLS_BACKEND:....................${DTLS_BACKEND}")

View File

@@ -62,6 +62,9 @@
/* Define to 1 if the library has thread safe support. */
#cmakedefine COAP_THREAD_SAFE @COAP_THREAD_SAFE@
/* Define to 1 if the library has thread number logging support. */
#cmakedefine COAP_THREAD_NUM_LOGGING @COAP_THREAD_NUM_LOGGING@
/* Define to 1 if the system has libgnutls28 */
#cmakedefine COAP_WITH_LIBGNUTLS @COAP_WITH_LIBGNUTLS@

View File

@@ -56,6 +56,9 @@
/* Define to 1 if libcoap has thread safe support. */
/* #undef COAP_THREAD_SAFE 1 */
/* Define to 1 if libcoap has thread number logging support. */
/* #undef COAP_THREAD_NUM_LOGGING 1 */
/* Define to 1 if you have the <assert.h> header file. */
#define HAVE_ASSERT_H 1

View File

@@ -144,6 +144,13 @@
#endif /* COAP_THREAD_SAFE */
#endif /* CONFIG_LIBCOAP_THREAD_SAFE */
#ifdef CONFIG_LIBCOAP_THREAD_NUM_LOGGING
#ifndef COAP_THREAD_NUM_LOGGING
/* Define to 1 if libcoap has thread number logging support. */
#define COAP_THREAD_NUM_LOGGING 1
#endif /* COAP_THREAD_SAFE */
#endif /* CONFIG_LIBCOAP_THREAD_NUM_LOGGING */
#ifdef CONFIG_LIBCOAP_THREAD_RECURSIVE_CHECK
#ifndef COAP_THREAD_RECURSIVE_CHECK
/* Define to 1 to build with thread recursive lock detection support. */

View File

@@ -155,6 +155,11 @@
#define COAP_THREAD_RECURSIVE_CHECK 0
#endif
#ifndef COAP_THREAD_NUM_LOGGING
/* Define to 1 if libcoap has thread number logging support. */
#define COAP_THREAD_NUM_LOGGING 0
#endif
#ifndef PACKAGE_BUGREPORT
/* Define to the address where bug reports for this package should be sent. */
#define PACKAGE_BUGREPORT "libcoap-developers@lists.sourceforge.net"

View File

@@ -1102,7 +1102,7 @@ fi
# Checks for header files.
AC_CHECK_HEADERS([assert.h arpa/inet.h limits.h netdb.h netinet/in.h \
pthread.h errno.h winsock2.h ws2tcpip.h \
pthread.h errno.h winsock2.h ws2tcpip.h signal.h \
stdlib.h string.h strings.h sys/socket.h sys/time.h \
time.h unistd.h sys/unistd.h sys/ioctl.h net/if.h ifaddrs.h])
@@ -1148,6 +1148,16 @@ if test "x$enable_recursive_detection" = "xyes"; then
AC_DEFINE(COAP_THREAD_RECURSIVE_CHECK, 1, [Define to 1 detect recursive locking detection support])
fi
AC_ARG_ENABLE([thread-num-logging],
[AS_HELP_STRING([--enable-thread-num-logging],
[Enable building with logging the thread number [default=yes]])],
[enable_thread_num_logging="$enableval"],
[enable_thread_num_logging="yes"])
if test "x$enable_thread_num_logging" = "xyes"; then
AC_DEFINE(COAP_THREAD_NUM_LOGGING, 1, [Define to 1 if libcoap has thread number logging support])
fi
AC_ARG_ENABLE([small-stack],
[AS_HELP_STRING([--enable-small-stack],
[Use small-stack if the available stack space is restricted [default=no]])],
@@ -1320,8 +1330,8 @@ AC_SUBST(PREDEFINED_CFLAGS)
# And finally combining the CFLAGS together ...
CFLAGS="$CFLAGS $ADDITIONAL_CFLAGS"
# Override the various template files, currently just makefiles and the
# pkgconfig *.pc file.
# Override the various template files, currently just makefiles, man pages
# and the pkgconfig *.pc file.
# Later if the API version is changing don't forget to change the
# libcoap-$LIBCOAP_API_VERSION.pc.in file too!! You will have to change
# the 'Cflags' variable to something like
@@ -1345,6 +1355,7 @@ man/coap_endpoint_server.txt
man/coap_handler.txt
man/coap_init.txt
man/coap_io.txt
man/coap_io_loop.txt
man/coap_keepalive.txt
man/coap_locking.txt
man/coap_logging.txt
@@ -1505,6 +1516,7 @@ else
fi
AC_MSG_RESULT([ enable thread safe code : "$enable_thread_safe"])
AC_MSG_RESULT([ enable recursive lock check : "$enable_recursive_detection"])
AC_MSG_RESULT([ enable thread number logging : "$enable_thread_num_logging"])
if test "x$build_doxygen" = "xyes"; then
AC_MSG_RESULT([ build doxygen pages : "yes"])
AC_MSG_RESULT([ --> Doxygen around : "yes" ($DOXYGEN $doxygen_version)])

View File

@@ -44,6 +44,14 @@
#endif
#include <coap3/coap.h>
#include <coap3/coap_defines.h>
#if COAP_THREAD_SAFE
/* Define the number of coap_io_process() threads required */
#ifndef NUM_SERVER_THREADS
#define NUM_SERVER_THREADS 3
#endif /* NUM_SERVER_THREADS */
#endif /* COAP_THREAD_SAFE */
#define COAP_RESOURCE_CHECK_TIME 2
@@ -126,13 +134,16 @@ resource_rd_delete(void *ptr) {
rd_delete(ptr);
}
static int quit = 0;
static volatile int quit = 0;
/* SIGINT handler: set quit to 1 for graceful termination */
static void
handle_sigint(int signum COAP_UNUSED) {
quit = 1;
coap_send_recv_terminate();
#if NUM_SERVER_THREADS
coap_io_process_terminate_loop();
#endif /* NUM_SERVER_THREADS */
}
static void
@@ -754,7 +765,6 @@ cmdline_read_extended_token_size(char *arg) {
int
main(int argc, char **argv) {
coap_context_t *ctx;
int result;
char addr_str[NI_MAXHOST] = "::";
char port_str[NI_MAXSERV] = "5683";
char *group = NULL;
@@ -864,12 +874,21 @@ main(int argc, char **argv) {
sigaction(SIGPIPE, &sa, NULL);
#endif
#if NUM_SERVER_THREADS
if (!coap_io_process_loop(ctx, NULL, NULL, COAP_RESOURCE_CHECK_TIME * 1000,
NUM_SERVER_THREADS)) {
coap_log_err("coap_io_process_loop: Failed\n");
}
#else
while (!quit) {
int result;
result = coap_io_process(ctx, COAP_RESOURCE_CHECK_TIME * 1000);
if (result >= 0) {
/* coap_check_resource_list( ctx ); */
}
}
#endif
coap_free_context(ctx);
coap_cleanup();

View File

@@ -60,6 +60,13 @@ strndup(const char *s1, size_t n) {
#include <coap3/coap.h>
#include <coap3/coap_defines.h>
#if COAP_THREAD_SAFE
/* Define the number of coap_io_process() threads required */
#ifndef NUM_SERVER_THREADS
#define NUM_SERVER_THREADS 3
#endif /* NUM_SERVER_THREADS */
#endif /* COAP_THREAD_SAFE */
#ifndef min
#define min(a,b) ((a) < (b) ? (a) : (b))
#endif
@@ -71,7 +78,7 @@ static char *tls_engine_conf = NULL;
static int ec_jpake = 0;
/* set to 1 to request clean server shutdown */
static int quit = 0;
static volatile int quit = 0;
/* set to 1 if persist information is to be kept on server shutdown */
static int keep_persist = 0;
@@ -182,6 +189,9 @@ static void
handle_sigint(int signum COAP_UNUSED) {
quit = 1;
coap_send_recv_terminate();
#if NUM_SERVER_THREADS
coap_io_process_terminate_loop();
#endif /* NUM_SERVER_THREADS */
}
#ifndef _WIN32
@@ -194,6 +204,9 @@ static void
handle_sigusr2(int signum COAP_UNUSED) {
quit = 1;
keep_persist = 1;
#if NUM_SERVER_THREADS
coap_io_process_terminate_loop();
#endif /* NUM_SERVER_THREADS */
}
#endif /* ! _WIN32 */
@@ -2299,12 +2312,30 @@ syslog_handler(coap_log_t level, const char *message) {
}
#endif /* ! _WIN32 */
/*
* This function only initiates an Observe unsolicited response when the time
* (in seconds) changes.
*/
static void
do_time_observe_code(void *arg) {
static coap_time_t t_last = 0;
coap_time_t t_now;
coap_tick_t now;
(void)arg;
coap_ticks(&now);
t_now = coap_ticks_to_rt(now);
if (t_now != t_last) {
t_last = t_now;
coap_resource_notify_observers(time_resource, NULL);
}
}
int
main(int argc, char **argv) {
coap_context_t *ctx = NULL;
char *group = NULL;
char *group_if = NULL;
coap_tick_t now;
char addr_str[NI_MAXHOST] = "::";
char *port_str = NULL;
int opt;
@@ -2312,10 +2343,6 @@ main(int argc, char **argv) {
coap_log_t log_level = COAP_LOG_WARN;
coap_log_t dtls_log_level = COAP_LOG_ERR;
unsigned wait_ms;
coap_time_t t_last = 0;
int coap_fd;
fd_set m_readfds;
int nfds = 0;
size_t i;
int exit_code = 0;
uint32_t max_block_size = 0;
@@ -2610,6 +2637,18 @@ main(int argc, char **argv) {
}
}
wait_ms = COAP_RESOURCE_CHECK_TIME * 1000;
#if NUM_SERVER_THREADS
if (!coap_io_process_loop(ctx, time_resource ? do_time_observe_code : NULL,
NULL, wait_ms, NUM_SERVER_THREADS)) {
coap_log_err("coap_io_process_loop: Failed\n");
}
#else
int nfds = 0;
int coap_fd;
fd_set m_readfds;
coap_fd = coap_context_get_coap_fd(ctx);
if (coap_fd != -1) {
/* if coap_fd is -1, then epoll is not supported within libcoap */
@@ -2618,10 +2657,9 @@ main(int argc, char **argv) {
nfds = coap_fd + 1;
}
wait_ms = COAP_RESOURCE_CHECK_TIME * 1000;
while (!quit) {
int result;
coap_tick_t now;
if (coap_fd != -1) {
/*
@@ -2679,23 +2717,19 @@ main(int argc, char **argv) {
wait_ms = COAP_RESOURCE_CHECK_TIME * 1000;
}
if (time_resource) {
coap_time_t t_now;
unsigned int next_sec_ms;
coap_ticks(&now);
t_now = coap_ticks_to_rt(now);
if (t_last != t_now) {
/* Happens once per second */
t_last = t_now;
coap_resource_notify_observers(time_resource, NULL);
}
do_time_observe_code(NULL);
/* need to wait until next second starts if wait_ms is too large */
coap_ticks(&now);
next_sec_ms = 1000 - (now % COAP_TICKS_PER_SECOND) *
1000 / COAP_TICKS_PER_SECOND;
if (next_sec_ms && next_sec_ms < wait_ms)
wait_ms = next_sec_ms;
}
}
#endif /* NUM_SERVER_THREADS */
exit_code = 0;
finish:

View File

@@ -150,4 +150,12 @@ typedef int coap_mutex_t;
#endif /* !WITH_CONTIKI && !WITH_LWIP && !RIOT_VERSION && !HAVE_PTHREAD_H && !HAVE_PTHREAD_MUTEX_LOCK */
#if COAP_THREAD_SAFE
extern coap_mutex_t m_show_pdu;
extern coap_mutex_t m_log_impl;
extern coap_mutex_t m_io_threads;
#endif /* COAP_THREAD_SAFE */
#endif /* COAP_MUTEX_INTERNAL_H_ */

View File

@@ -855,6 +855,73 @@ struct epoll_event;
COAP_API void coap_io_do_epoll(coap_context_t *ctx, struct epoll_event *events,
size_t nevents);
/**
* Main thread coap_io_process_loop activity.
*
* This function should not do any blocking.
*
* @param arg The value of main_loop_code_arg passed into coap_io_process_loop().
*
*/
typedef void (*coap_io_process_thread_t)(void *arg);
/**
* Do the coap_io_process() across @p thread_count threads.
* The main thread will invoke @p main_loop_code (if defined) at least
* every @p timeout_ms.
*
* Note: If multi-threading protection is not in place, then @p thread_count
* is ignored and only a single thread runs (but still executes
* @p main_loop_code)
*
* Note: To stop the threads and continual looping,
* coap_io_process_terminate_loop() should be called.
*
* @param context The current CoAP context.
* @param main_loop_code The name of the function to execute in the main
* thread or NULL if not required. This function should
* not do any blocking.
* @param main_loop_code_arg The argument to pass to @p main_loop_code.
* @param timeout_ms The maximum amount of time the main thread should delay up
* to (i.e. timeout parameter for coap_io_process()) before
* the loop starts again.
* @param thread_count The number of threads to run.
*
*/
COAP_API int coap_io_process_loop(coap_context_t *context,
coap_io_process_thread_t main_loop_code,
void *main_loop_code_arg, uint32_t timeout_ms,
uint32_t thread_count);
/**
* Terminate all the additional threads created by coap_io_process_loop()
* and break out of the main thread loop to return from coap_io_process_loop().
*
* Typically this would be called from within a SIGQUIT handler.
*
*/
void coap_io_process_terminate_loop(void);
/**
* Configure a defined number of threads to do the alternate coap_io_process()
* work with traffic load balanced across the threads based on inactive
* threads.
*
* @param context Context.
* @param thread_count The number of threads to configure.
*
* @return 1 success or 0 on failure.
*/
int coap_io_process_configure_threads(coap_context_t *context,
uint32_t thread_count);
/**
* Release the coap_io_process() worker threads.
*
* @param context Context.
*/
void coap_io_process_remove_threads(coap_context_t *context);
/**
* Get the libcoap internal file descriptor for a socket. This can be used to
* integrate libcoap in an external event loop instead of using one of its

View File

@@ -769,6 +769,36 @@ unsigned int coap_io_prepare_io_lkd(coap_context_t *ctx,
*/
int coap_io_process_lkd(coap_context_t *ctx, uint32_t timeout_ms);
/**
* Do the coap_io_process() across @p thread_count threads.
* The main thread will invoke @p main_loop_code (if defined) at least
* every @p timeout_ms.
*
* Note: This function must be called in the locked state.
*
* Note: If multi-threading protection is not in place, then @p thread_count
* is ignored and only a single thread runs (but still executes
* @p main_loop_code)
*
* Note: To stop the threads and continual looping,
* coap_io_process_terminate_loop() should be called.
*
* @param context The current CoAP context.
* @param main_loop_code The name of the function to execute in the main
* thread or NULL if not required. This function should
* not do any blocking.
* @param main_loop_code_arg The argument to pass to @p main_loop_code.
* @param timeout_ms The maximum amount of time the main thread should delay up
* to (i.e. timeout parameter for coap_io_process()) before
* the loop starts again.
* @param thread_count The number of threads to run.
*
*/
int coap_io_process_loop_lkd(coap_context_t *context,
coap_io_process_thread_t main_loop_code,
void *main_loop_code_arg, uint32_t timeout_ms,
uint32_t thread_count);
#if !defined(RIOT_VERSION) && !defined(WITH_CONTIKI)
/**
* The main message processing loop with additional fds for internal select.

View File

@@ -71,6 +71,16 @@ typedef USHORT in_port_t;
# endif /* __GNUC__ */
#endif /* COAP_UNUSED */
#ifndef COAP_THREAD_LOCAL_VAR
# ifdef __GNUC__
# define COAP_THREAD_LOCAL_VAR __thread
# elif defined(_MSC_VER)
# define COAP_THREAD_LOCAL_VAR __declspec(thread)
# else /* ! __GNUC__ && ! _MSC_VER */
# define COAP_THREAD_LOCAL_VAR
# endif
#endif
#ifndef COAP_API
#define COAP_API
#endif

View File

@@ -131,6 +131,10 @@ global:
coap_io_prepare_epoll;
coap_io_prepare_io;
coap_io_process;
coap_io_process_configure_threads;
coap_io_process_loop;
coap_io_process_remove_threads;
coap_io_process_terminate_loop;
coap_io_process_with_fds;
coap_ipv4_is_supported;
coap_ipv6_is_supported;

View File

@@ -129,6 +129,10 @@ coap_io_pending
coap_io_prepare_epoll
coap_io_prepare_io
coap_io_process
coap_io_process_configure_threads
coap_io_process_loop
coap_io_process_remove_threads
coap_io_process_terminate_loop
coap_io_process_with_fds
coap_ipv4_is_supported
coap_ipv6_is_supported

View File

@@ -31,6 +31,7 @@ TXT3 = coap_address.txt \
coap_handler.txt \
coap_init.txt \
coap_io.txt \
coap_io_loop.txt \
coap_keepalive.txt \
coap_locking.txt \
coap_logging.txt \

View File

@@ -54,6 +54,10 @@ DESCRIPTION
This documents the different callback handlers that can optionally be invoked
on receipt of a packet or when a timeout occurs.
*NOTE:* If multi-thread activity is supported, these callback handlers need to
be thread-safe at the application level as multiple threads could be executing
the same handler code.
FUNCTIONS
---------

View File

@@ -137,6 +137,9 @@ _timeout_ms_ set to COAP_IO_NO_WAIT. +
will then be using *epoll* internally to process all the file descriptors of
the different sessions.
:NOTE:* With multi-threading protection enabled, it is possible to
call *coap_io_process*() from multiple threads to do some load balancing.
See EXAMPLES below.
*Function: coap_io_prepare_epoll()*

307
man/coap_io_loop.txt.in Normal file
View File

@@ -0,0 +1,307 @@
// -*- mode:doc; -*-
// vim: set syntax=asciidoc tw=0
coap_io_loop(3)
===============
:doctype: manpage
:man source: coap_io_loop
:man version: @PACKAGE_VERSION@
:man manual: libcoap Manual
NAME
----
coap_io_loop,
coap_io_process_loop,
coap_io_process_terminate_loop,
coap_io_process_configure_threads,
coap_io_process_remove_threads
- Work with CoAP threads doing coap_io_process
SYNOPSIS
--------
*#include <coap@LIBCOAP_API_VERSION@/coap.h>*
*int coap_io_process_loop(coap_context_t *_context_,
coap_io_process_thread_t _main_loop_code_, void *_main_loop_code_arg_,
uint32_t _timeout_ms_, uint32_t _thread_count_);*
*void coap_io_process_terminate_loop(void);*
*int coap_io_process_configure_threads(coap_context_t *_context_,
uint32_t _thread_count_);*
*void coap_io_process_remove_threads(coap_context_t *_context_);*
For specific (D)TLS library support, link with
*-lcoap-@LIBCOAP_API_VERSION@-notls*, *-lcoap-@LIBCOAP_API_VERSION@-gnutls*,
*-lcoap-@LIBCOAP_API_VERSION@-openssl*, *-lcoap-@LIBCOAP_API_VERSION@-mbedtls*,
*-lcoap-@LIBCOAP_API_VERSION@-wolfssl*
or *-lcoap-@LIBCOAP_API_VERSION@-tinydtls*. Otherwise, link with
*-lcoap-@LIBCOAP_API_VERSION@* to get the default (D)TLS library support.
DESCRIPTION
-----------
This man page focuses on setting up and supporting multiple threads, each one
invoking *coap_io_process*(3) in a loop.
Each thread can receive a packet (a different packet for each thread), call the
appropriate application call-back handler and potentially be spending time of
consequence in that handler without blocking the reciept of other input traffic.
These functions should be called from the main thread. It is assumed that
thread-safe code has been enabled.
FUNCTIONS
---------
*Function: coap_io_process_loop()*
The *coap_io_process_loop*() function is used to set up additional threads that
are in a loop just calling *coap_io_process*(3) with COAP_IO_WAIT. These
threads are terminated when *coap_io_process_terminate_loop*() is called.
The thread calling *coap_io_process_loop*() will also be in a separate loop
that is calling *coap_io_process*(3), optionally calling _main_loop_code_
(if not NULL) with argument _main_loop_code_arg_. For the thread calling
*coap_io_process_loop*(), it will call _main_loop_code_ at least every
_timeout_ms_ milli-secs, the call to _main_loop_code_ start time aligned to
the nearest second.
_context_ defines the context to associate the threads with. _thread_count_
is the number of threads to be running *coap_io_process*(3) which includes the
*coap_io_process_loop*() calling thread in the count.
*Function: coap_io_process_terminate_loop()*
The *coap_io_process_terminate_loop*() function is used to terminate any added
threads running under the control of *coap_io_process_loop*() and causing the
thread that called *coap_io_process_loop*() to return.
*Function: coap_io_process_configure_threads()*
The *coap_io_process_configure_threads*() function is used to set up
an additional _thread_count_ threads for _context_. Usually
*coap_io_process_loop*() would be called, but can be used to wrap with
*coap_io_process_remove_threads*() a complex version of _main_loop_code_.
*coap_io_process_loop*() uses *coap_io_process_configure_threads*() and
*coap_io_process_remove_threads*() to wrap the call to _main_loop_code_.
*Function: coap_io_process_remove_threads()*
The *coap_io_process_remove_threads*() function is used stop and remove
threads created by *coap_io_process_configure_threads*() for _context_.
RETURN VALUES
-------------
*coap_io_process_loop*(), *coap_io_process_configure_threads*() return 1 on success
else 0 on failure.
EXAMPLES
--------
*coap_io_process_loop()*
[source, c]
----
#include <inttypes.h>
#include <stdio.h>
#include <signal.h>
#include <coap@LIBCOAP_API_VERSION@/coap.h>
#include <coap3/coap_defines.h>
#if COAP_THREAD_SAFE
/* Define the number of coap_io_process() threads required */
#ifndef NUM_SERVER_THREADS
#define NUM_SERVER_THREADS 3
#endif /* NUM_SERVER_THREADS */
#endif /* COAP_THREAD_SAFE */
static volatile int quit = 0;
coap_resource_t *time_resource;
static time_t my_clock_base = 0;
/* SIGINT handler: set quit to 1 for graceful termination */
static void
handle_sigint(int signum COAP_UNUSED) {
quit = 1;
#if NUM_SERVER_THREADS
coap_io_process_terminate_loop();
#endif /* NUM_SERVER_THREADS */
}
static void
hnd_get_fetch_time(coap_resource_t *resource,
coap_session_t *session,
const coap_pdu_t *request,
const coap_string_t *query,
coap_pdu_t *response) {
unsigned char buf[40];
size_t len;
time_t now;
coap_tick_t t;
(void)request;
coap_pdu_code_t code = coap_pdu_get_code(request);
size_t size;
const uint8_t *data;
coap_str_const_t *ticks = coap_make_str_const("ticks");
if (my_clock_base) {
/* calculate current time */
coap_ticks(&t);
now = my_clock_base + (t / COAP_TICKS_PER_SECOND);
/* coap_get_data() sets size to 0 on error */
(void)coap_get_data(request, &size, &data);
if (code == COAP_REQUEST_CODE_GET && query != NULL &&
coap_string_equal(query, ticks)) {
/* parameter is in query, output ticks */
len = snprintf((char *)buf, sizeof(buf), "%" PRIi64, (int64_t)now);
} else if (code == COAP_REQUEST_CODE_FETCH && size == ticks->length &&
memcmp(data, ticks->s, ticks->length) == 0) {
/* parameter is in data, output ticks */
len = snprintf((char *)buf, sizeof(buf), "%" PRIi64, (int64_t)now);
} else { /* output human-readable time */
struct tm *tmp;
tmp = gmtime(&now);
if (!tmp) {
/* If 'now' is not valid */
coap_pdu_set_code(response, COAP_RESPONSE_CODE_NOT_FOUND);
return;
} else {
len = strftime((char *)buf, sizeof(buf), "%b %d %H:%M:%S", tmp);
}
}
coap_pdu_set_code(response, COAP_RESPONSE_CODE_CONTENT);
coap_add_data_large_response(resource, session, request, response,
query, COAP_MEDIATYPE_TEXT_PLAIN, 1, 0,
len,
buf, NULL, NULL);
} else {
/* if my_clock_base was deleted, we pretend to have no such resource */
coap_pdu_set_code(response, COAP_RESPONSE_CODE_NOT_FOUND);
}
}
/*
* This function sends off an Observe unsolicited response when the time
* (based on seconds) changes.
*/
static void
do_time_observe_code(void *arg) {
static coap_time_t t_last = 0;
coap_time_t t_now;
coap_tick_t now;
(void)arg;
coap_ticks(&now);
t_now = coap_ticks_to_rt(now);
if (t_now != t_last) {
t_last = t_now;
coap_resource_notify_observers(time_resource, NULL);
}
}
static void
init_resources(coap_context_t *ctx) {
coap_resource_t *r;
my_clock_base = time(NULL);
r = coap_resource_init(coap_make_str_const("time"), 0);
coap_register_request_handler(r, COAP_REQUEST_GET, hnd_get_fetch_time);
coap_register_request_handler(r, COAP_REQUEST_FETCH, hnd_get_fetch_time);
coap_resource_set_get_observable(r, 1);
coap_add_attr(r, coap_make_str_const("ct"), coap_make_str_const("0"), 0);
coap_add_attr(r, coap_make_str_const("title"), coap_make_str_const("\"Internal Clock\""), 0);
coap_add_attr(r, coap_make_str_const("rt"), coap_make_str_const("\"ticks\""), 0);
coap_add_attr(r, coap_make_str_const("if"), coap_make_str_const("\"clock\""), 0);
coap_add_resource(ctx, r);
time_resource = r;
}
int
main(int argc, char **argv) {
unsigned wait_ms;
coap_context_t *ctx;
(void)argc;
(void)argv;
ctx = coap_new_context(NULL);
signal(SIGINT, handle_sigint);
init_resources(ctx);
/* Other general start up code */
wait_ms = 1000;
#if NUM_SERVER_THREADS
if (!coap_io_process_loop(ctx, do_time_observe_code, NULL, wait_ms,
NUM_SERVER_THREADS)) {
coap_log_err("coap_io_process_loop: Startup failed\n");
}
#else /* ! NUM_SERVER_THREADS */
while (!quit) {
unsigned int next_sec_ms;
int result;
coap_tick_t now;
/*
* result is time spent in coap_io_process()
*/
result = coap_io_process(ctx, wait_ms);
if (result < 0) {
break;
} else if (result && (unsigned)result < wait_ms) {
/* decrement if there is a result wait time returned */
wait_ms -= result;
} else {
/*
* result == 0, or result >= wait_ms
* (wait_ms could have decremented to a small value, below
* the granularity of the timer in coap_io_process() and hence
* result == 0)
*/
wait_ms = 1000;
}
do_time_observe_code(NULL);
/* need to wait until next second starts if wait_ms is too large */
coap_ticks(&now);
next_sec_ms = 1000 - (now % COAP_TICKS_PER_SECOND) *
1000 / COAP_TICKS_PER_SECOND;
if (next_sec_ms && next_sec_ms < wait_ms)
wait_ms = next_sec_ms;
}
#endif /* ! NUM_SERVER_THREADS */
/* General close down code */
}
----
SEE ALSO
--------
*coap_io_process*(3)
FURTHER INFORMATION
-------------------
See
"https://rfc-editor.org/rfc/rfc7252[RFC7252: The Constrained Application Protocol (CoAP)]"
for further information.
BUGS
----
Please raise an issue on GitHub at
https://github.com/obgm/libcoap/issues to report any bugs.
Please raise a Pull Request at https://github.com/obgm/libcoap/pulls
for any fixes.
AUTHORS
-------
The libcoap project <libcoap-developers@lists.sourceforge.net>

View File

@@ -75,7 +75,7 @@ appropriate GET/FETCH handler within the server application is called to fill
in the response packet with the appropriate information. This "fake GET/FETCH
request" is triggered by a call to *coap_resource_notify_observers*().
The call to *coap_io_process*() in the main server application i/o loop will do
Any call to *coap_io_process*() in the server application i/o loop will do
all the necessary processing of sending any outstanding "fake GET/FETCH
requests".

View File

@@ -784,7 +784,7 @@ void
coap_show_pdu(coap_log_t level, const coap_pdu_t *pdu) {
#if COAP_CONSTRAINED_STACK
/* Proxy-Uri: can be 1034 bytes long */
/* buf and outbuf can be protected by global_lock if needed */
/* buf and outbuf can be protected by m_show_pdu if needed */
static unsigned char buf[min(COAP_DEBUG_BUF_SIZE, 1035)];
static char outbuf[COAP_DEBUG_BUF_SIZE];
#else /* ! COAP_CONSTRAINED_STACK */
@@ -809,6 +809,9 @@ coap_show_pdu(coap_log_t level, const coap_pdu_t *pdu) {
if (level > coap_get_log_level())
return;
#if COAP_THREAD_SAFE
coap_mutex_lock(&m_show_pdu);
#endif /* COAP_THREAD_SAFE */
if (!pdu->session || COAP_PROTO_NOT_RELIABLE(pdu->session->proto)) {
snprintf(outbuf, sizeof(outbuf), "v:%d t:%s c:%s i:%04x {",
COAP_DEFAULT_VERSION, msg_type_string(pdu->type),
@@ -1055,6 +1058,9 @@ no_more:
snprintf(&outbuf[outbuflen], sizeof(outbuf)-outbuflen,
"data length %zu (data suppressed)\n", data_len);
COAP_DO_SHOW_OUTPUT_LINE;
#if COAP_THREAD_SAFE
coap_mutex_unlock(&m_show_pdu);
#endif /* COAP_THREAD_SAFE */
return;
}
@@ -1130,6 +1136,10 @@ no_more:
outbuflen--;
snprintf(&outbuf[outbuflen], sizeof(outbuf)-outbuflen, "\n");
COAP_DO_SHOW_OUTPUT_LINE;
#if COAP_THREAD_SAFE
coap_mutex_unlock(&m_show_pdu);
#endif /* COAP_THREAD_SAFE */
}
void
@@ -1278,12 +1288,21 @@ coap_set_log_handler(coap_log_handler_t handler) {
log_handler = handler;
}
/* Visible to only this thread */
extern COAP_THREAD_LOCAL_VAR uint32_t thread_no;
/* Visible across all threads */
extern uint32_t max_thread_no;
void
coap_log_impl(coap_log_t level, const char *format, ...) {
#if COAP_THREAD_SAFE
coap_mutex_lock(&m_log_impl);
#endif /* COAP_THREAD_SAFE */
if (log_handler) {
#if COAP_CONSTRAINED_STACK
/* message can be protected by global_lock if needed */
/* message can be protected by m_log_impl if needed */
static char message[COAP_DEBUG_BUF_SIZE];
#else /* ! COAP_CONSTRAINED_STACK */
char message[COAP_DEBUG_BUF_SIZE];
@@ -1312,6 +1331,13 @@ coap_log_impl(coap_log_t level, const char *format, ...) {
if (len)
fprintf(log_fd, "%.*s ", (int)len, timebuf);
#if COAP_THREAD_NUM_LOGGING
if (thread_no == 0) {
thread_no = ++max_thread_no;
}
fprintf(log_fd, "%2d ", thread_no);
#endif /* COAP_THREAD_NUM_LOGGING */
fprintf(log_fd, "%s ", coap_log_level_desc(level));
va_start(ap, format);
@@ -1323,6 +1349,10 @@ coap_log_impl(coap_log_t level, const char *format, ...) {
va_end(ap);
fflush(log_fd);
}
#if COAP_THREAD_SAFE
coap_mutex_unlock(&m_log_impl);
#endif /* COAP_THREAD_SAFE */
}
static struct packet_num_interval {

View File

@@ -1777,6 +1777,81 @@ coap_io_process_with_fds(coap_context_t *ctx, uint32_t timeout_ms,
return ret;
}
#if !defined(COAP_EPOLL_SUPPORT) && COAP_THREAD_SAFE
static unsigned int
coap_io_prepare_fds(coap_context_t *ctx,
int enfds, fd_set *ereadfds, fd_set *ewritefds,
fd_set *eexceptfds) {
coap_session_t *s, *stmp;
unsigned int max_sockets = sizeof(ctx->sockets) / sizeof(ctx->sockets[0]);
coap_fd_t nfds = 0;
unsigned int i;
ctx->num_sockets = 0;
#if COAP_SERVER_SUPPORT
coap_endpoint_t *ep;
LL_FOREACH(ctx->endpoint, ep) {
if (ep->sock.flags & (COAP_SOCKET_WANT_READ | COAP_SOCKET_WANT_WRITE | COAP_SOCKET_WANT_ACCEPT)) {
if (ctx->num_sockets < max_sockets)
ctx->sockets[ctx->num_sockets++] = &ep->sock;
}
SESSIONS_ITER(ep->sessions, s, stmp) {
if (s->sock.flags & (COAP_SOCKET_WANT_READ|COAP_SOCKET_WANT_WRITE)) {
if (ctx->num_sockets < max_sockets)
ctx->sockets[ctx->num_sockets++] = &s->sock;
}
}
}
#endif /* COAP_SERVER_SUPPORT */
#if COAP_CLIENT_SUPPORT
SESSIONS_ITER(ctx->sessions, s, stmp) {
if (s->sock.flags & (COAP_SOCKET_WANT_READ |
COAP_SOCKET_WANT_WRITE |
COAP_SOCKET_WANT_CONNECT)) {
if (ctx->num_sockets < max_sockets)
ctx->sockets[ctx->num_sockets++] = &s->sock;
}
}
#endif /* COAP_CLIENT_SUPPORT */
if (ereadfds) {
ctx->readfds = *ereadfds;
nfds = enfds;
} else {
FD_ZERO(&ctx->readfds);
}
if (ewritefds) {
ctx->writefds = *ewritefds;
nfds = enfds;
} else {
FD_ZERO(&ctx->writefds);
}
if (eexceptfds) {
ctx->exceptfds = *eexceptfds;
nfds = enfds;
} else {
FD_ZERO(&ctx->exceptfds);
}
for (i = 0; i < ctx->num_sockets; i++) {
if (ctx->sockets[i]->fd + 1 > nfds)
nfds = ctx->sockets[i]->fd + 1;
if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_READ)
FD_SET(ctx->sockets[i]->fd, &ctx->readfds);
if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_WRITE)
FD_SET(ctx->sockets[i]->fd, &ctx->writefds);
#if !COAP_DISABLE_TCP
if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_ACCEPT)
FD_SET(ctx->sockets[i]->fd, &ctx->readfds);
if (ctx->sockets[i]->flags & COAP_SOCKET_WANT_CONNECT) {
FD_SET(ctx->sockets[i]->fd, &ctx->writefds);
FD_SET(ctx->sockets[i]->fd, &ctx->exceptfds);
}
#endif /* !COAP_DISABLE_TCP */
}
return nfds;
}
#endif /* ! COAP_EPOLL_SUPPORT && COAP_THREAD_SAFE */
int
coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms,
int enfds, fd_set *ereadfds, fd_set *ewritefds,
@@ -1858,7 +1933,7 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms,
coap_lock_lock(ctx, return -1);
} else {
result = 0;
goto all_over;
}
if (result < 0) { /* error */
@@ -1866,10 +1941,38 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms,
coap_win_error_to_errno();
#endif
if (errno != EINTR) {
coap_log_err("select: %s", coap_socket_strerror());
#if COAP_THREAD_SAFE
if (errno == EBADF) {
coap_log_debug("select: %s\n", coap_socket_strerror());
goto all_over;
}
#endif /* COAP_THREAD_SAFE */
coap_log_err("select: %s\n", coap_socket_strerror());
return -1;
}
goto all_over;
}
#if COAP_THREAD_SAFE
/* Need to refresh what is available to read / write etc. */
nfds = coap_io_prepare_fds(ctx, enfds, ereadfds, ewritefds, eexceptfds);
tv.tv_usec = 0;
tv.tv_sec = 0;
result = select((int)nfds, &ctx->readfds, &ctx->writefds, &ctx->exceptfds, &tv);
if (result < 0) { /* error */
#ifdef _WIN32
coap_win_error_to_errno();
#endif
if (errno != EINTR) {
if (errno == EBADF) {
coap_log_debug("select: %s\n", coap_socket_strerror());
goto all_over;
}
coap_log_err("select: %s\n", coap_socket_strerror());
return -1;
}
goto all_over;
}
#endif /* COAP_THREAD_SAFE */
if (ereadfds) {
*ereadfds = ctx->readfds;
}
@@ -1881,12 +1984,6 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms,
}
if (result > 0) {
#if COAP_THREAD_SAFE
/* Need to refresh what is available to read / write etc. */
tv.tv_usec = 0;
tv.tv_sec = 0;
select((int)nfds, &ctx->readfds, &ctx->writefds, &ctx->exceptfds, &tv);
#endif /* COAP_THREAD_SAFE */
for (i = 0; i < ctx->num_sockets; i++) {
if ((ctx->sockets[i]->flags & COAP_SOCKET_WANT_READ) &&
FD_ISSET(ctx->sockets[i]->fd, &ctx->readfds))
@@ -1962,6 +2059,7 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms,
break;
}
coap_lock_lock(ctx, return -1);
#if COAP_THREAD_SAFE
/* Need to refresh what is available to read / write etc. */
nfds = epoll_wait(ctx->epfd, events, COAP_MAX_EPOLL_EVENTS, 0);
@@ -1970,11 +2068,9 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms,
coap_log_err("epoll_wait: unexpected error: %s (%d)\n",
coap_socket_strerror(), nfds);
}
coap_lock_lock(ctx, return -1);
break;
}
#endif /* COAP_THREAD_SAFE */
coap_lock_lock(ctx, return -1);
coap_io_do_epoll_lkd(ctx, events, nfds);
@@ -1992,16 +2088,132 @@ coap_io_process_with_fds_lkd(coap_context_t *ctx, uint32_t timeout_ms,
#if COAP_SERVER_SUPPORT
coap_expire_cache_entries(ctx);
#endif /* COAP_SERVER_SUPPORT */
coap_ticks(&now);
#if COAP_ASYNC_SUPPORT
/* Check to see if we need to send off any Async requests as delay might
have been updated */
coap_check_async(ctx, now);
coap_ticks(&now);
coap_check_async(ctx, now);
#endif /* COAP_ASYNC_SUPPORT */
#ifndef COAP_EPOLL_SUPPORT
all_over:
#endif /* COAP_EPOLL_SUPPORT */
coap_ticks(&now);
return (int)(((now - before) * 1000) / COAP_TICKS_PER_SECOND);
}
volatile int coap_thread_quit = 0;
void
coap_io_process_terminate_loop(void) {
coap_send_recv_terminate();
coap_thread_quit = 1;
}
COAP_API int
coap_io_process_loop(coap_context_t *context,
coap_io_process_thread_t main_loop_code,
void *main_loop_code_arg, uint32_t timeout_ms,
uint32_t thread_count) {
int ret;
if (!context)
return 0;
coap_lock_lock(context, return 0);
ret = coap_io_process_loop_lkd(context, main_loop_code,
main_loop_code_arg, timeout_ms,
thread_count);
coap_lock_unlock(context);
return ret;
}
int
coap_io_process_loop_lkd(coap_context_t *context,
coap_io_process_thread_t main_loop_code,
void *main_loop_code_arg, uint32_t timeout_ms,
uint32_t thread_count) {
int ret = 0;;
#if COAP_THREAD_SAFE
if (thread_count > 1) {
if (!coap_io_process_configure_threads(context, thread_count - 1))
return 0;
}
#else /* COAP_THREAD_SAFE */
thread_count = 1;
#endif /* COAP_THREAD_SAFE */
while (!coap_thread_quit) {
if (main_loop_code) {
coap_tick_t begin, end;
uint32_t used_ms;
coap_ticks(&begin);
/*
* main_loop_codecode should not be blocking for any time, and not calling
* coap_io_process().
*/
coap_lock_callback_release(context, main_loop_code(main_loop_code_arg),
/* On re-lock failure */
ret = 0; break);
/*
* Need to delay for the remainder of timeout_ms. In case main_loop_code()
* is time sensitive (e.g Observe subscription to /time), delay to the
* start of the a second boundary
*/
coap_ticks(&end);
used_ms = (uint32_t)(end - begin) * 1000 / COAP_TICKS_PER_SECOND;
if (timeout_ms == COAP_IO_NO_WAIT || timeout_ms == COAP_IO_WAIT) {
ret = coap_io_process_lkd(context, timeout_ms);
} else if (timeout_ms > used_ms) {
/* Wait for remaining time rounded up to next second start */
coap_tick_t next_time = end + (timeout_ms - used_ms) * COAP_TICKS_PER_SECOND / 1000;
unsigned int next_sec_us;
unsigned int next_sec_ms;
next_sec_us = (timeout_ms - used_ms) * 1000000 / COAP_TICKS_PER_SECOND + 1000000 -
(coap_ticks_to_rt_us(next_time) % 1000000);
next_sec_ms = (next_sec_us + 999) / 1000;
if (next_sec_ms > timeout_ms && next_sec_ms > 1000)
next_sec_ms -= 1000;
ret = coap_io_process_lkd(context, next_sec_ms ? next_sec_ms : 1);
} else {
/* timeout_ms has expired */
ret = coap_io_process_lkd(context, COAP_IO_NO_WAIT);
}
if (thread_count == 1) {
/*
* Need to delay if only one thread until the remainder of
* timeout_ms is used up. Otherwise, another thread will be
* waiting on coap_io_process() to do any input / timeout work.
*/
coap_ticks(&end);
used_ms = (uint32_t)(end - begin) * 1000 / COAP_TICKS_PER_SECOND;
if (timeout_ms > 0 && timeout_ms < used_ms) {
ret = coap_io_process_lkd(context, used_ms - timeout_ms);
} else {
ret = coap_io_process_lkd(context, COAP_IO_NO_WAIT);
}
}
} else {
ret = coap_io_process_lkd(context, timeout_ms);
}
/* coap_io_process_lkd() can return 0 */
if (ret >= 0)
ret = 1;
if (ret < 0) {
ret = 0;
break;
}
}
#if COAP_THREAD_SAFE
coap_io_process_remove_threads(context);
#endif /* COAP_THREAD_SAFE */
coap_thread_quit = 0;
return ret;
}
#endif /* ! WITH_LWIP && ! WITH_CONTIKI && ! RIOT_VERSION*/
COAP_API int

View File

@@ -2500,11 +2500,13 @@ coap_read_endpoint(coap_context_t *ctx, coap_endpoint_t *endpoint, coap_tick_t n
} else if (bytes_read > 0) {
coap_session_t *session = coap_endpoint_get_session(endpoint, packet, now);
if (session) {
coap_session_reference_lkd(session);
coap_log_debug("* %s: netif: recv %4zd bytes\n",
coap_session_str(session), bytes_read);
result = coap_handle_dgram_for_proto(ctx, session, packet);
if (endpoint->proto == COAP_PROTO_DTLS && session->type == COAP_SESSION_TYPE_HELLO && result == 1)
coap_session_new_dtls_session(session, now);
coap_session_release_lkd(session);
}
}
return result;
@@ -4793,6 +4795,12 @@ int coap_started = 0;
* Global lock for multi-thread support
*/
coap_lock_t global_lock;
/*
* low level protection mutex
*/
coap_mutex_t m_show_pdu;
coap_mutex_t m_log_impl;
coap_mutex_t m_io_threads;
#endif /* COAP_THREAD_SAFE */
void
@@ -4808,6 +4816,9 @@ coap_startup(void) {
#if COAP_THREAD_SAFE
coap_lock_init();
coap_mutex_init(&m_show_pdu);
coap_mutex_init(&m_log_impl);
coap_mutex_init(&m_io_threads);
#endif /* COAP_THREAD_SAFE */
#if defined(HAVE_WINSOCK2_H)
@@ -4856,6 +4867,12 @@ coap_cleanup(void) {
#endif /* WITH_LWIP */
coap_dtls_shutdown();
#if COAP_THREAD_SAFE
coap_mutex_destroy(&m_show_pdu);
coap_mutex_destroy(&m_log_impl);
coap_mutex_destroy(&m_io_threads);
#endif /* COAP_THREAD_SAFE */
coap_debug_reset();
}

View File

@@ -105,10 +105,12 @@ coap_netif_dgrm_read_ep(coap_endpoint_t *endpoint, coap_packet_t *packet) {
bytes_read = coap_socket_recv(&endpoint->sock, packet);
keep_errno = errno;
if (bytes_read == -1) {
coap_log_debug("* %s: netif: failed to read %zd bytes (%s)\n",
coap_endpoint_str(endpoint), packet->length,
coap_socket_strerror());
errno = keep_errno;
if (errno != EAGAIN) {
coap_log_debug("* %s: netif: failed to read %zd bytes (%s)\n",
coap_endpoint_str(endpoint), packet->length,
coap_socket_strerror());
errno = keep_errno;
}
} else if (bytes_read > 0) {
/* Let the caller do the logging as session available by then */
}

View File

@@ -1343,7 +1343,7 @@ coap_resource_notify_observers_lkd(coap_resource_t *r,
}
r->context->observe_pending = 1;
coap_update_io_timer(r->context, 0);
coap_check_notify_lkd(r->context);
return 1;
}

View File

@@ -1433,8 +1433,8 @@ coap_session_create_client(coap_context_t *ctx,
#endif /* !COAP_DISABLE_TCP */
}
#ifdef COAP_EPOLL_SUPPORT
session->sock.session = session;
#ifdef COAP_EPOLL_SUPPORT
coap_epoll_ctl_add(&session->sock,
EPOLLIN |
((session->sock.flags & COAP_SOCKET_WANT_CONNECT) ?

View File

@@ -300,16 +300,18 @@ coap_socket_accept_tcp(coap_socket_t *server,
#endif
(void)extra;
server->flags &= ~COAP_SOCKET_CAN_ACCEPT;
new_client->fd = accept(server->fd, &remote_addr->addr.sa,
&remote_addr->size);
if (new_client->fd == COAP_INVALID_SOCKET) {
coap_log_warn("coap_socket_accept_tcp: accept: %s\n",
coap_socket_strerror());
if (errno != EAGAIN) {
coap_log_warn("coap_socket_accept_tcp: accept: %s\n",
coap_socket_strerror());
}
return 0;
}
server->flags &= ~COAP_SOCKET_CAN_ACCEPT;
if (getsockname(new_client->fd, &local_addr->addr.sa, &local_addr->size) < 0)
coap_log_warn("coap_socket_accept_tcp: getsockname: %s\n",
coap_socket_strerror());

View File

@@ -99,6 +99,112 @@ coap_lock_lock_func(void) {
}
#endif /* ! COAP_THREAD_RECURSIVE_CHECK */
#if !WITH_LWIP
extern volatile int coap_thread_quit;
static pthread_t *thread_id = NULL;
static uint32_t thread_id_count = 0;
/* Visible to only this thread */
COAP_THREAD_LOCAL_VAR uint32_t thread_no = 0;
/* Visible across all threads */
uint32_t max_thread_no = 0;
typedef struct {
coap_context_t *context;
uint32_t thread_no;
} coap_thread_param_t;
static void *
coap_io_process_worker_thread(void *arg) {
coap_thread_param_t *thread_param = (coap_thread_param_t *)arg;
coap_context_t *context = thread_param->context;
thread_no = thread_param->thread_no;
coap_free_type(COAP_STRING, thread_param);
coap_log_debug("Thread %lx start\n", pthread_self());
while (!coap_thread_quit) {
int result;
coap_lock_lock(context, return 0);
result = coap_io_process_lkd(context, COAP_IO_WAIT);
coap_lock_unlock(context);
if (result < 0)
break;
}
coap_log_debug("Thread %lx exit\n", pthread_self());
return 0;
}
int
coap_io_process_configure_threads(coap_context_t *context, uint32_t thread_count) {
uint32_t i;
coap_mutex_lock(&m_io_threads);
thread_no = 1;
max_thread_no = 1 + thread_count;
coap_free_type(COAP_STRING, thread_id);
thread_id = coap_malloc_type(COAP_STRING, thread_count * sizeof(pthread_t));
if (!thread_id) {
coap_log_err("thread start up memory allocate failure\n");
coap_mutex_unlock(&m_io_threads);
return 0;
}
for (i = 0; i < thread_count ; i++) {
coap_thread_param_t *thread_param = coap_malloc_type(COAP_STRING, sizeof(coap_thread_param_t));
int s;
thread_param->context = context;
thread_param->thread_no = i + 2;
s = pthread_create(&thread_id[i], NULL,
&coap_io_process_worker_thread, thread_param);
if (s != 0) {
coap_log_err("thread start up failure (%s)\n", coap_socket_strerror());
coap_mutex_unlock(&m_io_threads);
return 0;
}
thread_id_count++;
}
coap_mutex_unlock(&m_io_threads);
return 1;
}
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif /* HAVE_SIGNAL_H */
void
coap_io_process_remove_threads(coap_context_t *context) {
uint32_t i;
(void)context;
coap_lock_unlock(context);
coap_mutex_lock(&m_io_threads);
for (i = 0; i < thread_id_count ; i++) {
int s = pthread_kill(thread_id[i], SIGINT);
if (s != 0) {
coap_log_err("thread kill failure\n");
}
}
for (i = 0; i < thread_id_count ; i++) {
void *retval;
int s = pthread_join(thread_id[i], &retval);
if (s != 0) {
coap_log_err("thread join failure\n");
}
}
coap_free_type(COAP_STRING, thread_id);
thread_id = NULL;
thread_id_count = 0;
coap_mutex_unlock(&m_io_threads);
coap_lock_lock(context, return);
}
#endif /* !WITH_LWIP */
#else /* ! COAP_THREAD_SAFE */
#ifdef __clang__