VxWorks SMP多核编程指南
本文摘自 vxworks_kernel_programmers_guide_6.8 第24章
1. 介绍
VxWorks SMP是风河公司为VxWorks设计的symmetric multiprocessing(SMP)系统。它与风河公司的uniporcessor(UP)系统一样,具备实时操作系统的特性。
本章节介绍了风河VxWorks SMP系统的特点。介绍了VxWorks SMP的配置过程、它与UP编程的区别,还有就是如何将UP代码移植为SMP代码。
2. 关于VxWorks SMP
多核系统指的是一个系统中包含两个或两个以上的处理单元。SMP是多核技巧中的一个,它的主要特点是一个OS运行在多个处理单元上,并且内存是共享的。另一种多核技巧是asymmetric multiprocessing(AMP)系统,即多个处理单元上运行多个OS。
(1) 技术特点
关于CPU与处理器的概念在很多计算机相关书籍里有所介绍。但是,在此我们仍要对这二者在SMP系统中的区别进行详细说明。
CPU:一个CPU通常使用CPU ID、物理CPU索引、逻辑CPU索引进行标示。一个CPU ID通常由系统固件和硬件决定。物理CPU索引从0开始,系统从CPU0开始启动,随着CPU个数的增加,物理CPU索引也会增加。逻辑CPU索引指的是OS实例。例如,UP系统中逻辑CPU的索引永远是0;对于一个4个CPU的SMP系统而言,它的CPU逻辑索引永远是0到3,无论硬件系统中CPU的个数。
处理器(processor):是一个包含一个CPU或多个CPU的硅晶体单元。
多处理器(multiprocessor):在一个独立的硬件环境中包含两个以上的处理器。
单核处理器(uniprocessor):一个包含了一个CPU的硅晶体单元。
例如:a dual-core MPC8641D指的是一个处理器上有两个CPU;a quad-core Broadcom 1480指的是一个处理器上有四个CPU。
在SMP系统上运行UP代码总会遇到问题,即使将UP代码进行了更新,也很难保证代码很好的利用了SMP系统的特性。对于在SMP上运行的代码,我们分为两个级别:
SMP-ready:虽然可以正常的运行在SMP系统上,但是并没有很充分的利用SMP系统的特点,即没有利用到多核处理器的优势;
SMP-optimized:不仅可以正常的运行在SMP系统上,而且还能很好的利用SMP系统的特点,使用多个CPU使多个任务可以同时执行,提高系统的效率,比UP系统的效果更加明显。
(2) VxWorks SMP OS特点
VxWorks单核编程(UP)与SMP编程在多数情况下是一样的。类似的,多数API在UP和SMP编程中是通用的。一些少数UP编程中的API不能在SMP中使用。与此同时,SMP中的一些API在UP中使用时
现的不是SMP中的效果,而是默认UP的效果,或者压根就不能使用(例如,task spinLock 默认表现为task Lock)。
本小节将简短介绍一下VxWorks的对称多处理器的一些特点:
多任务:对于传统的UP系统而言,处理多任务的方法是通过任务优先级对CPU资源进行抢占式处理的。而SMP系统则改变了这种方法,它是实实在在的任务、中断的同时执行。实现同时执行的关键是多个任务可以在不同的CPU上执行,当然这需要OS的协调控制。对于UP系统中多任务所谓的同时执行,其实只不过是CPU的快速切换,占有CPU的任务由一个快速切换到另一个。在SMP系统中,同时执行不是幻想而是实实在在存在的。
任务调度机制:VxWorks SMP系统中的任务调度机制与UP中的类似,都是基于优先级的。不同的是,当不同的任务运行在不同的CPU上时,可以实现两个任务的同时执行。
互斥:由于SMP系统允许任务同时运行的情况存在,因此,在UP系统中通过关中断、锁任务调度等这些保护临界资源的手段在SMP系统中将不再适用。这种在所有CPU上通过强行关闭中断、锁任务调度的方法会影响到SMP系统发挥它的特点,将SMP系统带回到UP系统的模式。VxWorks SMP提供一套特殊的任务间、中断间同步/互斥的方法——即UP中的taskLock()和intLock()等将会被VxWorks SMP提供的spinLock,原子操作以及CPU-specific等机制替代。
CPU-Affinity:默认情况下,任意任务可以运行在任意CPU上。VxWorks SMP提供了一种叫做CPU-Affinity的机制,即可以分配任务到指定CPU(CPU逻辑索引)上执行。
(3) VxWorks SMP 硬件特点
VxWorks SMP系统要求硬件必须具备对称多处理器。这些处理器必须是一样的,处理器可以共享内存、可以平等的访问所有设备。VxWorks SMP必须遵循uniform memory access (UMA)结构。
图1显示了一个双CPU的SMP系统
图 1 SMP硬件结构
无论SMP系统中CPU的个数是多少,它们的重要特点是一样的:
a. 内存对所有CPU可见,不存在“只属于某个CPU的内存”的情况。即任意CPU可以在任意内存中执行代码;
b. 每个CPU都有Memory Management Unit(MMU)。MMU可以使任务在不同的虚拟内存中同时运行。例如,RTP1的一个任务可以在CPU0上运行,与此同时,RTP2的一个任务可以在CPU1上运行;
c. 每个CPU可以访问所有设备。设备产生的中断可以通过可编程中断控制器发送到任意CPU上执行;
d. 通过多CPU,任务和ISR可以实现同步;通过spinLock,任务和ISR可以实现互斥;
e. Snoop bus的作用是使CPU之间的data cache总是保持前后一致性。
(4) VxWorks SMP与AMP的对比
关于SMP与AMP系统中对内存访问的对比如图2所示:
图 2 SMP系统对内存的占用情况
在SMP系统中,所有物理内存被所有CPU共享。内存空间可以用来保存VxWorks SMP镜像、Real-Time Process(RTP)等。所有CPU可以读、写、运行所有内存。内核任务、用户任务可以在任意CPU中执行。
在SMP系统中,所有内存、设备被所有CPU共享,CPU之间的主要通讯是如何防止“同时访问共享资源”的情况发生。
图 3 AMP系统对内存的占用情况
在AMP系统中,每个CPU对应一个VxWorks镜像的拷贝,它们只能被对应的CPU访问。因此,CPU1中执行的内核任务不可能在CPU0的内存中执行,反之亦然。对于RTP也是一样的。
在AMP系统中,一些内存是共享的,但是在这些共享内存中读写数据是严格受到控制的。例如,在两个VxWorks镜像中传递数据等。硬件资源根据OS被划分,因此CPU之间的通信只有在访问共享内存时才会发生。
3. VxWorks SMP配置说明
SpinLock的调式版本组件
INCLUDE_SPINLock_DEBUG提供了spinLock的版本,这对调试SMP APP有帮助。
在包含INCLUDE_SPINLock_DEBUG的同时,最好要加入INCLUDE_EDR_ERRLOG组件,它可以记录spinLock的错误信息。
CPU配置参数组件
INCLUDE_KERNEL组件中包含了一些对VxWorks SMP参数的配置,包括:
VX_SMP_NUM_CPUS代表VxWorks SMP的使能CPU个数。所有体系结构的最大使能CPU个数如下:ARM=4,IA32=8,MIPS=32,PowerPC=8,VxWorks Simulator=32。
ENABLE_ALL_CPUS默认是TRUE,代表所有已配置的CPU使能。这个参数也可以设置为FALSE,一般出于调试目的,此时只有逻辑CPU0是使能的,只有通过kernelCpuEnable()才可以使能指定的CPU。
VX_ENABLE_CPU_TIMEOUT代表CPU使能超时时长,当ENABLE_ALL_CPUS是TRUE时,该值表示所有CPU的使能时长,当ENABLE_ALL_CPUS是FALSE时,在kernelCpuEnable()被调用时,它用来表示CPU的启动时长。
VX_SMP_CPU_EXPLICIT_RESERVE表示将指定CPU排除在“可使用CPU-Affinity属性的CPU池”之外。它是一个字符串,若填写“2 3 7”,则代表CPU2,3,7不能使用CPU-Affinity属性。即不能通过taskCpuAffinityset()分配任务到这些CPU上运行。
当某个CPU被VX_SMP_CPU_EXPLICIT_RESERVE包含,唯一能够使他们恢复预留属性的方法是调用vxCpuReserve()。
4. 在多核AMP系统上配置VxWorks SMP
略
5. 启动VxWorks SMP
在WorkBench开启后会有一个默认的SMP的simulator,如图4所示:
图 4 WR自带的SMP虚拟机
点击连接后启动,启动过程如图5所示,代表目前已经进入VxWorks SMP系统以及当前CPU的个数。
图 5 SMP虚拟机启动过程
启动后在SHELL中输入i可以查看系统目前运行的任务,你会发现两个idle任务,它们分别运行在两个不同的CPU上。如图6所示。
图 6 SMP系统任务运行情况
6. VxWorks SMP编程
VxWorks单核编程(UP)与SMP编程在多数情况下是一样的。类似的,多数API在UP和SMP编程中是通用的。一些少数UP编程中的API不能在SMP中使用。与此同时,SMP中的一些API在UP中使用时表现的不是SMP中的效果,而是默认UP的效果,或者压根就不能使用(例如,task spinLock 默认表现为task Lock)。
由于SMP系统的特殊性,因此SMP编程需要特别注意,尤其是在互斥/同步机制上,在使用的时候需要充分考虑如何提高系统的性能。在VxWorks SMP系统中针对每个CPU都有一个idle任务,这在UP中是没有的。Idle任务是最低优先级(用户级任务是不能达到这么低优先级的)。当CPU进出idle状态时,idle任务会提供任务上下文,这可以用来监视CPU的利用率情况。
当CPU无事可做时,Idle任务的存在不会影响CPU进入睡眠状态(当电源管理开启时)。
可以使用kernelIsCpuIdle()或者kernelIsSystemIdle()这两个API查看一个指定CPU是否执行了idle任务或者所有CPU是否执行了idle任务。
【注意】不要对idle任务进行挂起、关闭、跟踪、改变优先级等一系列操作。
SMP的互斥/同步机制
SMP编程与UP编程最大的一个不同就是互斥/同步API的使用。有一些API在这两种编程中都可以使用,而有一些则不同。此外,UP编程中的一些隐式同步技巧(例如使用任务优先级替代显示同步锁等)在SMP中是不能用的。
与UP系统不同,SMP系统允许真正意义上的同时执行。即多个任务或多个中断可以同时执行。在绝大多数情况下,UP系统中与SMP系统中的互斥/同步机制(例如,信号量、消息队列等)是一样的。
但是,UP中的一些机制(例如,关中断、挂起任务抢占机制以此来保护临界资源等)在SMP中是不适用的。这是因为这些机制阻碍了同时执行的理念,降低了CPU的利用率,是的SMP系统向UP系统的回溯。
SMP编程与UP编程的一点不同是关于taskLock()和intLock()的使用上。SMP提供了以下互斥/同步锁机制进行替代:
a. 任务级、中断级的spinLock;
b. 任务级、中断级的CPU-specific;
c. 原子操作;
d. 内存障碍(memory barrier)
7. spinLock互斥/同步机制
在UP(单核)编程中通过信号量的方法可以实现task的互斥与同步,在SMP系统中可以继续沿用信号量的机制,而spinLock则用于替换UP编程中使用taskLock()和intLock()的地方。
简介taskLock()和intLock()
通过taskLock()可以关闭系统的任务调度机制,调用taskLock()的任务将是唯一获得CPU运行资源的任务,直到这个任务调用taskUnlock()为止。intLock()与taskLock()类似,intLock()用于关闭中断,使得中断IRS无法执行,直到调用者调用了intUnlock()。
Spinlock具有“满内存障碍”属性
VxWorks spinlock的获取与释放操作具备“满内存障碍”属性。“满内存障碍”属性可以使读、写内存操作按照严格的顺序执行而不受到多CPU的影响。因此,在申请与释放spinlock之间进行更新的数据可以保证“更新顺序”。
SpinLock的种类
SpinLock分为两种:中断级spinLock和任务级spinLock:
a. 中断级spinLock:可用于关闭本地CPU的中断。当任务调用中断级spinLock时,将会关闭本CPU的任务抢占机制;
b. 任务级spinLock:用于关闭本地CPU的任务抢占机制。
(本第CPU指的是调用这些API的CPU)
Spinlock的作用以及使用说明
与信号量不同的是,当一个任务试图申请一个已被另一个任务占用的spinlock时,该任务并不会进入阻塞状态(pend),而是可以继续运行,它会进入一个简单的、紧凑的循环直到spinlock得到释放。
这种等待spinlock释放的状态可以用’spinning’和’busy waiting’来描述。在此,我们可以看出spinlock的优点和缺点。优点是:由于任务(或ISR)在等待spinlock的时候没有进入pend状态而是继续执行(一个简单的循环用于获取spinlock),这就避免了任务调用度以及上下文切换的消耗。缺点是:循环操作没有实际意义,会占用CPU资源。
因此,只有在必要时才使用spinlock。即占用spinlock的时间越短,spinlock的优势发挥的越明显(例如UP中的taskLock()和intLock())。否则,如果占用spinlock较长的时间,在UP编程中的缺陷(增加了任务和中断的响应时间)同样也会在多核编程中出现。
在一个CPU上获取spinlock,并不会影响另一个CPU上任务和中断的调度机制。当一个任务在持有spinlock的时候,该任务不能被删除。
(1) 中断级spinlock
任务和中断都可以获使用中断级spinlock。有两种中断级spinlock:确定性的和非确定性的。
【注意】在UP系统中,中断级spinlock与intLock()和intUnlock()的效果是一样的。
确定性中断级spinlock
确定性中断级spinlock的最大特点是:公平、确定性。Spinlock会分给第一个申请的中断或任务。申请的spinlock会屏蔽掉本地CPU的其他中断。如果是一个任务申请了中断用spinlock,本地CPU的任务调度机制将被停止直到该任务释放spinlock。Spinlock确保了任务可以独占CPU完成一些操作。其他CPU上的中断和任务不会受到干扰。确定性中断级spinlock的API全部包含在spinLockLib中,API如表1所示。
表 1 确定性中断级spinLock的API
API
描述
void spinLockIsrinit(
spinLockIsr_t *pLock, /* pointer to ISR-callable spinLock */
int flags /* spinLock attributes */
)
初始化确定性中断级spinLock
void spinLockIsrTake(
spinlockIsr_t *pLock /* pointer to ISR-callable spinlock */
)
获取确定性中断级spinlock
void spinLockIsrGive(
spinlockIsr_t *pLock /* pointer to ISR-callable spinlock */
)
释放确定性中断级spinlock
非确定性中断级spinlock
非确定性中断级spinlock提供了更高的性能,但是当多个CPU试图同时申请一个spinlock时,它并不保证公平性和确定性。即非确定性中断级spinlock并不一定会把spinlock分配给第一个申请者。它的优势在于中断响应时间较短,即当CPU等待获取spinlock的时候,中断不会被屏蔽。API如表2所示。
表 2 非确定性中断级spinlock的API
API
描述
void spinLockIsrNdinit(
spinlockIsrNd_t * spin /* pointer to spinlock */
)
初始化非确定性中断级spinlock
int spinLockIsrNdTake (
spinlockIsrNd_t * spin /* pointer to spinlock */
)
获取非确定性中断级spinlock
void spinLockIsrNdGive (
spinlockIsrNd_t * spin,
int key /* return value of spinLockIsrNdTake */
)
释放非确定性中断级spinlock
(2) 任务级spinLock
任务级spinLock(中断不可调用该spinLock)可以关掉本地CPU的任务切换机制,使持有spinLock的任务独占CPU完成一些操作。同时,它不会对其他CPU上的任务调度机制产生影响。
【注意】SMP中任务级spinLock等同于UP编程中的taskLock()和taskUnLock()
API如表3所示。
表 3 任务级spinlock的API
API
描述
void spinLockTaskinit(
spinlockTask_t *pLock, /* pointer to task-only spinlock */
int flags /* spinlock attributes */
)
初始化任务级spinlock
void spinLockTaskTake(
spinlockTask_t *pLock /* pointer to task-only spinlock */
)
获取任务级spinlock
void spinLockTaskGive(
spinlockTask_t *pLock /* pointer to task-only spinlock */
)
释放任务级spinlock
(3) Spinlock的使用注意事项
由于SMP系统允许任务的同时运行,因此在使用spinlock的时候需要注意以下事宜:
a. spinlock最好用于短时间占用的情况;
b. 任务(或中断)一次只能申请一个spinlock。当一个已申请了spinlock的实体再一次申请了另一个spinlock时,很有可能会造成死锁;
c. 任务(或中断)不能申请它已经持有的spinlock。这可能会造成死锁;
d. 持有spinlock的任务(或中断)不能再调用一些特殊函数(尤其是内核函数),由于这些特殊函数本身持有spinlock,这种操作可能会导致死锁。
(4) SpinLock的调式版本
SpinLock的调试版本可以运行那些开发中使用了spinLock的程序对spinLock的情况进行调试。这需要添加INCLUDE_SPINLock_DEBUG组件。如果添加了INCLUDE_EDR_ERRLOG组件,则当由使用spinLock造成的系统异常进而重启后,相关信息会被记录下来。会产生错误信息的情况如表4所示。
表 4 使用spinLock会出现错误的情况
使用的API
错误信息
spinLockTaskTake()
一个中断任务使用了该API
申请了已持有的spinlock
嵌套申请spinlock
spinLockTaskGive()
一个中断任务使用了该API
试图释放一个没有申请过的spinlock
spinLockIsrTake()
申请了已持有的spinlock
嵌套申请spinlock
spinLockIsrGive()
试图释放一个没有申请过的spinlock
(5) Spinlock中限制使用的系统API
当任务(或中断)持有spinlock时,一些系统API不能被调用(具体原因见Spinlock的使用注意事项)。这样做为的是防止持有spinlock的任务或ISR进入内核临界区,这可能会导致死锁的发生。这种限制对于intCpuLock()也是适用的。这是因为有些内核API需要中断操作。
这些限制看起来好像使spinlock的运用受到影响,但是它们却是有必要的。Spinlock适用于进程间很快的同步/互斥情况。若将spinlock用在会进行大量操作——包括内核API调用等——的情况时,则会导致SMP性能的下降。这是因为当使用spinlock时,任务抢占机制以及中断都将会被关闭。图7列出了在使用spinlock和CPU lock时限制使用的系统API。
图 7 spinlock中限制使用的系统API
8. CPU-specific互斥机制
VxWorks SMP提供了一种基于CPU-specific的互斥机制,它可以严格限定互斥操作的范围在调用该操作的CPU(本地CPU)上执行。通过设计CPU-specific使得将UP代码转到SMP系统上变得容易。
(1) 中断级CPU-specific
中断级CPU-specific可以关闭本地CPU上的中断。例如,当任务A在CPU-0上运行一个本地CPU的中断锁操作,则该CPU将不再允许其他中断执行,直到任务A释放这个锁。SMP系统中其他的CPU将不会受到影响。
对于那些想要使用CPU-specific互斥机制的任务和ISR,必须使用CPU-Affinity将它们指定运行在本地CPU上,只有这样CPU-specific互斥才会有意义。
与spinLock一样,在执行中断锁的任务中有些系统API不能被使用(详见图7)。
中断级CPU-specific的API如表5所示。
【注意】在UP中,它们默认的操作与intLock()和intUnLock()一样。
表 5 中断级CPU-specific互斥API
API
描述
int intCpuLock (void);
当CPU-0上的任务或ISR调用了该函数后,则禁止在CPU-0上的一切中断调用。
void intCpuUnLock(
int LockKey /* lock-out key returned by preceding intCpuLock() */
)
恢复在CPU-0上的中断调用。
(2) 任务级CPU-specific
任务级CPU-specific可以关闭调用该API的CPU上的任务抢占机制。例如,当运行在CPU-0上的任务A调用了任务锁操作,则该CPU上将禁止任务切换,即该CPU上其他任务将不能得到运行,直到任务A释放了这个锁或执行了一个阻塞操作。
【注意】调用该操作的任务是不能被移交到另外的CPU上运行的,直到这个锁被释放。
SMP系统中其他的CPU将不会受到影响。对于那些想要使用CPU-specific互斥机制的任务和ISR,必须使用CPU-Affinity将它们指定运行在本地CPU上,只有这样CPU-specific互斥才会有意义。
任务级CPU-specific的API如表6所示。
【注意】在UP编程中,他们默认的操作与taskLock()和taskUnLock()类似。
表 6 任务级CPU-specific互斥API
API
描述
taskCpuLock()
当CPU-0上的任务或ISR调用了该函数后,则禁止在CPU-0上的一切任务切换。
taskCpuUnLock()
恢复在CPU-0上的任务切换。
9. Memory Barrier
在现代多核体系中,CPU需要对读、写操作完成重排序,为的是提高系统的整体性能。而在单核CPU中,这种重排序完全是透明的,因为无论系统如何对读、写操作进行排序,CPU都能确保任何读操作获取的数据都是之前已写入的数据。
在多核体系中,当一个CPU执行了一系列写内存操作时,这些写操作将会在CPU执行操作写到内存之前被排序。CPU可以将这些写内存的操作按任意顺序排列,无论是哪条指令先到达的CPU。类似的,CPU可以将多个读操作并行处理。
由于这种重排序的存在,两个有共享数据的任务不能保证:一个任务在CPU0上执行读、写操作的顺序与另一个任务在CPU1获取对应数据的顺序是一致的。关于重排序问题有一个经典的例子:在一个双核CPU系统中,一个CPU正在准备工作,当设置一个bool变量为true时,告知另一个CPU这个工作准备就绪,在此之前,另一个CPU一直处于等待状态。这个程序的代码就像下面一样:
/* CPU 0 – announce the availability of work */
pWork = &work_item; /* store pointer to work item to be performed */
workAvailable = true;
/* CPU 1 – wait for work to be performed */
while (!workAvailable);
doWork(pWork); /* error – pWork might not be visible to this CPU yet */
这个程序的结果很有可能是CPU1使用的pWork指针指向了不正确的数据,这是因为CPU0会重排序它的写内存操作,这就会导致CPU1在观察到workAvailable改变的时候而pWork还未被更新。
为了解决内存操作排序问题,VxWorks提供了一系列的”memory barrier”操作。这些操作的唯一目的就是提供一种方法可以确保CPU间操作顺序的一致性。memory barrier分为三个主要方面:读memory barrier,写memory barrier,满(读写)memory barrier。
【注意】VxWorks SMP提供了一系列同步原语来保护共享资源。这些原语包括:信号量、消息队列、spinLock等。这些原语中已经包括了满memory barrier功能,不用再添加其他的memory barrier操作来保护共享资源。
【注意】memory barrier不能用在用户模式的RTP app中。
(1) 读memory barrier
VX_MEM_BARRIER_R()宏定义提供读memory barrier。VX_MEM_BARRIER_R()会强制所有读操作进行排序。如果没有barrier,CPU会随意的为这些读操作进行排序。对于一个单核CPU而言不受影响。例如,CPU可以随意重排序一下读操作的顺序:
a = *pAvalue; /* 读 可能发生在读pBvalue之后 */
b = *pBvalue; /* 读 可能发生在读pAvalue之前 */
通过在读操作间加入memory barrier,可以保证读的顺序,例如:
a = *pAvalue; /* 读 发生在读pBvalue之前 */
VX_MEM_BARRIER_R();
b = *pBvalue; /* 读 发生在读pAvalue之后 */
在使用VX_MEM_BARRIER_R()后可以确保读数据的顺序是正确的。但是,这种保证只有在“写数据”能够保证顺序正确的前提下才能有效。即VX_MEM_BARRIER_R()和VX_MEM_BARRIER_W()宏定义必须一起使用。
(2) 写Memory Barrier
VX_MEM_BARRIER_W()宏定义提供写memory barrier。VX_MEM_BARRIER_W()会强制所有写操作进行排序。以下程序片段来自前面的代码,通过加入写memory barrier后对代码进行了改进:
pWork = &work_item;
VX_MEM_BARRIER_W();
workAvailable = true;
通过加入barrier可以确保pWork的更新一定先于workAvailable.
【注意】VX_MEM_BARRIER_W()并不是强迫将变量写入内存,而是指定了写的顺序
【注意】VX_MEM_BARRIER_W()必须与VX_MEM_BARRIER_R()一起使用。
(3) 读写(满)Memory Barrier
VX_MEM_BARRIER_RW()宏定义提供读/写(满)memory barrier。VX_MEM_BARRIER_RW()包括了VX_MEM_BARRIER_R()和VX_MEM_BARRIER_W()的功能。使用VX_MEM_BARRIER_RW()的代价要高于VX_MEM_BARRIER_R()或VX_MEM_BARRIER_W()的使用代价。Wind River不推荐使用VX_MEM_BARRIER_RW()。
10. 原子的内存操作(原子操作)
原子操作是利用了CPU支持原子访问内存的特点。原子操作是一些不能被中断的操作的集合。原子操作为一组操作提供了互斥性(例如变量的自增、自减操作)。
使用原子操作更新一个数据,可以省去使用锁的步骤。例如,你想更新一个链表元素的next指针从NULL到非NULL,当你使用原子操作时,这个过程就不用使用中断锁了,这样可以使算法变得简单。
在调用者使用原子操作的时候,必须保证该操作所在的内存是可以访问的。若访问了一个不可访问的内存,会产生一个异常。
在vxAtmicLib库中提供了许多原子操作。如表7所示。需要注意的是vxAtmicLib还提供了这些原子操作的inline版本。例如,vxatomicAdd_inline()。还提供了兼容SMP和AMP的版本。例如vxAtomic32Add()。原子操作可以在用户(RTP APP)、内核空间中使用。
表 7 原子操作API
API
描述
atomicVal_t vxAtomicAdd(
atomic_t * target, /* memory location to add to */
AtomicVal_t value /* value to add */
)
将两个值相加。
atomicVal_t vxAtomicSub(
atomic_t * target, /* memory location to subtract from */
AtomicVal_t value /* value to sub */
)
将两个值相减
atomicVal_t vxAtomicInc(
atomic_t * target /* memory location to increment */
)
将值增加1
atomicVal_t vxAtomicDec(
atomic_t * target /* memory location to decrement */
)
将值减1
atomicVal_t vxAtomicOr(
atomic_t * target, /* memory location to OR */
atomicVal_t value /* OR with this value */
)
将两个值进行位或操作
atomicVal_t vxAtomicXor (
atomic_t * target, /* memory location to XOR */
atomicVal_t value /* XOR with this value */
)
将两个值进行位异或操作
atomicVal_t vxAtomicAnd (
atomic_t * target, /* memory location to AND */
atomicVal_t value /* AND with this value */
)
将两个值进行位与操作,
atomicVal_t vxAtomicNand (
atomic_t * target, /* memory location to NAND */
atomicVal_t value /* NAND with this value */
)
将两个值进行位非与操作
atomicVal_t vxAtomicset (
atomic_t * target, /* memory location to set */
atomicVal_t value /* set with this value */
)
将一个值设定为另一个值
atomicVal_t vxAtomicClear(
atomic_t * target /* memory location to clear */
)
将一个值清空
BOOL vxCas(
atomic_t * target, /* memory location to compare-and-swap */
atomicVal_t oldValue, /* compare to this value */
atomicVal_t newValue /* swap with this value */
)
对比或交换内存中的值。
11. CPU Affinity
VxWorks SMP提供了CPU Affinity这种机制。通过这种机制可以将中断或者任务分配给指定的CPU执行。
(1) 任务级 CPU Affinity
VxWorks SMP具有将任务分配给指定CPU执行的能力。从另一个角度来说,即将指定CPU预留给指定任务。
SMP的默认操作——任何任务可以运行在任何CPU上——这会根据系统的整体性能而定。但是有些时候将指定任务分配给指定的CPU对系统性能是有帮助的。例如一个CPU上只运行一个单独的任务而不做其他事情,则这块CPU的cache中就只保存了这个任务所需要的数据和代码。这样做节省了任务在CPU之间切换的消耗。
还有个例子就是当多个任务争夺一个spinLock时,如果这些任务运行在不同的CPU上,则会有大量的时间被浪费在等待spinLock上。若将争夺同一个spinLock的任务指定在同一个CPU上运行,则这会给另一块CPU上执行其他程序带来便利。
关于任务级 CPU affinity的使用方法如下:
a. 一个任务可以通过调用taskCpuAffinityset()设置自己的CPU affinity,也可以设置其他任务的CPU affinity;
b. 子任务会继承父任务的CPU affinity。一个任务中调用如下API就会自动继承CPU affinity: taskSpawn(), taskCreate(), taskInit(), taskOpen(), taskInitExcStk()。
任务级 CPU affinity的API如表8所示。
表 8 任务级CPU Affinity的API
API
描述
STATUS taskCpuAffinityset(
int tid, /* task ID */
cpuset_t newAffinity /* new affinity set */
)
分配一个任务在指定CPU上执行。
STATUS taskCpuAffinityGet(
int tid, /* task ID */
cpuset_t* pAffinity /* address to store task's affinity */
)
获得指定任务在哪个CPU上执行。
taskCpuAffinitySet()和taskCpuAffinityGet()都使用cpuset_t结构对CPU信息进行标示。前者用于分配任务到指定CPU;后者获取指定任务的cpu_set_t。
CPUset_ZERO()宏定义用于将cpuset_t清0(类似FD_ZERO),它必须被最先调用。
CPUset_set()宏定义在CPUset_ZERO()之后使用。
RTP任务与CPU Affinity
默认情况下,RTP任务会继承父任务的CPU Affinity属性。如果父任务没有CPU Affinity属性,则RTP任务也没有CPU Affinity属性。如果父任务有CPU Affinity属性,则RTP任务也继承该CPU Affinity属性并仅运行在对应的CPU上。在使用rtpSpawn()时,RTP_CPU_AFFINITY_NONE选项表示创建RTP时不继承CPU Affinity属性,即使父任务具有CPU Affinity属性。
任务级 CPU Affinity 示例:
以下代码说明了创建一个任务,并将该任务分配给CPU1执行的全过程:【蓝色部分表示调用的关键API】
STATUS affinitySetExample(void)
{
cpuset_t affinity;
int tid;
/* Create the task but only activate it after setting its affinity */
Tid = taskCreate(“myCpu1Task”, 100, 0, 5000, printf, (int)”myCpu1Task executed on CPU 1 !”, 0, 0, 0, 0, 0, 0, 0, 0, 0);
if (tid = = NULL) return ERROR;
/* Clear the affinity CPU set and set index for CPU 1 */
CPUSET_ZERO(affinity);
CPUSET_SET(affinity, 1);
if (taskCpuAffinitySet(tid, affinity) = = ERROR)
{
taskDelete(tid);
return ERROR;
}
/* Now let the task run on CPU 1 */
taskActivate(tid);
return OK;
}
下面这个例子是一个任务如何删除它的CPU affinity:
{
cpuset_t affinity;
CPUset_ZERO(affinity);
/* passing a tid equal to zero causes an affinity to be set for the calling task */
taskCpuAffinityset(0, affinity);
}
(2) 中断级CPU Affinity
SMP硬件需要可编程中断控制设备。VxWorks SMP利用这些硬件可以分配中断到指定CPU。默认情况下,中断是在VxWorks的CPU 0中触发的。
通过中断级CPU Affinity,可以将中断合理平均的分配到不同CPU上(而不是在一个CPU上存在很多中断)。
运行时刻分配中断到指定CPU是在启动时发生的,当系统启动从BSP中读取中断配置信息的时候。然后,中断控制器收到一条命令,该命令用于指示一条中断运行在指定的CPU上。
12. 将CPU预留给使用了CPU Affinity的任务(CPU预留机制)
VxWorks SMP提供了一种机制可以将CPU预留给那些已经使用了CPU Affinity的任务。这种机制可以防止其他任务与使用了CPU预留机制的任务抢占CPU资源,因此它提升了系统效率。CPU预留机制的API如表9所示。
表 9 CPU预留机制API
API
描述
STATUS vxCpuReservedGet (
cpuset_t *pCpuSet
)
获取可预留CPU的集合
STATUS vxCpuReserve(
cpuset_t cpus, /* CPUs to be reserved */
cpuset_t *pReservedCpus /* CPUs reserved */
)
预留CPU集合cpus,返回CPU预留的结果pReservedCpus
STATUS vxCpuUnreserve(
cpuset_t cpus
)
解除某个CPU的预留机制。
默认情况下,当CPU没有使用vxCpuReserve()时,所有CPU都是可以被预留的。可以通过配置VX_SMP_CPU_EXPLICIT_RESERVE参数,将指定CPU排除在CPU预留池之外。只有在CPU池中的CPU才可以被预留。
taskCpuAffinityset()与vxCpuReserve()没有明确的调用顺序。前者是把任务分配给指定的CPU,这样可以防止该任务运行在其他CPU上。而后者限定了CPU上可以运行哪些任务。可以根据具体情况分先后调用这两者。
【注意】如果一个任务使用了CPU Affinity,则它的子任务将会继承CPU-Affinity属性;如果一个CPU预留给了使用CPU Affinity的任务,则这些任务的子任务将会在这个CPU上运行。
CPU预留与任务级CPU Affinity的示例
以下程序片段展示了如何预留一个CPU以及设置一个任务级CPU affinity在预留CPU上执行的过程:【蓝色部分表示调用的关键API】
void myRtn(void)
{
cpuset_t cpuset; /* Input argument to vxCpuReserve() */
cpuset_t resCpuset; /* Return argument from vxCpuReserve() */
/* Passing an empty cpuset as input reserves an arbitrary CPU */
CPUSET_ZERO(cpuset);
if (vxCpuReserve(cpuset, &resCpuSet) = = OK)
{
/* set affinity for current task */
if (taskCpuAffinitySet(0, resCpuSet) != OK)
/* handle error */
}
else
{
/* handle error */
}
}
以下代码片段展示了如何预留一个或多个指定CPU并且为多个任务设置CPU affinity:
void myRtn(void)
{
extern int tids[3]; /* some task Ids */
int cpuIx[] = {1, 2, 4}; /* CPU indices to reserve */
cpuset_t cpuset;
cpuset_t tmpCpuset;
int i;
/* initialize cpuset with the desired CPU indices */
CPUSET_ZERO(cpuSet);
CPUSET_SET(cpuSet, cpuIx[0]);
CPUSET_SET(cpuSet, cpuIx[1]);
CPUSET_SET(cpuSet, cpuIx[2]);
/* Reserve the specified CPUs */
if (vxCpuReserve(cpuSet, NULL) = = OK)
{
for (i = 0; i < 3; ++i)
{
tmpCpuSet = CPUSET_FIRST_SET(cpuSet);
if (taskCpuAffinitySet(tids[i], tmpCpuSet) != OK)
/* handle error */
CPUSET_SUB(cpuSet, tmpCpuSet);
}
}
else
{
/* handle error */
}
}
13. CPU信息及管理
VxWorks SMP提供了一些API和宏定义用于获取及操作CPU的信息。
(1) CPU的信息及管理API
kernelLib和vxCpuLib库提供了用于获取CPU信息以及管理CPU的相关API。kernelLib中的CPU API如表10所示。
表 10 CPU内核信息API
API
描述
BOOL kernelIsCpuIdle(
unsigned int cpu /* CPU to query status of */
)
查看指定CPU是否为空闲状态。返回TRUE表示CPU为空闲状态。
BOOL kernelIsSystemIdle (void)
查看所有可用CPU是否为空闲状态。返回TRUE表示为空闲状态。
STATUS kernelCpuEnable (
unsigned int cpuToEnable /* logical index of CPU to enable */
)
通过输入index参数使能CPU
kernelCpuEnable()可以通过输入index参数来使能指定的CPU。一旦CPU使能,任务调度机制开始在该CPU上分配任务。所有CPU在默认情况下是使能状态,可以将组件ENABLE_ALL_CPUS设置为FALSE,这样VxWorks SMP系统启动后只有CPU 0为使能状态。然后,再通过kernelCpuEnable()可以使能指定的CPU。
vxCpuLib中的CPU API如表11所示。
表 11 CPU信息API
API
描述
unsigned int vxCpuConfiguredGet (void)
返回在SMP系统中已配置的CPU个数
cpuset_t vxCpuEnabledGet (void)
返回使能CPU的个数
unsigned int vxCpuIndexGet (void)
返回当前CPU的索引(逻辑编号)
cpuid_t vxCpuIdGet (void)
返回当前CPU的ID(有体系结构变量定义的,非OS定义的逻辑编号)
使用vxCpuConfiguredGet()返回的是配置在BSP中的VxWorks SMP系统的CPU个数。这个值可能与硬件实际存在的CPU个数不一致。
使用vxCpuEnabledGet()返回的是系统中运行CPU的个数。这个值可能与vxCpuConfiguredGet()返回的值不一致,也可能与硬件中实际存在的CPU个数不一致
vxCpuEnabledGet()的返回值类型为cpuset_t,因此我们需要注意:再给返回值赋值之前,我们必须使用CPUset_ZERO()将cpuset_t变量清0.
vxCpuIndexGet()返回的是当前调用任务使用的CPU索引(逻辑编号)。该编号在0和N-1之间(N是vxCpuConfiguredGet()的返回值)。需要注意的是:默认情况下,任务可以从一个CPU跑到另一个CPU上执行,所以不能保证任务结束后所在的CPU索引与刚才使用vxCpuIndexGet()返回的值是一致的。除非该任务是分配运行在指定CPU上的,或者使用了taskCpuLock()或者intCpuLock()。
(2) CPU相关变量以及宏定义
VxWorks SMP提供了一组变量和宏定义,通过设置这些值可以对CPU的配置进行控制。
例如cpuset_t,它用于标识配置在VxWorks SMP系统中的CPU。Cpuset_t的位值标识了CPU的逻辑索引,Cpuset_t的第一位标识了CPU0,第二位标识了CPU1,第三位标识了CPU2,以此类推(它与CPU在硬件中的物理位置无关)。
例如,有8个CPU的硬件系统,在BSP中为VxWorks SMP配置了4块CPU,通过CPUset_ZERO()可以讲cpuset_t中的位值清0。调用vxCpuIndexGet(),它的返回值只会设置前四位。
CPU宏定义用于设置和清除CPU索引(通过改变cpuset_t的值)。这些宏定义如图12所示。
表 12 操控CPU信息的宏定义
API
描述
CPUset_SET(cpuset, n)
设置CPU的索引(只针对一个CPU进行设置)
CPUSET_SETALL(cpuset)
设置CPU的索引(针对所有CPU进行设置)
CPUSET_SETALL_BUT_SELF(cpuset)
设置CPU的索引(除了调用该宏的CPU之外的所有CPU)
CPUSET_CLR(cpuset, n)
清除一个指定的CPU索引(只针对一个CPU进行设置)
CPUSET_ZERO(cpuset)
清除所有CPU索引(针对所有CPU进行设置)
CPUSET_ISSET(cpuset, n)
当指定索引存在于cpuset_t中时,返回TRUE,
CPUSET_ISZERO(cpuset)
当cpuset_t中没有索引时,返回TRUE,
CPUSET_atomicSET(cpuset, n)
原子地设置CPU的索引(只针对一个CPU进行设置)
CPUSET_AtomicCLR(cpuset, n)
原子地清除CPU的索引(只针对一个CPU进行设置)
【注意】不要直接对cpuset_t进行操作,而是要通过上面的宏定义间接的对cpuset_t进行操作。
14. 查看任务性能API
通过checkStack()可以查看所有任务栈的使用情况。在shell下输入checkStack就可以得到下面的信息。如图8所示。
图 8 通过checkStack检测任务栈情况
SIZE表示任务栈的大小,CUR表示当前使用任务栈的大小,HIGH表示使用任务栈的峰值,MARGIN表示从没有使用过的任务栈大小(其中MARGIN = SIZE - HIGH)。
通过spy()可以上报任务在内核空间、中断、idle中的tick使用情况。在shell下输入spy就可以得到下面的信息。如图9所示。
图 9 通过spy查看CPU使用率情况
Spy开启了tSpyTask用于监控系统任务的使用情况。IDLE表示空闲任务的CPU占用情况。
15. SMP性能优化
SMP的目的就是提高系统的性能。如果仅仅是简单的使用SMP的代码,并不能完全发挥出SMP的潜能。因此,在SMP代码的基础上还需要进行优化。
SMP算法是否能够提高系统性能很大程度上取决于算法并行性的程度以及多线程独立的程度。有些算法是高可并行性的,并且很好的利用了多CPU。一个很好的例子:图形压缩器可以在独立的线程中分别压缩一整块数据中的一小块。
如果SMP算法不好的话,那么同时执行两个线程的消耗将会抵消掉多个CPU所带来的好处。类似的,如果存在很多共享数据,即多个CPU需要争夺的数据,那么系统将会增大争夺、等待数据的消耗。
不好的算法会导致更糟糕的情况,即SMP系统反而不如UP系统运行的快。最好的情况是使运算速度提高一倍。
线程化
线程化包括将一个单线程的APP通过任务复制的方式变成多线程。一个典型的例子是:唤醒一个“工人”任务,这个任务的工作是从一个队列中获取工作,还有一个任务或ISR负责往这个队列中填充工作。假设瓶颈出现在“工人”任务中,我们可以通过复制“工人”任务的方式提高系统的性能。线程化不是一个新的概念了,在出现多线程OS的时候我们就已经知道这个词了。但是在一个UP系统中,线程化只能增加任务的吞吐量,即虽然线程增加了,但它们却在等待资源。到头来我们发现瓶颈是在CPU本身,而线程化并不能提高性能。例如,在一个UP系统上计算密集的APP,线程化不能帮上什么忙。但是在SMP系统上情况有所不同,线程化可以有效地提高系统性能,这是因为SMP系统解决了CPU的瓶颈问题。
使用SpinLock
使用spinLock会潜移默化的影响中断和任务抢占机制。因此必须谨慎的使用spinLock,使用的话也必须是在很短的周期内使用和释放。
使用浮点数和其他协处理器
出于效率的考虑,关于使用协处理器的任务创建选项(VP_FP_TASK)必须被谨慎使用。只有当任务确实需要使用时再添加。当一个任务在创建时开启了协处理器选项时,协处理器的状态将被保存,同时,系统也会保存每一次的上下文切换。这对于那些虽然开启了协处理器但是却没有使用它的任务而言显得没有必要了。
使用vmBaseLib
vmBaseLib库是VxWorks MMU的管理库,它允许内核APP和驱动管理MMU。SMP OS的一个重要任务是保证MMU的后备内存(TLB)的一致性。例如,CPU MPC8641D中有硬件资源可以保证TLB的一致性。其他CPU,例如MIPS体系结构家族,就没有这个能力。这时候就需要OS进行对MMU一致性的保护了。
任务和中断的CPU-Affinity
对于一些APP和系统,分配指定任务或中断到指定CPU上执行可以提高系统的效率。
16. 简单例子
VxWorks SMP提供了一些测试程序,用来测试SMP的特点和性能。以下程序测试了I/O功能和系统调用功能:
philDemo
smpLockDemo
以下测试了计算能力:
primesDemo
rawPerf
可以通过配置VxWorks内核的方式(添加INCLUDE_SMP_Demo组件),将这些测试程序链接到VxWorks SMP内核镜像中。测试源代码在installDir/vxworks-6.x/target/src/demo/smp中。
在此以installDir/vxworks-6.x/target/src/demo/smp/smpLockDemo.c为例来说明关于SMP编程的一些注意事项:
/ * smpLockDemo.c用于测试VxWorks SMP的同步/互斥机制
* 描述:
* 这个Demo描述了VxWorks SMP中的同步机制。在SMP系统中任务和ISR可以同时运* 行在不同的CPU上,这就涉及到同时访问共享数据的问题,对此,VxWorks SMP提供 * 了一系列机制:
* 信号量(Semaphore):可以使用信号量实现任务间的同步机制。例如,使用一个任务或* ISR“唤醒”另一个任务。
* VxWorks事件(VxWorks Events):同信号量。
* 原子操作(atomic Operator):能够安全的读、写内存。例如,完成一些类型的全局的
* 自增操作。
* SpinLock:它可以用在任务间、ISR间、任务与ISR间的同步。它一般用来对较多的共
* 享数据和临界资源进行保护。
*
* 本Demo对以上同步机制进行了对比。通过SMP系统中多个任务对一个共享int型变量
* 操作的实例,来对比这些同步机制。
*
* Demo执行
* 本Demo中包含了两个优先级一样的任务,每个任务都重复地(循环)增加一个共享计
* 数器。这个共享计数器是用户定义的一个累加值(即被更新次数)。每个任务还有一个
* 自己的计数器,在增加共享计数器的同时也增加自己的计数器。假设任务自己的计数器
* 不会出错,我们要看看使用不同同步机制的不同效果,即共享数据的值是否与两个任务
* 自身计数器的值之和一致。
*
* 以上过程重复5次,每次使用不同的同步机制:
* 1. 不使用同步机制:任务访问共享数据时不使用同步机制。
* 2. SpinLock:当任务累加共享数据的时候申请spinLock,然后再释放spinLock。
* 3. vxAtomicInc()原子操作:任务使用原子操作vxatomicInc()对这个共享数据进行累加。
* 4. vxTas()原子操作:在任务中通过vxTas()设置/清除一个flag,将这个flag当做一个普
* 通信号量使用。当这个flag被清除时表示信号量不可用,当被设置时表示可用。任务
* 需要使用这个信号量对共享数据进行累加,然后再释放信号量。
* 5. vxatomicAdd()原子操作:在任务中使用vxAtomicAdd()原子操作对共享数据进行累
* 加。
*
* 调用任务
* 在SHELL中输入 smpLockDemo 3
* 参数3表示任务更新共享数据和自己数据的时长(以秒为单位)。如果不填写参数,则
* 已2秒为默认值。
*
* 执行结果:
* METHOD TASK 0 TASK 1 SUM(COUNTS) GLOBAL RESULT
* --------- --------- --------- ----------- --------- ---------
* no-Lock 0x253d8ae 0x253b31c 0x4a78bca 0x470327d Failed
* spinLocks 0x782f43 0x78265a 0xf0559d 0xf0559d Passed
* atomicInc 0x20b600d 0x20b4623 0x416a630 0x416a630 Passed
* test-and-set 0x1509e72 0x150a628 0x2a1449a 0x2a1449a Passed
* Atomic Add 0x1e25b2a 0x1e26d2a 0x3c4c854 0x3c4c854 Passed
*
* METHOD栏表示任务为了更新共享数据而使用的同步机制。TASK0任务栏表示TASK0
* 自己的计数器值,TASK1任务栏表示TASK1自己的计数器值。计数器的值越大,说明
* 同步机制越好。SUM(COUNTS)栏表示前面两个栏之和(TASK0和TASK1栏的和)。
* GLOBAL栏表示共享数据的值。RESULT栏表示了SUM值是否与GLOBAL的值一致。
*
* 从上面的测试结果可以看出,不使用同步机制是不行的,即使对共享数据仅仅是一个小
* 小的自增操作,不使用同步机制也会造成错误。结果还可以看出,使用vxatomicInc()
* 原子操作不仅安全,而且其效率比vxAtomicAdd()要高,而vxatomicAdd()的效率比使
* 用SpinLock()要高。原子操作效率高于spinLock的原因在于它们就是为简单原子读写操
* 作设计的,而spinLock则是对相对复杂的同步机制设计的。test-and-set方法通过使用
* vxTas()执行一个信号量,这种方法比原子操作慢,比spinlock快。但是,在数据需要极
* 端小心操作时,还是使用spinlock这种锁机制比较好。Spinlock是普通信号量更安全的
* 替代品。
*
* 下面是同样的程序、同样的周期在一个UP系统上运行的结果。通过结果我们看到,这
* 里没有冲突、没有错误,这是因为没有真正意义的同时执行存在。但是,从计数的结果
* 来看,性能还是要比前者好的。这是因为,由于任务不能同时执行,很少有对共享数据* 的竞争存在。不过,这个例子从另一个方面说明:一个APP虽然有“同时执行”
* 的能力,但是过多的“竞争”使得这种并行的好处大打折扣。
*
* -> smpLockDemo
* METHOD TASK 0 TASK 1 SUM(COUNTS) GLOBAL RESULT
* --------- --------- --------- ----------- -------- ---------
* no-lock 0x265d794 0x2675ee8 0x4cd367c 0x4cd367c Passed
* spinLocks 0xaae678 0xaaefaf 0x155d627 0x155d627 Passed
* atomicInc 0x23c5135 0x23c7018 0x478c14d 0x478c14d Passed
* test-and-set 0x1a84dd6 0x1a864c0 0x350b296 0x350b296 Passed
* Atomic Add 0x20b9c80 0x20bb843 0x41754c3 0x41754c3 Passed
* /
【注意】在此只对关键部分进行说明,因此部分代码就不在写了,大家可以查看它的源文件进行对照。【蓝色的是本文介绍的关于SMP的一些API】
头文件略;
LOCAL volatile BOOL LockDemoWorkersReady = FALSE;
LOCAL atomic_t tasVar = 0; /* variable for test-and-set method */
/ * smpLockDemo smpLockDemo入口函数(也是SHELL命令)
* -> smpLockDemo
, , <[TRUE, FALSE]; (affinity)
* 这个函数会开启两个任务用于同时更新一个共享数据,与此同时,更新自己的一个数据。
* 参数表示使用不同同步机制进行测试的时间(默认为2s),即每种算法会给
* 进行测试。
* /
STATUS smpLockDemo (
unsigned int secs, /* The minimum life time of a worker task */
unsigned int reqNumOfTasks, /* number of tasks */
BOOL setAff /* do task have affinity */
)
{
unsigned int availCpus = vxCpuConfiguredGet (); //获取已配置CPU个数
unsigned int numOfTasks;
unsigned int eventsToWait = 0;
LockS_Demo_TYPE LockMethod = methodLockNone; //这是一个枚举,表示不同机制
if (reqNumOfTasks = = 0) numOfTasks = availCpus;
else numOfTasks = reqNumOfTasks;
if (secs = = 0) secs = 5;
/* Create, set affinity and activited tasks */
if (locksDemoTasksinit (secs, numOfTasks, setAff , &eventsToWait, lockMethod) != OK)
{
return ERROR;
}
lockDemoWorkersReady = TRUE;
/* Wait until all worker tasks are done */
if (eventReceive(eventsToWait, EVENTS_WAIT_ALL, WAIT_FOREVER, NULL) != OK)
{
return ERROR;
}
}
/ * locksDemoTasksinit初始化、激活、设置CPU-Affinity任务。默认情况下,任务的
* CPU-Affinity属性是关闭的。locksDemoTasksInit创建了数个任务。
* /
LOCAL STATUS locksDemoTasksInit (
int secs, /* how long to run */
unsigned int numOfTasks, /* number of tasks evolved */
BOOL affIsOn,
unsigned int * eventsToWait, /* Events to wait mask */
LockS_Demo_TYPE LockMethod /* Method of global count Lock */
)
{
cpuset_t taskAffinity;
unsigned int eventWorkerDone;
int i = 0;
workerTid [i++] = taskIdSelf (); //workerTid是一个数组,用于记录任务ID
while (i < (numOfTasks + 1))
{
/* Task's number is the bit to send an event on */
eventWorkerDone = 1 << i;
/* Add event to the receive list for the Main tasks receives */
*eventsToWait |= eventWorkerDone;
/* Create the specified number of tasks */
if ((workerTid[i] = taskCreate ("worker", WORKER_TASKS_PRIORITY, 0,
5000, (FUNCPTR)workerEntry, secs, taskIdSelf(), numOfTasks,
eventWorkerDone, i, lockMethod,0, 0, 0, 0)) == ERROR)
{
return ERROR;
}
/*
* set affinity for each task if requested and that number of tasks
* the number of CPU's available.
*/
if (affIsOn == TRUE)
{
taskAffinity = 1 << i;
/* set affinity the task affinity with the task index */
if (taskCpuAffinityset (workerTid[i], taskAffinity) != OK)
{
return (ERROR);
}
} /* setAff == TRUE */
/* Activate task ... */
if (taskActivate (workerTid[i]) != OK)
{
return (ERROR);
}
i++; /* Increment iteration */
}/* endwhile */
}
/ * workerEntry 开启任务的入口。在开启任务之前要确保LockDemoWorkersReady被设置为TRUE,这样可以允许所有任务同时开启。
*
*
* /
LOCAL void workerEntry(
int secs, /* the lenght of Demo duration */
int parentTask, /* Id of creator task */
unsigned int numOfTasks,
UINT32 eventWorkerDone, /* unique Id to indicate this task is done */
unsigned int threadNum, /* task sequence number */
LockS_DEMO_TYPE lockMethod /* method used to lock the global count */
)
{
int stopTick;
/* Don't start until all workers are ready */
while (LockDemoWorkersReady != TRUE);
/* start timer once everyone is ready */
stopTick = tickGet () + sysClkRateGet () * secs;
FOREVER
{
/* Update global and local count */
if (LocksDemoGlobalCountOp (lockMethod, threadNum, numOfTasks) != OK)
{
printf ("spinLocks fairness test failed");
eventSend (parentTask, eventWorkerDone);
break;
}
/* Keep incrementing while before we reach end time */
if (tickGet() > stopTick)
{
/* Inform that worker is done */
eventSend (parentTask, eventWorkerDone);
break;
} /* tickGet() > stopTick */
} /* FOREVER */
}
/ * locksDemoGlobalCountOp 对一个共享数据和任务的私有数据进行累加。
* 在这个函数中,会根据不同的同步机制进行运算。它有三个参数:第一个参数表示使用
* 的同步机制;第二个参数表示调用该函数的任务ID,这个参数用于累加任务私有的数
* 据;第三个参数表示任务个数。本函数会对五种同步机制进行运算。
* /
LOCAL STATUS locksDemoGlobalCountOp (
LockS_Demo_TYPE LockMethod, /* Means of mutual exclusion */
unsigned int threadNum,
unsigned int numOfTasks /* Number of tasks */
)
{
/* Check the method we are computing operation for */
switch (lockMethod)
{
case methodTaskSpinLock:
{
/* use a fair spinlock */
SPIN_LOCK_TASK_TAKE (&fairTaskSyncLock);
/* make sure that we only take a fair turn */
varsToUpdate [threadNum]++; /* Update my local count */
varsToUpdate [0]++; /* increment shared copy */
SPIN_LOCK_TASK_GIVE (&fairTaskSyncLock); /* release the lock */
break;
}
case methodIsrSpinLock:
/* use a spinlock */
SPIN_LOCK_ISR_TAKE (&isrSyncLock);
varsToUpdate [threadNum]++; /* Update my local count */
varsToUpdate [0]++; /* increment shared copy */
SPIN_Lock_ISR_GIVE (&isrSyncLock); /* release the lock */
break;
case methodIsrNdLock:
/* use a spinlock */
key = spinLockIsrNdTake (&isrNdLock);
varsToUpdate [threadNum]++; /* Update my local count */
varsToUpdate [0]++; /* increment shared copy */
spinLockIsrNdGive (&isrNdLock, key); /* release the lock */
break;
case methodatomicInc:
varsToUpdate [threadNum]++; /* Update my local count */
/* Atomiccally increment global count */
vxatomicInc ((Atomic_t *) varsToUpdate );
break;
case methodVxTas:
/* use test-and-set */
while ((vxTas ((void *)&tasVar) != TRUE));
varsToUpdate [threadNum]++; /* Update my local count */
varsToUpdate [0]++; /* Update global count */
VX_MEM_BARRIER_RW(); /* prevent above accesses from leaking */
vxatomicset ((Atomic_t *)&tasVar, 0);
break;
case methodVxatomicAdd:
/* Update my local count */
varsToUpdate [threadNum]++;
/* increment shared copy */
vxAtomicAdd ((atomic_t *) varsToUpdate , 1);
break;
default:
case methodLockNone:
/* update my local count */
varsToUpdate [threadNum]++;
/* Update the global count */
varsToUpdate [0] = varsToUpdate [0] + 1 ;
break;
} /* endSwitch */
return OK;
}
17. 向VxWorks SMP系统移植代码
向SMP系统移植代码最关键要考虑的就是对称多处理允许任务与任务间、任务与ISR间、ISR与ISR间可以同时执行。
从VxWorks UP向SMP移植代码需要一些步骤。移植的过程需要用到不同的多任务机制,还有一些互斥/同步机制等等。在移植代码时,我们需要将taskLock()替换成spinLockTaskTake(),类似的地方还有很多。
本大节对代码移植做了,包括OS设备需要考虑的一些问题,还有在移植过程中的一些细节问题等。
(1) 代码移植步骤
本小节描述了从VxWorks UP向SMP移植APP的模式和推荐步骤。
风河推荐:在将早期版本的VxWorks UP代码移植到当前版本的VxWorks SMP时,使用如下步骤:
a. 将原来的UP代码移植到当前VxWorks UP系统中
把使用VxWorks UP 6.previous (或更早)系统的代码移植到VxWorks UP 6.current系统中。在这个过程中要将一些API替换成当前版本中支持的API,要把一些与VxWorks SMP不兼容或不支持的API替换掉。
b. 将当前VxWorks UP代码移植为当前VxWorks SMP代码
把使用VxWorks UP 6.current系统的代码移植到VxWorks SMP 6.current。在这个过程中要注意“同时执行”会引起的一些bug(例如死锁等)。硬件的替换也包含在这个过程中(CPU从UP到SMP)。
c. 为提升SMP性能优化代码
对代码进行优化,这样可以使程序充分利用对称多处理的优势。
代码移植过程如图10所示。
图 10 UP代码向SMP的移植过程
(2) UP与SMP编程中不兼容的API
表13列出了UP与SMP编程中不兼容的API和LIB库。在UP代码向SMP代码移植的过程中,必须注意替换以下API:
表 13 UP与SMP中不兼容的API
不兼容的UP的API
SMP对应的API
备注
代码中的隐式同步机制
使用显示同步机制,例如信号量或spinLock
见 “任务的隐式同步机制”小节
若干cacheLib库函数
修改了对应函数的用法
见“cacheLib限制”小节
若干vmBaseLib库函数
修改了对应函数的用法
见“vmBaseLib限制”小节
taskLock(), intLock()
替换为spinLockLib, taskCpuLock(), intCpuLock(),原子操作
见“同步与互斥机制”小节,见“任务锁”小节
taskRtpLock(), taskRtpUnlock()
替换为信号量、原子操作
见“同步与互斥机制”小节,见“RTP中的任务锁”小节
任务变量,taskVarLib库函数
__thread存储类
见“任务变量管理”小节
tlsLIb库函数
__thread存储类
见“任务本地存储”小节
访问CPU-specific全局变量
替换为CPU-specific全局变量访问函数
见“SMP CPU-specific变量和UP全局变量”小节
内存访问属性与SMP内存不一致
见“内存访问属性”小节
不适用VxBus的驱动
适用于VxBus的驱动
见“驱动与BSP”小节
UP的BSP
SMP的BSP
见“驱动与BSP”小节
UP的boot loader
SMP的boot loader
(3) RTP应用与SMP
在VxWorks UP系统中,RTP(用户模式)应用对于互斥/同步机制的限制要比内核代码和内核应用要多。在VxWorks SMP中,RTP(用户模式)可以使用信号量、原子操作,但不支持spinLock,内存障碍以及CPU-specific这些互斥机制。此外,使用semExchange()可以实现一个信号量的原子性的give和exchange操作。
(4) 任务的隐式同步机制
VxWorks是一个支持多任务的OS,VxWorks和它的APP是支持可重入的。因此,当任务使用显示同步机制的时候,当代码移植到一个允许“同时执行”的系统上时,需要小心。例如,任务A释放了一个信号量给任务B得以使其运行,这是一个显示的同步机制。另一方面,隐式同步机制——通过任务优先级——不能在VxWorks SMP中使用。例如,如果一个高优先级的任务A生成了一个低优先级的任务B,希望在任务A释放CPU之前任务B不能运行,这种假设在SMP系统中是不成立的。
通过任务优先级实现的隐式同步机制不易被发现,需要仔细查阅所有使用了与生成任务相关的代码。例如,我们要重点查看使用了下述API的代码:
a. 创建任务的API
例如taskSpawn(),rtpSpawn()等一些创建新任务的API。在SMP系统中,无论新的任务与父任务的优先级谁高谁低,新任务可能会在创建任务的API返回后在另一块CPU上运行。如果父任务与子任务使用信号量、消息队列或其他机制进行通信,则它们必须在创建新任务之间被创建和被初始化。
b. 会激活一个处于waiting中的pend任务的API
例如semGive(),msgQSend(),eventSend()等一些会激活处于waiting中的pend任务的API,即使被激活的任务优先级比调用以上API函数的任务低,被激活的任务也有可能在调用函数返回之间开始运行。
例如,在一个VxWorks UP系统中,一个任务可以通过使用intLock()在保护临界资源,同时也会屏蔽所有中断以防止ISR访问临界资源。当ISR访问临界资源时,不会使用显示互斥机制,这是因为当UP系统中运行ISR时,任务是不能运行的(驱动会使用很多ISR,当任务使用intLock()时,这些ISR会排队等待工作)。而在SMP系统中,ISR工作的同时任务不工作这种假设是不成立的。因此,在ISR中必须使用显示互斥机制。关于ISR的spinlock可参见“spinlock互斥与同步”大节。
(5) 同步与互斥机制
由于SMP系统中允许真正意义上的“同时执行”存在,因此VxWorks UP与VxWorks SMP中的显示同步/互斥机制有所不同。
信号量在VxWorks UP和VxWorks SMP系统中的用法是相同的。但是关于中断锁和任务锁在两个系统中的用法就是不同的了。
例如,在VxWorks SMP中,一个任务和一个ISR可以同时运行,这在VxWorks UP中是不可能的。因此,一个任务和ISR之间的互斥机制也就随着系统的变化而变化。任务与ISR之间一个通用的同步机制就是二进制信号量,这种机制在VxWorks SMP系统中同样奏效。因此,UP代码中使用二进制信号量的地方在VxWorks SMP系统中不用修改。同样的情况也适用于消息队列和VxWorks事件(event)机制。
【注意】当一个ISR唤醒了一个任务(可以通过释放一个二进制信号量或发一个事件,再或者发送一个消息给一个消息队列等等),该任务必须立即运行在另一个CPU上。
(6) UP中API在SMP中的变异
VxWorks UP与VxWorks SMP提供的API大多数是相同的,也有一部分在函数行为上是不同的,这是因为在SMP系统的需求,即它们的使用有一定的限制。
cacheLib限制
VxWorks UP中的cache API是围绕UP系统设计的。包括:使能、使无效cache,清除cache中数据,cache天生具有CPU-specific属性,它们统统设定为本地CPU的cache。在SMP系统中,这种天生的CPU-specific属性变得没有意义。VxWorks SMP支持的系统提供硬件cache一致性,这种一致性包括SMP系统CPU之间的,也包括内存子系统与设备内存空间之间的。为了实现折现特点,下面描述了VxWorks SMP中关于cache限制和行为的改变。
cacheEnalbe()和cacheDisable()。能够使硬件cache保持一致性的唯一方法就是使cache一直处于开启状态。因此,在VxWorks SMP中每个CPU的cache都要处于使能状态,不可以将它们关闭。因此,在VxWorks SMP中调用cacheEnable()总会返回OK,而调用cacheDisable()总是返回ERROR。ERRNO是S_cacheLib_FUNCTION_UNSUPPORTED.
cacheClear(), CacheFlush()以及cacheInvalidate(). 为了确保硬件cache一致性,这些API不再需要。因此,如果在VxWorks SMP中调用这些API,它们不会进行任何操作除了返回OK。
cacheLock(),cacheUnlock()。VxWorks SMP中不支持这些API。如果调用,将会返回ERROR,错误码是S_cacheLib_FUNCTION_UNSUPPORTED
cacheTextUpdate()。该函数在VxWorks SMP中的使用方法与VxWorks UP中相同。
vmBaseLib限制
VxWorks SMP不提供交换内存页属性的API。在一个SMP系统中,RAM空间在CPU之间是共享的。如果系统RAM中的一个单独页属性被修改了,以至于它不再是硬件一致性的了,任何OS对这个页的操作(例如spinlock、共享数据结构等等)将会产生一个高风险的不可知的后果。这种不可知的行为甚至在页属性改变后许久还能发生。这种情况很难通过debug查到,这是因为在SMP中我们都是基于硬件一致性进行工作的。
vmBaseStateset()与vmStateSet()。在UP中,这些API用于修改虚拟内存中单独页的属性。在SMP系统中一个页的cache属性不能被修改。调用这些函数都会返回ERROR,并且错误码置为S_vmLib_BAD_STATE_PARAM.
(7) 在SMP中不支持的UP API以及SMP提供的替代API
VxWorks UP中的一些API在VxWorks SMP中不能使用,这是因为VxWorks SMP提供真正意义上的同时执行。VxWorks SMP针对对称多处理特点提供了这些API的替代品。
中断锁:intLock()和intUnlock()
在VxWorks UP中,当一个任务(或ISR)调用了intLock(),则它阻止了VxWorks调用其他的中断(即关中断)。这个API典型的用法是保证任务间、任务与ISR间、ISR间对临界资源的互斥访问。这种机制在SMP系统中不再适用,VxWorks SMP提供了以下替代的方法:
a. 如果中断锁用来在一块内存上进行虚拟(pseudo)原子操作,那么原子操作将是一个好的替代品;
b. 如果中断锁用在任务间的互斥机制,那么信号量或者任务级spinLock将会是一个好的替代品。Spinlock申请/释放的操作要比信号量快,因此对于要求性能的、占用时间较短的临界资源时,使用spinlock比较合适。而信号量用于保护占用时间较长的临界资源时比较合适;
c. 如果中断锁用在任务与ISR间,或者ISR间,则中断级 spinlock比较合适;
d. 如果中断锁用在任务间的互斥机制,并且所有参与互斥的任务有相同的CPU affinity,那么就可以使用taskCpuLock();
e. 如果中断锁用在任务间、任务与ISR间,或者ISR间,并且参与互斥的所有任务和ISR有相同的CPU affinity,那么就可以继续使用intCpuLock()。
任务锁:taskLock()与taskUnlock()
在VxWorks UP系统中,当一个任务调用了任务锁API,则它会禁止系统中其他任务的调度直到该任务释放了对应的任务锁。这些API的典型用法是保证临街资源的互斥访问。
在VxWorks UP中,内核API taskLock()会把任务调度机制挂起。这种机制在SMP系统中不再适用,VxWorks SMP提供了以下替代的方法:
a. 信号量;
b. 原子操作;
c. 任务级spinlock。Spinlock申请/释放的操作要比信号量快,因此对于要求性能的、占用时间较短的临界资源时,使用spinlock比较合适;
d. 如果中断锁用在任务间的互斥机制,并且所有参与互斥的任务有相同的CPU affinity,那么就可以使用taskCpuLock()。
RTP中的任务锁:taskRtpLock()与taskRtpUnlock()
在RTP APP中使用taskRtpLock(),使得调用该API的进程不能再调用其他任务。与taskLock()一样,taskRtpLock()在SMP中不再适用。