The article is taken from my Zen Channel .


ITKarma picture


All articles in the series


Article 1
Article 2
Article 3
Article 4
Article 5
Article 6
Article 7
Article 8
Article 9
Article 10
Article 11
Article 12
Article 13


In the last article , we looked at the debugging issues of crafting filters related to moving data.


This article will be the final in the cycle and we will devote it, as has been repeatedly promised, to issues of estimating the load on the ticker and ways to deal with excessive computing load in the media streamer.


What is a ticker load


The load on the ticker is calculated as the ratio of the time spent in processing all filters of the graph connected to this ticker to the interval between ticks. By default, the interval is 10ms, but if you change the sampling rate, then the calculation is performed for the value you set. The calculation is done according to the data that the ticker accumulates during operation. The mediastrimer provides a function that returns a percentage value averaged over several measurement results:


MS2_PUBLIC float ms_ticker_get_average_load(MSTicker *ticker); 

If the returned value is close to 100%, this means that this ticker is barely able to do its job before the start of the next tick. If we have an application that does not require real-time work (for example, it simply writes sound to a file), then it is not particularly important for us for how long the next call of the ticker was postponed. But in a real-time application, processing delay can affect the moment the RTP packet is sent, which in turn can affect the quality of sound or video. In some cases, the effect of the delay of individual packets can be stopped using the packet buffer at the receiving end (the so-called jitter buffer). In this case, your sound will be reproduced without defects, but with a delay proportional to the length of the buffer. Which may not be acceptable in cases where an audio signal is used to control processes in real time.


Actually, it’s worth starting to take measures already when the load on the ticker has crossed the border of 80%. With this load, on separate ticks, the ticker starts to start processing with a lag. Ticker lag, this is the time for which the next start of the ticker was postponed.
If the tick lag has exceeded a certain value, an event is generated:


struct _MSTickerLateEvent{ int lateMs;/**<Запаздывание которое было в последний раз, в миллисекундах */uint64_t time;/**< Время возникновения события, в миллисекундах */int current_late_ms;/**< Запаздывание на текущем тике, в миллисекундах */}; typedef struct _MSTickerLateEvent MSTickerLateEvent; 

By which a message is displayed in the console that looks something like this:


CDMY0CDMY


Using the function


void ms_ticker_get_last_late_tick(MSTicker *ticker, MSTickerLateEvent *ev); 

you can find out the details of the last such event.


Ways to reduce ticker loading


Here we have two options. The first is to change the priority of the ticker, the second to transfer part of the ticker to another thread. Consider these options.


Change ticker priority


Ticker priority has three gradations, which are defined in the MSTickerPrio enumeration:


enum _MSTickerPrio{ MS_TICKER_PRIO_NORMAL,/* Приоритет соответствующий значению по умолчанию для данной ОС. */MS_TICKER_PRIO_HIGH,/* Увеличенный приоритет устанавливается подlinux/MacOS с помощью setpriority() или sched_setschedparams() устанавливается политика SCHED_RR. */MS_TICKER_PRIO_REALTIME/* Наибольший приоритет, для него под Linux используется политика SCHED_FIFO. */}; typedef enum _MSTickerPrio MSTickerPrio; 

To experiment with the load of the ticker, we need a circuit that during operation will increase the load and complete the work when the load reaches 99%.As a load we will use the scheme:
CDMY1CDMY
The load will be increased by adding between dtmfgen and voidsink a new signal level control (filter type MS_VOLUME ), with a transmission coefficient equal to one so that the filter does not filter.
She is shown in the picture
ITKarma picture
The source code is below, it is provided with comments, so it’s not difficult to figure it out:


File mstest13.c Variable processing load.
/* Файл mstest13.c Переменная вычислительная нагрузка. */#include <stdio.h> #include <signal.h> #include <mediastreamer2/msfilter.h> #include <mediastreamer2/msticker.h> #include <mediastreamer2/dtmfgen.h> #include <mediastreamer2/mssndcard.h> #include <mediastreamer2/msvolume.h>/*----------------------------------------------------------*/struct _app_vars { int step;/* Количество фильтров добавляемых за раз. */int limit;/* Количество фильтров на котором закончить работу. */int ticker_priority;/* Приоритет тикера. */char* file_name;/* Имя выходного файла. */FILE *file; }; typedef struct _app_vars app_vars;/*----------------------------------------------------------*//* Функция преобразования аргументов командной строки в * настройки программы. */void scan_args(int argc, char *argv[], app_vars *v) { char i; for (i=0; i<argc; i++) { if (!strcmp(argv[i], "--help")) { char *p=argv[0]; p=p + 2; printf(" %s computational load\n\n", p); printf("--help List of options.\n"); printf("--version Version of application.\n"); printf("--step Filters count per step.\n"); printf("--tprio Ticker priority:\n" " MS_TICKER_PRIO_NORMAL 0\n" " MS_TICKER_PRIO_HIGH 1\n" " MS_TICKER_PRIO_REALTIME 2\n"); printf("--limit Filters count limit.\n"); printf("-o Output file name.\n"); exit(0); } if (!strcmp(argv[i], "--version")) { printf("0.1\n"); exit(0); } if (!strcmp(argv[i], "--step")) { v->step=atoi(argv[i+1]); printf("step: %i\n", v->step); } if (!strcmp(argv[i], "--tprio")) { int prio=atoi(argv[i+1]); if ((prio >=MS_TICKER_PRIO_NORMAL) && (prio <= MS_TICKER_PRIO_REALTIME)) { v->ticker_priority=atoi(argv[i+1]); printf("ticker priority: %i\n", v->ticker_priority); } else { printf(" Bad ticker priority: %i\n", prio); exit(1); } } if (!strcmp(argv[i], "--limit")) { v->limit=atoi(argv[i+1]); printf("limit: %i\n", v->limit); } if (!strcmp(argv[i], "-o")) { v->file_name=argv[i+1]; printf("file namet: %s\n", v->file_name); } } }/*----------------------------------------------------------*//* Структура для хранения настроек программы. */app_vars vars;/*----------------------------------------------------------*/void saveMyData() {//Закрываем файл. if (vars.file) fclose(vars.file); exit(0); } void signalHandler( int signalNumber ) { static pthread_once_t semaphore=PTHREAD_ONCE_INIT; printf("\nsignal %i received.\n", signalNumber); pthread_once( & semaphore, saveMyData ); }/*----------------------------------------------------------*/int main(int argc, char *argv[]) {/* Устанавливаем настройки по умолчанию. */app_vars vars={100, 100500, MS_TICKER_PRIO_NORMAL, 0};//Подключаем обработчик Ctrl-C. signal( SIGTERM, signalHandler ); signal( SIGINT, signalHandler );/* Устанавливаем настройки настройки программы в * соответствии с аргументами командной строки. */scan_args(argc, argv, &vars); if (vars.file_name) { vars.file=fopen(vars.file_name, "w"); } ms_init();/* Создаем экземпляры фильтров. */MSFilter *voidsource=ms_filter_new(MS_VOID_SOURCE_ID); MSFilter *dtmfgen=ms_filter_new(MS_DTMF_GEN_ID); MSSndCard *card_playback=ms_snd_card_manager_get_default_card(ms_snd_card_manager_get()); MSFilter *snd_card_write=ms_snd_card_create_writer(card_playback); MSFilter *voidsink=ms_filter_new(MS_VOID_SINK_ID); MSDtmfGenCustomTone dtmf_cfg;/* Устанавливаем имя нашего сигнала, помня о том, что в массиве мы должны * оставить место для нуля, который обозначает конец строки. */strncpy(dtmf_cfg.tone_name, "busy", sizeof(dtmf_cfg.tone_name)); dtmf_cfg.duration=1000; dtmf_cfg.frequencies[0]=440;/* Будем генерировать один тон, частоту второго тона установим в 0.*/dtmf_cfg.frequencies[1]=0; dtmf_cfg.amplitude=1.0;/* Такой амплитуде синуса должен соответствовать результат измерения 0.707.*/dtmf_cfg.interval=0.; dtmf_cfg.repeat_count=0.;/* Задаем переменные для хранения результата */float load=0.; float latency=0.; int filter_count=0;/* Создаем тикер. */MSTicker *ticker=ms_ticker_new(); ms_ticker_set_priority(ticker, vars.ticker_priority);/* Соединяем фильтры в цепочку. */ms_filter_link(voidsource, 0, dtmfgen, 0); ms_filter_link(dtmfgen, 0, voidsink, 0); MSFilter* previous_filter=dtmfgen; int gain=1; int i; printf("# filters load\n"); if (vars.file) { fprintf(vars.file, "# filters load\n"); } while ((load <= 99.) && (filter_count < vars.limit)) {//Временно отключаем "поглотитель" пакетов от схемы. ms_filter_unlink(previous_filter, 0, voidsink, 0); MSFilter *volume; for (i=0; i<vars.step; i++) { volume=ms_filter_new(MS_VOLUME_ID); ms_filter_call_method(volume, MS_VOLUME_SET_DB_GAIN, &gain); ms_filter_link(previous_filter, 0, volume, 0); previous_filter=volume; }//Возвращаем "поглотитель" пакетов в схему. ms_filter_link(volume, 0, voidsink, 0);/* Подключаем источник тактов. */ms_ticker_attach(ticker,voidsource);/* Включаем звуковой генератор. */ms_filter_call_method(dtmfgen, MS_DTMF_GEN_PLAY_CUSTOM, (void*)&dtmf_cfg);/* Даем, время 100 миллисекунд, чтобы были накоплены данные для усреднения. */ms_usleep(500000);/* Читаем результат измерения. */load=ms_ticker_get_average_load(ticker); filter_count=filter_count + vars.step;/* Отключаем источник тактов. */ms_ticker_detach(ticker,voidsource); printf("%i %f\n", filter_count, load); if (vars.file) fprintf(vars.file,"%i %f\n", filter_count, load); } if (vars.file) fclose(vars.file); } 

Save as mstest13.c and compile with the command:


$ gcc mstest13.c -o mstest13 `pkg-config mediastreamer --libs --cflags` 

Next, run our tool to evaluate the load of the ticker that works with the lowest priority:


$./mstest13 --step 100 --limit 40000 -tprio 0 -o log0.txt 

$./mstest13 --step 100 --limit 40000 -tprio 1 -o log1.txt 

$./mstest13 --step 100 --limit 40000 -tprio 2 -o log2.txt 

Next, we "feed" the resulting files log0.txt, log1.txt, log2.txt to the excellent utility gnuplot :


$ gnuplot -e "set terminal png; set output 'load.png'; plot 'log0.txt' using 1:2 with lines, 'log1.txt' using 1:2 with lines, 'log2.txt' using 1:2 with lines" 

As a result of the program’s work, the file load.png will be created, in which a graph will be drawn having the following form:
ITKarma picture
Vertical shows the ticker load in percent, the horizontal number of added load filters.
In this graph, we see that, as expected, for priority 2 (blue line), the first noticeable outlier is observed with 6000 filters connected, when for priorities 0 (purple) and 1 (green) outliers appear earlier, with 1000 and 3000 filters respectively.


To obtain average results, you need to perform much more program runs, but already on this graph we can see that the ticker load increases linearly in proportion to the number of filters.


Transferring work to another thread


If your task can be divided into two or more threads, then everything is simple - create one or more new tickers and connect filters to each of your graphs. If the task cannot be parallelized, then it can be divided “across”, breaking the filter chain into several segments, each of which will work in a separate thread (i.e. with its own ticker). Next, you will need to “stitch” the data streams so that the output of the first segment falls on the input of the next. Such data transfer between threads is performed using two special filters MS_ITC_SINK and MS_ITC_SOURCE, they have the common name "inter-stickers".


Inter-stickers


The MS_ITC_SINK filter provides data output from the thread; it has only one input; it has no outputs. The MS_ITC_SOURCE filter provides asynchronous data entry into the thread, it has one output, it has no inputs. In terms of media streamer, this pair of filters makes it possible to transfer data between filters working from different tickers.


In order for data transfer to begin, these filters need to be connected, but not as we did with ordinary filters, i.e. function ms_filter_link () . In this case, the MS_ITC_SINK_CONNECT method of the MS_ITC_SINK filter is used:


ms_filter_call_method (itc_sink, MS_ITC_SINK_CONNECT, itc_src) 

The method binds two filters using an asynchronous queue. There is no method for disconnecting intertekters.


Example of using inter-stickers


An example use case is shown below. The triple arrow shows the asynchronous queue between the two threads.
ITKarma picture


Now we modify our test program in accordance with this scheme so that two threads connected by inter-sticks work in it as in the figure. Each thread will have an insertion point where new instances of load filters will be added, as shown in the figure below. The added number of filters will be split between the threads.
ITKarma picture
The corresponding program code is shown below.


