diff --git a/.gitignore b/.gitignore index 5107f1e..d421e83 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ credentials.json /recordings __pycache__ /dist +/test.png diff --git a/matrix_call_multitrack_recorder/__main__.py b/matrix_call_multitrack_recorder/__main__.py index 77d573d..281fff1 100644 --- a/matrix_call_multitrack_recorder/__main__.py +++ b/matrix_call_multitrack_recorder/__main__.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later import asyncio +import uvloop import sys from logbook import Logger, StreamHandler @@ -29,15 +30,33 @@ async def main() -> None: if __name__ == "__main__": - try: - asyncio.get_event_loop().run_until_complete(main()) - except Exception: - if BOT: - asyncio.get_event_loop().run_until_complete(BOT.stop()) - logger.exception("Fatal Runtime error.") - sys.exit(1) - except KeyboardInterrupt: - if BOT: - asyncio.get_event_loop().run_until_complete(BOT.stop()) - print("Received keyboard interrupt.") - sys.exit(0) + + if sys.version_info >= (3, 11): + with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: + try: + runner.get_loop().run_until_complete(main()) + except Exception: + if BOT: + runner.get_loop().run_until_complete(BOT.stop()) + logger.exception("Fatal Runtime error.") + sys.exit(1) + except KeyboardInterrupt: + if BOT: + runner.get_loop().run_until_complete(BOT.stop()) + print("Received keyboard interrupt.") + sys.exit(0) + + else: + uvloop.install() + try: + asyncio.get_event_loop().run_until_complete(main()) + except Exception: + if BOT: + asyncio.get_event_loop().run_until_complete(BOT.stop()) + logger.exception("Fatal Runtime error.") + sys.exit(1) + except KeyboardInterrupt: + if BOT: + asyncio.get_event_loop().run_until_complete(BOT.stop()) + print("Received keyboard interrupt.") + sys.exit(0) diff --git a/matrix_call_multitrack_recorder/bot.py b/matrix_call_multitrack_recorder/bot.py index 50649b4..6ad56cc 100644 --- a/matrix_call_multitrack_recorder/bot.py +++ b/matrix_call_multitrack_recorder/bot.py @@ -10,20 +10,21 @@ import sys from nio.events import to_device from nio import ( AsyncClient, - LoginResponse, - MatrixRoom, + ToDeviceCallAnswerEvent, CallInviteEvent, CallCandidatesEvent, CallHangupEvent, ToDeviceCallInviteEvent, ToDeviceCallCandidatesEvent, ToDeviceCallHangupEvent, - AsyncClientConfig, - InviteEvent, MSC3401CallEvent, CallMemberEvent, RoomMessageText, + MatrixRoom, + InviteEvent, ProfileGetDisplayNameResponse, + LoginResponse, + AsyncClientConfig, ) from logbook import Logger, StreamHandler @@ -77,6 +78,9 @@ class RecordingBot: self.client.add_to_device_callback( self.to_device_call_hangup, (ToDeviceCallHangupEvent,) # type: ignore ) + self.client.add_to_device_callback( + self.call_answer, (ToDeviceCallAnswerEvent,) # type: ignore + ) self.client.add_event_callback(self.cb_autojoin_room, InviteEvent) # type: ignore logger.info("Listening for calls") @@ -104,19 +108,14 @@ class RecordingBot: }, ignore_unverified_devices=True, ) + await self.client.update_receipt_marker(room.room_id, event.event_id) elif event.body.startswith("!stop"): await self.recorder.leave_call(room) - await self.client.room_send( - room.room_id, - "m.room.message", - { - "msgtype": "m.notice", - "body": "Recording stopped", - }, - ignore_unverified_devices=True, - ) + await self.client.update_receipt_marker(room.room_id, event.event_id) + elif event.body.startswith("!start"): await self.recorder.join_call(room) + await self.client.update_receipt_marker(room.room_id, event.event_id) asyncio.create_task(command_handling()) @@ -133,13 +132,17 @@ class RecordingBot: # logger.info(f"MSC3401 call member event: {event}") if not event.calls: await self.recorder.remove_connection(room) + self.recorder.remove_other(event.sender) for call in event.calls: for device in call["m.devices"]: if device["device_id"] != self.client.device_id: self.recorder.add_call(call["m.call_id"], room) self.recorder.track_others( - call["m.call_id"], device["device_id"], event.sender + call["m.call_id"], + device["device_id"], + event.sender, + device["session_id"], ) # TODO: Can I reuse the same connection? Do I have the info needed? Is it a new connection? How do I see if it changed? # asyncio.create_task(self.handle_call_invite(event, room)) @@ -153,6 +156,11 @@ class RecordingBot: asyncio.create_task(self.recorder.handle_call_invite(event, room)) + async def call_answer(self, event: ToDeviceCallAnswerEvent) -> None: + """Handles incoming call answers.""" + + asyncio.create_task(self.recorder.handle_call_answer(event)) + async def to_device_call_invite(self, event: CallInviteEvent) -> None: """Handles incoming call invites.""" logger.info("Received to-device call invite") diff --git a/matrix_call_multitrack_recorder/recorder.py b/matrix_call_multitrack_recorder/recorder.py index fe23c35..40c0a86 100644 --- a/matrix_call_multitrack_recorder/recorder.py +++ b/matrix_call_multitrack_recorder/recorder.py @@ -4,6 +4,7 @@ import asyncio from dataclasses import dataclass +from fractions import Fraction import os import random import string @@ -23,11 +24,28 @@ from nio import ( CallHangupEvent, ToDeviceCallInviteEvent, ToDeviceCallHangupEvent, + ToDeviceCallAnswerEvent, + # ToDeviceCallNegotiateEvent, + # CallNegotiateEvent, ) -from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack -from aiortc.contrib.media import MediaRecorder -from aiortc.rtcicetransport import candidate_from_aioice +from aiortc import ( + RTCPeerConnection, + RTCSessionDescription, + MediaStreamTrack, + RTCIceGatherer, + RTCIceCandidate, +) +from aiortc.contrib.media import MediaRecorder, MediaPlayer, MediaStreamError, Frame +from aiortc.rtcicetransport import candidate_from_aioice, candidate_to_aioice from aioice.candidate import Candidate +import av +from av.filter import Filter, Graph + +# import logging + +# logging.basicConfig(level=logging.INFO) +# logging.getLogger("libav").setLevel(logging.DEBUG) + StreamHandler(sys.stdout).push_application() logger = Logger(__name__) @@ -45,8 +63,8 @@ UniqueCallID = Tuple[UserID, ConfID] @dataclass class WrappedConn: pc: RTCPeerConnection - prepare_waiter: asyncio.Future - candidate_waiter: asyncio.Future + prepare_waiter: Optional[asyncio.Future] + candidate_waiter: Optional[asyncio.Future] room_id: Optional[str] @@ -54,10 +72,60 @@ class WrappedConn: class Others: user_id: str device_id: str + session_id: str + + +class ProxyTrack(MediaStreamTrack): + _source: MediaStreamTrack + _source_wait: Optional[asyncio.Future] + _graph: Optional[Graph] + + def __init__( + self, + source: MediaPlayer, + ) -> None: + super().__init__() + self.kind = source.video.kind + self._source = source.video + self._graph = None + + async def start(self, frame: Frame) -> None: + self._graph = Graph() + graph_source = self._graph.add_buffer(template=frame) + + graph_filter = self._graph.add( + "drawtext", + r"text='Recording Duration: %{pts:gmtime:0:%H\:%M\:%S}':x=(w-text_w)/2:y=(h-text_h)/2:fontcolor=white:fontsize=128", + ) + graph_sink = self._graph.add("buffersink") + + graph_source.link_to(graph_filter, 0, 0) + graph_filter.link_to(graph_sink, 0, 0) + self._graph.configure() + + async def recv(self) -> Frame: + try: + frame = await self._source.recv() + if self._graph is None: + await self.start(frame) + self._graph.push(frame) + filtered_frame = self._graph.pull() + return filtered_frame + except MediaStreamError as e: + frame = await self._source.recv() + logger.warning(f"Error in recv: {e}") + return frame + except Exception as e: + logger.warning(f"Error in recv: {e}") + return await self._source.recv() + + def stop(self) -> None: + self._source.stop() + super().stop() class Recorder: - conns: Dict[UniqueCallID, WrappedConn] + __conns: Dict[UniqueCallID, WrappedConn] party_id: str client: AsyncClient loop: asyncio.AbstractEventLoop @@ -65,16 +133,27 @@ class Recorder: recording_rooms: List[ConfID] room_conf: Dict[RoomID, ConfID] others: Dict[ConfID, List[Others]] + output_track: ProxyTrack def __init__(self, client) -> None: self.client = client self.loop = asyncio.get_event_loop() async def start(self) -> None: - self.conns = {} + self.__conns = {} self.conf_room = {} self.room_conf = {} self.others = {} + self.output_track = ProxyTrack( + MediaPlayer( + "./test.png", + options={ + "loop": "1", + "framerate": "1", + }, + ) + ) + self.recording_rooms = list() self.party_id = "".join( random.choices(string.ascii_letters + string.digits, k=8) @@ -87,115 +166,103 @@ class Recorder: async def stop(self) -> None: logger.info("Stopping recording handler") - for (_, conf_or_call_id), conn in self.conns.items(): - - if conn.room_id: - hangup = { - "content": { - "call_id": conf_or_call_id, - "version": 1, - "party_id": self.party_id, - } - } - # We are lazy and send it to the room and as to_device message - await self.client.room_send( - conn.room_id, - "m.call.hangup", - hangup, - ignore_unverified_devices=True, - ) - else: - hangup = { - "content": { - "call_id": conf_or_call_id, - "version": "1", - "party_id": self.party_id, - } - } - if conf_or_call_id in self.others: - # Send it as to_device message - others = self.others[conf_or_call_id] - - for data in others: - message = ToDeviceMessage( - "m.call.hangup", - data.user_id, - data.device_id, - hangup, - ) - await self.client.to_device(message) - await self.client.room_put_state( - self.conf_room[conf_or_call_id].room_id, - "org.matrix.msc3401.call.member", - { - "m.calls": [ - { - "m.call_id": conf_or_call_id, - "m.devices": [], - } - ] - }, - state_key=self.client.user_id, - ) - await conn.pc.close() + for (_, conf_or_call_id), conn in self.__conns.items(): + await self.hangup(conf_or_call_id, conn) def add_call(self, conf_id: ConfID, room: MatrixRoom) -> None: logger.info(f"Adding conf {conf_id} to room {room.room_id}") self.conf_room[conf_id] = room self.room_conf[room.room_id] = conf_id + async def hangup(self, conf_or_call_id: ConfID, conn: WrappedConn) -> None: + hangup = { + "call_id": conf_or_call_id, + "version": "1", + "party_id": self.party_id, + "conf_id": conf_or_call_id, + } + if conn.room_id: + # We are lazy and send it to the room and as to_device message + await self.client.room_send( + conn.room_id, + "m.call.hangup", + hangup, + ignore_unverified_devices=True, + ) + if conf_or_call_id in self.others: + # Send it as to_device message + others = self.others[conf_or_call_id] + + for data in others: + if data.user_id == self.client.user_id: + continue + + message = ToDeviceMessage( + "m.call.hangup", + data.user_id, + data.device_id, + hangup, + ) + logger.info("Sending hangup") + await self.client.to_device(message) + await self.client.room_put_state( + self.conf_room[conf_or_call_id].room_id, + "org.matrix.msc3401.call.member", + {"m.calls": []}, + state_key=self.client.user_id, + ) + + await conn.pc.close() + async def leave_call(self, room: MatrixRoom) -> None: if room.room_id in self.room_conf: conf_id = self.room_conf[room.room_id] + for (user_id, conf_or_call_id), conn in list(self.__conns.items()): + if conf_or_call_id == conf_id: + await self.hangup(conf_or_call_id, conn) + del self.__conns[(user_id, conf_or_call_id)] + if conf_id in self.conf_room: del self.conf_room[conf_id] del self.room_conf[room.room_id] + self.output_track = ProxyTrack( + MediaPlayer( + "./test.png", + options={ + "loop": "1", + "framerate": "1", + }, + ) + ) - for (user_id, conf_or_call_id), conn in list(self.conns.items()): - if conf_or_call_id in self.others: - others = self.others[conf_or_call_id] - for data in others: - hangup_message = ToDeviceMessage( - "m.call.hangup", - data.user_id, - data.device_id, - { - "call_id": conf_id, - "version": "1", - "party_id": self.party_id, - }, - ) - logger.info(f"Sending hangup {hangup_message}") - await self.client.to_device(hangup_message) - - await conn.pc.close() - del self.conns[(user_id, conf_or_call_id)] - await self.client.room_put_state( + await self.client.room_send( room.room_id, - "org.matrix.msc3401.call.member", + "m.room.message", { - "m.calls": [ - { - "m.call_id": conf_id, - "m.devices": [], - } - ] + "msgtype": "m.notice", + "body": "Recording stopped", }, - state_key=self.client.user_id, + ignore_unverified_devices=True, ) async def remove_connection(self, room: MatrixRoom) -> None: if room.room_id in self.room_conf: call_id = self.room_conf[room.room_id] - if (self.client.user_id, call_id) in self.conns: - del self.conns[(self.client.user_id, call_id)] + if (self.client.user_id, call_id) in self.__conns: + del self.__conns[(self.client.user_id, call_id)] del self.room_conf[room.room_id] del self.conf_room[call_id] - def track_others(self, conf_id: str, device_id: str, user_id: str) -> None: + def track_others( + self, conf_id: str, device_id: str, user_id: str, session_id: str + ) -> None: if conf_id not in self.others: self.others[conf_id] = list() - self.others[conf_id].append(Others(user_id, device_id)) + self.others[conf_id].append(Others(user_id, device_id, session_id)) + + def remove_other(self, user_id: str): + for other in self.others.values(): + other[:] = [o for o in other if o.user_id != user_id] async def join_call(self, room: MatrixRoom) -> None: if room.room_id not in self.room_conf: @@ -208,7 +275,7 @@ class Recorder: return # Find "org.matrix.msc3401.call" state event - state = next( + call_state = next( ( event for event in room_state.events @@ -216,37 +283,233 @@ class Recorder: ), None, ) - if state is None: + if call_state is None: logger.warning(f"No call state event in {room.room_id}") return - call_id = state["state_key"] + + member_states = [ + event + for event in room_state.events + if event["type"] == "org.matrix.msc3401.call.member" + ] + + call_id = call_state["state_key"] logger.info(f"Found call id {call_id} for {room.room_id}") self.add_call(call_id, room) - await self.client.room_put_state( - room.room_id, - "org.matrix.msc3401.call.member", - { - "m.calls": [ - { - "m.call_id": self.room_conf[room.room_id], - "m.devices": [ - { - "device_id": self.client.device_id, - "expires_ts": int(time.time() * 1000) - + (1000 * 60 * 60), - "session_id": f"{self.client.user_id}_{self.client.device_id}_session", - "feeds": [ - { - "purpose": "m.usermedia", - } - ], - } - ], - } - ] - }, - state_key=self.client.user_id, + for member in member_states: + for call in member["content"]["m.calls"]: + for device in call["m.devices"]: + self.track_others( + call["m.call_id"], + device["device_id"], + member["sender"], + device["session_id"], + ) + + logger.info(f"Joining call in {room.room_id}") + await self.client.room_put_state( + room.room_id, + "org.matrix.msc3401.call.member", + {"m.calls": []}, + state_key=self.client.user_id, + ) + + # Send offer to others + conf_id = self.room_conf[room.room_id] + logger.info(f"Sending offer to others: {conf_id}") + if self.room_conf[room.room_id] in self.others: + # Send it as to_device message + others = self.others[conf_id] + logger.info(f"Sending offers to {others}") + + call_id = "".join(random.choices(string.ascii_letters + string.digits, k=8)) + for data in others: + if data.user_id == self.client.user_id: + continue + + logger.info(f"Making offer for {data.user_id}") + # Create offer + pc = RTCPeerConnection() + logger.info(f"Started ice for {call_id}") + unique_id = (data.user_id, self.room_conf[room.room_id]) + conn = self.__conns[unique_id] = WrappedConn( + pc=pc, + candidate_waiter=None, + prepare_waiter=None, + room_id=room.room_id if room else None, + ) + logger.info(f"Created connection {unique_id}") + + # conn.pc.addTransceiver("video", direction="recvonly") + # conn.pc.addTransceiver("audio", direction="recvonly") + + conn.pc.addTrack(self.output_track) + + offer = await conn.pc.createOffer() + await conn.pc.setLocalDescription(offer) + + logger.info(f"Got local candidates for {call_id}") + candidates: List[Tuple[RTCIceCandidate, str]] = [] + for transceiver in conn.pc.getTransceivers(): + gatherer: RTCIceGatherer = ( + transceiver.sender.transport.transport.iceGatherer + ) + # await gatherer.gather() + for candidate in gatherer.getLocalCandidates(): + candidate.sdpMid = transceiver.mid + candidates.append( + ( + candidate, + str(gatherer.getLocalParameters().usernameFragment), + ) + ) + + # @conn.pc.on("connectionstatechange") + # async def on_connectionstatechange() -> None: + # if conn.pc.connectionState == "failed": + # logger.info(f"Connection {unique_id} failed") + # await conn.pc.close() + # self.__conns.pop(unique_id, None) + + offer_message = ToDeviceMessage( + "m.call.invite", + recipient=data.user_id, + recipient_device=data.device_id, + content={ + "lifetime": 60000, + "invitee": data.user_id, + "offer": { + "sdp": conn.pc.localDescription.sdp, + "type": conn.pc.localDescription.type, + }, + "version": "1", + "conf_id": conf_id, + "call_id": call_id, + "party_id": self.party_id, + "seq": 0, + "device_id": self.client.device_id, + "sender_session_id": f"{self.client.user_id}_{self.client.device_id}_session", + "dest_session_id": data.session_id, + "capabilities": { + "m.call.transferee": False, + "m.call.dtmf": False, + }, + }, + ) + await self.client.to_device(offer_message) + + logger.info(f"Sending candidates to {data.user_id} {data.device_id}") + candidates_message = ToDeviceMessage( + "m.call.candidates", + recipient=data.user_id, + recipient_device=data.device_id, + content={ + "candidates": [ + { + "candidate": f"candidate:{candidate_to_aioice(c).to_sdp()}", + "sdpMid": c.sdpMid, + # "sdpMLineIndex": c.sdpMLineIndex, + "usernameFragment": usernameFragment, + } + for (c, usernameFragment) in candidates + ], + "call_id": call_id, + "party_id": self.party_id, + "version": "1", + "seq": 1, + "conf_id": conf_id, + "device_id": self.client.device_id, + "sender_session_id": f"{self.client.user_id}_{self.client.device_id}_session", + "dest_session_id": data.session_id, + }, + ) + await self.client.to_device(candidates_message) + await asyncio.sleep(0) + + # async def handle_negotiation( + # self, + # room: Optional[MatrixRoom], + # event: [ToDeviceCallNegotiateEvent, CallNegotiateEvent], + # ): + # pass + + async def handle_call_answer(self, event: ToDeviceCallAnswerEvent): + logger.info(f"Received call answer from {event.sender}") + + unique_id: UniqueCallID = (event.sender, event.conf_id) + logger.info(f"Received call answer for {unique_id}") + + if event.conf_id not in self.conf_room: + logger.warning("Got invalid to-device call answer") + return + + while unique_id not in self.__conns: + logger.debug("Waiting for connection of answer to be created") + await asyncio.sleep(0.2) + + try: + conn = self.__conns[unique_id] + except KeyError: + logger.warning("Received answer for unknown call") + return + + logger.info(f"Setting remote description for {unique_id}") + await conn.pc.setRemoteDescription( + RTCSessionDescription( + sdp=str(event.answer.get("sdp")), type=str(event.answer.get("type")) ) + ) + + others = self.others[event.conf_id] + data = next((x for x in others if x.user_id == event.sender), None) + + if not data: + logger.warning("Received answer for unknown call") + return + + message = ToDeviceMessage( + "m.call.select_answer", + recipient=event.sender, + recipient_device=event.source["content"]["device_id"], + content={ + "selected_party_id": event.party_id, + "call_id": event.call_id, + "party_id": self.party_id, + "version": "1", + "seq": 2, + "conf_id": event.conf_id, + "device_id": self.client.device_id, + "sender_session_id": f"{self.client.user_id}_{self.client.device_id}_session", + "dest_session_id": data.session_id, + }, + ) + await self.client.to_device(message) + + await self.client.room_put_state( + self.conf_room[event.conf_id].room_id, + "org.matrix.msc3401.call.member", + { + "m.calls": [ + { + "m.call_id": event.conf_id, + "m.devices": [ + { + "device_id": self.client.device_id, + "expires_ts": int(time.time() * 1000) + + (1000 * 60 * 60), + "session_id": f"{self.client.user_id}_{self.client.device_id}_session", + "feeds": [ + { + "purpose": "m.usermedia", + } + ], + } + ], + } + ] + }, + state_key=self.client.user_id, + ) async def handle_call_invite( self, @@ -278,7 +541,7 @@ class Recorder: unique_id: UniqueCallID = (event.sender, event.call_id) else: unique_id: UniqueCallID = (event.sender, event.conf_id) - conn = self.conns[unique_id] = WrappedConn( + conn = self.__conns[unique_id] = WrappedConn( pc=pc, candidate_waiter=self.loop.create_future(), prepare_waiter=self.loop.create_future(), @@ -286,7 +549,6 @@ class Recorder: ) logger.info("Adding tracks") input_tracks = {} - # output_tracks = {"audio": MediaBlackhole(), "video": MediaBlackhole()} async def task() -> None: if isinstance(event, CallInviteEvent): @@ -344,7 +606,7 @@ class Recorder: async def on_connectionstatechange() -> None: if pc.connectionState == "failed": await pc.close() - self.conns.pop(unique_id, None) + self.__conns.pop(unique_id, None) if pc.connectionState == "connected": asyncio.create_task(task()) @@ -352,78 +614,31 @@ class Recorder: def on_track(track: MediaStreamTrack) -> None: input_tracks[track.kind] = track - # pc.addTrack(output_tracks[track.kind]) - logger.info("Waiting for prepare") await pc.setRemoteDescription(offer) - conn.prepare_waiter.set_result(None) + + logger.info("Ready to receive candidates") + if conn.prepare_waiter: + conn.prepare_waiter.set_result(None) if room and isinstance(event, CallInviteEvent): logger.info("Sending receipt") await self.client.update_receipt_marker(room.room_id, event.event_id) - logger.info("Waiting for candidates") - await conn.candidate_waiter + if conn.candidate_waiter: + await conn.candidate_waiter + logger.info("Got candidates") logger.info("Creating answer") answer = await pc.createAnswer() if not answer: logger.warning("Failed to create answer") await pc.close() - self.conns.pop(unique_id, None) + self.__conns.pop(unique_id, None) if room: - await self.client.room_send( - room.room_id, - "m.call.hangup", - { - "call_id": event.call_id, - "version": "1", - "party_id": self.party_id, - "reason": "ice_failed", - }, - ) - await self.client.room_send( - room.room_id, - "m.room.message", - { - "msgtype": "m.notice", - "body": f"Call with {event.sender} failed", - }, - ) + await self.hangup(event.call_id, conn) elif isinstance(event, ToDeviceCallInviteEvent): - if unique_id not in self.conns: - return - if event.conf_id not in self.others: - return - else: - data = self.others[event.conf_id] - user_data: Optional[Others] = None - # Find user in data - for user in data: - if user.user_id == event.sender: - user_data = user - break - - message = ToDeviceMessage( - type="m.call.hangup", - recipient=event.sender, - recipient_device=user_data.device_id, # type: ignore - content={ - "call_id": event.call_id, - "version": "1", - "party_id": self.party_id, - "reason": "ice_failed", - }, - ) - await self.client.to_device(message) - await self.client.room_send( - self.conf_room[event.conf_id].room_id, - "m.room.message", - { - "msgtype": "m.notice", - "body": f"Call with {event.sender} failed", - }, - ) + await self.hangup(event.conf_id, conn) return await pc.setLocalDescription(answer) @@ -434,8 +649,7 @@ class Recorder: "version": "1", "party_id": self.party_id, "answer": { - # "type": pc.localDescription.type, - "type": "answer", + "type": pc.localDescription.type, "sdp": pc.localDescription.sdp, }, } @@ -453,8 +667,7 @@ class Recorder: "m.call.dtmf": False, }, "answer": { - # "type": pc.localDescription.type, - "type": "answer", + "type": pc.localDescription.type, "sdp": pc.localDescription.sdp, }, "device_id": self.client.device_id, @@ -503,8 +716,6 @@ class Recorder: room: Optional[MatrixRoom], event: Union[CallCandidatesEvent, ToDeviceCallCandidatesEvent], ) -> None: - # logger.info("Delaying candidates for 3 seconds") - # await asyncio.sleep(3) if room: logger.info( f"Received call candidates from {event.sender} in {room.room_id}" @@ -520,23 +731,26 @@ class Recorder: if event.conf_id not in self.conf_room: logger.warning("Got invalid to-device call candidates") return - while unique_id not in self.conns: - logger.info("Waiting for connection to be created") + while unique_id not in self.__conns: + logger.debug("Waiting for connection to be created") await asyncio.sleep(0.2) try: - conn = self.conns[unique_id] + conn = self.__conns[unique_id] except KeyError: logger.warning("Received candidates for unknown call") return + logger.info("Waiting for prepare") - await conn.prepare_waiter + if conn.prepare_waiter: + await conn.prepare_waiter logger.info("Adding candidates") for raw_candidate in event.candidates: if not raw_candidate.get("candidate"): # End of candidates try: - conn.candidate_waiter.set_result(None) + if conn.candidate_waiter: + conn.candidate_waiter.set_result(None) except asyncio.InvalidStateError: pass break @@ -554,7 +768,8 @@ class Recorder: await conn.pc.addIceCandidate(candidate) logger.info("Done adding candidates") try: - conn.candidate_waiter.set_result(None) + if conn.candidate_waiter: + conn.candidate_waiter.set_result(None) except asyncio.InvalidStateError: pass if room and isinstance(event, CallCandidatesEvent): @@ -565,14 +780,30 @@ class Recorder: room: Optional[MatrixRoom], event: Union[CallHangupEvent, ToDeviceCallHangupEvent], ) -> None: + reason = None + if "reason" in event.source["content"]: + reason = event.source["content"]["reason"] if room: - logger.info(f"Received call hangup from {event.sender} in {room.room_id}") + logger.info( + f"Received call hangup from {event.sender} in {room.room_id} with reason {reason}" + ) + else: + logger.info( + f"Received call hangup from {event.sender} with reason {reason}" + ) + if event.sender == self.client.user_id: + return + + # TODO: This is incorrect: + if reason == "replaced": + logger.warning("Call was replaced but we ignore that for now") + return try: if isinstance(event, CallHangupEvent): unique_id: UniqueCallID = (event.sender, event.call_id) else: unique_id: UniqueCallID = (event.sender, event.conf_id) - await self.conns.pop(unique_id).pc.close() + await self.__conns.pop(unique_id).pc.close() if room and isinstance(event, CallHangupEvent): await self.client.update_receipt_marker(room.room_id, event.event_id) diff --git a/pyproject.toml b/pyproject.toml index 88d2ad9..8eaf2a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,11 +24,12 @@ classifiers = [ "Programming Language :: Python :: Implementation :: PyPy", ] dependencies = [ - "matrix-nio[e2e] @ git+https://github.com/MTRNord/matrix-nio@41e90f3f64bf43f4d3cbaa2b55652173073e5511", + "matrix-nio[e2e] @ git+https://github.com/MTRNord/matrix-nio@e0e130ec70784b46d0a87f9f9bb1b8df3255de2f", "aioice", "aiortc", "asyncio", - "logbook" + "logbook", + "uvloop" ] dynamic = ["version"]