Further state tracking fixes

This commit is contained in:
MTRNord 2023-01-29 18:43:39 +01:00
parent d23b54a772
commit f7312b8497
No known key found for this signature in database
1 changed files with 75 additions and 54 deletions

View File

@ -36,7 +36,6 @@ logger.level = logbook.INFO
# directory to store recordings
RECORDING_PATH = "./recordings/" # local directory
# TODO: do not rely on room id
UserID = str
ConfID = str
RoomID = str
@ -63,6 +62,7 @@ class Recorder:
client: AsyncClient
loop: asyncio.AbstractEventLoop
conf_room: Dict[ConfID, MatrixRoom]
recording_rooms: List[ConfID]
room_conf: Dict[RoomID, ConfID]
others: Dict[ConfID, List[Others]]
@ -75,6 +75,7 @@ class Recorder:
self.conf_room = {}
self.room_conf = {}
self.others = {}
self.recording_rooms = list()
self.party_id = "".join(
random.choices(string.ascii_letters + string.digits, k=8)
)
@ -222,32 +223,31 @@ class Recorder:
call_id = state["state_key"]
logger.info(f"Found call id {call_id} for {room.room_id}")
self.add_call(call_id, room)
await self.client.room_put_state(
room.room_id,
"org.matrix.msc3401.call.member",
{
"m.calls": [
{
# TODO: Invalid call_id
"m.call_id": self.room_conf[room.room_id],
"m.devices": [
{
"device_id": self.client.device_id,
"expires_ts": int(time.time() * 1000)
+ (1000 * 60 * 60),
"session_id": f"{self.client.user_id}_{self.client.device_id}_session",
"feeds": [
{
"purpose": "m.usermedia",
}
],
}
],
}
]
},
state_key=self.client.user_id,
)
await self.client.room_put_state(
room.room_id,
"org.matrix.msc3401.call.member",
{
"m.calls": [
{
"m.call_id": self.room_conf[room.room_id],
"m.devices": [
{
"device_id": self.client.device_id,
"expires_ts": int(time.time() * 1000)
+ (1000 * 60 * 60),
"session_id": f"{self.client.user_id}_{self.client.device_id}_session",
"feeds": [
{
"purpose": "m.usermedia",
}
],
}
],
}
]
},
state_key=self.client.user_id,
)
async def handle_call_invite(
self,
@ -290,26 +290,32 @@ class Recorder:
# output_tracks = {"audio": MediaBlackhole(), "video": MediaBlackhole()}
async def task() -> None:
if isinstance(event, CallInviteEvent):
conf_path = RECORDING_PATH
else:
conf_path = os.path.join(RECORDING_PATH, f"{event.conf_id}")
if not os.path.exists(conf_path):
os.mkdir(conf_path)
base_name_audio = f"{event.sender}_{event.call_id}"
base_name_video = f"{event.sender}_{event.call_id}"
if os.path.exists(os.path.join(RECORDING_PATH, f"{base_name_audio}.wav")):
if os.path.exists(os.path.join(conf_path, f"{base_name_audio}.wav")):
i = 1
while os.path.exists(
os.path.join(RECORDING_PATH, f"{base_name_audio}_{i}.wav")
os.path.join(conf_path, f"{base_name_audio}_{i}.wav")
):
i += 1
base_name_audio = f"{base_name_audio}_{i}"
if os.path.exists(os.path.join(RECORDING_PATH, f"{base_name_video}.mp4")):
if os.path.exists(os.path.join(conf_path, f"{base_name_video}.mp4")):
i = 1
while os.path.exists(
os.path.join(RECORDING_PATH, f"{base_name_video}_{i}.mp4")
os.path.join(conf_path, f"{base_name_video}_{i}.mp4")
):
i += 1
base_name_video = f"{base_name_video}_{i}"
wav_file = os.path.join(RECORDING_PATH, f"{base_name_audio}.wav")
mp4_file = os.path.join(RECORDING_PATH, f"{base_name_video}.mp4")
wav_file = os.path.join(conf_path, f"{base_name_audio}.wav")
mp4_file = os.path.join(conf_path, f"{base_name_video}.mp4")
audio_recorder = MediaRecorder(wav_file, format="wav")
audio_recorder.addTrack(input_tracks["audio"])
await audio_recorder.start()
@ -465,26 +471,30 @@ class Recorder:
)
await self.client.to_device(to_device_message)
if room:
await self.client.room_send(
room.room_id,
"m.room.message",
{
"msgtype": "m.notice",
"body": "Successfully started recording",
},
ignore_unverified_devices=True,
)
if event.call_id not in self.recording_rooms:
self.recording_rooms.append(event.call_id)
await self.client.room_send(
room.room_id,
"m.room.message",
{
"msgtype": "m.notice",
"body": "Successfully started recording",
},
ignore_unverified_devices=True,
)
logger.info(f"Sent answer to {event.sender} in {room.room_id}")
elif isinstance(event, ToDeviceCallInviteEvent):
await self.client.room_send(
self.conf_room[event.conf_id].room_id,
"m.room.message",
{
"msgtype": "m.notice",
"body": "Successfully started recording",
},
ignore_unverified_devices=True,
)
if event.conf_id not in self.recording_rooms:
self.recording_rooms.append(event.conf_id)
await self.client.room_send(
self.conf_room[event.conf_id].room_id,
"m.room.message",
{
"msgtype": "m.notice",
"body": "Successfully started recording",
},
ignore_unverified_devices=True,
)
logger.info(
f"Sent answer to {event.sender} with device {event.source['sender_device']}"
)
@ -494,8 +504,8 @@ class Recorder:
room: Optional[MatrixRoom],
event: Union[CallCandidatesEvent, ToDeviceCallCandidatesEvent],
) -> None:
logger.info("Delaying candidates for 3 seconds")
await asyncio.sleep(3)
# logger.info("Delaying candidates for 3 seconds")
# await asyncio.sleep(3)
if room:
logger.info(
f"Received call candidates from {event.sender} in {room.room_id}"
@ -507,6 +517,14 @@ class Recorder:
unique_id: UniqueCallID = (event.sender, event.call_id)
else:
unique_id: UniqueCallID = (event.sender, event.conf_id)
if event.conf_id not in self.conf_room:
logger.warning("Got invalid to-device call candidates")
return
while unique_id not in self.conns:
logger.info("Waiting for connection to be created")
await asyncio.sleep(0.2)
try:
conn = self.conns[unique_id]
except KeyError as e:
@ -518,7 +536,10 @@ class Recorder:
for raw_candidate in event.candidates:
if not raw_candidate.get("candidate"):
# End of candidates
conn.candidate_waiter.set_result(None)
try:
conn.candidate_waiter.set_result(None)
except asyncio.InvalidStateError:
pass
break
try:
candidate = candidate_from_aioice(