When I feel sad, I write libraries that nobody needs...
Link to the source code: https://github.com/yurial/yurco



The Internet is full of articles about coroutines, and Habr has not been spared either. There are great ones, for example "Using Boost.Asio with Coroutines TS" and "Fundamentals of userver, a framework for writing asynchronous microservices". But all of that becomes useless the moment you need to call, from inside a coroutine, a library function that performs blocking I/O.


Meet yet another library that nobody needs: yurco, a library that makes coroutines transparent to third-party code. This article assumes that the reader knows what coroutines are and has at least a rough idea of how they are implemented.


To get started, consider the code snippet from the library example:


void process_connection(unistd::fd& fd) {
    try {
        char buf[100];
        unistd::read(fd, buf, sizeof(buf));
        /* Create a connection */
        std::unique_ptr<MYSQL, std::function<decltype(mysql_close)>> con(mysql_init(nullptr), mysql_close);
        mysql_real_connect(con.get(), "db.local", "ro", "", nullptr, 0, nullptr, 0);
        mysql_query(con.get(), "SELECT NOW(), SLEEP(10);");
        std::unique_ptr<MYSQL_RES, std::function<decltype(mysql_free_result)>> result(mysql_store_result(con.get()), mysql_free_result);
        if (!result)
            return; // silently close connection
        for (MYSQL_ROW row = mysql_fetch_row(result.get()); row; row = mysql_fetch_row(result.get())) {
            static char header[] = "HTTP/1.1 200 OK\r\nContent-Length: 21\r\nConnection: close\r\n\r\n";
            unistd::write_all(fd, header, strlen(header));
            const char* const answer = row[0];
            unistd::write_all(fd, answer, strlen(answer));
            unistd::write_all(fd, "\r\n", 2);
        }
    }
    ...

Some explanation of the code

Example source code: https://github.com/yurial/yurco-examples/blob/master/042_mysql.cpp


Q: What is unistd::?
A: This is another library that nobody needs ( https://github.com/yurial/unistd ). It implements simple wrappers over system functions. The wrappers check the return code and throw an exception on error. There is no magic here; the code just becomes a little more concise.


Q: And unistd::fd?
A: A simple class that calls ::close() in the destructor and ::dup() when copied. No magic here either.


Q: And why SLEEP(10) in the SQL query?
A: It is there on purpose: a program that uses blocking I/O will hang here for 10 seconds and will not process other requests.


Q: Why does the code handle mysql, HTTP, etc. so clumsily?
A: The correctness of the code in this example is not the point; making it correct would bloat it and obscure the main things:


  • a third-party library with blocking I/O is used;
  • the code looks synchronous.

It may seem that this function works synchronously and performs blocking I/O (at least while executing the SQL query, because libmysqlclient knows nothing about coroutines), but in fact it does not. Thanks to a bit of magic and some choice words, this code runs in a coroutine that is carefully interrupted on I/O operations.




Now let's see how it all works from the inside. We will go from simple to complex.


Coroutine


It is worth noting that our task requires stackful coroutines specifically, because we want to be able to interrupt execution inside third-party libraries. There are 2 popular coroutine implementations for C/C++ on Linux: boost.coroutine and ucontext. For this idea it does not matter which coroutine library is used; in our case the choice fell on ucontext, for religious reasons.


To be more precise, the swapcontext function has been rewritten: the rt_sigprocmask call was removed from it. This gets rid of a redundant system call and speeds up context switching a little; the rest of the code is unchanged.


class Coroutine {
public:
    Coroutine(const Coroutine&) = delete;
    template <class Func, class... Args>
    Coroutine(Stack&& stack, Func&& func, Args&&... args) noexcept;
    bool is_completed() const noexcept;
    bool is_running() const noexcept;
    void yield();
    void operator() ();
    void operator() (const std::nothrow_t&) noexcept;
    void set_exception(const std::exception_ptr& e) noexcept;
    const std::exception_ptr& get_exception() const noexcept;
    void rethrow();
    Stack&& take_away_stack() noexcept;
};

The constructor takes a stack instance, the entry point func, and the arguments with which to invoke the entry point. The entry point is bound to its arguments. Note that the entry point may also take an optional extra first argument, Coroutine&; in that case the call additionally passes a reference to the current class instance.


What is Stack?

A simple class that allocates memory for the coroutine stack.


class Stack {
public:
    Stack(const size_t size, const bool protect = true);
    Stack(const Stack&) = delete;
    Stack(Stack&&) = default;
    char* data() const noexcept;
    size_t size() const noexcept;
};

The size constructor argument is the size of the allocated memory in bytes. It must be a multiple of the memory page size (sysconf(_SC_PAGE_SIZE)). The protect argument allocates 2 additional pages (one at the beginning of the range, one at the end) with the PROT_NONE access mode. This implements simple stack overflow protection.


operator() and its noexcept version are used to switch to the coroutine context, and the yield() method is used to switch back.


Let's write a simple example:


Creating the project, adding the submodules, CMakeLists.txt
$ mkdir yurco-examples && cd yurco-examples
$ git init
$ git submodule add https://github.com/yurial/yurco
$ git submodule add https://github.com/yurial/unistd
$ git submodule update --init --remote
$ cat CMakeLists.txt
project(yurco-examples)
cmake_minimum_required(VERSION 3.10)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
if ("${CMAKE_BUILD_TYPE}" STREQUAL "Release" AND NOT YURCO_TRANSPARENCY)
    set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
endif()
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-inline")
option(USE_SINGLE_THREAD "Disable multithread locking")
if (USE_SINGLE_THREAD)
    add_definitions(-DSINGLE_THREAD)
endif()
if (YURCO_TRANSPARENCY)
    add_definitions(-DYURCO_TRANSPARENCY)
    set(CMAKE_EXE_LINKER_FLAGS "-Wl,-wrap=read,-wrap=write,-wrap=close,-wrap=connect,-wrap=accept4,-wrap=recv,-wrap=recvfrom,-wrap=recvmsg,-wrap=send,-wrap=sendto,-wrap=sendmsg ${CMAKE_EXE_LINKER_FLAGS}")
endif()
include_directories(.)
add_subdirectory(unistd)
add_subdirectory(yurco)
add_executable(011_basic "011_basic.cpp")
target_link_libraries(011_basic unistd yurco)
$ mkdir build && cd build
$ cmake .. -DCMAKE_BUILD_TYPE=Release -DUSE_SINGLE_THREAD=1 -DYURCO_TRANSPARENCY=0
$ make

$ cat 011_basic.cpp
#include <yurco/all.hpp>
#include <iostream>
#include <stdlib.h>

void entry(yurco::Coroutine& self) {
    std::cout << "At 3" << std::endl;
    self.yield();
    std::cout << "At 5" << std::endl;
}

int main() {
    std::cout << "At 1" << std::endl;
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    std::cout << "At 2" << std::endl;
    coro();
    std::cout << "At 4" << std::endl;
    coro();
    std::cout << "At 6" << std::endl;
    return EXIT_SUCCESS;
}

$ ./011_basic
At 1
At 2
At 3
At 4
At 5
At 6
This example makes it easy to see where the context switches occur.
Now let's measure how long a context switch takes:


$ cat 012_time.cpp
#include <yurco/all.hpp>
#include <stdint.h> // uint32_t
#include <stdlib.h>

void entry(yurco::Coroutine& self) {
    for (;;)
        self.yield();
}

int main() {
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    for (uint32_t i = 0; i < 5000000; ++i)
        coro();
    return EXIT_SUCCESS;
}

$ time ./012_time
real 0m0.921s
user 0m0.920s
sys 0m0.000s


Each loop iteration performs 2 switches: one into the coroutine context and one back into the main context. The loop runs 5000000 iterations, so the program performs 10000000 context switches. All of this takes a little under a second, which works out to roughly 92 ns per switch (0.921 s / 10000000). The result is not spectacular, but if your coroutine code does anything more substantial than yield(), this performance leaves a comfortable margin.


The next group of methods, set_exception, get_exception and rethrow, lets you pass an exception both into and out of the coroutine.


$ cat 013_exceptions.cpp
#include <yurco/all.hpp>
#include <iostream>
#include <stdexcept> // std::runtime_error
#include <stdlib.h>

void entry(yurco::Coroutine& self) {
    std::cout << "Coroutine started" << std::endl;
    throw std::runtime_error("My Exception ;)");
}

int main() {
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    try {
        coro(std::nothrow); // noexcept version should ignore any returned exception
        std::cout << "1: No exception" << std::endl;
    } catch (const std::exception& e) {
        std::cout << "1: Exception: " << e.what() << std::endl;
    }
    try {
        coro.rethrow();
        std::cout << "2: No exception" << std::endl;
    } catch (const std::exception& e) {
        std::cout << "2: Exception: " << e.what() << std::endl;
    }
    try {
        coro();
        std::cout << "3: No exception" << std::endl;
    } catch (const std::exception& e) {
        std::cout << "3: Exception: " << e.what() << std::endl;
    }
    return EXIT_SUCCESS;
}

$ ./013_exceptions
Coroutine started
1: No exception
2: Exception: My Exception ;)
Coroutine started
3: Exception: My Exception ;)


As you can see, we can receive an exception from the coroutine immediately, or defer it.
The same mechanism can be used to interrupt a coroutine:


$ cat 014_terminate.cpp
#include <yurco/all.hpp>
#include <iostream>
#include <stdlib.h>

void entry(yurco::Coroutine& self) {
    try {
        for (;;)
            self.yield();
    } catch (const yurco::terminate_exception&) {
        std::cout << "Coroutine should be terminated" << std::endl;
    }
}

int main() {
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    coro();
    try {
        coro.set_exception(std::make_exception_ptr(yurco::terminate_exception()));
        coro();
        std::cout << "No exception" << std::endl;
    } catch (const yurco::terminate_exception&) {
        std::cout << "Terminate exception was returned" << std::endl;
    }
    return EXIT_SUCCESS;
}

Why the exception is not returned

To raise the exception inside the coroutine, the yield() method calls rethrow(), which in turn clears the saved exception.


$ ./014_terminate
Coroutine should be terminated
No exception


The take_away_stack() method lets you take the stack away from a class instance. It is used to reuse stacks; there is usually no need to call it manually.


SimpleScheduler


As in most multitasking systems, tasks need to be scheduled. For this, the library implements the SimpleScheduler class (more advanced schedulers, with task priorities, may appear later). SimpleScheduler has 3 queues, one for each coroutine state: ready, running, suspended. Coroutines move between the queues via the corresponding methods.


