Source code for cs143sim.tla

"""This module includes simple transport layer algorithm: stop and wait

.. autosummary::

    TCPTahoe
    TCPVegas

.. moduleauthor:: Junlin Zhang <neicullyn@gmail.com>
.. moduleauthor:: Jan Van Bruggen <jancvanbruggen@gmail.com>
"""
from math import floor

from cs143sim.constants import PACKET_SIZE
from cs143sim.events import PacketTimeOut
from cs143sim.events import VegasTimeOut


[docs]class TCPTahoe: """This is the class that implements TCP Tahoe, TCP Tahoe with fast retransmit, TCP Vegas. :param enable_fast_retransmit: enable fast retransmit :param enable_fast_recovery: enable fast recovery or not :param rtt_alpha: change rate of rtt_avg :param rtt_beta: change rate of rtt_div :ivar W: window size :ivar packet_number: number of packets to be sent :ivar time_out: timer's waiting time :ivar transmitter_not_sent: packets that have not been sent :ivar transmitter_sending: list of packets that are being sent :ivar transmitter_acked: packets that have been acked :ivar duplicate_ack_number: record last acked packet number :ivar duplicate_ack_times: record how many times the packet has been continuous acked :ivar last_reset: last effective timeout time :ivar time_out_event: current time out event :ivar slow_start_treshold: treshold of slow start :ivar rtt_avg: the average value of rtt :ivar rtt_div: the divergence of rtt :ivar slow_start_flag: :ivar fast_recovery_flag: """ def __init__(self, env, flow): self.enable_fast_retransmit = False self.enable_fast_recovery = False self.ka = 1 self.ks = 1 self.divide_factor = 2 self.rtt_alpha = 0.125 self.rtt_beta = 0.25 self.flow = flow self.env = env self.packet_number = self.flow.amount / PACKET_SIZE if self.packet_number * PACKET_SIZE < self.flow.amount: self.packet_number += 1 self.time_out = 1000 self.first_ack_flag = True self.transmitter_not_sent = 0 self.transmitter_sending = list() self.transmitter_acked = -1 self.duplicate_ack_number = -1 self.duplicate_ack_times = 0 self.slow_start_threshold = 240 self.change_W(W=1) self.last_reset = 0 self.last_half = 0 self.time_out_event = None self.slow_start_flag = True self.fast_recovery_flag = False def __str__(self): return self.flow.__str__() def react_to_flow_start(self, event): self.send_new_packets() def react_to_ack(self, ack_packet): #RFC 6298 if self.first_ack_flag: t = self.env.now - ack_packet.timestamp self.rtt_avg = t self.rtt_div = t self.first_ack_flag = False else: t = self.env.now - ack_packet.timestamp self.rtt_div = (1 - self.rtt_beta) * self.rtt_div + self.rtt_beta * abs(t - self.rtt_avg) self.rtt_avg = (1 - self.rtt_alpha) * self.rtt_avg + self.rtt_alpha * t self.time_out = int(1 + (self.rtt_avg + 4 * self.rtt_div)) if self.time_out < 1000: self.time_out = 1000 #Process Duplicate Ack n = ack_packet.number if n > self.transmitter_acked: if n == self.duplicate_ack_number: self.duplicate_ack_times += 1 else: self.duplicate_ack_number = n self.duplicate_ack_times = 0 if self.enable_fast_retransmit and self.duplicate_ack_times == 4 and self.env.now - self.last_reset > self.time_out: self.react_to_time_out_base() elif self.enable_fast_recovery and self.duplicate_ack_times == 4: self.fast_recovery_flag = True self.slow_start_threshold = self.W / self.divide_factor #self.change_W(self.W / 2 + 3) # Actually, W is not windows size at that means. # W is number of packets between the first and the last unacked packets while len(self.transmitter_sending) >= self.W: self.transmitter_sending.pop() n = self.duplicate_ack_number packet = self.flow.make_packet(packet_number=n) self.flow.send_packet(packet) elif self.enable_fast_recovery and self.duplicate_ack_times > 4: self.change_W(self.W + 1) self.send_new_packets() # elif ack_packet.timestamp > self.last_reset: else: # Process Ack if self.enable_fast_recovery and self.fast_recovery_flag: # Just leave fast recovery if self.env.now - self.last_half > self.rtt_avg: self.change_W(floor(self.slow_start_threshold)) self.last_half = self.env.now self.fast_recovery_flag = False self.slow_start_flag = False # Note: you can not start sending a lot of packets now. # See send_new_packets: limit the packets send for each ack # (D Burst: RFC3782) del_list = list() for x in self.transmitter_sending: if x < n: del_list.append(x) if self.transmitter_sending[0] in del_list: self.reset_timer() for x in del_list: self.transmitter_sending.remove(x) n = ack_packet.number self.transmitter_acked = max([self.transmitter_acked, n - 1]) # Process sending # Note: you can not start sending a lot of packets now. # See send_new_packets: limit the packets send for each ack # (Data Burst: RFC3782) if self.W - len(self.transmitter_sending) < 2: if self.slow_start_flag: self.change_W(self.W + self.ks * 1.0) if self.W > self.slow_start_threshold: self.slow_start_flag = False else: self.change_W(self.W + self.ka * 1.0 / max(self.W, 1)) self.send_new_packets() def react_to_time_out(self, event): if event == self.time_out_event: self.react_to_time_out_base() def react_to_time_out_base(self): if len(self.transmitter_sending) > 0: self.time_out *= 2 self.reset_timer() self.slow_start_threshold = self.W / self.divide_factor self.slow_start_flag = True self.change_W(W=1) self.transmitter_sending = [] self.send_new_packets() self.last_reset = self.env.now def send_new_packets(self): send_flag = False count = 2 while len(self.transmitter_sending) < self.W and count > 0: count -= 1 send_flag = True l = len(self.transmitter_sending) if l == 0: n = self.transmitter_acked + 1 else: n = self.transmitter_sending[l - 1] + 1 if n > self.packet_number: break packet = self.flow.make_packet(packet_number=n) self.flow.send_packet(packet) self.transmitter_sending.append(n) if send_flag and not self.time_out_event: self.reset_timer() def change_W(self, W): self.W = W self.env.controller.record_window_size(flow=self.flow, window_size=self.W) def reset_timer(self): self.time_out_event = PacketTimeOut(env=self.env, delay=self.time_out, actor=self, expected_time=self.env.now + self.time_out)
class TCPVegas: """This is the class that implements TCP Vegas and FAST TCP. :param enable_fast: :param vegas_alpha: :param vegas_beta: :param vegas_gamma: :param fast_alpha: :param rtt_alpha: change rate of rtt_avg :param rtt_beta: change rate of rtt_div :ivar W: window size :ivar packet_number: number of packets to be sent :ivar time_out: timer's waiting time :ivar transmitter_not_sent: packets that have not been sent :ivar transmitter_sending: list of packets that are being sent :ivar transmitter_acked: packets that have been acked :ivar duplicate_ack_number: record last acked packet number :ivar duplicate_ack_times: record how many times the packet has been continuous acked :ivar last_reset: last effective timeout time :ivar time_out_event: current time out event :ivar slow_start_treshold: treshold of slow start :ivar rtt_avg: the average value of rtt :ivar rtt_div: the divergence of rtt :ivar slow_start_flag: flag for slow start :ivar vegas_rtt : last rtt :ivar vegas_rtt_base : the minimum of rtt :ivar vegas_time_out_event : the time out event used by vegas """ def __init__(self, env, flow): self.vegas_alpha = 4 self.vegas_beta = 8 self.vegas_gamma = 6 self.fast_alpha = 4 self.vegas_virtual_rtt = 0 self.enable_fast = False self.ka = 1 self.ks = 1 self.rtt_alpha = 0.125 self.rtt_beta = 0.25 self.flow = flow self.env = env self.packet_number = self.flow.amount / PACKET_SIZE if self.packet_number * PACKET_SIZE < self.flow.amount: self.packet_number += 1 self.time_out = 1000 self.first_ack_flag = True self.transmitter_not_sent = 0 self.transmitter_sending = list() self.transmitter_acked = -1 self.duplicate_ack_number = -1 self.duplicate_ack_times = 0 self.slow_start_threshold = 240 self.change_W(W=1) self.last_reset = 0 self.last_half = 0 self.time_out_event = None self.slow_start_flag = True self.vegas_time_out_event = None def __str__(self): return self.flow.__str__() def react_to_flow_start(self, event): self.send_new_packets() def react_to_ack(self, ack_packet): #RFC 6298 if self.first_ack_flag: t = self.env.now - ack_packet.timestamp self.rtt_avg = t self.rtt_div = t self.first_ack_flag = False self.vegas_rtt = t self.vegas_rtt_base = t else: t = self.env.now - ack_packet.timestamp self.rtt_div = (1 - self.rtt_beta) * self.rtt_div + self.rtt_beta * abs(t - self.rtt_avg) self.rtt_avg = (1 - self.rtt_alpha) * self.rtt_avg + self.rtt_alpha * t self.time_out = int(1 + (self.rtt_avg + 4 * self.rtt_div)) if self.time_out < 1000: self.time_out = 1000 self.vegas_rtt = t if self.vegas_rtt_base > t: self.vegas_rtt_base = t # Process Duplicate Ack n = ack_packet.number if n > self.transmitter_acked: if n == self.duplicate_ack_number: self.duplicate_ack_times += 1 else: self.duplicate_ack_number = n self.duplicate_ack_times = 0 if self.duplicate_ack_times == 4: self.react_to_time_out_base() elif ack_packet.timestamp >= self.last_reset: # Process Ack del_list = list() for x in self.transmitter_sending: if x < n: del_list.append(x) if self.transmitter_sending[0] in del_list: self.reset_timer() for x in del_list: self.transmitter_sending.remove(x) n = ack_packet.number self.transmitter_acked = max([self.transmitter_acked, n - 1]) # Process sending # Note: you can not start sending a lot of packets now. # See send_new_packets: limit the packets send for each ack # (Data Burst: RFC3782) if self.slow_start_flag: self.change_W(self.W + self.ks * 1.0) if self.W / self.vegas_rtt_base - self.W / self.vegas_rtt > self.vegas_gamma / self.vegas_rtt_base: self.slow_start_flag = False self.send_new_packets(4) else: if not self.vegas_time_out_event: self.vegas_time_out_event = VegasTimeOut(env=self.env, delay= self.vegas_rtt, actor=self) self.send_new_packets(2) def react_to_vegas_time_out(self, event): if self.vegas_virtual_rtt == 0: vrtt = self.vegas_rtt_base else: vrtt = self.vegas_virtual_rtt if not self.enable_fast: if self.W / self.vegas_rtt_base - self.W / self.vegas_rtt < self.vegas_alpha / vrtt: self.change_W(W=self.W + 1) if self.W / self.vegas_rtt_base - self.W / self.vegas_rtt > self.vegas_beta / vrtt: self.change_W(W=self.W - 1) else: self.change_W(W=self.W * self.vegas_rtt_base / self.vegas_rtt + self.fast_alpha) if self.transmitter_acked < self.packet_number - 1: self.vegas_time_out_event = VegasTimeOut(env=self.env, delay= self.vegas_rtt, actor=self) def react_to_time_out(self, event): if event == self.time_out_event: self.react_to_time_out_base() def react_to_time_out_base(self): if len(self.transmitter_sending) > 0: self.time_out *= 2 self.reset_timer() self.change_W(W=1) self.transmitter_sending = [] self.send_new_packets() self.last_reset = self.env.now def send_new_packets(self, count=1): send_flag = False while len(self.transmitter_sending) < self.W and count > 0: count -= 1 send_flag = True l = len(self.transmitter_sending) if l == 0: n = self.transmitter_acked + 1 else: n = self.transmitter_sending[l - 1] + 1 if n > self.packet_number: break packet = self.flow.make_packet(packet_number=n) self.flow.send_packet(packet) self.transmitter_sending.append(n) if send_flag and not self.time_out_event: self.reset_timer() return count def change_W(self, W): self.W = W self.env.controller.record_window_size(flow=self.flow, window_size=self.W) def reset_timer(self): self.time_out_event=PacketTimeOut(env=self.env, delay=self.time_out, actor=self, expected_time=self.env.now + self.time_out)