ganeti.utils_unittest.py 33.2 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for unittesting the utils module"""

import unittest
import os
import time
import tempfile
import os.path
29
import os
Iustin Pop's avatar
Iustin Pop committed
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
Iustin Pop's avatar
Iustin Pop committed
36
import string
Iustin Pop's avatar
Iustin Pop committed
37
38

import ganeti
39
import testutils
40
from ganeti import constants
41
from ganeti import utils
42
from ganeti import errors
Iustin Pop's avatar
Iustin Pop committed
43
from ganeti.utils import IsProcessAlive, RunCmd, \
44
     RemoveFile, MatchNameComponent, FormatUnit, \
Iustin Pop's avatar
Iustin Pop committed
45
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
Iustin Pop's avatar
Iustin Pop committed
47
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime
Iustin Pop's avatar
Iustin Pop committed
49

Iustin Pop's avatar
Iustin Pop committed
50
51
from ganeti.errors import LockError, UnitParseError, GenericError, \
     ProgrammerError
Iustin Pop's avatar
Iustin Pop committed
52

53

Iustin Pop's avatar
Iustin Pop committed
54
55
class TestIsProcessAlive(unittest.TestCase):
  """Testing case for IsProcessAlive"""

  def testExists(self):
    """The test process itself must be reported as alive"""
    mypid = os.getpid()
    self.assert_(IsProcessAlive(mypid),
                 "can't find myself running")

  def testNotExisting(self):
    """A child that has exited and been reaped must not be reported alive"""
    # os.fork() raises OSError on failure, it never returns a negative
    # pid, so no explicit error branch is needed here
    pid_non_existing = os.fork()
    if pid_non_existing == 0:
      # Child: terminate immediately without running any cleanup handlers
      os._exit(0)
    # Parent: reap the child so that its pid really is gone
    os.waitpid(pid_non_existing, 0)
    self.assert_(not IsProcessAlive(pid_non_existing),
                 "nonexisting process detected")
Iustin Pop's avatar
Iustin Pop committed
71

72

73
class TestPidFileFunctions(unittest.TestCase):
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""

  def setUp(self):
    """Redirect pid files into a private temporary directory"""
    self.dir = tempfile.mkdtemp()
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
    # Remember the real function so that tearDown can undo the override
    self._orig_dpn = utils.DaemonPidFileName
    utils.DaemonPidFileName = self.f_dpn

  def testPidFileFunctions(self):
    """Exercise the write/read/remove cycle on a pid file"""
    pid_file = self.f_dpn('test')
    utils.WritePidFile('test')
    self.failUnless(os.path.exists(pid_file),
                    "PID file should have been created")
    read_pid = utils.ReadPidFile(pid_file)
    self.failUnlessEqual(read_pid, os.getpid())
    self.failUnless(utils.IsProcessAlive(read_pid))
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
    utils.RemovePidFile('test')
    self.failIf(os.path.exists(pid_file),
                "PID file should not exist anymore")
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                         "ReadPidFile should return 0 for missing pid file")
    fh = open(pid_file, "w")
    try:
      fh.write("blah\n")
    finally:
      fh.close()
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                         "ReadPidFile should return 0 for invalid pid file")
    utils.RemovePidFile('test')
    self.failIf(os.path.exists(pid_file),
                "PID file should not exist anymore")

  def testKill(self):
    """Kill a forked child identified through its pid file"""
    pid_file = self.f_dpn('child')
    r_fd, w_fd = os.pipe()
    new_pid = os.fork()
    if new_pid == 0: #child
      try:
        os.close(r_fd)
        utils.WritePidFile('child')
        os.write(w_fd, 'a')
        signal.pause()
      finally:
        # Never fall back into the test runner from the forked child,
        # not even if one of the calls above raised
        os._exit(0)
    # else we are in the parent
    # Close our copy of the write end so that the read below sees EOF
    # (instead of blocking forever) if the child dies before writing
    os.close(w_fd)
    try:
      # wait until the child has written the pid file
      os.read(r_fd, 1)
    finally:
      os.close(r_fd)
    read_pid = utils.ReadPidFile(pid_file)
    self.failUnlessEqual(read_pid, new_pid)
    self.failUnless(utils.IsProcessAlive(new_pid))
    utils.KillProcess(new_pid, waitpid=True)
    self.failIf(utils.IsProcessAlive(new_pid))
    utils.RemovePidFile('child')
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)

  def tearDown(self):
    """Restore the overridden function and remove the temporary directory"""
    utils.DaemonPidFileName = self._orig_dpn
    for name in os.listdir(self.dir):
      os.unlink(os.path.join(self.dir, name))
    os.rmdir(self.dir)

Iustin Pop's avatar
Iustin Pop committed
130

131
class TestRunCmd(testutils.GanetiTestCase):
  """Testing case for the RunCmd function"""

  def setUp(self):
    """Prepare a unique marker string and a temporary output file"""
    testutils.GanetiTestCase.setUp(self)
    self.magic = time.ctime() + " ganeti test"
    self.fname = self._CreateTempFile()

  def testOk(self):
    """Test successful exit code"""
    result = RunCmd("/bin/sh -c 'exit 0'")
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.output, "")

  def testFail(self):
    """Test fail exit code"""
    result = RunCmd("/bin/sh -c 'exit 1'")
    self.assertEqual(result.exit_code, 1)
    self.assertEqual(result.output, "")

  def testStdout(self):
    """Test standard output"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.stdout, self.magic)
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testStderr(self):
    """Test standard error"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
    self.assertEqual(result.stderr, self.magic)
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testCombined(self):
    """Test combined output"""
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
    expected = "A" + self.magic + "B" + self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.output, expected)
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, expected)

  def testSignal(self):
    """Test signal"""
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
    self.assertEqual(result.signal, 15)
    self.assertEqual(result.output, "")

  def testListRun(self):
    """Test list runs"""
    result = RunCmd(["true"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 1)
    result = RunCmd(["echo", "-n", self.magic])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.stdout, self.magic)

  def testFileEmptyOutput(self):
    """Test file output"""
    result = RunCmd(["true"], output=self.fname)
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertFileContent(self.fname, "")

  def testLang(self):
    """Test locale environment"""
    names = ("LANG", "LC_ALL")
    saved = [(name, os.environ.get(name)) for name in names]
    try:
      for name in names:
        os.environ[name] = "en_US.UTF-8"
      result = RunCmd(["locale"])
      for line in result.output.splitlines():
        key, value = line.split("=", 1)
        # Ignore these variables, they're overridden by LC_ALL
        if key == "LANG" or key == "LANGUAGE":
          continue
        self.failIf(value and value != "C" and value != '"C"',
            "Variable %s is set to the invalid value '%s'" % (key, value))
    finally:
      # Restore the variables one by one through the os.environ mapping;
      # rebinding os.environ to a plain dict copy would neither reset the
      # real process environment nor keep later assignments effective
      for name, value in saved:
        if value is not None:
          os.environ[name] = value
        elif name in os.environ:
          del os.environ[name]

  def testDefaultCwd(self):
    """Test default working directory"""
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")

  def testCwd(self):
    """Test explicitly requested working directory"""
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
    cwd = os.getcwd()
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)

Iustin Pop's avatar
Iustin Pop committed
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279

class TestRemoveFile(unittest.TestCase):
  """Test case for the RemoveFile function"""

  def setUp(self):
    """Create a temp dir and file for each case"""
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
    os.close(fd)

  def tearDown(self):
    if os.path.exists(self.tmpfile):
      os.unlink(self.tmpfile)
    os.rmdir(self.tmpdir)


  def testIgnoreDirs(self):
    """Test that RemoveFile() ignores directories"""
    self.assertEqual(None, RemoveFile(self.tmpdir))


  def testIgnoreNotExisting(self):
    """Test that RemoveFile() ignores non-existing files"""
    RemoveFile(self.tmpfile)
    RemoveFile(self.tmpfile)


  def testRemoveFile(self):
    """Test that RemoveFile does remove a file"""
    RemoveFile(self.tmpfile)
    if os.path.exists(self.tmpfile):
      self.fail("File '%s' not removed" % self.tmpfile)


  def testRemoveSymlink(self):
    """Test that RemoveFile does remove symlinks"""
    symlink = self.tmpdir + "/symlink"
    os.symlink("no-such-file", symlink)
    RemoveFile(symlink)
    if os.path.exists(symlink):
      self.fail("File '%s' not removed" % symlink)
    os.symlink(self.tmpfile, symlink)
    RemoveFile(symlink)
    if os.path.exists(symlink):
      self.fail("File '%s' not removed" % symlink)


280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
class TestRename(unittest.TestCase):
  """Test case for RenameFile"""

  def setUp(self):
    """Create a temporary directory"""
    self.tmpdir = tempfile.mkdtemp()
    self.tmpfile = os.path.join(self.tmpdir, "test1")

    # Touch the file
    open(self.tmpfile, "w").close()

  def tearDown(self):
    """Remove temporary directory"""
    shutil.rmtree(self.tmpdir)

  def testSimpleRename1(self):
    """Simple rename 1"""
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))

  def testSimpleRename2(self):
    """Simple rename 2"""
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
                     mkdir=True)

  def testRenameMkdir(self):
    """Rename with mkdir"""
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
                     mkdir=True)


Iustin Pop's avatar
Iustin Pop committed
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
class TestMatchNameComponent(unittest.TestCase):
  """Test case for the MatchNameComponent function"""

  def testEmptyList(self):
    """Test that there is no match against an empty list"""
    for key in ("", "test"):
      self.failUnlessEqual(MatchNameComponent(key, []), None)

  def testSingleMatch(self):
    """Test that a single match is performed correctly"""
    names = ["test1.example.com", "test2.example.com", "test3.example.com"]
    wanted = names[1]
    for key in ("test2", "test2.example", "test2.example.com"):
      found = MatchNameComponent(key, names)
      self.failUnlessEqual(found, wanted)

  def testMultipleMatches(self):
    """Test that a multiple match is returned as None"""
    names = ["test1.example.com", "test1.example.org", "test1.example.net"]
    for key in ("test1", "test1.example"):
      found = MatchNameComponent(key, names)
      self.failUnlessEqual(found, None)


class TestFormatUnit(unittest.TestCase):
  """Test case for the FormatUnit function"""

  def testMiB(self):
    """Values formatted with the 'h' unit below 1024 and with the 'm' unit"""
    self.assertEqual(FormatUnit(1, 'h'), '1M')
    self.assertEqual(FormatUnit(100, 'h'), '100M')
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')

    self.assertEqual(FormatUnit(1, 'm'), '1')
    self.assertEqual(FormatUnit(100, 'm'), '100')
    self.assertEqual(FormatUnit(1023, 'm'), '1023')

    self.assertEqual(FormatUnit(1024, 'm'), '1024')
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')

  def testGiB(self):
    """Values formatted with the 'h' unit from 1024 up and with the 'g' unit"""
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')

    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')

    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')

  def testTiB(self):
    """Values formatted with the 'h' unit from 1024 * 1024 up and with 't'"""
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')

    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
Iustin Pop's avatar
Iustin Pop committed
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412

class TestParseUnit(unittest.TestCase):
  """Test case for the ParseUnit function"""

  SCALES = (('', 1),
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))

  def testRounding(self):
    self.assertEqual(ParseUnit('0'), 0)
    self.assertEqual(ParseUnit('1'), 4)
    self.assertEqual(ParseUnit('2'), 4)
    self.assertEqual(ParseUnit('3'), 4)

    self.assertEqual(ParseUnit('124'), 124)
    self.assertEqual(ParseUnit('125'), 128)
    self.assertEqual(ParseUnit('126'), 128)
    self.assertEqual(ParseUnit('127'), 128)
    self.assertEqual(ParseUnit('128'), 128)
    self.assertEqual(ParseUnit('129'), 132)
    self.assertEqual(ParseUnit('130'), 132)

  def testFloating(self):
    self.assertEqual(ParseUnit('0'), 0)
    self.assertEqual(ParseUnit('0.5'), 4)
    self.assertEqual(ParseUnit('1.75'), 4)
    self.assertEqual(ParseUnit('1.99'), 4)
    self.assertEqual(ParseUnit('2.00'), 4)
    self.assertEqual(ParseUnit('2.01'), 4)
    self.assertEqual(ParseUnit('3.99'), 4)
    self.assertEqual(ParseUnit('4.00'), 4)
    self.assertEqual(ParseUnit('4.01'), 8)
    self.assertEqual(ParseUnit('1.5G'), 1536)
    self.assertEqual(ParseUnit('1.8G'), 1844)
    self.assertEqual(ParseUnit('8.28T'), 8682212)

  def testSuffixes(self):
    for sep in ('', ' ', '   ', "\t", "\t "):
      for suffix, scale in TestParseUnit.SCALES:
        for func in (lambda x: x, str.lower, str.upper):
413
414
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
                           1024 * scale)
Iustin Pop's avatar
Iustin Pop committed
415
416
417
418
419
420
421
422
423
424

  def testInvalidInput(self):
    for sep in ('-', '_', ',', 'a'):
      for suffix, _ in TestParseUnit.SCALES:
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)

    for suffix, _ in TestParseUnit.SCALES:
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)


425
class TestSshKeys(testutils.GanetiTestCase):
  """Test case for the AddAuthorizedKey and RemoveAuthorizedKey functions"""

  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')

  def setUp(self):
    """Create a temporary key file containing KEY_A and KEY_B"""
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    handle = open(self.tmpname, 'w')
    try:
      handle.write("%s\n" % TestSshKeys.KEY_A)
      handle.write("%s\n" % TestSshKeys.KEY_B)
    finally:
      handle.close()

  def testAddingNewKey(self):
    """A key not yet in the file is appended"""
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")

  def testAddingAlmostButNotCompletelyTheSameKey(self):
    """A key differing only in its trailing comment is still appended"""
    AddAuthorizedKey(self.tmpname,
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")

  def testAddingExistingKeyWithSomeMoreSpaces(self):
    """An existing key written with extra whitespace is not added again"""
    AddAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")

  def testRemovingExistingKeyWithSomeMoreSpaces(self):
    """An existing key is removed even if given with extra whitespace"""
    RemoveAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')

    self.assertFileContent(self.tmpname,
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")

  def testRemovingNonExistingKey(self):
    """Removing a key that is not in the file leaves the file unchanged"""
    RemoveAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
Iustin Pop's avatar
Iustin Pop committed
486
487


488
class TestEtcHosts(testutils.GanetiTestCase):
489
490
  """Test functions modifying /etc/hosts"""

491
  def setUp(self):
Iustin Pop's avatar
Iustin Pop committed
492
493
494
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    handle = open(self.tmpname, 'w')
495
    try:
Iustin Pop's avatar
Iustin Pop committed
496
497
498
499
500
      handle.write('# This is a test file for /etc/hosts\n')
      handle.write('127.0.0.1\tlocalhost\n')
      handle.write('192.168.1.1 router gw\n')
    finally:
      handle.close()
501

502
  def testSettingNewIp(self):
503
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
504

505
506
507
508
509
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router gw\n"
      "1.2.3.4\tmyhost.domain.tld myhost\n")
510
    self.assertFileMode(self.tmpname, 0644)
511

512
  def testSettingExistingIp(self):
513
514
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
                     ['myhost'])
515

516
517
518
519
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1\tmyhost.domain.tld myhost\n")
520
    self.assertFileMode(self.tmpname, 0644)
521

522
523
524
525
526
527
528
529
  def testSettingDuplicateName(self):
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])

    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router gw\n"
      "1.2.3.4\tmyhost\n")
530
    self.assertFileMode(self.tmpname, 0644)
531

532
  def testRemovingExistingHost(self):
533
    RemoveEtcHostsEntry(self.tmpname, 'router')
534

535
536
537
538
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 gw\n")
539
    self.assertFileMode(self.tmpname, 0644)
540
541

  def testRemovingSingleExistingHost(self):
542
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
543

544
545
546
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "192.168.1.1 router gw\n")
547
    self.assertFileMode(self.tmpname, 0644)
548
549

  def testRemovingNonExistingHost(self):
550
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
551

552
553
554
555
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router gw\n")
556
    self.assertFileMode(self.tmpname, 0644)
557

558
  def testRemovingAlias(self):
559
    RemoveEtcHostsEntry(self.tmpname, 'gw')
560

561
562
563
564
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router\n")
565
    self.assertFileMode(self.tmpname, 0644)
566

567

Iustin Pop's avatar
Iustin Pop committed
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Check ShellQuote against a table of (input, expected) pairs"""
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]
    for raw, quoted in cases:
      self.assertEqual(ShellQuote(raw), quoted)

  def testShellQuoteArgs(self):
    """Check ShellQuoteArgs against a table of (arguments, expected) pairs"""
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]
    for args, quoted in cases:
      self.assertEqual(ShellQuoteArgs(args), quoted)


