ganeti.bdev_unittest.py 8.13 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for unittesting the bdev module"""


25
import os
26
27
28
import unittest

from ganeti import bdev
29
from ganeti import errors
30

31
32
import testutils

33

34
class TestDRBD8Runner(testutils.GanetiTestCase):
  """Testing case for DRBD8

  Every parser test loads a sample through C{self._ReadTestData}, feeds
  it to C{bdev.DRBD8._GetDevInfo} and then inspects the resulting
  mapping for the expected local disk and/or network entries.

  """

  @staticmethod
  def _has_disk(data, dname, mname):
    """Check local disk correctness

    @param data: mapping produced by C{bdev.DRBD8._GetDevInfo}
    @param dname: value the C{local_dev} entry must have
    @param mname: value the C{meta_dev} entry must have
    @return: True only when C{local_dev}, C{meta_dev} and C{meta_index}
        are all present, the first two equal C{dname} and C{mname}
        respectively, and C{meta_index} equals 0

    """
    retval = (
      "local_dev" in data and
      data["local_dev"] == dname and
      "meta_dev" in data and
      data["meta_dev"] == mname and
      "meta_index" in data and
      data["meta_index"] == 0
      )
    return retval

  @staticmethod
  def _has_net(data, local, remote):
    """Check network connection parameters

    @param data: mapping produced by C{bdev.DRBD8._GetDevInfo}
    @param local: value the C{local_addr} entry must have
    @param remote: value the C{remote_addr} entry must have
    @return: True only when both C{local_addr} and C{remote_addr} are
        present and equal C{local} and C{remote} respectively

    """
    retval = (
      "local_addr" in data and
      data["local_addr"] == local and
      "remote_addr" in data and
      data["remote_addr"] == remote
      )
    return retval

  def testParserCreation(self):
    """Test drbdsetup show parser creation"""
    # No assertion needed: the test passes as long as the call does not
    # raise.
    bdev.DRBD8._GetShowParser()

  def testParser80(self):
    """Test drbdsetup show parser for disk and network version 8.0"""
    data = self._ReadTestData("bdev-drbd-8.0.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    self.assertTrue(self._has_net(result, ("192.0.2.1", 11000),
                                  ("192.0.2.2", 11000)),
                    "Wrong network info (8.0.x)")

  def testParser83(self):
    """Test drbdsetup show parser for disk and network version 8.3"""
    data = self._ReadTestData("bdev-drbd-8.3.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # The failure message names the 8.3 sample so it cannot be confused
    # with a failure of testParser80.
    self.assertTrue(self._has_net(result, ("192.0.2.1", 11000),
                                  ("192.0.2.2", 11000)),
                    "Wrong network info (8.3.x)")

  def testParserNetIP4(self):
    """Test drbdsetup show parser for IPv4 network"""
    data = self._ReadTestData("bdev-drbd-net-ip4.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # This sample is expected to yield no local disk entries at all.
    self.assertTrue(("local_dev" not in result and
                     "meta_dev" not in result and
                     "meta_index" not in result),
                    "Should not find local disk info")
    self.assertTrue(self._has_net(result, ("192.0.2.1", 11002),
                                  ("192.0.2.2", 11002)),
                    "Wrong network info (IPv4)")

  def testParserNetIP6(self):
    """Test drbdsetup show parser for IPv6 network"""
    data = self._ReadTestData("bdev-drbd-net-ip6.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # This sample is expected to yield no local disk entries at all.
    self.assertTrue(("local_dev" not in result and
                     "meta_dev" not in result and
                     "meta_index" not in result),
                    "Should not find local disk info")
    self.assertTrue(self._has_net(result, ("2001:db8:65::1", 11048),
                                  ("2001:db8:66::1", 11048)),
                    "Wrong network info (IPv6)")

  def testParserDisk(self):
    """Test drbdsetup show parser for disk"""
    data = self._ReadTestData("bdev-drbd-disk.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # This sample is expected to yield no network entries at all.
    self.assertTrue(("local_addr" not in result and
                     "remote_addr" not in result),
                    "Should not find network info")

122

123
class TestDRBD8Status(testutils.GanetiTestCase):
  """Testing case for DRBD8 /proc status

  C{setUp} loads three sample files and runs each through
  C{bdev.DRBD8._GetProcData} and C{bdev.DRBD8._MassageProcData}; the
  per-minor tests then build a C{bdev.DRBD8Status} from individual
  entries of the massaged data and check its state attributes.

  """

  def setUp(self):
    """Read in txt data"""
    testutils.GanetiTestCase.setUp(self)
    proc_data = self._TestDataFilename("proc_drbd8.txt")
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
    # Raw contents of each sample, as returned by _GetProcData.
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
    # Massaged form of each sample; the tests below index it by minor
    # number.
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)

  def testIOErrors(self):
    """Test handling of errors while reading the proc file."""
    temp_file = self._CreateTempFile()
    # Remove the file again so that the path handed to _GetProcData no
    # longer exists.
    os.unlink(temp_file)
    self.assertRaises(errors.BlockDeviceError,
                      bdev.DRBD8._GetProcData, filename=temp_file)

  def testHelper(self):
    """Test reading usermode_helper in /sys."""
    sys_drbd_helper = self._TestDataFilename("sys_drbd_usermode_helper.txt")
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
    self.assertEqual(drbd_helper, "/bin/true")

  def testHelperIOErrors(self):
    """Test handling of errors while reading usermode_helper in /sys."""
    temp_file = self._CreateTempFile()
    # Remove the file again so that the path handed to GetUsermodeHelper
    # no longer exists.
    os.unlink(temp_file)
    self.assertRaises(errors.BlockDeviceError,
                      bdev.DRBD8.GetUsermodeHelper, filename=temp_file)

  def testMinorNotFound(self):
    """Test not-found-minor in /proc"""
    self.assertTrue(9 not in self.mass_data)
    self.assertTrue(9 not in self.mass83_data)
    self.assertTrue(3 not in self.mass80e_data)

  def testLineNotMatch(self):
    """Test wrong line passed to DRBD8Status"""
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")

  def testMinor0(self):
    """Test connected, primary device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[0])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_primary and
                      stats.peer_secondary and stats.is_disk_uptodate)

  def testMinor1(self):
    """Test connected, secondary device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[1])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_secondary and
                      stats.peer_primary and stats.is_disk_uptodate)

  def testMinor2(self):
    """Test unconfigured device"""
    # Unlike the other per-minor tests, this one also covers the
    # "emptyline" 8.0 sample.
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
      stats = bdev.DRBD8Status(data[2])
      self.assertFalse(stats.is_in_use)

  def testMinor4(self):
    """Test WFconn device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[4])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_wfconn and stats.is_primary and
                      stats.rrole == 'Unknown' and
                      stats.is_disk_uptodate)

  def testMinor6(self):
    """Test diskless device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[6])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_secondary and
                      stats.peer_primary and stats.is_diskless)

  def testMinor8(self):
    """Test standalone device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[8])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_standalone and
                      stats.rrole == 'Unknown' and
                      stats.is_disk_uptodate)
216

217
if __name__ == '__main__':
  # When executed as a script, hand control to testutils.GanetiTestProgram.
  testutils.GanetiTestProgram()