Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 75 additions & 31 deletions monarch/pcf/app_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ def unblock(self, ports=None):
self.run_cmd_on_diego_cell(cmds, suppress_output=True)

def manipulate_network(self, *, latency=None, latency_sd=None, loss=None, loss_r=None,
duplication=None, corruption=None):
duplication=None, corruption=None, rate=None, direction='egress'):
"""
Manipulate the network traffic from the application instance and its services. This will not work simultaneously
with network shaping. (Manipulates egress traffic).
with network shaping, but the network shaping behavior can also be achieved via the rate parameter of this
method.

:param latency: int; Latency to introduce in milliseconds.
:param latency_sd: int; Standard deviation of the latency in milliseconds, if None, there will be no variance.
Expand All @@ -169,40 +170,82 @@ def manipulate_network(self, *, latency=None, latency_sd=None, loss=None, loss_r
:param loss_r: float; Correlation coefficient in the range [0, 1] of the packet loss.
:param duplication: float; Percent in the range [0, 1] of packets which should be duplicated.
:param corruption: float; Percent in the range [0, 1] of packets which should be corrupted.
:param direction: str; Traffic direction to manipulate.
:param rate: Throughput rate limiting in kbps. See `rate` in https://man7.org/linux/man-pages/man8/tc-netem.8.html
:return: int; A returncode if any of the bosh ssh instances do not return 0.
"""
if not (latency or loss or duplication or corruption):
if not (latency or loss or duplication or corruption or rate):
# if no actions are specified, it is a noop
return 0

cmd = ['sudo', 'tc', 'qdisc', 'add', 'dev', self['diego_vi'], 'root', 'netem']
if latency:
assert latency > 0
cmd.extend(['delay', '{}ms'.format(latency)])
if latency_sd:
assert latency_sd > 0
cmd.extend(['{}ms'.format(latency_sd), 'distribution', 'normal'])
if loss:
assert 0 <= loss <= 1
cmd.extend(['loss', '{}%'.format(loss * 100)])
if loss_r:
assert 0 <= loss_r <= 1
cmd.append('{}%'.format(loss_r * 100))
if duplication:
assert 0 <= duplication <= 1
cmd.extend(['duplicate', '{}%'.format(duplication * 100)])
if corruption:
assert 0 <= corruption <= 1
cmd.extend(['corrupt', '{}%'.format(corruption * 100)])
rcode, _, _ = self.run_cmd_on_diego_cell(' '.join(cmd))
if rcode:
logger.error("Failed to manipulate network for app instance with rcode %d!", rcode)
self.unmanipulate_network()
return rcode
direction = util.parse_direction(direction)
assert direction, "Could not parse direction!"

setup_cmds = []
netem_cmds = []
iface = self['diego_vi']

# For notes regarding applying netem to ingress traffic see:
# https://wiki.linuxfoundation.org/networking/netem#how_can_i_use_netem_on_incoming_traffic3f

if direction in {'ingress', 'both'}:
# NOTE: ifb module will be left as loaded. this seems harmless enough and is simpler than trying to
# determine if we are the ones who loaded it. likewise with the ifb0 ip link being left in the up state
# N.B.: if changes are made to the filter command for some reason, then corresponding changes may be
# needed in the `unmanipulate_network` method since the del command used their is quite specific.
setup_cmds.extend([
'sudo modprobe ifb',
'sudo ip link set dev ifb0 up',
f'sudo tc qdisc add dev {iface} ingress',
f'sudo tc filter add dev {iface} parent ffff: protocol ip u32 match u32 0 0 flowid 1:1 action mirred egress redirect dev ifb0'
])
netem_cmds.append(['sudo', 'tc', 'qdisc', 'add', 'dev', 'ifb0', 'root', 'netem'])

if direction in {'egress', 'both'}:
netem_cmds.append(['sudo', 'tc', 'qdisc', 'add', 'dev', iface, 'root', 'netem'])

for netem_cmd in netem_cmds:
if latency:
assert latency > 0
netem_cmd.extend(['delay', '{}ms'.format(latency)])
if latency_sd:
assert latency_sd > 0
netem_cmd.extend(['{}ms'.format(latency_sd), 'distribution', 'normal'])
if loss:
assert 0 <= loss <= 1
netem_cmd.extend(['loss', '{}%'.format(loss * 100)])
if loss_r:
assert 0 <= loss_r <= 1
netem_cmd.append('{}%'.format(loss_r * 100))
if duplication:
assert 0 <= duplication <= 1
netem_cmd.extend(['duplicate', '{}%'.format(duplication * 100)])
if corruption:
assert 0 <= corruption <= 1
netem_cmd.extend(['corrupt', '{}%'.format(corruption * 100)])
if rate:
assert rate > 0
netem_cmd.extend(['rate', f'{rate}kbit'])

if len(setup_cmds) > 0:
self.run_cmd_on_diego_cell(setup_cmds, suppress_output=True)

for netem_cmd in netem_cmds:
rcode, _, _ = self.run_cmd_on_diego_cell(' '.join(netem_cmd))
if rcode:
logger.error("Failed to manipulate network for app instance with rcode %d!", rcode)
self.unmanipulate_network()
return rcode

return 0

def shape_network(self, download_limit=None, upload_limit=None):
"""
TODO: recommend deprecating this method. The new `rate` param in manipulate_network functionally replaces it,
and this seems appropriate based on the following note from https://man7.org/linux/man-pages/man8/tc-netem.8.html...
"rate - delay packets based on packet size and is a replacement for TBF." TBF is what shape_network
utilizes.

Impose bandwidth limits on the application's ingress traffic. This will not work simultaneously with other
network traffic manipulations and will also be undone by calling `unmanipulate_network`.

Expand Down Expand Up @@ -258,12 +301,13 @@ def unmanipulate_network(self):
"""
Undo traffic manipulation changes to the application and its services.
"""
# https://serverfault.com/a/488914/648174 (and the link given there)
# By just deleting the root/ingress devices, it will reset everything else.
iface = self['diego_vi']
self.run_cmd_on_diego_cell([
'sudo tc qdisc del dev {} root'.format(iface),
'sudo tc filter del dev {} parent ffff: protocol ip prio 1 u32 match ip src 0.0.0.0/0'.format(iface),
'sudo tc qdisc del dev {} handle ffff: ingress'.format(iface),
'sudo tc qdisc del dev {} ingress'.format(iface)
f'sudo tc qdisc del dev {iface} root',
f'sudo tc qdisc del dev {iface} ingress',
'sudo tc qdisc del dev ifb0 root',
], suppress_output=True)

def perform_speedtest(self, server=None):
Expand Down
4 changes: 4 additions & 0 deletions monarch/pcf/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ def from_service_info(service_type, service_config):
dnslookup(hostname), 'tcp',
credentials['amqp']['protocols']['management']['port']
))
elif re.match("postgresql-\d+-odb", service['type']):
service['user'] = credentials['username']
service['password'] = credentials['password']
service['hosts'].add((credentials['db_host'], 'tcp', credentials['db_port']))
else:
logger.warning("Unrecognized service '%s'", service['type'])

Expand Down
8 changes: 7 additions & 1 deletion monarch/pcf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ def run_cmd_on_container(dcid, contid, cmd, suppress_output=False):
:param suppress_output: bool; If true, no extra debug output will be printed when an error occurs.
:return: int, str, str; Returncode, stdout, stderr.
"""
shell_cmd = 'exec sudo /var/vcap/packages/runc/bin/runc exec -t {} /bin/bash'.format(contid)
cfg = Config()
use_containerd = cfg.get("use-containerd") is not None
if use_containerd:
# Refer to: https://devops.stackexchange.com/a/13781/27344
shell_cmd = f'exec sudo /var/vcap/packages/containerd/bin/ctr -a /var/vcap/sys/run/containerd/containerd.sock -n garden tasks exec --exec-id my-shell --tty {contid} /bin/bash'
else:
shell_cmd = f'exec sudo /var/vcap/packages/runc/bin/runc exec -t {contid} /bin/bash'
if isinstance(cmd, list):
cmd.insert(0, shell_cmd)
else:
Expand Down