class SimpleScheduler {
public:
    SimpleScheduler(std::atomic<bool>& terminate, const size_t stack_size, const bool protect_stack) noexcept;
    template <class Func, class... Args>
    void coroutine(Func&& func, Args&&... args) noexcept;
    bool try_execute_one() noexcept;
    void resume_all() noexcept;
    void resume(Coroutine& coro) noexcept;
    void resume_many(std::vector<Coroutine*>& coros) noexcept;
    void suspend(Coroutine& coro) noexcept;
};

The suspend() method marks the coroutine as wishing to suspend its execution. It is important to understand that this method does not stop the coroutine by itself: only when the coroutine returns control is its instance moved to the suspended queue.
The resume(), resume_many() and resume_all() methods move coroutine(s) from the suspended queue to the ready queue (in the reactor, this happens when events arrive on the associated socket).
As you have probably guessed, try_execute_one() executes one coroutine from the ready queue. While it runs, the coroutine is moved to the running queue, and when it returns control it is moved back to ready, or to suspended if the corresponding method was called during its execution.
All this would be useless if we could not create new coroutines. The coroutine() method helps with that. An attentive reader will notice that this method does not take a stack: the scheduler is responsible for stacks and keeps a stack pool inside.
The parameters of the created stacks are set in the constructor: stack_size and protect_stack correspond exactly to the constructor parameters of the Stack class. The terminate parameter is special: when its value becomes true, the yurco::terminate_exception exception is automatically set for all launched coroutines.


$ cat 021_scheduler.cpp
#include <yurco/all.hpp>
#include <atomic> // std::atomic
#include <iostream>
#include <stdlib.h>

void entry(yurco::Coroutine& self, yurco::SimpleScheduler& scheduler, int my_id) {
    for (uint32_t i = 0; i < 2; ++i) {
        std::cout << "Coroutine " << my_id << std::endl;
        self.yield();
    }
    scheduler.suspend(self);
    for (;;)
        self.yield();
}

