Skip to content
Snippets Groups Projects
Commit 04e21cd0 authored by David Huss's avatar David Huss :speech_balloon:
Browse files

Cooldown timeout for projector

parent 19b600fd
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@ import mediactl
from datetime import datetime, timedelta
from result import Result, Ok, Err
from functools import wraps
from typing import Optional
class ConnectionState(mediactl.EnumState):
......@@ -95,7 +96,9 @@ def projector_connection(endpoint: str, purpose: str, method: str = "post"):
try:
d = response.json()
except requests.exceptions.JSONDecodeError as e:
print(f"When sending a {method}-request to {self.addr}/{endpoint} error just happened:")
print(
f"When sending a {method}-request to {self.addr}/{endpoint} error just happened:"
)
print(f" {e}")
print(f" status: {response.status_code}")
print(repr(response))
......@@ -127,13 +130,22 @@ class Projector(mediactl.WithLogger):
self.addr = addr
self.timeout = 1.0
self.status = self.get_default_status()
self._cooldown_since: Optional[datetime] = None
@projector_connection(endpoint="status", purpose="get status", method="get")
async def update_status(self) -> Result[dict, str]:
global data
data = data # noqa: F821 variable "data" set by decorator
self.status["connection"] = ConnectionState.connected
self.status["power"] = PowerState(data["power"]["state"])
# 2) Compute the new PowerState from whatever raw payload we got:
new_power = PowerState(data["power"]["state"])
# 3) If it changed, record the timestamp:
old_power = self.status.get("power")
if old_power is None or new_power is not old_power:
# State actually changed, so set “power_since = now”
self.status["power_since"] = datetime.now()
self.status["power"] = new_power
self.status["shutter"] = ShutterState(data["shutter"]["state"])
self.status["testpattern"] = TestPatternState(data["testpattern"])
self.status["lamp"] = data["lamp"]["hours"]
......@@ -144,19 +156,49 @@ class Projector(mediactl.WithLogger):
return {
"connection": ConnectionState.unknown,
"power": PowerState.unknown,
"power_since": None,
"shutter": ShutterState.unknown,
"testpattern": TestPatternState.unknown,
"time": None,
}
async def get_status(self) -> dict:
"""
1) Call update_status() to fetch the raw status.
2) If status["power"] == PowerState.cooldown, start/check a cooldown timer.
- If this cooldown has lasted > 60 seconds, force status["power"] to PowerState.off.
3) Return the (possibly modified) self.status.
"""
response = await self.update_status()
if response.is_err():
self.log_error(response.err_value)
# Return previous status on error
return self.status
return self.status # on error, return last‐known
proj_stat = response.unwrap()
now = datetime.now()
# ───── You can adjust this timeout as needed ─────
COOLDOWN_TIMEOUT = timedelta(seconds=60)
current_power = proj_stat.get("power")
if current_power == PowerState.cooldown:
# If we just entered "cooldown", record the timestamp
if self._cooldown_since is None:
self._cooldown_since = now
else:
# Already in "cooldown"—check elapsed time
elapsed = now - self._cooldown_since
if elapsed > COOLDOWN_TIMEOUT:
# Timed out → treat as "off" now
proj_stat["power"] = PowerState.off
# Clear the timer so we don't re‐enter this block
self._cooldown_since = None
else:
return response.unwrap()
# If we're not in "cooldown" anymore, reset the timer
self._cooldown_since = None
return proj_stat
@projector_connection(endpoint="power-on", purpose="power on")
async def power_on(self) -> Result[None, str]:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment