174 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
		
		
			
		
	
	
			174 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| 
								 | 
							
								/*
							 | 
						||
| 
								 | 
							
								 * Copyright (c) 2006-2023, RT-Thread Development Team
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * SPDX-License-Identifier: Apache-2.0
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * Change Logs:
							 | 
						||
| 
								 | 
							
								 * Date           Author       Notes
							 | 
						||
| 
								 | 
							
								 * 2023-11-20     Shell        Support of condition variable
							 | 
						||
| 
								 | 
							
								 */
							 | 
						||
| 
								 | 
							
								#define DBG_TAG "ipc.condvar"
							 | 
						||
| 
								 | 
							
								#define DBG_LVL DBG_INFO
							 | 
						||
| 
								 | 
							
								#include <rtdbg.h>
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								#include <rtdevice.h>
							 | 
						||
| 
								 | 
							
								#include <rtatomic.h>
							 | 
						||
| 
								 | 
							
								#include <rtthread.h>
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								static struct rt_spinlock _local_cv_queue_lock = RT_SPINLOCK_INIT;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								#define CV_ASSERT_LOCKED(cv)                                       \
							 | 
						||
| 
								 | 
							
								    RT_ASSERT(!(cv)->waiting_mtx ||                                \
							 | 
						||
| 
								 | 
							
								              rt_mutex_get_owner((rt_mutex_t)(cv)->waiting_mtx) == \
							 | 
						||
| 
								 | 
							
								                  rt_thread_self())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								void rt_condvar_init(rt_condvar_t cv, char *name)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								#ifdef USING_RT_OBJECT
							 | 
						||
| 
								 | 
							
								    /* TODO: support rt object */
							 | 
						||
| 
								 | 
							
								    rt_object_init();
							 | 
						||
| 
								 | 
							
								#endif
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    rt_wqueue_init(&cv->event);
							 | 
						||
| 
								 | 
							
								    rt_atomic_store(&cv->waiters_cnt, 0);
							 | 
						||
| 
								 | 
							
								    rt_atomic_store(&cv->waiting_mtx, 0);
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								static int _waitq_inqueue(rt_wqueue_t *queue, struct rt_wqueue_node *node,
							 | 
						||
| 
								 | 
							
								                          rt_tick_t timeout, int suspend_flag)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								    rt_thread_t tcb = node->polling_thread;
							 | 
						||
| 
								 | 
							
								    rt_timer_t timer = &(tcb->thread_timer);
							 | 
						||
| 
								 | 
							
								    rt_err_t ret;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (queue->flag != RT_WQ_FLAG_WAKEUP)
							 | 
						||
| 
								 | 
							
								    {
							 | 
						||
| 
								 | 
							
								        ret = rt_thread_suspend_with_flag(tcb, suspend_flag);
							 | 
						||
| 
								 | 
							
								        if (ret == RT_EOK)
							 | 
						||
| 
								 | 
							
								        {
							 | 
						||
| 
								 | 
							
								            rt_wqueue_add(queue, node);
							 | 
						||
| 
								 | 
							
								            if (timeout != RT_WAITING_FOREVER)
							 | 
						||
| 
								 | 
							
								            {
							 | 
						||
| 
								 | 
							
								                rt_timer_control(timer, RT_TIMER_CTRL_SET_TIME, &timeout);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                rt_timer_start(timer);
							 | 
						||
| 
								 | 
							
								            }
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    else
							 | 
						||
| 
								 | 
							
								    {
							 | 
						||
| 
								 | 
							
								        ret = RT_EOK;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    return ret;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								#define INIT_WAITQ_NODE(node)                                  \
							 | 
						||
| 
								 | 
							
								    {                                                          \
							 | 
						||
| 
								 | 
							
								        .polling_thread = rt_thread_self(), .key = 0,          \
							 | 
						||
| 
								 | 
							
								        .wakeup = __wqueue_default_wake, .wqueue = &cv->event, \
							 | 
						||
| 
								 | 
							
								        .list = RT_LIST_OBJECT_INIT(node.list)                 \
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								int rt_condvar_timedwait(rt_condvar_t cv, rt_mutex_t mtx, int suspend_flag,
							 | 
						||
| 
								 | 
							
								                         rt_tick_t timeout)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								    rt_err_t acq_mtx_succ, rc;
							 | 
						||
| 
								 | 
							
								    rt_atomic_t waiting_mtx;
							 | 
						||
| 
								 | 
							
								    struct rt_wqueue_node node = INIT_WAITQ_NODE(node);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    /* not allowed in IRQ & critical section */
							 | 
						||
| 
								 | 
							
								    RT_DEBUG_SCHEDULER_AVAILABLE(1);
							 | 
						||
| 
								 | 
							
								    CV_ASSERT_LOCKED(cv);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    /**
							 | 
						||
| 
								 | 
							
								     * for the worst case, this is racy with the following works to reset field
							 | 
						||
| 
								 | 
							
								     * before mutex is taken. The spinlock then comes to rescue.
							 | 
						||
| 
								 | 
							
								     */
							 | 
						||
| 
								 | 
							
								    rt_spin_lock(&_local_cv_queue_lock);
							 | 
						||
| 
								 | 
							
								    waiting_mtx = rt_atomic_load(&cv->waiting_mtx);
							 | 
						||
| 
								 | 
							
								    if (!waiting_mtx)
							 | 
						||
| 
								 | 
							
								        acq_mtx_succ = rt_atomic_compare_exchange_strong(
							 | 
						||
| 
								 | 
							
								            &cv->waiting_mtx, &waiting_mtx, (size_t)mtx);
							 | 
						||
| 
								 | 
							
								    else
							 | 
						||
| 
								 | 
							
								        acq_mtx_succ = 0;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    rt_spin_unlock(&_local_cv_queue_lock);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if (acq_mtx_succ == 1 || waiting_mtx == (size_t)mtx)
							 | 
						||
| 
								 | 
							
								    {
							 | 
						||
| 
								 | 
							
								        rt_atomic_add(&cv->waiters_cnt, 1);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        rt_enter_critical();
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if (suspend_flag == RT_INTERRUPTIBLE)
							 | 
						||
| 
								 | 
							
								            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_INTERRUPTIBLE);
							 | 
						||
| 
								 | 
							
								        else /* UNINTERRUPTIBLE is forbidden, since it's not safe for user space */
							 | 
						||
| 
								 | 
							
								            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_KILLABLE);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        acq_mtx_succ = rt_mutex_release(mtx);
							 | 
						||
| 
								 | 
							
								        RT_ASSERT(acq_mtx_succ == 0);
							 | 
						||
| 
								 | 
							
								        rt_exit_critical();
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if (rc == RT_EOK)
							 | 
						||
| 
								 | 
							
								        {
							 | 
						||
| 
								 | 
							
								            rt_schedule();
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            rc = rt_get_errno();
							 | 
						||
| 
								 | 
							
								            rc = rc > 0 ? -rc : rc;
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        else
							 | 
						||
| 
								 | 
							
								        {
							 | 
						||
| 
								 | 
							
								            LOG_D("%s() failed to suspend", __func__);
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        rt_wqueue_remove(&node);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        rt_spin_lock(&_local_cv_queue_lock);
							 | 
						||
| 
								 | 
							
								        if (rt_atomic_add(&cv->waiters_cnt, -1) == 1)
							 | 
						||
| 
								 | 
							
								        {
							 | 
						||
| 
								 | 
							
								            waiting_mtx = (size_t)mtx;
							 | 
						||
| 
								 | 
							
								            acq_mtx_succ = rt_atomic_compare_exchange_strong(&cv->waiting_mtx,
							 | 
						||
| 
								 | 
							
								                                                      &waiting_mtx, 0);
							 | 
						||
| 
								 | 
							
								            RT_ASSERT(acq_mtx_succ == 1);
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        rt_spin_unlock(&_local_cv_queue_lock);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        acq_mtx_succ = rt_mutex_take(mtx, RT_WAITING_FOREVER);
							 | 
						||
| 
								 | 
							
								        RT_ASSERT(acq_mtx_succ == 0);
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    else
							 | 
						||
| 
								 | 
							
								    {
							 | 
						||
| 
								 | 
							
								        LOG_D("%s: conflict waiting mutex", __func__);
							 | 
						||
| 
								 | 
							
								        rc = -EBUSY;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    return rc;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/** Keep in mind that we always operating when cv.waiting_mtx is taken */
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								int rt_condvar_signal(rt_condvar_t cv)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								    CV_ASSERT_LOCKED(cv);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    /* to avoid spurious wakeups */
							 | 
						||
| 
								 | 
							
								    if (rt_atomic_load(&cv->waiters_cnt) > 0)
							 | 
						||
| 
								 | 
							
								        rt_wqueue_wakeup(&cv->event, 0);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    cv->event.flag = 0;
							 | 
						||
| 
								 | 
							
								    return 0;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								int rt_condvar_broadcast(rt_condvar_t cv)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								    CV_ASSERT_LOCKED(cv);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    /* to avoid spurious wakeups */
							 | 
						||
| 
								 | 
							
								    if (rt_atomic_load(&cv->waiters_cnt) > 0)
							 | 
						||
| 
								 | 
							
								        rt_wqueue_wakeup_all(&cv->event, 0);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    cv->event.flag = 0;
							 | 
						||
| 
								 | 
							
								    return 0;
							 | 
						||
| 
								 | 
							
								}
							 |