int main() {
    std::atomic<bool> terminate_flag = false;
    yurco::SimpleScheduler scheduler(terminate_flag, 16*1024, true);
    scheduler.coroutine(entry, std::ref(scheduler), 1);
    scheduler.coroutine(entry, std::ref(scheduler), 2);
    while (scheduler.try_execute_one())
        ;
    terminate_flag = true;
    scheduler.resume_all();
    while (scheduler.try_execute_one())
        ;
    std::cout << "Coroutines was terminated via yurco::terminate_exception, cause terminate_flag set to true" << std::endl;
    return EXIT_SUCCESS;
}

$ ./021_scheduler
Coroutine 1
Coroutine 2
Coroutine 1
Coroutine 2
Coroutines was terminated via yurco::terminate_exception, cause terminate_flag set to true


Now we can run many coroutines and have some control over them.
It's time to move on to "asynchronous" I/O (strictly speaking, not asynchronous, but non-blocking I/O with multiplexing).


Reactor


The reactor executes ready coroutines, waits for events on file descriptors, and wakes up suspended coroutines.


class Reactor {
public:
    Reactor(const size_t stack_size, const bool protect_stack = true) noexcept;
    Reactor(const Reactor&) = delete;
    template <class Func, class... Args>
    void coroutine(Func&& func, Args&&... args) noexcept;
    template <class Func, class... Args>
    void async(Func&& func, Args&&... args) noexcept;
    void run(const size_t batch_size = 16, const size_t events_at_once = 1024) noexcept;
    void terminate() noexcept;
    void suspend(Coroutine& coro, const int fd, int events);
    void close(const int fd);
    int close(const std::nothrow_t&, const int fd) noexcept;
};

The constructor parameters stack_size and protect_stack are already familiar: they correspond to the parameters of the same names in the SimpleScheduler class examined earlier.


The coroutine() method creates a coroutine by calling the corresponding method of SimpleScheduler, and the async() method is an alias for it.


The run() method starts the coroutine execution loop and waits for events on file descriptors via epoll. You can interrupt the loop with the terminate() method: coroutines receive the yurco::terminate_exception exception, and once they have finished, run() returns control.


The close() method and its noexcept version close the file descriptor via ::close() and clean up the internal structures associated with it.


The suspend() method is the most important one here: it is what suspends coroutine execution until the corresponding event occurs on the file descriptor.


It's time to write a simple example: a program that terminates when it receives SIGTERM, SIGQUIT or SIGINT.


$ cat 031_signals.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>
#include <iostream>
#include <signal.h>
#include <stdlib.h>