584
585
586
587
588
class TestTcpPing(unittest.TestCase):
  """Testcase for TCP version of ping - against listen(2)ing port"""

  def setUp(self):
    """Open a listening socket on a system-chosen local port"""
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.listenerport = self.listener.getsockname()[1]
    self.listener.listen(1)

  def tearDown(self):
    """Shut down and close the listening socket"""
    try:
      self.listener.shutdown(socket.SHUT_RDWR)
    finally:
      # Close explicitly instead of relying on garbage collection, also
      # if the shutdown above failed
      self.listener.close()
    del self.listener
    del self.listenerport

  def testTcpPingToLocalHostAccept(self):
    """Ping the listening port, with and without a source address"""
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.listenerport,
                         timeout=10,
                         live_port_needed=True,
                         source=constants.LOCALHOST_IP_ADDRESS,
                         ),
                 "failed to connect to test listener")

    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.listenerport,
                         timeout=10,
                         live_port_needed=True,
                         ),
                 "failed to connect to test listener (no source)")

614
615
616
617
618
619

class TestTcpPingDeaf(unittest.TestCase):
  """Testcase for TCP version of ping - against non listen(2)ing port"""

  def setUp(self):
    """Bind a socket to a system-chosen local port without listening on it"""
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.deaflistenerport = self.deaflistener.getsockname()[1]

  def tearDown(self):
    """Close the bound socket"""
    # Close explicitly instead of relying on garbage collection
    self.deaflistener.close()
    del self.deaflistener
    del self.deaflistenerport

  def testTcpPingToLocalHostAcceptDeaf(self):
    """With live_port_needed=True the ping to the deaf port must fail"""
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True,
                        source=constants.LOCALHOST_IP_ADDRESS,
                        ), # need successful connect(2)
                "successfully connected to deaf listener")

    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True,
                        ), # need successful connect(2)
                "successfully connected to deaf listener (no source addr)")

  def testTcpPingToLocalHostNoAccept(self):
    """With live_port_needed=False the ping to the deaf port must succeed"""
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=False,
                         source=constants.LOCALHOST_IP_ADDRESS,
                         ), # ECONNREFUSED is OK
                 "failed to ping alive host on deaf port")

    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=False,
                         ), # ECONNREFUSED is OK
                 "failed to ping alive host on deaf port (no source addr)")

659

660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
class TestOwnIpAddress(unittest.TestCase):
  """Testcase for OwnIpAddress"""

  def testOwnLoopback(self):
    """check having the loopback ip"""
    owned = OwnIpAddress(constants.LOCALHOST_IP_ADDRESS)
    self.failUnless(owned, "Should own the loopback address")

  def testNowOwnAddress(self):
    """check that I don't own an address"""
    # 192.0.2.0/24 is set aside for test/documentation use (rfc 3330),
    # hence no local interface *should* carry an address from it; should
    # this ever fail, the test needs to try several addresses instead
    foreign_ip = "192.0.2.1"
    message = "Should not own IP address %s" % foreign_ip
    self.failIf(OwnIpAddress(foreign_ip), message)


678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
class TestListVisibleFiles(unittest.TestCase):
  """Test case for ListVisibleFiles"""

  def setUp(self):
    self.path = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.path)

  def _test(self, files, expected):
    # Sort a copy
    expected = expected[:]
    expected.sort()

    for name in files:
      f = open(os.path.join(self.path, name), 'w')
      try:
        f.write("Test\n")
      finally:
        f.close()

    found = ListVisibleFiles(self.path)
    found.sort()

    self.assertEqual(found, expected)

  def testAllVisible(self):
    files = ["a", "b", "c"]
    expected = files
    self._test(files, expected)

  def testNoneVisible(self):
    files = [".a", ".b", ".c"]
    expected = []
    self._test(files, expected)

  def testSomeVisible(self):
    files = ["a", "b", ".c"]
    expected = ["a", "b"]
    self._test(files, expected)


