From 914eeb855af00316bc10ef75feb2872e5230ea3c Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Tue, 15 Aug 2023 17:11:09 -0400
Subject: [PATCH 1/9] Added wait_for_service

---
 ros_acomms/src/message_queue_node.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index e537b8f1..22da15de 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -399,11 +399,13 @@ class MessageQueueNode(object):
                                                                 dest_address=params.destination,
                                                                 message_codec=message_codecs[params.codec_id], )
 
+                        rospy.wait_for_service(service_name, timeout=2)
                         self.dynamic_get_next_message_services[service_name] = rospy.ServiceProxy(service_name,
-                                                                                                  GetNextQueuedMessage)
+                                                                                                GetNextQueuedMessage)
                         self.dynamic_queue_update_publishers[service_name] = rospy.Publisher(
                             update_topic_name, UpdateQueuedMessage, queue_size=10)
-
+                    except rospy.ROSException as exception:
+                        rospy.logerr(f"Timed out waiting for {service_name}: {exception}")
                     except Exception as e:
                         rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}")
 
-- 
GitLab


From e9754ff183bf59f623064c91cf7d7762ebafc8d3 Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Tue, 15 Aug 2023 18:10:14 -0400
Subject: [PATCH 2/9] Added system for max failed attempts for dynamic queue
 update services

---
 ros_acomms/src/message_queue_node.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index 22da15de..f2960a3c 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -275,8 +275,7 @@ class MessageQueueNode(object):
 
         # Init dynamic queues
         self.dynamic_queues = {}
-        self.dynamic_get_next_message_services = {}
-        self.dynamic_queue_update_publishers = {}
+        self.dynamic_queue_intrinsics = {}
 
         # This needs to be called before any subscribers are created
         self.setup_queues()
@@ -301,6 +300,8 @@ class MessageQueueNode(object):
         # Subscribe to ACK message to update queues
         rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received)
 
+        self.max_failed_attempts = rospy.get_param('~dynamic_queue_service_max_failed_attempts', 5)
+
         rospy.loginfo("Message Queue started")
         self.spin()
 
@@ -398,14 +399,8 @@ class MessageQueueNode(object):
                                                                 dynamic_update_topic=update_topic_name,
                                                                 dest_address=params.destination,
                                                                 message_codec=message_codecs[params.codec_id], )
+                        self.dynamic_queue_intrinsics[service_name] = {"service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), "failed_attempts": 0}
 
-                        rospy.wait_for_service(service_name, timeout=2)
-                        self.dynamic_get_next_message_services[service_name] = rospy.ServiceProxy(service_name,
-                                                                                                GetNextQueuedMessage)
-                        self.dynamic_queue_update_publishers[service_name] = rospy.Publisher(
-                            update_topic_name, UpdateQueuedMessage, queue_size=10)
-                    except rospy.ROSException as exception:
-                        rospy.logerr(f"Timed out waiting for {service_name}: {exception}")
                     except Exception as e:
                         rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}")
 
@@ -540,11 +535,11 @@ class MessageQueueNode(object):
 
         # ... but we do track it for dynamic queues
         if isinstance(msg.queue, DynamicQueue):
-            if msg.queue.dynamic_message_service in self.dynamic_queue_update_publishers:
+            if msg.queue.dynamic_message_service in self.dynamic_queue_intrinsics.keys():
                 update_msg = UpdateQueuedMessage(header=Header(stamp=rospy.Time.now()),
                                                  message_id=msg.message_id_in_queue,
                                                  event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED)
-                self.dynamic_queue_update_publishers[msg.dynamic_queue_name].publish(update_msg)
+                self.dynamic_queue_intrinsics[msg.dynamic_queue_name]["publisher"].publish(update_msg)
                 rospy.logdebug(f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}")
 
     def get_message_from_queue_response(self, dynamic_queue, queue_response: GetNextQueuedMessageResponse) -> QueuedMessage:
@@ -581,7 +576,7 @@ class MessageQueueNode(object):
                                             m.dynamic_queue_name == dynamic_queue_name]
 
                 # Get a reference to the service
-                dynamic_get_next_queued_message = self.dynamic_get_next_message_services[dynamic_queue_name]
+                dynamic_get_next_queued_message = self.dynamic_queue_intrinsics[dynamic_queue_name]["service"]
 
                 try:
                     queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits,
@@ -595,6 +590,11 @@ class MessageQueueNode(object):
                     # no messages
                     queue_response = GetNextQueuedMessageResponse(has_message=False)
 
+                    self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] += 1
+
+                    if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] > self.max_failed_attempts:
+                        del self.dynamic_queue_intrinsics[dynamic_queue_name]
+
                 # Cache the response
                 self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response
 
-- 
GitLab


From d014ffdb36ca5319f4925dd29c86774130c925a3 Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Tue, 15 Aug 2023 18:31:32 -0400
Subject: [PATCH 3/9] Added better logging and trigger deletion from
 dynamic_queues dict

---
 ros_acomms/src/message_queue_node.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index f2960a3c..3a1dea99 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -585,15 +585,18 @@ class MessageQueueNode(object):
                                                                          str(packet_codec) if packet_codec else ''),
                                                                      exclude_message_ids=this_queue_exclusion_ids)
                 except rospy.service.ServiceException as e:
-                    rospy.logerr_throttle(1, f"Error calling get_next_message service on {dynamic_queue_name}: {e}")
-                    # if we have an error, don't try again for this packet (pretend it told us that there are
-                    # no messages
-                    queue_response = GetNextQueuedMessageResponse(has_message=False)
-
                     self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] += 1
 
-                    if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] > self.max_failed_attempts:
+                    if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts:
                         del self.dynamic_queue_intrinsics[dynamic_queue_name]
+                        del self.dynamic_queues[dynamic_queue_name]
+
+                        rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.")
+                        return
+                    else:
+                        rospy.logwarn(f"Error calling get_next_message service on {dynamic_queue_name}: {e}")
+
+                    queue_response = GetNextQueuedMessageResponse(has_message=False)
 
                 # Cache the response
                 self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response
-- 
GitLab


From aaf785fedc68d8fe6e43f6e16b673707d7dbb3ff Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Wed, 16 Aug 2023 10:45:38 -0400
Subject: [PATCH 4/9] Combined dynamic queue object into single nested dict

---
 ros_acomms/src/message_queue_node.py | 41 ++++++++++++++--------------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index 3a1dea99..fb7635c9 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -34,9 +34,6 @@ from rospy_yaml_include.yaml_include import RospyYamlInclude
 
 from version_node import version_node
 
-#TODO: move this
-from packet_dispatch_node import DecoderListEntry
-
 from codec_config_parser import (QueueParams, get_queue_params, get_message_codecs)
 
 
@@ -275,7 +272,6 @@ class MessageQueueNode(object):
 
         # Init dynamic queues
         self.dynamic_queues = {}
-        self.dynamic_queue_intrinsics = {}
 
         # This needs to be called before any subscribers are created
         self.setup_queues()
@@ -394,12 +390,18 @@ class MessageQueueNode(object):
                     update_topic_name = rospy.names.canonicalize_name(params.dynamic_update_topic)
                     rospy.loginfo(f"Initialized dynamic queue ID {queue_id} for {service_name}")
                     try:
-                        self.dynamic_queues[service_name] = DynamicQueue(message_codec_id=queue_id,
-                                                                dynamic_message_service=service_name,
-                                                                dynamic_update_topic=update_topic_name,
-                                                                dest_address=params.destination,
-                                                                message_codec=message_codecs[params.codec_id], )
-                        self.dynamic_queue_intrinsics[service_name] = {"service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), "failed_attempts": 0}
+                        self.dynamic_queues[service_name] = {
+                                                                        "object": 
+                                                                            DynamicQueue(message_codec_id=queue_id,
+                                                                            dynamic_message_service=service_name,
+                                                                            dynamic_update_topic=update_topic_name,
+                                                                            dest_address=params.destination,
+                                                                            message_codec=message_codecs[params.codec_id], 
+                                                                            ), 
+                                                                        "service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), 
+                                                                        "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), 
+                                                                        "failed_attempts": 0
+                                                                    }
 
                     except Exception as e:
                         rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}")
@@ -535,24 +537,24 @@ class MessageQueueNode(object):
 
         # ... but we do track it for dynamic queues
         if isinstance(msg.queue, DynamicQueue):
-            if msg.queue.dynamic_message_service in self.dynamic_queue_intrinsics.keys():
+            if msg.queue.dynamic_message_service in self.dynamic_queues.keys():
                 update_msg = UpdateQueuedMessage(header=Header(stamp=rospy.Time.now()),
                                                  message_id=msg.message_id_in_queue,
                                                  event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED)
-                self.dynamic_queue_intrinsics[msg.dynamic_queue_name]["publisher"].publish(update_msg)
+                self.dynamic_queues[msg.dynamic_queue_name]["publisher"].publish(update_msg)
                 rospy.logdebug(f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}")
 
     def get_message_from_queue_response(self, dynamic_queue, queue_response: GetNextQueuedMessageResponse) -> QueuedMessage:
         message_bits = BitArray(bytes=queue_response.data, length=queue_response.data_size_in_bits)
 
