/*
 * Copyright (c) 2006-2023, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2023-11-20     Shell        Support of condition variable
 */
#define DBG_TAG "ipc.condvar"
#define DBG_LVL DBG_INFO
#include <rtdbg.h>

#include <rtdevice.h>
#include <rtatomic.h>
#include <rtthread.h>

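/* Serializes the bind/unbind updates of cv->waiting_mtx done in
 * rt_condvar_timedwait() below */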
static struct rt_spinlock _local_cv_queue_lock = RT_SPINLOCK_INIT;

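/* The caller must hold the mutex currently bound to the condvar, if any */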
#define CV_ASSERT_LOCKED(cv)                                       \
    RT_ASSERT(!(cv)->waiting_mtx ||                                \
              rt_mutex_get_owner((rt_mutex_t)(cv)->waiting_mtx) == \
                  rt_thread_self())

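/* Note: `name` is currently unused; presumably reserved for the rt_object
 * integration left as a TODO below */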
void rt_condvar_init(rt_condvar_t cv, char *name)
{
#ifdef USING_RT_OBJECT
    /* TODO: support rt object */
    rt_object_init();
#endif

    rt_wqueue_init(&cv->event);
    rt_atomic_store(&cv->waiters_cnt, 0);
    rt_atomic_store(&cv->waiting_mtx, 0);
}

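/*
 * Suspend the calling thread (node->polling_thread) and append its wait node
 * to `queue`, arming the per-thread timer when a finite timeout is given.
 * If the queue is already flagged RT_WQ_FLAG_WAKEUP, return RT_EOK without
 * suspending, so the caller observes an immediate wakeup.
 */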
static int _waitq_inqueue(rt_wqueue_t *queue, struct rt_wqueue_node *node,
                          rt_tick_t timeout, int suspend_flag)
{
    rt_thread_t tcb = node->polling_thread;
    rt_timer_t timer = &(tcb->thread_timer);
    rt_err_t ret;

    if (queue->flag != RT_WQ_FLAG_WAKEUP)
    {
        ret = rt_thread_suspend_with_flag(tcb, suspend_flag);
        if (ret == RT_EOK)
        {
            rt_wqueue_add(queue, node);
            if (timeout != RT_WAITING_FOREVER)
            {
                rt_timer_control(timer, RT_TIMER_CTRL_SET_TIME, &timeout);

                rt_timer_start(timer);
            }
        }
    }
    else
    {
        ret = RT_EOK;
    }

    return ret;
}

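/* Build an on-stack wait node for the current thread; note that the macro
 * picks up `cv` from the caller's scope */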
#define INIT_WAITQ_NODE(node)                                  \
    {                                                          \
        .polling_thread = rt_thread_self(), .key = 0,          \
        .wakeup = __wqueue_default_wake, .wqueue = &cv->event, \
        .list = RT_LIST_OBJECT_INIT(node.list)                 \
    }

int rt_condvar_timedwait(rt_condvar_t cv, rt_mutex_t mtx, int suspend_flag,
                         rt_tick_t timeout)
{
    rt_err_t acq_mtx_succ, rc;
    rt_atomic_t waiting_mtx;
    struct rt_wqueue_node node = INIT_WAITQ_NODE(node);

    /* not allowed in IRQ & critical section */
    RT_DEBUG_SCHEDULER_AVAILABLE(1);
    CV_ASSERT_LOCKED(cv);

    /**
     * In the worst case, this binding races with the last waiter below, which
     * resets the field before the mutex is retaken. The spinlock serializes
     * the two paths.
     */
    rt_spin_lock(&_local_cv_queue_lock);
    waiting_mtx = rt_atomic_load(&cv->waiting_mtx);
    if (!waiting_mtx)
        acq_mtx_succ = rt_atomic_compare_exchange_strong(
            &cv->waiting_mtx, &waiting_mtx, (size_t)mtx);
    else
        acq_mtx_succ = 0;

    rt_spin_unlock(&_local_cv_queue_lock);

    if (acq_mtx_succ == 1 || waiting_mtx == (size_t)mtx)
    {
        rt_atomic_add(&cv->waiters_cnt, 1);

        rt_enter_critical();

        if (suspend_flag == RT_INTERRUPTIBLE)
            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_INTERRUPTIBLE);
        else /* UNINTERRUPTIBLE is forbidden, since it's not safe for user space */
            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_KILLABLE);

        acq_mtx_succ = rt_mutex_release(mtx);
        RT_ASSERT(acq_mtx_succ == 0);
        rt_exit_critical();

        if (rc == RT_EOK)
        {
            rt_schedule();

            rc = rt_get_errno();
            rc = rc > 0 ? -rc : rc;
        }
        else
        {
            LOG_D("%s() failed to suspend", __func__);
        }

        rt_wqueue_remove(&node);

        rt_spin_lock(&_local_cv_queue_lock);
        if (rt_atomic_add(&cv->waiters_cnt, -1) == 1)
        {
            waiting_mtx = (size_t)mtx;
            acq_mtx_succ = rt_atomic_compare_exchange_strong(&cv->waiting_mtx,
                                                             &waiting_mtx, 0);
            RT_ASSERT(acq_mtx_succ == 1);
        }
        rt_spin_unlock(&_local_cv_queue_lock);

        acq_mtx_succ = rt_mutex_take(mtx, RT_WAITING_FOREVER);
        RT_ASSERT(acq_mtx_succ == 0);
    }
    else
    {
        LOG_D("%s: conflict waiting mutex", __func__);
        rc = -EBUSY;
    }

    return rc;
}
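
/*
 * Return values of rt_condvar_timedwait(): 0 after a wakeup, -EBUSY when the
 * condvar is already bound to a different mutex, otherwise the thread errno
 * recorded while suspended (e.g. a timeout), normalized to a negative value.
 */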

/** Keep in mind that we are always operating with cv->waiting_mtx held */

int rt_condvar_signal(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
        rt_wqueue_wakeup(&cv->event, 0);

    cv->event.flag = 0;
    return 0;
}

int rt_condvar_broadcast(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
        rt_wqueue_wakeup_all(&cv->event, 0);

    cv->event.flag = 0;
    return 0;
}