720
721
class TestNewUUID(unittest.TestCase):
  """Test case for NewUUID"""
722
723
724
725
726

  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
                        '[a-f0-9]{4}-[a-f0-9]{12}$')

  def runTest(self):
727
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
728
729


730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
class TestUniqueSequence(unittest.TestCase):
  """Test case for UniqueSequence"""

  def _test(self, seq, expected):
    """Assert that UniqueSequence(seq) equals the expected list"""
    result = utils.UniqueSequence(seq)
    self.assertEqual(result, expected)

  def runTest(self):
    """Run UniqueSequence over a table of (input, expected) pairs"""
    cases = [
      # Ordered input
      ([1, 2, 3], [1, 2, 3]),
      ([1, 1, 2, 2, 3, 3], [1, 2, 3]),
      ([1, 2, 2, 3], [1, 2, 3]),
      ([1, 2, 3, 3], [1, 2, 3]),
      # Unordered input
      ([1, 2, 3, 1, 2, 3], [1, 2, 3]),
      ([1, 1, 2, 3, 3, 1, 2], [1, 2, 3]),
      # Strings
      (["a", "a"], ["a"]),
      (["a", "b"], ["a", "b"]),
      (["a", "b", "a"], ["a", "b"]),
      ]
    for seq, expected in cases:
      self._test(seq, expected)

