Coverage for backend/idaes_service/solver/timing.py: 98%

37 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from contextvars import ContextVar 

2import time 

3 

4 

class TimingDebugHandler:
    """Collect a nested tree of wall-clock timings.

    Every entry is a dict with three keys:

    * ``"start_time"`` - ``time.time()`` when the entry was created,
    * ``"duration"`` - seconds elapsed, ``None`` until a later call closes
      the entry (the next ``add_timing`` at the same level, ``step_out`` or
      ``close``),
    * ``"children"`` - nested entries keyed by name.

    ``pointer_stack`` holds the dict of entries for each nesting level that
    is currently open; its last element is the level new entries go into.
    """

    def __init__(self, key: str = "root") -> None:
        """Create the tree and open a first entry named ``key``."""
        self.timing: dict[str, dict] = {}
        self.pointer_stack = [self.timing]
        self.step_into(key)

    def current_item(self):
        """Return the newest entry of the current level, or ``None`` if empty."""
        level = self.current_pointer()
        if not level:
            return None
        # dicts preserve insertion order, so the last value is the newest
        # entry; reversed() reaches it without copying all the keys.
        return next(reversed(level.values()))

    def current_pointer(self):
        """Return the dict of entries for the innermost open level."""
        return self.pointer_stack[-1]

    def update_last_timing(self):
        """Set the duration of the newest entry of the current level, if any."""
        item = self.current_item()
        if item:
            # update the duration of the previous key
            item["duration"] = time.time() - item["start_time"]

    def add_timing(self, key: str):
        """Close the previous entry of the current level and start ``key``.

        If ``key`` already exists at this level the old entry is replaced.
        """
        self.update_last_timing()
        level = self.current_pointer()
        # Assigning to an existing key keeps its old position in the dict,
        # which would leave a different entry as "last" and make
        # current_item() point at the wrong node. Remove it first so the
        # fresh entry is always the newest one.
        level.pop(key, None)
        level[key] = {
            "start_time": time.time(),
            "duration": None,  # calculated at the next timing
            "children": {},
        }

    def step_into(self, key=None):
        """Start entry ``key`` and make its children the current level."""
        self.add_timing(key)
        self.pointer_stack.append(self.current_item()["children"])

    def step_out(self):
        """Close the newest entry of the current level and leave that level."""
        self.update_last_timing()
        self.pointer_stack.pop()

    def close(self):
        """Leave the current level, close its parent entry, return the tree.

        With balanced ``step_into``/``step_out`` calls the parent entry is
        the root created in ``__init__``.
        """
        # update root duration
        self.step_out()
        self.update_last_timing()
        return self.timing

46 

47 

def start_timing(key: str = "root"):
    """
    Build a fresh TimingDebugHandler whose first entry is named ``key``,
    store it in the ``timing`` context variable and return it.
    """
    new_handler = TimingDebugHandler(key)
    timing.set(new_handler)
    return new_handler

55 

56 

def get_timer():
    """
    Return whatever the ``timing`` context variable currently holds.
    """
    active_handler = timing.get()
    return active_handler

62 

63 

# Context variable holding the active TimingDebugHandler; it is None until
# start_timing() stores one.
timing: ContextVar[TimingDebugHandler | None] = ContextVar(
    "timing",
    default=None,
)