Module mesh.node
Expand source code
import typing
from .packet import Packet
from .link import Link
class Node:
def __init__(self, mac_address: str, x: float, y: float, hierarchy_class: str, transmit_distance: float, response_wait_time: int) -> None:
"""
Creates a node object
"""
self.mac_address: str = mac_address
self.x: float = x
self.y: float = y
self.hierarchy_class: str = hierarchy_class
self.transmit_distance: float = transmit_distance
self.response_wait_time: int = response_wait_time
self.links: dict[str, Link] = {}
self.queue: list[tuple[Packet, int]] = []
self.waiting_for_response: dict[int, Packet] = {}
# metrics
self.sent: dict[int, int] = {}
self.received: dict[int, int] = {}
self.received_packets = 0
def add_link(self, other: "Node") -> None:
"""
Adds a link from self to other.
"""
distance = min(self.transmit_distance, other.get_transmit_distance())
other_x, other_y = other.get_position()
actual = ((self.x - other_x) ** 2 + (self.y - other_y) ** 2) ** 0.5
if actual <= distance:
link = Link(self, other)
self.links[other.get_mac()] = link
return
def enqueue_packet(self, packet: Packet, timestep: int) -> None:
"""
Enqueues the given packet.
If self is destination of packet, check if self sent packet with this packet id.
If yes, we are done. If no, enqueue a response packet and send back to original src.
"""
# we are the final destination of a response packet
if packet.get_id() in self.sent or (not packet.get_is_request() and packet.get_path()[-1] == self.get_mac()):
self.received[packet.get_id()] = timestep
self.received_packets += 1
# we are generating the packet
elif packet.get_is_request() and packet.get_path()[0] == self.get_mac():
self.sent[packet.get_id()] = timestep
self.queue.append((packet, timestep))
# we are the final destination of request packet
elif packet.get_is_request() and packet.get_path()[-1] == self.get_mac():
self.waiting_for_response[timestep +
self.response_wait_time] = packet.get_reverse()
# we are an intermediate node in a packet's path
else:
self.queue.append((packet, timestep))
return
def learn_timestep(self, timestep: int) -> None:
"""
Tells this node the timestep that the arena is currently on. Updates the queue if it should
"""
if timestep not in self.waiting_for_response:
return
response_packet = self.waiting_for_response.pop(timestep)
self.queue.append((response_packet, timestep))
return
def get_next_destination(self) -> str or None:
"""
Returns the MAC address of the nexthop of the packet at the front of the queue.
"""
next_packet = self.queue[0][0]
packet_path = next_packet.get_path()
return packet_path[packet_path.index(self.mac_address) + 1]
def send_from_queue(self, timestep: int, hidden_terminal: bool, override: bool) -> Packet:
"""
Dequeues the next packet and sends the packet to its next hop.
If override, packet will always complete except if it's a hidden terminal
"""
nexthop = self.get_next_destination()
packet = self.queue.pop(0)[0]
if hidden_terminal:
return packet
self.links[nexthop].transmit(
packet, self.mac_address, timestep, override)
return packet
def packet_in_queue(self) -> bool:
"""
Returns True iff there is a packet in the queue
"""
return len(self.queue) > 0
def is_linked(self, other: str) -> bool:
"""
Given the MAC address of another node, returns True iff there is a link between self and other.
"""
return other in self.links
def in_range(self, x: float, y: float) -> bool:
"""
Given an (x,y) coordinate, determines if that coordinate is within range of this node.
"""
return ((x - self.x) ** 2 + (y - self.y) ** 2) ** 0.5 <= self.transmit_distance
def get_queue_state(self) -> list[Packet]:
"""
Returns an ordered list of packets, representing self's current queue.
"""
return [p[0] for p in self.queue]
def get_packets_received(self) -> int:
"""
Returns the number of packets that this node has received
"""
return self.received_packets
def get_position(self) -> tuple[float, float]:
"""
Returns the position of the node in (x, y) format
"""
return (self.x, self.y)
def get_mac(self) -> str:
"""
Returns the mac address of this node
"""
return self.mac_address
def get_transmit_distance(self) -> float:
"""
Returns the transmit distance of this node
"""
return self.transmit_distance
def get_neighbors(self) -> list['Node']:
"""
Return a list of nodes we are connected to.
"""
return list(self.links.keys())
def get_probability(self, neighbor: str) -> float:
"""
Return the probability that a transmission to a neighbor will succeed.
"""
if not neighbor in self.links:
return 0
return self.links[neighbor].get_probability()
def get_sent(self) -> dict[str, str]:
"""
Return a dictionary of packets sent by this node and their timesteps
"""
return {k: v for k, v in self.sent.items()}
def get_received(self) -> dict[str, str]:
"""
Return a dictionary of packets received by this node and their timesteps
"""
return {k: v for k, v in self.received.items()}
def __str__(self) -> str:
"""
Returns a human-readable version of the string.
"""
return f"Node with MAC {self.mac_address}, at position ({self.x}, {self.y}) of class {self.hierarchy_class}"
def __repr__(self) -> str:
"""
Returns Node representation
"""
return f"Node(mac_addr=\"{self.mac_address}\", x={self.x}, y={self.y}, transmit_distance={self.transmit_distance}, hierarchy_class=\"{self.hierarchy_class}\")"
Classes
class Node (mac_address: str, x: float, y: float, hierarchy_class: str, transmit_distance: float, response_wait_time: int)
-
Creates a node object
Expand source code
class Node: def __init__(self, mac_address: str, x: float, y: float, hierarchy_class: str, transmit_distance: float, response_wait_time: int) -> None: """ Creates a node object """ self.mac_address: str = mac_address self.x: float = x self.y: float = y self.hierarchy_class: str = hierarchy_class self.transmit_distance: float = transmit_distance self.response_wait_time: int = response_wait_time self.links: dict[str, Link] = {} self.queue: list[tuple[Packet, int]] = [] self.waiting_for_response: dict[int, Packet] = {} # metrics self.sent: dict[int, int] = {} self.received: dict[int, int] = {} self.received_packets = 0 def add_link(self, other: "Node") -> None: """ Adds a link from self to other. """ distance = min(self.transmit_distance, other.get_transmit_distance()) other_x, other_y = other.get_position() actual = ((self.x - other_x) ** 2 + (self.y - other_y) ** 2) ** 0.5 if actual <= distance: link = Link(self, other) self.links[other.get_mac()] = link return def enqueue_packet(self, packet: Packet, timestep: int) -> None: """ Enqueues the given packet. If self is destination of packet, check if self sent packet with this packet id. If yes, we are done. If no, enqueue a response packet and send back to original src. """ # we are the final destination of a response packet if packet.get_id() in self.sent or (not packet.get_is_request() and packet.get_path()[-1] == self.get_mac()): self.received[packet.get_id()] = timestep self.received_packets += 1 # we are generating the packet elif packet.get_is_request() and packet.get_path()[0] == self.get_mac(): self.sent[packet.get_id()] = timestep self.queue.append((packet, timestep)) # we are the final destination of request packet elif packet.get_is_request() and packet.get_path()[-1] == self.get_mac(): self.waiting_for_response[timestep + self.response_wait_time] = packet.get_reverse() # we are an intermediate node in a packet's path else: self.queue.append((packet, timestep)) return def learn_timestep(self, timestep: int) -> None: """ Tells this node the timestep that the arena is currently on. Updates the queue if it should """ if timestep not in self.waiting_for_response: return response_packet = self.waiting_for_response.pop(timestep) self.queue.append((response_packet, timestep)) return def get_next_destination(self) -> str or None: """ Returns the MAC address of the nexthop of the packet at the front of the queue. """ next_packet = self.queue[0][0] packet_path = next_packet.get_path() return packet_path[packet_path.index(self.mac_address) + 1] def send_from_queue(self, timestep: int, hidden_terminal: bool, override: bool) -> Packet: """ Dequeues the next packet and sends the packet to its next hop. If override, packet will always complete except if it's a hidden terminal """ nexthop = self.get_next_destination() packet = self.queue.pop(0)[0] if hidden_terminal: return packet self.links[nexthop].transmit( packet, self.mac_address, timestep, override) return packet def packet_in_queue(self) -> bool: """ Returns True iff there is a packet in the queue """ return len(self.queue) > 0 def is_linked(self, other: str) -> bool: """ Given the MAC address of another node, returns True iff there is a link between self and other. """ return other in self.links def in_range(self, x: float, y: float) -> bool: """ Given an (x,y) coordinate, determines if that coordinate is within range of this node. """ return ((x - self.x) ** 2 + (y - self.y) ** 2) ** 0.5 <= self.transmit_distance def get_queue_state(self) -> list[Packet]: """ Returns an ordered list of packets, representing self's current queue. """ return [p[0] for p in self.queue] def get_packets_received(self) -> int: """ Returns the number of packets that this node has received """ return self.received_packets def get_position(self) -> tuple[float, float]: """ Returns the position of the node in (x, y) format """ return (self.x, self.y) def get_mac(self) -> str: """ Returns the mac address of this node """ return self.mac_address def get_transmit_distance(self) -> float: """ Returns the transmit distance of this node """ return self.transmit_distance def get_neighbors(self) -> list['Node']: """ Return a list of nodes we are connected to. """ return list(self.links.keys()) def get_probability(self, neighbor: str) -> float: """ Return the probability that a transmission to a neighbor will succeed. """ if not neighbor in self.links: return 0 return self.links[neighbor].get_probability() def get_sent(self) -> dict[str, str]: """ Return a dictionary of packets sent by this node and their timesteps """ return {k: v for k, v in self.sent.items()} def get_received(self) -> dict[str, str]: """ Return a dictionary of packets received by this node and their timesteps """ return {k: v for k, v in self.received.items()} def __str__(self) -> str: """ Returns a human-readable version of the string. """ return f"Node with MAC {self.mac_address}, at position ({self.x}, {self.y}) of class {self.hierarchy_class}" def __repr__(self) -> str: """ Returns Node representation """ return f"Node(mac_addr=\"{self.mac_address}\", x={self.x}, y={self.y}, transmit_distance={self.transmit_distance}, hierarchy_class=\"{self.hierarchy_class}\")"
Methods
def add_link(self, other: Node) ‑> None
-
Adds a link from self to other.
Expand source code
def add_link(self, other: "Node") -> None: """ Adds a link from self to other. """ distance = min(self.transmit_distance, other.get_transmit_distance()) other_x, other_y = other.get_position() actual = ((self.x - other_x) ** 2 + (self.y - other_y) ** 2) ** 0.5 if actual <= distance: link = Link(self, other) self.links[other.get_mac()] = link return
def enqueue_packet(self, packet: Packet, timestep: int) ‑> None
-
Enqueues the given packet.
If self is destination of packet, check if self sent packet with this packet id. If yes, we are done. If no, enqueue a response packet and send back to original src.
Expand source code
def enqueue_packet(self, packet: Packet, timestep: int) -> None: """ Enqueues the given packet. If self is destination of packet, check if self sent packet with this packet id. If yes, we are done. If no, enqueue a response packet and send back to original src. """ # we are the final destination of a response packet if packet.get_id() in self.sent or (not packet.get_is_request() and packet.get_path()[-1] == self.get_mac()): self.received[packet.get_id()] = timestep self.received_packets += 1 # we are generating the packet elif packet.get_is_request() and packet.get_path()[0] == self.get_mac(): self.sent[packet.get_id()] = timestep self.queue.append((packet, timestep)) # we are the final destination of request packet elif packet.get_is_request() and packet.get_path()[-1] == self.get_mac(): self.waiting_for_response[timestep + self.response_wait_time] = packet.get_reverse() # we are an intermediate node in a packet's path else: self.queue.append((packet, timestep)) return
def get_mac(self) ‑> str
-
Returns the mac address of this node
Expand source code
def get_mac(self) -> str: """ Returns the mac address of this node """ return self.mac_address
def get_neighbors(self) ‑> list[Node]
-
Return a list of nodes we are connected to.
Expand source code
def get_neighbors(self) -> list['Node']: """ Return a list of nodes we are connected to. """ return list(self.links.keys())
def get_next_destination(self) ‑> str
-
Returns the MAC address of the nexthop of the packet at the front of the queue.
Expand source code
def get_next_destination(self) -> str or None: """ Returns the MAC address of the nexthop of the packet at the front of the queue. """ next_packet = self.queue[0][0] packet_path = next_packet.get_path() return packet_path[packet_path.index(self.mac_address) + 1]
def get_packets_received(self) ‑> int
-
Returns the number of packets that this node has received
Expand source code
def get_packets_received(self) -> int: """ Returns the number of packets that this node has received """ return self.received_packets
def get_position(self) ‑> tuple[float, float]
-
Returns the position of the node in (x, y) format
Expand source code
def get_position(self) -> tuple[float, float]: """ Returns the position of the node in (x, y) format """ return (self.x, self.y)
def get_probability(self, neighbor: str) ‑> float
-
Return the probability that a transmission to a neighbor will succeed.
Expand source code
def get_probability(self, neighbor: str) -> float: """ Return the probability that a transmission to a neighbor will succeed. """ if not neighbor in self.links: return 0 return self.links[neighbor].get_probability()
def get_queue_state(self) ‑> list[Packet]
-
Returns an ordered list of packets, representing self's current queue.
Expand source code
def get_queue_state(self) -> list[Packet]: """ Returns an ordered list of packets, representing self's current queue. """ return [p[0] for p in self.queue]
def get_received(self) ‑> dict[str, str]
-
Return a dictionary of packets received by this node and their timesteps
Expand source code
def get_received(self) -> dict[str, str]: """ Return a dictionary of packets received by this node and their timesteps """ return {k: v for k, v in self.received.items()}
def get_sent(self) ‑> dict[str, str]
-
Return a dictionary of packets sent by this node and their timesteps
Expand source code
def get_sent(self) -> dict[str, str]: """ Return a dictionary of packets sent by this node and their timesteps """ return {k: v for k, v in self.sent.items()}
def get_transmit_distance(self) ‑> float
-
Returns the transmit distance of this node
Expand source code
def get_transmit_distance(self) -> float: """ Returns the transmit distance of this node """ return self.transmit_distance
def in_range(self, x: float, y: float) ‑> bool
-
Given an (x,y) coordinate, determines if that coordinate is within range of this node.
Expand source code
def in_range(self, x: float, y: float) -> bool: """ Given an (x,y) coordinate, determines if that coordinate is within range of this node. """ return ((x - self.x) ** 2 + (y - self.y) ** 2) ** 0.5 <= self.transmit_distance
def is_linked(self, other: str) ‑> bool
-
Given the MAC address of another node, returns True iff there is a link between self and other.
Expand source code
def is_linked(self, other: str) -> bool: """ Given the MAC address of another node, returns True iff there is a link between self and other. """ return other in self.links
def learn_timestep(self, timestep: int) ‑> None
-
Tells this node the timestep that the arena is currently on. Updates the queue if it should
Expand source code
def learn_timestep(self, timestep: int) -> None: """ Tells this node the timestep that the arena is currently on. Updates the queue if it should """ if timestep not in self.waiting_for_response: return response_packet = self.waiting_for_response.pop(timestep) self.queue.append((response_packet, timestep)) return
def packet_in_queue(self) ‑> bool
-
Returns True iff there is a packet in the queue
Expand source code
def packet_in_queue(self) -> bool: """ Returns True iff there is a packet in the queue """ return len(self.queue) > 0
def send_from_queue(self, timestep: int, hidden_terminal: bool, override: bool) ‑> Packet
-
Dequeues the next packet and sends the packet to its next hop.
If override, packet will always complete except if it's a hidden terminal
Expand source code
def send_from_queue(self, timestep: int, hidden_terminal: bool, override: bool) -> Packet: """ Dequeues the next packet and sends the packet to its next hop. If override, packet will always complete except if it's a hidden terminal """ nexthop = self.get_next_destination() packet = self.queue.pop(0)[0] if hidden_terminal: return packet self.links[nexthop].transmit( packet, self.mac_address, timestep, override) return packet