-        next_message = QueuedMessage(queue=self.dynamic_queues[dynamic_queue],
+        next_message = QueuedMessage(queue=self.dynamic_queues[dynamic_queue]["object"],
                                      message=None,
-                                     message_codec=self.dynamic_queues[dynamic_queue].message_codec,
+                                     message_codec=self.dynamic_queues[dynamic_queue]["object"].message_codec,
                                      dest=120,
                                      time_added=datetime.utcnow(),
                                      fragmentation_tracker=None,
                                      allow_fragmentation=False,
-                                     dynamic_queue_name=self.dynamic_queues[dynamic_queue].dynamic_message_service,
+                                     dynamic_queue_name=self.dynamic_queues[dynamic_queue]["object"].dynamic_message_service,
                                      message_id_in_queue=queue_response.message_id,
                                      payload_bits=message_bits,
                                      length_bits=len(message_bits))
@@ -566,7 +568,7 @@ class MessageQueueNode(object):
                                     tuple[str, int]]:
 
         start_time = time.time()  # for profiling
-        for dynamic_queue in self.dynamic_queues.values():
+        for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]:
             dynamic_queue_name = dynamic_queue.dynamic_message_service
             # Check to see if we already have a cached response from the last time we did this.
             cached_response = self.current_packet_dynamic_queue_cache.get(dynamic_queue_name)
@@ -576,7 +578,7 @@ class MessageQueueNode(object):
                                             m.dynamic_queue_name == dynamic_queue_name]
 
                 # Get a reference to the service
-                dynamic_get_next_queued_message = self.dynamic_queue_intrinsics[dynamic_queue_name]["service"]
+                dynamic_get_next_queued_message = self.dynamic_queues[dynamic_queue_name]["service"]
 
                 try:
                     queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits,
@@ -585,10 +587,9 @@ class MessageQueueNode(object):
                                                                          str(packet_codec) if packet_codec else ''),
                                                                      exclude_message_ids=this_queue_exclusion_ids)
                 except rospy.service.ServiceException as e:
-                    self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] += 1
+                    self.dynamic_queues[dynamic_queue_name]["failed_attempts"] += 1
 
-                    if self.dynamic_queue_intrinsics[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts:
-                        del self.dynamic_queue_intrinsics[dynamic_queue_name]
+                    if self.dynamic_queues[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts:
                         del self.dynamic_queues[dynamic_queue_name]
 
                         rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.")
-- 
GitLab


From 94f27007ed2b9398ec64c1a869716862aababf29 Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Thu, 24 Aug 2023 11:26:46 -0400
Subject: [PATCH 5/9] Added inactive_queue dict to revive dead queues

---
 ros_acomms/src/message_queue_node.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index fb7635c9..11944deb 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -25,7 +25,7 @@ from threading import Lock
 import sys
 import os
 import signal
-
+import rosservice
 from ltcodecs import RosMessageCodec, EncodingFailed
 from acomms_codecs.ros_packet_codec import RosPacketCodec
 from acomms_codecs.packet_codecs import packet_codecs
@@ -273,6 +273,8 @@ class MessageQueueNode(object):
         # Init dynamic queues
         self.dynamic_queues = {}
 
+        self.inactive_dynamic_queues = {}
+
         # This needs to be called before any subscribers are created
         self.setup_queues()
 
@@ -568,6 +570,14 @@ class MessageQueueNode(object):
                                     tuple[str, int]]:
 
         start_time = time.time()  # for profiling
+        if len(self.inactive_dynamic_queues) > 0:
+            services = rosservice.get_service_list()
+            for queue in self.inactive_dynamic_queues:
+                if self.inactive_dynamic_queues[queue]["service"] in services:
+                    rospy.loginfo(f"Reactivating dynamic queue {queue}")
+                    self.dynamic_queues[queue] = self.inactive_dynamic_queues[queue]
+                    del self.inactive_dynamic_queues[queue]
+
         for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]:
             dynamic_queue_name = dynamic_queue.dynamic_message_service
             # Check to see if we already have a cached response from the last time we did this.
@@ -590,6 +600,8 @@ class MessageQueueNode(object):
                     self.dynamic_queues[dynamic_queue_name]["failed_attempts"] += 1
 
                     if self.dynamic_queues[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts:
+                        self.dynamic_queues[dynamic_queue_name]["failed_attempts"] = 0
+                        self.inactive_dynamic_queues[dynamic_queue_name] = self.dynamic_queues[dynamic_queue_name]
                         del self.dynamic_queues[dynamic_queue_name]
 
                         rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.")
-- 
GitLab


From dc9aa015869705bd8d8750cb00c461b32fca6046 Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Thu, 24 Aug 2023 16:17:12 -0400
Subject: [PATCH 6/9] Cleaned up the way that services are checked

---
 ros_acomms/src/message_queue_node.py | 33 ++++++++--------------------
 1 file changed, 9 insertions(+), 24 deletions(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index 11944deb..934584df 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -273,8 +273,6 @@ class MessageQueueNode(object):
         # Init dynamic queues
         self.dynamic_queues = {}
 
-        self.inactive_dynamic_queues = {}
-
         # This needs to be called before any subscribers are created
         self.setup_queues()
 
@@ -298,8 +296,6 @@ class MessageQueueNode(object):
         # Subscribe to ACK message to update queues
         rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received)
 
-        self.max_failed_attempts = rospy.get_param('~dynamic_queue_service_max_failed_attempts', 5)
-
         rospy.loginfo("Message Queue started")
         self.spin()
 
@@ -402,7 +398,6 @@ class MessageQueueNode(object):
                                                                             ), 
                                                                         "service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), 
                                                                         "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), 
-                                                                        "failed_attempts": 0
                                                                     }
 
                     except Exception as e:
@@ -570,16 +565,16 @@ class MessageQueueNode(object):
                                     tuple[str, int]]:
 
         start_time = time.time()  # for profiling
-        if len(self.inactive_dynamic_queues) > 0:
-            services = rosservice.get_service_list()
-            for queue in self.inactive_dynamic_queues:
-                if self.inactive_dynamic_queues[queue]["service"] in services:
-                    rospy.loginfo(f"Reactivating dynamic queue {queue}")
-                    self.dynamic_queues[queue] = self.inactive_dynamic_queues[queue]
-                    del self.inactive_dynamic_queues[queue]
+        services = rosservice.get_service_list()
 
         for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]:
             dynamic_queue_name = dynamic_queue.dynamic_message_service
+
+            # Check to see if the service is available, skip if not
+            if dynamic_queue_name not in services:
+                rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping")
+                continue
+
             # Check to see if we already have a cached response from the last time we did this.
             cached_response = self.current_packet_dynamic_queue_cache.get(dynamic_queue_name)
             if not cached_response:
@@ -596,18 +591,8 @@ class MessageQueueNode(object):
                                                                      packet_codec=(
                                                                          str(packet_codec) if packet_codec else ''),
                                                                      exclude_message_ids=this_queue_exclusion_ids)
-                except rospy.service.ServiceException as e:
-                    self.dynamic_queues[dynamic_queue_name]["failed_attempts"] += 1
-
-                    if self.dynamic_queues[dynamic_queue_name]["failed_attempts"] >= self.max_failed_attempts:
-                        self.dynamic_queues[dynamic_queue_name]["failed_attempts"] = 0
-                        self.inactive_dynamic_queues[dynamic_queue_name] = self.dynamic_queues[dynamic_queue_name]
-                        del self.dynamic_queues[dynamic_queue_name]
-
-                        rospy.logerr(f"Failed to call {dynamic_queue_name} {self.max_failed_attempts} times, will not retry.")
-                        return
-                    else:
-                        rospy.logwarn(f"Error calling get_next_message service on {dynamic_queue_name}: {e}")
+                except rospy.service.ServiceException:
+                    rospy.logerr(f"Failed to call {dynamic_queue_name}.")
 
                     queue_response = GetNextQueuedMessageResponse(has_message=False)
 
-- 
GitLab


From 90cdde835fd324909a7bbcf1cef39cae2cdf0d7f Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Thu, 24 Aug 2023 16:40:09 -0400
Subject: [PATCH 7/9] Cleaned up the way that services are checked

---
 ros_acomms/src/message_queue_node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index 934584df..85c5e969 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -571,7 +571,7 @@ class MessageQueueNode(object):
             dynamic_queue_name = dynamic_queue.dynamic_message_service
 
             # Check to see if the service is available, skip if not
-            if dynamic_queue_name not in services:
+            if rospy.get_namespace() + dynamic_queue_name not in services:
                 rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping")
                 continue
 
-- 
GitLab


From d28661e222c8187d5fa31f614ea0d9bf1146910b Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Thu, 24 Aug 2023 16:54:27 -0400
Subject: [PATCH 8/9] Updated to resolve name

---
 ros_acomms/src/message_queue_node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index 85c5e969..0f0cb201 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -571,7 +571,7 @@ class MessageQueueNode(object):
             dynamic_queue_name = dynamic_queue.dynamic_message_service
 
             # Check to see if the service is available, skip if not
