diff --git a/ros_acomms/src/message_queue_node.py b/ros_acomms/src/message_queue_node.py index b18f79d0c138c4d607a1e694361d88ccefa55786..9ffe5908dc955e4a73e3790aaf6cbf1358eb3235 100755 --- a/ros_acomms/src/message_queue_node.py +++ b/ros_acomms/src/message_queue_node.py @@ -398,7 +398,8 @@ class MessageQueueNode(object): message_codec=message_codecs[params.codec_id], ) self.dynamic_get_next_message_services[service_name] = rospy.ServiceProxy(service_name, - GetNextQueuedMessage) + GetNextQueuedMessage, + persistent=False) self.dynamic_queue_update_publishers[service_name] = rospy.Publisher( update_topic_name, UpdateQueuedMessage, queue_size=10) @@ -571,28 +572,46 @@ class MessageQueueNode(object): dynamic_queue_name = dynamic_queue.dynamic_message_service # Check to see if we already have a cached response from the last time we did this. cached_response = self.current_packet_dynamic_queue_cache.get(dynamic_queue_name) - if not cached_response: - # Figure out which messages to exclude (because they are already queued) - this_queue_exclusion_ids = [m.message_id_in_queue for m in exclude_messages if - m.dynamic_queue_name == dynamic_queue_name] - - # Get a reference to the service - dynamic_get_next_queued_message = self.dynamic_get_next_message_services[dynamic_queue_name] - - try: - queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits, - dest_address=dest_address, - packet_codec=( - str(packet_codec) if packet_codec else ''), - exclude_message_ids=this_queue_exclusion_ids) - except rospy.service.ServiceException as e: - rospy.logerr_throttle(1, f"Error calling get_next_message service on {dynamic_queue_name}: {e}") - # if we have an error, don't try again for this packet (pretend it told us that there are - # no messages - queue_response = GetNextQueuedMessageResponse(has_message=False) - - # Cache the response - self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response + if cached_response: + # Check to see if the cached response matches our current query + # There are some conditions that aren't covered here... we assume that the max_size_in_bits only gets + # smaller over the course of a packet, and that we only ever exclude more messages, for example. + if not cached_response.has_message: + # don't query queues with no messages more than once, go to the next queue + continue + # Now, check parameters we might care about + if cached_response.data_size_in_bits < max_size_in_bits and (cached_response.message_id not in exclude_messages): + if dest_address: + if cached_response.dest_address == dest_address: + continue + else: + pass + else: + continue + # TODO: This doesn't correctly check that the cached packet codec value matches + + # Figure out which messages to exclude (because they are already queued) + this_queue_exclusion_ids = [m.message_id_in_queue for m in exclude_messages if + m.dynamic_queue_name == dynamic_queue_name] + + # Get a reference to the service + dynamic_get_next_queued_message = self.dynamic_get_next_message_services[dynamic_queue_name] + + try: + queue_response = dynamic_get_next_queued_message(max_size_in_bits=max_size_in_bits, + dest_address=dest_address, + packet_codec=( + str(packet_codec) if packet_codec else ''), + exclude_message_ids=this_queue_exclusion_ids) + rospy.logdebug(f"Queried {dynamic_queue_name} for {max_size_in_bits} bits, got: {queue_response}") + except rospy.service.ServiceException as e: + rospy.logerr_throttle(1, f"Error calling get_next_message service on {dynamic_queue_name}: {e}") + # if we have an error, don't try again for this packet (pretend it told us that there are + # no messages + queue_response = GetNextQueuedMessageResponse(has_message=False) + + # Cache the response + self.current_packet_dynamic_queue_cache[dynamic_queue_name] = queue_response rospy.logdebug(f"Dynamic queue query took {time.time()-start_time} s")