From 1d7a6faa5475eb2027276ff123ca76fb72491292 Mon Sep 17 00:00:00 2001 From: WingCode Date: Fri, 17 Jun 2022 19:19:10 +0530 Subject: [PATCH 1/4] Add initial version of multiple queues per host. --- qunetsim/components/network.py | 176 +++++++++++++++++---------------- 1 file changed, 92 insertions(+), 84 deletions(-) diff --git a/qunetsim/components/network.py b/qunetsim/components/network.py index 0a028a6..81adbeb 100644 --- a/qunetsim/components/network.py +++ b/qunetsim/components/network.py @@ -38,7 +38,7 @@ def __init__(self): self._quantum_routing_algo = nx.shortest_path self._classical_routing_algo = nx.shortest_path self._use_hop_by_hop = True - self._packet_queue = Queue() + self._packet_queues = {} self._stop_thread = False self._use_ent_swap = False self._queue_processor_thread = None @@ -178,6 +178,12 @@ def packet_drop_rate(self, drop_rate): def arp(self): return self.ARP + def add_queue(self, host_id): + self._packet_queues[host_id] = Queue() + + def remove_queue(self, host_id): + del self._packet_queues[host_id] + @property def num_hosts(self): return len(self.arp.keys()) @@ -191,6 +197,7 @@ def add_host(self, host): """ Logger.get_instance().debug('host added: ' + host.host_id) + self.add_queue(host.host_id) self.ARP[host.host_id] = host self._update_network_graph(host) @@ -214,6 +221,7 @@ def remove_host(self, host): if host.host_id in self.ARP: del self.ARP[host.host_id] + self.remove_queue(host.host_id) if self.quantum_network.has_node(host.host_id): self.quantum_network.remove_node(host.host_id) if self.classical_network.has_node(host.host_id): @@ -504,87 +512,88 @@ def _process_queue(self): """ while True: - - packet = self._packet_queue.get() - - if not packet: - # Stop the network - self._stop_thread = True - break - - # Artificially delay the network - if self.delay > 0: - time.sleep(self.delay) - - # Simulate packet loss - packet_drop_var = random.random() - if packet_drop_var > (1 - self.packet_drop_rate): - Logger.get_instance().log("PACKET DROPPED") - if packet.payload_type == Constants.QUANTUM: - packet.payload.release() - continue - - sender, receiver = packet.sender, packet.receiver - - if packet.payload_type == Constants.QUANTUM: - if not self._route_quantum_info(sender, receiver, - [packet.payload]): - continue - - try: - if packet.protocol == Constants.RELAY and not self.use_hop_by_hop: - full_route = packet.route - route = full_route[full_route.index(sender):] - else: - if packet.protocol == Constants.REC_EPR: - route = self.get_classical_route(sender, receiver) - else: - route = self.get_classical_route(sender, receiver) - - if len(route) < 2: - raise Exception('No route exists') - - elif len(route) == 2: - if packet.protocol != Constants.RELAY: - if packet.protocol == Constants.REC_EPR: - host_sender = self.get_host(sender) - q = host_sender \ - .backend \ - .create_EPR(host_sender.host_id, - receiver, - q_id=packet.payload['q_id'], - block=packet.payload['blocked']) - host_sender.add_epr(receiver, q) - self.ARP[receiver].rec_packet(packet) - else: - self.ARP[receiver].rec_packet(packet.payload) - else: - if packet.protocol == Constants.REC_EPR: - q_id = packet.payload['q_id'] - blocked = packet.payload['blocked'] - q_route = self.get_quantum_route(sender, receiver) - - if self.use_ent_swap: - DaemonThread(self._entanglement_swap, - args=(sender, receiver, q_route, q_id, - packet.seq_num, blocked)) - else: - DaemonThread(self._establish_epr, - args=(sender, receiver, q_id, - packet.seq_num, blocked)) - - else: - network_packet = self._encode(route, packet) - self.ARP[route[1]].rec_packet(network_packet) - - except nx.NodeNotFound: - Logger.get_instance().error( - "route couldn't be calculated, node doesn't exist") - except ValueError: - Logger.get_instance().error( - "route couldn't be calculated, value error") - except Exception as e: - Logger.get_instance().error('Error in network: ' + str(e)) + for _, queue in self._packet_queues.items(): + # If queue is not empty, process it + if queue: + for packet in queue: + if not packet: + # Stop the network + self._stop_thread = True + break + + # Artificially delay the network + if self.delay > 0: + time.sleep(self.delay) + + # Simulate packet loss + packet_drop_var = random.random() + if packet_drop_var > (1 - self.packet_drop_rate): + Logger.get_instance().log("PACKET DROPPED") + if packet.payload_type == Constants.QUANTUM: + packet.payload.release() + continue + + sender, receiver = packet.sender, packet.receiver + + if packet.payload_type == Constants.QUANTUM: + if not self._route_quantum_info(sender, receiver, + [packet.payload]): + continue + + try: + if packet.protocol == Constants.RELAY and not self.use_hop_by_hop: + full_route = packet.route + route = full_route[full_route.index(sender):] + else: + if packet.protocol == Constants.REC_EPR: + route = self.get_classical_route(sender, receiver) + else: + route = self.get_classical_route(sender, receiver) + + if len(route) < 2: + raise Exception('No route exists') + + elif len(route) == 2: + if packet.protocol != Constants.RELAY: + if packet.protocol == Constants.REC_EPR: + host_sender = self.get_host(sender) + q = host_sender \ + .backend \ + .create_EPR(host_sender.host_id, + receiver, + q_id=packet.payload['q_id'], + block=packet.payload['blocked']) + host_sender.add_epr(receiver, q) + self.ARP[receiver].rec_packet(packet) + else: + self.ARP[receiver].rec_packet(packet.payload) + else: + if packet.protocol == Constants.REC_EPR: + q_id = packet.payload['q_id'] + blocked = packet.payload['blocked'] + q_route = self.get_quantum_route(sender, receiver) + + if self.use_ent_swap: + DaemonThread(self._entanglement_swap, + args=(sender, receiver, q_route, q_id, + packet.seq_num, blocked)) + else: + DaemonThread(self._establish_epr, + args=(sender, receiver, q_id, + packet.seq_num, blocked)) + + else: + network_packet = self._encode(route, packet) + self.ARP[route[1]].rec_packet(network_packet) + + except nx.NodeNotFound: + Logger.get_instance().error( + "route couldn't be calculated, node doesn't exist") + except ValueError: + Logger.get_instance().error( + "route couldn't be calculated, value error") + except Exception as e: + Logger.get_instance().error('Error in network: ' + str(e)) def send(self, packet): """ @@ -593,8 +602,7 @@ def send(self, packet): Args: packet (Packet): Packet to be sent """ - - self._packet_queue.put(packet) + self._packet_queues.get(self.get_host(packet.sender)).put(packet) def stop(self, stop_hosts=False): """ From f31a53d6fd56fea641f42b1127f5387678ac3db6 Mon Sep 17 00:00:00 2001 From: WingCode Date: Mon, 20 Jun 2022 13:29:12 +0530 Subject: [PATCH 2/4] Fix multiple queues. --- qunetsim/components/network.py | 189 +++++++++++---------- qunetsim/objects/packets/routing_packet.py | 2 +- 2 files changed, 102 insertions(+), 89 deletions(-) diff --git a/qunetsim/components/network.py b/qunetsim/components/network.py index 81adbeb..e366d1a 100644 --- a/qunetsim/components/network.py +++ b/qunetsim/components/network.py @@ -2,6 +2,7 @@ import time from inspect import signature from queue import Queue +from threading import Thread import matplotlib.pyplot as plt import networkx as nx @@ -39,7 +40,6 @@ def __init__(self): self._classical_routing_algo = nx.shortest_path self._use_hop_by_hop = True self._packet_queues = {} - self._stop_thread = False self._use_ent_swap = False self._queue_processor_thread = None self._delay = 0.1 @@ -506,94 +506,105 @@ def transfer_qubits(r, s, original_sender=None): i += 1 return True - def _process_queue(self): + def _process_queues(self): """ - Runs a thread for processing the packets in the packet queue. + Runs multiple threads for processing the packets in the packet queues. """ + def process_queue(packet_queue): + packet = packet_queue.get() + + # If None packet is received, then stop thread + if not packet.payload: + return + + # Artificially delay the network + if self.delay > 0: + time.sleep(self.delay) + + # Simulate packet loss + packet_drop_var = random.random() + if packet_drop_var > (1 - self.packet_drop_rate): + Logger.get_instance().log("PACKET DROPPED") + if packet.payload_type == Constants.QUANTUM: + packet.payload.release() + return + + sender, receiver = packet.sender, packet.receiver + + if packet.payload_type == Constants.QUANTUM: + if not self._route_quantum_info(sender, receiver, + [packet.payload]): + return + + try: + if packet.protocol == Constants.RELAY and not self.use_hop_by_hop: + full_route = packet.route + route = full_route[full_route.index(sender):] + else: + if packet.protocol == Constants.REC_EPR: + route = self.get_classical_route(sender, receiver) + else: + route = self.get_classical_route(sender, receiver) + + if len(route) < 2: + raise Exception('No route exists') + + elif len(route) == 2: + if packet.protocol != Constants.RELAY: + if packet.protocol == Constants.REC_EPR: + host_sender = self.get_host(sender) + q = host_sender \ + .backend \ + .create_EPR(host_sender.host_id, + receiver, + q_id=packet.payload['q_id'], + block=packet.payload['blocked']) + host_sender.add_epr(receiver, q) + self.ARP[receiver].rec_packet(packet) + else: + self.ARP[receiver].rec_packet(packet.payload) + else: + if packet.protocol == Constants.REC_EPR: + q_id = packet.payload['q_id'] + blocked = packet.payload['blocked'] + q_route = self.get_quantum_route(sender, receiver) + + if self.use_ent_swap: + DaemonThread(self._entanglement_swap, + args=(sender, receiver, q_route, q_id, + packet.seq_num, blocked)) + else: + DaemonThread(self._establish_epr, + args=(sender, receiver, q_id, + packet.seq_num, blocked)) + + else: + network_packet = self._encode(route, packet) + self.ARP[route[1]].rec_packet(network_packet) + + except nx.NodeNotFound: + Logger.get_instance().error( + "route couldn't be calculated, node doesn't exist") + except ValueError: + Logger.get_instance().error( + "route couldn't be calculated, value error") + except Exception as e: + import traceback + traceback.format_exc() + Logger.get_instance().error('Error in network: ' + str(e)) + while True: - for _, queue in self._packet_queues.items(): - # If queue is not empty, process it - if queue: - for packet in queue: - if not packet: - # Stop the network - self._stop_thread = True - break - - # Artificially delay the network - if self.delay > 0: - time.sleep(self.delay) - - # Simulate packet loss - packet_drop_var = random.random() - if packet_drop_var > (1 - self.packet_drop_rate): - Logger.get_instance().log("PACKET DROPPED") - if packet.payload_type == Constants.QUANTUM: - packet.payload.release() - continue - - sender, receiver = packet.sender, packet.receiver - - if packet.payload_type == Constants.QUANTUM: - if not self._route_quantum_info(sender, receiver, - [packet.payload]): - continue - - try: - if packet.protocol == Constants.RELAY and not self.use_hop_by_hop: - full_route = packet.route - route = full_route[full_route.index(sender):] - else: - if packet.protocol == Constants.REC_EPR: - route = self.get_classical_route(sender, receiver) - else: - route = self.get_classical_route(sender, receiver) - - if len(route) < 2: - raise Exception('No route exists') - - elif len(route) == 2: - if packet.protocol != Constants.RELAY: - if packet.protocol == Constants.REC_EPR: - host_sender = self.get_host(sender) - q = host_sender \ - .backend \ - .create_EPR(host_sender.host_id, - receiver, - q_id=packet.payload['q_id'], - block=packet.payload['blocked']) - host_sender.add_epr(receiver, q) - self.ARP[receiver].rec_packet(packet) - else: - self.ARP[receiver].rec_packet(packet.payload) - else: - if packet.protocol == Constants.REC_EPR: - q_id = packet.payload['q_id'] - blocked = packet.payload['blocked'] - q_route = self.get_quantum_route(sender, receiver) - - if self.use_ent_swap: - DaemonThread(self._entanglement_swap, - args=(sender, receiver, q_route, q_id, - packet.seq_num, blocked)) - else: - DaemonThread(self._establish_epr, - args=(sender, receiver, q_id, - packet.seq_num, blocked)) - - else: - network_packet = self._encode(route, packet) - self.ARP[route[1]].rec_packet(network_packet) - - except nx.NodeNotFound: - Logger.get_instance().error( - "route couldn't be calculated, node doesn't exist") - except ValueError: - Logger.get_instance().error( - "route couldn't be calculated, value error") - except Exception as e: - Logger.get_instance().error('Error in network: ' + str(e)) + threads = [] + for queue in self._packet_queues.values(): + if not queue.empty(): + threads.append(Thread(target=process_queue, args=[queue], daemon=False)) + + for t in threads: + t.start() + + for t in threads: + t.join() def send(self, packet): """ @@ -602,7 +613,7 @@ def send(self, packet): Args: packet (Packet): Packet to be sent """ - self._packet_queues.get(self.get_host(packet.sender)).put(packet) + self._packet_queues.get(packet.sender).put(packet) def stop(self, stop_hosts=False): """ @@ -614,8 +625,10 @@ def stop(self, stop_hosts=False): if stop_hosts: for host in self.ARP: self.ARP[host].stop(release_qubits=True) + # Send None to queue to stop the queue + self.send(RoutingPacket(sender=host, receiver=None, protocol=None, payload_type=None, payload=None, + ttl=None, route=None)) - self.send(None) # Send None to queue to stop the queue if self._backend is not None: self._backend.stop() except Exception as e: @@ -632,7 +645,7 @@ def start(self, nodes=None, backend=None): self._backend = backend if nodes is not None: self._backend.start(nodes=nodes) - self._queue_processor_thread = DaemonThread(target=self._process_queue) + self._queue_processor_thread = DaemonThread(target=self._process_queues) def draw_classical_network(self): """ diff --git a/qunetsim/objects/packets/routing_packet.py b/qunetsim/objects/packets/routing_packet.py index 0f07770..eeecf93 100644 --- a/qunetsim/objects/packets/routing_packet.py +++ b/qunetsim/objects/packets/routing_packet.py @@ -20,7 +20,7 @@ def __init__(self, sender, receiver, protocol, payload_type, payload, ttl, route ttl(int): Time-to-Live parameter route (List): Route the packet takes to its target host. """ - if not isinstance(payload, Packet): + if not isinstance(payload, Packet) and payload is not None: raise ValueError("For the routing packet the payload has to be a packet.") self._ttl = ttl From 43b01e10c5818edf33525b52dd64978358cbe32c Mon Sep 17 00:00:00 2001 From: WingCode Date: Mon, 20 Jun 2022 19:54:35 +0530 Subject: [PATCH 3/4] Fix dictionary access of queues. --- qunetsim/components/network.py | 15 +++++---------- qunetsim/objects/daemon_thread.py | 6 ++++++ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/qunetsim/components/network.py b/qunetsim/components/network.py index e366d1a..3254e4b 100644 --- a/qunetsim/components/network.py +++ b/qunetsim/components/network.py @@ -9,6 +9,7 @@ from qunetsim.backends import EQSNBackend from qunetsim.objects import Qubit, RoutingPacket, Logger, DaemonThread +from qunetsim.objects.daemon_thread import is_thread_alive from qunetsim.utils.constants import Constants @@ -595,16 +596,10 @@ def process_queue(packet_queue): Logger.get_instance().error('Error in network: ' + str(e)) while True: - threads = [] - for queue in self._packet_queues.values(): - if not queue.empty(): - threads.append(Thread(target=process_queue, args=[queue], daemon=False)) - - for t in threads: - t.start() - - for t in threads: - t.join() + for host_name, queue in self._packet_queues.items(): + if not queue.empty() and not is_thread_alive(host_name): + thread = Thread(target=process_queue, args=[queue], daemon=False, name=host_name) + thread.start() def send(self, packet): """ diff --git a/qunetsim/objects/daemon_thread.py b/qunetsim/objects/daemon_thread.py index e1f67df..852cdea 100644 --- a/qunetsim/objects/daemon_thread.py +++ b/qunetsim/objects/daemon_thread.py @@ -1,6 +1,12 @@ import threading +def is_thread_alive(thread_name): + for thread in threading.enumerate(): + if thread_name == thread: + return thread.isAlive() + + class DaemonThread(threading.Thread): """ A Daemon thread that runs a task until completion and then exits. """ From cc7d6d9c1fd8b2bb3d81a78860eacd8166b0a5b0 Mon Sep 17 00:00:00 2001 From: WingCode Date: Mon, 20 Jun 2022 20:18:00 +0530 Subject: [PATCH 4/4] Add docstring. Removed unnecessary traceback. --- qunetsim/components/network.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/qunetsim/components/network.py b/qunetsim/components/network.py index 3254e4b..f8de64a 100644 --- a/qunetsim/components/network.py +++ b/qunetsim/components/network.py @@ -513,6 +513,10 @@ def _process_queues(self): """ def process_queue(packet_queue): + """ + A single thread processes the packet in a single queue. + Each host has it's own queue and thread for processing the queue. + """ packet = packet_queue.get() # If None packet is received, then stop thread @@ -591,8 +595,6 @@ def process_queue(packet_queue): Logger.get_instance().error( "route couldn't be calculated, value error") except Exception as e: - import traceback - traceback.format_exc() Logger.get_instance().error('Error in network: ' + str(e)) while True: