| import unittest |
| |
| from asyncio import tools |
| |
| from collections import namedtuple |
| |
| FrameInfo = namedtuple('FrameInfo', ['funcname', 'filename', 'lineno']) |
| CoroInfo = namedtuple('CoroInfo', ['call_stack', 'task_name']) |
| TaskInfo = namedtuple('TaskInfo', ['task_id', 'task_name', 'coroutine_stack', 'awaited_by']) |
| AwaitedInfo = namedtuple('AwaitedInfo', ['thread_id', 'awaited_by']) |
| |
| |
| # mock output of get_all_awaited_by function. |
| TEST_INPUTS_TREE = [ |
| [ |
| # test case containing a task called timer being awaited in two |
| # different subtasks part of a TaskGroup (root1 and root2) which call |
| # awaiter functions. |
| ( |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="timer", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiter3", "/path/to/app.py", 130), |
| FrameInfo("awaiter2", "/path/to/app.py", 120), |
| FrameInfo("awaiter", "/path/to/app.py", 110) |
| ], |
| task_name=4 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiterB3", "/path/to/app.py", 190), |
| FrameInfo("awaiterB2", "/path/to/app.py", 180), |
| FrameInfo("awaiterB", "/path/to/app.py", 170) |
| ], |
| task_name=5 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiterB3", "/path/to/app.py", 190), |
| FrameInfo("awaiterB2", "/path/to/app.py", 180), |
| FrameInfo("awaiterB", "/path/to/app.py", 170) |
| ], |
| task_name=6 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiter3", "/path/to/app.py", 130), |
| FrameInfo("awaiter2", "/path/to/app.py", 120), |
| FrameInfo("awaiter", "/path/to/app.py", 110) |
| ], |
| task_name=7 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=8, |
| task_name="root1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("main", "", 0) |
| ], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=9, |
| task_name="root2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("main", "", 0) |
| ], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="child1_1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=8 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="child2_1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=8 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=7, |
| task_name="child1_2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=9 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=5, |
| task_name="child2_2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=9 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ), |
| ( |
| [ |
| [ |
| "└── (T) Task-1", |
| " └── main", |
| " └── __aexit__", |
| " └── _aexit", |
| " ├── (T) root1", |
| " │ └── bloch", |
| " │ └── blocho_caller", |
| " │ └── __aexit__", |
| " │ └── _aexit", |
| " │ ├── (T) child1_1", |
| " │ │ └── awaiter /path/to/app.py:110", |
| " │ │ └── awaiter2 /path/to/app.py:120", |
| " │ │ └── awaiter3 /path/to/app.py:130", |
| " │ │ └── (T) timer", |
| " │ └── (T) child2_1", |
| " │ └── awaiterB /path/to/app.py:170", |
| " │ └── awaiterB2 /path/to/app.py:180", |
| " │ └── awaiterB3 /path/to/app.py:190", |
| " │ └── (T) timer", |
| " └── (T) root2", |
| " └── bloch", |
| " └── blocho_caller", |
| " └── __aexit__", |
| " └── _aexit", |
| " ├── (T) child1_2", |
| " │ └── awaiter /path/to/app.py:110", |
| " │ └── awaiter2 /path/to/app.py:120", |
| " │ └── awaiter3 /path/to/app.py:130", |
| " │ └── (T) timer", |
| " └── (T) child2_2", |
| " └── awaiterB /path/to/app.py:170", |
| " └── awaiterB2 /path/to/app.py:180", |
| " └── awaiterB3 /path/to/app.py:190", |
| " └── (T) timer", |
| ] |
| ] |
| ), |
| ], |
| [ |
| # test case containing two roots |
| ( |
| AwaitedInfo( |
| thread_id=9, |
| awaited_by=[ |
| TaskInfo( |
| task_id=5, |
| task_name="Task-5", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="Task-6", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main2", "", 0)], |
| task_name=5 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=7, |
| task_name="Task-7", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main2", "", 0)], |
| task_name=5 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=8, |
| task_name="Task-8", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main2", "", 0)], |
| task_name=5 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo( |
| thread_id=10, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=2, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=1 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="Task-3", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=1 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="Task-4", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=1 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=11, awaited_by=[]), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ), |
| ( |
| [ |
| [ |
| "└── (T) Task-5", |
| " └── main2", |
| " ├── (T) Task-6", |
| " ├── (T) Task-7", |
| " └── (T) Task-8", |
| ], |
| [ |
| "└── (T) Task-1", |
| " └── main", |
| " ├── (T) Task-2", |
| " ├── (T) Task-3", |
| " └── (T) Task-4", |
| ], |
| ] |
| ), |
| ], |
| [ |
| # test case containing two roots, one of them without subtasks |
| ( |
| [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-5", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ), |
| AwaitedInfo( |
| thread_id=3, |
| awaited_by=[ |
| TaskInfo( |
| task_id=4, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=5, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=4 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="Task-3", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=4 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=7, |
| task_name="Task-4", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=4 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=8, awaited_by=[]), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ] |
| ), |
| ( |
| [ |
| ["└── (T) Task-5"], |
| [ |
| "└── (T) Task-1", |
| " └── main", |
| " ├── (T) Task-2", |
| " ├── (T) Task-3", |
| " └── (T) Task-4", |
| ], |
| ] |
| ), |
| ], |
| ] |
| |
| TEST_INPUTS_CYCLES_TREE = [ |
| [ |
| # this test case contains a cycle: two tasks awaiting each other. |
| ( |
| [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="a", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("awaiter2", "", 0)], |
| task_name=4 |
| ), |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="b", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("awaiter", "", 0)], |
| task_name=3 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ] |
| ), |
| ([[4, 3, 4]]), |
| ], |
| [ |
| # this test case contains two cycles |
| ( |
| [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="A", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_b", "", 0) |
| ], |
| task_name=4 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="B", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_c", "", 0) |
| ], |
| task_name=5 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_a", "", 0) |
| ], |
| task_name=3 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=5, |
| task_name="C", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0) |
| ], |
| task_name=6 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_b", "", 0) |
| ], |
| task_name=4 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ] |
| ), |
| ([[4, 3, 4], [4, 6, 5, 4]]), |
| ], |
| ] |
| |
| TEST_INPUTS_TABLE = [ |
| [ |
| # test case containing a task called timer being awaited in two |
| # different subtasks part of a TaskGroup (root1 and root2) which call |
| # awaiter functions. |
| ( |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="timer", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiter3", "", 0), |
| FrameInfo("awaiter2", "", 0), |
| FrameInfo("awaiter", "", 0) |
| ], |
| task_name=4 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiter1_3", "", 0), |
| FrameInfo("awaiter1_2", "", 0), |
| FrameInfo("awaiter1", "", 0) |
| ], |
| task_name=5 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiter1_3", "", 0), |
| FrameInfo("awaiter1_2", "", 0), |
| FrameInfo("awaiter1", "", 0) |
| ], |
| task_name=6 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("awaiter3", "", 0), |
| FrameInfo("awaiter2", "", 0), |
| FrameInfo("awaiter", "", 0) |
| ], |
| task_name=7 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=8, |
| task_name="root1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("main", "", 0) |
| ], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=9, |
| task_name="root2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("main", "", 0) |
| ], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="child1_1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=8 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="child2_1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=8 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=7, |
| task_name="child1_2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=9 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=5, |
| task_name="child2_2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("_aexit", "", 0), |
| FrameInfo("__aexit__", "", 0), |
| FrameInfo("blocho_caller", "", 0), |
| FrameInfo("bloch", "", 0) |
| ], |
| task_name=9 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ), |
| ( |
| [ |
| [1, "0x2", "Task-1", "", "", "", "0x0"], |
| [ |
| 1, |
| "0x3", |
| "timer", |
| "", |
| "awaiter3 -> awaiter2 -> awaiter", |
| "child1_1", |
| "0x4", |
| ], |
| [ |
| 1, |
| "0x3", |
| "timer", |
| "", |
| "awaiter1_3 -> awaiter1_2 -> awaiter1", |
| "child2_2", |
| "0x5", |
| ], |
| [ |
| 1, |
| "0x3", |
| "timer", |
| "", |
| "awaiter1_3 -> awaiter1_2 -> awaiter1", |
| "child2_1", |
| "0x6", |
| ], |
| [ |
| 1, |
| "0x3", |
| "timer", |
| "", |
| "awaiter3 -> awaiter2 -> awaiter", |
| "child1_2", |
| "0x7", |
| ], |
| [ |
| 1, |
| "0x8", |
| "root1", |
| "", |
| "_aexit -> __aexit__ -> main", |
| "Task-1", |
| "0x2", |
| ], |
| [ |
| 1, |
| "0x9", |
| "root2", |
| "", |
| "_aexit -> __aexit__ -> main", |
| "Task-1", |
| "0x2", |
| ], |
| [ |
| 1, |
| "0x4", |
| "child1_1", |
| "", |
| "_aexit -> __aexit__ -> blocho_caller -> bloch", |
| "root1", |
| "0x8", |
| ], |
| [ |
| 1, |
| "0x6", |
| "child2_1", |
| "", |
| "_aexit -> __aexit__ -> blocho_caller -> bloch", |
| "root1", |
| "0x8", |
| ], |
| [ |
| 1, |
| "0x7", |
| "child1_2", |
| "", |
| "_aexit -> __aexit__ -> blocho_caller -> bloch", |
| "root2", |
| "0x9", |
| ], |
| [ |
| 1, |
| "0x5", |
| "child2_2", |
| "", |
| "_aexit -> __aexit__ -> blocho_caller -> bloch", |
| "root2", |
| "0x9", |
| ], |
| ] |
| ), |
| ], |
| [ |
| # test case containing two roots |
| ( |
| AwaitedInfo( |
| thread_id=9, |
| awaited_by=[ |
| TaskInfo( |
| task_id=5, |
| task_name="Task-5", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="Task-6", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main2", "", 0)], |
| task_name=5 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=7, |
| task_name="Task-7", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main2", "", 0)], |
| task_name=5 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=8, |
| task_name="Task-8", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main2", "", 0)], |
| task_name=5 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo( |
| thread_id=10, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=2, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=1 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="Task-3", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=1 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="Task-4", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=1 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=11, awaited_by=[]), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ), |
| ( |
| [ |
| [9, "0x5", "Task-5", "", "", "", "0x0"], |
| [9, "0x6", "Task-6", "", "main2", "Task-5", "0x5"], |
| [9, "0x7", "Task-7", "", "main2", "Task-5", "0x5"], |
| [9, "0x8", "Task-8", "", "main2", "Task-5", "0x5"], |
| [10, "0x1", "Task-1", "", "", "", "0x0"], |
| [10, "0x2", "Task-2", "", "main", "Task-1", "0x1"], |
| [10, "0x3", "Task-3", "", "main", "Task-1", "0x1"], |
| [10, "0x4", "Task-4", "", "main", "Task-1", "0x1"], |
| ] |
| ), |
| ], |
| [ |
| # test case containing two roots, one of them without subtasks |
| ( |
| [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-5", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ), |
| AwaitedInfo( |
| thread_id=3, |
| awaited_by=[ |
| TaskInfo( |
| task_id=4, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=5, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=4 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="Task-3", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=4 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=7, |
| task_name="Task-4", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=4 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=8, awaited_by=[]), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ] |
| ), |
| ( |
| [ |
| [1, "0x2", "Task-5", "", "", "", "0x0"], |
| [3, "0x4", "Task-1", "", "", "", "0x0"], |
| [3, "0x5", "Task-2", "", "main", "Task-1", "0x4"], |
| [3, "0x6", "Task-3", "", "main", "Task-1", "0x4"], |
| [3, "0x7", "Task-4", "", "main", "Task-1", "0x4"], |
| ] |
| ), |
| ], |
| # CASES WITH CYCLES |
| [ |
| # this test case contains a cycle: two tasks awaiting each other. |
| ( |
| [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="a", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("awaiter2", "", 0)], |
| task_name=4 |
| ), |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="b", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("awaiter", "", 0)], |
| task_name=3 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ] |
| ), |
| ( |
| [ |
| [1, "0x2", "Task-1", "", "", "", "0x0"], |
| [1, "0x3", "a", "", "awaiter2", "b", "0x4"], |
| [1, "0x3", "a", "", "main", "Task-1", "0x2"], |
| [1, "0x4", "b", "", "awaiter", "a", "0x3"], |
| ] |
| ), |
| ], |
| [ |
| # this test case contains two cycles |
| ( |
| [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="A", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_b", "", 0) |
| ], |
| task_name=4 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="B", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_c", "", 0) |
| ], |
| task_name=5 |
| ), |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_a", "", 0) |
| ], |
| task_name=3 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=5, |
| task_name="C", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0) |
| ], |
| task_name=6 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=6, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("nested", "", 0), |
| FrameInfo("nested", "", 0), |
| FrameInfo("task_b", "", 0) |
| ], |
| task_name=4 |
| ) |
| ] |
| ) |
| ] |
| ), |
| AwaitedInfo(thread_id=0, awaited_by=[]) |
| ] |
| ), |
| ( |
| [ |
| [1, "0x2", "Task-1", "", "", "", "0x0"], |
| [ |
| 1, |
| "0x3", |
| "A", |
| "", |
| "nested -> nested -> task_b", |
| "B", |
| "0x4", |
| ], |
| [ |
| 1, |
| "0x4", |
| "B", |
| "", |
| "nested -> nested -> task_c", |
| "C", |
| "0x5", |
| ], |
| [ |
| 1, |
| "0x4", |
| "B", |
| "", |
| "nested -> nested -> task_a", |
| "A", |
| "0x3", |
| ], |
| [ |
| 1, |
| "0x5", |
| "C", |
| "", |
| "nested -> nested", |
| "Task-2", |
| "0x6", |
| ], |
| [ |
| 1, |
| "0x6", |
| "Task-2", |
| "", |
| "nested -> nested -> task_b", |
| "B", |
| "0x4", |
| ], |
| ] |
| ), |
| ], |
| ] |
| |
| |
| class TestAsyncioToolsTree(unittest.TestCase): |
| def test_asyncio_utils(self): |
| for input_, tree in TEST_INPUTS_TREE: |
| with self.subTest(input_): |
| result = tools.build_async_tree(input_) |
| self.assertEqual(result, tree) |
| |
| def test_asyncio_utils_cycles(self): |
| for input_, cycles in TEST_INPUTS_CYCLES_TREE: |
| with self.subTest(input_): |
| try: |
| tools.build_async_tree(input_) |
| except tools.CycleFoundException as e: |
| self.assertEqual(e.cycles, cycles) |
| |
| |
| class TestAsyncioToolsTable(unittest.TestCase): |
| def test_asyncio_utils(self): |
| for input_, table in TEST_INPUTS_TABLE: |
| with self.subTest(input_): |
| result = tools.build_task_table(input_) |
| self.assertEqual(result, table) |
| |
| |
| class TestAsyncioToolsBasic(unittest.TestCase): |
| def test_empty_input_tree(self): |
| """Test build_async_tree with empty input.""" |
| result = [] |
| expected_output = [] |
| self.assertEqual(tools.build_async_tree(result), expected_output) |
| |
| def test_empty_input_table(self): |
| """Test build_task_table with empty input.""" |
| result = [] |
| expected_output = [] |
| self.assertEqual(tools.build_task_table(result), expected_output) |
| |
| def test_only_independent_tasks_tree(self): |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=10, |
| task_name="taskA", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=11, |
| task_name="taskB", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| expected = [["└── (T) taskA"], ["└── (T) taskB"]] |
| result = tools.build_async_tree(input_) |
| self.assertEqual(sorted(result), sorted(expected)) |
| |
| def test_only_independent_tasks_table(self): |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=10, |
| task_name="taskA", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=11, |
| task_name="taskB", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| self.assertEqual( |
| tools.build_task_table(input_), |
| [[1, '0xa', 'taskA', '', '', '', '0x0'], [1, '0xb', 'taskB', '', '', '', '0x0']] |
| ) |
| |
| def test_single_task_tree(self): |
| """Test build_async_tree with a single task and no awaits.""" |
| result = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| expected_output = [ |
| [ |
| "└── (T) Task-1", |
| ] |
| ] |
| self.assertEqual(tools.build_async_tree(result), expected_output) |
| |
| def test_single_task_table(self): |
| """Test build_task_table with a single task and no awaits.""" |
| result = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| expected_output = [[1, '0x2', 'Task-1', '', '', '', '0x0']] |
| self.assertEqual(tools.build_task_table(result), expected_output) |
| |
| def test_cycle_detection(self): |
| """Test build_async_tree raises CycleFoundException for cyclic input.""" |
| result = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=3 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ) |
| ] |
| ) |
| ] |
| with self.assertRaises(tools.CycleFoundException) as context: |
| tools.build_async_tree(result) |
| self.assertEqual(context.exception.cycles, [[3, 2, 3]]) |
| |
| def test_complex_tree(self): |
| """Test build_async_tree with a more complex tree structure.""" |
| result = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="Task-3", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=3 |
| ) |
| ] |
| ) |
| ] |
| ) |
| ] |
| expected_output = [ |
| [ |
| "└── (T) Task-1", |
| " └── main", |
| " └── (T) Task-2", |
| " └── main", |
| " └── (T) Task-3", |
| ] |
| ] |
| self.assertEqual(tools.build_async_tree(result), expected_output) |
| |
| def test_complex_table(self): |
| """Test build_task_table with a more complex tree structure.""" |
| result = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=2, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=4, |
| task_name="Task-3", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("main", "", 0)], |
| task_name=3 |
| ) |
| ] |
| ) |
| ] |
| ) |
| ] |
| expected_output = [ |
| [1, '0x2', 'Task-1', '', '', '', '0x0'], |
| [1, '0x3', 'Task-2', '', 'main', 'Task-1', '0x2'], |
| [1, '0x4', 'Task-3', '', 'main', 'Task-2', '0x3'] |
| ] |
| self.assertEqual(tools.build_task_table(result), expected_output) |
| |
| def test_deep_coroutine_chain(self): |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=10, |
| task_name="leaf", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("c1", "", 0), |
| FrameInfo("c2", "", 0), |
| FrameInfo("c3", "", 0), |
| FrameInfo("c4", "", 0), |
| FrameInfo("c5", "", 0) |
| ], |
| task_name=11 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=11, |
| task_name="root", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| expected = [ |
| [ |
| "└── (T) root", |
| " └── c5", |
| " └── c4", |
| " └── c3", |
| " └── c2", |
| " └── c1", |
| " └── (T) leaf", |
| ] |
| ] |
| result = tools.build_async_tree(input_) |
| self.assertEqual(result, expected) |
| |
| def test_multiple_cycles_same_node(self): |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="Task-A", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("call1", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=2, |
| task_name="Task-B", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("call2", "", 0)], |
| task_name=3 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="Task-C", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("call3", "", 0)], |
| task_name=1 |
| ), |
| CoroInfo( |
| call_stack=[FrameInfo("call4", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ) |
| ] |
| ) |
| ] |
| with self.assertRaises(tools.CycleFoundException) as ctx: |
| tools.build_async_tree(input_) |
| cycles = ctx.exception.cycles |
| self.assertTrue(any(set(c) == {1, 2, 3} for c in cycles)) |
| |
| def test_table_output_format(self): |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="Task-A", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("foo", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=2, |
| task_name="Task-B", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| table = tools.build_task_table(input_) |
| for row in table: |
| self.assertEqual(len(row), 7) |
| self.assertIsInstance(row[0], int) # thread ID |
| self.assertTrue( |
| isinstance(row[1], str) and row[1].startswith("0x") |
| ) # hex task ID |
| self.assertIsInstance(row[2], str) # task name |
| self.assertIsInstance(row[3], str) # coroutine stack |
| self.assertIsInstance(row[4], str) # coroutine chain |
| self.assertIsInstance(row[5], str) # awaiter name |
| self.assertTrue( |
| isinstance(row[6], str) and row[6].startswith("0x") |
| ) # hex awaiter ID |
| |
| |
| class TestAsyncioToolsEdgeCases(unittest.TestCase): |
| |
| def test_task_awaits_self(self): |
| """A task directly awaits itself - should raise a cycle.""" |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="Self-Awaiter", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("loopback", "", 0)], |
| task_name=1 |
| ) |
| ] |
| ) |
| ] |
| ) |
| ] |
| with self.assertRaises(tools.CycleFoundException) as ctx: |
| tools.build_async_tree(input_) |
| self.assertIn([1, 1], ctx.exception.cycles) |
| |
| def test_task_with_missing_awaiter_id(self): |
| """Awaiter ID not in task list - should not crash, just show 'Unknown'.""" |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="Task-A", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("coro", "", 0)], |
| task_name=999 |
| ) |
| ] |
| ) |
| ] |
| ) |
| ] |
| table = tools.build_task_table(input_) |
| self.assertEqual(len(table), 1) |
| self.assertEqual(table[0][5], "Unknown") |
| |
| def test_duplicate_coroutine_frames(self): |
| """Same coroutine frame repeated under a parent - should deduplicate.""" |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="Task-1", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("frameA", "", 0)], |
| task_name=2 |
| ), |
| CoroInfo( |
| call_stack=[FrameInfo("frameA", "", 0)], |
| task_name=3 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=2, |
| task_name="Task-2", |
| coroutine_stack=[], |
| awaited_by=[] |
| ), |
| TaskInfo( |
| task_id=3, |
| task_name="Task-3", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| tree = tools.build_async_tree(input_) |
| # Both children should be under the same coroutine node |
| flat = "\n".join(tree[0]) |
| self.assertIn("frameA", flat) |
| self.assertIn("Task-2", flat) |
| self.assertIn("Task-1", flat) |
| |
| flat = "\n".join(tree[1]) |
| self.assertIn("frameA", flat) |
| self.assertIn("Task-3", flat) |
| self.assertIn("Task-1", flat) |
| |
| def test_task_with_no_name(self): |
| """Task with no name in id2name - should still render with fallback.""" |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="root", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[FrameInfo("f1", "", 0)], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=2, |
| task_name=None, |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| # If name is None, fallback to string should not crash |
| tree = tools.build_async_tree(input_) |
| self.assertIn("(T) None", "\n".join(tree[0])) |
| |
| def test_tree_rendering_with_custom_emojis(self): |
| """Pass custom emojis to the tree renderer.""" |
| input_ = [ |
| AwaitedInfo( |
| thread_id=1, |
| awaited_by=[ |
| TaskInfo( |
| task_id=1, |
| task_name="MainTask", |
| coroutine_stack=[], |
| awaited_by=[ |
| CoroInfo( |
| call_stack=[ |
| FrameInfo("f1", "", 0), |
| FrameInfo("f2", "", 0) |
| ], |
| task_name=2 |
| ) |
| ] |
| ), |
| TaskInfo( |
| task_id=2, |
| task_name="SubTask", |
| coroutine_stack=[], |
| awaited_by=[] |
| ) |
| ] |
| ) |
| ] |
| tree = tools.build_async_tree(input_, task_emoji="🧵", cor_emoji="🔁") |
| flat = "\n".join(tree[0]) |
| self.assertIn("🧵 MainTask", flat) |
| self.assertIn("🔁 f1", flat) |
| self.assertIn("🔁 f2", flat) |
| self.assertIn("🧵 SubTask", flat) |