Heray-Was-Here
Server : Apache
System : Linux vps43555.mylogin.co 3.10.0-1160.53.1.vz7.185.3 #1 SMP Tue Jan 25 12:49:12 MSK 2022 x86_64
User : redsea ( 60651)
PHP Version : 7.4.32
Disable Function : NONE
Directory :  /proc/self/root/usr/local/python-3.2/lib/python3.2/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/usr/local/python-3.2/lib/python3.2/test/test_dummy_threading.py
from test import support
import unittest
import dummy_threading as _threading
import time

class DummyThreadingTestCase(unittest.TestCase):
    """Run a batch of worker threads from the ``_threading`` module.

    Each worker takes a slot from a bounded semaphore, bumps a shared count
    of running tasks under a lock, sleeps for ``delay`` seconds and then
    undoes both.  The shared state lives in the module globals ``sema``,
    ``mutex`` and ``running``, which setUp() (re)creates.
    """

    class TestThread(_threading.Thread):
        """Worker thread that counts itself in the global ``running`` total."""

        # Set to True only when run() reaches its last statement, so the
        # test can tell a completed task from one that died part-way through.
        finished = False

        def run(self):
            """Hold a semaphore slot while being counted as a running task."""
            global running
            # Uncomment if testing another module, such as the real 'threading'
            # module.
            #delay = random.random() * 2
            delay = 0
            if support.verbose:
                print('task', self.name, 'will run for', delay, 'sec')
            # The 'with' blocks release the semaphore and the lock even if a
            # statement inside them raises, so a failing task cannot leave
            # either one held.
            with sema:
                with mutex:
                    running += 1
                    if support.verbose:
                        print(running, 'tasks are running')
                time.sleep(delay)
                if support.verbose:
                    print('task', self.name, 'done')
                with mutex:
                    running -= 1
                    if support.verbose:
                        print(self.name, 'is finished.', running,
                              'tasks are running')
            self.finished = True

    def setUp(self):
        """Create fresh shared synchronization state and an empty thread list."""
        global sema, mutex, running
        self.numtasks = 10
        sema = _threading.BoundedSemaphore(value=3)
        mutex = _threading.RLock()
        running = 0
        self.threads = []

    def test_tasks(self):
        """Start ``numtasks`` workers, join them and check they all completed."""
        for i in range(self.numtasks):
            t = self.TestThread(name="<thread %d>"%i)
            self.threads.append(t)
            t.start()

        if support.verbose:
            print('waiting for all tasks to complete')
        for t in self.threads:
            t.join()
        if support.verbose:
            print('all tasks done')

        # An exception raised inside run() does not propagate out of start()
        # or join(), so verify explicitly that every worker ran to completion
        # and that the shared counter is balanced again.
        unfinished = [t.name for t in self.threads if not t.finished]
        self.assertEqual(unfinished, [])
        self.assertEqual(running, 0)

def test_main():
    """Run DummyThreadingTestCase through support.run_unittest()."""
    test_classes = (DummyThreadingTestCase,)
    support.run_unittest(*test_classes)


# Run the tests only when this file is executed as a script.
if __name__ == "__main__":
    test_main()

Hry