diff --git a/media-proxy/include/mtl.h b/media-proxy/include/mtl.h index cc06160d..6d93776b 100644 --- a/media-proxy/include/mtl.h +++ b/media-proxy/include/mtl.h @@ -37,8 +37,8 @@ extern "C" { #define NS_PER_MS (1000 * 1000) #endif -#define SCH_CNT 1 -#define TASKLETS 800 +#define SCH_CNT 3 +#define TASKLETS 1000 enum direction { TX, @@ -387,6 +387,11 @@ typedef struct { bool sch_start; int new_NALU; bool check_first_new_NALU; + bool fragments_bunch; + mcm_buffer* rtp_header; + uint16_t tx_buf_num; + char* dst; + char* dst_nalu_size_point; //int sch_idx; //int tasklet_idx; diff --git a/media-proxy/src/proxy_context.cc b/media-proxy/src/proxy_context.cc index ef9e08f9..d132c2e6 100644 --- a/media-proxy/src/proxy_context.cc +++ b/media-proxy/src/proxy_context.cc @@ -204,7 +204,7 @@ void ProxyContext::ParseStInitParam(const mcm_conn_param* request, struct mtl_in st_param->rx_queues_cnt[MTL_PORT_P] = 128; st_param->tx_queues_cnt[MTL_PORT_P] = 128; st_param->lcores = NULL; - st_param->memzone_max = 3000; + st_param->memzone_max = 9000; INFO("ProxyContext: ParseStInitParam..."); INFO("num_ports : %d", st_param->num_ports); diff --git a/media-proxy/src/udp_h264_rx_sample.c b/media-proxy/src/udp_h264_rx_sample.c index d83b2c0f..beac9aaa 100644 --- a/media-proxy/src/udp_h264_rx_sample.c +++ b/media-proxy/src/udp_h264_rx_sample.c @@ -7,12 +7,12 @@ #include "mtl.h" #include "shm_memif.h" #include +#include #include +#include #include #include -#include #include -#include static void* rx_memif_event_loop(void* arg) { @@ -20,7 +20,7 @@ static void* rx_memif_event_loop(void* arg) memif_socket_handle_t memif_socket = (memif_socket_handle_t)arg; do { - INFO("media-proxy waiting event."); + INFO("media-proxy waiting event."); err = memif_poll_event(memif_socket, -1); } while (err == MEMIF_ERR_SUCCESS); @@ -125,199 +125,292 @@ static int udp_server_h264(void* arg) uint32_t buf_size = s->memif_nalu_size; int flagFragment = 0; - // FILE* fp = fopen("data.264", "wb"); + //FILE* fp = fopen("data1.264", "wb"); unsigned char h264_frame_start_str[4]; h264_frame_start_str[0] = 0; h264_frame_start_str[1] = 0; h264_frame_start_str[2] = 0; h264_frame_start_str[3] = 1; - /*udp poll*/ - //int new_NALU = 0; - //uint16_t nalu_size; mcm_buffer* rtp_header; - bool direct_transfer = true; bool memif_alloc = false; - /*udp poll*/ - //bool check_first_new_NALU = true; - while (s->shm_ready != 1) { INFO("%s, wait for share memory is ready\n", __func__); sleep(1); } //INFO("%s, start socket %p\n", __func__, socket); - rtp_header = calloc(1, sizeof(mcm_buffer)); + if (s->fragments_bunch == false) { + rtp_header = calloc(1, sizeof(mcm_buffer)); + } - if ( s->stop != true) { + if (s->stop != true) { ssize_t recv = mudp_recvfrom(socket, buf, sizeof(buf), 0, NULL, NULL); // printf("[%s] : recv = %d\n", __FUNCTION__, (int)recv); - /*udp poll*/ if (recv < 0) { //INFO("%s, mudp_recvfrom fail %d\n", __func__, (int)recv); - if (rtp_header != NULL) { - free(rtp_header); - rtp_header = NULL; - } return 0; } - //} else { - // INFO("Receive a UDP RTP package\n"); - /*dup poll*/ - //if (check_first_new_NALU == true) { - if (s->check_first_new_NALU == true) { - unsigned char RTP_payload_type = *((unsigned char*)buf + 1); - unsigned char mark = RTP_payload_type & 0x80; - if (mark > 0) { - s->new_NALU = 1; - // printf("First Mark = %d\n", mark); - s->check_first_new_NALU = false; - //continue; - return 0; - } else { - //continue; - return 0; - } - } - // } - - /* allocate memory */ - memif_alloc = false; - tx_bufs = s->shm_bufs; - while (memif_alloc != true) { - err = memif_buffer_alloc(s->memif_conn, qid, tx_bufs, buf_num, &tx_buf_num, buf_size); - if (err != MEMIF_ERR_SUCCESS) { - INFO("Failed to alloc memif buffer: %s, err:%d", memif_strerror(err), err); - //continue; - return -1; + if (s->check_first_new_NALU == true) { + unsigned char RTP_payload_type = *((unsigned char*)buf + 1); + unsigned char mark = RTP_payload_type & 0x80; + if (mark > 0) { + s->new_NALU = 1; + // printf("First Mark = %d\n", mark); + s->check_first_new_NALU = false; + return 0; } else { - // INFO("Success to alloc memif buffer\n"); - memif_alloc = true; + return 0; } } - dst = tx_bufs->data; - tx = 0; - rtp_header->metadata.seq_num = *((uint16_t*)(buf + 2)); - rtp_header->metadata.timestamp = *((uint32_t*)(buf + 4)); - mtl_memcpy(dst, &rtp_header->metadata.seq_num, sizeof(rtp_header->metadata.seq_num)); - dst += sizeof(rtp_header->metadata.seq_num); - mtl_memcpy(dst, &rtp_header->metadata.timestamp, sizeof(rtp_header->metadata.timestamp)); - dst += sizeof(rtp_header->metadata.timestamp); - dst_nalu_size_point = dst; - dst += sizeof(size_t); - rtp_header->len = 0; - - if (s->new_NALU == 1) { - s->new_NALU = 0; - - // fwrite(h264_frame_start_str, 1, 1, fp); - mtl_memcpy(dst, h264_frame_start_str, sizeof(unsigned char)); - dst += sizeof(unsigned char); - rtp_header->len = rtp_header->len + 1; - } - unsigned char payload_header = *((unsigned char*)(buf + 12)); - unsigned char fragment_header = *((unsigned char*)(buf + 13)); - typedef unsigned char u8; - u8 payload_type = payload_header & 0x1f; - u8 payload_flag = payload_header & 0xe0; + if (s->fragments_bunch == true) { + rtp_header = s->rtp_header; - u8 fragment_start = fragment_header & 0x80; - u8 fragment_end = fragment_header & 0x40; - u8 fragment_type = fragment_header & 0x1f; - u8 payload_header_temp = payload_flag + fragment_type; + if (s->new_NALU == 1) { + s->new_NALU = 0; + /* allocate memory */ + tx_bufs = s->shm_bufs; + while (memif_alloc != true) { + err = memif_buffer_alloc(s->memif_conn, qid, tx_bufs, buf_num, &s->tx_buf_num, buf_size); + if (err != MEMIF_ERR_SUCCESS) { + INFO("Failed to alloc memif buffer: %s, err:%d", memif_strerror(err), err); + return -1; + } else { + // INFO("Success to alloc memif buffer\n"); + memif_alloc = true; + } + } + s->dst = tx_bufs->data; + tx = 0; + rtp_header->metadata.seq_num = *((uint16_t*)(buf + 2)); + rtp_header->metadata.timestamp = *((uint32_t*)(buf + 4)); + mtl_memcpy(s->dst, &rtp_header->metadata.seq_num, sizeof(rtp_header->metadata.seq_num)); + s->dst += sizeof(rtp_header->metadata.seq_num); + mtl_memcpy(s->dst, &rtp_header->metadata.timestamp, sizeof(rtp_header->metadata.timestamp)); + s->dst += sizeof(rtp_header->metadata.timestamp); + s->dst_nalu_size_point = s->dst; + s->dst += sizeof(size_t); + rtp_header->len = 0; + + //fwrite(h264_frame_start_str, 1, 1, s->fp); + mtl_memcpy(s->dst, h264_frame_start_str, sizeof(unsigned char)); + s->dst += sizeof(unsigned char); + rtp_header->len = rtp_header->len + 1; + } - if (payload_type == 0x1c) { - flagFragment = 1; - } else { - flagFragment = 0; - } + unsigned char payload_header = *((unsigned char*)(buf + 12)); + unsigned char fragment_header = *((unsigned char*)(buf + 13)); - if (flagFragment == 1) { - // printf("fragment true!\n"); - if (fragment_start == 0x80) { - // printf("fragment start!\n"); - // fwrite(h264_frame_start_str + 1, 3, 1, fp);//printf("001\n"); - mtl_memcpy(dst, h264_frame_start_str + 1, sizeof(unsigned char) * 3); - dst += sizeof(unsigned char) * 3; + typedef unsigned char u8; + u8 payload_type = payload_header & 0x1f; + u8 payload_flag = payload_header & 0xe0; + + u8 fragment_start = fragment_header & 0x80; + u8 fragment_end = fragment_header & 0x40; + u8 fragment_type = fragment_header & 0x1f; + u8 payload_header_temp = payload_flag + fragment_type; + + if (payload_type == 0x1c) { + flagFragment = 1; + } else { + flagFragment = 0; + } + + if (flagFragment == 1) { + // printf("fragment true!\n"); + if (fragment_start == 0x80) { + // printf("fragment start!\n"); + //fwrite(h264_frame_start_str + 1, 3, 1, s->fp);//printf("001\n"); + mtl_memcpy(s->dst, h264_frame_start_str + 1, sizeof(unsigned char) * 3); + s->dst += sizeof(unsigned char) * 3; + rtp_header->len = rtp_header->len + 3; + //fwrite(&payload_header_temp, 1, 1, s->fp); + mtl_memcpy(s->dst, &payload_header_temp, sizeof(unsigned char)); + s->dst += sizeof(unsigned char); + rtp_header->len = rtp_header->len + 1; + //fwrite(buf + 14, (int)recv - 14, 1, s->fp); + mtl_memcpy(s->dst, buf + 14, (int)recv - 14); + s->dst += (int)recv - 14; + rtp_header->len = rtp_header->len + (int)recv - 14; + return 0; + } else { + if (fragment_end == 0x40) { + // printf("fragment end!\n"); + //fwrite(buf + 14, (int)recv - 14, 1, s->fp); + mtl_memcpy(s->dst, buf + 14, (int)recv - 14); + s->dst += (int)recv - 14; + rtp_header->len = rtp_header->len + (int)recv - 14; + } else { + // printf("fragment middle!\n"); + //fwrite(buf + 14, (int)recv - 14, 1, s->fp); + mtl_memcpy(s->dst, buf + 14, (int)recv - 14); + s->dst += (int)recv - 14; + rtp_header->len = rtp_header->len + (int)recv - 14; + return 0; + } + } + } else { + // printf("fragment false!\n"); + //fwrite(h264_frame_start_str + 1, 3, 1, s->fp);//printf("001\n"); + mtl_memcpy(s->dst, h264_frame_start_str + 1, sizeof(unsigned char) * 3); + s->dst += sizeof(unsigned char) * 3; rtp_header->len = rtp_header->len + 3; - // fwrite(&payload_header_temp, 1, 1, fp); - mtl_memcpy(dst, &payload_header_temp, sizeof(unsigned char)); - dst += sizeof(unsigned char); - rtp_header->len = rtp_header->len + 1; - // fwrite(buf + 14, (int)recv - 14, 1, fp); - mtl_memcpy(dst, buf + 14, (int)recv - 14); - rtp_header->len = rtp_header->len + (int)recv - 14; - mtl_memcpy(dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); - /*Send to microservice application.*/ - // INFO("memif_tx_burst for framgment = start\n"); - err = memif_tx_burst(s->memif_conn, qid, tx_bufs, tx_buf_num, &tx); + //fwrite(buf + 12, (int)recv - 12, 1, s->fp); + mtl_memcpy(s->dst, buf + 12, sizeof(unsigned char) * ((int)recv - 12)); + s->dst += sizeof(unsigned char) * ((int)recv - 12); + rtp_header->len = rtp_header->len + (int)recv - 12; + } + + unsigned char RTP_payload_type = *((unsigned char*)buf + 1); + unsigned char mark = RTP_payload_type & 0x80; + + if (mark > 0) { + s->new_NALU = 1; + mtl_memcpy(s->dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); + err = memif_tx_burst(s->memif_conn, qid, tx_bufs, s->tx_buf_num, &tx); + if (err != MEMIF_ERR_SUCCESS) { INFO("memif_tx_burst for framgment=0: %s", memif_strerror(err)); } + s->dst = NULL; + s->dst_nalu_size_point = NULL; + } + + } else { + /* allocate memory */ + memif_alloc = false; + tx_bufs = s->shm_bufs; + while (memif_alloc != true) { + err = memif_buffer_alloc(s->memif_conn, qid, tx_bufs, buf_num, &tx_buf_num, buf_size); + if (err != MEMIF_ERR_SUCCESS) { + INFO("Failed to alloc memif buffer: %s, err:%d", memif_strerror(err), err); + //continue; + return -1; + } else { + // INFO("Success to alloc memif buffer\n"); + memif_alloc = true; + } + } + dst = tx_bufs->data; + tx = 0; + rtp_header->metadata.seq_num = *((uint16_t*)(buf + 2)); + rtp_header->metadata.timestamp = *((uint32_t*)(buf + 4)); + mtl_memcpy(dst, &rtp_header->metadata.seq_num, sizeof(rtp_header->metadata.seq_num)); + dst += sizeof(rtp_header->metadata.seq_num); + mtl_memcpy(dst, &rtp_header->metadata.timestamp, sizeof(rtp_header->metadata.timestamp)); + dst += sizeof(rtp_header->metadata.timestamp); + dst_nalu_size_point = dst; + dst += sizeof(size_t); + rtp_header->len = 0; + + if (s->new_NALU == 1) { + s->new_NALU = 0; + + // fwrite(h264_frame_start_str, 1, 1, fp); + mtl_memcpy(dst, h264_frame_start_str, sizeof(unsigned char)); + dst += sizeof(unsigned char); + rtp_header->len = rtp_header->len + 1; + } + unsigned char payload_header = *((unsigned char*)(buf + 12)); + unsigned char fragment_header = *((unsigned char*)(buf + 13)); + + typedef unsigned char u8; + u8 payload_type = payload_header & 0x1f; + u8 payload_flag = payload_header & 0xe0; + + u8 fragment_start = fragment_header & 0x80; + u8 fragment_end = fragment_header & 0x40; + u8 fragment_type = fragment_header & 0x1f; + u8 payload_header_temp = payload_flag + fragment_type; + + if (payload_type == 0x1c) { + flagFragment = 1; } else { - if (fragment_end == 0x40) { - // printf("fragment end!\n"); + flagFragment = 0; + } + + if (flagFragment == 1) { + // printf("fragment true!\n"); + if (fragment_start == 0x80) { + // printf("fragment start!\n"); + // fwrite(h264_frame_start_str + 1, 3, 1, fp);//printf("001\n"); + mtl_memcpy(dst, h264_frame_start_str + 1, sizeof(unsigned char) * 3); + dst += sizeof(unsigned char) * 3; + rtp_header->len = rtp_header->len + 3; + // fwrite(&payload_header_temp, 1, 1, fp); + mtl_memcpy(dst, &payload_header_temp, sizeof(unsigned char)); + dst += sizeof(unsigned char); + rtp_header->len = rtp_header->len + 1; // fwrite(buf + 14, (int)recv - 14, 1, fp); mtl_memcpy(dst, buf + 14, (int)recv - 14); - dst += (int)recv - 14; rtp_header->len = rtp_header->len + (int)recv - 14; mtl_memcpy(dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); /*Send to microservice application.*/ - // INFO("memif_tx_burst for framgment = end\n"); + // INFO("memif_tx_burst for framgment = start\n"); err = memif_tx_burst(s->memif_conn, qid, tx_bufs, tx_buf_num, &tx); if (err != MEMIF_ERR_SUCCESS) { - INFO("memif_tx_burst for framgment=1: %s", memif_strerror(err)); + INFO("memif_tx_burst for framgment=0: %s", memif_strerror(err)); } } else { - // printf("fragment middle!\n"); - // fwrite(buf + 14, (int)recv - 14, 1, fp); - mtl_memcpy(dst, buf + 14, (int)recv - 14); - dst += (int)recv - 14; - rtp_header->len = rtp_header->len + (int)recv - 14; - mtl_memcpy(dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); - /*Send to microservice application.*/ - // INFO("memif_tx_burst for framgment = middle\n"); - err = memif_tx_burst(s->memif_conn, qid, tx_bufs, tx_buf_num, &tx); - if (err != MEMIF_ERR_SUCCESS) { - INFO("memif_tx_burst for framgment=1: %s", memif_strerror(err)); + if (fragment_end == 0x40) { + // printf("fragment end!\n"); + // fwrite(buf + 14, (int)recv - 14, 1, fp); + mtl_memcpy(dst, buf + 14, (int)recv - 14); + dst += (int)recv - 14; + rtp_header->len = rtp_header->len + (int)recv - 14; + mtl_memcpy(dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); + /*Send to microservice application.*/ + // INFO("memif_tx_burst for framgment = end\n"); + err = memif_tx_burst(s->memif_conn, qid, tx_bufs, tx_buf_num, &tx); + if (err != MEMIF_ERR_SUCCESS) { + INFO("memif_tx_burst for framgment=1: %s", memif_strerror(err)); + } + } else { + // printf("fragment middle!\n"); + // fwrite(buf + 14, (int)recv - 14, 1, fp); + mtl_memcpy(dst, buf + 14, (int)recv - 14); + dst += (int)recv - 14; + rtp_header->len = rtp_header->len + (int)recv - 14; + mtl_memcpy(dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); + /*Send to microservice application.*/ + // INFO("memif_tx_burst for framgment = middle\n"); + err = memif_tx_burst(s->memif_conn, qid, tx_bufs, tx_buf_num, &tx); + if (err != MEMIF_ERR_SUCCESS) { + INFO("memif_tx_burst for framgment=1: %s", memif_strerror(err)); + } } } + } else { + // printf("fragment false!\n"); + // fwrite(h264_frame_start_str + 1, 3, 1, fp);//printf("001\n"); + mtl_memcpy(dst, h264_frame_start_str + 1, sizeof(unsigned char) * 3); + dst += sizeof(unsigned char) * 3; + rtp_header->len = rtp_header->len + 3; + // fwrite(buf + 12, (int)recv - 12, 1, fp); + mtl_memcpy(dst, buf + 12, sizeof(unsigned char) * ((int)recv - 12)); + dst += sizeof(unsigned char) * ((int)recv - 12); + rtp_header->len = rtp_header->len + (int)recv - 12; + mtl_memcpy(dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); + /*Send to microservice application.*/ + // INFO("memif_tx_burst for framgment=0\n"); + err = memif_tx_burst(s->memif_conn, qid, tx_bufs, tx_buf_num, &tx); + if (err != MEMIF_ERR_SUCCESS) { + INFO("memif_tx_burst for framgment=0: %s", memif_strerror(err)); + } } - } else { - // printf("fragment false!\n"); - // fwrite(h264_frame_start_str + 1, 3, 1, fp);//printf("001\n"); - mtl_memcpy(dst, h264_frame_start_str + 1, sizeof(unsigned char) * 3); - dst += sizeof(unsigned char) * 3; - rtp_header->len = rtp_header->len + 3; - // fwrite(buf + 12, (int)recv - 12, 1, fp); - mtl_memcpy(dst, buf + 12, sizeof(unsigned char) * ((int)recv - 12)); - dst += sizeof(unsigned char) * ((int)recv - 12); - rtp_header->len = rtp_header->len + (int)recv - 12; - mtl_memcpy(dst_nalu_size_point, &rtp_header->len, sizeof(size_t)); - /*Send to microservice application.*/ - // INFO("memif_tx_burst for framgment=0\n"); - err = memif_tx_burst(s->memif_conn, qid, tx_bufs, tx_buf_num, &tx); - if (err != MEMIF_ERR_SUCCESS) { - INFO("memif_tx_burst for framgment=0: %s", memif_strerror(err)); - } - } - unsigned char RTP_payload_type = *((unsigned char*)buf + 1); - unsigned char mark = RTP_payload_type & 0x80; + unsigned char RTP_payload_type = *((unsigned char*)buf + 1); + unsigned char mark = RTP_payload_type & 0x80; - if (mark > 0) { - s->new_NALU = 1; - } - /*dup poll*/ - //} - //INFO("%s, stop\n", __func__); - // fclose(fp); - - if (rtp_header != NULL) { - free(rtp_header); - rtp_header = NULL; + if (mark > 0) { + s->new_NALU = 1; + } + if (rtp_header != NULL) { + free(rtp_header); + rtp_header = NULL; + } } } else { INFO("%s, has stopped\n", __func__); @@ -325,7 +418,8 @@ static int udp_server_h264(void* arg) return 0; } -static int media_proxy_mudp_poll(void* priv) { +static int media_proxy_mudp_poll(void* priv) +{ int ret; rx_udp_h264_session_context_t* ctx = (rx_udp_h264_session_context_t*)priv; @@ -340,16 +434,18 @@ static int media_proxy_mudp_poll(void* priv) { } } -static int udp_poll_tasklet_start(void* priv) { - rx_udp_h264_session_context_t* ctx = (rx_udp_h264_session_context_t*)priv; - ctx->sch_start = true; - return 0; +static int udp_poll_tasklet_start(void* priv) +{ + rx_udp_h264_session_context_t* ctx = (rx_udp_h264_session_context_t*)priv; + ctx->sch_start = true; + return 0; } -static int udp_poll_tasklet_stop(void* priv) { - rx_udp_h264_session_context_t* ctx = (rx_udp_h264_session_context_t*)priv; - ctx->sch_start = false; - return 0; +static int udp_poll_tasklet_stop(void* priv) +{ + rx_udp_h264_session_context_t* ctx = (rx_udp_h264_session_context_t*)priv; + ctx->sch_start = false; + return 0; } rx_udp_h264_session_context_t* mtl_udp_h264_rx_session_create(mtl_handle dev_handle, mcm_dp_addr* dp_addr, memif_ops_t* memif_ops, mtl_sch_handle schs[]) @@ -379,6 +475,13 @@ rx_udp_h264_session_context_t* mtl_udp_h264_rx_session_create(mtl_handle dev_han ctx->st = dev_handle; ctx->stop = false; + ctx->fragments_bunch = true; + //ctx->fp = fopen("data1.264", "wb"); + + if (ctx->fragments_bunch == true) { + ctx->rtp_header = calloc(1, sizeof(mcm_buffer)); + } + if (!ctx->st) { printf("m20230903235050_debug01_01_01\n"); INFO("%s, mtl_init fail\n", __func__); @@ -409,7 +512,7 @@ rx_udp_h264_session_context_t* mtl_udp_h264_rx_session_create(mtl_handle dev_han INFO("%s, bind fail %d\n", __func__, ret); return NULL; } - + ctx->memif_nalu_size = 5184000; /*udp poll*/