Commit b2592d38 authored by Nikos Skalkotos's avatar Nikos Skalkotos
Browse files

Monitor helper through UDP packages in xen

parent 6edac4de
......@@ -44,6 +44,11 @@ MSG_TYPE_TASK_END="TASK_END"
STDERR_LINE_SIZE=10
IP="10.0.0.1"
NETWORK="$IP/24"
BROADCAST="${IP%.*}.255"
PORT="48000"
add_cleanup() {
local cmd=""
for arg; do cmd+=$(printf "%q " "$arg"); done
......@@ -69,7 +74,8 @@ send_result_xen() {
}
send_monitor_message_xen() {
echo "$@" | socat STDIN INTERFACE:eth0
#Broadcast the message
echo "$@" | socat STDIO UDP-DATAGRAM:${BROADCAST}:${PORT},broadcast
}
prepare_helper() {
......@@ -93,9 +99,9 @@ prepare_helper() {
;;
xen-hvm|xen-pvm)
$MOUNT -t xenfs xenfs /proc/xen
iptables -P OUTPUT DROP
ip link set eth0 arp off
ip addr add "$NETWORK" dev eth0
ip link set eth0 up
ip route add default dev eth0
export DOMID=$(xenstore-read domid)
HYPERVISOR=xen
;;
......
......@@ -22,6 +22,9 @@ import os
import time
import json
import re
import optparse
import socket
from scapy.all import sniff
LINESIZE = 512
BUFSIZE = 512
......@@ -38,115 +41,155 @@ PROTOCOL = {
'ERROR': ('error', 'messages')}
def error(msg):
sys.stderr.write("HELPER-MONITOR ERROR: %s\n" % msg)
sys.exit(1)
def parse_options(input_args):
usage = "Usage: %prog [options] <file-sescriptor>"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-i", "--interface", type="string", dest="ifname",
default=None, metavar="IFNAME",
help="listen on interface IFNAME for monitoring data")
def send(fd, msg_type, value):
subtype, value_name = PROTOCOL[msg_type]
parser.add_option("-f", "--filter", type="string", dest="filter",
help="add FILTER to incomint traffice when working on an interface",
default=None, metavar="FILTER")
msg = {}
msg['type'] = MSG_TYPE
msg['subtype'] = subtype
msg[value_name] = value
msg['timestamp'] = time.time()
os.write(fd, "%s\n" % json.dumps(msg))
options, args = parser.parse_args(input_args)
if len(args) != 1:
parser.error('Wrong number of argumets')
if __name__ == "__main__":
usage = "Usage: %s <file-descriptor>\n" % PROGNAME
options.fd = args[0]
if len(sys.argv) != 2:
sys.stderr.write(usage)
sys.exit(1)
if options.filter is not None and options.ifname is None:
parser.error('You need to define an interface since filters are' \
'defined')
try:
fd = int(sys.argv[1])
except ValueError:
error("File descriptor is not an integer")
return options
try:
os.fstat(fd)
except OSError:
error("File descriptor is not valid")
lines_left = 0
line_count = 0
stderr = ""
line = ""
while True:
# Can't use sys.stdin.readline since I want unbuffered I/O
new_data = os.read(sys.stdin.fileno(), BUFSIZE)
def error(msg):
sys.stderr.write("HELPER-MONITOR ERROR: %s\n" % msg)
sys.exit(1)
if not new_data:
if not line:
break
class HelperMonitor(object):
def __init__(self, fd):
self.fd = fd
self.lines_left = 0
self.line_count = 0
self.stderr = ""
self.line = ""
def process(self, data):
if not data:
if not self.line:
return
else:
new_data = '\n'
data = '\n'
while True:
split = new_data.split('\n', 1)
line += split[0]
split = data.split('\n', 1)
self.line += split[0]
if len(split) == 1:
if len(line) > LINESIZE:
if len(self.line) > LINESIZE:
error("Line size exceeded the maximum allowed size")
break
new_data = split[1]
data = split[1]
line_count += 1
if line_count >= MAXLINES + 1:
self.line_count += 1
if self.line_count >= MAXLINES + 1:
error("Exceeded maximum allowed number of lines: %d." %
MAXLINES)
if lines_left > 0:
stderr += "%s\n" % line
lines_left -= 1
if lines_left == 0:
send(fd, "STDERR", stderr)
stderr = ""
line = ""
if self.lines_left > 0:
self.stderr += "%s\n" % line
self.lines_left -= 1
if self.lines_left == 0:
self.send("STDERR", stderr)
self.stderr = ""
self.line = ""
continue
line = line.strip()
if len(line) == 0:
self.line = self.line.strip()
if len(self.line) == 0:
continue
if line.startswith("STDERR:"):
if self.line.startswith("STDERR:"):
m = re.match("STDERR:(\d+):(.*)", line)
if not m:
error("Invalid syntax for STDERR line")
try:
lines_left = int(m.group(1))
self.lines_left = int(m.group(1))
except ValueError:
error("Second field in STDERR line must be an integer")
if lines_left > STDERR_MAXLINES:
if self.lines_left > STDERR_MAXLINES:
error("Too many lines in the STDERR output")
elif lines_left < 0:
elif self.lines_left < 0:
error("Second field of STDERR: %d is invalid" % lines_left)
if lines_left > 0:
stderr = m.group(2) + "\n"
lines_left -= 1
if lines_left == 0:
send(fd, "STDERR", stderr)
stderr = ""
elif line.startswith("TASK_START:") \
or line.startswith("TASK_END:") \
or line.startswith("WARNING:") \
or line.startswith("ERROR:"):
(msg_type, _, value) = line.partition(':')
if line.startswith("WARNING:") or line.startswith("ERROR:"):
if self.lines_left > 0:
self.stderr = m.group(2) + "\n"
self.lines_left -= 1
if self.lines_left == 0:
self.send("STDERR", stderr)
self.stderr = ""
elif self.line.startswith("TASK_START:") \
or self.line.startswith("TASK_END:") \
or self.line.startswith("WARNING:") \
or self.line.startswith("ERROR:"):
(msg_type, _, value) = self.line.partition(':')
if self.line.startswith("WARNING:") or \
self.line.startswith("ERROR:"):
value = [value]
send(fd, msg_type, value)
self.send(msg_type, value)
else:
error("Unknown command!")
# Remove the processed line
line = ""
self.line = ""
def send(self, msg_type, value):
subtype, value_name = PROTOCOL[msg_type]
msg = {}
msg['type'] = MSG_TYPE
msg['subtype'] = subtype
msg[value_name] = value
msg['timestamp'] = time.time()
os.write(self.fd, "%s\n" % json.dumps(msg))
if __name__ == "__main__":
options = parse_options(sys.argv[1:])
try:
fd = int(options.fd)
except ValueError:
error("File descriptor is not an integer")
try:
os.fstat(fd)
except OSError:
error("File descriptor is not valid")
monitor = HelperMonitor(fd)
if options.ifname is not None:
try:
sniff(filter=options.filter, iface=options.ifname,
prn=lambda x: monitor.process(x.payload.getfieldval("load")))
except socket.error as e:
# Network is down
if e.errno == 100:
monitor.process(None)
else:
raise
else:
for data in os.read(sys.stdin.fileno(), BUFSIZE):
monitor.process(data)
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :
......@@ -3,13 +3,10 @@
dir=$(dirname "$0")
. "$dir"/vif-common.sh
ip link set "$vif" arp off
ip link set "$vif" allmulticast off
ip link set "$vif" dynamic off
ip link set "$vif" multicast off
echo 0 > /proc/sys/net/ipv6/conf/$vif/autoconf
echo 1 > /proc/sys/net/ipv6/conf/$vif/disable_ipv6
echo 0 > /proc/sys/net/ipv4/conf/$vif/forwarding
echo 0 > /proc/sys/net/ipv4/conf/$vif/proxy_arp
ip link set $vif up
......
......@@ -29,7 +29,7 @@ launch_helper() {
if ! xenstore-exists snf-image-helper; then
xenstore-write snf-image-helper ""
#add_cleanup xenstore-rm snf-image-helper
#add_cleanup xenstore-rm snf-image-helper
fi
helperid=$(xm domid "$name")
......@@ -37,14 +37,14 @@ launch_helper() {
add_cleanup xenstore-rm snf-image-helper/${helperid}
xenstore-chmod snf-image-helper/${helperid} r0 w${helperid}
socat INTERFACE:vif${helperid}.0 EXEC:"./helper-monitor.py ${MONITOR_FD}" &
filter='udp and dst port 48000 and dst host 10.0.0.255 and src host 10.0.0.1'
$TIMEOUT -k $HELPER_HARD_TIMEOUT $HELPER_SOFT_TIMEOUT \
./helper-monitor.py -i "vif${helperid}.0" -f "$filter" ${MONITOR_FD} &
monitor_pid=$!
set +e
$TIMEOUT -k $HELPER_HARD_TIMEOUT $HELPER_SOFT_TIMEOUT \
socat EXEC:"xm console $name",pty STDOUT | sed -u 's|^|HELPER: |g'
rc=$?
set -e
......@@ -52,6 +52,17 @@ launch_helper() {
check_helper_rc "$rc"
set +e
wait "$monitor_pid"
monitor_rc=$?
set -e
if [ $rc -ne 0 ];
log_error "Helper VM monitoring failed"
report_error "Helper VM monitoring failed"
exit 1
fi
report_info "Checking customization status..."
result=$(xenstore-read snf-image-helper/$helperid)
report_info "Customization status is: $result"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment