#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for unittesting the bdev module"""


import os
import unittest

import testutils
from ganeti import bdev
from ganeti import errors


class TestDRBD8Runner(testutils.GanetiTestCase):
  """Testing case for DRBD8

  The parser tests load a stored sample through self._ReadTestData, pass
  it to bdev.DRBD8._GetDevInfo and check which local-disk and network
  keys are present in the returned mapping.

  """

  @staticmethod
  def _has_disk(data, dname, mname):
    """Check local disk correctness

    Returns a true value only if data has "local_dev" equal to dname,
    "meta_dev" equal to mname and "meta_index" equal to 0. A missing key
    makes the check fail instead of raising KeyError.

    """
    return (
      "local_dev" in data and
      data["local_dev"] == dname and
      "meta_dev" in data and
      data["meta_dev"] == mname and
      "meta_index" in data and
      data["meta_index"] == 0
      )

  @staticmethod
  def _has_net(data, local, remote):
    """Check network connection parameters

    Returns a true value only if data has "local_addr" equal to local and
    "remote_addr" equal to remote. The callers in this class pass
    (address, port) tuples for both.

    """
    return (
      "local_addr" in data and
      data["local_addr"] == local and
      "remote_addr" in data and
      data["remote_addr"] == remote
      )

  def testParserCreation(self):
    """Test drbdsetup show parser creation"""
    # Nothing is asserted: the test passes as long as building the parser
    # does not raise.
    bdev.DRBD8._GetShowParser()

  def testParserBoth80(self):
    """Test drbdsetup show parser for disk and network (8.0.x sample)"""
    data = self._ReadTestData("bdev-both.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11000),
                                  ("192.168.1.2", 11000)),
                    "Wrong network info (8.0.x)")

  def testParserBoth83(self):
    """Test drbdsetup show parser for disk and network (8.3.x sample)"""
    data = self._ReadTestData("bdev-8.3-both.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11000),
                                  ("192.168.1.2", 11000)),
                    "Wrong network info (8.3.x)")

  def testParserNet(self):
    """Test drbdsetup show parser for a network-only configuration"""
    data = self._ReadTestData("bdev-net.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    # None of the local disk keys may be reported for this sample.
    self.assertTrue(("local_dev" not in result and
                     "meta_dev" not in result and
                     "meta_index" not in result),
                    "Should not find local disk info")
    self.assertTrue(self._has_net(result, ("192.168.1.1", 11002),
                                  ("192.168.1.2", 11002)),
                    "Wrong network info")

  def testParserDisk(self):
    """Test drbdsetup show parser for a disk-only configuration"""
    data = self._ReadTestData("bdev-disk.txt")
    result = bdev.DRBD8._GetDevInfo(data)
    self.assertTrue(self._has_disk(result, "/dev/xenvg/test.data",
                                   "/dev/xenvg/test.meta"),
                    "Wrong local disk info")
    # None of the network keys may be reported for this sample.
    self.assertTrue(("local_addr" not in result and
                     "remote_addr" not in result),
                    "Should not find network info")


class TestDRBD8Status(testutils.GanetiTestCase):
  """Testing case for DRBD8 /proc status

  Two stored samples ("proc_drbd8.txt" and "proc_drbd83.txt") are loaded
  in setUp, and every per-minor test runs the same checks against both.

  """

  def setUp(self):
    """Read in txt data"""
    testutils.GanetiTestCase.setUp(self)
    proc_data = self._TestDataFilename("proc_drbd8.txt")
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
    # The massaged data is what the tests below index and probe with
    # "in"; judging by the test names the keys are minor numbers.
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)

  def testIOErrors(self):
    """Test handling of errors while reading the proc file."""
    temp_file = self._CreateTempFile()
    # Remove the file so that reading it has to fail.
    os.unlink(temp_file)
    self.assertRaises(errors.BlockDeviceError,
                      bdev.DRBD8._GetProcData, filename=temp_file)

  def testMinorNotFound(self):
    """Test not-found-minor in /proc"""
    self.assertTrue(9 not in self.mass_data)
    self.assertTrue(9 not in self.mass83_data)

  def testLineNotMatch(self):
    """Test wrong line passed to DRBD8Status"""
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")

  def testMinor0(self):
    """Test connected, primary device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[0])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_primary and
                      stats.peer_secondary and stats.is_disk_uptodate)

  def testMinor1(self):
    """Test connected, secondary device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[1])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_secondary and
                      stats.peer_primary and stats.is_disk_uptodate)

  def testMinor2(self):
    """Test unconfigured device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[2])
      self.assertFalse(stats.is_in_use)

  def testMinor4(self):
    """Test WFconn device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[4])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_wfconn and stats.is_primary and
                      stats.rrole == 'Unknown' and
                      stats.is_disk_uptodate)

  def testMinor6(self):
    """Test diskless device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[6])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_connected and stats.is_secondary and
                      stats.peer_primary and stats.is_diskless)

  def testMinor8(self):
    """Test standalone device"""
    for data in [self.mass_data, self.mass83_data]:
      stats = bdev.DRBD8Status(data[8])
      self.assertTrue(stats.is_in_use)
      self.assertTrue(stats.is_standalone and
                      stats.rrole == 'Unknown' and
                      stats.is_disk_uptodate)


if __name__ == "__main__":
  # Executed as a script: hand control to the unittest command-line runner.
  unittest.main()