| /* SPDX-License-Identifier: LGPL-2.1-or-later */ |
| /* |
| * Copyright (C) 2019, Google Inc. |
| * |
| * thread.cpp - Thread support |
| */ |
| |
| #include "libcamera/internal/thread.h" |
| |
| #include <atomic> |
| #include <condition_variable> |
| #include <list> |
| #include <sys/syscall.h> |
| #include <sys/types.h> |
| #include <unistd.h> |
| |
| #include "libcamera/internal/event_dispatcher.h" |
| #include "libcamera/internal/event_dispatcher_poll.h" |
| #include "libcamera/internal/log.h" |
| #include "libcamera/internal/message.h" |
| |
| /** |
| * \page thread Thread Support |
| * |
| * libcamera supports multi-threaded applications through a threading model that |
| * sets precise rules to guarantee thread-safe usage of the API. Additionally, |
| * libcamera makes internal use of threads, and offers APIs that simplify |
| * interactions with application threads. Careful compliance with the threading |
| * model will ensure avoidance of race conditions. |
| * |
| * Every thread created by libcamera is associated with an instance of the |
| * Thread class. Those threads run an internal event loop by default to |
| * dispatch events to objects. Additionally, the main thread of the application |
| * (defined as the thread that calls CameraManager::start()) is also associated |
| * with a Thread instance, but has no event loop accessible to libcamera. Other |
| * application threads are not visible to libcamera. |
| * |
| * \section thread-objects Threads and Objects |
| * |
| * Instances of the Object class and all its derived classes are thread-aware |
| * and are bound to the thread they are created in. They are said to *live* in |
| * a thread, and they interact with the event loop of their thread for the |
| * purpose of message passing and signal delivery. Messages posted to the |
| * object with Object::postMessage() will be delivered from the event loop of |
| * the thread that the object lives in. Signals delivered to the object, unless |
| * explicitly connected with ConnectionTypeDirect, will also be delivered from |
| * the object thread's event loop. |
| * |
| * All Object instances created internally by libcamera are bound to internal |
| * threads. As objects interact with thread event loops for proper operation, |
| * creating an Object instance in a thread that has no internal event loop (such |
| * as the main application thread, or libcamera threads that have a custom main |
| * loop), prevents some features of the Object class from being used. See |
| * Thread::exec() for more details. |
| * |
| * \section thread-signals Threads and Signals |
| * |
| * When sent to a receiver that does not inherit from the Object class, signals |
| * are delivered synchronously in the thread of the sender. When the receiver |
| * inherits from the Object class, delivery is by default asynchronous if the |
| * sender and receiver live in different threads. In that case, the signal is |
| * posted to the receiver's message queue and will be delivered from the |
| * receiver's event loop, running in the receiver's thread. This mechanism can |
| * be overridden by selecting a different connection type when calling |
| * Signal::connect(). |
| * |
| * Asynchronous signal delivery is used internally in libcamera, but is also |
| * available to applications if desired. To use this feature, applications |
| * shall create receiver classes that inherit from the Object class, and |
| * provide an event loop to the CameraManager as explained above. Note that |
| * Object instances created by the application are limited to living in the |
| * application's main thread. Creating Object instances from another thread of |
| * an application causes undefined behaviour. |
| * |
| * \section thread-reentrancy Reentrancy and Thread-Safety |
| * |
| * Throughout the documentation, several terms are used to define how classes and |
| * their member functions can be used from multiple threads. |
| * |
| * - A **reentrant** function may be called simultaneously from multiple |
| * threads if and only if each invocation uses a different instance of the |
| * class. This is the default for all member functions not explicitly marked |
| * otherwise. |
| * |
| * - \anchor thread-safe A **thread-safe** function may be called |
| * simultaneously from multiple threads on the same instance of a class. A |
| * thread-safe function is thus reentrant. Thread-safe functions may also be |
| * called simultaneously with any other reentrant function of the same class |
| * on the same instance. |
| * |
| * - \anchor thread-bound A **thread-bound** function may be called only from |
| * the thread that the class instance lives in (see section \ref |
| * thread-objects). For instances of classes that do not derive from the |
| * Object class, this is the thread in which the instance was created. A |
| * thread-bound function is not thread-safe, and may or may not be reentrant. |
| * |
| * Neither reentrancy nor thread-safety, in this context, means that a function |
| * may be called simultaneously from the same thread, for instance from a |
| * callback invoked by the function. This may deadlock and isn't allowed unless |
| * separately documented. |
| * |
| * A class is defined as reentrant, thread-safe or thread-bound if all its |
| * member functions are reentrant, thread-safe or thread-bound respectively. |
| * Some member functions may additionally be documented as having additional |
| * thread-related attributes. |
| * |
| * Most classes are reentrant but not thread-safe, as making them fully |
| * thread-safe would incur locking costs considered prohibitive for the |
| * expected use cases. |
| */ |
| |
| /** |
| * \file thread.h |
| * \brief Thread support |
| */ |
| |
| namespace libcamera { |
| |
| LOG_DEFINE_CATEGORY(Thread) |
| |
| class ThreadMain; |
| |
| /** |
| * \brief A queue of posted messages |
| */ |
| class MessageQueue |
| { |
| public: |
| /** |
| * \brief List of queued Message instances |
| */ |
| std::list<std::unique_ptr<Message>> list_; |
| /** |
| * \brief Protects the \ref list_ |
| */ |
| Mutex mutex_; |
| }; |
| |
| /** |
| * \brief Thread-local internal data |
| */ |
| class ThreadData |
| { |
| public: |
| ThreadData() |
| : thread_(nullptr), running_(false), dispatcher_(nullptr) |
| { |
| } |
| |
| static ThreadData *current(); |
| |
| private: |
| friend class Thread; |
| friend class ThreadMain; |
| |
| Thread *thread_; |
| bool running_; |
| pid_t tid_; |
| |
| Mutex mutex_; |
| |
| std::atomic<EventDispatcher *> dispatcher_; |
| |
| std::condition_variable cv_; |
| std::atomic<bool> exit_; |
| int exitCode_; |
| |
| MessageQueue messages_; |
| }; |
| |
| /** |
| * \brief Thread wrapper for the main thread |
| */ |
| class ThreadMain : public Thread |
| { |
| public: |
| ThreadMain() |
| { |
| data_->running_ = true; |
| } |
| |
| protected: |
| void run() override |
| { |
| LOG(Thread, Fatal) << "The main thread can't be restarted"; |
| } |
| }; |
| |
| static thread_local ThreadData *currentThreadData = nullptr; |
| static ThreadMain mainThread; |
| |
| /** |
| * \brief Retrieve thread-local internal data for the current thread |
| * \return The thread-local internal data for the current thread |
| */ |
| ThreadData *ThreadData::current() |
| { |
| if (currentThreadData) |
| return currentThreadData; |
| |
| /* |
| * The main thread doesn't receive thread-local data when it is |
| * started, set it here. |
| */ |
| ThreadData *data = mainThread.data_; |
| data->tid_ = syscall(SYS_gettid); |
| currentThreadData = data; |
| return data; |
| } |
| |
| /** |
| * \typedef Mutex |
| * \brief An alias for std::mutex |
| */ |
| |
| /** |
| * \typedef MutexLocker |
| * \brief An alias for std::unique_lock<std::mutex> |
| */ |
| |
| /** |
| * \class Thread |
| * \brief A thread of execution |
| * |
| * The Thread class is a wrapper around std::thread that handles integration |
| * with the Object, Signal and EventDispatcher classes. |
| * |
| * Thread instances by default run an event loop until the exit() method is |
| * called. The event loop dispatches events (messages, notifiers and timers) |
| * sent to the objects living in the thread. This behaviour can be modified by |
| * overriding the run() function. |
| * |
| * \context This class is \threadsafe. |
| */ |
| |
| /** |
| * \brief Create a thread |
| */ |
| Thread::Thread() |
| { |
| data_ = new ThreadData; |
| data_->thread_ = this; |
| } |
| |
| Thread::~Thread() |
| { |
| delete data_->dispatcher_.load(std::memory_order_relaxed); |
| delete data_; |
| } |
| |
| /** |
| * \brief Start the thread |
| */ |
| void Thread::start() |
| { |
| MutexLocker locker(data_->mutex_); |
| |
| if (data_->running_) |
| return; |
| |
| data_->running_ = true; |
| data_->exitCode_ = -1; |
| data_->exit_.store(false, std::memory_order_relaxed); |
| |
| thread_ = std::thread(&Thread::startThread, this); |
| } |
| |
| void Thread::startThread() |
| { |
| struct ThreadCleaner { |
| ThreadCleaner(Thread *thread, void (Thread::*cleaner)()) |
| : thread_(thread), cleaner_(cleaner) |
| { |
| } |
| ~ThreadCleaner() |
| { |
| (thread_->*cleaner_)(); |
| } |
| |
| Thread *thread_; |
| void (Thread::*cleaner_)(); |
| }; |
| |
| /* |
| * Make sure the thread is cleaned up even if the run method exits |
| * abnormally (for instance via a direct call to pthread_cancel()). |
| */ |
| thread_local ThreadCleaner cleaner(this, &Thread::finishThread); |
| |
| data_->tid_ = syscall(SYS_gettid); |
| currentThreadData = data_; |
| |
| run(); |
| } |
| |
| /** |
| * \brief Enter the event loop |
| * |
| * This method enters an event loop based on the event dispatcher instance for |
| * the thread, and blocks until the exit() method is called. It is meant to be |
| * called within the thread from the run() method and shall not be called |
| * outside of the thread. |
| * |
| * \return The exit code passed to the exit() method |
| */ |
| int Thread::exec() |
| { |
| MutexLocker locker(data_->mutex_); |
| |
| EventDispatcher *dispatcher = eventDispatcher(); |
| |
| locker.unlock(); |
| |
| while (!data_->exit_.load(std::memory_order_acquire)) |
| dispatcher->processEvents(); |
| |
| locker.lock(); |
| |
| return data_->exitCode_; |
| } |
| |
| /** |
| * \brief Main method of the thread |
| * |
| * When the thread is started with start(), it calls this method in the context |
| * of the new thread. The run() method can be overridden to perform custom |
| * work, either custom initialization and cleanup before and after calling the |
| * Thread::exec() function, or a custom thread loop altogether. When this |
| * method returns the thread execution is stopped, and the \ref finished signal |
| * is emitted. |
| * |
| * Note that if this function is overridden and doesn't call Thread::exec(), no |
| * events will be dispatched to the objects living in the thread. These objects |
| * will not be able to use the EventNotifier, Timer or Message facilities. This |
| * includes functions that rely on message dispatching, such as |
| * Object::deleteLater(). |
| * |
| * The base implementation just calls exec(). |
| */ |
| void Thread::run() |
| { |
| exec(); |
| } |
| |
| void Thread::finishThread() |
| { |
| data_->mutex_.lock(); |
| data_->running_ = false; |
| data_->mutex_.unlock(); |
| |
| finished.emit(this); |
| data_->cv_.notify_all(); |
| } |
| |
| /** |
| * \brief Stop the thread's event loop |
| * \param[in] code The exit code |
| * |
| * This method interrupts the event loop started by the exec() method, causing |
| * exec() to return \a code. |
| * |
| * Calling exit() on a thread that reimplements the run() method and doesn't |
| * call exec() will likely have no effect. |
| */ |
| void Thread::exit(int code) |
| { |
| data_->exitCode_ = code; |
| data_->exit_.store(true, std::memory_order_release); |
| |
| EventDispatcher *dispatcher = data_->dispatcher_.load(std::memory_order_relaxed); |
| if (!dispatcher) |
| return; |
| |
| dispatcher->interrupt(); |
| } |
| |
| /** |
| * \brief Wait for the thread to finish |
| * \param[in] duration Maximum wait duration |
| * |
| * This function waits until the thread finishes or the \a duration has |
| * elapsed, whichever happens first. If \a duration is equal to |
| * utils::duration::max(), the wait never times out. If the thread is not |
| * running the function returns immediately. |
| * |
| * \return True if the thread has finished, or false if the wait timed out |
| */ |
| bool Thread::wait(utils::duration duration) |
| { |
| bool hasFinished = true; |
| |
| { |
| MutexLocker locker(data_->mutex_); |
| |
| if (duration == utils::duration::max()) |
| data_->cv_.wait(locker, [&]() { return !data_->running_; }); |
| else |
| hasFinished = data_->cv_.wait_for(locker, duration, |
| [&]() { return !data_->running_; }); |
| } |
| |
| if (thread_.joinable()) |
| thread_.join(); |
| |
| return hasFinished; |
| } |
| |
| /** |
| * \brief Check if the thread is running |
| * |
| * A Thread instance is considered as running once the underlying thread has |
| * started. This method guarantees that it returns true after the start() |
| * method returns, and false after the wait() method returns. |
| * |
| * \return True if the thread is running, false otherwise |
| */ |
| bool Thread::isRunning() |
| { |
| MutexLocker locker(data_->mutex_); |
| return data_->running_; |
| } |
| |
| /** |
| * \var Thread::finished |
| * \brief Signal the end of thread execution |
| */ |
| |
| /** |
| * \brief Retrieve the Thread instance for the current thread |
| * \return The Thread instance for the current thread |
| */ |
| Thread *Thread::current() |
| { |
| ThreadData *data = ThreadData::current(); |
| return data->thread_; |
| } |
| |
| /** |
| * \brief Retrieve the ID of the current thread |
| * |
| * The thread ID corresponds to the Linux thread ID (TID) as returned by the |
| * gettid system call. |
| * |
| * \return The ID of the current thread |
| */ |
| pid_t Thread::currentId() |
| { |
| ThreadData *data = ThreadData::current(); |
| return data->tid_; |
| } |
| |
| /** |
| * \brief Retrieve the event dispatcher |
| * |
| * This function retrieves the internal event dispatcher for the thread. The |
| * returned event dispatcher is valid until the thread is destroyed. |
| * |
| * \return Pointer to the event dispatcher |
| */ |
| EventDispatcher *Thread::eventDispatcher() |
| { |
| if (!data_->dispatcher_.load(std::memory_order_relaxed)) |
| data_->dispatcher_.store(new EventDispatcherPoll(), |
| std::memory_order_release); |
| |
| return data_->dispatcher_.load(std::memory_order_relaxed); |
| } |
| |
| /** |
| * \brief Post a message to the thread for the \a receiver |
| * \param[in] msg The message |
| * \param[in] receiver The receiver |
| * |
| * This method stores the message \a msg in the message queue of the thread for |
| * the \a receiver and wake up the thread's event loop. Message ownership is |
| * passed to the thread, and the message will be deleted after being delivered. |
| * |
| * Messages are delivered through the thread's event loop. If the thread is not |
| * running its event loop the message will not be delivered until the event |
| * loop gets started. |
| * |
| * If the \a receiver is not bound to this thread the behaviour is undefined. |
| * |
| * \sa exec() |
| */ |
| void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver) |
| { |
| msg->receiver_ = receiver; |
| |
| ASSERT(data_ == receiver->thread()->data_); |
| |
| MutexLocker locker(data_->messages_.mutex_); |
| data_->messages_.list_.push_back(std::move(msg)); |
| receiver->pendingMessages_++; |
| locker.unlock(); |
| |
| EventDispatcher *dispatcher = |
| data_->dispatcher_.load(std::memory_order_acquire); |
| if (dispatcher) |
| dispatcher->interrupt(); |
| } |
| |
| /** |
| * \brief Remove all posted messages for the \a receiver |
| * \param[in] receiver The receiver |
| * |
| * If the \a receiver is not bound to this thread the behaviour is undefined. |
| */ |
| void Thread::removeMessages(Object *receiver) |
| { |
| ASSERT(data_ == receiver->thread()->data_); |
| |
| MutexLocker locker(data_->messages_.mutex_); |
| if (!receiver->pendingMessages_) |
| return; |
| |
| std::vector<std::unique_ptr<Message>> toDelete; |
| for (std::unique_ptr<Message> &msg : data_->messages_.list_) { |
| if (!msg) |
| continue; |
| if (msg->receiver_ != receiver) |
| continue; |
| |
| /* |
| * Move the message to the pending deletion list to delete it |
| * after releasing the lock. The messages list element will |
| * contain a null pointer, and will be removed when dispatching |
| * messages. |
| */ |
| toDelete.push_back(std::move(msg)); |
| receiver->pendingMessages_--; |
| } |
| |
| ASSERT(!receiver->pendingMessages_); |
| locker.unlock(); |
| |
| toDelete.clear(); |
| } |
| |
| /** |
| * \brief Dispatch posted messages for this thread |
| * \param[in] type The message type |
| * |
| * This function immediately dispatches all the messages previously posted for |
| * this thread with postMessage() that match the message \a type. If the \a type |
| * is Message::Type::None, all messages are dispatched. |
| */ |
| void Thread::dispatchMessages(Message::Type type) |
| { |
| MutexLocker locker(data_->messages_.mutex_); |
| |
| std::list<std::unique_ptr<Message>> &messages = data_->messages_.list_; |
| |
| for (auto iter = messages.begin(); iter != messages.end(); ) { |
| std::unique_ptr<Message> &msg = *iter; |
| |
| if (!msg) { |
| iter = data_->messages_.list_.erase(iter); |
| continue; |
| } |
| |
| if (type != Message::Type::None && msg->type() != type) { |
| ++iter; |
| continue; |
| } |
| |
| std::unique_ptr<Message> message = std::move(msg); |
| iter = data_->messages_.list_.erase(iter); |
| |
| Object *receiver = message->receiver_; |
| ASSERT(data_ == receiver->thread()->data_); |
| receiver->pendingMessages_--; |
| |
| locker.unlock(); |
| receiver->message(message.get()); |
| message.reset(); |
| locker.lock(); |
| } |
| } |
| |
| /** |
| * \brief Move an \a object and all its children to the thread |
| * \param[in] object The object |
| */ |
| void Thread::moveObject(Object *object) |
| { |
| ThreadData *currentData = object->thread_->data_; |
| ThreadData *targetData = data_; |
| |
| MutexLocker lockerFrom(currentData->messages_.mutex_, std::defer_lock); |
| MutexLocker lockerTo(targetData->messages_.mutex_, std::defer_lock); |
| std::lock(lockerFrom, lockerTo); |
| |
| moveObject(object, currentData, targetData); |
| } |
| |
| void Thread::moveObject(Object *object, ThreadData *currentData, |
| ThreadData *targetData) |
| { |
| /* Move pending messages to the message queue of the new thread. */ |
| if (object->pendingMessages_) { |
| unsigned int movedMessages = 0; |
| |
| for (std::unique_ptr<Message> &msg : currentData->messages_.list_) { |
| if (!msg) |
| continue; |
| if (msg->receiver_ != object) |
| continue; |
| |
| targetData->messages_.list_.push_back(std::move(msg)); |
| movedMessages++; |
| } |
| |
| if (movedMessages) { |
| EventDispatcher *dispatcher = |
| targetData->dispatcher_.load(std::memory_order_acquire); |
| if (dispatcher) |
| dispatcher->interrupt(); |
| } |
| } |
| |
| object->thread_ = this; |
| |
| /* Move all children. */ |
| for (auto child : object->children_) |
| moveObject(child, currentData, targetData); |
| } |
| |
| } /* namespace libcamera */ |