752

753
754
755
756
757
758
759
760
761
762
class TestFirstFree(unittest.TestCase):
  """Test case for the FirstFree function"""

  def test(self):
    """Test FirstFree"""
    # (positional arguments, keyword arguments, expected result)
    cases = [
      (([0, 1, 3],), {}, 2),
      (([],), {}, None),
      (([3, 4, 6],), {}, 0),
      (([3, 4, 6],), {"base": 3}, 5),
      ]
    for args, kwargs, expected in cases:
      self.failUnlessEqual(FirstFree(*args, **kwargs), expected)

    # A list holding a value below the given base must trip an assertion
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
763

764

Iustin Pop's avatar
Iustin Pop committed
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
class TestTailFile(testutils.GanetiTestCase):
  """Test case for the TailFile function"""

  def _WriteTempFile(self, lines, prefix=""):
    """Write newline-terminated lines to a new temporary file.

    The file name is obtained from C{self._CreateTempFile()}, which is
    not defined in this class (presumably it comes from the base class).
    The handle is closed in a C{finally} clause so that it is released
    even when the write itself fails.

    @param lines: list of strings; each one is written followed by a
        newline character
    @param prefix: raw text written before the lines
    @return: the name of the file that was written

    """
    fname = self._CreateTempFile()
    fd = open(fname, "w")
    try:
      fd.write(prefix)
      fd.write("".join(["%s\n" % line for line in lines]))
    finally:
      fd.close()
    return fname

  def testEmpty(self):
    # Nothing is ever written to this file
    fname = self._CreateTempFile()
    self.failUnlessEqual(TailFile(fname), [])
    self.failUnlessEqual(TailFile(fname, lines=25), [])

  def testAllLines(self):
    data = ["test %d" % i for i in range(30)]
    for i in range(30):
      # The file holds exactly i lines and all of them are requested
      fname = self._WriteTempFile(data[:i])
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])

  def testPartialLines(self):
    data = ["test %d" % i for i in range(30)]
    fname = self._WriteTempFile(data)
    for i in range(1, 30):
      # Only the last i lines are requested
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])

  def testBigFile(self):
    data = ["test %d" % i for i in range(30)]
    # One single 1 MiB line precedes the interesting tail of the file
    fname = self._WriteTempFile(data, prefix="X" * 1048576 + "\n")
    for i in range(1, 30):
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])


807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
class TestFileLock(unittest.TestCase):
  """Test case for the FileLock class"""

  def setUp(self):
    # Every test works on its own lock, backed by a private temporary file
    self.tmpfile = tempfile.NamedTemporaryFile()
    self.lock = utils.FileLock(self.tmpfile.name)

  def tearDown(self):
    # Closing a NamedTemporaryFile also removes it from disk; without
    # this the file and its descriptor stay around until the test
    # instance is garbage collected
    self.tmpfile.close()

  def testSharedNonblocking(self):
    self.lock.Shared(blocking=False)
    self.lock.Close()

  def testExclusiveNonblocking(self):
    self.lock.Exclusive(blocking=False)
    self.lock.Close()

  def testUnlockNonblocking(self):
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSharedBlocking(self):
    self.lock.Shared(blocking=True)
    self.lock.Close()

  def testExclusiveBlocking(self):
    self.lock.Exclusive(blocking=True)
    self.lock.Close()

  def testUnlockBlocking(self):
    self.lock.Unlock(blocking=True)
    self.lock.Close()

  def testSharedExclusiveUnlock(self):
    # Switch from shared to exclusive mode on the same lock object
    self.lock.Shared(blocking=False)
    self.lock.Exclusive(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testExclusiveSharedUnlock(self):
    # Switch from exclusive to shared mode on the same lock object
    self.lock.Exclusive(blocking=False)
    self.lock.Shared(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  # The remaining tests expect every operation on an already closed
  # lock to raise AssertionError

  def testCloseShared(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)

  def testCloseExclusive(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)

  def testCloseUnlock(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)


863
864
865
866
867
class TestTimeFunctions(unittest.TestCase):
  """Test case for time functions"""

  def runTest(self):
    # SplitTime: a number of seconds is expected to come back as a
    # (seconds, microseconds) pair
    self.assertEqual(utils.SplitTime(1), (1, 0))
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
    # Digits beyond the microsecond are expected to be dropped instead
    # of being rounded up into the next second
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))

    # Negative input is expected to be rejected
    self.assertRaises(AssertionError, utils.SplitTime, -1)

    # MergeTime: the inverse direction, from a (seconds, microseconds)
    # pair to a float
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)

    # These results are rounded before the comparison, presumably to
    # sidestep floating point representation errors
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
                     1218448917.481)
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)

    # Microseconds outside of 0..999999 are expected to be rejected
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
    # ... and so are negative seconds
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))


Iustin Pop's avatar
Iustin Pop committed
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
class FieldSetTestCase(unittest.TestCase):
  """Checks Matches and NonMatching of utils.FieldSet"""

  def testSimpleMatch(self):
    fset = utils.FieldSet("a", "b", "c", "def")

    # Single names
    self.assert_(fset.Matches("a"))
    self.failIf(fset.Matches("d"), "Substring matched")
    self.failIf(fset.Matches("defghi"), "Prefix string matched")

    # Lists of names: a false result is expected when every name is
    # known, a true one as soon as an unknown name is present
    self.failIf(fset.NonMatching(["b", "c"]))
    self.failIf(fset.NonMatching(["a", "b", "c", "def"]))
    self.assert_(fset.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    # The second field is a regular expression instead of a plain name
    fset = utils.FieldSet("a", "b([0-9]+)", "c")

    self.assert_(fset.Matches("b1"))
    self.assert_(fset.Matches("b99"))
    self.failIf(fset.Matches("b/1"))

    self.failIf(fset.NonMatching(["b12", "c"]))
    self.assert_(fset.NonMatching(["a", "1"]))

913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
class TestForceDictType(unittest.TestCase):
  """Checks the in-place type enforcement done by ForceDictType"""

  def setUp(self):
    # One key for each of the value types used by the tests
    self.key_types = {
      "a": constants.VTYPE_INT,
      "b": constants.VTYPE_BOOL,
      "c": constants.VTYPE_STRING,
      "d": constants.VTYPE_SIZE,
      }

  def _fdt(self, dict, allowed_values=None):
    # allowed_values is only forwarded when the caller supplied it, so
    # that ForceDictType otherwise uses its own default
    extra = {}
    if allowed_values is not None:
      extra["allowed_values"] = allowed_values

    ForceDictType(dict, self.key_types, **extra)

    # The very same dictionary object is handed back to the caller
    return dict

  def testSimpleDict(self):
    # (dictionary passed in, expected contents afterwards)
    conversions = [
      ({}, {}),
      ({'a': 1}, {'a': 1}),
      ({'a': '1'}, {'a': 1}),
      ({'a': 1, 'b': 1}, {'a': 1, 'b': True}),
      ({'b': 1, 'c': 'foo'}, {'b': True, 'c': 'foo'}),
      ({'b': 1, 'c': False}, {'b': True, 'c': ''}),
      ({'b': 'false'}, {'b': False}),
      ({'b': 'False'}, {'b': False}),
      ({'b': 'true'}, {'b': True}),
      ({'b': 'True'}, {'b': True}),
      ({'d': '4'}, {'d': 4}),
      ({'d': '4M'}, {'d': 4}),
      ]

    for (given, wanted) in conversions:
      self.assertEqual(self._fdt(given), wanted)

  def testErrors(self):
    # Each of these values is expected to be rejected for its key
    rejected = [
      {'a': 'astring'},
      {'c': True},
      {'d': 'astring'},
      {'d': '4 L'},
      ]

    for invalid in rejected:
      self.assertRaises(errors.TypeEnforcementError, self._fdt, invalid)

Iustin Pop's avatar
Iustin Pop committed
952

953
954
955
956
957
958
class TestIsAbsNormPath(unittest.TestCase):
  """Testing case for IsNormAbsPath"""

  def _pathTestHelper(self, path, result):
    # "result" is the truth value IsNormAbsPath(path) is expected to have
    if result:
      self.assert_(IsNormAbsPath(path),
          "Path %s should result absolute and normalized" % path)
    else:
      self.assert_(not IsNormAbsPath(path),
          "Path %s should not result absolute and normalized" % path)

  def testBase(self):
    # Accepted paths
    self._pathTestHelper('/etc', True)
    self._pathTestHelper('/srv', True)
    # Rejected paths: a relative one, one with a ".." component and one
    # with a trailing slash
    self._pathTestHelper('etc', False)
    self._pathTestHelper('/etc/../root', False)
    self._pathTestHelper('/etc/', False)

971

Iustin Pop's avatar
Iustin Pop committed
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    # Digits, letters and punctuation must come back unchanged
    for txt in [string.digits, string.letters, string.punctuation]:
      self.failUnlessEqual(txt, SafeEncode(txt))

  def testDoubleEncode(self):
    # Encoding an already encoded value must not change it any further.
    # chr() accepts 0..255, hence range(256); range(255) would leave the
    # last byte value untested
    for i in range(256):
      txt = SafeEncode(chr(i))
      self.failUnlessEqual(txt, SafeEncode(txt))

  def testUnicode(self):
    # 1024 is high enough to catch non-direct ASCII mappings
    for i in range(1024):
      txt = SafeEncode(unichr(i))
      self.failUnlessEqual(txt, SafeEncode(txt))


991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
class TestFormatTime(unittest.TestCase):
  """Checks FormatTime with missing, invalid and current timestamps"""

  def testNone(self):
    self.assertEqual(FormatTime(None), "N/A")

  def testInvalid(self):
    # An empty tuple is not a usable timestamp
    self.assertEqual(FormatTime(()), "N/A")

  def testNow(self):
    # Only acceptance of the input is verified here, the returned
    # values are not inspected

    # float, as returned by time.time
    as_float = time.time()
    FormatTime(as_float)

    # the same kind of value as an integer
    as_int = int(time.time())
    FormatTime(as_int)


Iustin Pop's avatar
Iustin Pop committed
1007
1008
# Run all test cases of this module when it is executed as a script
if __name__ == '__main__':
  unittest.main()