-            if rospy.get_namespace() + dynamic_queue_name not in services:
+            if rospy.names.resolve_name(dynamic_queue_name) not in services:
                 rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping")
                 continue
 
-- 
GitLab


From 2d3a0886db978d62c02af04b356f52b147e62f69 Mon Sep 17 00:00:00 2001
From: Brennan <brennanmk2200@gmail.com>
Date: Thu, 24 Aug 2023 18:08:41 -0400
Subject: [PATCH 9/9] Initial commit

---
 ros_acomms/src/message_queue_node.py | 920 +++++++++++++++++----------
 1 file changed, 591 insertions(+), 329 deletions(-)

diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py
index 0f0cb201..650bf85a 100755
--- a/ros_acomms/src/message_queue_node.py
+++ b/ros_acomms/src/message_queue_node.py
@@ -2,27 +2,43 @@
 from __future__ import unicode_literals, annotations
 
 from itertools import groupby
-from typing import Any, Deque, Dict, List, Optional, Tuple, Union
+from typing import Any, Deque, Dict, List, Optional, Tuple
 
 import rospy
 from rospy.msg import AnyMsg
 from std_msgs.msg import Header
 from functools import partial
 from importlib import import_module
-from collections import namedtuple, deque, defaultdict
-from dataclasses import dataclass
+from collections import deque, defaultdict
 from datetime import datetime, timedelta
 import time
-from ros_acomms_msgs.srv import GetNextPacketData, GetNextPacketDataRequest, GetNextPacketDataResponse
-from ros_acomms_msgs.srv import GetNextQueuedMessage, GetNextQueuedMessageRequest, GetNextQueuedMessageResponse
+from ros_acomms_msgs.srv import (
+    GetNextPacketData,
+    GetNextPacketDataRequest,
+    GetNextPacketDataResponse,
+)
+from ros_acomms_msgs.srv import (
+    GetNextQueuedMessage,
+    GetNextQueuedMessageResponse,
+)
 from ros_acomms_msgs.msg import UpdateQueuedMessage
-from ros_acomms_msgs.msg import (EncodedAck, NeighborReachability, QueueEnable,
-                            QueuePriority, QueueStatus, QueueSummary, SummaryMessageCount)
+from ros_acomms_msgs.msg import (
+    EncodedAck,
+    NeighborReachability,
+    QueueEnable,
+    QueuePriority,
+    QueueStatus,
+    QueueSummary,
+    SummaryMessageCount,
+)
 from bitstring import Bits, BitStream, BitArray
-from fragmentation_tracker import (FragmentationTracker, FragmentationStartHeader,
-                                   FragmentationContHeader, FragmentationAckHeader)
+from fragmentation_tracker import (
+    FragmentationTracker,
+    FragmentationStartHeader,
+    FragmentationContHeader,
+    FragmentationAckHeader,
+)
 from threading import Lock
-import sys
 import os
 import signal
 import rosservice
@@ -34,128 +50,70 @@ from rospy_yaml_include.yaml_include import RospyYamlInclude
 
 from version_node import version_node
 
-from codec_config_parser import (QueueParams, get_queue_params, get_message_codecs)
-
-
-@dataclass
-class DynamicQueue:
-    dest_address: int
-    message_codec_id: int  # the ID from the message codec params
-    message_codec: RosMessageCodec
-    dynamic_message_service: str
-    packet_codec: RosPacketCodec = None
-    allow_fragmentation: bool = False
-    dynamic_update_topic: Optional[str] = None
-
-
-@dataclass
-class QueuedMessage:
-    queue: Any  # Should be MessageQueue, but that's not defined yet.
-    message_codec: RosMessageCodec
-    message: AnyMsg
-    dest: int
-    payload_bits: Bits
-    length_bits: int
-    time_added: datetime
-    fragmentation_tracker: Optional[FragmentationTracker]
-    message_id_in_queue: Optional[int] = None
-    transmitted: bool = False
-    sequence_number: Optional[int] = None
-    allow_fragmentation: bool = False
-    fragment_size_bits: int = 8
-    dynamic_queue_name: Optional[str] = None
+from codec_config_parser import QueueParams, get_queue_params, get_message_codecs
 
-    @property
-    def priority(self) -> int:
-        return self.queue.priority
-
-    def get_min_fragment_size_bits(self) -> int:
-        if self.fragmentation_tracker is None:
-            size = (FragmentationStartHeader.length_bits + self.fragment_size_bits)
-        elif self.fragmentation_tracker.start_header_acked:
-            size = (FragmentationContHeader.length_bits + self.fragment_size_bits)
-        else:
-            size = (FragmentationStartHeader.length_bits + self.fragment_size_bits)
-
-        rospy.logdebug("Minimum fragment size: {0}".format(size))
-        return size
-
-    def get_next_payload_smaller_than_bits(self, size_in_bits: int,
-                                           dest_sequence_nums,
-                                           remove_from_queue: bool = False
-                                           ) -> Tuple[Bits, Bits]:
-        fragmented = False
-        inc_seq_num = False
-
-        rospy.logdebug("Getting next bits up to: {0}".format(size_in_bits))
-        rospy.logdebug("Self lengthbits: {0}".format(self.length_bits))
 
-        if self.length_bits <= size_in_bits:
-            message_bits = self.payload_bits
-            message_header = self.queue.message_codec_id
-
-        else:
-            fragmented = True
-            rospy.logdebug("Fragmenting...")
-            # If we only just decided to fragment... generate tracking object
-            if self.fragmentation_tracker is None:
-                sequence_number = dest_sequence_nums[self.dest]
-                dest_sequence_nums[self.dest] += 1
-                rospy.logdebug("Queued Message length bits: {0}".format(self.length_bits))
-                self.sequence_number = sequence_number
-                self.fragmentation_tracker = FragmentationTracker(sequence_number, 0, self.dest,
-                                                                  int(self.time_added.timestamp()), self.length_bits,
-                                                                  self.fragment_size_bits,
-                                                                  payload_bits=self.payload_bits)
-
-            try:
-                rospy.logdebug("FragmentationTracker sent: {0}-{1}  {2}".format(
-                    self.fragmentation_tracker.transferred_blocks[0][0],
-                    self.fragmentation_tracker.transferred_blocks[-1][-1],
-                    len(self.fragmentation_tracker.transferred_blocks)))
-            except:
-                rospy.logdebug("No sent bytes yet")
-
-            try:
-                rospy.logdebug("FragmentationTracker acked: {0}-{1}  {2}".format(
-                    self.fragmentation_tracker.acked_blocks[0][0],
-                    self.fragmentation_tracker.acked_blocks[-1][-1],
-                    len(self.fragmentation_tracker.acked_blocks)))
-            except:
-                rospy.logdebug("No acked bytes yet")
-
-            message_bits = self.fragmentation_tracker.next_fragment_to_transfer(size_in_bits)
-            message_header = self.queue.message_codec_id + 128
-
-        rospy.logdebug("Returning {0} of {1} bits".format(message_bits.length, self.payload_bits.length))
-        if remove_from_queue and not fragmented and isinstance(self.queue, MessageQueue):
-            self.queue.mark_transmitted(self)
-
-        return message_header, message_bits
+class Queue:
+    def __init__(
+        self, dest_address, message_codec_id, message_codec, allow_fragmentation
+    ) -> None:
+        self.dest_address = dest_address
+        self.message_codec_id = message_codec_id
+        self.message_codec = message_codec
+        self.allow_fragmentation = allow_fragmentation
+
+
+class DynamicQueue(Queue):
+    def __init__(
+        self,
+        dest_address: int,
+        message_codec_id: int,
+        message_codec: RosMessageCodec,
+        dynamic_message_service: str,
+        service: rospy.ServiceProxy,
+        publisher: rospy.Publisher,
+        packet_codec: RosPacketCodec = None,
+        allow_fragmentation: bool = False,
+        dynamic_update_topic: Optional[str] = None,
+    ) -> None:
+        super().__init__(
+            dest_address=dest_address,
+            message_codec_id=message_codec_id,
+            message_codec=message_codec,
+            allow_fragmentation=allow_fragmentation,
+        )
 
+        self.dynamic_message_service = dynamic_message_service
+        self.packet_codec = packet_codec
+        self.dynamic_update_topic = dynamic_update_topic
+        self.service = service
+        self.publisher = publisher
+
+
+class StaticQueue(Queue):
+    def __init__(
+        self,
+        master_message_queue: List[QueuedMessage],
+        dest_address: int,
+        message_codec_id: int,  # the ID from the message codec params
+        topic: str,
+        message_codec: RosMessageCodec,
+        priority: int = 1,
+        order: str = "lifo",
+        maxsize: int = 100,
+        allow_fragmentation: bool = False,
+    ) -> None:
+        super().__init__(
+            dest_address=dest_address,
+            message_codec_id=message_codec_id,
+            message_codec=message_codec,
+            allow_fragmentation=allow_fragmentation,
+        )
 
