> "concurrency" : Birden fazla olayın aynı zaman dilimi içerisinde gerçekleşmesine "concurrency", birden fazla olayın tam olarak aynı anda gerçekleşiyor olmasına da "simultaneously" denir. Dolayısıyla "simultaneously" gerçekleşen olaylar için aynı zamanda "concurrency" tabirini kullanabiliriz fakat "concurrency" gerçekleşen olaylar için "simultaneously" tabirini kullanamayız. Buradan hareketle "simultaneously" için işlemci/çekirdek sayısının birden fazla olması bir zorunlulukken, "concurrency" için böyle bir zorunluluk bulunmamaktadır. Pekala bir sistemde bu iki mekanizmanın birleşmiş hali de bulunabilir. C++11 ile dile bu mekanizmalar eklenmiştir. Diğer yandan unutulmamalıdır ki C++ dilindeki iş bu standart fonksiyonlar, arka planda işletim sistemininin fonksiyonlarını çağırmaktadır. C++17 ile birlikte "STL" algoritma fonksiyonları paralel biçimde çalıştırabilir. Şimdi de örnekler üzerinden incelemelerde bulunalım; * Örnek 1.0, Bir iş yükü atanmamış "std::thread" nesneleri olabilir. Bunlar iş yapmayı bekleyenlerdir. #include #include int main() { std::thread tx; std::cout << tx.joinable() << '\n'; // true tx.join(); std::thread ty([]{}); std::cout << ty.joinable() << '\n'; // false ty.join(); } * Örnek 1.1, #include #include #include #include void foo() { } int main() { /* # OUTPUT # [I] : Full [II] : Empty ========== [I] : Empty [II] : Full */ std::thread t1{foo}; std::thread t2{}; std::cout << "[I] : "; std::cout << (t1.joinable() ? "Full" : "Empty")<< '\n'; std::cout << "[II] : "; std::cout << (t2.joinable() ? "Full" : "Empty") << '\n'; std::cout << "==========\n"; t2 = std::move(t1); std::cout << "[I] : "; std::cout << (t1.joinable() ? "Full" : "Empty")<< '\n'; std::cout << "[II] : "; std::cout << (t2.joinable() ? "Full" : "Empty") << '\n'; t2.join(); } * Örnek 1.2, #include #include int main() { auto fn = []{ std::cout << "My first thread\n"; }; std::thread tx{ fn }; // "std::thread" nesnesi, bir iş yüküyle hayata geldi. tx.join(); // "main-thread", "tx" in işi bitene kadar bloke olmuştur. // "tx" is in "non-joinable" state. } * Örnek 1.3, İş yükü olarak çeşitli fonksiyonları "std::thread" nesnelerine verebiliriz. #include #include void bar(int x, int y) { std::cout << "bar(" << x << ", " << y << ")\n"; } class Myclass { public: void foo(int x) { std::cout << "Myclass::foo(" << x << ")\n"; } static void s_foo(int x) { std::cout << "Myclass::s_foo(" << x << ")\n"; } void operator()(int x) { std::cout << "Myclass::operator()(" << x << ")\n"; } }; int main() { auto fn = [](int x){ std::cout << "Lambda::operator()(" << x << ")\n"; }; std::thread t1{ fn, 1 }; // Lambda::operator()(1) std::thread t2{ &Myclass::s_foo, 2 }; // Myclass::s_foo(2) Myclass mc; std::thread t3{ &Myclass::foo, &mc, 3 }; // Myclass::foo(3) std::thread t4{ bar, 4, 5 }; // bar(4, 5) t4.join(); t3.join(); t2.join(); t1.join(); } * Örnek 1.4.0, İş yükü olarak kullanılacak fonksiyonlar "non-const L-Value" referans alacaklarsa, o parametreleri "std::ref" ile sarmalama yapmamız gerekmektedir. Aksi halde sentaks hatası alırız. #include #include #include #include void func(std::list& local) { local.push_back(*local.begin()); local.push_back(*local.begin()); local.push_back(*local.begin()); } int main() { std::list my_list{ 2, 4, 6, 8, 10 }; for (const auto i : my_list) std::cout << i << ' '; // 2 4 6 8 10 std::cout << '\n'; std::thread t{ func, std::ref(my_list) }; t.join(); for (const auto i : my_list) std::cout << i << ' '; // 2 4 6 8 10 2 2 2 std::cout << '\n'; } * Örnek 1.4.1, #include #include #include class Myclass { public: Myclass() { std::cout << "Default Ctor.\n"; } Myclass(const Myclass&) { std::cout << "Copy Ctor.\n"; } Myclass(Myclass&&) noexcept { std::cout << "Move Ctor.\n"; } }; void bar(Myclass&) { } void foo(const Myclass&) { } void baz(Myclass&&) { } int main() { { // Default Ctor. Myclass m; // ERROR // std::thread t1{ bar, m }; t1.join(); // Copy Ctor. std::thread t2{ foo, m }; t2.join(); // Copy Ctor. std::thread t3{ baz, m }; t3.join(); } std::cout << "====================\n"; { // Default Ctor. Myclass m; // A reference for 'm'; no new instance of "Myclass"ç std::thread t1{ bar, std::ref(m) }; t1.join(); // Copy Ctor. std::thread t2{ foo, m }; t2.join(); // Move Ctor. std::thread t3{ baz, std::move(m) }; t3.join(); } } * Örnek 1.5, #include #include class Myclass { public: Myclass() = default; Myclass(const Myclass&) { std::cout << "Copy\n"; } Myclass(Myclass&&) noexcept { std::cout << "Move\n"; } }; void foo (Myclass) {} void bar (Myclass&&) {} void baz (Myclass&) {} void bat (const Myclass&) {} int main() { // "std::thread" sınıfının "Ctor." fonksiyonunun parametre paketindeki // öğeler için "decay" işlemi uygulanıyor; diziler göstericiye, fonksiyon // isimleri fonksiyon adreslerine, referans niteliğinin düşmesi vb. // Daha sonra elde edilen değer ilgili fonksiyona gönderiliyor. { Myclass m; // Copy // Move std::thread t1{ foo, m }; t1.join(); } std::cout << "\n==================\n"; { Myclass m; // Copy std::thread t1{ bar, m }; t1.join(); } std::cout << "\n==================\n"; { // Move std::thread t1{ bar, Myclass{} }; t1.join(); } std::cout << "\n==================\n"; { Myclass m; // Error: // std::thread t1{ baz, m }; t1.join(); } std::cout << "\n==================\n"; { Myclass m; // Copy std::thread t1{ bat, m }; t1.join(); } std::cout << "\n==================\n"; { // Move std::thread t1{ bat, Myclass{} }; t1.join(); } std::cout << "\n==================\n"; { Myclass m; // Default Ctor. std::thread t1{ baz, std::ref(m) }; t1.join(); } } * Örnek 2, "std::thread" türünden nesneler kopyalamaya karşı kapalıdır. #include #include #include #include int main() { /* # OUTPUT # thread-2 is running. thread-0 is running. thread-6 is running. thread-8 is running. thread-9 is running. thread-3 is running. thread-1 is running. thread-4 is running. thread-5 is running. thread-7 is running. main devam ediyor */ auto fn = [](int id){ // "thread-safe" version. std::osyncstream{std::cout} << "thread-" << id << " is running.\n"; }; std::vector tvec; for (int i = 0; i < 10; ++i) tvec.push_back(std::thread{fn, i}); // "std::thread" sınıfı "Move Only" bir sınıf. // "Copy Ctor." ve "Copy Assign" fonksiyonları "delete" edilmiş. for (auto& t : tvec) t.join(); std::cout << "main devam ediyor\n"; } * Örnek 3.0, Hayata gelen bir "thread" ya "join" edilmeli ya da "detach". #include #include #include #include void foo() { std::cout << "foo\n"; } void my_terminate() { std::cout << "my_terminate\n"; (void)getchar(); std::abort(); } int main() { /* // -----> (1) * İlgili "thread" nesnesi ".join()" veya ".detach()" * edilmediğinden bir hata nesnesi fırlatıldı. Yakalanmadığı * için de "std::terminate" çağrıldı fakat onu da konfigure * ettiğimizden, bizimki çağrıldı. */ std::set_terminate(my_terminate); { std::thread t1{foo}; // t1.join(); // -----> (1) } std::cout << "main devam ediyor\n"; } * Örnek 3.1, #include #include #include #include void foo() { std::cout << "foo\n"; } void my_terminate() { std::cout << "my_terminate\n"; (void)getchar(); std::abort(); } int main() { /* // -----> (1) * İlgili "thread" nesnesi ".join()" veya ".detach()" edildikten * sonra tekrardan ".join()" veya ".detach()" edilirse, yine bir * hata nesnesi fırlatır. Yakalanmadığı için de "std::terminate" * çağrılır fakat onu da konfigure ettiğimizden, bizimki çağrıldı. */ std::set_terminate(my_terminate); { std::thread t1{foo}; t1.join(); t1.detach(); // -----> (1) } std::cout << "main devam ediyor\n"; } * Örnek 3.2, #include #include #include #include void foo() { std::cout << "foo\n"; } void my_terminate() { std::cout << "my_terminate\n"; (void)getchar(); std::abort(); } int main() { /* // -----> (1) * İş yükü olmayan ilgili "thread" nesnesi * ".join()" veya ".detach()" edilirse yine bir * hata nesnesi fırlatacaktır. */ std::set_terminate(my_terminate); { std::thread t1{}; t1.join(); // t1.detach(); // -----> (1) } std::cout << "main devam ediyor\n"; } * Örnek 3.3, #include #include #include #include void foo() { std::cout << "foo\n"; } int main() { /* # OUTPUT # foo exception caught main devam ediyor. */ std::thread t1{ foo }; // 't1' is joinable here. std::thread t2{ std::move(t1) }; // 't1' is no longer joinable here, // but 't2' is. try { t1.join(); // Will cause throwing an error } catch (...) { std::cout << "exception caught\n"; } std::cout << "main devam ediyor.\n"; t2.join(); } * Örnek 3.4, #include #include #include #include #include #include std::thread make_one(int n) { return std::thread{ [n]() mutable { while (n--) std::cout << "Ulya Yuruk\n"; } }; } std::thread transfer_one(std::thread t) { //... return t; } int main() { /* # OUTPUT # Ulya Yuruk Ulya Yuruk Ulya Yuruk */ auto t1 = make_one(3); auto t2 = std::move(t1); t1 = transfer_one(std::move(t2)); t1.join(); } * Örnek 3.5.0, Tabii böyle yapmak yerine "RAII idiom" kullanan "std::jthread" sınıfını kullanabiliriz; #include #include #include void foo() { std::cout << "func called\n"; } int main() { /* # OUTPUT # main basladi func called Hata yakalandi main sonlanacak */ std::cout << "main basladi\n"; { try { std::jthread t{ foo }; throw std::runtime_error{ "error" }; // t.join(); } catch(...) { std::cout << "Hata yakalandi\n"; } } std::cout << "main sonlanacak\n"; } * Örnek 3.5.1, "std::jthread" kullanılmadığı ve hata nesnesinin fırlatılması durumunda ilgili "thread" için ".join()" çağrılmayacağından, "dead_lock" oluşacaktır. #include #include #include void foo() { std::cout << "func called\n"; } int main() { /* # OUTPUT # main basladi func called terminate called without an active exception */ std::cout << "main basladi\n"; { try { std::thread t{ foo }; throw std::runtime_error{ "error" }; t.join(); } catch(...) { std::cout << "Hata yakalandi\n"; } } std::cout << "main sonlanacak\n"; } * Örnek 3.5.2, // Concurrency in Action kitabında verilen basit bir jthread implementasyonu #include class joining_thread { public: joining_thread() noexcept = default; template explicit joining_thread(Callable&& func, Args&& ... args) : m_t(std::forward(func), std::forward(args)...) {} explicit joining_thread(std::thread t) noexcept : m_t(std::move(t)) {} joining_thread(joining_thread&& other) noexcept : m_t(std::move(other.m_t)) {} joining_thread& operator=(joining_thread&& other) noexcept { if (joinable()) join(); m_t = std::move(other.m_t); return *this; } joining_thread& operator=(std::thread other) noexcept { if (joinable()) join(); m_t = std::move(other); return *this; } ~joining_thread() noexcept { if (joinable()) join(); } void swap(joining_thread& other) noexcept { m_t.swap(other.m_t); } std::thread::id get_id() const noexcept { return m_t.get_id(); } bool joinable() const noexcept { return m_t.joinable(); } void join() { m_t.join(); } void detach() { m_t.detach(); } std::thread& as_thread() noexcept { return m_t; } const std::thread& as_thread() const noexcept { return m_t; } private: std::thread m_t; }; * Örnek 4, "n" tane "thread" ile eş zamanlı olarak bir iş yaptırtabiliriz. #include #include #include void foo(char c) { for (int i = 0; i < 1; ++i) std::cout << c; } int main() { /* # OUTPUT # ABEFGIJLKMNOQRSTUVWXYPZCDH */ std::vector tvec(26); for (int i = 0; i < 26; ++i) tvec[i] = std::thread{ foo, static_cast(i + 'A') }; for (auto& t: tvec) t.join(); } * Örnek 5.0, "this_thread" isim alanı içerisindeki "sleep_for" fonksiyonu, içinde bulunulan "thread" i belirtilen süre boyunca bekletir. #include #include #include #include using dsec = std::chrono::duration; void foo(char c) { for (int i = 0; i < 1; ++i) std::cout << c; // Çağıran "thread" i ilgili süre boyunca // bekletir, uyutur, akışını durdurur. std::this_thread::sleep_for(dsec{5}); } int main() { /* # OUTPUT # ABEFGIJLKMNOQRSTUVWXYPZCDH */ std::vector tvec(26); for (int i = 0; i < 26; ++i) tvec[i] = std::thread{ foo, static_cast(i + 'A') }; for (auto& t: tvec) t.join(); } * Örnek 5.1.0, "get_id()" isimli global fonksiyon ile içinde bulunulan "thread" in ID değerini, ".get_id()" isimli üye fonksiyon ile ilgili "std::thread" türünden nesnenin temsil ettiği "thread" in ID değerini "get" edebiliriz. Bu fonksiyonların geri dönüş türü ise "std::thread::id" türündendir. #include #include #include #include #include #include using dsec = std::chrono::duration; void foo(char c) { for (int i = 0; i < 1; ++i) std::osyncstream{ std::cout } << "[" << std::this_thread::get_id() << "] : " << c << '\n'; } int main() { /* # OUTPUT # [140511882524224] : A [140511848953408] : E [140511865738816] : C [140511840560704] : F [140511874131520] : B [140511823775296] : H [140511832168000] : G [140511857346112] : D [140511583131200] : M [140511599916608] : J [140511591523904] : L [140511574738496] : N [140511557953088] : P [140511566345792] : O [140511549560384] : Q [140511465698880] : R [140511448913472] : S [140511440520768] : T [140511432128064] : U [140511423735360] : V [140511415342656] : W [140511406949952] : X [140511398557248] : Y [140511390164544] : Z [140511457306176] : K [140511815382592] : I */ std::vector tvec(26); for (int i = 0; i < 26; ++i) tvec[i] = std::thread{ foo, static_cast(i + 'A') }; //for (auto& t: tvec) // std::cout << "<" << t.get_id() << ">\n"; // -----> (1) for (auto& t: tvec) t.join(); } * Örnek 5.1.1, #include #include #include #include #include #include std::thread::id main_thread_id; void foo() { if (std::this_thread::get_id() == main_thread_id) { std::cout << "Call from main thread\n"; } else { std::cout << "Call from ANOTHER thread\n"; } } int main() { /* # OUTPUT # Call from main thread Call from ANOTHER thread */ main_thread_id = std::this_thread::get_id(); foo(); std::thread tx{ foo }; tx.join(); } * Örnek 5.1.2, #include #include #include #include #include #include #include std::mutex foo_mtx; std::vector id_vec; void foo() { std::lock_guard g{ foo_mtx }; id_vec.push_back(std::this_thread::get_id()); } int main() { /* # OUTPUT # 140181196453440 140181188060736 140181179668032 140181171275328 140181088761408 140181080368704 140181071976000 140181063583296 140181055190592 140181046797888 140181038405184 140181030012480 140181021619776 140181013227072 140181004834368 140180996441664 140180988048960 140180979656256 140180971263552 */ std::vector t_vec; for(int i = 0; i < 20; ++i) { t_vec.emplace_back(foo); } for(const auto& id: id_vec) { std::cout << id << '\n'; } for(auto& t: t_vec) { t.join(); } } * Örnek 6.0, "thread" içerisinden bir hata nesnesi yakalandığında ya onu yakalayıp "handle" etmeli ya da ilgili "thread" i hayata getiren "thread" e ilgili hata nesnesi hakkında bilgi vermeliyiz. #include #include #include #include void foo(int x) { if (x % 5 == 0) throw std::runtime_error{ "foo: division error\n" }; } void my_terminate() { std::cout << "my_terminate: "; std::exit(EXIT_FAILURE); } int main() { std::set_terminate(&my_terminate); std::thread tr{ foo, 15 }; tr.join(); std::cout << "main devam ediyor\n"; } * Örnek 6.1, #include #include #include #include void foo(int x) { std::cout << "void foo(" << x << ") was called.\n"; // Handling the exception thrown within the thread: try { if (x % 5 == 0) throw std::runtime_error{ "foo: division error" }; } catch (const std::exception& ex) { std::cout << ex.what() << '\n'; } std::cout << "void foo(" << x << ") was completed.\n"; } void my_terminate() { std::cout << "my_terminate: "; std::exit(EXIT_FAILURE); } int main() { /* # OUTPUT # main basladi void foo(15) was called. foo: division error void foo(15) was completed. main devam ediyor main bitecek */ std::cout << "main basladi\n"; std::set_terminate(&my_terminate); std::thread tr{ foo, 15 }; tr.join(); std::cout << "main devam ediyor\n"; std::cout << "main bitecek\n"; } * Örnek 6.2, #include #include #include #include std::exception_ptr g_ex_ptr{ nullptr }; void foo(int x) { std::cout << "void foo(" << x << ") was called.\n"; try { if (x % 5 == 0) throw std::runtime_error{ "foo: division error" }; } catch (...) { g_ex_ptr = std::current_exception(); } std::cout << "void foo(" << x << ") was completed.\n"; } void my_terminate() { std::cout << "my_terminate: "; std::exit(EXIT_FAILURE); } int main() { /* # OUTPUT # main basladi void foo(15) was called. void foo(15) was completed. main devam ediyor foo: division error main bitecek */ std::cout << "main basladi\n"; std::set_terminate(&my_terminate); std::thread tr{ foo, 15 }; tr.join(); std::cout << "main devam ediyor\n"; // Handling the exception thrown from the thread: try { if (g_ex_ptr) { std::rethrow_exception(g_ex_ptr); } } catch (const std::exception& ex) { std::cout << ex.what() << '\n'; } std::cout << "main bitecek\n"; } * Örnek 6.3, #include #include #include #include std::vector g_ex_vec; std::mutex g_mutex; void f1() { throw std::runtime_error{ "runtime_error from f1" }; } //---------------------------------------------------------------------------------------------------- void f2() { throw std::out_of_range{ "out_of_range error from f2" }; } //---------------------------------------------------------------------------------------------------- void th_func1() { try { f1(); } catch (...) { std::lock_guard guard{ g_mutex }; g_ex_vec.push_back(std::current_exception()); } } //---------------------------------------------------------------------------------------------------- void th_func2() { try { f2(); } catch (...) { std::lock_guard guard{ g_mutex }; g_ex_vec.push_back(std::current_exception()); } } //---------------------------------------------------------------------------------------------------- int main() { std::thread t1(th_func1); std::thread t2(th_func2); t1.join(); t2.join(); for (auto const& ex : g_ex_vec) { try { if (ex != nullptr) std::rethrow_exception(ex); } catch (std::exception const& ex) { std::cout << "exception caught: " << ex.what() << '\n'; } } } * Örnek 7.0, Referans/gösterici semantiği ile bir nesneyi "thread" e iş yükü olarak vermişsek, referans olunan/gösterilen esas nesnenin hayatı bitmesi durumunda ilgili "thread" imiz bunun farkında olmayacaktır. Dolayısıyla "Tanımsız Davranış" oluşacak, "Dangling Reference/Pointer" durumu. #include #include #include #include #include #include void read_load(const std::vector* p) { auto n = std::accumulate(p->begin(), p->end(), 0); std::cout << n << '\n'; } void foo() { std::vector ivec{ 0, 2, 4, 6, 8, 1, 3, 5, 7, 9 }; std::thread t{ read_load, &ivec }; // Buradaki ".detach()" işleminden ötürü // iş bu "thread" arka planda bağımsız çalışacaktır. // Fakat "main-thread" akmaya devam edecektir. Bundan // dolayı "ivec" nesnesinin hayatı bitecektir. t.detach(); } int main() { /* # OUTPUT # 0 */ foo(); using namespace std::literals; std::this_thread::sleep_for(10s); } * Örnek 7.1, #include #include #include #include #include #include void read_load(const std::vector& p) { auto n = std::accumulate(begin(p), end(p), 0); std::cout << n << '\n'; } void foo() { std::vector ivec{ 0, 2, 4, 6, 8, 1, 3, 5, 7, 9 }; std::thread t{ read_load, std::ref(ivec) }; // Buradaki ".detach()" işleminden ötürü // iş bu "thread" arka planda bağımsız çalışacaktır. // Fakat "main-thread" akmaya devam edecektir. Bundan // dolayı "ivec" nesnesinin hayatı bitecektir. t.detach(); } int main() { /* # OUTPUT # 0 */ foo(); using namespace std::literals; std::this_thread::sleep_for(10s); } * Örnek 8, Yine bir "thread" i gelecekteki bir "time point" gelene kadar da bloke edebiliriz. #include #include #include auto now() { return std::chrono::steady_clock::now(); } auto awake_time() { using std::chrono::operator""ms; return now() + 2000ms; } int main() { std::cout << "Hello\n" << std::flush; const auto start{ now() }; std::this_thread::sleep_until(awake_time()); std::chrono::duration elapsed{ now() - start }; std::cout << "Waited " << elapsed.count() << " ms\n"; } * Örnek 9.0, "thread_local" anahtar sözcüğünü kullanarak bir değişkenin o "thread" e özgü olmasını sağlatabiliriz. "thread" in başlaması ile birlikte değişken de hayata gelecektir. "thread" in çalışması bittiğinde de değişkenimizin hayatı da sonlanacaktır. #include #include #include thread_local int t_val{ 0 }; void func(const std::string& id) { ++t_val; std::osyncstream{ std::cout } << "t_val in thread named [" << id << "] is [" << t_val << "]\n"; } int main() { /* # OUTPUT # t_val in thread named [main] is [1] t_val in thread named [t1] is [1] t_val in thread named [t2] is [1] t_val in thread named [t3] is [1] t_val in thread named [main] is [2] */ func("main"); std::thread t1{ func, "t1" }; std::thread t2{ func, "t2" }; std::thread t3{ func, "t3" }; //... t3.join(); t2.join(); t1.join(); func("main"); } * Örnek 9.1, #include #include thread_local int ival{ 0 }; void func(int* p) { *p = 777; std::cout << "*p: " << *p << ", ival: " << ival << '\n'; } int main() { /* # OUTPUT # ival: 0 ival: 333 *p: 777, ival: 0 */ std::cout << "ival: " << ival << '\n'; ival = 333; std::cout << "ival: " << ival << '\n'; std::thread t1{ func, &ival }; t1.join(); } * Örnek 9.2, #include #include #include #include #include thread_local std::string name{ "Ulya" }; void func(std::string const& surname) { name += '_' + surname + '@'; std::osyncstream ocout{ std::cout }; ocout << name << "[" << &name << "]\n"; } int main() { /* # OUTPUT # Ulya_Yuruk@[0x7f56394e6618] Ulya_Istanbul@[0x7f56384e4618] Ulya_Uskudar@[0x7f5638ce5618] Ulya_Ulya@[0x7f5639ce7618] Ulya_29.10.1995@[0x7f5637ce3618] */ const char* const pa[] = { "Ulya", "Yuruk", "Uskudar", "Istanbul", "29.10.1995" }; std::vector t_vec; for( auto i: pa ) { t_vec.emplace_back(func, i); } for( auto& i: t_vec ) { i.join(); } } * Örnek 9.3.0, #include #include #include #include class Myclass { public: Myclass() { std::osyncstream{ std::cout } << "Myclass constructor this : " << this << '\n'; } ~Myclass() { std::osyncstream{ std::cout } << "Myclass destructor this : " << this << '\n'; } }; void foo() { std::osyncstream{ std::cout } << "foo called\n"; thread_local Myclass m; std::osyncstream{ std::cout } << "foo ends\n"; } void bar() { using namespace std::chrono_literals; std::osyncstream{ std::cout } << "bar called\n"; foo(); std::this_thread::sleep_for(3s); std::osyncstream{ std::cout } << "bar ends\n"; } int main() { /* # OUTPUT # bar called foo called Myclass constructor this : 0x7fd352f73630 foo ends bar ends Myclass destructor this : 0x7fd352f73630 */ std::thread t{ bar }; t.join(); } * Örnek 9.3.1, #include #include #include #include class Myclass { public: Myclass() { std::osyncstream{ std::cout } << "Myclass constructor this : " << this << '\n'; } ~Myclass() { std::osyncstream{ std::cout } << "Myclass destructor this : " << this << '\n'; } }; void foo() { std::osyncstream{ std::cout } << "foo called\n"; thread_local Myclass m; std::osyncstream{ std::cout } << "foo ends\n"; } void bar() { using namespace std::chrono_literals; std::osyncstream{ std::cout } << "bar called\n"; foo(); std::this_thread::sleep_for(3s); std::osyncstream{ std::cout } << "bar ends\n"; } int main() { /* # OUTPUT # bar called foo called Myclass constructor this : 0x7f1ca9b7d630 foo ends bar called foo called Myclass constructor this : 0x7f1caa37e630 foo ends bar called foo called Myclass constructor this : 0x7f1ca937c630 foo ends bar ends Myclass destructor this : 0x7f1ca9b7d630 bar ends Myclass destructor this : 0x7f1caa37e630 bar ends Myclass destructor this : 0x7f1ca937c630 */ std::thread t1{ bar }; std::thread t2{ bar }; std::thread t3{ bar }; t3.join(); t2.join(); t1.join(); } * Örnek 9.4, #include #include #include #include thread_local int gt{}; void func(char c) { ++gt; std::osyncstream{ std::cout } << c << " " << gt << '\n'; } int main() { /* # OUTPUT # a 1 f 1 b 1 c 1 d 1 g 1 h 1 e 1 i 1 k 1 p 1 m 1 j 1 o 1 l 1 q 1 r 1 n 1 t 1 u 1 s 1 v 1 x 1 y 1 z 1 w 1 */ using namespace std; vector tvec; for (char c = 'a'; c <= 'z'; ++c) { tvec.emplace_back(func, c); } // for (auto& t : tvec) { t.join(); } } * Örnek 9.5, #include #include #include #include thread_local std::mt19937 eng{ 454255u }; void foo() { std::uniform_int_distribution dist{ 10, 99 }; std::osyncstream os{ std::cout }; for (int i = 0; i < 10; ++i) { os << dist(eng) << ' '; } os << '\n'; } int main() { /* # OUTPUT # 39 18 40 53 31 30 13 41 99 63 39 18 40 53 31 30 13 41 99 63 39 18 40 53 31 30 13 41 99 63 */ std::thread t1{ foo }; std::thread t2{ foo }; std::thread t3{ foo }; t3.join(); t2.join(); t1.join(); } * Örnek 10.0, "std::thread" nesnelerinin çalışmasını dışarıdan durdurabiliriz. Bunun için bizlerin "std::stop_source" türünden bir değişkene ihtiyacımız vardır. #include #include #include int main() { using namespace std::literals::chrono_literals; std::stop_source st_src; std::thread foo( [stoken = st_src.get_token()]() { // "stoken" is of type "std::stop_token". while (!stoken.stop_requested()) { std::cout.put('*'); std::this_thread::sleep_for(50ms); } } ); std::thread bar( // "Lambda Init. Capture" [stoken = st_src.get_token()]() { while (!stoken.stop_requested()) { std::cout.put('.'); std::this_thread::sleep_for(1s); } } ); std::this_thread::sleep_for(5s); // Aynı "std::stop_source" türden nesne kullanan "thread" ler için durdurulma talebi gönderildi. st_src.request_stop(); std::cout << "\nstopped\n"; foo.join(); bar.join(); } * Örnek 10.1, Fakat bunun yerine "std::jthread" sınıfını kullanabiliriz. Sadece iş yükü olarak kullanılacak fonksiyonun ilk parametresini "std::stop_token" türünden yapmalıyız. Gerisini "std::jthread" halledecektir. #include #include #include int main() { using namespace std::literals::chrono_literals; std::jthread foo( [](std::stop_token stoken) { while (!stoken.stop_requested()) { std::cout.put('*'); std::this_thread::sleep_for(50ms); } } ); std::jthread bar( [](std::stop_token stoken) { while (!stoken.stop_requested()) { std::cout.put('.'); std::this_thread::sleep_for(50ms); } } ); std::this_thread::sleep_for(5s); foo.request_stop(); bar.request_stop(); std::cout << "\nstopped\n"; } Şimdi de 'thread' ler arası haberleşmeye değinelim. "thread" ler arası haberleşme, eğer birden fazla "thread" nesnesi ortak nesne üzerinde çalışıyorsa ve en az birisi yazma işlevindeyse, "data racing" görülür. Çünkü "data racing" oluşması "Tanımsız Davranış". Ortak nesne üzerinde çalışan "thread" lerin mutlaka bir şekilde senkronize edilmesi gerekmektedir. Örneğin, * Örnek 1, #include #include void foo(long long) { } long long value; bool flag{}; void producer() { value = 76324; flag = true; } void consumer() { while (!flag) ; foo(value); } int main() { // Başlangıçta "flag" değişkeni "false" değerinde. // Ne zamanki "producer" fonksiyonu değişkenin değerini // "true" yaptı, "consumer" fonksiyonundaki döngüden çıkılacak // ve "value" değişkeni kullanılacak. // Eğer birden fazla "thread" nesnesi bu işlemleri yapıyorsa ve // senkronizasyon mekanizması da yoksa, tanımsız davranış oluşacaktır. // Örneğin, "flag" değişkeninin değerinin sınanması esnasında "value" // değişkeninin değeri değişebilir. Hatta "value" değişkeninin değeri // ne yeni değeri ne de eski değerinde de olabilir, yani ara bir değerde // de olabilir. } İşte bunu engellemek için bir takım kavramlar dile eklenmiştir. Bunlar, "mutex" nesneleri, "std::future", "std::promise", "std::async", "std::packaged_task", "Conditional Variables", "atomic", "semaphore" nesneleri biçimindeki kavramlardır. Şimdi de sırayla bu kavramları incelemeye koyulalım: >> "mutex" nesneleri: C++ dilinde "mutex" birden fazla "mutex" nesnesi vardır. Bunlar, "std::mutex", "std::recursive_mutex", "std::timed_mutex", "std::shared_mutex" isimli sınıflardır. Bu sınıflar birim zamanda sadece tek bir "thread" nesnesinin çalışmasını sağlatabilir. >>> "std::mutex" : Diğer "mutex" sınıfları arasında en minimal arayüzü sunan sınıftır. Bünyesinde ".lock()", ".unlock()" ve ".try_unlock" isimli fonksiyonları barındırır. Bu fonksiyonlar sırasıyla ilgili "mutex" nesnesini kilitler, kilidini kaldırır ve kilitlemeye çalışır. Dolayısıyla birim zamanda bir işin tek bir "thread" nesnesi tarafından yapılmasını istiyorsak, o işi bu iki fonksiyon çağrısı arasında yapmalıyız. * Örnek 1, #include #include #include #include #include std::string name{ "Ulya" }; std::mutex mtx; void func(std::string const& surname) { mtx.lock(); name += '_' + surname + '@'; std::cout << name << "[" << &name << "]\n"; mtx.unlock(); } int main() { /* # OUTPUT # Ulya_Ulya@[0x55cbd385a160] Ulya_Ulya@_Uskudar@[0x55cbd385a160] Ulya_Ulya@_Uskudar@_Yuruk@[0x55cbd385a160] Ulya_Ulya@_Uskudar@_Yuruk@_29.10.1995@[0x55cbd385a160] Ulya_Ulya@_Uskudar@_Yuruk@_29.10.1995@_Istanbul@[0x55cbd385a160] */ const char* const pa[] = { "Ulya", "Yuruk", "Uskudar", "Istanbul", "29.10.1995" }; std::vector t_vec; for( auto i: pa ) { t_vec.emplace_back(func, i); } for( auto& i: t_vec ) { i.join(); } } * Örnek 2, Eğer "thread" imizin halihazırda "lock" edilmiş bir "mutex" nesnesi gördüğünde bloke olmasını değil de başka işlevler yapmasını istiyorsak, ".try_lock()" fonksiyonunu kullanabiliriz. Bu fonksiyon ilgili "mutex" nesnesinin kilit durumunu sorgulamaktadır. #include #include #include int counter{}; std::mutex counter_mtx; void try_increase() { for (int i = 0; i < 100'000; ++i) { if (counter_mtx.try_lock()) { // Eğer "true" değer döndürürse, "std::mutex" nesnesi kilitlenecektir. ++counter; counter_mtx.unlock(); } } } int main() { std::thread ar_t[10]; for (int i = 0; i < 10; ++i) ar_t[i] = std::thread(try_increase); for (auto& t : ar_t) t.join(); std::cout << counter << " kez arttirma islemi yapilabildi.\n"; } * Örnek 3, Yine "std::jthread" ile birlikte "std::mutex" nesnesini de pekala kullanabiliriz. #include #include #include #include using namespace std; using namespace literals; std::mutex mtx; void foo() { cout << "foo is trying to lock the mutex\n"; mtx.lock(); std::cout << "foo has locked the mutex\n"; this_thread::sleep_for(800ms); cout << "foo is unlocking the mutex\n"; mtx.unlock(); } void bar() { this_thread::sleep_for(100ms); cout << "bar trying to lock the mutex\n"; while (!mtx.try_lock()) { cout << "bar could not lock the mutex\n"; this_thread::sleep_for(100ms); } cout << "bar has locked the mutex\n"; mtx.unlock(); } int main() { std::jthread thr1(foo); std::jthread thr2(bar); } Tabii böyle yapmak yerine "RAII idiom" u kullanan "std::lock_guard" sınıfını da pekala kullanabiliriz. >>>> "std::lock_guard" : Bu sınıfın "ctor." fonksiyonu argüman olarak aldığı "mutex" nesnesinin ".lock()" fonksiyonunu, "dtor." fonksiyonu ise ilgili "mutex" nesnesinin ".unlock()" fonksiyonunu çağıracaktır. Bundan dolayıdır ki birim zamanda bir işin tek bir "thread" nesnesi tarafından yapılmasını istiyorsak, bu işi "std::lock_guard" nesnesinin hayata gelmesinden sonra yapmalıyız. Ayrıca bu sınıf, halihazırda ".lock()" fonksiyonu çağrılmış bir "mutex" nesnesinin kilidini almak suretiyle de hayata gelebilir. Bu işlevleri haricinde başka bir "interface" sunmamaktadır. * Örnek 1, "RAII idiom" kullanan "std::lock_guard" sınıfı; #include #include #include #include #include std::string name{ "Ulya" }; std::mutex mtx; void func(std::string const& surname) { std::lock_guard lg{ mtx }; name += '_' + surname + '@'; std::cout << name << "[" << &name << "]\n"; } int main() { /* # OUTPUT # Ulya_Ulya@[0x55cbd385a160] Ulya_Ulya@_Uskudar@[0x55cbd385a160] Ulya_Ulya@_Uskudar@_Yuruk@[0x55cbd385a160] Ulya_Ulya@_Uskudar@_Yuruk@_29.10.1995@[0x55cbd385a160] Ulya_Ulya@_Uskudar@_Yuruk@_29.10.1995@_Istanbul@[0x55cbd385a160] */ const char* const pa[] = { "Ulya", "Yuruk", "Uskudar", "Istanbul", "29.10.1995" }; std::vector t_vec; for( auto i: pa ) { t_vec.emplace_back(func, i); } for( auto& i: t_vec ) { i.join(); } } * Örnek 2.0, Halihazırda kilitlenmiş "mutex" nesnesi ile hayata gelebilir, hayatının bitmesi durumunda "dtor." fonksiyonu sayesinde ".unlock()" fonksiyonunu çağırtabilir. #include #include #include std::mutex m; int main() { //... m.lock(); { //... std::lock_guard lg(m, std::adopt_lock); //... } // Bu noktada "m" için ".unlock()" fonksiyonu çağrılacaktır. } * Örnek 2.1, #include #include #include unsigned long long counter = 0; std::mutex mtx; void func() { for (unsigned long long i = 0; i < 1'000'000ull; ++i) { mtx.lock(); std::lock_guard lg(mtx, std::adopt_lock); ++counter; } } int main() { { std::jthread t1(func); std::jthread t2(func); std::jthread t3(func); std::jthread t4(func); } std::cout << "counter = " << counter << '\n'; } * Örnek 3, #include #include #include #include #include int gt{}; std::mutex mtx; void func(char c) { std::lock_guard lg{ mtx }; ++gt; std::osyncstream{ std::cout } << c << " " << gt << '\n'; } int main() { /* # OUTPUT # z 1 y 2 x 3 s 4 t 5 u 6 v 7 r 8 b 9 h 10 j 11 l 12 k 13 o 14 e 15 d 16 f 17 n 18 m 19 i 20 c 21 p 22 q 23 g 24 a 25 w 26 */ using namespace std; vector tvec; for (char c = 'a'; c <= 'z'; ++c) { tvec.emplace_back(func, c); } // for (auto& t : tvec) { t.join(); } } * Örnek 4, #include #include #include #include #include std::mt19937 eng{ 454255u }; std::mutex mtx; void foo() { std::uniform_int_distribution dist{ 10, 99 }; std::lock_guard lg{ mtx }; for (int i = 0; i < 10; ++i) { std::cout << dist(eng) << ' '; } std::cout << '\n'; } int main() { /* # OUTPUT # 39 18 40 53 31 30 13 41 99 63 72 71 22 76 61 82 85 69 54 63 72 21 38 21 87 32 88 10 43 89 */ std::thread t1{ foo }; std::thread t2{ foo }; std::thread t3{ foo }; t3.join(); t2.join(); t1.join(); } * Örnek 5, #include #include #include std::mutex mtx; void foo(const std::string& s) { int x = 0; // "automatic lifetime". A "thread" has its own copy. static int y = 0; // "static lifetime". It is unique across the program. thread_local int z = 0; // "thread_local lifetime". A "thread" has its own copy. ++x; // OK ++z; // OK // Way - I: w/o using "RAII idiom" /* mtx.lock(); // Start of Critical Section ++y; std::cout << "thread-" << s << " has id of: [" << std::this_thread::get_id() << "]\n"; // End of Critical Section mtx.unlock(); */ // Way - II: w/ using "RAII idiom" // "Ctor." will call ".lock()", "Dtor." will call ".unlock()". std::lock_guard lg{ mtx }; // Start of Critical Section ++y; std::cout << "thread-" << s << " has id of [" << std::this_thread::get_id() << "] and value of [" << x << "," << y << "," << z << "]\n"; // End of Critical Section // Burada "RAII idiom" yöntemini seçmeliyiz. Eğer "Critical Section" // bölümünden bir hata fırlatırlırsa ve bunu "handle" edemezsek, // ilgili "mutex" nesnesi ".unlock()" edilemeyeceğinden "dead_lock" // oluşacaktır. Artık diğer "thread" lerin bu "Critical Section" // bölümüne erişmesi imkansız hale gelecektir. (bkz. "Stack Unwinding") } int main() { /* # OUTPUT # thread-b has id of [140651935209024] and value of [1,1,1] thread-a has id of [140651943601728] and value of [1,2,1] thread-d has id of [140651918423616] and value of [1,3,1] thread-c has id of [140651926816320] and value of [1,4,1] */ std::thread ta(foo, "a"); std::thread tb(foo, "b"); std::thread tc(foo, "c"); std::thread td(foo, "d"); td.join(); tc.join(); tb.join(); ta.join(); } Öte yandan hem "std::mutex" hem de "std::lock_guard" sınıfları "non-copyable" ve "non-moveable" bir sınıflardır. Tabii şunu da unutmamak gerekir ki birim zamanda tek bir "thread" nesnesi tarafından yapılmasını istediğimiz işin büyüklüğü de programın genel performansı üzerinde etkilidir. Dolayısıyla Kritik Bölge / Kritik Kod ismiyle nitelenen böyle işleri mümkün olduğunda minimal tutmalıyız. * Örnek 1, #include #include #include std::mutex mtx; int t_val{ 0 }; void foo(const std::string& s) { { // Burada "nested" bir blok daha oluşturduk. Böylece ilgili "std::mutex" // nesnesi "foo" fonksiyon çağrısının bitimine kadar değil, bu bloğun // sonuna kadar kilitli kalacaktır. std::lock_guard lg(mtx); ++t_val; std::cout << "thread-" << s << " has value of " << t_val << '\n'; } //... } int main() { /* # OUTPUT # thread-main has value of 0 thread-d has value of 1 thread-b has value of 2 thread-a has value of 3 thread-c has value of 4 */ std::thread ta(foo, "a"); std::thread tb(foo, "b"); std::thread tc(foo, "c"); std::thread td(foo, "d"); { std::lock_guard lg{mtx}; std::cout << "thread-" << "main" << " has value of " << t_val << '\n'; } ta.join(); tb.join(); tc.join(); td.join(); } * Örnek 2, Aşağıdaki örnekte bulunan fonksiyonlar arasında en verimlisi "foo_III" isimli fonksiyondur. //... std::mutex mtx; int shared_variable{}; void foo_I() { std::lock_guard lg{mtx}; // Döngü bitene kadar ilgili "mutex" kilitli tutulacaktır. for (int i = 0; i < 100; ++i) ++shared_variable; } void foo_II() { for (int i = 0; i < 100; ++i) { // Döngünün her turunda ilgili "mutex" önce kilitlenecek, sonra açılacaktır. std::lock_guard lg{mtx}; ++shared_variable; } } void foo_III() { int cnt = 0; for (int i = 0; i < 100; ++i) ++cnt; // Sadece yerel değişkenimizin değerini kopyalarken ilgili "mutex" kilitli kalacaktır. std::lock_guard lg{mtx}; shared_variable = cnt; } int main() { //... } Son olarak "std::mutex" gibi senkronize nesnelerini kullanırken de "dead_lock" oluşmamasına dikkat etmeliyiz. * Örnek 1.0, Aşağıdaki örnekte ilkin "a_mtx" ve "b_mtx" nesneleri kilitlenecektir. Daha sonra 500ms iki "thread" de bekletilecektir. Sonrasındada "b_mtx" ve "a_mtx" nesneleri kilitlenmek istenecektir fakat halihazırda her ikisi de kilitli olduğundan, bu aşamada iki "thread" de ilgili "mutex" nesnesinin açılmasını bekleyecektir. Fakat açılma aşaması diğer adımlarda olduğundan, "dead_lock" oluşacaktır. #include #include #include #include std::mutex a_mtx; std::mutex b_mtx; using namespace std::literals; namespace td = std::this_thread; void foo() { std::osyncstream{ std::cout } << td::get_id() << " is trying to lock a_mtx\n"; a_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked a_mtx\n"; std::this_thread::sleep_for(500ms); std::osyncstream{ std::cout } << td::get_id() << " is trying to lock b_mtx\n"; b_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked b_mtx\n"; a_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked a_mtx\n"; b_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked b_mtx\n"; } void bar() { std::osyncstream{ std::cout } << td::get_id() << " is trying to lock b_mtx\n"; b_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked b_mtx\n"; std::this_thread::sleep_for(500ms); std::osyncstream{ std::cout } << td::get_id() << " is trying to lock a_mtx\n"; a_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked a_mtx\n"; b_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked b_mtx\n"; a_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked a_mtx\n"; } int main() { /* # OUTPUT # 17100 is trying to lock a_mtx 17100 has locked a_mtx 17560 is trying to lock b_mtx 17560 has locked b_mtx 17100 is trying to lock b_mtx 17560 is trying to lock a_mtx :> */ std::jthread t1{ foo }; std::jthread t2{ bar }; } * Örnek 1.1, #include #include #include #include std::mutex a_mtx; std::mutex b_mtx; using namespace std::literals; namespace td = std::this_thread; void foo() { std::osyncstream{ std::cout } << td::get_id() << " is trying to lock a_mtx\n"; a_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked a_mtx\n"; std::this_thread::sleep_for(500ms); std::osyncstream{ std::cout } << td::get_id() << " is trying to lock b_mtx\n"; b_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked b_mtx\n"; a_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked a_mtx\n"; b_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked b_mtx\n"; } void bar() { std::osyncstream{ std::cout } << td::get_id() << " is trying to lock a_mtx\n"; a_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked a_mtx\n"; std::this_thread::sleep_for(500ms); std::osyncstream{ std::cout } << td::get_id() << " is trying to lock b_mtx\n"; b_mtx.lock(); std::osyncstream{ std::cout } << td::get_id() << " has locked b_mtx\n"; a_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked a_mtx\n"; b_mtx.unlock(); std::osyncstream{ std::cout } << td::get_id() << " has unlocked b_mtx\n"; } int main() { /* # OUTPUT # 13908 is trying to lock a_mtx 14320 is trying to lock a_mtx 13908 has locked a_mtx 13908 is trying to lock b_mtx 13908 has locked b_mtx 13908 has unlocked a_mtx 13908 has unlocked b_mtx 14320 has locked a_mtx 14320 is trying to lock b_mtx 14320 has locked b_mtx 14320 has unlocked a_mtx 14320 has unlocked b_mtx */ std::jthread t1{ foo }; std::jthread t2{ bar }; } * Örnek 2.0, "dead_lock" oluşmasını yine kod bakarak rahat bir şekilde fark edemeyebiliriz; #include #include #include #include class Buffer { public: static constexpr auto size() { return 16u * 1024u * 1024u; } Buffer() : m_data(size()) {} Buffer(const Buffer&) = delete; Buffer & operator=(Buffer&&) = delete; void swap(Buffer & other) { if (this == &other) return; std::lock_guard lock1(mtx); std::lock_guard lock2(other.mtx); std::swap(m_data, other.m_data); } // ... private: std::vector m_data; mutable std::mutex mtx; }; Buffer buf_x; Buffer buf_y; int main() { std::jthread t1([]() { for (long i = 0; i < 1'000'000; ++i) buf_x.swap(buf_y); }); std::jthread t2([]() { for (long i = 0; i < 1'000'000; ++i) buf_y.swap(buf_x); //!!! }); } * Örnek 2.1, #include #include #include #include class Buffer { public: static constexpr auto size() { return 16u * 1024u * 1024u; } Buffer() : m_data(size()) {} Buffer(const Buffer&) = delete; Buffer & operator=(Buffer&&) = delete; void swap(Buffer & other) { if (this == &other) return; std::lock(mtx, other.mtx); std::swap(m_data, other.m_data); mtx.unlock(); other.mtx.unlock(); } // ... private: std::vector m_data; mutable std::mutex mtx; }; Buffer buf_x; Buffer buf_y; int main() { std::jthread t1([]() { for (long i = 0; i < 1'000'000; ++i) buf_x.swap(buf_y); }); std::jthread t2([]() { for (long i = 0; i < 1'000'000; ++i) buf_y.swap(buf_x); //!!! }); } * Örnek 3, Yine hata nesnesinin gönderilmesinden dolayı da bir "dead_lock" oluşabilir. #include #include #include #include using namespace std; using namespace std::literals; std::mutex gmtx; void func(int x) { gmtx.lock(); try { osyncstream{ std::cout } << this_thread::get_id() << " " << "locked the mutex\n"; osyncstream{ std::cout } << "x = " << x << '\n'; if (x % 2 == 0) throw invalid_argument{ "no even number" }; gmtx.unlock(); osyncstream{ std::cout } << this_thread::get_id() << " " << "unocked the mutex\n"; } catch (const exception& ex) { osyncstream{ std::cout } << this_thread::get_id() << " exception caught : " << ex.what() << '\n'; } } int main() { jthread t1{ func, 4 }; jthread t2{ func, 5 }; } * Örnek 4, #include #include #include #include using namespace std; using namespace std::literals; std::mutex gmtx; void func(int x) { std::lock_guard lg{ gmtx }; try { osyncstream{ std::cout } << this_thread::get_id() << " " << "locked the mutex\n"; osyncstream{ std::cout } << "x = " << x << '\n'; if (x % 2 == 0) throw invalid_argument{ "no even number" }; osyncstream{ std::cout } << this_thread::get_id() << " " << "unocked the mutex\n"; } catch (const exception& ex) { osyncstream{ std::cout } << this_thread::get_id() << " exception caught : " << ex.what() << '\n'; } } int main() { jthread t1{ func, 4 }; jthread t2{ func, 5 }; } Yine "dead_lock" oluşmasını engellemek için "std::lock" isimli fonksiyonu kullanabiliriz. >>>> "std::lock" : Değişken sayıda argümanla bu fonksiyona çağrı yapabiliriz. Bu fonksiyon, kendisine gönderilen birden fazla "mutex" nesnelerinin hepsini kilitliyor. Fakat arka planda "dead_lock_avoidance" algoritması da kullanıyor. Dolayısıyla artık "dead_lock" olma ihtimali kalmıyor. * Örnek 1, #include #include #include std::mutex m1, m2; void foo() { std::lock(m1, m2); m1.unlock(); m2.unlock(); std::osyncstream{ std::cout } << "foo()\n"; } void bar() { std::lock(m2, m1); m1.unlock(); m2.unlock(); std::osyncstream{ std::cout } << "bar()\n"; } int main() { std::thread t1{ foo }; std::thread t2{ bar }; t1.join(); t2.join(); } >>> "std::recursive_mutex" : Bu sınıf ise birden fazla "thread" tarafından kilitlenmeye olanak sağlar. Fakat maksimum kaç adet tarafından kilitlenmeye izin verileceği "Implementation Defined". Çalışma zamanına ilişkin maliyet doğurduğundan, kullanımına dikkat etmeliyiz. * Örnek 1.0, #include #include #include std::recursive_mutex rmtx; int gcount = 0; void rfunc(char c, int n) { if (n < 0) return; rmtx.lock(); std::cout << c << ' ' << gcount++ << '\n'; rfunc(c, n - 1); rmtx.unlock(); } int main() { std::thread tx{ rfunc, 'x', 10 }; std::thread ty{ rfunc, 'y', 10 }; tx.join(); ty.join(); } * Örnek 1.1, #include class DatabaseAccess { public: void create_table() { std::lock_guard lg{ db_mutex }; //... } void insert_data() { std::lock_guard lg{ db_mutex }; //.. } void create_table_and_insert_data() { std::lock_guard lg{ db_mutex }; create_table(); //... } private: std::recursive_mutex db_mutex; //... }; int main() { DatabaseAccess dx; dx.create_table_and_insert_data(); //deadlock } * Örnek 1.2, #include #include class Nec { public: void func() { std::lock_guard guard{ mtx }; std::cout << std::this_thread::get_id() << " func cagrildi\n"; foo(); std::cout << std::this_thread::get_id() << " func sona eriyor\n"; } void foo() { std::lock_guard guard{ mtx }; std::cout << std::this_thread::get_id() << " foo cagrildi\n"; std::cout << std::this_thread::get_id() << " foo sona eriyor\n"; } private: mutable std::recursive_mutex mtx; }; void gf() { Nec nec; nec.func(); } int main() { std::jthread t1{ gf }; std::jthread t2{ gf }; } >>> "std::timed_mutex" : Bu sınıf bir müddet boyunca kilitlenmeye olanak tanır. * Örnek 1.0, #include #include #include #include int gcounter{}; std::timed_mutex mtx; void increment(int i) { using namespace std::literals; if (mtx.try_lock_for(50ms)) { ++gcounter; std::this_thread::sleep_for(10ms); std::cout << "thread : " << i << " kritik bolgeye girdi\n"; mtx.unlock(); } else std::cout << "thread " << i << " kritik bolgeye giremedi\n"; } int main() { std::thread t1{ increment, 1 }; std::thread t2{ increment, 2 }; t1.join(); t2.join(); std::cout << "gcounter = " << gcounter << '\n'; } * Örnek 1.1, #include #include #include #include #include #include #include std::timed_mutex mtx; void task(int id) { using namespace std::literals; int cnt{}; for (int i{}; i < 10'000; ++i) { if (mtx.try_lock_for(1us)) { ++cnt; mtx.unlock(); } } std::osyncstream{ std::cout } << id << " " << ++cnt << '\n'; } int main() { std::vector tvec; for (int i{}; i < 10; ++i) { tvec.emplace_back(task, i); } } >>> "std::shared_mutex" : C++17 ile dile eklenmiştir. Amacı şudur; belli işlevi olan "thread" ler tarafından kilitlenebilsin, belli işlevi olanlar bloke edilsin. Örneğin, yazma işlemi yapmak isteyen bütün "thread" ler bloke edilirken, okuma yapmak isteyen "thread" ler ise okuma yapabilsin. "shared_mutex" isimli başlık dosyasındadır. Pekala bu "mutex" nesnesini de "RAII idiom" kullanan şu sınıflar ile birlikte kullanabiliriz; "std::lock_guard", "std::unique_lock", "scoped_lock", "std::shared_lock" Bu sınıflardan "std::lock_guard", "std::unique_lock" ve "scoped_lock" sınıfları Kritik Bölgeye sadece tek bir "thread" in girmesine olanak tanırken, "std::shared_lock" sınıfı birden fazla "thread" in girmesine olanak tanır. * Örnek 1, Bu "mutex" nesnesinin tipik kullanım alanı "read" amacı güden "thread" lerin adedinin "write" amacı güdenlerden fazla olduğu durumlardır. #include #include #include #include #include #include #include int cnt{}; std::shared_mutex mtx; using namespace std::literals; void Writer() { for(int i = 0; i < 3; ++i) { std::scoped_lock sl{ mtx }; // Bu noktaya sadece tek bir "thread" // girebilecek, diğerleri bloke edilecek. ++cnt; } std::this_thread::sleep_for(20ms); } void Reader() { for(int i = 0; i < 6; ++i) { int c; { std::shared_lock sl{ mtx }; // Birden fazla "thread" bu aşamaya gelebilir, // eş zamanlı okuma yapabilir. c = cnt; } std::osyncstream{ std::cout } << "thread-" << std::this_thread::get_id() << ": " << c << '\n'; std::this_thread::sleep_for(20ms); } } int main() { std::vector jvec; jvec.reserve(9); for(auto i{0}; i < 3; ++i) jvec.emplace_back(Writer); for(auto i{0}; i < 6; ++i) jvec.emplace_back(Reader); } Bu örnekte kullanılan "std::scoped_lock", "std::lock_guard" sınıfının daha verimli halidir. Bu sınıf, o sınıfın yaptıklarının neredeyse tamamını yapabilmekte ve ilave bir maliyet de sunmamaktadır. Örneğin, "dead_lock_avoidance" bir mekanizma ile birden fazla "mutex" nesnesini kilitleyebilmektedir. C++17 ile dile eklendi. Tek bir "mutex" nesnesi üzerinde çalışırken bile bu sınıfın kullanılmasının kötü bir etkisi bulunmamaktadır. Yine örnekteki "std::shared_lock" için; "shared_mutex" isimli başlık dosyasındadır. Kopyalamaya izin veren bir sınıftır. Detaylar için bkz. https://github.com/necatiergin/CONCURRENCY/blob/main/mutex/shared_mutex/notlar.md * Örnek 2.0, "std::lock_guard" ve "std::scoped_lock" a nazaran "std::unique_lock" nesnesini de kullanabiliriz. Bu sınıf taşımaya karşı açık ve diğerine göre daha fazla fonksiyonu sunmaktadır. Bir diğer deyişle bu yeni sınıfımız olabilecek en yetenekli "RAII" sınıfıdır. Diğer iki "_lock" sınıfları gibi "scope" sonuna kadar ilgili "mutex" nesnesini kilitli tutmazlar, ömürleri boyunca istenildiği kadar ilgili "mutex" nesnesini ".lock()" ve ".unlock()" edebilirler. Yine "dtor." çağrıları da ilgili "mutex" nesnesini ".unlock()" edecektir. Fakat bu sınıf kopyalamaya karşı kapalıdır. Eğer bu sınıfın hizmetlerinden fayda görmeyeceksek, "std::lock_guard" ve/veya "std::scoped_lock" sınıflarını kullanabiliriz. #include std::mutex mtx; void f1() { // "RAII" deyimini kullanabiliriz. //std::unique_lock lock(mtx); std::unique_lock lock(mtx); } void f2() { // Halihazırda kilitli olan "mutex" nesnesini alıp, // otomatikman ".unlock()" edilmesini sağlatabiliriz. mtx.lock(); std::unique_lock lock(mtx, std::adopt_lock); } void f3() { // Bir "mutex" nesnesini kilitsiz olarak alır, istediğimiz zaman // kilitleyedebiliriz. std::unique_lock ulock(mtx, std::defer_lock); ulock.lock(); } void f4() { // Aldığı "mutex" nesnesinin ".try_lock" fonksiyonunu // çağırtabilir, duruma göre farklı aksiyonlar alabilirz. std::unique_lock ulock(mtx, std::try_to_lock); // Fakat "ctor." fonkisyonunun geri dönüş değeri olmadığından, // ".try_lock()" çağrısının sonucunu aşağıdaki biçimde sorgulamalıyız. if (ulock.owns_lock()) { //... } } * Örnek 2.1, 500ms boyunca kilitlemeye çalışacaktır. #include #include std::timed_mutex mtx; void func() { using namespace std::literals; //std::unique_lock ulock(mtx, 500ms); std::unique_lock ulock(mtx, 500ms); //... } * Örnek 2.2, Kilitsiz bir şekilde ilgili "mutex" nesnesini alıyoruz. Sınıfın "dtor." fonksiyonu kilidi açacaktır. #include #include #include unsigned long gcount{}; std::mutex mtx; void foo() { for (unsigned long i{}; i < 1'000'000ul; ++i) { std::unique_lock lock(mtx, std::defer_lock); lock.lock(); //mutex'i ediniyor. ++gcount; lock.unlock(); // mutex'i serbest bırakıyor. // ... lock.lock(); //mutex'i ediniyor. ++gcount; lock.unlock(); // mutex'i serbest bırakıyor. // ... // mutex edinilmiş durumda ise burada destructor serbest bırakıyor } } int main() { { std::jthread t1(foo); std::jthread t2(foo); } std::cout << gcount << '\n'; //4000000 } >> "std::future" ve "std::promise" : Bu sınıfları kullanılarak iki "thread" arasında haberleşme sağlanabilir. Her iki sınıfın başlık dosyası "future" ismindedir. Buradaki "std::promise" sınıfı değişikliği yapacak, "std::future" ise bu değişikliği görecek sınıftır. Fakat haberleşme sadece bir defalıktır. Bu iki sınıf kullanılarak hata nesnesi de paylaşılabilir, diğer değerler de. * Örnek 1.0, Bu iki sınıfı farklı "thread" ile çalıştırmak zorunda değiliz. Tek bir "thread" kullanarak bir değeri paylaşabiliriz. #include #include int main() { std::promise prom; std::future ftr = prom.get_future(); std::cout << "The value to be sent: " << 991 << '\n'; // The value to be sent: 991 prom.set_value(991); auto val = ftr.get(); std::cout << "The value to be read: " << val << '\n'; // The value to be read: 991 } * Örnek 1.1, Pekala bir hata nesnesini de aynı yöntemle paylaşabiliriz. #include #include #include int main() { std::promise prom; std::future ftr = prom.get_future(); prom.set_exception(std::make_exception_ptr(std::runtime_error{ "The value could not be set!" })); try { auto val = ftr.get(); } catch(const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; // Ex: [The value could not be set!] } } * Örnek 2.0.0, İki "thread" arasında haberleşmeyi bir fonksiyonun geri dönüş değeri ile sağlatmak istiyorsak ilgili fonksiyonun bir parametresi "std::promise" türünden olmalıdır. Bu parametre referans türünden olduğu gibi değer türünden de olabilir. "R-Value Reference" ve değer türünden olması durumunda "std::move()" ile sarmalamalı, "L-Value Reference" olması durumunda ise "std::ref" ile sarmalamalıyız, bu parametreye geçilecek argümanı. #include #include #include #include double square(double dval) { //... return dval * dval; } void calculate_square(std::promise prom, double dval) { //... prom.set_value(square(dval)); } int main() { std::promise prom; std::future ftr = prom.get_future(); std::jthread jt{ calculate_square, std::move(prom), 5.68 }; std::cout << "Calculated Value: " << ftr.get() << '\n'; // Calculated Value: 32.2624 } * Örnek 2.0.1, Pekala fonksiyon kullanmak yerine diğer "callable" nesneleri de pekala kullanabiliriz. #include #include #include #include void calculate_sum_square(std::promise prom, double dval_l, double dval_r) { //... prom.set_value(dval_l*dval_r + dval_l*dval_r); } struct CalculateSumSquare { void operator()(std::promise& prom, double dval_l, double dval_r) { //... prom.set_value(dval_l*dval_r + dval_l*dval_r); } }; int main() { double d1{ 341.143 }, d2{ 134.431 }; // Using Functions std::promise prom_func; std::future ftr_func = prom_func.get_future(); std::jthread jt_func{ calculate_sum_square, std::move(prom_func), d1, d2 }; std::cout << "Calculated Value: " << ftr_func.get() << '\n'; // Calculated Value: 91720.4 // Using Functor std::promise prom_Struct; std::future ftr_Struct = prom_Struct.get_future(); std::jthread jt_Struct{ CalculateSumSquare{}, std::ref(prom_Struct), d1, d2 }; std::cout << "Calculated Value: " << ftr_Struct.get() << '\n'; // Calculated Value: 91720.4 // Using Lambda std::promise prom_Lambda; std::future ftr_Lambda = prom_Lambda.get_future(); std::jthread jt_Lambda{ [](std::promise&& prom, double dval_l, double dval_r){ prom.set_value(dval_l*dval_r + dval_l*dval_r); }, std::move(prom_Lambda), d1, d2 }; std::cout << "Calculated Value: " << ftr_Lambda.get() << '\n'; // Calculated Value: 91720.4 } * Örnek 2.1.0, Yine hata nesnesi de kullanabiliriz. #include #include #include #include void calculate_sum_square(std::promise prom, double dval_l, double dval_r) { //... try { if (dval_l == 0 || dval_r == 0) throw std::invalid_argument("Arguments must be non-zero!"); } catch (...) { prom.set_exception(std::current_exception()); return; } prom.set_value(dval_l*dval_r + dval_l*dval_r); } struct CalculateSumSquare { void operator()(std::promise& prom, double dval_l, double dval_r) { //... try { if (dval_l == 0 || dval_r == 0) throw std::invalid_argument("Arguments must be non-zero!"); } catch (...) { prom.set_exception(std::current_exception()); return; } prom.set_value(dval_l*dval_r + dval_l*dval_r); } }; int main() { double d1{ 341.143 }, d2{ 0 }; // Using Functions std::promise prom_func; std::future ftr_func = prom_func.get_future(); std::jthread jt_func{ calculate_sum_square, std::move(prom_func), d1, d2 }; try { // Calculated Value: Ex: [Arguments must be non-zero!] std::cout << "Calculated Value: " << ftr_func.get() << '\n'; } catch(const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; } // Using Functor std::promise prom_Struct; std::future ftr_Struct = prom_Struct.get_future(); std::jthread jt_Struct{ CalculateSumSquare{}, std::ref(prom_Struct), d1, d2 }; try { // Calculated Value: Ex: [Arguments must be non-zero!] std::cout << "Calculated Value: " << ftr_Struct.get() << '\n'; } catch(const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; } // Using Lambda std::promise prom_Lambda; std::future ftr_Lambda = prom_Lambda.get_future(); std::jthread jt_Lambda{ [](std::promise&& prom, double dval_l, double dval_r){ try { if (dval_l == 0 || dval_r == 0) throw std::invalid_argument("Arguments must be non-zero!"); } catch (...) { prom.set_exception(std::current_exception()); return; } prom.set_value(dval_l*dval_r + dval_l*dval_r); }, std::move(prom_Lambda), d1, d2 }; try { // Calculated Value: Ex: [Arguments must be non-zero!] std::cout << "Calculated Value: " << ftr_Lambda.get() << '\n'; } catch(const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; } } * Örnek 2.1.1, Yine "std::make_exception_ptr" fonksiyonunu kullanarak içerideki "try-catch" bloğundan kurtulabiliriz. #include #include #include #include void calculate_sum_square(std::promise prom, double dval_l, double dval_r) { //... if (dval_l == 0 || dval_r == 0) { prom.set_exception(std::make_exception_ptr(std::invalid_argument("Arguments must be non-zero!"))); return; } prom.set_value(dval_l*dval_r + dval_l*dval_r); } struct CalculateSumSquare { void operator()(std::promise& prom, double dval_l, double dval_r) { //... if (dval_l == 0 || dval_r == 0) { prom.set_exception(std::make_exception_ptr(std::invalid_argument("Arguments must be non-zero!"))); return; } prom.set_value(dval_l*dval_r + dval_l*dval_r); } }; int main() { double d1{ 341.143 }, d2{ 0 }; // Using Functions std::promise prom_func; std::future ftr_func = prom_func.get_future(); std::jthread jt_func{ calculate_sum_square, std::move(prom_func), d1, d2 }; try { // Calculated Value: Ex: [Arguments must be non-zero!] std::cout << "Calculated Value: " << ftr_func.get() << '\n'; } catch(const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; } // Using Functor std::promise prom_Struct; std::future ftr_Struct = prom_Struct.get_future(); std::jthread jt_Struct{ CalculateSumSquare{}, std::ref(prom_Struct), d1, d2 }; try { // Calculated Value: Ex: [Arguments must be non-zero!] std::cout << "Calculated Value: " << ftr_Struct.get() << '\n'; } catch(const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; } // Using Lambda std::promise prom_Lambda; std::future ftr_Lambda = prom_Lambda.get_future(); std::jthread jt_Lambda{ [](std::promise&& prom, double dval_l, double dval_r){ if (dval_l == 0 || dval_r == 0) { prom.set_exception(std::make_exception_ptr(std::invalid_argument("Arguments must be non-zero!"))); return; } prom.set_value(dval_l*dval_r + dval_l*dval_r); }, std::move(prom_Lambda), d1, d2 }; try { // Calculated Value: Ex: [Arguments must be non-zero!] std::cout << "Calculated Value: " << ftr_Lambda.get() << '\n'; } catch(const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; } } * Örnek 3.0, "std::future" sınıfının ".get()" fonksiyonunu iki defa çağırmamız "std::future_error" sınıfı türünden hata nesnesi gönderilmesine neden olur ki bu sınıf da yine "std::exception" sınıfından kalıtım yoluyla elde edilmiştir. #include #include #include int main() { std::promise d_prom; auto ftr = d_prom.get_future(); d_prom.set_value(123.321); std::cout << "Read Value: " << ftr.get() << '\n'; // Read Value: 123.321 try { std::cout << "Read Value: " << ftr.get() << '\n'; } catch (const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; // Read Value: Ex: [std::future_error: No associated state] } } * Örnek 3.1, Benzer şekilde "std::promise" sınıfının ".get_future()" fonksiyonunu iki defa çağırmamız da "std::future_error" sınıfı türünden hata nesnesi gönderilmesine neden olur ki bu sınıf da yine "std::exception" sınıfından kalıtım yoluyla elde edilmiştir. #include #include #include #include int main() { std::promise d_prom; auto ftr1 = d_prom.get_future(); d_prom.set_value(123.321); std::cout << "Read Value: " << ftr1.get() << '\n'; // Read Value: 123.321 try { auto ftr2 = d_prom.get_future(); } catch (const std::exception& ex) { std::cout << "Ex: [" << ex.what() << "]\n"; // Ex: [std::future_error: Future already retrieved] } } * Örnek 4.0, "std::future" sınıfının ".wait" isimli fonksiyonları beklenen değer hesaplanıncaya dek, kendisini çağıran "thread" in bloke olmasını sağlamaktadır. ".wait", ".wait_for" ve ".wait_until" olmak üzere üç adettir. Bu üç fonksiyondan ".wait" fonksiyonu geri dönüş değerine sahip değilken, ".wait_for" ve ".wait_until" fonksiyonları "std::future_status" türünden bir numaralandırma sabiti döndürmektedir. Bu "std::future_status" türü ise üç değere sahiptir; "ready", "timeout" ve "deferred". Fonksiyonumuz bu değerlerden "ready" olanını döndürürse beklenen değerin hesaplandığını, "timeout" döndürürse sürecin zaman aşımına uğradığını, "deferred" döndürürse de hesaplama işleminin "std::future" sınıfının ".get()" fonksiyonu çağrıldığında yapılacağını belirtmektedir. #include #include #include using namespace std::literals; void func(std::promise x) { std::this_thread::sleep_for(3s); x.set_value(25); } int main() { /* # OUTPUT # Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... Some work ... value is : 25 */ std::promise prom; auto ftr = prom.get_future(); std::jthread tx{ func, std::move(prom) }; std::future_status status{}; do { status = ftr.wait_for(200ms); std::cout << "Some work ...\n"; } while (status != std::future_status::ready); std::cout << "value is : " << ftr.get() << '\n'; } * Örnek 4.1, #include #include #include constexpr int x = 50; long long fib(int n) { return n < 3 ? 1 : fib(n - 1) + fib(n - 2); } int main() { /* # OUTPUT # bekle cevap gelecek ............................................................................... ............................................................................... ............................................................................... ............................................................................... ............................................................................... ib(50) is : 12586269025 */ using namespace std::literals; auto ftr = std::async(fib, x); std::cout << "bekle cevap gelecek\n"; while (ftr.wait_for(10ms) == std::future_status::timeout) std::cout << '.' << std::flush; auto result = ftr.get(); std::cout << "fib(" << x << ") is : " << result << '\n'; } * Örnek 5, Eğer kullanılan "std::future" nesnesini tekrar tekrar kullanmak istiyorsak "std::shared_future" sınıfını kullanmalıyız. Bu sınıf kopyalamaya müsait bir sınıftır. #include #include #include #include #include struct SumSquare { void operator()(std::promise&& prom, int a, int b) { prom.set_value(a * a + b * b); } }; void func(std::shared_future sftr) { std::osyncstream os{ std::cout }; os << "threadId (" << std::this_thread::get_id() << "): "; os << "result = " << sftr.get() << '\n'; } int main() { /* # OUTPUT # threadId (139986838124096): result = 106 threadId (139986821338688): result = 106 threadId (139986741491264): result = 106 threadId (139986829731392): result = 106 threadId (139986733098560): result = 106 */ std::promise prom; std::shared_future sftr = prom.get_future(); std::jthread th(SumSquare{}, std::move(prom), 5, 9); std::jthread t1(func, sftr); std::jthread t2(func, sftr); std::jthread t3(func, sftr); std::jthread t4(func, sftr); std::jthread t5(func, sftr); } >> "std::async" : İşte "std::future" ve "std::promise" gibi nesneler ile uğraşmamak için kütüphanedeki "std::async" isimli fonksiyon şablonunu kullanabiliriz. Argüman olarak bizden çağrılacak "callable" nesne ile bu nesneye geçilecek argümanları istemektedir. Fonksiyonun bir diğer "overload" versiyonu için de argüman olarak senkron(geri dönüş değerinin ".get()" fonksiyonu çağrılırsa, kod çalıştırılmaya başlayacak) mu asenkron(ayrı bir "thread" oluşturulacak) mu çalıştırılma bilgisini geçiyoruz. Pekala bu bilgileri "|" operatörü ile birleştirmek suretyile de fonksiyona geçebiliriz. Bu durumda kararı derleyiciye bırakmış oluruz. Geri dönüş değeri olarak da bir "std::future" nesnesi döndürmektedir. Eğer ".get()" fonksiyonu çağrıldığında işlemler hala devam ediyorsa, o "thread" bloke edilecektir. * Örnek 1, #include #include #include using namespace std::literals; int func(int x, int y) { return x+y; } int main() { // Çalıştırma bilgisi derleyiciye bırakıldı. // auto ft = std::async(std::launch::deferred | std::launch::async, func, 13, 31); auto ft = std::async(func, 13, 31); std::cout << "Result: " << ft.get() << '\n'; // Result: 44 // ".get()" fonksiyon çağrısından sonra kod çalışmaya başlayacaktır. // Artık ".get()" çağrısını yapan "thread", kodu çalıştıracaktır. // Dolayısıyla "async" gibi "exception" gönderme ihtimali yoktur. ft = std::async(std::launch::deferred, func, 13, 31); std::cout << "Result: " << ft.get() << '\n'; // Result: 44 // Ayrı bir "thread" oluşturulacak. "exception" fırlatabilir. ft = std::async(std::launch::async, func, 13, 31); std::cout << "Result: " << ft.get() << '\n'; // Result: 44 // Eğer derleyiciye bırakırsak ve o da "deferred" seçerse // fakat biz de "async" varsayımında bulunarak ".get()" // yapmazsak, kod çalıştırılmayacaktır. Dolayısıyla kodun // çalıştırılmasını garanti altına almak istiyorsak "async" // olarak belirtmeliyiz eğer ".get()" çağrısının yapımında // şüphe varsa. } * Örnek 2.0, #include #include #include unsigned long long fibo(unsigned long long n) { return n < 3 ? 1 : fibo(n - 1) + fibo(n - 2); } using namespace std::literals; int main() { auto tp_start = std::chrono::steady_clock::now(); auto result = fibo(42) + fibo(44); auto tp_end = std::chrono::steady_clock::now(); // Duratiton: 4.80863 std::cout << "Duratiton: " << std::chrono::duration(tp_end - tp_start).count() << '\n'; } * Örnek 2.1, #include #include #include unsigned long long fibo(unsigned long long n) { return n < 3 ? 1 : fibo(n - 1) + fibo(n - 2); } using namespace std::literals; int main() { auto tp_start = std::chrono::steady_clock::now(); auto result_1 = std::async(fibo, 42); auto result_2 = fibo(44); auto result = result_1.get() + result_2; auto tp_end = std::chrono::steady_clock::now(); // Duratiton: 4.75029 std::cout << "Duratiton: " << std::chrono::duration(tp_end - tp_start).count() << '\n'; } * Örnek 3.0, "std::launch::async" ve "std::launch::deferred" arasındaki farkı gösteren güzel bir örnek. #include #include #include int main() { using namespace std::chrono_literals; std::chrono::time_point start = std::chrono::steady_clock::now(); auto eager = std::async(std::launch::async, [] {return std::chrono::steady_clock::now(); }); auto lazy = std::async(std::launch::deferred, [] {return std::chrono::steady_clock::now(); }); // "std::launch::async" ile oluşturulan "thread" DEĞİL, // "std::launch::deferred" ile oluşturulan bir saniye bekleyecektir. std::this_thread::sleep_for(1s); using dsec = std::chrono::duration; auto eager_sec = duration_cast(eager.get() - start).count(); auto deferred_sec = duration_cast(lazy.get() - start).count(); // duration for eager in sec : 0.00690772 std::cout << "duration for eager in sec : " << eager_sec << '\n'; // duration for deferred in sec : 1.00023 std::cout << "duration for deferred in sec : " << deferred_sec << '\n'; } * Örnek 3.1, #include #include void func() { std::cout << "func\n"; } int main() { std::cout << "main basladi\n"; std::async(std::launch::async, func); // Geri dönüş değerinin ömrü biteceğinden, // yine "dtor." üzerinden ".get()" çağrısı // otomatik olarak yapılacaktır. Dolayısıyla // bu çağrının aşağıdaki çağrıdan bir farkı // kalmamış olacaktır; // func(); std::cout << "main devam ediyor\n"; { auto ft = std::async(std::launch::async, func); // Arka planda "dtor." fonksiyonu "std::future" // sınıfının ".get()" fonksiyonunu çağıracaktır. } std::cout << "main hala devam ediyor\n"; { auto ft = std::async(std::launch::deferred, func); // "std::future" sınıfının ".get()" fonksiyonunu // çağırmadığımız müddetçe kod çalışmayacaktır. } std::cout << "main bitecek\n"; } * Örnek 4.0, "std::launch::async" ve "std::launch::deferred" birlikte kullanımına ilişkin örnek. #include #include #include #include #include #include using namespace std; using namespace std::chrono; int task(char ch) { mt19937 eng{ random_device{}() }; std::uniform_int_distribution dist{ 20, 500 }; int total_duration{}; for (int i = 0; i < 20; ++i) { auto dur = milliseconds(dist(eng)); this_thread::sleep_for(dur); cout << ch << flush; total_duration += static_cast(dur.count()); } return total_duration; } int foo() { return task('!'); } int bar() { return task('?'); } int main() { /* # OUTPUT # starting foo() in background and bar() in foreground: ??!!!?!!?!!???!??!!!?!?!?!!!!?!?!?!????? result = 8875 */ cout << "starting foo() in background and bar() in foreground:" << '\n'; // "launch policy", derleyiciye bırakıldı. future foo_result = async(foo); // "main-thread" or "another-thread" const auto bar_result = bar(); // "main-thread" const auto result = foo_result.get() + bar_result; cout << "\nresult = " << result << '\n'; } * Örnek 4.1, #include #include #include #include #include #include std::map histogram(const std::string& str) { std::map cmap{}; for (const auto c : str) { ++cmap[c]; } return cmap; } std::string get_sorted(std::string str) { sort(str.begin(), str.end()); erase_if(str, [](const char c) {return isspace(c); }); //str.erase(remove(str.begin(), str.end(), ' '),str.end()); //Remove erase idiom return str; } bool is_vowel(char c) { using namespace std::literals; return "aeiouAEIOU"s.contains(c); } size_t count_vowel(const std::string& str) { return count_if(str.begin(), str.end(), is_vowel); } int main() { /* # OUTPUT # Enter a string : Ahmet Kandemir Pehlivanli Ulya Yuruk A 1 K 1 P 1 U 1 Y 1 a 3 d 1 e 3 h 2 i 3 k 1 l 3 m 2 n 2 r 2 t 1 u 2 v 1 y 1 sorted string : "AKPUYaaadeeehhiiiklllmmnnrrtuuvy" total vowels : 13 */ std::string sline; std::cout << "Enter a string : "; getline(std::cin, sline); auto hist = std::async(histogram, sline); auto sorted_string = std::async(get_sorted, sline); auto vowel_cnt = std::async(count_vowel, sline); for (const auto& [c, count] : hist.get()) { std::cout << c << ' ' << count << '\n'; } std::cout << "sorted string : " << quoted(sorted_string.get()) << '\n' << "total vowels : " << vowel_cnt.get() << '\n'; } * Örnek 5, Hata gönderilmesi durumunda yine işi kendi halletmektedir. #include #include #include #include #include double square_root(double x) { if (x < 0.0) { throw std::domain_error("negative value to square root function"); } return std::sqrt(x); } int main() { /* # OUTPUT # 1.51658 1.84391 1.04881 error */ std::vector dvec{ 2.3, 3.4, 1.1, -1.9 }; std::vector> results; for (auto x : dvec) { results.push_back(std::async(square_root, x)); } for (auto& x : results) { try { std::cout << x.get() << '\n'; } catch (const std::domain_error&) { std::cout << "error\n"; } } } >> "std::packaged_task" : Tıpkı "std::async" fonksiyonu gibi yüksek seviyeli bir başka aracımız daha vardır; "std::packaged_task" sınıfı. Bu sınıf türünden bir nesne, çoğunlukla asenkron bir çağrı yapmak için, bir "callable" nesneyi sarmalamaktadır. Bizler "std::async" ile doğrudan bir fonksiyonu çağırırken, "std::packaged_task" ise bir "Functor" görevi görmektedir. Geri dönüş değeri de yine "std::future" nesnesidir. "std::packaged_task" bünyesinde bir ".operator()()" barındırır, bu da sarmaladığı "callable" nesneyi çağırmaktadır. Yine bu sınıf da "move only" bir sınıftır. * Örnek 1, // synchronous usage of std::package_task #include #include #include int sum(int a, int b) { return std::pow(a, b) + std::pow(b, a); } int main() { { // Using Lambda std::packaged_task ptask( [](double a, double b){ return std::pow(a, b) + std::pow(b, a); } // returns "double" ); //std::future result = ptask.get_future(); auto result = ptask.get_future(); ptask(1.2, 3.4); // İşte şimdi kodlar çalışmaya başladı. std::cout << "result : " << result.get() << '\n'; // result : 6.20158 } std::cout << '\n'; { // Using a Function std::packaged_task ptask{ sum }; auto result = ptask.get_future(); ptask(2, 4); // İşte şimdi kodlar çalışmaya başladı. std::cout << "result : " << result.get() << '\n'; // result : 32 } } * Örnek 2, Yine "thread" lerle birlikte de kullanabiliriz. #include #include #include int fib(int n) { return (n < 3) ? 1 : fib(n - 1) + fib(n - 2); } int main() { std::packaged_task fib_task(&fib); auto result = fib_task.get_future(); std::thread th(std::move(fib_task), 40); std::cout << "task'in bitmesi bekleniyor...\n"; std::cout << result.get() << '\n'; std::cout << "task tamamlandi\n"; th.join(); } * Örnek 3, "Default Ctor." edilmiş bir "std::packaged_task" nesnesini argümanlarla kullanmaya kalkarsak hata nesnesi gönderilir. #include #include int main() { using ftype = int(int, int); std::packaged_task ptask; try { ptask(3, 6); } //catch (const std::future_error& ex) catch (const std::exception& ex) { // exception caught: std::future_error: No associated state std::cout << "exception caught: " << ex.what() << '\n'; } } * Örnek 4, Kopyalamaya karşı kapalı olmasına rağmen taşımaya müsaittir. #include #include #include using ftype = int(int, int); int main() { std::packaged_task pt_x; std::packaged_task pt_y([](int x, int y) {return x * x + y * y; }); // pt_x = pt_y; Syntax error pt_x = std::move(pt_y); std::future ftr = pt_x.get_future(); std::thread{ std::move(pt_x),4,6 }.detach(); std::cout << ftr.get(); } * Örnek 5, Farklı farklı "callable" nesnelerini sarmalayabiliriz. #include #include #include #include int sum_square(int x, int y) { return x * x + y * y; } void task_bind() { std::packaged_task task(std::bind(sum_square, 2, 11)); //std::future result = task.get_future(); auto result = task.get_future(); task(); std::cout << "task_bind\t\t: " << result.get() << '\n'; } void task_thread() { std::packaged_task task(sum_square); //std::future result = task.get_future(); auto result = task.get_future(); std::thread task_td(std::move(task), 2, 10); task_td.join(); std::cout << "task_thread\t\t: " << result.get() << '\n'; } void task_lambda() { std::packaged_task task( [](int a, int b) { return a * a + b * b; } ); //std::future result = task.get_future(); auto result = task.get_future(); task(2, 9); std::cout << "task_lambda\t\t: " << result.get() << '\n'; } int main() { /* # OUTPUT # task_bind : 125 task_thread : 104 task_lambda : 85 */ task_bind(); task_thread(); task_lambda(); } * Örnek 6, "std::future" sınıfındaki ".reset()" fonksiyonunu çağırarak da aynı "std::future" nesnesini tekrar tekrar kullanabiliriz. Bununla birlikte "std::packaged_task" nesnesini de bir fonksiyona göndererek, işlemleri o fonksiyon tarafından yapılmasını sağlatabiliriz. #include #include #include #include #include #include using ipair = std::pair; void func(std::packaged_task& ptask, const std::vector& pairs) { std::osyncstream os{ std::cout }; for (const auto [x, y] : pairs) { auto ftr = ptask.get_future(); ptask(x, y); os << x << " * " << x << " + " << y << " * " << y << " = " << ftr.get() << '\n'; ptask.reset(); // Bu çağrı sayesinde döngünün diğer turunda da kullanabileceğiz. } } int main() { /* # OUTPUT # 1 * 1 + 3 * 3 = 10 3 * 3 + 5 * 5 = 34 7 * 7 + 9 * 9 = 130 11 * 11 + 13 * 13 = 290 15 * 15 + 17 * 17 = 514 */ std::vector pvec; pvec.emplace_back(1, 3); pvec.emplace_back(3, 5); pvec.emplace_back(7, 9); pvec.emplace_back(11, 13); pvec.emplace_back(15, 17); std::packaged_task pt{ [](int x, int y) { return x * x + y * y; } }; std::jthread t(func, std::ref(pt), pvec); } * Örnek 7, Pekala "std::packaged_task" nesnelerini de bir "container" içerisinde tutabiliriz. #include #include #include #include #include class Summer { public: auto operator()(int from, int to) const { int sum{}; for (int i = from; i < to; ++i) sum += i; return sum; } }; int main() { using ftype = int(int, int); // Functor nesnelerimiz: Summer sum1, sum2, sum3, sum4; // Her bir Functor nesnemizi sarmalayan "std::packaged_task" nesnemiz: std::packaged_task pt1(sum1), pt2(sum2), pt3(sum3), pt4(sum4); // Her bir "std::packaged_task" için "std::future" nesnemiz: auto ft1(pt1.get_future()), ft2(pt2.get_future()), ft3(pt3.get_future()), ft4(pt4.get_future()); // "std::packaged_task" nesnelerinin tutulacağı kap: std::deque> td; td.push_back(std::move(pt1)); td.push_back(std::move(pt2)); td.push_back(std::move(pt3)); td.push_back(std::move(pt4)); int begin{ 1 }; int increment{ 5 }; int end = begin + increment; while (!td.empty()) { // Kabın en başındaki "std::packaged_task" nesnesini temin ediyoruz; auto task = std::move(td.front()); // Daha sonra onu kaptan çıkartıyor, kabı boşaltıyoruz. td.pop_front(); // Temin ettiğimiz "std::packaged_task" nesnesine ilişkin kodları koşturuyoruz; // Döngünün ilk turunda [1-6) arasındakiler toplanacaktır: begin: 1, end: 6 std::thread t(std::move(task), begin, end); // Daha sonra; begin = end; // begin: 6, end: 6 end += increment; // begin: 6, end: 11 // Döngünün ikinci turunda 6-11 arasındakiler toplanacaktır. // Döngünün üçüncü turunda 11-16 arasındakiler toplanacaktır. // Döngünün son turunda 16-21 arasındakiler toplanacaktır. // Böylelikle [1-21) arasındakileri toplamış oluyoruz. // "thread" nesnemizi "joinable" duruma getiriyoruz. t.detach(); } auto sum = ft1.get() + ft2.get() + ft3.get() + ft4.get(); std::cout << "result = " << sum << '\n'; } >> "Conditional Variables" : Birden fazla "thread" çalıştığı zaman "thread" lerden birisinin işini yapmaya devam etmesi için diğer "thread" lerin bir değer üretmesi durumunda kullanılan bir kavramdır. Öyle bir kavramdır ki o değerin üretildiği bilgisini dinler ve bilgi geldiğinde de bu değeri bekleyen diğer "thread" lere bilgi geçer. "condition_variable" isimli başlık dosyasında bildirilmiştir bu nesnemiz. * Örnek 1.0, "pooling" yöntemi ile "Conditional Variables" kullanmadan iki "thread" arasında haberleşme sağlanabilir. #include #include #include #include int shared_variable{}; std::mutex mtx; using namespace std::literals; void producer() { std::lock_guard lg{ mtx }; // ... production code std::this_thread::sleep_for(1000ms); shared_variable = 999; } void consumer() { std::unique_lock ulock{ mtx }; // Burada "std::unique_lock" kullanmak zorundayız // çünkü diğer "lock" sınıflarını ".lock()" ve // ".unlock()" fonksiyonlarını çağırmamıza izin // VERMEMEKTEDİR. while (shared_variable == 0) { // Programın akışı bu kısma girmişse, // beklenen değer üretilmemiş demektir. // Dolayısıyla kilidi açıyor ve varsa // yapacak başka işler yapıyoruz. ulock.unlock(); std::this_thread::yield(); std::this_thread::sleep_for(1000ms); // Daha sonra tekrar kilitliyoruz. // Böylelikle "while" döngüsünden çıkılırsa, // ilgili "mutex" nesnesi kilitli olacak ve // devamında yapacağımız işlemler senkronize // edilmiş olacaktır. ulock.lock(); } // Programın akışı buraya girmişse beklenen değer // üretilmiş demektir. İlgili "mutex" nesnesinin // kilidi hala bizde olmalı ki senkronize bir şekilde // işlemleri gerçekleştirelim. std::cout << "the value is : " << shared_variable << '\n'; } int main() { // the value is : 999 std::jthread t1{ producer }; std::jthread t2{ consumer }; } * Örnek 1.1, "pooling" yöntemi ile "Conditional Variables" kullanmadan birden fazla "thread" arasında haberleşme sağlanabilir. #include #include #include #include #include // #include using namespace std; using namespace literals; // "thread" ler tarafından ortak kullanılacak nesnemiz. string shared_data{}; // Anlık veri alımı gerçekleştiğinde kullanılacak: bool update_flag{ false }; // Veri alımı tamamlandığında kullanılacak: bool completed_flag{ false }; mutex data_mutex; mutex completed_mutex; void receive_data() { for (int i = 0; i < 4; ++i) { cout << "<<< receive_data_thread is waiting for data... >>>\n"; // Veri alındığını belirtmesi için ilgili "thread" uyutuldu. this_thread::sleep_for(1s); // Daha sonra alınan verinin ortak değişkene aktarılması süreci; scoped_lock shared_data_lock(data_mutex); shared_data += std::string{"[chunk_"} + std::to_string(i) + std::string{"]"}; cout << shared_data << '\n'; update_flag = true; // Anlık veri alımının gerçekleştiğini belirttik. } cout << "<<< receiving_data_operation has ended >>>\n"; scoped_lock completed_lock(completed_mutex); completed_flag = true; // Veri alımının tamamlandığını belirttik. } void display_progress() { while (true) { cout << "<<< display_progress_thread waiting for data... >>>\n"; unique_lock shared_data_lock(data_mutex); // "unique_lock" is must here. while (!update_flag) { // Anlık veri alımı boyunca bizler uyku modunda olacağız. shared_data_lock.unlock(); this_thread::sleep_for(20ms); shared_data_lock.lock(); } // Akışın buraya gelmesi demek anlık veri alımının tamamlandığının // göstergesidir. Dolayısıyla aşağıdaki atamaları yapıyoruz ki tekrardan // anlık veri akışı başlasın. Aynı zamanda ilgili "mutex" nesnesinin // kilidi hala bizdedir. update_flag = false; cout << "<<< received [" << shared_data.length() << "] bytes so far >>>\n"; shared_data_lock.unlock(); // Veri alımının sonlanıp sonlanmadığının kontrolünü yapıyoruz: lock_guard completed_lock(completed_mutex); if (completed_flag) { cout << "<<< display_progress_thread has ended >>>\n"; break; } } } void process_data() { cout << "<<< process_data_thread waiting for data... >>>\n"; unique_lock completed_lock(completed_mutex); while (!completed_flag) { // Veri alımı tamamlanmadıysa, bizler uyku modunda olacağız. completed_lock.unlock(); this_thread::sleep_for(10ms); completed_lock.lock(); } // Akış buraya gelmişse veri alımının tamamlandığının // göstergesidir. completed_lock.unlock(); lock_guard shared_data_lock(data_mutex); cout << "<<< display_progress_thread ended. shared_data: [" << shared_data << "] >>>\n"; // ... } int main() { /* # OUTPUT # <<< receive_data_thread is waiting for data... >>> <<< process_data_thread waiting for data... >>> <<< display_progress_thread waiting for data... >>> [chunk_0] <<< receive_data_thread is waiting for data... >>> <<< received [9] bytes so far >>> <<< display_progress_thread waiting for data... >>> [chunk_0][chunk_1] <<< receive_data_thread is waiting for data... >>> <<< received [18] bytes so far >>> <<< display_progress_thread waiting for data... >>> [chunk_0][chunk_1][chunk_2] <<< receive_data_thread is waiting for data... >>> <<< received [27] bytes so far >>> <<< display_progress_thread waiting for data... >>> [chunk_0][chunk_1][chunk_2][chunk_3] <<< receiving_data_operation has ended >>> <<< display_progress_thread ended. shared_data: [[chunk_0][chunk_1][chunk_2][chunk_3]] >>> <<< received [36] bytes so far >>> <<< display_progress_thread has ended >>> */ jthread receiver(receive_data); // Veri alımından sorumlu jthread progress(display_progress); // Alınan verinin toplam veriye oranını ekrana yazdıcarak. jthread processor(process_data); // Veri alımı tamamlandığında, veri üzerinde işlem yapacak. } * Örnek 2.0.0, "Conditional Variable" kullanarak iki "thread" arasında haberleşme sağlanabilir. #include #include #include #include int gdata; bool ready_flag{}; std::mutex mtx; std::condition_variable cv; void producer() { { std::lock_guard lg{ mtx }; gdata = 2345; ready_flag = true; } // Aşağıdaki çağrı ile birlikte değer gelmesini bekleyen "thread" // lerden bir tanesine bilgi gönderiyoruz. Yani bir nevi yayın // yapıyoruz. cv.notify_one(); } void consumer() { { std::unique_lock lock{ mtx }; // ".wait()" fonksiyonu şöyle çalışmaktadır; // i. Kilidi ediniyor ve "predicate" olarak gönderilen // argümana bir çağrı yapıyor. Eğer "true" değer // elde ederse, programın akışı oradan yoluna devam // ediyor. Eğer "false" ise, kilidi bırakıyor ve // tekrar uyumaya geçiyor. Ta ki "condition_variable" // tarafından bir bildirim gelene ya da "fake notifying" // oluşana kadar. Bu ikisi oluşursa uyanacak ve tekrar // "predicate" çağrılacak. İş bu ikinci çağrıdan "false" // elde derse, "fake notifying" deyip tekrar uyumaya; // "true" elde ederse, programın akışı oradan yoluna // devam edecek. cv.wait(lock, [] {return ready_flag; }); // Bu noktada şu iki şeyden eminiz; // i. Beklenen veri hazırdır. // ii. Kilidin hayla bizde olduğundan. } std::cout << "gdata : " << gdata; } int main() { // gdata : 2345 std::jthread t1(producer); std::jthread t2(consumer); } * Örnek 2.0.1, #include #include #include #include #include #include #include class IStack { public: IStack() {}; IStack(const IStack&) = delete; IStack& operator=(const IStack&) = delete; int pop() { std::unique_lock lock(mtx); m_cv.wait(lock, [this]() {return !m_vec.empty(); }); int val = m_vec.back(); m_vec.pop_back(); return val; } void push(int x) { std::scoped_lock lock(mtx); m_vec.push_back(x); m_cv.notify_one(); } private: std::vector m_vec; mutable std::mutex mtx; mutable std::condition_variable m_cv; }; constexpr int n{ 1'000 }; IStack gstack; void producer(std::ofstream& ofs) { for (int i = 0; i < n; ++i) { gstack.push(2 * i + 1); std::osyncstream{ ofs} << 2 * i + 1 << " pushed\n"; } } void consumer(std::ofstream& ofs) { for (int i = 0; i < n; ++i) { std::osyncstream{ ofs } << gstack.pop() << " popped\n"; } } int main() { /* # log.txt # 1 pushed 3 pushed 5 pushed 7 pushed 9 pushed 1 popped //... 49 popped 51 popped 51 pushed //... 553 pushed 555 popped 555 pushed //... 609 popped 611 pushed 611 popped //... 1227 pushed 1227 popped 1229 pushed 1229 popped 1231 popped 1231 pushed //... 1347 pushed 1341 popped 1349 popped //... 1373 pushed 1373 popped 1375 pushed 1375 popped //... 1991 popped 1991 pushed 1993 popped 1993 pushed 1995 popped 1995 pushed 1997 popped 1997 pushed 1999 pushed 1999 popped */ std::ofstream ofs{ "log.txt" }; if (!ofs) { std::cerr << "cannot create log.txt\n"; exit(EXIT_FAILURE); } std::jthread th1(producer, std::ref(ofs)); std::jthread th2(consumer, std::ref(ofs)); } * Örnek 2.0.2, #include #include #include #include #include #include using namespace std; using namespace chrono; string shared_data; mutex mtx; condition_variable cv; bool cflag{ false }; void reader() { cout << "READER thread is locking the mutex\n"; unique_lock ulock(mtx); cout << "READER thread has locked the mutex\n"; cout << "READER thread is going to sleep...\n"; cv.wait(ulock, [] {return cflag; }); cout << "READER thread wakes up\n"; cout << quoted(shared_data) << '\n'; cout << "Reader thread unlocks the mutex\n"; } void writer() { { cout << "WRITER thread is locking the mutex\n"; scoped_lock slock(mtx); cout << "WRITER thread has locked the mutex\n"; this_thread::sleep_for(2s); // Modify the string cout << "WRITER thread modifying data...\n"; shared_data = "shared data is ready now"; cflag = true; cout << "WRITER thread unlocks the mutex\n"; } cout << "WRITER thread is sending a notification\n"; cv.notify_one(); } int main() { /* # OUTPUT # shared_data value is : "not ready yet." WRITER thread is locking the mutex WRITER thread has locked the mutex READER thread is locking the mutex WRITER thread modifying data... WRITER thread unlocks the mutex WRITER thread is sending a notification READER thread has locked the mutex READER thread is going to sleep... READER thread wakes up "shared data is ready now" Reader thread unlocks the mutex */ shared_data = "not ready yet."; cout << "shared_data value is : " << quoted(shared_data) << '\n'; jthread twriter(writer); this_thread::sleep_for(500ms); jthread treader(reader); } * Örnek 2.1, "Conditional Variable" birden fazla "thread" arasında haberleşme sağlanabilir. #include #include #include #include #include #include std::condition_variable cv; std::mutex mtx; int gval = 0; using namespace std::literals; void waits(const std::string& id) { std::unique_lock lk(mtx); std::osyncstream{ std::cout } << id << " is waiting\n"; cv.wait(lk, [] {return gval == 1; }); std::osyncstream{ std::cout } << id << " finished waiting. gval == 1\n"; } void signals(const std::string& id) { std::this_thread::sleep_for(1s); std::osyncstream{ std::cout } << id << " will notify\n"; std::this_thread::sleep_for(std::chrono::seconds(1s)); // İlgili "condition_variable" nesnesini kullanan bütün // "thread" ler uyandırılacaktır. Fakat beklenen değer // henüz hazır olmadığından tekrar uyku moduna geçeceklerdir. cv.notify_all(); { std::lock_guard lk(mtx); gval = 1; std::osyncstream{ std::cout } << id << " is notifying\n"; } // İlgili "condition_variable" nesnesini kullanan bütün // "thread" ler uyandırılacaktır. cv.notify_all(); } int main() { /* # OUTPUT # t2 is waiting t1 is waiting t3 is waiting t5 will notify t4 will notify t5 is notifying t4 is notifying t3 finished waiting. gval == 1 t2 finished waiting. gval == 1 t1 finished waiting. gval == 1 */ std::jthread t1(waits, "t1"), t2(waits, "t2"), t3(waits, "t3"), t4(signals, "t4"), t5(signals, "t5"); } >> "atomic" : Anımsanacağı üzere paylaşımlı kullanılan bir nesneye en az bir "thread" yazma amacı ile, diğer "thread" ler ise okuma amacıyla erişirse "data racing" oluşur ki bu da bir "Tanımsız Davranış" olur. Dolayısıyla böyle nesneleri ya senkronize etmeli ya hiç paylaşmamalı ya da sadece salt okuma amacı ile erişmemiz gerekir. Çünkü yazma işlemi üç kademeli bir işlemdir; -> İlgili değer bellekten kendi "register" bölgemize "cache" edilir. -> Kendi İlgili değer "modify" edilir. -> Yeni değer tekrar belleğe aktarılır, "publish" edilir. İşte bu üç kademeli işlem sırasında başka "thread" lerin araya girmemesi, "interleave" gerçekleştirmemesi, gerekmektedir. Dolayısıyla bir işlemin "atomic" olması demek o işlem gerçekleşirken araya başkalarının girmeyeceği GARANTİ ALTINDADIR demek. Başlık dosyası "atomic" biçimindedir. "Atomic" türler başlıca şunlardır; "std::atomic_flag" ve "std::atomic" Bunlardan, >>> "std::atomic_flag" : "primitive" seviyesinde olan bir türdür ve "lock-free" olma garantisini altındadır, yani senkronizasyon için arka planda "mutex" nesnesinin kullanılmadığı KESİNDİR. C++11 ile dile eklenmiştir. C++20 ile arayüzü genişletilmiştir. "true" veya "false" değerlerinden birisini tutabilir. Bünyesindeki ".clear()" ile değerini "false" a; ".test_and_set()" ile değerini "true" ya çekeriz ve eski durumunu geri döndürür. Bu türü hayata getirirken "ATOMIC_FLAG_INIT" makrosunu da kullanabiliriz, "Default Ctor." da çağırabiliriz. C++20'ye kadar "Default Ctor." çağrısı sonucu nesnemiz çöp değer ile hayata gelirken, C++20 ile birlikte "false" ile hayata gelir. Diğer yandan "ATOMIC_FLAG_INIT" kullanımı sonucunda "false" değeri ile hayata gelir. "lock_free" garantisi VERMEKTEDİR. Yani herhangi bir senkronizasyon mekanizması kullanılmadan, "mutex" nesneleri gibi, "concurrency" uygulamalarında kullanılabilirler. Bir diğer deyişle "lock_free" olması demek donanım seviyesinde bir senkronizasyon, olmaması demek arka planda "mutex" benzeri nesnelerin kullanıldığı anlamındadır. * Örnek 1, #include #include int main() { using namespace std; cout << boolalpha; // atomic_flag flag_x{ false }; //gecersiz // atomic_flag flag_y{ true }; //gecersiz atomic_flag flag_z; // C++ 17'de belirsiz deger, C++20'de false değeri cout << "flag_z = " << flag_z.test() << '\n'; // C++20: flag_z = false atomic_flag flag = ATOMIC_FLAG_INIT; // gecerli cout << "flag = " << flag.test() << '\n'; // C++20: flag = false auto b = flag.test_and_set(); cout << "b = " << b << '\n'; // b = false cout << "flag = " << flag.test() << '\n'; // flag = true flag.clear(); cout << "flag = " << flag.test() << '\n'; // flag = false b = flag.test_and_set(); cout << "b = " << b << '\n'; // b = false cout << "flag = " << flag.test() << '\n'; // flag = true } * Örnek 2, #include #include #include #include class SpinLockMutex { public: SpinLockMutex() { m_af.clear(); } void lock() { // İlk baştaki durum "false" olduğundan, bu "lock()" // ilk çağrıldığında "true" olacak ve "while" döngüsünden // çıkacak. Artık bu aşamada yapılan diğer "lock()" // çağrılarında değerimiz zaten "true" olduğundan, ".test_and_set()" // çağrısının geri döndürdüğü değer yine "true" olacak. Dolayısıyla // diğerleri beklemede kalacak. while (m_af.test_and_set()) ; //NULL STATEMENT } void unlock() { // Bu çağrıdan dolayı da değerimiz "false" olacaktır. Artık ".lock()" // çağrısında bekleyen diğer "thread" ler kilitleyebilir. m_af.clear(); } private: std::atomic_flag m_af; // true if thread holds mutex }; SpinLockMutex sm; unsigned long long gcount{}; void worker() { for (unsigned long long i = 0; i < 100'000ULL; ++i) { std::scoped_lock lock(sm); ++gcount; } } int main() { { std::jthread th1(worker); std::jthread th2(worker); } std::cout << "gcount = " << gcount << '\n'; } >>> "std::atomic" : Bir sınıf şablonudur. Bu sınıfın, -> "bool" ve "User Defined Type" açılımı "Primary Template" dir. Sadece "User Defined Type" türlerin bazı şartları sunması gerekmektedir. -> Tam sayı türlerinin her biri ayrı birer "Explicit/Full Specialization". -> Gösterici türleri için "Partial Specialization". Aşağıda bu konuya ilişkin örnekler verilmiştir: * Örnek 1.0, Bu sınıf türü ile yapılan bütün işlemlerin "atomic" işlemler olduğu garanti DEĞİLDİR. #include #include #include int main() { using namespace std; int x = 0; auto f = [&x]{ for (int i = 0; i < 10'000; ++i) { ++x; } }; { std::jthread j1{ f }; std::jthread j2{ f }; std::jthread j3{ f }; std::jthread j4{ f }; } std::cout << x << '\n'; // 35213 } * Örnek 1.1, #include #include #include int main() { using namespace std; atomic x = 0; auto f = [&x]{ for (int i = 0; i < 10'000; ++i) { x++; // Atomik bir işlemdir. // ++x; // Atomik bir işlemdir. // x += 1; // Atomik bir işlemdir. // Atomik bir işlem DEĞİLDİR. // Burada ilk önce "int" türüne dönüşüm, // sonrasında toplama, // sonunda da atama işlemi gerçekleşecektir. // x = x + 1; } }; { std::jthread j1{ f }; std::jthread j2{ f }; std::jthread j3{ f }; std::jthread j4{ f }; } std::cout << x << '\n'; // 40000 } * Örnek 1.2, #include #include #include #include using namespace std; atomic x = 5; void bar() { for (int i = 0; i < 10'000; ++i) { // The operation is "non-atomic". x.exchange(x + 1); // x : 25427 } } int main() { { std::vector vec; for (int i = 0; i < 10; ++i) { vec.emplace_back(bar); } } std::cout << "x : " << x << '\n'; } * Örnek 2, Yine bu sınıf "non-assignable" ve "non-copyable" dır. Sadece "primitive" türlere ve/veya "primitive" türlerden atama/kopyalama yapılabilir. #include #include int main() { using namespace std; cout << boolalpha; atomic flag_1; atomic flag_2; //indetermined value before before C++20. false value since C++20 cout << flag_1 << '\n'; // false cout << flag_2 << '\n'; // false //atomic flag_3{flag_2}; //invalid //flag_1 = flag_2; //invalid flag_1 = true; // ".store()" fonksiyonunun aldığı argüman, yeni değeri olacak. // flag_1.store(false); cout << "flag_1 = " << flag_1 << '\n'; // operator T: flag_1 = true flag_2 = false; // flag_2.store(true); cout << "flag_2 = " << flag_2 << '\n'; // operator T: flag_2 = false // ".exchange()" fonksiyonunun geri dönüş değeri eski değeri, // aldığı argüman ise yeni değeri olacak. auto b = flag_1.exchange(true); cout << "b = " << b << '\n'; // b = true cout << "flag_1 = " << flag_1 << '\n'; // operator T: flag_1 = true // ".load()" fonksiyonu ise değeri "get" etmektedir. cout << "flag_1.load() = " << flag_1.load() << '\n'; // flag_1.load() = true cout << "flag_2.load() = " << flag_2.load() << '\n'; // flag_2.load() = false } * Örnek 3, Şimdi de kapsayıcı bir örnek yapalım; #include #include #include class AtomicCounter { public: AtomicCounter() : m_c(0) {} AtomicCounter(int val) : m_c{ val } {} // Operatör fonksiyonları artık referans döndürmemektedir, // "atomic" türlerde. int operator++() { return ++m_c; } int operator++(int) { return m_c++; } int operator--() { return --m_c; } int operator--(int) { return m_c--; } int get() const { return m_c.load(); } operator int()const { return m_c.load(); } private: std::atomic m_c; }; AtomicCounter cnt; void foo() { for (int i = 0; i < 1'000'000; ++i) { ++cnt; } } int main() { { std::jthread ta[10]; for (auto& th : ta) th = std::jthread{ foo }; } std::cout << "cnt = " << cnt.get() << '\n'; // cnt = 10000000 std::cout << "cnt = " << cnt << '\n'; // cnt = 10000000 } * Örnek 4.0, Sınıfın ".lock_free()" fonksiyonu, "lock_free" sorgulamasını çalışma zamanında yapar; Sınıfın statik veri elemanı "is_always_lock_free" ise derleme zamanında "lock_free" sorgulaması yapar. #include #include #include struct Neco {}; int main() { /* # OUTPUT # a1.lock_free() : true a2.lock_free() : true a3.lock_free() : false a4.lock_free() : true a5.lock_free() : true */ using namespace std; boolalpha(cout); atomic a1; cout << "a1.lock_free() : " << a1.is_lock_free() << '\n'; atomic a2; cout << "a2.lock_free() : " << a2.is_lock_free() << '\n'; atomic> a3; //C++20 cout << "a3.lock_free() : " << a3.is_lock_free() << '\n'; atomic a4; cout << "a4.lock_free() : " << a4.is_lock_free() << '\n'; atomic a5; cout << "a5.lock_free() : " << a5.is_lock_free() << '\n'; } * Örnek 4.1, #include #include #include struct Nec_4 { short x, y, z, t; }; struct Nec_8 { short x, y, a, b, c, d, e, f; }; int main() { using namespace std; constexpr auto b1 = atomic::is_always_lock_free; // true constexpr auto b2 = atomic::is_always_lock_free; // true constexpr auto b3 = atomic::is_always_lock_free; // true constexpr auto b4 = atomic::is_always_lock_free; // false } * Örnek 5, İsminde "fatch" geçen fonksiyonlar hem "atomic" hem de bu fonksiyonların parametrelerine "Memory Order" argümanı geçebiliriz. Bu fonksiyonlar "+=" gibi operatör fonksiyonlarının "Memory Order" argümanı alan versiyonlarıdır. Fakat iş bu "fatch" fonksiyonlarının geri dönüş değeri, eski değerdir. #include #include int main() { using namespace std; atomic x = 5; atomic y = 5; auto r1 = x += 2; auto r2 = y.fetch_add(2); cout << "x = " << x << '\n'; //7 cout << "y = " << y << '\n'; //7 cout << "r1 = " << r1 << '\n'; //7 cout << "r2 = " << r2 << '\n'; //5 } * Örnek 6.0, "Compare-Exchange-Swap" fonksiyonunun işlevi "multi-thread" çalışma esnasında yapılan bazı işlemler arasında, bizim "atomic" değişkenimizin değerinin değişmesi söz konusudur ve programımızın lojik yapısı gereği bu değişikliği yakalamamız gerekmektedir. Yani herhangi iki "atomic" işlem arasında "atomic" değerimizin değeri değiştiğinde, bu değişikliği yakalamamız gerekiyor. İşte tipik olarak döngüsel bir yapıda bu fonksiyonu çağırarak işimizi görebiliriz. "Compare-Exchange-Swap" fonksiyonu, temelde aşağıdaki gibi işlev görür: // from Fedor Picus CppCon talk //CAS compare-exchange-swap conceptually bool compare_exchange_swap_strong(T &expected, T desired) { // Assuma that; // expected is 5 // desired is 95 //Lock is not a real mutex but // some form of exclusive access implemented in hardware Lock lock; //get exclusive access //"value" is the value of atomic "this->value" T temp = value; // Assume that it is "5". if (temp != expected) { expected = temp; return false; } value = desired; return true; } * Örnek 6.1.0, #include #include #include int main() { /* # OUTPUT # b = true x = 95 expected = 5 desired = 95 */ std::atomic x; x.store(5); // "this->value" is "5". int expected{ 5 }; int desired{ 95 }; auto b = x.compare_exchange_strong(expected, desired); // ^ ^ ^ ^ // | | | | New value of "this->value". // | | | Old value of "this->value" must be equal to the this value // | | | so that its value will be equal to the "desired". // | | Old value of "this->value" // | The result std::cout << std::boolalpha; std::cout << "b = " << b << '\n'; std::cout << "x = " << x << '\n'; std::cout << "expected = " << expected << '\n'; std::cout << "desired = " << desired << '\n'; } * Örnek 6.1.1, #include #include #include int main() { /* # OUTPUT # b = false x = 5 expected = 5 desired = 95 */ std::atomic x; x.store(5); // "this->value" is "5". int expected{ 7 }; int desired{ 95 }; auto b = x.compare_exchange_strong(expected, desired); std::cout << std::boolalpha; std::cout << "b = " << b << '\n'; std::cout << "x = " << x << '\n'; std::cout << "expected = " << expected << '\n'; std::cout << "desired = " << desired << '\n'; } * Örnek 6.2.0, #include #include template void atomic_inc(std::atomic&x) { T val{ x }; // "val" in değeri "50" olsun. // <---- // Bu aşamada başka bir "thread" ile "interleave" oluşmadığı // varsayılırsa; // "expired" isimli parametrenin değeri "50" olduğundan, // ".compare_exchange_weak()" fonksiyonu "true" döndürecek. // "val" değişkeninin değeri artık "51" olacak. while (!x.compare_exchange_weak(val, val + 1)) { } } * Örnek 6.2.1, #include #include template void atomic_inc(std::atomic&x) { T val{ x }; // "val" in değeri "50" olsun. // <---- // Bu araya başka bir "thread" in girdiği varsayılırsa; // "expired" isimli parametrenin değeri "20" olduğundan, // ".compare_exchange_weak()" fonksiyonu "false" döndürecek. // Döngüye girecek. "val" değişkeninin değeri artık "20" olacak. // Döngünün bir sonraki turunda eğer araya başka "thread" girmezse, // "* Örnek 6.2.0" numaralı örnekteki senaryo gerçekleşecek. while (!x.compare_exchange_weak(val, val + 1)) { } } * Örnek 6.3, "atomic multiply" işlemi: #include #include template T fetch_multiply(std::atomic&a, T factor) { T old_value = a.load(); while (!a.compare_exchange_strong(old_value, old_value * factor)) ; return old_value; } int main() { using namespace std; atomic ax(6); cout << "ax = " << ax << '\n'; auto pval = fetch_multiply(ax, 9); cout << "ax = " << ax << '\n'; cout << "pval = " << pval << '\n'; } * Örnek 7.0, "ABA Problem" : The scenario consists of you sitting in a car and waiting that the traffic light becomes green. Green stands in our case for B, and red for A. What’s happening? -> You look at the traffic light and it is red (A). -> Because you are bored, you begin to check the news on your smartphone and forget the time. -> You look once more at the traffic light. Damn, is still red (A). Of course, the traffic light became green (B) between your two checks. Therefore, what seems to be one red phase was two. What does this mean for threads (processes)? Now once more formal. -> Thread 1 reads a variable var with value A. -> Thread 1 is preempted, and thread 2 runs. -> Thread 2 changes the variable var from A to B to A. -> Thread 1 starts to execute and checks the value of variable var; because the value of variable var is the same, thread 1 continues with its work, * Örnek 7.1, #include #include #include #include #include #include std::atomic is_ready{ false }; //CTAD std::atomic is_done{ false }; std::atomic go_count{ 0 }; void lottery(std::string name) { ++go_count; while (!is_ready) ; for (volatile int i = 0; i < 20000; ++i) { } bool expected{ false }; if (is_done.compare_exchange_strong(expected, true)) { std::cout << "kazanan: " << name << '\n'; } } int main() { using namespace std; const char* pnames[] = { "necati", "akif", "ahmet", "harun", "mehmet", "dogukan" }; mt19937 eng{ random_device{}() }; std::shuffle(begin(pnames), end(pnames), eng); vector tvec; for (auto p : pnames) { tvec.emplace_back(lottery, p); } while (go_count != std::size(pnames)) ; is_ready = true; for (auto& t : tvec) { t.join(); } } * Örnek 8, Diğer yandan "std::atomic" sınıfının "primitive" tür açılımları için tür eş isimleri de dile eklenmiştir. #include #include #include int main() { using namespace std; constexpr auto b1 = same_as, atomic_int>; constexpr auto b2 = same_as, atomic_char>; constexpr auto b3 = same_as, atomic_long>; constexpr auto result = b1 && b2 && b3; //true } * Örnek 9, C diline eklenen "atomic" ile uyum sağlanması için bazı global fonksiyonlar da mevcuttur. #include #include int main() { using namespace std; atomic x{}; x.store(98); cout << x.load() << '\n'; // 98 atomic_store(&x, 30); cout << atomic_load(&x) << '\n'; // 30 // } * Örnek 10, "volatile" is nothing to do with "atomic". Yani bir değişkeni "volatile" olarak nitelememiz, o işlemin "interleave" OLUŞTURACAĞINI GARANTİ ETMEZ. // volatile is not atomic #include #include #include // volatile int x = 0; // -----> (1) // std::atomic x = 0; // -----> (2) int main() { using namespace std; const auto fn_inc = [] { for (int i = 0; i < 100'000; ++i) ++x; }; const auto fn_dec = [] { for (int i = 0; i < 100'000; ++i) --x; }; { jthread t1{ fn_inc }; jthread t2{ fn_dec }; } // -----> (1) : x = -20619 // -----> (2) : x = 0 cout << "x = " << x << '\n'; } * Örnek 11.0, Pekiyi yukarıdaki örnekler bahsi geçen "Memory Order" konusu nedir? "Memory Order" C++20'ye kadar bir "unscoped enum" yapıdır. C++20 ile birlikte "scoped enum" hale getirildi ve bu türden yeni "inline" değişkenler tanımlandı. (For more info, see https://en.cppreference.com/w/cpp/atomic/memory_order) Aşağıdaki örnekte "assert" "fail" olmayacaktır. #include #include #include #include std::atomic_bool x_flag, y_flag; std::atomic ival; void set_x() { x_flag.store(true); } void set_y() { y_flag.store(true); } void read_x_then_y() { while (!x_flag.load()) ; if (y_flag.load()) ++ival; } void read_y_then_x() { while (!y_flag.load()) ; if (x_flag.load()) ++ival; } void func() { x_flag = false; y_flag = false; ival = 0; { std::jthread t1{ set_x }; std::jthread t2{ set_y }; std::jthread t3{ read_x_then_y}; std::jthread t4{ read_y_then_x}; } assert(ival != 0); } int main() { for (int i = 0; i < 10'000'000; ++i) { func(); } } * Örnek 11.1, #include #include #include #include std::atomic ptr; int data; void producer() { std::string* p = new std::string("Hello"); data = 42; ptr.store(p, std::memory_order_release); } void consumer() { std::string* p2; while (!(p2 = ptr.load(std::memory_order_acquire))) ; assert(*p2 == "Hello"); // never fires assert(data == 42); // never fires } int main() { std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); } * Örnek 11.2, #include #include #include #include std::atomic_bool flag1 = false; std::atomic_bool flag2 = false; std::string name{}; void foo() { name = "necati"; // Derleyici optimizasyon yaparken, bu çağrının üstünde kalan kodları, bu çağrının altına kaydıramaz. flag1.store(true, std::memory_order_release); } void bar() { // Derleyici optimizasyon yaparken, bu çağrının altında kalan kodları, bu çağrının üzerine kaydıramaz. while (!flag1.load(std::memory_order_acquire)) ; name += " ergin"; flag2.store(true, std::memory_order_release); } void baz() { // Derleyici optimizasyon yaparken, bu çağrının altında kalan kodları, bu çağrının üzerine kaydıramaz. while (!flag2.load(std::memory_order_acquire)) ; name += " can"; } void test() { flag1 = false; flag2 = false; std::thread t1{ foo }; std::thread t2{ bar }; std::thread t3{ baz }; t1.join(); t2.join(); t3.join(); assert(name == "necati ergin can"); } int main() { for (int i = 0; i < 1000; ++i) { test(); } } * Örnek 11.3, #include #include #include #include using namespace std; std::atomic x, y, z; void func_1() { x.store(true, std::memory_order_relaxed); } void func_2() { y.store(true, std::memory_order_relaxed); } void func_3() { while (!y.load(std::memory_order_relaxed)) ; if (x.load(std::memory_order_relaxed)) { z = true; } assert(z); // It fails. } int main() { for (int i = 0; i < 100000; ++i) { x = false; y = false; z = false; std::jthread thread_1(func_1); std::jthread thread_2(func_2); std::jthread thread_3(func_3); } } * Örnek 11.4, #include #include #include std::atomic g_count = 0; void foo() { for (int i = 0; i < 10'000; ++i) g_count.fetch_add(1, std::memory_order_relaxed); } void bar() { for (int i = 0; i < 10'000; ++i) g_count.fetch_add(1, std::memory_order_relaxed); } void baz() { for (int i = 0; i < 10'000; ++i) g_count.fetch_add(1, std::memory_order_relaxed); } int main() { { std::jthread t1{ foo }; std::jthread t2{ bar }; std::jthread t3{ baz }; } std::cout << "g_count: " << g_count << '\n'; // g_count: 30000 } * Örnek 11.5, #include #include #include #include #include #include class SpinLockMutex { public: SpinLockMutex() { m_f.clear(); } void lock() { while (m_f.test_and_set(std::memory_order::acquire)) ; //null statement } void unlock() { m_f.clear(std::memory_order::release); } private: std::atomic_flag m_f; }; SpinLockMutex mtx; unsigned long long counter{}; void func() { for (int i{ 0 }; i < 100'000; ++i) { mtx.lock(); ++counter; mtx.unlock(); } } int main() { std::vector tvec; for (int i = 0; i < 10; ++i) { tvec.emplace_back(func); } for (auto &th : tvec) { th.join(); } std::cout << "counter = " << counter << "\n"; } * Örnek 11.6, #include #include #include #include std::atomic data{45}; void do_work(int id) { std::osyncstream{ std::cout } << "thread id : " << id << " " << data.fetch_add(1, std::memory_order_relaxed) << '\n'; } int main() { std::jthread jt1{ do_work, 0 }; std::jthread jt2{ do_work, 1 }; std::jthread jt3{ do_work, 2 }; std::jthread jt4{ do_work, 3 }; std::jthread jt5{ do_work, 4 }; std::jthread jt6{ do_work, 5 }; std::jthread jt7{ do_work, 6 }; std::jthread jt8{ do_work, 7 }; } >> "semaphore" nesnesi: "n" tane "thread" i bir koda ulaşmasını sağlamaktır. "std::recursive_mutex" yapısını andırmaktadır fakat "mutex" nesnelerinden farkı şudur; "mutex" nesnelerinde ".lock()" fonksiyonunu çağıran "thread" in ".unlock()" fonksiyonunu çağırması lazım gelir ki diğer "thread" lerin akışları devam edebilsin. Fakat "semaphore" nesnesinde durum farklıdır. Kritik Bölge'ye girmek isteyenler, nesnenin ".release()" fonksiyonuna çağrı yaparlar. İzin verilirse giriş yaparlar. Kritik Bölge'den çıkanlar da yine nesnenin ".acquire()" isimli fonksiyonunu çağırır ki sayaç bilgisi güncellensin. "semaphore" nesnesi, tipik olarak bir değişkeni ve bir kuyruk yapısını sarmalamaktadır. Sarmalanan bu tam sayı değişken, kaç tane "thread" in Kritik Bölgeye girebileceği ile ilgili. Eğer bu tam sayı değişkenin değeri "0" olursa, artık giriş kapatılacak ve girmeye çalışan "thread" ler bloke edilecektir. Ta ki giriş hakkı tekrardan doğana kadar. Başlık dosyasının ismi "semaphore" dir. Kullanılan sınıf ise "counting_semaphore" isimli bir sınıf şablonudur. C++20 ile dile eklenmiştir, bu sınıf. Yine sınıfın "try_" fonksiyonları "bool" değer döndürür, müsaitlik durumunu sorgular ve izin verilmediği taktirde blokeye yol açmaz ve içinde "_for" ve "_until" geçen fonksiyonlar sırasıyla "o kadar süre boyunca" ve "o vakte kadar" anlamları içerir. * Örnek 1, #include int main() { // "10" adet "thread" in girişine // izin verilir. std::counting_semaphore sem(10); // Hiç bir "thread" in girişine izin // verilmez. Herhangi bir "thread" in // "sem_zero" nun ".release()" fonksiyonunu // çağırması gerekiyor ki bekleyenlerden // bir tanesi girebilsin. std::counting_semaphore sem_zero(0); // En fazla bir adet "thread" in geçişine // izin verilir demektir. İşte böyle yapmak // yerine tür eş ismi eklenmiştir; -----> (1) std::counting_semaphore<1> sem_binary(0); // -----> (1) std::binary_semaphore binary_semaphore(1); } * Örnek 2, #include int main() { std::counting_semaphore smp(5); // En fazla "5" adede izin verildi. // "main-thread" tarafından yapılan bu çağrı sonucunda // sayaç değişkeni "4" oldu. Eğer sayaç değeri "0" olursa, // bu çağrı o "thread" i bloke edecektir. smp.acquire(); //... // "main-thread" tarafından yapılan bu çağrı sonucunda // sayaç değişkeni tekrar "5" oldu. smp.release(); // "main-thread" tarafından yapılan bu çağrı sonucunda // sayaç değişkeni tekrar "7" oldu. // smp.release(3); } * Örnek 3, #include #include #include #include #include #include using namespace std; counting_semaphore smp{ 3 }; void func() { using namespace literals; smp.acquire(); osyncstream{ cout } << "thread id : " << this_thread::get_id() << '\n'; this_thread::sleep_for(5000ms); smp.release(); } int main() { /* # OUTPUT # thread id : 140130699408960 thread id : 140130682623552 thread id : 140130691016256 :> thread id : 140130665838144 thread id : 140130649052736 thread id : 140130371212864 :> thread id : 140130404783680 thread id : 140130329249344 thread id : 140130387998272 :> thread id : 140130640660032 thread id : 140130346034752 thread id : 140130337642048 :> thread id : 140130396390976 thread id : 140130657445440 thread id : 140130379605568 :> thread id : 140130354427456 thread id : 140130304071232 thread id : 140130362820160 :> thread id : 140130674230848 thread id : 140130320856640 thread id : 140130421569088 :> thread id : 140130632267328 thread id : 140130413176384 thread id : 140130312463936 :> */ vector tvec; for (int i = 0; i < 24; ++i) { tvec.emplace_back(func); } } * Örnek 4, #include #include #include #include #include #include using namespace std; using namespace literals; counting_semaphore smp{ 3 }; void foo() { smp.acquire(); osyncstream{ cout } << "thread id in foo: " << this_thread::get_id() << '\n'; this_thread::sleep_for(3000ms); smp.release(); } void bar() { smp.acquire(); osyncstream{ cout } << "thread id in bar : " << this_thread::get_id() << '\n'; this_thread::sleep_for(3000ms); smp.release(); } int main() { /* # OUTPUT # thread id in bar : 140670448510528 thread id in foo: 140670440117824 thread id in bar : 140670431725120 :> thread id in foo: 140670423332416 thread id in foo: 140670287201856 thread id in foo: 140670389761600 :> thread id in bar : 140670398154304 thread id in foo: 140670178162240 thread id in bar : 140670262023744 :> thread id in foo: 140670127806016 thread id in foo: 140670270416448 thread id in bar : 140670152984128 :> thread id in bar : 140670278809152 thread id in bar : 140670043944512 thread id in bar : 140670414939712 :> thread id in bar : 140670136198720 thread id in foo: 140670406547008 thread id in bar : 140670312379968 :> thread id in foo: 140670144591424 thread id in foo: 140670303987264 thread id in foo: 140670035551808 :> thread id in bar : 140670295594560 thread id in bar : 140670169769536 thread id in foo: 140670161376832 */ vector tvec; for (int i = 0; i < 24; ++i) { if (i % 2) tvec.emplace_back(foo); else tvec.emplace_back(bar); } } * Örnek 5, #include #include #include #include std::string name{}; std::binary_semaphore smp(0); void prepare() { name = "tamer dundar"; std::cout << "data is ready now\n"; smp.release(); } void use() { std::cout << "use is waiting the data\n"; smp.acquire(); std::cout << "name is " << name << '\n'; } int main() { // use is waiting the data // data is ready now // name is tamer dundar std::jthread t1(prepare); std::jthread t2(use); } * Örnek 6, // adapted from Reiner Grimm (modified) #include #include #include std::binary_semaphore sm_ping{0}; std::binary_semaphore sm_pong{0}; std::atomic counter{ 0 }; constexpr int n = 100; void ping() { while (counter <= n) { sm_ping.acquire(); ++counter; std::cout << "ping\n"; sm_pong.release(); } } void pong() { while (counter < n) { sm_pong.acquire(); ++counter; std::cout<< "pong\n"; sm_ping.release(); } } int main() { /* # OUTPUT # ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping pong ping */ sm_ping.release(); std::jthread t1(ping); std::jthread t2(pong); } Tabii diğer yandan şöyle de bir gelişme yaşanmıştır; Modern C++ ile birlikte "static" ömürlü yerel değişkenlerin "initialization" süreci "thread-safe" hale gelmiştir. Dolayısıyla böylesi nesneler için herhangi bir senkronizasyon mekanizması kullanmaya lüzum yoktur. * Örnek 1, #include #include #include class Myclass { public: Myclass() { std::cout << "Ctor. @" << this << '\n'; } ~Myclass() { std::cout << "Dtor. @" << this << '\n'; } }; void foo() { static Myclass m; } int main() { // Ctor. @0x561ef08b6199 // Dtor. @0x561ef08b6199 std::vector jvec; for (int i = 0; i < 100; ++i) jvec.emplace_back(foo); } * Örnek 2, İşte bu garantiye güvenerek bir fonksiyonun sadece bir kez çağrılmasını sağlatabiliriz. #include #include #include void foo() { std::cout << "Once!\n"; } void foo_call() { static auto f = [](){ foo(); return 0; }(); } int main() { std::vector tvec; for (int i{}; i < 100; ++i) { tvec.emplace_back(foo_call); // Once! } } * Örnek 3, Fakat bu yönteme alternatif olarak "std::once_flag" sınıfını ve "std::call_once" fonksiyonunu kullanabiliriz.Bunlar sırasıyla bir sınıf ve fonksiyon şablonudur ve birlikte kullanıldığında bir fonksiyonun sadece bir defa çağrılmasını sağlarlar. Bu iki kavram da "std::mutex" başlık dosyasındadır. #include #include #include #include #include std::unique_ptr uptr; std::once_flag init_flag; void f_init() { uptr = std::make_unique(656); } const int& get_value() { std::call_once(init_flag, f_init); // Birinci parametresi "once_flag" türünden nesnemiz, // ikinci parametresi çağrılacak fonksiyon, // diğer parametreler ise iş bu fonksiyonun alacağı argümanlar. return *uptr; } void do_work() { const int& v = get_value(); assert(v == 656); // Holds } int main() { std::vector tvec; tvec.reserve(16); for (int i{}; i < 16; ++i) { tvec.emplace_back(do_work); } for (auto& th : tvec) { th.join(); } } * Örnek 4.0, Aşağıdaki örnekte ya bir kez "foo" ya da bir kez "bar" çağrılacak. Çünkü ortak "std::once_flag" türünden nesne kullandığımız için. #include #include #include std::once_flag once_flag; using namespace std::literals; void foo() { std::this_thread::sleep_for(100ms); std::call_once(once_flag, []() { std::cout << "registered in foo\n"}); } void bar() { std::this_thread::sleep_for(100ms); std::call_once(once_flag, []() { std::cout << "register in bar\n"; }); } int main() { std::thread ta[10]; for (int i = 0; i < 10; ++i) { ta[i] = i % 2 ? std::thread{ foo } : std::thread{ bar }; } for (auto& t : ta) t.join(); } * Örnek 4.1, Bir diğer "Singleton" implementasonu. #include #include #include #include #include class Singleton { public: Singleton(const Singleton&) = delete; Singleton& operator=(const Singleton&) = delete; // Non moveable also. static Singleton* get_instance() { call_once(m_init_flag, Singleton::init); return m_instance; } static void init() { m_instance = new Singleton(); } private: static std::once_flag m_init_flag; static Singleton* m_instance; Singleton() = default; }; Singleton* Singleton::m_instance{}; std::once_flag Singleton::m_init_flag; void func() { std::osyncstream{ std::cout } << Singleton::get_instance() << '\n'; } int main() { std::vector tvec; for (int i = 0; i < 100; ++i) { tvec.emplace_back(func); } for (auto& th : tvec) th.join(); } Öte yandan "std::algorithm" başlık dosyasındaki bazı fonksiyonlara eklenen "overload" versiyonlar da mevcuttur. Böylelikle artık, -> Sıralı biçimde: "std::execution::sequenced_policy" sınıfı kullanılır. C++17 ile dile eklenmiştir. İlave bir "thread" kullanmadan, sıralı biçimde çalıştırır. Kaldı ki halihazırdaki diğer "overload" versiyonlar da sıralı biçimde çalıştırmaktadırlar ancak gönderilen hata nesnelerini yakalayabiliyoruz. Fakat "std::execution::sequenced_policy" sınıfı kullanılarak çalıştırılanlar hata nesnesi gönderildiğinde doğrudan "std::terminate" fonksiyonu çağrılmaktadır. İki versiyon arasındaki en büyük farklılık budur. -> Vektörize edilmiş biçimde: "std::execution::unsequenced_policy" sınıfı kullanılır. C++20 ile dile eklenmiştir. Ayrı "thread" kullanmadan, sadece vektörize ederek kullan anlamındadır. Çoğu derleyici nezdinde bu "std::execution::unsequenced_policy" sınıfı "std::execution::sequenced_policy" olarak işlenir. -> Paralel biçimde: "std::execution::parallel_policy" sınıfı kullanılır. C++17 ile dile eklenmiştir. İş yükünü "thread" lere dağıtarak halleder. Paylaşılan nesnelerin senkronize edilmesinden BİZ SORUMLUYUZ. Yine bu sınıf kullanıldığında arka plandaki "thread" lerin öğeleri sırayla ele almaları da kesin değildir. -> Hem Paralel hem Vektörize edilmiş biçimde: "std::execution::parallel_unsequenced_policy" sınıfı kullanılır ve C++17 dile eklenmiştir. Hem vektörize et hem de ayrı "thread" ler kullan anlamındadır. Paylaşılan nesnelerin senkronize edilmesinden BİZ SORUMLUYUZ. Çoğu derleyici nezdinde bu "std::execution::parallel_unsequenced_policy" sınıfı "std::execution::parallel_policy" olarak işlenir. çalıştırabiliriz. Artık yapılacak işi birden fazla "thread" e yükleme vb. kısımları ilgili fonksiyonun kendisi halledecektir. Başlık dosyası "execution" ismindedir. İşte ilgili fonksiyonlara yukarıdaki sınıf türünden nesneler geçerek, o fonksiyonun çalışma biçimini belirleyebiliriz. Yine bu kütüphane bu sınıflar türünden "tag" nesneleri de sunmaktadır. Böylelikle sınıf türünden nesne oluşturmaktansa, kütüphanedeki bu "tag" nesnelerini de kullanabiliriz. Ayrıca bu sınıfların içi boştur, sadece tür belirtmek için kullanılırlar.Diğer yandan yukarıdaki belirtilen "execution_policy" ler birer RİCADIR, EMİR DEĞİLDİR. Tabii şunu da hatırlatmakta fayda vardır; "std::algorithm" başlık dosyasındaki bazı fonksiyonlar için "overload" versiyonu oluşturulamamış, yeni fonksiyonlar eklenmiştir. * Örnek 1.0, Aşağıdaki örnekte bir iş küçük iş parçacıklarına bölünerek bir paralelleştirilme gerçekleştirilmiştir. #include #include #include #include #include #include #include using uint64 = unsigned long long; uint64 accummulate_s(const uint64* first, const uint64* last) { return std::accumulate(first, last, 0ull); } uint64 acc_parrallel(const std::vector& vec) { const auto data = vec.data(); const auto size = vec.size(); // "data" : Dizinin başlangıç noktası. // "data + size / 4" : Dizinin ilk dörtte birlik kısmının bittiği yer. auto ft1 = std::async(std::launch::async, accummulate_s, data, data + size / 4); // "data + size / 4" : Dizinin ilk dörtte birlik kısmının bittiği yer. // "data + 2 * (size / 4)" : Dizinin ikinci dörtte birlik kısmının bittiği yer. auto ft2 = std::async(std::launch::async, accummulate_s, data + size / 4, data + 2 * (size / 4)); // "data + 2 * (size / 4)" : Dizinin ikinci dörtte birlik kısmının bittiği yer. // "data + 3 * (size / 4)" : Dizinin üçüncü dörtte birlik kısmının bittiği yer. auto ft3 = std::async(std::launch::async, accummulate_s, data + 2 * (size / 4), data + 3 * (size / 4)); // "data + 3 * (size / 4)" : Dizinin üçüncü dörtte birlik kısmının bittiği yer. // "data + size" : Dizinin bittiği yer. auto ft4 = std::async(std::launch::async, accummulate_s, data + 3 * (size / 4), data + size); // Dizinin her dörtte birlik kısmının toplanması: return ft1.get() + ft2.get() + ft3.get() + ft4.get(); } int main() { using namespace std; using namespace chrono; vector uvec(50'000'000u); mt19937 eng; uniform_int_distribution dist{ 0ull, 100ull }; generate(uvec.begin(), uvec.end(), [&] {return dist(eng); }); { auto tp_start = steady_clock::now(); auto sum = accumulate(uvec.begin(), uvec.end(), 0ull); auto tp_end = steady_clock::now(); cout << duration(tp_end - tp_start).count() << '\n'; // 0.714942 seconds cout << "sum = " << sum << '\n'; // sum = 2499803776 } std::cout << '\n'; { auto tp_start = steady_clock::now(); auto sum = acc_parrallel(uvec); auto tp_end = steady_clock::now(); cout << duration(tp_end - tp_start).count() << '\n'; // 0.173961 cout << "sum = " << sum << '\n'; // sum = 2499803776 } } * Örnek 1.1, Aşağıda ise "std::packaged_task" lar kullanılmıştır. #include #include #include #include #include #include #include #include using uint64 = unsigned long long; uint64 accummulate_s(const uint64* first, const uint64* last) { return std::accumulate(first, last, 0ull); } uint64 acc_parrallel(const std::vector& vec) { using task_t = uint64(const uint64*, const uint64*); const auto data = vec.data(); const auto size = vec.size(); std::packaged_task task1(accummulate_s); std::packaged_task task2(accummulate_s); std::packaged_task task3(accummulate_s); std::packaged_task task4(accummulate_s); auto ft1 = task1.get_future(); auto ft2 = task2.get_future(); auto ft3 = task3.get_future(); auto ft4 = task4.get_future(); std::jthread t1(std::move(task1), data, data + size / 4); std::jthread t2(std::move(task2), data + size / 4, data + 2 * (size / 4)); std::jthread t3(std::move(task3), data + 2 * (size / 4), data + 3 * (size / 4)); std::jthread t4(std::move(task4), data + 3 * (size / 4), data + size); return ft1.get() + ft2.get() + ft3.get() + ft4.get(); } int main() { using namespace std; using namespace chrono; vector uvec(25'000'000u); mt19937 eng; uniform_int_distribution dist{ 0ull, 100ull }; generate(uvec.begin(), uvec.end(), [&] {return dist(eng); }); { auto tp_start = steady_clock::now(); auto sum = acc_parrallel(uvec); auto tp_end = steady_clock::now(); cout << duration(tp_end - tp_start).count() << '\n'; // 0.0786106 cout << "sum = " << sum << '\n'; // sum = 1249894138 } std::cout << '\n'; { auto tp_start = steady_clock::now(); auto sum = accumulate(uvec.begin(), uvec.end(), 0ull); auto tp_end = steady_clock::now(); cout << duration(tp_end - tp_start).count() << '\n'; // 0.161134 cout << "sum = " << sum << '\n'; // sum = 1249894138 } } * Örnek 2.0, Bu yeni "overload" versiyonlar ile eski "overload" versiyonların arasındaki fark; #include #include #include #include #include #include void my_terminate() { std::cout << "my_terminate\n"; abort(); } int main() { // OUTPUT => exception caught: 8 is not allowed std::set_terminate(my_terminate); std::vector ivec{ 5, 7, 9, 2, 4, 6, 8, 10, 3, 1}; try { for_each(ivec.begin(), ivec.end(), [](int x) { if (x == 8) throw std::runtime_error{ "8 is not allowed" }; }); } catch (const std::exception& ex) { std::cout << "exception caught: " << ex.what() << '\n'; } } * Örnek 2.1, #include #include #include #include #include #include void my_terminate() { std::cout << "my_terminate\n"; abort(); } int main() { // OUTPUT => my_terminate std::set_terminate(my_terminate); std::vector ivec{ 5, 7, 9, 2, 4, 6, 8, 10, 3, 1}; try { for_each(std::execution::seq, ivec.begin(), ivec.end(), [](int x) { if (x == 8) throw std::runtime_error{ "8 is not allowed" }; }); } catch (const std::exception& ex) { std::cout << "exception caught: " << ex.what() << '\n'; } } * Örnek 3.0, "std::execution::parallel_policy" kullanırken ortak nesnenin senkronizasyonundan bizler sorumluyuz. #include #include #include #include #include #include namespace ex = std::execution; int main() { using namespace std; vector ivec(100'000); int cnt{}; for_each(ex::par, ivec.begin(), ivec.end(), [&cnt](int& x) { x = ++cnt; }); cout << "cnt = " << cnt; // 25545 } * Örnek 3.1, "std::atomic" sınıfını kullanarak da senkronizasyon sağlayabiliriz. #include #include #include #include #include #include #include namespace ex = std::execution; int main() { using namespace std; vector ivec(100'000'000); std::atomic cnt{}; for_each(ex::par, ivec.begin(), ivec.end(), [&cnt](int& x) { x = ++cnt; }); cout << "cnt = " << cnt; // 100000 } * Örnek 4, "execution_policy" kullanımları arasındaki süre bazlı fark; #include #include #include #include #include #include using dsec = std::chrono::duration; constexpr std::size_t n{ 20'000'000 }; int main() { std::vector vec(n); std::mt19937 eng{ std::random_device{}() }; std::uniform_real_distribution dist{ -20., 20.}; std::generate(vec.begin(), vec.end(), [&] {return dist(eng); }); auto tp_start = std::chrono::steady_clock::now(); //sort(std::execution::seq, vec.begin(), vec.end()); // 13.9295 saniye //sort(std::execution::unseq, vec.begin(), vec.end()); // 9.78678 saniye //sort(std::execution::par, vec.begin(), vec.end()); // 9.83721 saniye //sort(std::execution::par_unseq, vec.begin(), vec.end()); // 9.92502 saniye auto tp_end = std::chrono::steady_clock::now(); std::cout << dsec{ tp_end - tp_start }.count() << " saniye\n"; } * Örnek 5.0, "std::accumulate" fonksiyonunun "execution_policy" argümanına sahip versiyonu bir "overload" değil, "std::reduce" isimli fonksiyondur. #include #include #include int main() { using namespace std; vector ivec{ 1, 3, 5, 7, 9 }; auto x = reduce(ivec.begin(), ivec.end(), 0); auto y = reduce(ivec.begin(), ivec.end()); auto z = reduce(ivec.begin(), ivec.end(), int{}); cout << "x = " << x << '\n'; // x = 25 cout << "y = " << y << '\n'; // y = 25 cout << "z = " << z << '\n'; // z = 25 } * Örnek 5.1.0, #include #include #include #include #include #include using namespace std; using namespace chrono; namespace ex = std::execution; int main() { vector ivec(100'000'000); mt19937 eng; uniform_int_distribution dist{ 0u, 100u }; generate(ex::par, ivec.begin(), ivec.end(), [&]{ return dist(eng); }); auto tp_start = steady_clock::now(); auto result = reduce( ex::par, ivec.begin(), ivec.end(), 0u, [](auto a, auto b){ return a*a + b*b; } ); auto tp_end = steady_clock::now(); // 1153778770 in 767ms std::cout << result << " in " << duration_cast(tp_end - tp_start).count() << "ms\n"; } * Örnek 5.1.1, #include #include #include #include #include #include using namespace std; using namespace chrono; namespace ex = std::execution; int main() { vector ivec(100'000'000); mt19937 eng; uniform_int_distribution dist{ 0u, 100u }; generate(ex::par, ivec.begin(), ivec.end(), [&]{ return dist(eng); }); auto tp_start = steady_clock::now(); auto result = reduce( ex::seq, ivec.begin(), ivec.end(), 0u, [](auto a, auto b){ return a*a + b*b; } ); auto tp_end = steady_clock::now(); // 1153778770 in 730ms std::cout << result << " in " << duration_cast(tp_end - tp_start).count() << "ms\n"; } * Örnek 6.0.0, "std::inner_product" fonksiyonunun "execution_policy" argümanına sahip versiyonu bir "overload" değil, "std::transform_reduce" isimli fonksiyondur. #include #include #include #include int main() { std::vector x{ 2, 3, 1, 5, 6 }; std::vector y{ 1, 2, 4, 3, 5 }; // 2 6 4 15 30 //auto result = std::inner_product(x.begin(), x.end(), y.begin(), 0); auto result = std::inner_product(x.begin(), x.end(), y.begin(), 0, std::plus<>{}, std::multiplies<>{}); std::cout << "Inner product of x and y: " << result << '\n'; // Inner product of x and y: 57 } * Örnek 6.0.1, #include #include #include #include #include int main() { std::vector x{ 2, 3, 1, 5, 6 }; std::vector y{ 1, 2, 4, 3, 5 }; // 2 6 4 15 30 auto result = std::transform_reduce(std::execution::par, x.begin(), x.end(), y.begin(), 0); std::cout << "Inner product of x and y: " << result << '\n'; // Inner product of x and y: 57 } * Örnek 6.1.0, #include #include #include #include int main() { using namespace std; std::vector v1{ "ali", "can", "ece", "ata", "gul", "tan", "eda", "naz" }; std::vector v2{ "nur", "tan", "ece", "ece", "gul", "tan", "naz", "eda" }; // 1 2 3 int result = std::inner_product(v1.begin(), v1.end(), v2.begin(), 0, plus{}, equal_to{}); cout << "result = " << result << '\n'; // result = 3 } * Örnek 6.1.1, #include #include #include #include int main() { using namespace std; vector target { 0.12, 0.17, 0.25, 0.39, 0.43, 0.70 }; vector source { 0.08, 0.11, 0.23, 0.36, 0.42, 0.74 }; auto max_dev = transform_reduce(execution::par, target.begin(), target.end(), source.begin(), 0.0, [](auto x, auto y) { return max(x, y); }, [](auto trg, auto src) { return std::abs(src - trg); } ); cout << "Max devistion is: " << max_dev << '\n'; // Max devistion is: 0.06 } * Örnek 7, Sonuçtan da görüleceği üzere, paralel çalıştırıldığında öğelerin "thread" lere aktarılması sırayla değildir. Dolayısıyla sadece değerlerlerin işlem sırası farklıdır, değerlerin kendisi aynıdır. //from cppreference.com #include #include #include #include #include #include std::mutex mtx; int main() { using namespace std; constexpr size_t size = 1'000'000u; mt19937 eng; vector vec1(size); vector vec2(size); eng.seed(134); for_each( vec1.begin(), vec1.end(), [&eng](unsigned int& x) {x = eng(); } ); // Sequence; Birinci rastgele sayı, dizinin ilk öğesine atanacak. eng.seed(134); for_each( execution::par, vec2.begin(), vec2.end(), [&eng](unsigned int& x) { scoped_lock lock(mtx); x = eng(); } ); // Parallel; Birinci rastgele sayı, dizinin ilk öğesine atanmayabilir. cout << boolalpha << (vec1 == vec2); // false sort(vec1.begin(), vec1.end()); sort(vec2.begin(), vec2.end()); cout << boolalpha << (vec1 == vec2); // true } * Örnek 8.0, "std::partial_sum" fonksiyonunun "execution_policy" argümanına sahip versiyonu bir "overload" değil, "inclusive_scan" ve "exclusive_scan" isimli fonksiyonlardır. #include #include #include int main() { using namespace std; vector svec{ 1, 3, 5, 7, 9, 11, 13, 15 }; // 1 = 1 // 1 + 3 = 4 // 4 + 5 = 9 // 9 + 7 = 16 // 16 + 9 = 25 // 25 + 11 = 36 // 36 + 13 = 49 // 49 + 15 = 64 vector dvec(svec.size()); // Bu fonksiyon varsayılan olarak "std::plus" almaktadır. Fakat diğer "overload" // versiyonuna bir "callable" geçerek istediğimiz operasyonu yaptırtabiliriz. partial_sum(svec.begin(), svec.end(), dvec.begin()); for (auto i : dvec) cout << i << ' '; // 1 4 9 16 25 36 49 64 } * Örnek 8.1, #include #include #include #include int main() { using namespace std; vector svec{ 1, 3, 5, 7, 9, 11, 13, 15 }; vector dvec(svec.size()); inclusive_scan(std::execution::par, svec.begin(), svec.end(), dvec.begin()); for (auto i : dvec) cout << i << ' '; // 1 4 9 16 25 36 49 64 cout << '\n'; // Başlangıç değerini "exclusive_scan" için belirleyebiliyoruz. exclusive_scan(std::execution::par, svec.begin(), svec.end(), dvec.begin(), 10); for (auto i : dvec) cout << i << ' '; // 10 11 14 19 26 35 46 59 cout << '\n'; } Şimdi de "thread pooling" kavramına değinelim. Aslında "thread" oluşturmak çok maaliyetli bir iştir. Dolayısıyla her seferinde bir "thread" oluşturmaktansa, işin başında belli bir miktar oluştururuz. Yapılacak iş(ler) ile oluşturulan bu "thread" (ler) karşılıklı eşleşecekler. Dolayısıyla iş yükünün ağırlığı ile oluşturulacak "thread" adedinin doğrusal olması önemlidir. * Örnek 1, // ThreadPool.h #pragma once #include "CQueue.h" #include #include #include using Task = std::function; class ThreadPool { public: ThreadPool() { const auto thread_count = std::thread::hardware_concurrency(); for (std::size_t i{}; i < thread_count; ++i) { m_tvec.emplace_back( &ThreadPool::work_load, this ); } } ~ThreadPool() { for (auto& i : m_tvec) i.join(); } void push(Task f) { m_task_queue.push(f); } //... private: CQueue m_task_queue; std::vector m_tvec; void work_load() { for (;;) { Task task; m_task_queue.pop(task); task(); } } }; // CQueue.h #pragma once #include #include #include template class CQueue { public: //... void push(const T& tval) { std::scoped_lock sl{ mtx }; mq.push(tval); mcv.notify_one(); } void pop(T& tval) { std::unique_lock ul{ mtx }; // "std::unique_lock" is must here. mcv.wait( ul, [this]{ return !mq.empty(); } ); tval = mq.front(); mq.pop(); } //... private: std::mutex mtx; std::queue mq; std::condition_variable mcv; }; // main.cpp #include "CQueue.h" #include "ThreadPool.h" #include #include #include using namespace std::chrono; void task() { std::osyncstream{ std::cout } << "thread-" << std::this_thread::get_id() << " : started.\n"; std::this_thread::sleep_for(300ms); std::osyncstream{ std::cout } << "thread-" << std::this_thread::get_id() << " : stopped.\n"; } int main() { /* # OUTPUT # Threads in the pool: 8 thread-140255750960704 : started. thread-140255708997184 : started. thread-140255717389888 : started. thread-140255725782592 : started. thread-140255767746112 : started. thread-140255759353408 : started. thread-140255742568000 : started. thread-140255734175296 : started. thread-140255750960704 : stopped. thread-140255708997184 : stopped. thread-140255725782592 : stopped. thread-140255717389888 : stopped. thread-140255767746112 : stopped. thread-140255742568000 : stopped. thread-140255759353408 : stopped. thread-140255734175296 : stopped. Done! */ // Max. # of thread supported by the implementation: const auto thread_count = std::thread::hardware_concurrency(); std::cout << "Threads in the pool: " << thread_count << '\n'; ThreadPool the_pool; // Bu aşamada "the_pool" içerisindeki vektör dizisinin her bir // elemanı ayrı bir "thread" ve bu "thread" ler de sınıfın // "work_load" fonksiyonlarını çalıştırmaya başlıyorlar. for (std::size_t i{}; i < 1 * thread_count; ++i) { // Her bir "thread" için "1" adet iş yükü tayin // edilmiştir. the_pool.push(task); } std::this_thread::sleep_for(5s); std::cout << "Done!\n"; } Son olarak "std::syncstream" sınıfı gelmeden evvel "std::cout" nesnesinin senkronize edilmesini inceleyelim: * Örnek 1, #include #include #include #include #include struct pcout : std::stringstream { public: ~pcout() { std::lock_guard locker{ cmutex }; std::cout << rdbuf(); std::cout.flush(); } static inline std::mutex cmutex; }; void my_print(int x) { pcout{} << "print islevi x = " << x << "\n"; } int main() { std::vector tvec(20); for (int i = 0; i < 20; ++i) tvec[i] = std::jthread{ my_print, i }; } > Hatırlatıcı Notlar: >> Concurrency in Action kitabı tavsiye edilir. >> "STL" kütüphanesindeki fonksiyonlar, farklı indislerden yazma işlemi için senkronizasyon garantisi verirken aksi halde garanti vermemektedir. >> Unutmamalıyız ki bu "std::mutex" sınıfı ne taşınabilir ne de kopyalanabilir bir sınıftır. >> "std::lock_guard" yerine "std::scoped_lock" kullanmalıyız, C++17 itibariyle. >> "Sequenced Before" : Tek bir "thread" ile ilgilidir. C dilindeki "sequence point" kavramının daha gelişmiş halidir. >>> "sequence point": Kaynak kodda öyle bir noktaki o noktadan önce yazılmış kodların yan etkileri, iş bu noktadan sonra artık gözlemlenebilir olmak zorundadır. >> "Happens Before" : Tek bir "thread" ile de ilgili olabilir, birden fazla "thread" ile de. "Sequenced Before" kavramının daha gelişmiş halidir. Burada işlemlerin önceliği değil, işlemlerin sonucunun gözlemlenebilirliği kastetilir. Örneğin, iki ayrı "thread" de koşan "t_opt1" ve "t_opt2" isimli işlemlerimiz olsun ve bu ikisi arasında da "Happens Before" ilişkisi olsun. Dolayısıyla "t_opt1" işlemi "t_opt2" işleminden önce yapılır manasına değil, "t_opt1" işleminin oluşturduğu sonuç "t_opt2" tarafından GÖZLEMLENEBİLİR manasına GELMEKTEDİR. Çünkü zamansal olarak "t_opt1" işlemi önce gerçekleşmiş olabilir fakat sonuç henüz belleğe aktarılmadığı için "t_opt2" işlemi tarafından görülmeyebilir. Yani yapılan değişiklik henüz içsel durumdadır. İşte "std::mutex", "std::condition_variable" ve/veya "atomic" nitelikteki nesneler ile "Happens Before" ilişkisi kurabiliriz. >> "Sequential Consistency" : "thread" lerin kendi içerisindeki "atomic" işlemlerin sırasının korunduğunun garantisidir. Örneğin, iki adet "thread" imiz olsun. Bu iki "thread" ise kendi içinde önce ".store()" daha sonra ".load" işlemi yapıyor olsun. Kendi içlerindeki bu ".store()" ve ".load()" işlemleri arasında "Sequential Consistency" varsa, program genelinde önce "store", sonra "load" işlemi yapılır. Ama bu işlemler ard arda yapılacağı garanti DEĞİLDİR. Uzaktan bakıldığında "store" işlemi önce, sonra "load" işlemi.