void signal_handler(yurco::Coroutine& coro, yurco::Reactor& reactor, unistd::fd& sigfd) {
    reactor.suspend(coro, sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl;
    reactor.terminate();
}

void register_signal_handler(yurco::Reactor& reactor) {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    unistd::fd sigfd = unistd::fd::nodup(unistd::signalfd(sigmask));
    reactor.async(signal_handler, std::ref(reactor), std::move(sigfd)); // use std::move to avoid ::dup() of the file descriptor
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE is just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
}

int main() {
    const size_t stack_size = 16*1024; // a size below 16k leads to SIGSEGV because libunwind requires more space
    yurco::Reactor reactor(stack_size);
    register_signal_handler(reactor);
    reactor.run();
    return EXIT_SUCCESS;
}

$ ./031_signals
^Cwe got a signal


By analogy, you can write functions for "asynchronous" I/O. They look something like this:


yurco::read()
size_t read(const std::nothrow_t&, Reactor& reactor, Coroutine& coro, const int fd, void* const buf, const size_t count) {
    for (;;) {
        const ssize_t nread = ::read(fd, buf, count);
        if (-1 == nread && errno == EAGAIN) {
            reactor.suspend(coro, fd, EPOLLIN);
            continue;
        }
        return nread;
    }
}

Other I/O functions are implemented similarly. Their source code can be viewed at https://github.com/yurial/yurco/blob/master/operations.cpp


Now we can implement a simple echo server:


$ cat 032_echo.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>
#include <unistd/addrinfo.hpp>
#include <unistd/netdb.hpp>
#include <iostream>
#include <signal.h>
#include <stdlib.h>

void process_connection(yurco::Coroutine& coro, yurco::Reactor& reactor, int& clientfd) {
    (void)coro;
    char buf[1024];
    try {
        const size_t nread = yurco::read(reactor, coro, clientfd, buf, sizeof(buf));
        if (0 == nread)
            return; // terminate coroutine, close connection
        for (size_t nwrite = 0; nwrite < nread;)
            nwrite += yurco::write(reactor, coro, clientfd, buf + nwrite, nread - nwrite);
    } catch (const yurco::terminate_exception&) {
        std::cerr << "terminate connection coroutine" << std::endl;
    } catch (...) {
        std::cerr << "got unknown exception while read/write" << std::endl;
    }
    reactor.close(std::nothrow, clientfd);
}

void listener(yurco::Coroutine& coro, yurco::Reactor& reactor, int serverfd) {
    try {
        for (;;) {
            for (size_t i = 0; i < 32; ++i) { // sometimes we should yield() to process accepted connections
                int clientfd = yurco::accept(reactor, coro, serverfd, nullptr, nullptr, SOCK_NONBLOCK);
                reactor.async(process_connection, std::ref(reactor), clientfd);
            }
            coro.yield();
        }
    } catch (const yurco::terminate_exception&) {
        std::cerr << "terminate listener coroutine" << std::endl;
    } catch (...) {
        std::cerr << "unknown exception while accept" << std::endl;
    }
    reactor.close(std::nothrow, serverfd);
}

void signal_handler(yurco::Coroutine& coro, yurco::Reactor& reactor, int sigfd) {
    reactor.suspend(coro, sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl; // use unistd::read(sigfd) to get the signals
    reactor.terminate();
    reactor.close(std::nothrow, sigfd);
}

void register_signal_handler(yurco::Reactor& reactor) {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    int sigfd = unistd::signalfd(sigmask);
    reactor.async(signal_handler, std::ref(reactor), sigfd);
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE is just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
}

void register_listener(yurco::Reactor& reactor) {
    const std::vector<unistd::addrinfo> addr = unistd::getaddrinfo("localhost:31337"); // or [::]:31337 or other valid variants
    int serverfd = unistd::socket(addr.at(0), SOCK_NONBLOCK);
    unistd::setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, 1);
    unistd::bind(serverfd, addr.at(0));
    unistd::listen(serverfd, 8192/*backlog*/);
    reactor.async(listener, std::ref(reactor), serverfd);
}

int main() {
    const size_t stack_size = 16*1024; // a size below 16k leads to SIGSEGV because libunwind requires more space
    yurco::Reactor reactor(stack_size);
    register_listener(reactor);
    register_signal_handler(reactor);
    reactor.run();
    return EXIT_SUCCESS;
}

console 1: $ ./032_echo
console 2: $ echo qweasd | nc -6 ::1 31337
console 2: qweasd
console 1: ^Cwe got a signal
console 1: terminate listener coroutine


Everything works! It's time for dirty hacks... er, magic!


Transparency


Modern systems allow functions to be substituted both in dynamically loaded libraries and at static link time. To replace a function during static linking, the linker is given the additional argument -wrap=symbol, which replaces the symbol function with __wrap_symbol, while the original function remains available under the name __real_symbol. An important condition is that the function prototypes match, so in order to replace the system read() with yurco::read(), we need to write one more wrapper, one that knows about the current Reactor and Coroutine instances.


Where do we get the current Reactor and Coroutine? The pthread library gives us a wonderful way to store a pointer to an arbitrary object, by key, in thread-specific storage using pthread_setspecific(), and to retrieve it later using pthread_getspecific(). Using these functions, we implement set_reactor()/get_reactor() and set_coroutine()/get_coroutine().


__inline__ void set_reactor(Reactor& reactor) noexcept {
    pthread_setspecific(reactor_key, &reactor);
}

__inline__ void set_coroutine(Coroutine& coro) noexcept {
    pthread_setspecific(coro_key, &coro);
}

__inline__ Reactor& get_reactor() noexcept {
    return *reinterpret_cast<Reactor*>(pthread_getspecific(reactor_key));
}

__inline__ Coroutine& get_coroutine() noexcept {
    return *reinterpret_cast<Coroutine*>(pthread_getspecific(coro_key));
}

The set_reactor() and set_coroutine() calls are already built into the internals of the yurco classes; for everything to work, you only need to call yurco::init() before use. For convenience, yurco::init() also creates a single Reactor instance, so we can now write free functions that work with it:


void run(const size_t batch_size = 16, const size_t events_at_once = 1024) noexcept;
__inline__ void terminate() noexcept;
template <class Func, class... Args>
__inline__ void async(Func&& func, Args&&... args) noexcept;
__inline__ void suspend(const int fd, int events);

and another method for the current coroutine:


__inline__ void yield(); 

Now let's write __wrap_read() to replace read():


ssize_t __wrap_read(int fd, void* buf, size_t count) {
    return yurco::read(std::nothrow, yurco::get_reactor(), yurco::get_coroutine(), fd, buf, count);
}

By analogy, we write wrappers for the other system functions: https://github.com/yurial/yurco/blob/master/transparency.cpp


Now we can greatly simplify our echo server.
To enable system call substitution, build the project with the YURCO_TRANSPARENCY option: cmake .. -DYURCO_TRANSPARENCY=1


$ cat 041_echo.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>
#include <unistd/addrinfo.hpp>
#include <unistd/netdb.hpp>
#include <iostream>
#include <signal.h>
#include <stdlib.h>

void process_connection(unistd::fd& clientfd) {
    char buf[1024];
    try {
        const size_t nread = unistd::read(clientfd, buf, sizeof(buf));
        if (0 == nread)
            return; // terminate coroutine, close connection
        unistd::write_all(clientfd, buf, nread);
    } catch (const yurco::terminate_exception&) {
        std::cerr << "terminate connection coroutine" << std::endl;
    } catch (...) {
        std::cerr << "got unknown exception while read/write" << std::endl;
    }
}

void listener(unistd::fd& serverfd) {
    try {
        for (;;) {
            for (size_t i = 0; i < 32; ++i) { // sometimes we should yield() to process accepted connections
                unistd::fd clientfd = unistd::fd::nodup(unistd::accept(serverfd, SOCK_NONBLOCK));
                yurco::async(process_connection, std::move(clientfd));
            }
            yurco::yield();
        }
    } catch (const yurco::terminate_exception&) {
        std::cerr << "terminate listener coroutine" << std::endl;
    } catch (...) {
        std::cerr << "unknown exception while accept" << std::endl;
    }
}

void signal_handler(unistd::fd& sigfd) {
    yurco::suspend(sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl; // use unistd::read(sigfd) to get the signals
    yurco::terminate();
}

void register_signal_handler() {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    unistd::fd sigfd = unistd::fd::nodup(unistd::signalfd(sigmask));
    yurco::async(signal_handler, std::move(sigfd)); // use std::move to avoid ::dup() of the file descriptor
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE is just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
}

void register_listener() {
    const std::vector<unistd::addrinfo> addr = unistd::getaddrinfo("localhost:31337"); // or [::]:31337 or other valid variants
    unistd::fd serverfd = unistd::fd::nodup(unistd::socket(addr.at(0), SOCK_NONBLOCK));
    unistd::setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, 1);
    unistd::bind(serverfd, addr.at(0));
    unistd::listen(serverfd, 8192/*backlog*/);
    yurco::async(listener, std::move(serverfd)); // use std::move to avoid ::dup() of the file descriptor
}

int main() {
    const size_t stack_size = 16*1024; // a size below 16k leads to SIGSEGV because libunwind requires more space
    yurco::init(stack_size);
    register_signal_handler();
    register_listener();
    yurco::run();
    return EXIT_SUCCESS;
}

console 1: $ ./041_echo
console 2: $ echo qweasd | nc -6 ::1 31337
console 2: qweasd
console 1: ^Cwe got a signal
console 1: terminate listener coroutine


The code has lost a lot of weight: all references to Reactor and Coroutine have disappeared from it, yet it still works.


Let's check how transparent coroutines work with a third-party library, for example mysql:


$ cat 042_mysql.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>
#include <unistd/addrinfo.hpp>
#include <unistd/netdb.hpp>
#include <mysql/mysql.h>
#include <netinet/tcp.h> // TCP_KEEPCNT, TCP_KEEPIDLE, TCP_KEEPINTVL
#include <functional>
#include <iostream>
#include <memory>
#include <signal.h>
#include <stdlib.h>
#include <string.h> // strlen

void process_connection(unistd::fd& fd) {
    try {
        char buf[100];
        unistd::read(fd, buf, sizeof(buf));
        /* Create a connection */
        std::unique_ptr<MYSQL, std::function<decltype(mysql_close)>> con(mysql_init(nullptr), mysql_close);
        mysql_real_connect(con.get(), "db.local", "ro", "", nullptr, 0, nullptr, 0);
        mysql_query(con.get(), "SELECT NOW(), SLEEP(10);");
        std::unique_ptr<MYSQL_RES, std::function<decltype(mysql_free_result)>> result(mysql_store_result(con.get()), mysql_free_result);
        if (!result)
            return; // silently close connection
        for (MYSQL_ROW row = mysql_fetch_row(result.get()); row; row = mysql_fetch_row(result.get())) {
            static char header[] = "HTTP/1.1 200 OK\r\nContent-Length: 21\r\nConnection: close\r\n\r\n";
            unistd::write_all(fd, header, strlen(header));
            const char* const answer = row[0];
            unistd::write_all(fd, answer, strlen(answer));
            unistd::write_all(fd, "\r\n", 2);
        }
    } catch (const yurco::terminate_exception&) {
        std::cerr << "terminate connection coroutine while write" << std::endl;
    } catch (...) {
        std::cerr << "unknown exception while write" << std::endl;
    }
}

void listener(unistd::fd& sock) {
    try {
        for (;;) {
            for (size_t i = 0; i < 32; ++i) { // sometimes we should yield() to process accepted connections
                unistd::fd clientfd = unistd::fd::nodup(unistd::accept(sock, SOCK_NONBLOCK));
                yurco::async(process_connection, std::move(clientfd));
            }
            yurco::yield();
        }
    } catch (const yurco::terminate_exception&) {
        std::cerr << "terminate listener coroutine" << std::endl;
    } catch (...) {
        std::cerr << "unknown exception while accept" << std::endl;
    }
}

void signal_handler(unistd::fd& sigfd) {
    yurco::suspend(sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl; // use unistd::read(sigfd) to get the signals instead of yurco::suspend
    yurco::terminate();
}

void balast_handler() {
    uint64_t counter = 0;
    for (;;) {
        ++counter;
        yurco::yield();
    }
}

void register_signal_handler() {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    unistd::fd sigfd = unistd::fd::nodup(unistd::signalfd(sigmask));
    yurco::async(signal_handler, std::move(sigfd)); // use std::move to avoid ::dup() of the file descriptor
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE is just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
}

void register_listener() {
    const std::vector<unistd::addrinfo> addr = unistd::getaddrinfo("localhost:31337"); // or [::]:31337 or other valid variants
    unistd::fd sock = unistd::fd::nodup(unistd::socket(addr.at(0), SOCK_NONBLOCK));
    unistd::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
    unistd::setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, 3);
    unistd::setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, 1);
    unistd::setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, 1);
    unistd::setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, 1); // keep-alive is not required, but is good practice
    unistd::bind(sock, addr.at(0));
    unistd::listen(sock, 8192/*backlog*/);
    yurco::async(listener, std::move(sock)); // use std::move to avoid ::dup() of the file descriptor
}

void async_main() {
    register_signal_handler();
    register_listener();
}

int main() {
    const size_t stack_size = 16*1024; // a size below 16k leads to SIGSEGV because libunwind requires more space
    yurco::init(stack_size);
    yurco::async(async_main);
    yurco::run();
    return EXIT_SUCCESS;
}

console 1: $ ./042_mysql
console 2: $ curl -6 'http://[::1]:31337/' && date
console 3: $ curl -6 'http://[::1]:31337/' && date
console 2: 2020-06-08 12:20:48
console 2: Mon Jun 8 12:20:58 MSK 2020
console 3: 2020-06-08 12:20:48
console 3: Mon Jun 8 12:20:58 MSK 2020
console 1: ^Cwe got a signal
console 1: terminate listener coroutine


Voila! Our program processed 2 requests (each making a query to mysql) in a single thread; moreover, the SQL queries were executed in parallel (this can be seen from the timestamps returned by the mysql server).


P.S. At the moment, the library does not support timeouts, disk I/O, or interception of the ::getaddrinfo() call. The next time I feel sad, I will definitely add them.


Patches are welcome!


UPD 2020-06-08: the article has been substantially revised; class descriptions and usage examples have been added.
