proxygen
PThread.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #if !FOLLY_HAVE_PTHREAD && _WIN32
20 #include <boost/thread/tss.hpp> // @manual
21 
22 #include <errno.h>
23 
24 #include <chrono>
25 #include <condition_variable>
26 #include <exception>
27 #include <limits>
28 #include <mutex>
29 #include <shared_mutex>
30 #include <thread>
31 
32 #include <folly/lang/Assume.h>
34 
35 namespace folly {
36 namespace portability {
37 namespace pthread {
38 
39 int pthread_attr_init(pthread_attr_t* attr) {
40  if (attr == nullptr) {
41  errno = EINVAL;
42  return -1;
43  }
44  attr->stackSize = 0;
45  attr->detached = false;
46  return 0;
47 }
48 
49 int pthread_attr_setdetachstate(pthread_attr_t* attr, int state) {
50  if (attr == nullptr) {
51  errno = EINVAL;
52  return -1;
53  }
54  attr->detached = state == PTHREAD_CREATE_DETACHED ? true : false;
55  return 0;
56 }
57 
58 int pthread_attr_setstacksize(pthread_attr_t* attr, size_t kb) {
59  if (attr == nullptr) {
60  errno = EINVAL;
61  return -1;
62  }
63  attr->stackSize = kb;
64  return 0;
65 }
66 
67 namespace pthread_detail {
68 pthread_t::~pthread_t() noexcept {
69  if (handle != INVALID_HANDLE_VALUE && !detached) {
70  CloseHandle(handle);
71  }
72 }
73 } // namespace pthread_detail
74 
75 int pthread_equal(pthread_t threadA, pthread_t threadB) {
76  if (threadA == threadB) {
77  return 1;
78  }
79 
80  // Note that, in the presence of detached threads, it is in theory possible
81  // for two different pthread_t handles to be compared as the same due to
82  // Windows HANDLE and Thread ID re-use. If you're doing anything useful with
83  // a detached thread, you're probably doing it wrong, but I felt like leaving
84  // this note here anyways.
85  if (threadA->handle == threadB->handle &&
86  threadA->threadID == threadB->threadID) {
87  return 1;
88  }
89  return 0;
90 }
91 
92 namespace {
93 thread_local pthread_t current_thread_self;
94 struct pthread_startup_info {
95  pthread_t thread;
96  void* (*startupFunction)(void*);
97  void* startupArgument;
98 };
99 
100 DWORD internal_pthread_thread_start(void* arg) {
101  // We are now in the new thread.
102  auto startupInfo = reinterpret_cast<pthread_startup_info*>(arg);
103  current_thread_self = startupInfo->thread;
104  auto ret = startupInfo->startupFunction(startupInfo->startupArgument);
105  if /* constexpr */ (sizeof(void*) != sizeof(DWORD)) {
106  auto tmp = reinterpret_cast<uintptr_t>(ret);
107  if (tmp > std::numeric_limits<DWORD>::max()) {
108  throw std::out_of_range(
109  "Exit code of the pthread is outside the range representable on Windows");
110  }
111  }
112 
113  delete startupInfo;
114  return static_cast<DWORD>(reinterpret_cast<uintptr_t>(ret));
115 }
116 } // namespace
117 
118 int pthread_create(
119  pthread_t* thread,
120  const pthread_attr_t* attr,
121  void* (*start_routine)(void*),
122  void* arg) {
123  if (thread == nullptr) {
124  errno = EINVAL;
125  return -1;
126  }
127 
128  size_t stackSize = attr != nullptr ? attr->stackSize : 0;
129  bool detach = attr != nullptr ? attr->detached : false;
130 
131  // Note that the start routine passed into pthread returns a void* and the
132  // windows API expects DWORD's, so we need to stub around that.
133  auto startupInfo = new pthread_startup_info();
134  startupInfo->startupFunction = start_routine;
135  startupInfo->startupArgument = arg;
136  startupInfo->thread = std::make_shared<pthread_detail::pthread_t>();
137  // We create the thread suspended so we can assign the handle and thread id
138  // in the pthread_t.
139  startupInfo->thread->handle = CreateThread(
140  nullptr,
141  stackSize,
142  internal_pthread_thread_start,
143  startupInfo,
144  CREATE_SUSPENDED,
145  &startupInfo->thread->threadID);
146  ResumeThread(startupInfo->thread->handle);
147 
148  if (detach) {
149  *thread = std::make_shared<pthread_detail::pthread_t>();
150  (*thread)->detached = true;
151  (*thread)->handle = startupInfo->thread->handle;
152  (*thread)->threadID = startupInfo->thread->threadID;
153  } else {
154  *thread = startupInfo->thread;
155  }
156  return 0;
157 }
158 
159 pthread_t pthread_self() {
160  // Not possible to race :)
161  if (current_thread_self == nullptr) {
162  current_thread_self = std::make_shared<pthread_detail::pthread_t>();
163  current_thread_self->threadID = GetCurrentThreadId();
164  // The handle returned by GetCurrentThread is a pseudo-handle and needs to
165  // be swapped out for a real handle to be useful anywhere other than this
166  // thread.
167  DuplicateHandle(
168  GetCurrentProcess(),
169  GetCurrentThread(),
170  GetCurrentProcess(),
171  &current_thread_self->handle,
172  DUPLICATE_SAME_ACCESS,
173  TRUE,
174  0);
175  }
176 
177  return current_thread_self;
178 }
179 
180 int pthread_join(pthread_t thread, void** exitCode) {
181  if (thread->detached) {
182  errno = EINVAL;
183  return -1;
184  }
185 
186  if (WaitForSingleObjectEx(thread->handle, INFINITE, FALSE) == WAIT_FAILED) {
187  return -1;
188  }
189 
190  if (exitCode != nullptr) {
191  DWORD e;
192  if (!GetExitCodeThread(thread->handle, &e)) {
193  return -1;
194  }
195  *exitCode = reinterpret_cast<void*>(static_cast<uintptr_t>(e));
196  }
197 
198  return 0;
199 }
200 
201 HANDLE pthread_getw32threadhandle_np(pthread_t thread) {
202  return thread->handle;
203 }
204 
205 DWORD pthread_getw32threadid_np(pthread_t thread) {
206  return thread->threadID;
207 }
208 
209 int pthread_setschedparam(
210  pthread_t thread,
211  int policy,
212  const sched_param* param) {
213  if (thread->detached) {
214  errno = EINVAL;
215  return -1;
216  }
217 
218  auto newPrior = param->sched_priority;
219  if (newPrior > THREAD_PRIORITY_TIME_CRITICAL ||
220  newPrior < THREAD_PRIORITY_IDLE) {
221  errno = EINVAL;
222  return -1;
223  }
224  if (GetPriorityClass(GetCurrentProcess()) != REALTIME_PRIORITY_CLASS) {
225  if (newPrior > THREAD_PRIORITY_IDLE && newPrior < THREAD_PRIORITY_LOWEST) {
226  // The values between IDLE and LOWEST are invalid unless the process is
227  // running as realtime.
228  newPrior = THREAD_PRIORITY_LOWEST;
229  } else if (
230  newPrior < THREAD_PRIORITY_TIME_CRITICAL &&
231  newPrior > THREAD_PRIORITY_HIGHEST) {
232  // Same as above.
233  newPrior = THREAD_PRIORITY_HIGHEST;
234  }
235  }
236  if (!SetThreadPriority(thread->handle, newPrior)) {
237  return -1;
238  }
239  return 0;
240 }
241 
242 int pthread_mutexattr_init(pthread_mutexattr_t* attr) {
243  if (attr == nullptr) {
244  return EINVAL;
245  }
246 
247  attr->type = PTHREAD_MUTEX_DEFAULT;
248  return 0;
249 }
250 
251 int pthread_mutexattr_destroy(pthread_mutexattr_t* attr) {
252  if (attr == nullptr) {
253  return EINVAL;
254  }
255 
256  return 0;
257 }
258 
259 int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int type) {
260  if (attr == nullptr) {
261  return EINVAL;
262  }
263 
264  if (type != PTHREAD_MUTEX_DEFAULT && type != PTHREAD_MUTEX_RECURSIVE) {
265  return EINVAL;
266  }
267 
268  attr->type = type;
269  return 0;
270 }
271 
272 struct pthread_mutex_t_ {
273  private:
274  int type;
275  union {
276  std::timed_mutex timed_mtx;
277  std::recursive_timed_mutex recursive_timed_mtx;
278  };
279 
280  public:
281  pthread_mutex_t_(int mutex_type) : type(mutex_type) {
282  switch (type) {
283  case PTHREAD_MUTEX_NORMAL:
284  new (&timed_mtx) std::timed_mutex();
285  break;
286  case PTHREAD_MUTEX_RECURSIVE:
287  new (&recursive_timed_mtx) std::recursive_timed_mutex();
288  break;
289  }
290  }
291 
292  ~pthread_mutex_t_() noexcept {
293  switch (type) {
294  case PTHREAD_MUTEX_NORMAL:
295  timed_mtx.~timed_mutex();
296  break;
297  case PTHREAD_MUTEX_RECURSIVE:
298  recursive_timed_mtx.~recursive_timed_mutex();
299  break;
300  }
301  }
302 
303  void lock() {
304  switch (type) {
305  case PTHREAD_MUTEX_NORMAL:
306  timed_mtx.lock();
307  break;
308  case PTHREAD_MUTEX_RECURSIVE:
309  recursive_timed_mtx.lock();
310  break;
311  }
312  }
313 
314  bool try_lock() {
315  switch (type) {
316  case PTHREAD_MUTEX_NORMAL:
317  return timed_mtx.try_lock();
318  case PTHREAD_MUTEX_RECURSIVE:
319  return recursive_timed_mtx.try_lock();
320  }
322  }
323 
324  bool timed_try_lock(std::chrono::system_clock::time_point until) {
325  switch (type) {
326  case PTHREAD_MUTEX_NORMAL:
327  return timed_mtx.try_lock_until(until);
328  case PTHREAD_MUTEX_RECURSIVE:
329  return recursive_timed_mtx.try_lock_until(until);
330  }
332  }
333 
334  void unlock() {
335  switch (type) {
336  case PTHREAD_MUTEX_NORMAL:
337  timed_mtx.unlock();
338  break;
339  case PTHREAD_MUTEX_RECURSIVE:
340  recursive_timed_mtx.unlock();
341  break;
342  }
343  }
344 
345  void condition_wait(std::condition_variable_any& cond) {
346  switch (type) {
347  case PTHREAD_MUTEX_NORMAL: {
348  std::unique_lock<std::timed_mutex> lock(timed_mtx);
349  cond.wait(lock);
350  break;
351  }
352  case PTHREAD_MUTEX_RECURSIVE: {
353  std::unique_lock<std::recursive_timed_mutex> lock(recursive_timed_mtx);
354  cond.wait(lock);
355  break;
356  }
357  }
358  }
359 
360  bool condition_timed_wait(
361  std::condition_variable_any& cond,
362  std::chrono::system_clock::time_point until) {
363  switch (type) {
364  case PTHREAD_MUTEX_NORMAL: {
365  std::unique_lock<std::timed_mutex> lock(timed_mtx);
366  return cond.wait_until(lock, until) == std::cv_status::no_timeout;
367  }
368  case PTHREAD_MUTEX_RECURSIVE: {
369  std::unique_lock<std::recursive_timed_mutex> lock(recursive_timed_mtx);
370  return cond.wait_until(lock, until) == std::cv_status::no_timeout;
371  }
372  }
374  }
375 };
376 
377 int pthread_mutex_init(
378  pthread_mutex_t* mutex,
379  const pthread_mutexattr_t* attr) {
380  if (mutex == nullptr) {
381  return EINVAL;
382  }
383 
384  auto type = attr != nullptr ? attr->type : PTHREAD_MUTEX_DEFAULT;
385  auto ret = new pthread_mutex_t_(type);
386  *mutex = ret;
387  return 0;
388 }
389 
390 int pthread_mutex_destroy(pthread_mutex_t* mutex) {
391  if (mutex == nullptr) {
392  return EINVAL;
393  }
394 
395  delete *mutex;
396  *mutex = nullptr;
397  return 0;
398 }
399 
400 int pthread_mutex_lock(pthread_mutex_t* mutex) {
401  if (mutex == nullptr) {
402  return EINVAL;
403  }
404 
405  // This implementation does not implement deadlock detection, as the
406  // STL mutexes we're wrapping don't either.
407  (*mutex)->lock();
408  return 0;
409 }
410 
411 int pthread_mutex_trylock(pthread_mutex_t* mutex) {
412  if (mutex == nullptr) {
413  return EINVAL;
414  }
415 
416  if ((*mutex)->try_lock()) {
417  return 0;
418  } else {
419  return EBUSY;
420  }
421 }
422 
// Converts a timespec (seconds plus nanoseconds) into a system_clock time
// point holding the same offset from the clock's epoch. The offset is
// converted to the clock's native duration with duration_cast, so any
// precision finer than the clock's tick is truncated. t must not be null.
static std::chrono::system_clock::time_point timespec_to_time_point(
    const timespec* t) {
  using clock_duration = std::chrono::system_clock::duration;

  const auto wholeSeconds = std::chrono::seconds(t->tv_sec);
  const auto fraction = std::chrono::nanoseconds(t->tv_nsec);
  const auto sinceEpoch = wholeSeconds + fraction;
  return std::chrono::system_clock::time_point(
      std::chrono::duration_cast<clock_duration>(sinceEpoch));
}
430 
431 int pthread_mutex_timedlock(
432  pthread_mutex_t* mutex,
433  const timespec* abs_timeout) {
434  if (mutex == nullptr || abs_timeout == nullptr) {
435  return EINVAL;
436  }
437 
438  auto time = timespec_to_time_point(abs_timeout);
439  if ((*mutex)->timed_try_lock(time)) {
440  return 0;
441  } else {
442  return ETIMEDOUT;
443  }
444 }
445 
446 int pthread_mutex_unlock(pthread_mutex_t* mutex) {
447  if (mutex == nullptr) {
448  return EINVAL;
449  }
450 
451  // This implementation allows other threads to unlock it,
452  // as the STL containers also do.
453  (*mutex)->unlock();
454  return 0;
455 }
456 
// Backing object for pthread_rwlock_t.
struct pthread_rwlock_t_ {
  // Provides both the shared (reader) and the exclusive (writer) lock.
  std::shared_timed_mutex mtx;
  // Set by the write-lock functions after the exclusive lock is taken;
  // consulted by pthread_rwlock_unlock to pick the matching unlock call.
  std::atomic<bool> writing{false};
};
461 
462 int pthread_rwlock_init(pthread_rwlock_t* rwlock, const void* attr) {
463  if (attr != nullptr) {
464  return EINVAL;
465  }
466  if (rwlock == nullptr) {
467  return EINVAL;
468  }
469 
470  *rwlock = new pthread_rwlock_t_();
471  return 0;
472 }
473 
474 int pthread_rwlock_destroy(pthread_rwlock_t* rwlock) {
475  if (rwlock == nullptr) {
476  return EINVAL;
477  }
478 
479  delete *rwlock;
480  *rwlock = nullptr;
481  return 0;
482 }
483 
484 int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock) {
485  if (rwlock == nullptr) {
486  return EINVAL;
487  }
488 
489  (*rwlock)->mtx.lock_shared();
490  return 0;
491 }
492 
493 int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock) {
494  if (rwlock == nullptr) {
495  return EINVAL;
496  }
497 
498  if ((*rwlock)->mtx.try_lock_shared()) {
499  return 0;
500  } else {
501  return EBUSY;
502  }
503 }
504 
505 int pthread_rwlock_timedrdlock(
506  pthread_rwlock_t* rwlock,
507  const timespec* abs_timeout) {
508  if (rwlock == nullptr) {
509  return EINVAL;
510  }
511 
512  auto time = timespec_to_time_point(abs_timeout);
513  if ((*rwlock)->mtx.try_lock_shared_until(time)) {
514  return 0;
515  } else {
516  return ETIMEDOUT;
517  }
518 }
519 
520 int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock) {
521  if (rwlock == nullptr) {
522  return EINVAL;
523  }
524 
525  (*rwlock)->mtx.lock();
526  (*rwlock)->writing = true;
527  return 0;
528 }
529 
530 // Note: As far as I can tell, rwlock is technically supposed to
531 // be an upgradable lock, but we don't implement it that way.
532 int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock) {
533  if (rwlock == nullptr) {
534  return EINVAL;
535  }
536 
537  if ((*rwlock)->mtx.try_lock()) {
538  (*rwlock)->writing = true;
539  return 0;
540  } else {
541  return EBUSY;
542  }
543 }
544 
545 int pthread_rwlock_timedwrlock(
546  pthread_rwlock_t* rwlock,
547  const timespec* abs_timeout) {
548  if (rwlock == nullptr) {
549  return EINVAL;
550  }
551 
552  auto time = timespec_to_time_point(abs_timeout);
553  if ((*rwlock)->mtx.try_lock_until(time)) {
554  (*rwlock)->writing = true;
555  return 0;
556  } else {
557  return ETIMEDOUT;
558  }
559 }
560 
561 int pthread_rwlock_unlock(pthread_rwlock_t* rwlock) {
562  if (rwlock == nullptr) {
563  return EINVAL;
564  }
565 
566  // Note: We don't have any checking to ensure we have actually
567  // locked things first, so you'll actually be in undefined behavior
568  // territory if you do attempt to unlock things you haven't locked.
569  if ((*rwlock)->writing) {
570  (*rwlock)->mtx.unlock();
571  // If we fail, then another thread has already immediately acquired
572  // the write lock, so this should stay as true :)
573  bool dump = true;
574  (void)(*rwlock)->writing.compare_exchange_strong(dump, false);
575  } else {
576  (*rwlock)->mtx.unlock_shared();
577  }
578  return 0;
579 }
580 
// Backing object for pthread_cond_t.
struct pthread_cond_t_ {
  // The _any flavour is required because pthread_mutex_t is built on timed
  // mutexes, which the plain std::condition_variable cannot wait on.
  std::condition_variable_any cond;
};
587 
588 int pthread_cond_init(pthread_cond_t* cond, const void* attr) {
589  if (attr != nullptr) {
590  return EINVAL;
591  }
592  if (cond == nullptr) {
593  return EINVAL;
594  }
595 
596  *cond = new pthread_cond_t_();
597  return 0;
598 }
599 
600 int pthread_cond_destroy(pthread_cond_t* cond) {
601  if (cond == nullptr) {
602  return EINVAL;
603  }
604 
605  delete *cond;
606  *cond = nullptr;
607  return 0;
608 }
609 
610 int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex) {
611  if (cond == nullptr || mutex == nullptr) {
612  return EINVAL;
613  }
614 
615  (*mutex)->condition_wait((*cond)->cond);
616  return 0;
617 }
618 
619 int pthread_cond_timedwait(
620  pthread_cond_t* cond,
621  pthread_mutex_t* mutex,
622  const timespec* abstime) {
623  if (cond == nullptr || mutex == nullptr || abstime == nullptr) {
624  return EINVAL;
625  }
626 
627  auto time = timespec_to_time_point(abstime);
628  if ((*mutex)->condition_timed_wait((*cond)->cond, time)) {
629  return 0;
630  } else {
631  return ETIMEDOUT;
632  }
633 }
634 
635 int pthread_cond_signal(pthread_cond_t* cond) {
636  if (cond == nullptr) {
637  return EINVAL;
638  }
639 
640  (*cond)->cond.notify_one();
641  return 0;
642 }
643 
644 int pthread_cond_broadcast(pthread_cond_t* cond) {
645  if (cond == nullptr) {
646  return EINVAL;
647  }
648 
649  (*cond)->cond.notify_all();
650  return 0;
651 }
652 
653 int pthread_key_create(pthread_key_t* key, void (*destructor)(void*)) {
654  try {
655  auto newKey = new boost::thread_specific_ptr<void>(destructor);
656  *key = newKey;
657  return 0;
658  } catch (boost::thread_resource_error) {
659  return -1;
660  }
661 }
662 
663 int pthread_key_delete(pthread_key_t key) {
664  try {
665  auto realKey = reinterpret_cast<boost::thread_specific_ptr<void>*>(key);
666  delete realKey;
667  return 0;
668  } catch (boost::thread_resource_error) {
669  return -1;
670  }
671 }
672 
673 void* pthread_getspecific(pthread_key_t key) {
674  auto realKey = reinterpret_cast<boost::thread_specific_ptr<void>*>(key);
675  // This can't throw as-per the documentation.
676  return realKey->get();
677 }
678 
679 int pthread_setspecific(pthread_key_t key, const void* value) {
680  try {
681  auto realKey = reinterpret_cast<boost::thread_specific_ptr<void>*>(key);
682  // We can't just call reset here because that would invoke the cleanup
683  // function, which we don't want to do.
684  boost::detail::set_tss_data(
685  realKey,
686  boost::shared_ptr<boost::detail::tss_cleanup_function>(),
687  const_cast<void*>(value),
688  false);
689  return 0;
690  } catch (boost::thread_resource_error) {
691  return -1;
692  }
693 }
694 } // namespace pthread
695 } // namespace portability
696 } // namespace folly
697 #endif
static constexpr int kb(int kilos)
LogLevel max
Definition: LogLevel.cpp:31
PskType type
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
#define TRUE
Definition: test.c:30
FOLLY_ALWAYS_INLINE void assume_unreachable()
Definition: Assume.h:59
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
#define FALSE
Definition: test.c:32
Until until(Predicate pred=Predicate())
Definition: Base.h:656
std::mutex mutex
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
std::chrono::nanoseconds time()
state
Definition: http_parser.c:272