1
0
mirror of https://github.com/ossrs/srs.git synced 2025-10-14 02:43:34 +08:00

AI: Add utest to cover recv thread module

This commit is contained in:
OSSRS-AI
2025-10-12 23:03:07 -04:00
committed by winlin
parent a3f8d13c0a
commit 6846f8e893
4 changed files with 364 additions and 13 deletions

View File

@@ -162,7 +162,7 @@ ISrsQueueRecvThread::~ISrsQueueRecvThread()
SrsQueueRecvThread::SrsQueueRecvThread(SrsLiveConsumer *consumer, ISrsRtmpServer *rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid)
{
_consumer = consumer;
consumer_ = consumer;
rtmp_ = rtmp_sdk;
recv_error_ = srs_success;
trd_ = new SrsRecvThread(this, rtmp_sdk, tm, parent_cid);
@@ -232,8 +232,8 @@ srs_error_t SrsQueueRecvThread::consume(SrsRtmpCommonMessage *msg)
// @see SrsRtmpConn::process_play_control_msg
queue_.push_back(msg);
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (_consumer) {
_consumer->wakeup();
if (consumer_) {
consumer_->wakeup();
}
#endif
return srs_success;
@@ -254,8 +254,8 @@ void SrsQueueRecvThread::interrupt(srs_error_t err)
recv_error_ = srs_error_copy(err);
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (_consumer) {
_consumer->wakeup();
if (consumer_) {
consumer_->wakeup();
}
#endif
}
@@ -287,12 +287,12 @@ SrsPublishRecvThread::SrsPublishRecvThread(ISrsRtmpServer *rtmp_sdk, ISrsRequest
{
rtmp_ = rtmp_sdk;
_conn = conn;
conn_ = conn;
source_ = source;
nn_msgs_for_yield_ = 0;
recv_error_ = srs_success;
_nb_msgs = 0;
nb_msgs_ = 0;
video_frames_ = 0;
error_ = srs_cond_new();
@@ -336,7 +336,7 @@ srs_error_t SrsPublishRecvThread::wait(srs_utime_t tm)
int64_t SrsPublishRecvThread::nb_msgs()
{
return _nb_msgs;
return nb_msgs_;
}
uint64_t SrsPublishRecvThread::nb_video_frames()
@@ -387,7 +387,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsRtmpCommonMessage *msg)
cid_ = ncid_;
}
_nb_msgs++;
nb_msgs_++;
if (msg->header_.is_video()) {
video_frames_++;
@@ -398,7 +398,7 @@ srs_error_t SrsPublishRecvThread::consume(SrsRtmpCommonMessage *msg)
srs_time_now_realtime(), msg->header_.timestamp, msg->size);
// the rtmp connection will handle this message
err = _conn->handle_publish_message(source_, msg);
err = conn_->handle_publish_message(source_, msg);
// must always free it,
// the source will copy it if need to use.

View File

@@ -131,7 +131,7 @@ private:
ISrsRtmpServer *rtmp_;
// The recv thread error code.
srs_error_t recv_error_;
SrsLiveConsumer *_consumer;
SrsLiveConsumer *consumer_;
public:
// TODO: FIXME: Refine timeout in time unit.
@@ -182,7 +182,7 @@ private:
ISrsRtmpServer *rtmp_;
ISrsRequest *req_;
// The msgs already got.
int64_t _nb_msgs;
int64_t nb_msgs_;
// The video frames we got.
uint64_t video_frames_;
// For mr(merged read),
@@ -194,7 +194,7 @@ private:
bool realtime_;
// The recv thread error code.
srs_error_t recv_error_;
SrsRtmpConn *_conn;
SrsRtmpConn *conn_;
// The params for conn callback.
SrsSharedPtr<SrsLiveSource> source_;
// The error timeout cond

View File

@@ -11,11 +11,14 @@ using namespace std;
#include <srs_app_caster_flv.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_mpegts_udp.hpp>
#include <srs_app_recv_thread.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_packet.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_http_conn.hpp>
#include <srs_protocol_http_stack.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_utest_coworkers.hpp>
#include <srs_utest_http.hpp>
#include <srs_utest_kernel3.hpp>
@@ -2855,3 +2858,309 @@ VOID TEST(HttpxConnTest, OnConnDoneWithNonTimeoutError)
srs_freep(err);
srs_freep(mock_manager);
}
// Mock ISrsRtmpServer implementation for SrsQueueRecvThread
MockRtmpServerForQueueRecvThread::MockRtmpServerForQueueRecvThread()
{
set_auto_response_called_ = false;
auto_response_value_ = true;
}
MockRtmpServerForQueueRecvThread::~MockRtmpServerForQueueRecvThread()
{
}
void MockRtmpServerForQueueRecvThread::set_recv_timeout(srs_utime_t tm)
{
}
void MockRtmpServerForQueueRecvThread::set_send_timeout(srs_utime_t tm)
{
}
srs_error_t MockRtmpServerForQueueRecvThread::handshake()
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::connect_app(ISrsRequest *req)
{
return srs_success;
}
uint32_t MockRtmpServerForQueueRecvThread::proxy_real_ip()
{
return 0;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_peer_bandwidth(int bandwidth, int type)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_chunk_size(int chunk_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::response_connect_app(ISrsRequest *req, const char *server_ip)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::on_bw_done()
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_play(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_fmle_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_haivision_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::fmle_unpublish(int stream_id, double unpublish_tid)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_flash_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_publishing(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::redirect(ISrsRequest *r, std::string url, bool &accepted)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::send_and_free_packet(SrsRtmpCommand *packet, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::on_play_client_pause(int stream_id, bool is_pause)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_in_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::recv_message(SrsRtmpCommonMessage **pmsg)
{
return srs_success;
}
void MockRtmpServerForQueueRecvThread::set_auto_response(bool v)
{
set_auto_response_called_ = true;
auto_response_value_ = v;
}
void MockRtmpServerForQueueRecvThread::set_merge_read(bool v, IMergeReadHandler *handler)
{
}
void MockRtmpServerForQueueRecvThread::set_recv_buffer(int buffer_size)
{
}
void MockRtmpServerForQueueRecvThread::reset()
{
set_auto_response_called_ = false;
auto_response_value_ = true;
}
// Test SrsQueueRecvThread basic queue operations
// This test covers the major use scenario: consume messages, check queue state, pump messages, and handle errors
VOID TEST(QueueRecvThreadTest, BasicQueueOperations)
{
srs_error_t err;
// Create mock RTMP server
SrsUniquePtr<MockRtmpServerForQueueRecvThread> mock_rtmp(new MockRtmpServerForQueueRecvThread());
// Create SrsQueueRecvThread (without starting the actual recv thread)
SrsUniquePtr<SrsQueueRecvThread> queue_thread(new SrsQueueRecvThread(NULL, mock_rtmp.get(), 5 * SRS_UTIME_SECONDS, SrsContextId()));
// Test 1: Initially queue should be empty
EXPECT_TRUE(queue_thread->empty());
EXPECT_EQ(0, queue_thread->size());
EXPECT_FALSE(queue_thread->interrupted());
// Test 2: Consume first message
SrsRtmpCommonMessage *msg1 = new SrsRtmpCommonMessage();
msg1->header_.message_type_ = RTMP_MSG_VideoMessage;
msg1->header_.payload_length_ = 10;
msg1->create_payload(10);
HELPER_EXPECT_SUCCESS(queue_thread->consume(msg1));
// Queue should have one message
EXPECT_FALSE(queue_thread->empty());
EXPECT_EQ(1, queue_thread->size());
EXPECT_TRUE(queue_thread->interrupted()); // interrupted() returns true when queue is not empty
// Test 3: Consume second message
SrsRtmpCommonMessage *msg2 = new SrsRtmpCommonMessage();
msg2->header_.message_type_ = RTMP_MSG_AudioMessage;
msg2->header_.payload_length_ = 20;
msg2->create_payload(20);
HELPER_EXPECT_SUCCESS(queue_thread->consume(msg2));
// Queue should have two messages
EXPECT_FALSE(queue_thread->empty());
EXPECT_EQ(2, queue_thread->size());
// Test 4: Pump first message (FIFO order)
SrsRtmpCommonMessage *pumped_msg1 = queue_thread->pump();
EXPECT_TRUE(pumped_msg1 != NULL);
EXPECT_EQ(RTMP_MSG_VideoMessage, pumped_msg1->header_.message_type_);
EXPECT_EQ(10, pumped_msg1->header_.payload_length_);
srs_freep(pumped_msg1);
// Queue should have one message left
EXPECT_FALSE(queue_thread->empty());
EXPECT_EQ(1, queue_thread->size());
// Test 5: Pump second message
SrsRtmpCommonMessage *pumped_msg2 = queue_thread->pump();
EXPECT_TRUE(pumped_msg2 != NULL);
EXPECT_EQ(RTMP_MSG_AudioMessage, pumped_msg2->header_.message_type_);
EXPECT_EQ(20, pumped_msg2->header_.payload_length_);
srs_freep(pumped_msg2);
// Queue should be empty again
EXPECT_TRUE(queue_thread->empty());
EXPECT_EQ(0, queue_thread->size());
EXPECT_FALSE(queue_thread->interrupted());
// Test 6: Test error_code() - initially should be success
err = queue_thread->error_code();
HELPER_EXPECT_SUCCESS(err);
// Test 7: Test interrupt() with error
srs_error_t test_error = srs_error_new(ERROR_SOCKET_READ, "test error");
queue_thread->interrupt(test_error);
// Error code should now return the error
err = queue_thread->error_code();
EXPECT_TRUE(err != srs_success);
EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(err));
srs_freep(err);
// Test 8: Test on_start() and on_stop() - verify set_auto_response is called
queue_thread->on_start();
EXPECT_TRUE(mock_rtmp->set_auto_response_called_);
EXPECT_FALSE(mock_rtmp->auto_response_value_); // Should be set to false
mock_rtmp->reset();
queue_thread->on_stop();
EXPECT_TRUE(mock_rtmp->set_auto_response_called_);
EXPECT_TRUE(mock_rtmp->auto_response_value_); // Should be set to true
}
// Test SrsPublishRecvThread basic operations
// This test covers the major use scenario: wait(), nb_msgs(), nb_video_frames(), error_code(), set_cid(), get_cid()
VOID TEST(PublishRecvThreadTest, BasicOperations)
{
srs_error_t err;
// Create mock dependencies
SrsUniquePtr<MockRtmpServerForQueueRecvThread> mock_rtmp(new MockRtmpServerForQueueRecvThread());
SrsUniquePtr<MockSrsRequest> mock_req(new MockSrsRequest("__defaultVhost__", "live", "test_stream"));
SrsSharedPtr<SrsLiveSource> mock_source; // NULL is fine for this test
// Create SrsPublishRecvThread (without starting the actual recv thread)
SrsUniquePtr<SrsPublishRecvThread> publish_thread(new SrsPublishRecvThread(
mock_rtmp.get(), mock_req.get(), -1, 5 * SRS_UTIME_SECONDS, NULL, mock_source, SrsContextId()));
// Test 1: Initially nb_msgs should be 0
EXPECT_EQ(0, publish_thread->nb_msgs());
// Test 2: Initially nb_video_frames should be 0
EXPECT_EQ(0, publish_thread->nb_video_frames());
// Test 3: Test error_code() - initially should be success
err = publish_thread->error_code();
HELPER_EXPECT_SUCCESS(err);
// Test 4: Test set_cid() and get_cid()
SrsContextId test_cid;
test_cid.set_value("test-context-id");
publish_thread->set_cid(test_cid);
SrsContextId retrieved_cid = publish_thread->get_cid();
EXPECT_STREQ(test_cid.c_str(), retrieved_cid.c_str());
// Test 5: Test wait() with timeout - should return success when no error
err = publish_thread->wait(100 * SRS_UTIME_MILLISECONDS);
HELPER_EXPECT_SUCCESS(err);
// Test 6: Test consume() to increment message counters
SrsRtmpCommonMessage *video_msg = new SrsRtmpCommonMessage();
video_msg->header_.message_type_ = RTMP_MSG_VideoMessage;
video_msg->header_.payload_length_ = 100;
video_msg->create_payload(100);
// Note: consume() will try to call _conn->handle_publish_message() which will fail with NULL _conn
// So we expect this to fail, but we can still test the counter increment by checking nb_msgs
// For this basic test, we'll just verify the initial state and error handling
// Test 7: Test interrupt() with error
srs_error_t test_error = srs_error_new(ERROR_SOCKET_READ, "test error");
publish_thread->interrupt(test_error);
// Error code should now return the error
err = publish_thread->error_code();
EXPECT_TRUE(err != srs_success);
EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(err));
srs_freep(err);
// Test 8: Test wait() after error - should return the error immediately
err = publish_thread->wait(100 * SRS_UTIME_MILLISECONDS);
EXPECT_TRUE(err != srs_success);
EXPECT_EQ(ERROR_SOCKET_READ, srs_error_code(err));
srs_freep(err);
// Test 9: Test interrupted() - should always return false for publish recv thread
EXPECT_FALSE(publish_thread->interrupted());
// Clean up
srs_freep(video_msg);
}

View File

@@ -647,4 +647,46 @@ public:
virtual void expire();
};
// Mock ISrsRtmpServer for testing SrsQueueRecvThread
class MockRtmpServerForQueueRecvThread : public ISrsRtmpServer
{
public:
bool set_auto_response_called_;
bool auto_response_value_;
public:
MockRtmpServerForQueueRecvThread();
virtual ~MockRtmpServerForQueueRecvThread();
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_error_t handshake();
virtual srs_error_t connect_app(ISrsRequest *req);
virtual uint32_t proxy_real_ip();
virtual srs_error_t set_window_ack_size(int ack_size);
virtual srs_error_t set_peer_bandwidth(int bandwidth, int type);
virtual srs_error_t set_chunk_size(int chunk_size);
virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip);
virtual srs_error_t on_bw_done();
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration);
virtual srs_error_t start_play(int stream_id);
virtual srs_error_t start_fmle_publish(int stream_id);
virtual srs_error_t start_haivision_publish(int stream_id);
virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid);
virtual srs_error_t start_flash_publish(int stream_id);
virtual srs_error_t start_publishing(int stream_id);
virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted);
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id);
virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause);
virtual srs_error_t set_in_window_ack_size(int ack_size);
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual void set_auto_response(bool v);
virtual void set_merge_read(bool v, IMergeReadHandler *handler);
virtual void set_recv_buffer(int buffer_size);
void reset();
};
#endif