-class MessageQueue(object):
-    def __init__(self,
-                 master_message_queue: List[QueuedMessage],
-                 dest_address: int,
-                 message_codec_id: int,  # the ID from the message codec params
-                 topic: str,
-                 message_codec: RosMessageCodec,
-                 priority: int = 1,
-                 order: str = 'lifo',
-                 maxsize: int = 100,
-                 allow_fragmentation: bool = False,
-                 dynamic_message_service: Optional[str] = None,
-                 ) -> None:
         self.master_message_queue = master_message_queue
-        self.dynamic_message_service = dynamic_message_service
         self.order = order
-        self.dest_address: int = dest_address
         self.priority: int = priority
-        self.message_codec_id = message_codec_id
         self.subscribe_topic: str = topic
-        self.message_codec = message_codec
-        self.allow_fragmentation: bool = allow_fragmentation
 
         self._last_tx_time = datetime.utcnow()
         self._queue: Deque[QueuedMessage] = deque(maxlen=maxsize)
@@ -169,28 +127,31 @@ class MessageQueue(object):
         try:
             encoded_bits, metadata = self.message_codec.encode(message)
         except EncodingFailed as e:
-            rospy.logwarn(f"Failed to encode message: {e.message} (message is {message}, exception is {e})")
+            rospy.logwarn(
+                f"Failed to encode message: {e.message} (message is {message}, exception is {e})"
+            )
             return
 
         rospy.logdebug("Append: {}".format(encoded_bits))
         dest = self.dest_address
         if metadata:
-            if 'dest' in metadata:
-                dest = metadata['dest']
-
-        queued_message = QueuedMessage(message=message,
-                                       message_codec=self.message_codec,
-                                       queue=self,
-                                       dest=dest,
-                                       payload_bits=encoded_bits,
-                                       length_bits=encoded_bits.length,
-                                       time_added=datetime.utcnow(),
-                                       transmitted=False,
-                                       fragmentation_tracker=None,
-                                       allow_fragmentation=self.allow_fragmentation,
-                                       )
-
-        if self.order == 'fifo':
+            if "dest" in metadata:
+                dest = metadata["dest"]
+
+        queued_message = QueuedMessage(
+            message=message,
+            message_codec=self.message_codec,
+            queue=self,
+            dest=dest,
+            payload_bits=encoded_bits,
+            length_bits=encoded_bits.length,
+            time_added=datetime.utcnow(),
+            transmitted=False,
+            fragmentation_tracker=None,
+            allow_fragmentation=self.allow_fragmentation,
+        )
+
+        if self.order == "fifo":
             if len(self._queue) < self._queue.maxlen:
                 # drop this message if the queue is full.
                 self._queue.appendleft(queued_message)
@@ -205,7 +166,9 @@ class MessageQueue(object):
             self._queue.append(queued_message)
             self.master_message_queue.insert(0, queued_message)
 
-        rospy.logdebug_throttle(5, "Added message to queue, {} messages".format(len(self._queue)))
+        rospy.logdebug_throttle(
+            5, "Added message to queue, {} messages".format(len(self._queue))
+        )
 
     def mark_transmitted(self, queued_message):
         # type: (QueuedMessage) -> None
@@ -215,7 +178,9 @@ class MessageQueue(object):
             self._queue.remove(queued_message)
             self.master_message_queue.remove(queued_message)
 
-    def get_next_message(self, remove_from_queue: bool = True) -> Optional[QueuedMessage]:
+    def get_next_message(
+        self, remove_from_queue: bool = True
+    ) -> Optional[QueuedMessage]:
         if len(self._queue) == 0:
             return None
         queued_message = self._queue[-1]
@@ -224,7 +189,9 @@ class MessageQueue(object):
             self._last_tx_time = datetime.utcnow()
         return queued_message
 
-    def get_next_message_smaller_than_bits(self, size_in_bits: int, remove_from_queue: bool = False) -> Optional[QueuedMessage]:
+    def get_next_message_smaller_than_bits(
+        self, size_in_bits: int, remove_from_queue: bool = False
+    ) -> Optional[QueuedMessage]:
         if len(self._queue) == 0:
             return None
         for qm in reversed(self._queue):
@@ -255,13 +222,139 @@ class MessageQueue(object):
             return datetime.utcnow() - self._last_tx_time
 
 
+class QueuedMessage(Queue):
+    def __init__(
+        self,
+        queue: Any,
+        message_codec: RosMessageCodec,
+        message: AnyMsg,
+        dest: int,
+        payload_bits: Bits,
+        length_bits: int,
+        time_added: datetime,
+        fragmentation_tracker: Optional[FragmentationTracker],
+        message_id_in_queue: Optional[int] = None,
+        transmitted: bool = False,
+        sequence_number: Optional[int] = None,
+        allow_fragmentation: bool = False,
+        fragment_size_bits: int = 8,
+        dynamic_queue_name: Optional[str] = None,
+    ) -> None:
+        self.queue = queue
+        self.message_codec = message_codec
+        self.message = message
+        self.dest = dest
+        self.payload_bits = payload_bits
+        self.length_bits = length_bits
+        self.time_added = time_added
+        self.fragmentation_tracker = fragmentation_tracker
+        self.message_id_in_queue = message_id_in_queue
+        self.transmitted = transmitted
+        self.sequence_number = sequence_number
+        self.allow_fragmentation = allow_fragmentation
+        self.fragment_size_bits = fragment_size_bits
+        self.dynamic_queue_name = dynamic_queue_name
+
+    @property
+    def priority(self) -> int:
+        return self.queue.priority
+
+    def get_min_fragment_size_bits(self) -> int:
+        if self.fragmentation_tracker is None:
+            size = FragmentationStartHeader.length_bits + self.fragment_size_bits
+        elif self.fragmentation_tracker.start_header_acked:
+            size = FragmentationContHeader.length_bits + self.fragment_size_bits
+        else:
+            size = FragmentationStartHeader.length_bits + self.fragment_size_bits
+
+        rospy.logdebug("Minimum fragment size: {0}".format(size))
+        return size
+
+    def get_next_payload_smaller_than_bits(
+        self, size_in_bits: int, dest_sequence_nums, remove_from_queue: bool = False
+    ) -> Tuple[Bits, Bits]:
+        fragmented = False
+        inc_seq_num = False
+
+        rospy.logdebug("Getting next bits up to: {0}".format(size_in_bits))
+        rospy.logdebug("Self lengthbits: {0}".format(self.length_bits))
+
+        if self.length_bits <= size_in_bits:
+            message_bits = self.payload_bits
+            message_header = self.queue.message_codec_id
+
+        else:
+            fragmented = True
+            rospy.logdebug("Fragmenting...")
+            # If we only just decided to fragment... generate tracking object
+            if self.fragmentation_tracker is None:
+                sequence_number = dest_sequence_nums[self.dest]
+                dest_sequence_nums[self.dest] += 1
+                rospy.logdebug(
+                    "Queued Message length bits: {0}".format(self.length_bits)
+                )
+                self.sequence_number = sequence_number
+                self.fragmentation_tracker = FragmentationTracker(
+                    sequence_number,
+                    0,
+                    self.dest,
+                    int(self.time_added.timestamp()),
+                    self.length_bits,
+                    self.fragment_size_bits,
+                    payload_bits=self.payload_bits,
+                )
+
+            try:
+                rospy.logdebug(
+                    "FragmentationTracker sent: {0}-{1}  {2}".format(
+                        self.fragmentation_tracker.transferred_blocks[0][0],
+                        self.fragmentation_tracker.transferred_blocks[-1][-1],
+                        len(self.fragmentation_tracker.transferred_blocks),
+                    )
+                )
+            except:
+                rospy.logdebug("No sent bytes yet")
+
+            try:
+                rospy.logdebug(
+                    "FragmentationTracker acked: {0}-{1}  {2}".format(
+                        self.fragmentation_tracker.acked_blocks[0][0],
+                        self.fragmentation_tracker.acked_blocks[-1][-1],
+                        len(self.fragmentation_tracker.acked_blocks),
+                    )
+                )
+            except:
+                rospy.logdebug("No acked bytes yet")
+
+            message_bits = self.fragmentation_tracker.next_fragment_to_transfer(
+                size_in_bits
+            )
+            message_header = self.queue.message_codec_id + 128
+
+        rospy.logdebug(
+            "Returning {0} of {1} bits".format(
+                message_bits.length, self.payload_bits.length
+            )
+        )
+        if (
+            remove_from_queue
+            and not fragmented
+            and isinstance(self.queue, StaticQueue)
+        ):
+            self.queue.mark_transmitted(self)
+
+        return message_header, message_bits
+
+
 class MessageQueueNode(object):
     def __init__(self) -> None:
-        rospy.init_node('message_queue')
-        self.update_rate = rospy.get_param('~update_rate', 1)
-        self.unknown_dests_are_reachable: bool = rospy.get_param('~unknown_dests_are_reachable', True)
-        self.default_dest = rospy.get_param('~default_dest', 121)
-        self.default_priority = rospy.get_param('~default_priority', 10)
+        rospy.init_node("message_queue")
+        self.update_rate = rospy.get_param("~update_rate", 1)
+        self.unknown_dests_are_reachable: bool = rospy.get_param(
+            "~unknown_dests_are_reachable", True
+        )
+        self.default_dest = rospy.get_param("~default_dest", 121)
+        self.default_priority = rospy.get_param("~default_priority", 10)
 
         try:
             version = version_node()