File mstest14.c Variable computational load with intertekters
/* Файл mstest14.c Переменная вычислительная нагрузка c интертикерами. */#include <stdio.h> #include <signal.h> #include <mediastreamer2/msfilter.h> #include <mediastreamer2/msticker.h> #include <mediastreamer2/dtmfgen.h> #include <mediastreamer2/mssndcard.h> #include <mediastreamer2/msvolume.h> #include <mediastreamer2/msitc.h>/*----------------------------------------------------------*/struct _app_vars { int step;/* Количество фильтров добавляемых за раз. */int limit;/* Количество фильтров на котором закончить работу. */int ticker_priority;/* Приоритет тикера. */char* file_name;/* Имя выходного файла. */FILE *file; }; typedef struct _app_vars app_vars;/*----------------------------------------------------------*//* Функция преобразования аргументов командной строки в * настройки программы. */void scan_args(int argc, char *argv[], app_vars *v) { char i; for (i=0; i<argc; i++) { if (!strcmp(argv[i], "--help")) { char *p=argv[0]; p=p + 2; printf(" %s computational load diveded for two threads.\n\n", p); printf("--help List of options.\n"); printf("--version Version of application.\n"); printf("--step Filters count per step.\n"); printf("--tprio Ticker priority:\n" " MS_TICKER_PRIO_NORMAL 0\n" " MS_TICKER_PRIO_HIGH 1\n" " MS_TICKER_PRIO_REALTIME 2\n"); printf("--limit Filters count limit.\n"); printf("-o Output file name.\n"); exit(0); } if (!strcmp(argv[i], "--version")) { printf("0.1\n"); exit(0); } if (!strcmp(argv[i], "--step")) { v->step=atoi(argv[i+1]); printf("step: %i\n", v->step); } if (!strcmp(argv[i], "--tprio")) { int prio=atoi(argv[i+1]); if ((prio >=MS_TICKER_PRIO_NORMAL) && (prio <= MS_TICKER_PRIO_REALTIME)) { v->ticker_priority=atoi(argv[i+1]); printf("ticker priority: %i\n", v->ticker_priority); } else { printf(" Bad ticker priority: %i\n", prio); exit(1); } } if (!strcmp(argv[i], "--limit")) { v->limit=atoi(argv[i+1]); printf("limit: %i\n", v->limit); } if (!strcmp(argv[i], "-o")) { v->file_name=argv[i+1]; printf("file namet: %s\n", v->file_name); } } }/*----------------------------------------------------------*//* Структура для хранения настроек программы. */app_vars vars;/*----------------------------------------------------------*/void saveMyData() {//Закрываем файл. if (vars.file) fclose(vars.file); exit(0); } void signalHandler( int signalNumber ) { static pthread_once_t semaphore=PTHREAD_ONCE_INIT; printf("\nsignal %i received.\n", signalNumber); pthread_once( & semaphore, saveMyData ); }/*----------------------------------------------------------*/int main(int argc, char *argv[]) {/* Устанавливаем настройки по умолчанию. */app_vars vars={100, 100500, MS_TICKER_PRIO_NORMAL, 0};//Подключаем обработчик Ctrl-C. signal( SIGTERM, signalHandler ); signal( SIGINT, signalHandler );/* Устанавливаем настройки настройки программы в * соответствии с аргументами командной строки. */scan_args(argc, argv, &vars); if (vars.file_name) { vars.file=fopen(vars.file_name, "w"); } ms_init();/* Создаем экземпляры фильтров для первого треда. */MSFilter *voidsource=ms_filter_new(MS_VOID_SOURCE_ID); MSFilter *dtmfgen=ms_filter_new(MS_DTMF_GEN_ID); MSFilter *itc_sink=ms_filter_new(MS_ITC_SINK_ID); MSDtmfGenCustomTone dtmf_cfg;/* Устанавливаем имя нашего сигнала, помня о том, что в массиве мы должны * оставить место для нуля, который обозначает конец строки. */strncpy(dtmf_cfg.tone_name, "busy", sizeof(dtmf_cfg.tone_name)); dtmf_cfg.duration=1000; dtmf_cfg.frequencies[0]=440;/* Будем генерировать один тон, частоту второго тона установим в 0.*/dtmf_cfg.frequencies[1]=0; dtmf_cfg.amplitude=1.0;/* Такой амплитуде синуса должен соответствовать результат измерения 0.707.*/dtmf_cfg.interval=0.; dtmf_cfg.repeat_count=0.;/* Задаем переменные для хранения результата */float load=0.; float latency=0.; int filter_count=0;/* Создаем тикер. */MSTicker *ticker1=ms_ticker_new(); ms_ticker_set_priority(ticker1, vars.ticker_priority);/* Соединяем фильтры в цепочку. */ms_filter_link(voidsource, 0, dtmfgen, 0); ms_filter_link(dtmfgen, 0, itc_sink, 0);/* Создаем экземпляры фильтров для второго треда. */MSTicker *ticker2=ms_ticker_new(); ms_ticker_set_priority(ticker2, vars.ticker_priority); MSFilter *itc_src=ms_filter_new(MS_ITC_SOURCE_ID); MSFilter *voidsink2=ms_filter_new(MS_VOID_SINK_ID); ms_filter_call_method (itc_sink, MS_ITC_SINK_CONNECT, itc_src); ms_filter_link(itc_src, 0, voidsink2, 0); MSFilter* previous_filter1=dtmfgen; MSFilter* previous_filter2=itc_src; int gain=1; int i; printf("# filters load\n"); if (vars.file) { fprintf(vars.file, "# filters load\n"); } while ((load <= 99.) && (filter_count < vars.limit)) {//Временно отключаем "поглотители" пакетов от схем. ms_filter_unlink(previous_filter1, 0, itc_sink, 0); ms_filter_unlink(previous_filter2, 0, voidsink2, 0); MSFilter *volume1, *volume2;//Делим новые фильтры нагрузки между двумя тредами. int new_filters=vars.step>>1; for (i=0; i < new_filters; i++) { volume1=ms_filter_new(MS_VOLUME_ID); ms_filter_call_method(volume1, MS_VOLUME_SET_DB_GAIN, &gain); ms_filter_link(previous_filter1, 0, volume1, 0); previous_filter1=volume1; } new_filters=vars.step - new_filters; for (i=0; i < new_filters; i++) { volume2=ms_filter_new(MS_VOLUME_ID); ms_filter_call_method(volume2, MS_VOLUME_SET_DB_GAIN, &gain); ms_filter_link(previous_filter2, 0, volume2, 0); previous_filter2=volume2; }//Возвращаем "поглотители" пакетов в схемы. ms_filter_link(volume1, 0, itc_sink, 0); ms_filter_link(volume2, 0, voidsink2, 0);/* Подключаем источник тактов. */ms_ticker_attach(ticker2, itc_src); ms_ticker_attach(ticker1, voidsource);/* Включаем звуковой генератор. */ms_filter_call_method(dtmfgen, MS_DTMF_GEN_PLAY_CUSTOM, (void*)&dtmf_cfg);/* Даем, время, чтобы были накоплены данные для усреднения. */ms_usleep(500000);/* Читаем результат измерения. */load=ms_ticker_get_average_load(ticker1); filter_count=filter_count + vars.step;/* Отключаем источник тактов. */ms_ticker_detach(ticker1, voidsource); printf("%i %f\n", filter_count, load); if (vars.file) fprintf(vars.file,"%i %f\n", filter_count, load); } if (vars.file) fclose(vars.file); } 

Next, compile and run our program with tickers that work with the lowest priority:


$./mstest14 --step 100 --limit 40000 -tprio 0 -o log4.txt 

The measurement result is as follows:


ITKarma picture
For convenience, the curves obtained for the first version of the program are added to the graph. The orange curve shows the result for the two-thread version of the program. It can be seen from the graph that the tick download loading rate for the "two-thread" scheme is lower. The load on the second ticker is not shown.


If necessary, you can connect threads working on different hosts, but instead of using intertikers use an RTP session (as we did before creating an intercom), here you also need to take into account that the size of RTP packets is limited from above by the MTU.


According to the results of this article, we can conclude that changing the priority of a ticker does not affect its performance, increasing priority only reduces the likelihood of a ticker lag. The recipe for improving the performance of the scheme as a whole is to divide the processing graph into several clusters with their individual tickers. In this case, the payment for reducing the load on the ticker is to increase the time it takes for the data to pass from the input to the output of the circuit.


Conclusion


This article completes the MediaStrimer2 series. Without a doubt, the material in the articles covers only part of the capabilities and features of this engine. The cycle should be considered as the first step towards mastering this technology.

.

Source