From 914eeb855af00316bc10ef75feb2872e5230ea3c Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Tue, 15 Aug 2023 17:11:09 -0400 Subject: [PATCH 1/9] Added wait_for_service --- ros_acomms/src/message_queue_node.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index e537b8f1..22da15de 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -399,11 +399,13 @@ class MessageQueueNode(object): dest_address=params.destination, message_codec=message_codecs[params.codec_id], ) + rospy.wait_for_service(service_name, timeout=2) self.dynamic_get_next_message_services[service_name] = rospy.ServiceProxy(service_name, - GetNextQueuedMessage) + GetNextQueuedMessage) self.dynamic_queue_update_publishers[service_name] = rospy.Publisher( update_topic_name, UpdateQueuedMessage, queue_size=10) - + except rospy.ROSException as exception: + rospy.logerr(f"Timed out waiting for {service_name}: {exception}") except Exception as e: rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}") -- GitLab From e9754ff183bf59f623064c91cf7d7762ebafc8d3 Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Tue, 15 Aug 2023 18:10:14 -0400 Subject: [PATCH 2/9] Added system for max failed attempts for dynamic queue update services --- ros_acomms/src/message_queue_node.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index 22da15de..f2960a3c 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -275,8 +275,7 @@ class MessageQueueNode(object): # Init dynamic queues self.dynamic_queues = {} - self.dynamic_get_next_message_services = {} - self.dynamic_queue_update_publishers = {} + self.dynamic_queue_intrinsics = {} # This needs to be called before any subscribers are created self.setup_queues() @@ -301,6 +300,8 @@ class MessageQueueNode(object): # Subscribe to ACK message to update queues rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received) + self.max_failed_attempts = rospy.get_param('~dynamic_queue_service_max_failed_attempts', 5) + rospy.loginfo("Message Queue started") self.spin() @@ -398,14 +399,8 @@ class MessageQueueNode(object): dynamic_update_topic=update_topic_name, dest_address=params.destination, message_codec=message_codecs[params.codec_id], ) + self.dynamic_queue_intrinsics[service_name] = {"service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), "failed_attempts": 0} - rospy.wait_for_service(service_name, timeout=2) - self.dynamic_get_next_message_services[service_name] = rospy.ServiceProxy(service_name, - GetNextQueuedMessage) - self.dynamic_queue_update_publishers[service_name] = rospy.Publisher( - update_topic_name, UpdateQueuedMessage, queue_size=10) - except rospy.ROSException as exception: - rospy.logerr(f"Timed out waiting for {service_name}: {exception}") except Exception as e: rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}") @@ -540,11 +535,11 @@ class MessageQueueNode(object): # ... but we do track it for dynamic queues if isinstance(msg.queue, DynamicQueue): - if msg.queue.dynamic_message_service in self.dynamic_queue_update_publishers: + if msg.queue.dynamic_message_service in self.dynamic_queue_intrinsics.keys(): update_msg = UpdateQueuedMessage(header=Header(stamp=rospy.Time.now()), message_id=msg.message_id_in_queue, event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED) - self.dynamic_queue_update_publishers[msg.dynamic_queue_name].publish(update_msg) + self.dynamic_queue_intrinsics[msg.dynamic_queue_name]["publisher"].publish(update_msg) rospy.logdebug(f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}") def get_message_from_queue_response(self, dynamic_queue, queue_response: GetNextQueuedMessageResponse) -> QueuedMessage: @@ -581,7 +576,7 @@ class MessageQueueNode(object): m.dynamic_queue_name == dynamic_queue_name] # Get a reference to the service - dynamic_get_next_queued_message = self.dynamic_get_next_message_services[dynamic_queue_name] + dynamic_get_next_queued_message = self.dynamic_queue_intrinsics[dynamic_queue_name]["service"] try: queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits, @@ -595,6 +590,11 @@ class MessageQueueNode(object): # no messages queue_response = GetNextQueuedMessageResponse(has_message=False) + self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] += 1 + + if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] > self.max_failed_attempts: + del self.dynamic_queue_intrinsics[dynamic_queue_name] + # Cache the response self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response -- GitLab From d014ffdb36ca5319f4925dd29c86774130c925a3 Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Tue, 15 Aug 2023 18:31:32 -0400 Subject: [PATCH 3/9] Added better logging and trigger deletion from dynamic_queues dict --- ros_acomms/src/message_queue_node.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index f2960a3c..3a1dea99 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -585,15 +585,18 @@ class MessageQueueNode(object): str(packet_codec) if packet_codec else ''), exclude_message_ids=this_queue_exclusion_ids) except rospy.service.ServiceException as e: - rospy.logerr_throttle(1, f"Error calling get_next_message service on {dynamic_queue_name}: {e}") - # if we have an error, don't try again for this packet (pretend it told us that there are - # no messages - queue_response = GetNextQueuedMessageResponse(has_message=False) - self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] += 1 - if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] > self.max_failed_attempts: + if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts: del self.dynamic_queue_intrinsics[dynamic_queue_name] + del self.dynamic_queues[dynamic_queue_name] + + rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.") + return + else: + rospy.logwarn(f"Error calling get_next_message service on {dynamic_queue_name}: {e}") + + queue_response = GetNextQueuedMessageResponse(has_message=False) # Cache the response self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response -- GitLab From aaf785fedc68d8fe6e43f6e16b673707d7dbb3ff Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Wed, 16 Aug 2023 10:45:38 -0400 Subject: [PATCH 4/9] Combined dynamic queue object into single nested dict --- ros_acomms/src/message_queue_node.py | 41 ++++++++++++++-------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index 3a1dea99..fb7635c9 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -34,9 +34,6 @@ from rospy_yaml_include.yaml_include import RospyYamlInclude from version_node import version_node -#TODO: move this -from packet_dispatch_node import DecoderListEntry - from codec_config_parser import (QueueParams, get_queue_params, get_message_codecs) @@ -275,7 +272,6 @@ class MessageQueueNode(object): # Init dynamic queues self.dynamic_queues = {} - self.dynamic_queue_intrinsics = {} # This needs to be called before any subscribers are created self.setup_queues() @@ -394,12 +390,18 @@ class MessageQueueNode(object): update_topic_name = rospy.names.canonicalize_name(params.dynamic_update_topic) rospy.loginfo(f"Initialized dynamic queue ID {queue_id} for {service_name}") try: - self.dynamic_queues[service_name] = DynamicQueue(message_codec_id=queue_id, - dynamic_message_service=service_name, - dynamic_update_topic=update_topic_name, - dest_address=params.destination, - message_codec=message_codecs[params.codec_id], ) - self.dynamic_queue_intrinsics[service_name] = {"service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), "failed_attempts": 0} + self.dynamic_queues[service_name] = { + "object": + DynamicQueue(message_codec_id=queue_id, + dynamic_message_service=service_name, + dynamic_update_topic=update_topic_name, + dest_address=params.destination, + message_codec=message_codecs[params.codec_id], + ), + "service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), + "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), + "failed_attempts": 0 + } except Exception as e: rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}") @@ -535,24 +537,24 @@ class MessageQueueNode(object): # ... but we do track it for dynamic queues if isinstance(msg.queue, DynamicQueue): - if msg.queue.dynamic_message_service in self.dynamic_queue_intrinsics.keys(): + if msg.queue.dynamic_message_service in self.dynamic_queues.keys(): update_msg = UpdateQueuedMessage(header=Header(stamp=rospy.Time.now()), message_id=msg.message_id_in_queue, event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED) - self.dynamic_queue_intrinsics[msg.dynamic_queue_name]["publisher"].publish(update_msg) + self.dynamic_queues[msg.dynamic_queue_name]["publisher"].publish(update_msg) rospy.logdebug(f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}") def get_message_from_queue_response(self, dynamic_queue, queue_response: GetNextQueuedMessageResponse) -> QueuedMessage: message_bits = BitArray(bytes=queue_response.data, length=queue_response.data_size_in_bits) - next_message = QueuedMessage(queue=self.dynamic_queues[dynamic_queue], + next_message = QueuedMessage(queue=self.dynamic_queues[dynamic_queue]["object"], message=None, - message_codec=self.dynamic_queues[dynamic_queue].message_codec, + message_codec=self.dynamic_queues[dynamic_queue]["object"].message_codec, dest=120, time_added=datetime.utcnow(), fragmentation_tracker=None, allow_fragmentation=False, - dynamic_queue_name=self.dynamic_queues[dynamic_queue].dynamic_message_service, + dynamic_queue_name=self.dynamic_queues[dynamic_queue]["object"].dynamic_message_service, message_id_in_queue=queue_response.message_id, payload_bits=message_bits, length_bits=len(message_bits)) @@ -566,7 +568,7 @@ class MessageQueueNode(object): tuple[str, int]]: start_time = time.time() # for profiling - for dynamic_queue in self.dynamic_queues.values(): + for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]: dynamic_queue_name = dynamic_queue.dynamic_message_service # Check to see if we already have a cached response from the last time we did this. cached_response = self.current_packet_dynamic_queue_cache.get(dynamic_queue_name) @@ -576,7 +578,7 @@ class MessageQueueNode(object): m.dynamic_queue_name == dynamic_queue_name] # Get a reference to the service - dynamic_get_next_queued_message = self.dynamic_queue_intrinsics[dynamic_queue_name]["service"] + dynamic_get_next_queued_message = self.dynamic_queues[dynamic_queue_name]["service"] try: queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits, @@ -585,10 +587,9 @@ class MessageQueueNode(object): str(packet_codec) if packet_codec else ''), exclude_message_ids=this_queue_exclusion_ids) except rospy.service.ServiceException as e: - self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] += 1 + self.dynamic_queues[dynamic_queue_name]["failed_attempts"] += 1 - if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts: - del self.dynamic_queue_intrinsics[dynamic_queue_name] + if self.dynamic_queues[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts: del self.dynamic_queues[dynamic_queue_name] rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.") -- GitLab From 94f27007ed2b9398ec64c1a869716862aababf29 Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Thu, 24 Aug 2023 11:26:46 -0400 Subject: [PATCH 5/9] Added inactive_queue dict to revive dead queues --- ros_acomms/src/message_queue_node.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index fb7635c9..11944deb 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -25,7 +25,7 @@ from threading import Lock import sys import os import signal - +import rosservice from ltcodecs import RosMessageCodec, EncodingFailed from acomms_codecs.ros_packet_codec import RosPacketCodec from acomms_codecs.packet_codecs import packet_codecs @@ -273,6 +273,8 @@ class MessageQueueNode(object): # Init dynamic queues self.dynamic_queues = {} + self.inactive_dynamic_queues = {} + # This needs to be called before any subscribers are created self.setup_queues() @@ -568,6 +570,14 @@ class MessageQueueNode(object): tuple[str, int]]: start_time = time.time() # for profiling + if len(self.inactive_dynamic_queues) > 0: + services = rosservice.get_service_list() + for queue in self.inactive_dynamic_queues: + if self.inactive_dynamic_queues[queue]["service"] in services: + rospy.loginfo(f"Reactivating dynamic queue {queue}") + self.dynamic_queues[queue] = self.inactive_dynamic_queues[queue] + del self.inactive_dynamic_queues[queue] + for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]: dynamic_queue_name = dynamic_queue.dynamic_message_service # Check to see if we already have a cached response from the last time we did this. @@ -590,6 +600,8 @@ class MessageQueueNode(object): self.dynamic_queues[dynamic_queue_name]["failed_attempts"] += 1 if self.dynamic_queues[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts: + self.dynamic_queues[dynamic_queue_name]["failed_attempts"] = 0 + self.inactive_dynamic_queues[dynamic_queue_name] = self.dynamic_queues[dynamic_queue_name] del self.dynamic_queues[dynamic_queue_name] rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.") -- GitLab From dc9aa015869705bd8d8750cb00c461b32fca6046 Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Thu, 24 Aug 2023 16:17:12 -0400 Subject: [PATCH 6/9] Cleaned up the way that services are checked --- ros_acomms/src/message_queue_node.py | 33 ++++++++-------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index 11944deb..934584df 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -273,8 +273,6 @@ class MessageQueueNode(object): # Init dynamic queues self.dynamic_queues = {} - self.inactive_dynamic_queues = {} - # This needs to be called before any subscribers are created self.setup_queues() @@ -298,8 +296,6 @@ class MessageQueueNode(object): # Subscribe to ACK message to update queues rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received) - self.max_failed_attempts = rospy.get_param('~dynamic_queue_service_max_failed_attempts', 5) - rospy.loginfo("Message Queue started") self.spin() @@ -402,7 +398,6 @@ class MessageQueueNode(object): ), "service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), - "failed_attempts": 0 } except Exception as e: @@ -570,16 +565,16 @@ class MessageQueueNode(object): tuple[str, int]]: start_time = time.time() # for profiling - if len(self.inactive_dynamic_queues) > 0: - services = rosservice.get_service_list() - for queue in self.inactive_dynamic_queues: - if self.inactive_dynamic_queues[queue]["service"] in services: - rospy.loginfo(f"Reactivating dynamic queue {queue}") - self.dynamic_queues[queue] = self.inactive_dynamic_queues[queue] - del self.inactive_dynamic_queues[queue] + services = rosservice.get_service_list() for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]: dynamic_queue_name = dynamic_queue.dynamic_message_service + + # Check to see if the service is available, skip if not + if dynamic_queue_name not in services: + rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping") + continue + # Check to see if we already have a cached response from the last time we did this. cached_response = self.current_packet_dynamic_queue_cache.get(dynamic_queue_name) if not cached_response: @@ -596,18 +591,8 @@ class MessageQueueNode(object): packet_codec=( str(packet_codec) if packet_codec else ''), exclude_message_ids=this_queue_exclusion_ids) - except rospy.service.ServiceException as e: - self.dynamic_queues[dynamic_queue_name]["failed_attempts"] += 1 - - if self.dynamic_queues[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts: - self.dynamic_queues[dynamic_queue_name]["failed_attempts"] = 0 - self.inactive_dynamic_queues[dynamic_queue_name] = self.dynamic_queues[dynamic_queue_name] - del self.dynamic_queues[dynamic_queue_name] - - rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.") - return - else: - rospy.logwarn(f"Error calling get_next_message service on {dynamic_queue_name}: {e}") + except rospy.service.ServiceException: + rospy.logerr(f"Failed to call {dynamic_queue_name}.") queue_response = GetNextQueuedMessageResponse(has_message=False) -- GitLab From 90cdde835fd324909a7bbcf1cef39cae2cdf0d7f Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Thu, 24 Aug 2023 16:40:09 -0400 Subject: [PATCH 7/9] Cleaned up the way that services are checked --- ros_acomms/src/message_queue_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index 934584df..85c5e969 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -571,7 +571,7 @@ class MessageQueueNode(object): dynamic_queue_name = dynamic_queue.dynamic_message_service # Check to see if the service is available, skip if not - if dynamic_queue_name not in services: + if rospy.get_namespace() + dynamic_queue_name not in services: rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping") continue -- GitLab From d28661e222c8187d5fa31f614ea0d9bf1146910b Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Thu, 24 Aug 2023 16:54:27 -0400 Subject: [PATCH 8/9] Updated to resolve name --- ros_acomms/src/message_queue_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index 85c5e969..0f0cb201 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -571,7 +571,7 @@ class MessageQueueNode(object): dynamic_queue_name = dynamic_queue.dynamic_message_service # Check to see if the service is available, skip if not - if rospy.get_namespace() + dynamic_queue_name not in services: + if rospy.names.resolve_name(dynamic_queue_name) not in services: rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping") continue -- GitLab From 2d3a0886db978d62c02af04b356f52b147e62f69 Mon Sep 17 00:00:00 2001 From: Brennan <brennanmk2200@gmail.com> Date: Thu, 24 Aug 2023 18:08:41 -0400 Subject: [PATCH 9/9] Initial commit --- ros_acomms/src/message_queue_node.py | 920 +++++++++++++++++---------- 1 file changed, 591 insertions(+), 329 deletions(-) diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index 0f0cb201..650bf85a 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -2,27 +2,43 @@ from __future__ import unicode_literals, annotations from itertools import groupby -from typing import Any, Deque, Dict, List, Optional, Tuple, Union +from typing import Any, Deque, Dict, List, Optional, Tuple import rospy from rospy.msg import AnyMsg from std_msgs.msg import Header from functools import partial from importlib import import_module -from collections import namedtuple, deque, defaultdict -from dataclasses import dataclass +from collections import deque, defaultdict from datetime import datetime, timedelta import time -from ros_acomms_msgs.srv import GetNextPacketData, GetNextPacketDataRequest, GetNextPacketDataResponse -from ros_acomms_msgs.srv import GetNextQueuedMessage, GetNextQueuedMessageRequest, GetNextQueuedMessageResponse +from ros_acomms_msgs.srv import ( + GetNextPacketData, + GetNextPacketDataRequest, + GetNextPacketDataResponse, +) +from ros_acomms_msgs.srv import ( + GetNextQueuedMessage, + GetNextQueuedMessageResponse, +) from ros_acomms_msgs.msg import UpdateQueuedMessage -from ros_acomms_msgs.msg import (EncodedAck, NeighborReachability, QueueEnable, - QueuePriority, QueueStatus, QueueSummary, SummaryMessageCount) +from ros_acomms_msgs.msg import ( + EncodedAck, + NeighborReachability, + QueueEnable, + QueuePriority, + QueueStatus, + QueueSummary, + SummaryMessageCount, +) from bitstring import Bits, BitStream, BitArray -from fragmentation_tracker import (FragmentationTracker, FragmentationStartHeader, - FragmentationContHeader, FragmentationAckHeader) +from fragmentation_tracker import ( + FragmentationTracker, + FragmentationStartHeader, + FragmentationContHeader, + FragmentationAckHeader, +) from threading import Lock -import sys import os import signal import rosservice @@ -34,128 +50,70 @@ from rospy_yaml_include.yaml_include import RospyYamlInclude from version_node import version_node -from codec_config_parser import (QueueParams, get_queue_params, get_message_codecs) - - -@dataclass -class DynamicQueue: - dest_address: int - message_codec_id: int # the ID from the message codec params - message_codec: RosMessageCodec - dynamic_message_service: str - packet_codec: RosPacketCodec = None - allow_fragmentation: bool = False - dynamic_update_topic: Optional[str] = None - - -@dataclass -class QueuedMessage: - queue: Any # Should be MessageQueue, but that's not defined yet. - message_codec: RosMessageCodec - message: AnyMsg - dest: int - payload_bits: Bits - length_bits: int - time_added: datetime - fragmentation_tracker: Optional[FragmentationTracker] - message_id_in_queue: Optional[int] = None - transmitted: bool = False - sequence_number: Optional[int] = None - allow_fragmentation: bool = False - fragment_size_bits: int = 8 - dynamic_queue_name: Optional[str] = None +from codec_config_parser import QueueParams, get_queue_params, get_message_codecs - @property - def priority(self) -> int: - return self.queue.priority - - def get_min_fragment_size_bits(self) -> int: - if self.fragmentation_tracker is None: - size = (FragmentationStartHeader.length_bits + self.fragment_size_bits) - elif self.fragmentation_tracker.start_header_acked: - size = (FragmentationContHeader.length_bits + self.fragment_size_bits) - else: - size = (FragmentationStartHeader.length_bits + self.fragment_size_bits) - - rospy.logdebug("Minimum fragment size: {0}".format(size)) - return size - - def get_next_payload_smaller_than_bits(self, size_in_bits: int, - dest_sequence_nums, - remove_from_queue: bool = False - ) -> Tuple[Bits, Bits]: - fragmented = False - inc_seq_num = False - - rospy.logdebug("Getting next bits up to: {0}".format(size_in_bits)) - rospy.logdebug("Self lengthbits: {0}".format(self.length_bits)) - if self.length_bits <= size_in_bits: - message_bits = self.payload_bits - message_header = self.queue.message_codec_id - - else: - fragmented = True - rospy.logdebug("Fragmenting...") - # If we only just decided to fragment... generate tracking object - if self.fragmentation_tracker is None: - sequence_number = dest_sequence_nums[self.dest] - dest_sequence_nums[self.dest] += 1 - rospy.logdebug("Queued Message length bits: {0}".format(self.length_bits)) - self.sequence_number = sequence_number - self.fragmentation_tracker = FragmentationTracker(sequence_number, 0, self.dest, - int(self.time_added.timestamp()), self.length_bits, - self.fragment_size_bits, - payload_bits=self.payload_bits) - - try: - rospy.logdebug("FragmentationTracker sent: {0}-{1} {2}".format( - self.fragmentation_tracker.transferred_blocks[0][0], - self.fragmentation_tracker.transferred_blocks[-1][-1], - len(self.fragmentation_tracker.transferred_blocks))) - except: - rospy.logdebug("No sent bytes yet") - - try: - rospy.logdebug("FragmentationTracker acked: {0}-{1} {2}".format( - self.fragmentation_tracker.acked_blocks[0][0], - self.fragmentation_tracker.acked_blocks[-1][-1], - len(self.fragmentation_tracker.acked_blocks))) - except: - rospy.logdebug("No acked bytes yet") - - message_bits = self.fragmentation_tracker.next_fragment_to_transfer(size_in_bits) - message_header = self.queue.message_codec_id + 128 - - rospy.logdebug("Returning {0} of {1} bits".format(message_bits.length, self.payload_bits.length)) - if remove_from_queue and not fragmented and isinstance(self.queue, MessageQueue): - self.queue.mark_transmitted(self) - - return message_header, message_bits +class Queue: + def __init__( + self, dest_address, message_codec_id, message_codec, allow_fragmentation + ) -> None: + self.dest_address = dest_address + self.message_codec_id = message_codec_id + self.message_codec = message_codec + self.allow_fragmentation = allow_fragmentation + + +class DynamicQueue(Queue): + def __init__( + self, + dest_address: int, + message_codec_id: int, + message_codec: RosMessageCodec, + dynamic_message_service: str, + service: rospy.ServiceProxy, + publisher: rospy.Publisher, + packet_codec: RosPacketCodec = None, + allow_fragmentation: bool = False, + dynamic_update_topic: Optional[str] = None, + ) -> None: + super().__init__( + dest_address=dest_address, + message_codec_id=message_codec_id, + message_codec=message_codec, + allow_fragmentation=allow_fragmentation, + ) + self.dynamic_message_service = dynamic_message_service + self.packet_codec = packet_codec + self.dynamic_update_topic = dynamic_update_topic + self.service = service + self.publisher = publisher + + +class StaticQueue(Queue): + def __init__( + self, + master_message_queue: List[QueuedMessage], + dest_address: int, + message_codec_id: int, # the ID from the message codec params + topic: str, + message_codec: RosMessageCodec, + priority: int = 1, + order: str = "lifo", + maxsize: int = 100, + allow_fragmentation: bool = False, + ) -> None: + super().__init__( + dest_address=dest_address, + message_codec_id=message_codec_id, + message_codec=message_codec, + allow_fragmentation=allow_fragmentation, + ) -class MessageQueue(object): - def __init__(self, - master_message_queue: List[QueuedMessage], - dest_address: int, - message_codec_id: int, # the ID from the message codec params - topic: str, - message_codec: RosMessageCodec, - priority: int = 1, - order: str = 'lifo', - maxsize: int = 100, - allow_fragmentation: bool = False, - dynamic_message_service: Optional[str] = None, - ) -> None: self.master_message_queue = master_message_queue - self.dynamic_message_service = dynamic_message_service self.order = order - self.dest_address: int = dest_address self.priority: int = priority - self.message_codec_id = message_codec_id self.subscribe_topic: str = topic - self.message_codec = message_codec - self.allow_fragmentation: bool = allow_fragmentation self._last_tx_time = datetime.utcnow() self._queue: Deque[QueuedMessage] = deque(maxlen=maxsize) @@ -169,28 +127,31 @@ class MessageQueue(object): try: encoded_bits, metadata = self.message_codec.encode(message) except EncodingFailed as e: - rospy.logwarn(f"Failed to encode message: {e.message} (message is {message}, exception is {e})") + rospy.logwarn( + f"Failed to encode message: {e.message} (message is {message}, exception is {e})" + ) return rospy.logdebug("Append: {}".format(encoded_bits)) dest = self.dest_address if metadata: - if 'dest' in metadata: - dest = metadata['dest'] - - queued_message = QueuedMessage(message=message, - message_codec=self.message_codec, - queue=self, - dest=dest, - payload_bits=encoded_bits, - length_bits=encoded_bits.length, - time_added=datetime.utcnow(), - transmitted=False, - fragmentation_tracker=None, - allow_fragmentation=self.allow_fragmentation, - ) - - if self.order == 'fifo': + if "dest" in metadata: + dest = metadata["dest"] + + queued_message = QueuedMessage( + message=message, + message_codec=self.message_codec, + queue=self, + dest=dest, + payload_bits=encoded_bits, + length_bits=encoded_bits.length, + time_added=datetime.utcnow(), + transmitted=False, + fragmentation_tracker=None, + allow_fragmentation=self.allow_fragmentation, + ) + + if self.order == "fifo": if len(self._queue) < self._queue.maxlen: # drop this message if the queue is full. self._queue.appendleft(queued_message) @@ -205,7 +166,9 @@ class MessageQueue(object): self._queue.append(queued_message) self.master_message_queue.insert(0, queued_message) - rospy.logdebug_throttle(5, "Added message to queue, {} messages".format(len(self._queue))) + rospy.logdebug_throttle( + 5, "Added message to queue, {} messages".format(len(self._queue)) + ) def mark_transmitted(self, queued_message): # type: (QueuedMessage) -> None @@ -215,7 +178,9 @@ class MessageQueue(object): self._queue.remove(queued_message) self.master_message_queue.remove(queued_message) - def get_next_message(self, remove_from_queue: bool = True) -> Optional[QueuedMessage]: + def get_next_message( + self, remove_from_queue: bool = True + ) -> Optional[QueuedMessage]: if len(self._queue) == 0: return None queued_message = self._queue[-1] @@ -224,7 +189,9 @@ class MessageQueue(object): self._last_tx_time = datetime.utcnow() return queued_message - def get_next_message_smaller_than_bits(self, size_in_bits: int, remove_from_queue: bool = False) -> Optional[QueuedMessage]: + def get_next_message_smaller_than_bits( + self, size_in_bits: int, remove_from_queue: bool = False + ) -> Optional[QueuedMessage]: if len(self._queue) == 0: return None for qm in reversed(self._queue): @@ -255,13 +222,139 @@ class MessageQueue(object): return datetime.utcnow() - self._last_tx_time +class QueuedMessage(Queue): + def __init__( + self, + queue: Any, + message_codec: RosMessageCodec, + message: AnyMsg, + dest: int, + payload_bits: Bits, + length_bits: int, + time_added: datetime, + fragmentation_tracker: Optional[FragmentationTracker], + message_id_in_queue: Optional[int] = None, + transmitted: bool = False, + sequence_number: Optional[int] = None, + allow_fragmentation: bool = False, + fragment_size_bits: int = 8, + dynamic_queue_name: Optional[str] = None, + ) -> None: + self.queue = queue + self.message_codec = message_codec + self.message = message + self.dest = dest + self.payload_bits = payload_bits + self.length_bits = length_bits + self.time_added = time_added + self.fragmentation_tracker = fragmentation_tracker + self.message_id_in_queue = message_id_in_queue + self.transmitted = transmitted + self.sequence_number = sequence_number + self.allow_fragmentation = allow_fragmentation + self.fragment_size_bits = fragment_size_bits + self.dynamic_queue_name = dynamic_queue_name + + @property + def priority(self) -> int: + return self.queue.priority + + def get_min_fragment_size_bits(self) -> int: + if self.fragmentation_tracker is None: + size = FragmentationStartHeader.length_bits + self.fragment_size_bits + elif self.fragmentation_tracker.start_header_acked: + size = FragmentationContHeader.length_bits + self.fragment_size_bits + else: + size = FragmentationStartHeader.length_bits + self.fragment_size_bits + + rospy.logdebug("Minimum fragment size: {0}".format(size)) + return size + + def get_next_payload_smaller_than_bits( + self, size_in_bits: int, dest_sequence_nums, remove_from_queue: bool = False + ) -> Tuple[Bits, Bits]: + fragmented = False + inc_seq_num = False + + rospy.logdebug("Getting next bits up to: {0}".format(size_in_bits)) + rospy.logdebug("Self lengthbits: {0}".format(self.length_bits)) + + if self.length_bits <= size_in_bits: + message_bits = self.payload_bits + message_header = self.queue.message_codec_id + + else: + fragmented = True + rospy.logdebug("Fragmenting...") + # If we only just decided to fragment... generate tracking object + if self.fragmentation_tracker is None: + sequence_number = dest_sequence_nums[self.dest] + dest_sequence_nums[self.dest] += 1 + rospy.logdebug( + "Queued Message length bits: {0}".format(self.length_bits) + ) + self.sequence_number = sequence_number + self.fragmentation_tracker = FragmentationTracker( + sequence_number, + 0, + self.dest, + int(self.time_added.timestamp()), + self.length_bits, + self.fragment_size_bits, + payload_bits=self.payload_bits, + ) + + try: + rospy.logdebug( + "FragmentationTracker sent: {0}-{1} {2}".format( + self.fragmentation_tracker.transferred_blocks[0][0], + self.fragmentation_tracker.transferred_blocks[-1][-1], + len(self.fragmentation_tracker.transferred_blocks), + ) + ) + except: + rospy.logdebug("No sent bytes yet") + + try: + rospy.logdebug( + "FragmentationTracker acked: {0}-{1} {2}".format( + self.fragmentation_tracker.acked_blocks[0][0], + self.fragmentation_tracker.acked_blocks[-1][-1], + len(self.fragmentation_tracker.acked_blocks), + ) + ) + except: + rospy.logdebug("No acked bytes yet") + + message_bits = self.fragmentation_tracker.next_fragment_to_transfer( + size_in_bits + ) + message_header = self.queue.message_codec_id + 128 + + rospy.logdebug( + "Returning {0} of {1} bits".format( + message_bits.length, self.payload_bits.length + ) + ) + if ( + remove_from_queue + and not fragmented + and isinstance(self.queue, StaticQueue) + ): + self.queue.mark_transmitted(self) + + return message_header, message_bits + + class MessageQueueNode(object): def __init__(self) -> None: - rospy.init_node('message_queue') - self.update_rate = rospy.get_param('~update_rate', 1) - self.unknown_dests_are_reachable: bool = rospy.get_param('~unknown_dests_are_reachable', True) - self.default_dest = rospy.get_param('~default_dest', 121) - self.default_priority = rospy.get_param('~default_priority', 10) + rospy.init_node("message_queue") + self.update_rate = rospy.get_param("~update_rate", 1) + self.unknown_dests_are_reachable: bool = rospy.get_param( + "~unknown_dests_are_reachable", True + ) + self.default_dest = rospy.get_param("~default_dest", 121) + self.default_priority = rospy.get_param("~default_priority", 10) try: version = version_node() @@ -277,21 +370,32 @@ class MessageQueueNode(object): self.setup_queues() # Status publishers - self.queue_status_pub = rospy.Publisher('queue_status', QueueStatus, queue_size=10, latch=True) + self.queue_status_pub = rospy.Publisher( + "queue_status", QueueStatus, queue_size=10, latch=True + ) # Queue priority/enable update Subscriptions - self.priority_sub = rospy.Subscriber("queue_priority", QueuePriority, self.handle_queue_priority) - self.enable_sub = rospy.Subscriber("queue_enable", QueueEnable, self.handle_queue_enable) + self.priority_sub = rospy.Subscriber( + "queue_priority", QueuePriority, self.handle_queue_priority + ) + self.enable_sub = rospy.Subscriber( + "queue_enable", QueueEnable, self.handle_queue_enable + ) # Subscribe to the Neighbor Reachability updates (which may be published by more than one node - rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability) + rospy.Subscriber( + "neighbor_reachability", NeighborReachability, self.on_neighbor_reachability + ) # We use a lock to ensure that we handle only one get_next_packet_data call at a time self.get_packet_data_lock = Lock() self.get_packet_data_lock_fail_count = 0 - self.current_packet_dynamic_queue_cache: dict[DynamicQueue, GetNextQueuedMessageResponse] = {} - self.get_packet_data_service = rospy.Service('get_next_packet_data', GetNextPacketData, - self.handle_get_next_packet) + self.current_packet_dynamic_queue_cache: dict[ + DynamicQueue, GetNextQueuedMessageResponse + ] = {} + self.get_packet_data_service = rospy.Service( + "get_next_packet_data", GetNextPacketData, self.handle_get_next_packet + ) # Subscribe to ACK message to update queues rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received) @@ -314,19 +418,25 @@ class MessageQueueNode(object): # this is a little awkward, since it's conceptually backward here. # when queuing for TX, we go from topic -> message -> packet # However, organizing it this way allows us to use a single config YAML for both TX and RX - packet_codec_filename = rospy.get_param('~packet_codec_file', None) - codec_dir = rospy.get_param('~codec_directory', None) + packet_codec_filename = rospy.get_param("~packet_codec_file", None) + codec_dir = rospy.get_param("~codec_directory", None) if packet_codec_filename: try: - with open(packet_codec_filename, 'r') as yaml_file: + with open(packet_codec_filename, "r") as yaml_file: constructor = RospyYamlInclude(base_directory=codec_dir) - packet_codec_param = yaml.load(yaml_file, Loader=constructor.add_constructor()) + packet_codec_param = yaml.load( + yaml_file, Loader=constructor.add_constructor() + ) except Exception as e: - rospy.logerr(f"Fatal error reading message codec config from {packet_codec_filename} (with codec directory {codec_dir}): {e}") + rospy.logerr( + f"Fatal error reading message codec config from {packet_codec_filename} (with codec directory {codec_dir}): {e}" + ) else: - packet_codec_param = rospy.get_param('~packet_codecs') - rospy.logwarn("Using the packet_codecs parameter is deprecated (and doesn't support features like " + - "imports). Use the packet_codec_file parameter instead. This parameter will be removed.") + packet_codec_param = rospy.get_param("~packet_codecs") + rospy.logwarn( + "Using the packet_codecs parameter is deprecated (and doesn't support features like " + + "imports). Use the packet_codec_file parameter instead. This parameter will be removed." + ) # QUESTION(lindzey): Do we ever have more than one packet codec? I'm # not convinced that the code in this block actually handles that case @@ -337,27 +447,39 @@ class MessageQueueNode(object): self.queue_params: Dict[int, QueueParams] = {} for packet_codec_details in packet_codec_param: rospy.loginfo("Packet codec details: {}".format(packet_codec_details)) - packet_codec_class = packet_codecs[packet_codec_details['packet_codec']] # type: PacketCodec + packet_codec_class = packet_codecs[ + packet_codec_details["packet_codec"] + ] # type: PacketCodec self.queue_params.update(get_queue_params(packet_codec_details)) message_codecs = get_message_codecs(packet_codec_details) # Convert SRC IDs to integers, since yaml dict keys have to be strings try: - src_names: dict = {int(k):v for k,v in packet_codec_details.get('src_names', {}).items()} + src_names: dict = { + int(k): v + for k, v in packet_codec_details.get("src_names", {}).items() + } except Exception as e: rospy.logerr(f"Error parsing src_names parameter: {e}") for queue_id, params in self.queue_params.items(): # We support three cases here: Publish only, subscribe to a topic, or call a dynamic queue service # If there is no subscribe_topic or dynamic queue, this is a publish-only entry, so we skip it here - if params.subscribe_topic is None and params.dynamic_queue_service is None: - rospy.logdebug(f"ID {queue_id} has no subscribe topic or dynamic queue, skipping it.") + if ( + params.subscribe_topic is None + and params.dynamic_queue_service is None + ): + rospy.logdebug( + f"ID {queue_id} has no subscribe topic or dynamic queue, skipping it." + ) continue # If both the subscribe_topic and dynamic_queue are specified, that's bad. # Throw an error (but don't crash for now) if params.subscribe_topic and params.dynamic_queue_service: - rospy.logerr(f"ID {queue_id} has both a subscribe_topic and dynamic_queue specified. You can't have both.") + rospy.logerr( + f"ID {queue_id} has both a subscribe_topic and dynamic_queue specified. You can't have both." + ) continue # If no destination is specified on this queue, fill in our global default @@ -368,65 +490,92 @@ class MessageQueueNode(object): if params.priority is None: params.priority = self.default_priority - self.destination_reachable[params.destination] = self.unknown_dests_are_reachable + self.destination_reachable[ + params.destination + ] = self.unknown_dests_are_reachable # Now, the "normal" case: the subscribe_topic is specified, so we want to set up a new queue for it. if params.subscribe_topic: topic_name = params.subscribe_topic - if '{src}' in topic_name or '{name}' in topic_name: + if "{src}" in topic_name or "{name}" in topic_name: # We are going to attach multiple subscribers to this queue, one for each entry in the # src_names dict for src, name in src_names.items(): complete_topic_name = topic_name.format(src=src, name=name) - self.init_static_queue(topic_name=complete_topic_name, queue_id=queue_id, - params=params, message_codecs=message_codecs) + self.init_static_queue( + topic_name=complete_topic_name, + queue_id=queue_id, + params=params, + message_codecs=message_codecs, + ) else: - self.init_static_queue(topic_name=topic_name, queue_id=queue_id, - params=params, message_codecs=message_codecs) + self.init_static_queue( + topic_name=topic_name, + queue_id=queue_id, + params=params, + message_codecs=message_codecs, + ) elif params.dynamic_queue_service: - service_name = rospy.names.canonicalize_name(params.dynamic_queue_service) - update_topic_name = rospy.names.canonicalize_name(params.dynamic_update_topic) - rospy.loginfo(f"Initialized dynamic queue ID {queue_id} for {service_name}") + service_name = rospy.names.canonicalize_name( + params.dynamic_queue_service + ) + update_topic_name = rospy.names.canonicalize_name( + params.dynamic_update_topic + ) + rospy.loginfo( + f"Initialized dynamic queue ID {queue_id} for {service_name}" + ) try: - self.dynamic_queues[service_name] = { - "object": - DynamicQueue(message_codec_id=queue_id, - dynamic_message_service=service_name, - dynamic_update_topic=update_topic_name, - dest_address=params.destination, - message_codec=message_codecs[params.codec_id], - ), - "service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), - "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), - } + self.dynamic_queues[service_name] = DynamicQueue( + message_codec_id=queue_id, + dynamic_message_service=service_name, + dynamic_update_topic=update_topic_name, + dest_address=params.destination, + message_codec=message_codecs[params.codec_id], + service=rospy.ServiceProxy( + service_name, GetNextQueuedMessage + ), + publisher=rospy.Publisher( + update_topic_name, UpdateQueuedMessage, queue_size=10 + ), + ) except Exception as e: - rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}") + rospy.logerr( + f"Error setting up dynamic queue for {service_name}: {e}" + ) # QUESTION(lindzey): It looks like the packet codec is never used? # => AAAAH. This is called for its side effects. Maybe rename to make more clear? # The packet codec initializer sets the packet codec on each message codec - packet_codec = packet_codec_class(message_codecs=message_codecs, - miniframe_header=packet_codec_details.get('miniframe_header', bytes()), - dataframe_header=packet_codec_details.get('dataframe_header', bytes())) - rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs))) - - def init_static_queue(self, topic_name: str, queue_id: int, params: QueueParams, message_codecs) -> None: - new_queue = MessageQueue(master_message_queue=self._queued_messages, - dest_address=params.destination, - priority=params.priority, - message_codec_id=queue_id, - topic=topic_name, - order=params.queue_order, - maxsize=params.queue_maxsize, - allow_fragmentation=params.allow_fragmentation, - # QUESTION: Why does the queue own the codec? - message_codec=message_codecs[queue_id], - ) + packet_codec = packet_codec_class( + message_codecs=message_codecs, + miniframe_header=packet_codec_details.get("miniframe_header", bytes()), + dataframe_header=packet_codec_details.get("dataframe_header", bytes()), + ) + rospy.loginfo( + "Added packet codec with {} message codecs".format(len(message_codecs)) + ) + + def init_static_queue( + self, topic_name: str, queue_id: int, params: QueueParams, message_codecs + ) -> None: + new_queue = StaticQueue( + master_message_queue=self._queued_messages, + dest_address=params.destination, + priority=params.priority, + message_codec_id=queue_id, + topic=topic_name, + order=params.queue_order, + maxsize=params.queue_maxsize, + allow_fragmentation=params.allow_fragmentation, + # QUESTION: Why does the queue own the codec? + message_codec=message_codecs[queue_id], + ) self.topic_queue[topic_name] = new_queue - rospy.Subscriber(topic_name, - AnyMsg, - partial(self.on_incoming_message, topic_name=topic_name)) + rospy.Subscriber( + topic_name, AnyMsg, partial(self.on_incoming_message, topic_name=topic_name) + ) if params.is_active: self.active_queue_names.add(topic_name) @@ -438,7 +587,9 @@ class MessageQueueNode(object): queued_dest_addresses = set() message_counts = {} - for key, group in groupby(self._queued_messages, lambda x: (x.priority, x.dest)): + for key, group in groupby( + self._queued_messages, lambda x: (x.priority, x.dest) + ): group_count = sum(1 for _ in group) message_counts[key] = group_count highest_priority = max(highest_priority, key[0]) @@ -449,8 +600,10 @@ class MessageQueueNode(object): # Build the QueueStatus message hdr = Header(stamp=rospy.get_rostime()) summary_message_counts = [] - for k,v in message_counts.items(): - summary = SummaryMessageCount(priority=k[0], dest_address=k[1], message_count=v) + for k, v in message_counts.items(): + summary = SummaryMessageCount( + priority=k[0], dest_address=k[1], message_count=v + ) summary_message_counts.append(summary) queue_summaries = [] for topic, queue in self.topic_queue.items(): @@ -459,29 +612,33 @@ class MessageQueueNode(object): except IndexError: next_message_size_bits = 0 total_bits_in_queue = sum(q.length_bits for q in queue._queue) - queue_summary = QueueSummary(queue_id=queue.message_codec_id, - topic=topic, - priority=queue.priority, - enabled=(topic in self.active_queue_names), - message_count=len(queue._queue), - next_message_size_bits=next_message_size_bits, - total_bits_in_queue=total_bits_in_queue) + queue_summary = QueueSummary( + queue_id=queue.message_codec_id, + topic=topic, + priority=queue.priority, + enabled=(topic in self.active_queue_names), + message_count=len(queue._queue), + next_message_size_bits=next_message_size_bits, + total_bits_in_queue=total_bits_in_queue, + ) queue_summaries.append(queue_summary) - msg = QueueStatus(header=hdr, - total_message_count=total_message_count, - highest_priority=highest_priority, - queued_dest_addresses=queued_dest_addresses, - summary_message_counts=summary_message_counts, - queue_summaries=queue_summaries) + msg = QueueStatus( + header=hdr, + total_message_count=total_message_count, + highest_priority=highest_priority, + queued_dest_addresses=queued_dest_addresses, + summary_message_counts=summary_message_counts, + queue_summaries=queue_summaries, + ) self.queue_status_pub.publish(msg) def on_incoming_message(self, msg_data, topic_name): # type: (AnyMsg, str) -> None - connection_header = msg_data._connection_header['type'].split('/') - ros_pkg = connection_header[0] + '.msg' + connection_header = msg_data._connection_header["type"].split("/") + ros_pkg = connection_header[0] + ".msg" msg_type = connection_header[1] - #print 'Message type detected as ' + msg_type + # print 'Message type detected as ' + msg_type msg_class = getattr(import_module(ros_pkg), msg_type) msg = msg_class() msg.deserialize(msg_data._buff) @@ -490,138 +647,196 @@ class MessageQueueNode(object): # Check if incoming msg topic name matches any of the active queue topic names if topic_name in self.active_queue_names: - rospy.logdebug("TOPIC: {} in active queue list, appending msg".format(topic_name)) + rospy.logdebug( + "TOPIC: {} in active queue list, appending msg".format(topic_name) + ) self.topic_queue[topic_name].append(msg) # NOTE(lindzey): Don't publish status messages here, because it # will lead to maxing out CPU thanks to the subsea queue_node # subscribing to its own status messages. else: - rospy.logdebug("TOPIC: {} not in active queue list, skipping msg".format(topic_name)) + rospy.logdebug( + "TOPIC: {} not in active queue list, skipping msg".format(topic_name) + ) def on_neighbor_reachability(self, msg: NeighborReachability) -> None: self.destination_reachable[msg.dest] = msg.reachable def on_ack_received(self, msg: EncodedAck) -> None: ack = FragmentationAckHeader.from_bits(BitStream(bytes=msg.encoded_ack)) - rospy.logdebug("Received ACK with src: {0} seq num: {1}". format(msg.src, ack.sequence_num)) + rospy.logdebug( + "Received ACK with src: {0} seq num: {1}".format(msg.src, ack.sequence_num) + ) rospy.logdebug("Ack blocks {}-{}".format(ack.blocks[0][0], ack.blocks[-1][1])) rospy.logdebug("ACK N-Blocks: {}".format(ack.n_blocks)) for message in self._queued_messages: - rospy.logdebug("Message dest: {0} Sequence number: {1}".format(message.dest, message.sequence_number)) + rospy.logdebug( + "Message dest: {0} Sequence number: {1}".format( + message.dest, message.sequence_number + ) + ) if message.dest == msg.src and message.sequence_number == ack.sequence_num: - if len(ack.blocks) == 1 and ack.blocks[0][0] == 0 and ack.blocks[-1][-1] == 1 and ack.ack_flag == 0: + if ( + len(ack.blocks) == 1 + and ack.blocks[0][0] == 0 + and ack.blocks[-1][-1] == 1 + and ack.ack_flag == 0 + ): rospy.logwarn( - "Empty ACK received indicated the message recipient does not have start fragment for this message") + "Empty ACK received indicated the message recipient does not have start fragment for this message" + ) message.fragmentation_tracker.reset() return rospy.logdebug("Recording blocks ACKED") - message.fragmentation_tracker.record_blocks_acked(ack.blocks, ack.ack_flag) - rospy.logdebug("Payload size={0}".format(message.fragmentation_tracker.payload_size_blocks)) + message.fragmentation_tracker.record_blocks_acked( + ack.blocks, ack.ack_flag + ) + rospy.logdebug( + "Payload size={0}".format( + message.fragmentation_tracker.payload_size_blocks + ) + ) if message.fragmentation_tracker.acked(): # Remove this message from the queue rospy.logdebug( - "Message seq_num: {0} to dest: {1} acked. Removing from queue.".format(message.sequence_number, - message.dest)) + "Message seq_num: {0} to dest: {1} acked. Removing from queue.".format( + message.sequence_number, message.dest + ) + ) message.queue._queue.remove(message) self._queued_messages.remove(message) def mark_transmitted(self, msg: QueuedMessage) -> None: # For now, we don't do this for locally queued messages (since we don't track it properly) - #msg.queue.mark_transmitted(msg) + # msg.queue.mark_transmitted(msg) # ... but we do track it for dynamic queues if isinstance(msg.queue, DynamicQueue): if msg.queue.dynamic_message_service in self.dynamic_queues.keys(): - update_msg = UpdateQueuedMessage(header=Header(stamp=rospy.Time.now()), - message_id=msg.message_id_in_queue, - event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED) - self.dynamic_queues[msg.dynamic_queue_name]["publisher"].publish(update_msg) - rospy.logdebug(f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}") - - def get_message_from_queue_response(self, dynamic_queue, queue_response: GetNextQueuedMessageResponse) -> QueuedMessage: - message_bits = BitArray(bytes=queue_response.data, length=queue_response.data_size_in_bits) - - next_message = QueuedMessage(queue=self.dynamic_queues[dynamic_queue]["object"], - message=None, - message_codec=self.dynamic_queues[dynamic_queue]["object"].message_codec, - dest=120, - time_added=datetime.utcnow(), - fragmentation_tracker=None, - allow_fragmentation=False, - dynamic_queue_name=self.dynamic_queues[dynamic_queue]["object"].dynamic_message_service, - message_id_in_queue=queue_response.message_id, - payload_bits=message_bits, - length_bits=len(message_bits)) + update_msg = UpdateQueuedMessage( + header=Header(stamp=rospy.Time.now()), + message_id=msg.message_id_in_queue, + event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED, + ) + self.dynamic_queues[msg.dynamic_queue_name].publisher.publish( + update_msg + ) + rospy.logdebug( + f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}" + ) + + def get_message_from_queue_response( + self, dynamic_queue, queue_response: GetNextQueuedMessageResponse + ) -> QueuedMessage: + message_bits = BitArray( + bytes=queue_response.data, length=queue_response.data_size_in_bits + ) + + next_message = QueuedMessage( + queue=self.dynamic_queues[dynamic_queue], + message=None, + message_codec=self.dynamic_queues[dynamic_queue].message_codec, + dest=120, + time_added=datetime.utcnow(), + fragmentation_tracker=None, + allow_fragmentation=False, + dynamic_queue_name=self.dynamic_queues[dynamic_queue].dynamic_message_service, + message_id_in_queue=queue_response.message_id, + payload_bits=message_bits, + length_bits=len(message_bits), + ) return next_message - def query_dynamic_queues(self, max_size_in_bits: int = 131072, - dest_address: Optional[int] = None, - packet_codec: Optional[str] = None, - check_reachability: bool = True, - exclude_messages: Optional[List[QueuedMessage]] = []) -> Optional[ - tuple[str, int]]: - + def query_dynamic_queues( + self, + max_size_in_bits: int = 131072, + dest_address: Optional[int] = None, + packet_codec: Optional[str] = None, + check_reachability: bool = True, + exclude_messages: Optional[List[QueuedMessage]] = [], + ) -> Optional[tuple[str, int]]: start_time = time.time() # for profiling services = rosservice.get_service_list() - for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]: + for dynamic_queue in [ + dynamic_queues for dynamic_queues in self.dynamic_queues.values() + ]: dynamic_queue_name = dynamic_queue.dynamic_message_service # Check to see if the service is available, skip if not if rospy.names.resolve_name(dynamic_queue_name) not in services: - rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping") + rospy.logwarn( + f"Dynamic queue {dynamic_queue_name} not available, skipping" + ) continue # Check to see if we already have a cached response from the last time we did this. - cached_response = self.current_packet_dynamic_queue_cache.get(dynamic_queue_name) + cached_response = self.current_packet_dynamic_queue_cache.get( + dynamic_queue_name + ) if not cached_response: # Figure out which messages to exclude (because they are already queued) - this_queue_exclusion_ids = [m.message_id_in_queue for m in exclude_messages if - m.dynamic_queue_name == dynamic_queue_name] + this_queue_exclusion_ids = [ + m.message_id_in_queue + for m in exclude_messages + if m.dynamic_queue_name == dynamic_queue_name + ] # Get a reference to the service - dynamic_get_next_queued_message = self.dynamic_queues[dynamic_queue_name]["service"] + dynamic_get_next_queued_message = self.dynamic_queues[ + dynamic_queue_name + ].service try: - queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits, - dest_address=dest_address, - packet_codec=( - str(packet_codec) if packet_codec else ''), - exclude_message_ids=this_queue_exclusion_ids) + queue_response = dynamic_get_next_queued_message( + max_size_in_bits=max_size_in_bits, + dest_address=dest_address, + packet_codec=(str(packet_codec) if packet_codec else ""), + exclude_message_ids=this_queue_exclusion_ids, + ) except rospy.service.ServiceException: rospy.logerr(f"Failed to call {dynamic_queue_name}.") queue_response = GetNextQueuedMessageResponse(has_message=False) # Cache the response - self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response + self.current_packet_dynamic_queue_cache[ + dynamic_queue_name + ] = queue_response rospy.logdebug(f"Dynamic queue query took {time.time()-start_time} s") # Now, figure out what the highest priority dynamic message is, if any - priorities = {queue: response.priority for queue, response in - self.current_packet_dynamic_queue_cache.items() - if response.has_message} + priorities = { + queue: response.priority + for queue, response in self.current_packet_dynamic_queue_cache.items() + if response.has_message + } if len(priorities) > 0: - sorted_priorities = sorted(priorities.items(), key=lambda x:x[1]) + sorted_priorities = sorted(priorities.items(), key=lambda x: x[1]) highest_priority_queue = sorted_priorities[-1][0] highest_priority = sorted_priorities[-1][1] return highest_priority_queue, highest_priority # No messages in dynamic queues return None, None - def query_static_queues(self, max_size_in_bits: int = 131072, - dest_address: Optional[int] = None, - packet_codec: Optional[str] = None, - check_reachability: bool = True, - exclude_messages: Optional[List[QueuedMessage]] = []) -> Optional[QueuedMessage]: + def query_static_queues( + self, + max_size_in_bits: int = 131072, + dest_address: Optional[int] = None, + packet_codec: Optional[str] = None, + check_reachability: bool = True, + exclude_messages: Optional[List[QueuedMessage]] = [], + ) -> Optional[QueuedMessage]: next_message = None - for message in sorted(self._queued_messages, key=lambda m: m.priority, reverse=True): + for message in sorted( + self._queued_messages, key=lambda m: m.priority, reverse=True + ): if message.queue.subscribe_topic not in self.active_queue_names: rospy.logdebug("Disqualified: inactive queue") continue @@ -636,13 +851,22 @@ class MessageQueueNode(object): # Move on to check the next message rospy.logdebug("Disqualified: packet_codec") continue - if not message.allow_fragmentation and message.length_bits > max_size_in_bits: + if ( + not message.allow_fragmentation + and message.length_bits > max_size_in_bits + ): rospy.logdebug("Disqualified: too large, no fragmentation") continue - if message.allow_fragmentation and message.get_min_fragment_size_bits() > max_size_in_bits: + if ( + message.allow_fragmentation + and message.get_min_fragment_size_bits() > max_size_in_bits + ): rospy.logdebug("Disqualified: fragments too large") continue - if message.allow_fragmentation and message.get_min_fragment_size_bits() == 0: + if ( + message.allow_fragmentation + and message.get_min_fragment_size_bits() == 0 + ): rospy.logdebug("Disqualified: no fragments to send at this time") continue # if we aren't disqualified by now, this is our message. @@ -651,33 +875,41 @@ class MessageQueueNode(object): return next_message - def get_next_message(self, max_size_in_bits: int = 131072, - dest_address: Optional[int] = None, - packet_codec: Optional[str] = None, - check_reachability: bool = True, - exclude_messages: Optional[List[QueuedMessage]] = []) -> Optional[QueuedMessage]: - + def get_next_message( + self, + max_size_in_bits: int = 131072, + dest_address: Optional[int] = None, + packet_codec: Optional[str] = None, + check_reachability: bool = True, + exclude_messages: Optional[List[QueuedMessage]] = [], + ) -> Optional[QueuedMessage]: # Query registered dynamic queues # This will update the cache (self.current_packet_dynamic_queue_cache) and return the highest priority queue. # If one of these messages has the highest priority (below), we get the cached GetNextQueuedMessageResponse and # use it to make the QueuedMessage. try: - highest_priority_dynamic_queue, highest_dynamic_priority = self.query_dynamic_queues( + ( + highest_priority_dynamic_queue, + highest_dynamic_priority, + ) = self.query_dynamic_queues( max_size_in_bits=max_size_in_bits, dest_address=dest_address, packet_codec=packet_codec, check_reachability=check_reachability, - exclude_messages=exclude_messages) + exclude_messages=exclude_messages, + ) except Exception as e: rospy.logerr(f"get_next_message Error querying dynamic queues: {e}") highest_priority_dynamic_queue, highest_dynamic_priority = None, None # Query the static queues - highest_priority_static_message = self.query_static_queues(max_size_in_bits=max_size_in_bits, - dest_address=dest_address, - packet_codec=packet_codec, - check_reachability=check_reachability, - exclude_messages=exclude_messages) + highest_priority_static_message = self.query_static_queues( + max_size_in_bits=max_size_in_bits, + dest_address=dest_address, + packet_codec=packet_codec, + check_reachability=check_reachability, + exclude_messages=exclude_messages, + ) if not highest_priority_static_message and not highest_dynamic_priority: # No message return None @@ -687,10 +919,16 @@ class MessageQueueNode(object): elif highest_dynamic_priority and not highest_priority_static_message: # Dynamic queues have a message, but static don't # Clear the entry from the cache - message = self.get_message_from_queue_response(highest_priority_dynamic_queue, self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue]) + message = self.get_message_from_queue_response( + highest_priority_dynamic_queue, + self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue], + ) del self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue] elif highest_dynamic_priority > highest_priority_static_message.priority: - message = self.get_message_from_queue_response(highest_priority_dynamic_queue, self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue]) + message = self.get_message_from_queue_response( + highest_priority_dynamic_queue, + self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue], + ) del self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue] else: @@ -718,24 +956,35 @@ class MessageQueueNode(object): try: self.active_queue_names.remove(sub_topic) except KeyError: - rospy.loginfo("Cannot disable currently-inactive queue ID {} ({})" - .format(msg.queue_id, sub_topic)) - - def handle_get_next_packet(self, req: GetNextPacketDataRequest) -> GetNextPacketDataResponse: + rospy.loginfo( + "Cannot disable currently-inactive queue ID {} ({})".format( + msg.queue_id, sub_topic + ) + ) + + def handle_get_next_packet( + self, req: GetNextPacketDataRequest + ) -> GetNextPacketDataResponse: # Things get squirrely if there is more than one request at the same time... our behavior isn't defined. # If there is still a request in progress when we get another one, we just return nothing on the later request has_lock = self.get_packet_data_lock.acquire(blocking=False) if not has_lock: self.get_packet_data_lock_fail_count += 1 - rospy.logwarn(f"Getting next packet data failed because we are still servicing an earlier request " + - f"({self.get_packet_data_lock_fail_count} times)") + rospy.logwarn( + f"Getting next packet data failed because we are still servicing an earlier request " + + f"({self.get_packet_data_lock_fail_count} times)" + ) if self.get_packet_data_lock_fail_count > 10: # This is a brutish way to try to avoid an error condition where we get "stuck" servicing one request - rospy.logerr("Getting next packet data failed 10 times. Killing the process so (hopefully) it will work on restart.") + rospy.logerr( + "Getting next packet data failed 10 times. Killing the process so (hopefully) it will work on restart." + ) # sys.exit doesn't work when called from a thread. This sends a keyboard interrupt # to the process and does work. os.kill(os.getpid(), signal.SIGINT) - return GetNextPacketDataResponse(num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0) + return GetNextPacketDataResponse( + num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0 + ) self.get_packet_data_lock_fail_count = 0 try: @@ -743,7 +992,10 @@ class MessageQueueNode(object): # been serviced self.current_packet_dynamic_queue_cache = {} - rospy.logdebug("Getting next packet data for transmission: " + str(req).replace('\n', ', ')) + rospy.logdebug( + "Getting next packet data for transmission: " + + str(req).replace("\n", ", ") + ) if req.match_dest: requested_dest = req.dest else: @@ -754,17 +1006,25 @@ class MessageQueueNode(object): # To get the next packet, the first thing we do is get the highest priority message to transmit # Then, we get a packet using its packet codec. # This is also where we should eventually evaluate which rate to use - highest_priority_message = self.get_next_message(dest_address=requested_dest) + highest_priority_message = self.get_next_message( + dest_address=requested_dest + ) rospy.logdebug(f"Highest priority message: {highest_priority_message}") if not highest_priority_message: self.get_packet_data_lock.release() - return GetNextPacketDataResponse(num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0) + return GetNextPacketDataResponse( + num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0 + ) # TODO: check for a race condition where the dest of the top message in the queue changes packet_codec = highest_priority_message.message_codec.packet_codec - messages_in_packet, miniframe_bytes, dataframe_bytes = packet_codec.encode_payload(num_miniframe_bytes, - num_dataframe_bytes, - self) + ( + messages_in_packet, + miniframe_bytes, + dataframe_bytes, + ) = packet_codec.encode_payload( + num_miniframe_bytes, num_dataframe_bytes, self + ) for msg in messages_in_packet: self.mark_transmitted(msg) except Exception as e: @@ -775,13 +1035,15 @@ class MessageQueueNode(object): else: self.get_packet_data_lock.release() - return GetNextPacketDataResponse(dest=highest_priority_message.dest, - miniframe_bytes=miniframe_bytes, - dataframe_bytes=dataframe_bytes, - queued_message_ids=[], - num_miniframe_bytes=len(miniframe_bytes), - num_dataframe_bytes=len(dataframe_bytes), - num_messages=len(messages_in_packet)) + return GetNextPacketDataResponse( + dest=highest_priority_message.dest, + miniframe_bytes=miniframe_bytes, + dataframe_bytes=dataframe_bytes, + queued_message_ids=[], + num_miniframe_bytes=len(miniframe_bytes), + num_dataframe_bytes=len(dataframe_bytes), + num_messages=len(messages_in_packet), + ) def spin(self) -> None: # Issue status periodically. @@ -791,7 +1053,7 @@ class MessageQueueNode(object): rate.sleep() -if __name__ == '__main__': +if __name__ == "__main__": try: node = MessageQueueNode() rospy.loginfo("Message Queue shutdown") -- GitLab