mς %U²Ic@sΉdZdkadkZdkZdkZdkZdklZda dei fd„ƒYZ dei fd„ƒYZ dei fd „ƒYZ dd „Zed jo eƒndS( sΥGeneric thread tests. Meant to be used by dummy_thread and thread. To allow for different modules to be used, test_main() can be called with the module to use as the thread implementation as its sole argument. N(s test_supportit LockTestscBs_tZdZd„Zd„Zd„Zd„Zd„Zd„Zd„Z d„Z d „Z RS( sTest lock objects.cCstiƒ|_dS(N(t_threadt allocate_locktselftlock(R((t3/data/zmath/lib/python2.4/test/test_dummy_thread.pytsetUpscCs|i|iiƒ dƒdS(Ns(Lock object is not initialized unlocked.(Rt failUnlessRtlocked(R((Rt test_initlockscCs8|iiƒ|iiƒ|i|iiƒ dƒdS(Ns%Lock object did not release properly.(RRtacquiretreleaseRR(R((Rt test_releases  cCs|iti|iiƒdS(N(RtfailUnlessRaisesRterrorRR (R((Rttest_improper_release%scCs |i|iidƒdƒdS(Nis)Conditional acquiring of the lock failed.(RRRR (R((Rttest_cond_acquire_success)scCs1|iidƒ|i|iidƒ dƒdS(Nis=Conditional acquiring of a locked lock incorrectly succeeded.(RRR R(R((Rttest_cond_acquire_fail.scCs*|iiƒ|i|iiƒdƒdS(NsUncondional locking failed.(RRR RR(R((Rttest_uncond_acquire_success5s cCs&|i|iidƒtjdƒdS(Nis*Unconditional locking did not return True.(RRRR tTrue(R((Rttest_uncond_acquire_return_val;scCs©d„}|iiƒttiƒƒ}ti||it fƒt i oHdt GHn|iiƒttiƒƒ}t i o dGHn|i ||t jdƒdS(NcCsti|ƒ|iƒdS(s:Hold on to lock for a set amount of time before unlocking.N(ttimetsleeptdelayt to_unlockR (RR((Rt delay_unlockBs s@*** Waiting for thread to release the lock (approx. %s sec.) ***tdones+Blocking by unconditional acquiring failed.(RRRR tintRt start_timeRtstart_new_threadtDELAYt test_supporttverbosetend_timeR(RR!RR((Rttest_uncond_acquire_blocking@s       ( t__name__t __module__t__doc__RR R RRRRRR"(((RRs         t MiscTestscBs;tZdZd„Zd„Zd„Zd„Zd„ZRS(sMiscellaneous tests.cCs|ittiƒdS(N(RR t SystemExitRtexit(R((Rt test_exitXscCs?|ittiƒtƒdƒ|itiƒdjdƒdS(Ns*_thread.get_ident() returned a non-integeris_thread.get_ident() returned 0(RRt isinstanceRt get_identR(R((Rt test_ident\scCs&|ittiƒtiƒdƒdS(NsR_thread.LockType is not an instance of what is returned by _thread.allocate_lock()(RRR*RRtLockType(R((Rt test_LockTypecscCs)d„}|itti|tƒƒdS(NcCstiƒdS(N(Rtinterrupt_main(((Rtcall_interruptls(R0RR tKeyboardInterruptRRttuple(RR0((Rttest_interrupt_mainis cCs|ittiƒdS(N(RR R1RR/(R((Rttest_interrupt_in_mainqs(R#R$R%R)R,R.R3R4(((RR&Us     t ThreadTestscBs tZdZd„Zd„ZRS(sTest thread creation.cCsttd„}tidƒ}ti||ttfƒ|iƒ}|i |do|ddƒti|t ƒhd|<dt<dt<ƒ|iƒ}|i |do|ddƒti||tfhdt<ƒ|iƒ}|i |do|dd ƒdS( NcCs|i||fƒdS(s<Use to test _thread.start_new_thread() passes args properly.N(tqueuetputtarg1targ2(R6R8R9((Rt arg_tester{siis7Argument passing for thread creation using tuple failedR6R8R9s8Argument passing for thread creation using kwargs failedsGArgument passing for thread creation using both tuple and kwargs failed( tFalseR:tQueuet testing_queueRRRtgettresultRRR2(RR:R?R=((Rttest_arg_passingys 1 " cCsΩd„}d}ti|ƒ}tioHdt|fGHnxPt|ƒD]B}tot t i ƒdƒ}nd}t i |||fƒqIWtitƒtio dGHn|i|iƒ|jd|tfƒdS(NcCs$ti|ƒ|itiƒƒdS(s@Wait for ``delay`` seconds and then put something into ``queue``N(RRRR6R7RR+(R6R((Rt queue_mark‘s isJ*** Testing multiple thread creation (will take approx. %s to %s sec.) ***iiRs2Not all %s threads executed properly after %s sec.(RAt thread_countR<R=RR Rtxrangetcounttroundtrandomt local_delayRRRRRRtqsize(RRARBRDRGR=((Rttest_multi_creations$       (R#R$R%R@RI(((RR5vs  cCsF|o|adantioHdtGHntitttƒdS(Nis"*** Using %s as _thread module ***( timported_moduleRRRR t run_unittestRR&R5(RJ((Rt test_mainͺs   t__main__(R%t dummy_threadRRR<RFtunittestttestRRtTestCaseRR&R5tNoneRLR#( RRORFR5R<RLR&RR((Rt?s      C!4