@@ -277,21 +370,32 @@ class MessageQueueNode(object):
         self.setup_queues()
 
         # Status publishers
-        self.queue_status_pub = rospy.Publisher('queue_status', QueueStatus, queue_size=10, latch=True)
+        self.queue_status_pub = rospy.Publisher(
+            "queue_status", QueueStatus, queue_size=10, latch=True
+        )
 
         # Queue priority/enable update Subscriptions
-        self.priority_sub = rospy.Subscriber("queue_priority", QueuePriority, self.handle_queue_priority)
-        self.enable_sub = rospy.Subscriber("queue_enable", QueueEnable, self.handle_queue_enable)
+        self.priority_sub = rospy.Subscriber(
+            "queue_priority", QueuePriority, self.handle_queue_priority
+        )
+        self.enable_sub = rospy.Subscriber(
+            "queue_enable", QueueEnable, self.handle_queue_enable
+        )
 
         # Subscribe to the Neighbor Reachability updates (which may be published by more than one node
-        rospy.Subscriber("neighbor_reachability", NeighborReachability, self.on_neighbor_reachability)
+        rospy.Subscriber(
+            "neighbor_reachability", NeighborReachability, self.on_neighbor_reachability
+        )
 
         # We use a lock to ensure that we handle only one get_next_packet_data call at a time
         self.get_packet_data_lock = Lock()
         self.get_packet_data_lock_fail_count = 0
-        self.current_packet_dynamic_queue_cache: dict[DynamicQueue, GetNextQueuedMessageResponse] = {}
-        self.get_packet_data_service = rospy.Service('get_next_packet_data', GetNextPacketData,
-                                                     self.handle_get_next_packet)
+        self.current_packet_dynamic_queue_cache: dict[
+            DynamicQueue, GetNextQueuedMessageResponse
+        ] = {}
+        self.get_packet_data_service = rospy.Service(
+            "get_next_packet_data", GetNextPacketData, self.handle_get_next_packet
+        )
 
         # Subscribe to ACK message to update queues
         rospy.Subscriber("from_acomms/encoded_ack", EncodedAck, self.on_ack_received)
@@ -314,19 +418,25 @@ class MessageQueueNode(object):
         # this is a little awkward, since it's conceptually backward here.
         # when queuing for TX, we go from topic -> message -> packet
         # However, organizing it this way allows us to use a single config YAML for both TX and RX
-        packet_codec_filename = rospy.get_param('~packet_codec_file', None)
-        codec_dir = rospy.get_param('~codec_directory', None)
+        packet_codec_filename = rospy.get_param("~packet_codec_file", None)
+        codec_dir = rospy.get_param("~codec_directory", None)
         if packet_codec_filename:
             try:
-                with open(packet_codec_filename, 'r') as yaml_file:
+                with open(packet_codec_filename, "r") as yaml_file:
                     constructor = RospyYamlInclude(base_directory=codec_dir)
-                    packet_codec_param = yaml.load(yaml_file, Loader=constructor.add_constructor())
+                    packet_codec_param = yaml.load(
+                        yaml_file, Loader=constructor.add_constructor()
+                    )
             except Exception as e:
-                rospy.logerr(f"Fatal error reading message codec config from {packet_codec_filename} (with codec directory {codec_dir}): {e}")
+                rospy.logerr(
+                    f"Fatal error reading message codec config from {packet_codec_filename} (with codec directory {codec_dir}): {e}"
+                )
         else:
-            packet_codec_param = rospy.get_param('~packet_codecs')
-            rospy.logwarn("Using the packet_codecs parameter is deprecated (and doesn't support features like " +
-                          "imports). Use the packet_codec_file parameter instead.  This parameter will be removed.")
+            packet_codec_param = rospy.get_param("~packet_codecs")
+            rospy.logwarn(
+                "Using the packet_codecs parameter is deprecated (and doesn't support features like "
+                + "imports). Use the packet_codec_file parameter instead.  This parameter will be removed."
+            )
 
         # QUESTION(lindzey): Do we ever have more than one packet codec? I'm
         #   not convinced that the code in this block actually handles that case
@@ -337,27 +447,39 @@ class MessageQueueNode(object):
         self.queue_params: Dict[int, QueueParams] = {}
         for packet_codec_details in packet_codec_param:
             rospy.loginfo("Packet codec details: {}".format(packet_codec_details))
-            packet_codec_class = packet_codecs[packet_codec_details['packet_codec']]  # type: PacketCodec
+            packet_codec_class = packet_codecs[
+                packet_codec_details["packet_codec"]
+            ]  # type: PacketCodec
 
             self.queue_params.update(get_queue_params(packet_codec_details))
             message_codecs = get_message_codecs(packet_codec_details)
 
             # Convert SRC IDs to integers, since yaml dict keys have to be strings
             try:
-                src_names: dict = {int(k):v for k,v in packet_codec_details.get('src_names', {}).items()}
+                src_names: dict = {
+                    int(k): v
+                    for k, v in packet_codec_details.get("src_names", {}).items()
+                }
             except Exception as e:
                 rospy.logerr(f"Error parsing src_names parameter: {e}")
 
             for queue_id, params in self.queue_params.items():
                 # We support three cases here: Publish only, subscribe to a topic, or call a dynamic queue service
                 # If there is no subscribe_topic or dynamic queue, this is a publish-only entry, so we skip it here
-                if params.subscribe_topic is None and params.dynamic_queue_service is None:
-                    rospy.logdebug(f"ID {queue_id} has no subscribe topic or dynamic queue, skipping it.")
+                if (
+                    params.subscribe_topic is None
+                    and params.dynamic_queue_service is None
+                ):
+                    rospy.logdebug(
+                        f"ID {queue_id} has no subscribe topic or dynamic queue, skipping it."
+                    )
                     continue
                 # If both the subscribe_topic and dynamic_queue are specified, that's bad.
                 # Throw an error (but don't crash for now)
                 if params.subscribe_topic and params.dynamic_queue_service:
-                    rospy.logerr(f"ID {queue_id} has both a subscribe_topic and dynamic_queue specified.  You can't have both.")
+                    rospy.logerr(
+                        f"ID {queue_id} has both a subscribe_topic and dynamic_queue specified.  You can't have both."
+                    )
                     continue
 
                 # If no destination is specified on this queue, fill in our global default
@@ -368,65 +490,92 @@ class MessageQueueNode(object):
                 if params.priority is None:
                     params.priority = self.default_priority
 
-                self.destination_reachable[params.destination] = self.unknown_dests_are_reachable
+                self.destination_reachable[
+                    params.destination
+                ] = self.unknown_dests_are_reachable
 
                 # Now, the "normal" case: the subscribe_topic is specified, so we want to set up a new queue for it.
                 if params.subscribe_topic:
                     topic_name = params.subscribe_topic
-                    if '{src}' in topic_name or '{name}' in topic_name:
+                    if "{src}" in topic_name or "{name}" in topic_name:
                         # We are going to attach multiple subscribers to this queue, one for each entry in the
                         # src_names dict
                         for src, name in src_names.items():
                             complete_topic_name = topic_name.format(src=src, name=name)
-                            self.init_static_queue(topic_name=complete_topic_name, queue_id=queue_id,
-                                                   params=params, message_codecs=message_codecs)
+                            self.init_static_queue(
+                                topic_name=complete_topic_name,
+                                queue_id=queue_id,
+                                params=params,
+                                message_codecs=message_codecs,
+                            )
                     else:
-                        self.init_static_queue(topic_name=topic_name, queue_id=queue_id,
-                                               params=params, message_codecs=message_codecs)
+                        self.init_static_queue(
+                            topic_name=topic_name,
+                            queue_id=queue_id,
+                            params=params,
+                            message_codecs=message_codecs,
+                        )
                 elif params.dynamic_queue_service:
-                    service_name = rospy.names.canonicalize_name(params.dynamic_queue_service)
-                    update_topic_name = rospy.names.canonicalize_name(params.dynamic_update_topic)
-                    rospy.loginfo(f"Initialized dynamic queue ID {queue_id} for {service_name}")
+                    service_name = rospy.names.canonicalize_name(
+                        params.dynamic_queue_service
+                    )
+                    update_topic_name = rospy.names.canonicalize_name(
+                        params.dynamic_update_topic
+                    )
+                    rospy.loginfo(
+                        f"Initialized dynamic queue ID {queue_id} for {service_name}"
+                    )
                     try:
-                        self.dynamic_queues[service_name] = {
-                                                                        "object": 
-                                                                            DynamicQueue(message_codec_id=queue_id,
-                                                                            dynamic_message_service=service_name,
-                                                                            dynamic_update_topic=update_topic_name,
-                                                                            dest_address=params.destination,
-                                                                            message_codec=message_codecs[params.codec_id], 
-                                                                            ), 
-                                                                        "service": rospy.ServiceProxy(service_name, GetNextQueuedMessage), 
-                                                                        "publisher": rospy.Publisher(update_topic_name, UpdateQueuedMessage, queue_size=10), 
-                                                                    }
+                        self.dynamic_queues[service_name] = DynamicQueue(
+                            message_codec_id=queue_id,
+                            dynamic_message_service=service_name,
+                            dynamic_update_topic=update_topic_name,
+                            dest_address=params.destination,
+                            message_codec=message_codecs[params.codec_id],
+                            service=rospy.ServiceProxy(
+                                service_name, GetNextQueuedMessage
+                            ),
+                            publisher=rospy.Publisher(
+                                update_topic_name, UpdateQueuedMessage, queue_size=10
+                            ),
+                        )
 
                     except Exception as e:
-                        rospy.logerr(f"Error setting up dynamic queue for {service_name}: {e}")
+                        rospy.logerr(
+                            f"Error setting up dynamic queue for {service_name}: {e}"
+                        )
 
             # QUESTION(lindzey): It looks like the packet codec is never used?
             #   => AAAAH. This is called for its side effects. Maybe rename to make more clear?
             # The packet codec initializer sets the packet codec on each message codec
-            packet_codec = packet_codec_class(message_codecs=message_codecs,
-                                              miniframe_header=packet_codec_details.get('miniframe_header', bytes()),
-                                              dataframe_header=packet_codec_details.get('dataframe_header', bytes()))
-            rospy.loginfo("Added packet codec with {} message codecs".format(len(message_codecs)))
-
-    def init_static_queue(self, topic_name: str, queue_id: int, params: QueueParams, message_codecs) -> None:
-        new_queue = MessageQueue(master_message_queue=self._queued_messages,
-                                 dest_address=params.destination,
-                                 priority=params.priority,
-                                 message_codec_id=queue_id,
-                                 topic=topic_name,
-                                 order=params.queue_order,
-                                 maxsize=params.queue_maxsize,
-                                 allow_fragmentation=params.allow_fragmentation,
-                                 # QUESTION: Why does the queue own the codec?
-                                 message_codec=message_codecs[queue_id],
-                                 )
+            packet_codec = packet_codec_class(
+                message_codecs=message_codecs,
+                miniframe_header=packet_codec_details.get("miniframe_header", bytes()),
+                dataframe_header=packet_codec_details.get("dataframe_header", bytes()),
+            )
+            rospy.loginfo(
+                "Added packet codec with {} message codecs".format(len(message_codecs))
+            )
+
+    def init_static_queue(
+        self, topic_name: str, queue_id: int, params: QueueParams, message_codecs
+    ) -> None:
+        new_queue = StaticQueue(
+            master_message_queue=self._queued_messages,
+            dest_address=params.destination,
+            priority=params.priority,
+            message_codec_id=queue_id,
+            topic=topic_name,
+            order=params.queue_order,
+            maxsize=params.queue_maxsize,
+            allow_fragmentation=params.allow_fragmentation,
+            # QUESTION: Why does the queue own the codec?
+            message_codec=message_codecs[queue_id],
+        )
         self.topic_queue[topic_name] = new_queue
-        rospy.Subscriber(topic_name,
-                         AnyMsg,
-                         partial(self.on_incoming_message, topic_name=topic_name))
+        rospy.Subscriber(
+            topic_name, AnyMsg, partial(self.on_incoming_message, topic_name=topic_name)
+        )
 
         if params.is_active:
             self.active_queue_names.add(topic_name)
@@ -438,7 +587,9 @@ class MessageQueueNode(object):
         queued_dest_addresses = set()
         message_counts = {}
 
-        for key, group in groupby(self._queued_messages, lambda x: (x.priority, x.dest)):
+        for key, group in groupby(
+            self._queued_messages, lambda x: (x.priority, x.dest)
+        ):
             group_count = sum(1 for _ in group)
             message_counts[key] = group_count
             highest_priority = max(highest_priority, key[0])
@@ -449,8 +600,10 @@ class MessageQueueNode(object):
         # Build the QueueStatus message
         hdr = Header(stamp=rospy.get_rostime())
         summary_message_counts = []
-        for k,v in message_counts.items():
-            summary = SummaryMessageCount(priority=k[0], dest_address=k[1], message_count=v)
+        for k, v in message_counts.items():
+            summary = SummaryMessageCount(
+                priority=k[0], dest_address=k[1], message_count=v
+            )
             summary_message_counts.append(summary)
         queue_summaries = []
         for topic, queue in self.topic_queue.items():
@@ -459,29 +612,33 @@ class MessageQueueNode(object):
             except IndexError:
                 next_message_size_bits = 0
             total_bits_in_queue = sum(q.length_bits for q in queue._queue)
-            queue_summary = QueueSummary(queue_id=queue.message_codec_id,
-                                         topic=topic,
-                                         priority=queue.priority,
-                                         enabled=(topic in self.active_queue_names),
-                                         message_count=len(queue._queue),
-                                         next_message_size_bits=next_message_size_bits,
-                                         total_bits_in_queue=total_bits_in_queue)
+            queue_summary = QueueSummary(
+                queue_id=queue.message_codec_id,
+                topic=topic,
+                priority=queue.priority,
+                enabled=(topic in self.active_queue_names),
+                message_count=len(queue._queue),
+                next_message_size_bits=next_message_size_bits,
+                total_bits_in_queue=total_bits_in_queue,
+            )
             queue_summaries.append(queue_summary)
 
-        msg = QueueStatus(header=hdr,
-                          total_message_count=total_message_count,
-                          highest_priority=highest_priority,
-                          queued_dest_addresses=queued_dest_addresses,
-                          summary_message_counts=summary_message_counts,
-                          queue_summaries=queue_summaries)
+        msg = QueueStatus(
+            header=hdr,
+            total_message_count=total_message_count,
+            highest_priority=highest_priority,
+            queued_dest_addresses=queued_dest_addresses,
+            summary_message_counts=summary_message_counts,
+            queue_summaries=queue_summaries,
+        )
         self.queue_status_pub.publish(msg)
 
     def on_incoming_message(self, msg_data, topic_name):
         # type: (AnyMsg, str) -> None
-        connection_header = msg_data._connection_header['type'].split('/')
-        ros_pkg = connection_header[0] + '.msg'
+        connection_header = msg_data._connection_header["type"].split("/")
+        ros_pkg = connection_header[0] + ".msg"
         msg_type = connection_header[1]
-        #print 'Message type detected as ' + msg_type
+        # print 'Message type detected as ' + msg_type
         msg_class = getattr(import_module(ros_pkg), msg_type)
         msg = msg_class()
         msg.deserialize(msg_data._buff)
@@ -490,138 +647,196 @@ class MessageQueueNode(object):
 
         # Check if incoming msg topic name matches any of the active queue topic names
         if topic_name in self.active_queue_names:
-            rospy.logdebug("TOPIC: {} in active queue list, appending msg".format(topic_name))
+            rospy.logdebug(
+                "TOPIC: {} in active queue list, appending msg".format(topic_name)
+            )
             self.topic_queue[topic_name].append(msg)
             # NOTE(lindzey): Don't publish status messages here, because it
             #    will lead to maxing out CPU thanks to the subsea queue_node
             #    subscribing to its own status messages.
         else:
-            rospy.logdebug("TOPIC: {} not in active queue list, skipping msg".format(topic_name))
+            rospy.logdebug(
+                "TOPIC: {} not in active queue list, skipping msg".format(topic_name)
+            )
 
     def on_neighbor_reachability(self, msg: NeighborReachability) -> None:
         self.destination_reachable[msg.dest] = msg.reachable
 
     def on_ack_received(self, msg: EncodedAck) -> None:
         ack = FragmentationAckHeader.from_bits(BitStream(bytes=msg.encoded_ack))
-        rospy.logdebug("Received ACK with src: {0} seq num: {1}". format(msg.src, ack.sequence_num))
+        rospy.logdebug(
+            "Received ACK with src: {0} seq num: {1}".format(msg.src, ack.sequence_num)
+        )
         rospy.logdebug("Ack blocks {}-{}".format(ack.blocks[0][0], ack.blocks[-1][1]))
         rospy.logdebug("ACK N-Blocks: {}".format(ack.n_blocks))
 
         for message in self._queued_messages:
-            rospy.logdebug("Message dest: {0} Sequence number: {1}".format(message.dest, message.sequence_number))
+            rospy.logdebug(
+                "Message dest: {0} Sequence number: {1}".format(
+                    message.dest, message.sequence_number
+                )
+            )
             if message.dest == msg.src and message.sequence_number == ack.sequence_num:
-                if len(ack.blocks) == 1 and ack.blocks[0][0] == 0 and ack.blocks[-1][-1] == 1 and ack.ack_flag == 0:
+                if (
+                    len(ack.blocks) == 1
+                    and ack.blocks[0][0] == 0
+                    and ack.blocks[-1][-1] == 1
+                    and ack.ack_flag == 0
+                ):
                     rospy.logwarn(
-                        "Empty ACK received indicated the message recipient does not have start fragment for this message")
+                        "Empty ACK received indicated the message recipient does not have start fragment for this message"
+                    )
                     message.fragmentation_tracker.reset()
                     return
 
                 rospy.logdebug("Recording blocks ACKED")
-                message.fragmentation_tracker.record_blocks_acked(ack.blocks, ack.ack_flag)
-                rospy.logdebug("Payload size={0}".format(message.fragmentation_tracker.payload_size_blocks))
+                message.fragmentation_tracker.record_blocks_acked(
+                    ack.blocks, ack.ack_flag
+                )
+                rospy.logdebug(
+                    "Payload size={0}".format(
+                        message.fragmentation_tracker.payload_size_blocks
+                    )
+                )
 
                 if message.fragmentation_tracker.acked():
                     # Remove this message from the queue
                     rospy.logdebug(
-                        "Message seq_num: {0} to dest: {1} acked. Removing from queue.".format(message.sequence_number,
-                                                                                               message.dest))
+                        "Message seq_num: {0} to dest: {1} acked. Removing from queue.".format(
+                            message.sequence_number, message.dest
+                        )
+                    )
                     message.queue._queue.remove(message)
                     self._queued_messages.remove(message)
 
     def mark_transmitted(self, msg: QueuedMessage) -> None:
         # For now, we don't do this for locally queued messages (since we don't track it properly)
-        #msg.queue.mark_transmitted(msg)
+        # msg.queue.mark_transmitted(msg)
 
         # ... but we do track it for dynamic queues
         if isinstance(msg.queue, DynamicQueue):
             if msg.queue.dynamic_message_service in self.dynamic_queues.keys():
-                update_msg = UpdateQueuedMessage(header=Header(stamp=rospy.Time.now()),
-                                                 message_id=msg.message_id_in_queue,
-                                                 event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED)
-                self.dynamic_queues[msg.dynamic_queue_name]["publisher"].publish(update_msg)
-                rospy.logdebug(f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}")
-
-    def get_message_from_queue_response(self, dynamic_queue, queue_response: GetNextQueuedMessageResponse) -> QueuedMessage:
-        message_bits = BitArray(bytes=queue_response.data, length=queue_response.data_size_in_bits)
-
-        next_message = QueuedMessage(queue=self.dynamic_queues[dynamic_queue]["object"],
-                                     message=None,
-                                     message_codec=self.dynamic_queues[dynamic_queue]["object"].message_codec,
-                                     dest=120,
-                                     time_added=datetime.utcnow(),
-                                     fragmentation_tracker=None,
-                                     allow_fragmentation=False,
-                                     dynamic_queue_name=self.dynamic_queues[dynamic_queue]["object"].dynamic_message_service,
-                                     message_id_in_queue=queue_response.message_id,
-                                     payload_bits=message_bits,
-                                     length_bits=len(message_bits))
+                update_msg = UpdateQueuedMessage(
+                    header=Header(stamp=rospy.Time.now()),
+                    message_id=msg.message_id_in_queue,
+                    event=UpdateQueuedMessage.EVENT_LINK_TRANSMITTED,
+                )
+                self.dynamic_queues[msg.dynamic_queue_name].publisher.publish(
+                    update_msg
+                )
+                rospy.logdebug(
+                    f"Marked message as transmitted: {msg.message_id_in_queue} on {msg.queue.dynamic_message_service}"
+                )
+
+    def get_message_from_queue_response(
+        self, dynamic_queue, queue_response: GetNextQueuedMessageResponse
+    ) -> QueuedMessage:
+        message_bits = BitArray(
+            bytes=queue_response.data, length=queue_response.data_size_in_bits
+        )
+
+        next_message = QueuedMessage(
+            queue=self.dynamic_queues[dynamic_queue],
+            message=None,
+            message_codec=self.dynamic_queues[dynamic_queue].message_codec,
+            dest=120,
+            time_added=datetime.utcnow(),
+            fragmentation_tracker=None,
+            allow_fragmentation=False,
+            dynamic_queue_name=self.dynamic_queues[dynamic_queue].dynamic_message_service,
+            message_id_in_queue=queue_response.message_id,
+            payload_bits=message_bits,
+            length_bits=len(message_bits),
+        )
         return next_message
 
-    def query_dynamic_queues(self, max_size_in_bits: int = 131072,
-                             dest_address: Optional[int] = None,
-                             packet_codec: Optional[str] = None,
-                             check_reachability: bool = True,
-                             exclude_messages: Optional[List[QueuedMessage]] = []) -> Optional[
-                                    tuple[str, int]]:
-
+    def query_dynamic_queues(
+        self,
+        max_size_in_bits: int = 131072,
+        dest_address: Optional[int] = None,
+        packet_codec: Optional[str] = None,
+        check_reachability: bool = True,
+        exclude_messages: Optional[List[QueuedMessage]] = [],
+    ) -> Optional[tuple[str, int]]:
         start_time = time.time()  # for profiling
         services = rosservice.get_service_list()
 
-        for dynamic_queue in [dynamic_queues["object"] for dynamic_queues in self.dynamic_queues.values()]:
+        for dynamic_queue in [
+            dynamic_queues for dynamic_queues in self.dynamic_queues.values()
+        ]:
             dynamic_queue_name = dynamic_queue.dynamic_message_service
 
             # Check to see if the service is available, skip if not
             if rospy.names.resolve_name(dynamic_queue_name) not in services:
-                rospy.logwarn(f"Dynamic queue {dynamic_queue_name} not available, skipping")
+                rospy.logwarn(
+                    f"Dynamic queue {dynamic_queue_name} not available, skipping"
+                )
                 continue
 
             # Check to see if we already have a cached response from the last time we did this.
-            cached_response = self.current_packet_dynamic_queue_cache.get(dynamic_queue_name)
+            cached_response = self.current_packet_dynamic_queue_cache.get(
+                dynamic_queue_name
+            )
             if not cached_response:
                 # Figure out which messages to exclude (because they are already queued)
-                this_queue_exclusion_ids = [m.message_id_in_queue for m in exclude_messages if
-                                            m.dynamic_queue_name == dynamic_queue_name]
+                this_queue_exclusion_ids = [
+                    m.message_id_in_queue
+                    for m in exclude_messages
+                    if m.dynamic_queue_name == dynamic_queue_name
+                ]
 
                 # Get a reference to the service
-                dynamic_get_next_queued_message = self.dynamic_queues[dynamic_queue_name]["service"]
+                dynamic_get_next_queued_message = self.dynamic_queues[
+                    dynamic_queue_name
+                ].service
 
                 try:
-                    queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits,
-                                                                     dest_address=dest_address,
-                                                                     packet_codec=(
-                                                                         str(packet_codec) if packet_codec else ''),
-                                                                     exclude_message_ids=this_queue_exclusion_ids)
+                    queue_response = dynamic_get_next_queued_message(
+                        max_size_in_bits=max_size_in_bits,
+                        dest_address=dest_address,
+                        packet_codec=(str(packet_codec) if packet_codec else ""),
+                        exclude_message_ids=this_queue_exclusion_ids,
+                    )
                 except rospy.service.ServiceException:
                     rospy.logerr(f"Failed to call {dynamic_queue_name}.")
 
                     queue_response = GetNextQueuedMessageResponse(has_message=False)
 
                 # Cache the response
-                self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response
+                self.current_packet_dynamic_queue_cache[
+                    dynamic_queue_name
+                ] = queue_response
 
         rospy.logdebug(f"Dynamic queue query took {time.time()-start_time} s")
 
         # Now, figure out what the highest priority dynamic message is, if any
-        priorities = {queue: response.priority for queue, response in
-                      self.current_packet_dynamic_queue_cache.items()
-                      if response.has_message}
+        priorities = {
+            queue: response.priority
+            for queue, response in self.current_packet_dynamic_queue_cache.items()
+            if response.has_message
+        }
 
         if len(priorities) > 0:
-            sorted_priorities = sorted(priorities.items(), key=lambda x:x[1])
+            sorted_priorities = sorted(priorities.items(), key=lambda x: x[1])
             highest_priority_queue = sorted_priorities[-1][0]
             highest_priority = sorted_priorities[-1][1]
             return highest_priority_queue, highest_priority
         # No messages in dynamic queues
         return None, None
 
-    def query_static_queues(self, max_size_in_bits: int = 131072,
-                         dest_address: Optional[int] = None,
-                         packet_codec: Optional[str] = None,
-                         check_reachability: bool = True,
-                         exclude_messages: Optional[List[QueuedMessage]] = []) -> Optional[QueuedMessage]:
+    def query_static_queues(
+        self,
+        max_size_in_bits: int = 131072,
+        dest_address: Optional[int] = None,
+        packet_codec: Optional[str] = None,
+        check_reachability: bool = True,
+        exclude_messages: Optional[List[QueuedMessage]] = [],
+    ) -> Optional[QueuedMessage]:
         next_message = None
 
-        for message in sorted(self._queued_messages, key=lambda m: m.priority, reverse=True):
+        for message in sorted(
+            self._queued_messages, key=lambda m: m.priority, reverse=True
+        ):
             if message.queue.subscribe_topic not in self.active_queue_names:
                 rospy.logdebug("Disqualified: inactive queue")
                 continue
@@ -636,13 +851,22 @@ class MessageQueueNode(object):
                 # Move on to check the next message
                 rospy.logdebug("Disqualified: packet_codec")
                 continue
-            if not message.allow_fragmentation and message.length_bits > max_size_in_bits:
+            if (
+                not message.allow_fragmentation
+                and message.length_bits > max_size_in_bits
+            ):
                 rospy.logdebug("Disqualified: too large, no fragmentation")
                 continue
-            if message.allow_fragmentation and message.get_min_fragment_size_bits() > max_size_in_bits:
+            if (
+                message.allow_fragmentation
+                and message.get_min_fragment_size_bits() > max_size_in_bits
+            ):
                 rospy.logdebug("Disqualified: fragments too large")
                 continue
-            if message.allow_fragmentation and message.get_min_fragment_size_bits() == 0:
+            if (
+                message.allow_fragmentation
+                and message.get_min_fragment_size_bits() == 0
+            ):
                 rospy.logdebug("Disqualified: no fragments to send at this time")
                 continue
             # if we aren't disqualified by now, this is our message.
@@ -651,33 +875,41 @@ class MessageQueueNode(object):
 
         return next_message
 
-    def get_next_message(self, max_size_in_bits: int = 131072,
-                         dest_address: Optional[int] = None,
-                         packet_codec: Optional[str] = None,
-                         check_reachability: bool = True,
-                         exclude_messages: Optional[List[QueuedMessage]] = []) -> Optional[QueuedMessage]:
-
+    def get_next_message(
+        self,
+        max_size_in_bits: int = 131072,
+        dest_address: Optional[int] = None,
+        packet_codec: Optional[str] = None,
+        check_reachability: bool = True,
+        exclude_messages: Optional[List[QueuedMessage]] = [],
+    ) -> Optional[QueuedMessage]:
         # Query registered dynamic queues
         # This will update the cache (self.current_packet_dynamic_queue_cache) and return the highest priority queue.
         # If one of these messages has the highest priority (below), we get the cached GetNextQueuedMessageResponse and
         #  use it to make the QueuedMessage.
         try:
-            highest_priority_dynamic_queue, highest_dynamic_priority = self.query_dynamic_queues(
+            (
+                highest_priority_dynamic_queue,
+                highest_dynamic_priority,
+            ) = self.query_dynamic_queues(
                 max_size_in_bits=max_size_in_bits,
                 dest_address=dest_address,
                 packet_codec=packet_codec,
                 check_reachability=check_reachability,
-                exclude_messages=exclude_messages)
+                exclude_messages=exclude_messages,
+            )
         except Exception as e:
             rospy.logerr(f"get_next_message Error querying dynamic queues: {e}")
             highest_priority_dynamic_queue, highest_dynamic_priority = None, None
 
         # Query the static queues
-        highest_priority_static_message = self.query_static_queues(max_size_in_bits=max_size_in_bits,
-                                                                   dest_address=dest_address,
-                                                                   packet_codec=packet_codec,
-                                                                   check_reachability=check_reachability,
-                                                                   exclude_messages=exclude_messages)
+        highest_priority_static_message = self.query_static_queues(
+            max_size_in_bits=max_size_in_bits,
+            dest_address=dest_address,
+            packet_codec=packet_codec,
+            check_reachability=check_reachability,
+            exclude_messages=exclude_messages,
+        )
         if not highest_priority_static_message and not highest_dynamic_priority:
             # No message
             return None
@@ -687,10 +919,16 @@ class MessageQueueNode(object):
         elif highest_dynamic_priority and not highest_priority_static_message:
             # Dynamic queues have a message, but static don't
             # Clear the entry from the cache
-            message = self.get_message_from_queue_response(highest_priority_dynamic_queue, self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue])
+            message = self.get_message_from_queue_response(
+                highest_priority_dynamic_queue,
+                self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue],
+            )
             del self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue]
         elif highest_dynamic_priority > highest_priority_static_message.priority:
-            message = self.get_message_from_queue_response(highest_priority_dynamic_queue, self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue])
+            message = self.get_message_from_queue_response(
+                highest_priority_dynamic_queue,
+                self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue],
+            )
 
             del self.current_packet_dynamic_queue_cache[highest_priority_dynamic_queue]
         else:
@@ -718,24 +956,35 @@ class MessageQueueNode(object):
             try:
                 self.active_queue_names.remove(sub_topic)
             except KeyError:
-                rospy.loginfo("Cannot disable currently-inactive queue ID {} ({})"
-                              .format(msg.queue_id, sub_topic))
-
-    def handle_get_next_packet(self, req: GetNextPacketDataRequest) -> GetNextPacketDataResponse:
+                rospy.loginfo(
+                    "Cannot disable currently-inactive queue ID {} ({})".format(
+                        msg.queue_id, sub_topic
+                    )
+                )
+
+    def handle_get_next_packet(
+        self, req: GetNextPacketDataRequest
+    ) -> GetNextPacketDataResponse:
         # Things get squirrely if there is more than one request at the same time... our behavior isn't defined.
         # If there is still a request in progress when we get another one, we just return nothing on the later request
         has_lock = self.get_packet_data_lock.acquire(blocking=False)
         if not has_lock:
             self.get_packet_data_lock_fail_count += 1
-            rospy.logwarn(f"Getting next packet data failed because we are still servicing an earlier request " +
-                          f"({self.get_packet_data_lock_fail_count} times)")
+            rospy.logwarn(
+                f"Getting next packet data failed because we are still servicing an earlier request "
+                + f"({self.get_packet_data_lock_fail_count} times)"
+            )
             if self.get_packet_data_lock_fail_count > 10:
                 # This is a brutish way to try to avoid an error condition where we get "stuck" servicing one request
-                rospy.logerr("Getting next packet data failed 10 times.  Killing the process so (hopefully) it will work on restart.")
+                rospy.logerr(
+                    "Getting next packet data failed 10 times.  Killing the process so (hopefully) it will work on restart."
+                )
                 # sys.exit doesn't work when called from a thread.  This sends a keyboard interrupt
                 # to the process and does work.
                 os.kill(os.getpid(), signal.SIGINT)
-            return GetNextPacketDataResponse(num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0)
+            return GetNextPacketDataResponse(
+                num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0
+            )
         self.get_packet_data_lock_fail_count = 0
 
         try:
@@ -743,7 +992,10 @@ class MessageQueueNode(object):
             # been serviced
             self.current_packet_dynamic_queue_cache = {}
 
-            rospy.logdebug("Getting next packet data for transmission: " + str(req).replace('\n', ', '))
+            rospy.logdebug(
+                "Getting next packet data for transmission: "
+                + str(req).replace("\n", ", ")
+            )
             if req.match_dest:
                 requested_dest = req.dest
             else:
@@ -754,17 +1006,25 @@ class MessageQueueNode(object):
             # To get the next packet, the first thing we do is get the highest priority message to transmit
             # Then, we get a packet using its packet codec.
             # This is also where we should eventually evaluate which rate to use
-            highest_priority_message = self.get_next_message(dest_address=requested_dest)
+            highest_priority_message = self.get_next_message(
+                dest_address=requested_dest
+            )
             rospy.logdebug(f"Highest priority message: {highest_priority_message}")
             if not highest_priority_message:
                 self.get_packet_data_lock.release()
-                return GetNextPacketDataResponse(num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0)
+                return GetNextPacketDataResponse(
+                    num_miniframe_bytes=0, num_dataframe_bytes=0, num_messages=0
+                )
 
             # TODO: check for a race condition where the dest of the top message in the queue changes
             packet_codec = highest_priority_message.message_codec.packet_codec
-            messages_in_packet, miniframe_bytes, dataframe_bytes = packet_codec.encode_payload(num_miniframe_bytes,
-                                                                                               num_dataframe_bytes,
-                                                                                               self)
+            (
+                messages_in_packet,
+                miniframe_bytes,
+                dataframe_bytes,
+            ) = packet_codec.encode_payload(
+                num_miniframe_bytes, num_dataframe_bytes, self
+            )
             for msg in messages_in_packet:
                 self.mark_transmitted(msg)
         except Exception as e:
@@ -775,13 +1035,15 @@ class MessageQueueNode(object):
         else:
             self.get_packet_data_lock.release()
 
-            return GetNextPacketDataResponse(dest=highest_priority_message.dest,
-                                             miniframe_bytes=miniframe_bytes,
-                                             dataframe_bytes=dataframe_bytes,
-                                             queued_message_ids=[],
-                                             num_miniframe_bytes=len(miniframe_bytes),
-                                             num_dataframe_bytes=len(dataframe_bytes),
-                                             num_messages=len(messages_in_packet))
+            return GetNextPacketDataResponse(
+                dest=highest_priority_message.dest,
+                miniframe_bytes=miniframe_bytes,
+                dataframe_bytes=dataframe_bytes,
+                queued_message_ids=[],
+                num_miniframe_bytes=len(miniframe_bytes),
+                num_dataframe_bytes=len(dataframe_bytes),
+                num_messages=len(messages_in_packet),
+            )
 
     def spin(self) -> None:
         # Issue status periodically.
@@ -791,7 +1053,7 @@ class MessageQueueNode(object):
             rate.sleep()
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     try:
         node = MessageQueueNode()
         rospy.loginfo("Message Queue shutdown")
-- 
GitLab