ganeti.utils_unittest.py 20.1 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for unittesting the utils module"""

import unittest
import os
import time
import tempfile
import os.path
29
import os
Iustin Pop's avatar
Iustin Pop committed
30
import md5
31
import socket
32
import shutil
33
import re
Iustin Pop's avatar
Iustin Pop committed
34
35

import ganeti
36
from ganeti import constants
37
from ganeti import utils
Iustin Pop's avatar
Iustin Pop committed
38
39
40
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
41
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
42
     SetEtcHostsEntry, RemoveEtcHostsEntry
Iustin Pop's avatar
Iustin Pop committed
43
44
from ganeti.errors import LockError, UnitParseError

45

46
47
48
49
50
51
52
53
54
55
56
57
class GanetiTestCase(unittest.TestCase):
  def assertFileContent(self, file_name, content):
    """Checks the content of a file.

    """
    handle = open(file_name, 'r')
    try:
      self.assertEqual(handle.read(), content)
    finally:
      handle.close()


Iustin Pop's avatar
Iustin Pop committed
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class TestIsProcessAlive(unittest.TestCase):
  """Testing case for IsProcessAlive"""
  def setUp(self):
    # create a zombie and a (hopefully) non-existing process id
    self.pid_zombie = os.fork()
    if self.pid_zombie == 0:
      os._exit(0)
    elif self.pid_zombie < 0:
      raise SystemError("can't fork")
    self.pid_non_existing = os.fork()
    if self.pid_non_existing == 0:
      os._exit(0)
    elif self.pid_non_existing > 0:
      os.waitpid(self.pid_non_existing, 0)
    else:
      raise SystemError("can't fork")


  def testExists(self):
    mypid = os.getpid()
    self.assert_(IsProcessAlive(mypid),
                 "can't find myself running")

  def testZombie(self):
    self.assert_(not IsProcessAlive(self.pid_zombie),
                 "zombie not detected as zombie")


  def testNotExisting(self):
    self.assert_(not IsProcessAlive(self.pid_non_existing),
                 "noexisting process detected")


class TestLocking(unittest.TestCase):
  """Testing case for the Lock/Unlock functions"""
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

  def setUp(self):
    lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
                                suffix=".locking")
    self.old_lock_dir = constants.LOCK_DIR
    constants.LOCK_DIR = lock_dir

  def tearDown(self):
    try:
      ganeti.utils.Unlock("unittest")
    except LockError:
      pass
    shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
    constants.LOCK_DIR = self.old_lock_dir

Iustin Pop's avatar
Iustin Pop committed
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
  def clean_lock(self, name):
    try:
      ganeti.utils.Unlock("unittest")
    except LockError:
      pass


  def testLock(self):
    self.clean_lock("unittest")
    self.assertEqual(None, Lock("unittest"))


  def testUnlock(self):
    self.clean_lock("unittest")
    ganeti.utils.Lock("unittest")
    self.assertEqual(None, Unlock("unittest"))

  def testDoubleLock(self):
    self.clean_lock("unittest")
    ganeti.utils.Lock("unittest")
    self.assertRaises(LockError, Lock, "unittest")


class TestRunCmd(unittest.TestCase):
  """Testing case for the RunCmd function"""

  def setUp(self):
    self.magic = time.ctime() + " ganeti test"

  def testOk(self):
Michael Hanselmann's avatar
Michael Hanselmann committed
138
    """Test successful exit code"""
Iustin Pop's avatar
Iustin Pop committed
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
    result = RunCmd("/bin/sh -c 'exit 0'")
    self.assertEqual(result.exit_code, 0)

  def testFail(self):
    """Test fail exit code"""
    result = RunCmd("/bin/sh -c 'exit 1'")
    self.assertEqual(result.exit_code, 1)


  def testStdout(self):
    """Test standard output"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.stdout, self.magic)


  def testStderr(self):
    """Test standard error"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
    self.assertEqual(result.stderr, self.magic)


  def testCombined(self):
    """Test combined output"""
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)

  def testSignal(self):
    """Test standard error"""
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
    self.assertEqual(result.signal, 15)

173
174
175
176
177
178
179
180
181
182
183
184
185
  def testListRun(self):
    """Test list runs"""
    result = RunCmd(["true"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 1)
    result = RunCmd(["echo", "-n", self.magic])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.stdout, self.magic)

186
187
  def testLang(self):
    """Test locale environment"""
188
189
190
191
192
193
194
195
196
197
198
199
200
201
    old_env = os.environ.copy()
    try:
      os.environ["LANG"] = "en_US.UTF-8"
      os.environ["LC_ALL"] = "en_US.UTF-8"
      result = RunCmd(["locale"])
      for line in result.output.splitlines():
        key, value = line.split("=", 1)
        # Ignore these variables, they're overridden by LC_ALL
        if key == "LANG" or key == "LANGUAGE":
          continue
        self.failIf(value and value != "C" and value != '"C"',
            "Variable %s is set to the invalid value '%s'" % (key, value))
    finally:
      os.environ = old_env
202

Iustin Pop's avatar
Iustin Pop committed
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352

class TestRemoveFile(unittest.TestCase):
  """Test case for the RemoveFile function"""

  def setUp(self):
    """Create a temp dir and file for each case"""
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
    os.close(fd)

  def tearDown(self):
    if os.path.exists(self.tmpfile):
      os.unlink(self.tmpfile)
    os.rmdir(self.tmpdir)


  def testIgnoreDirs(self):
    """Test that RemoveFile() ignores directories"""
    self.assertEqual(None, RemoveFile(self.tmpdir))


  def testIgnoreNotExisting(self):
    """Test that RemoveFile() ignores non-existing files"""
    RemoveFile(self.tmpfile)
    RemoveFile(self.tmpfile)


  def testRemoveFile(self):
    """Test that RemoveFile does remove a file"""
    RemoveFile(self.tmpfile)
    if os.path.exists(self.tmpfile):
      self.fail("File '%s' not removed" % self.tmpfile)


  def testRemoveSymlink(self):
    """Test that RemoveFile does remove symlinks"""
    symlink = self.tmpdir + "/symlink"
    os.symlink("no-such-file", symlink)
    RemoveFile(symlink)
    if os.path.exists(symlink):
      self.fail("File '%s' not removed" % symlink)
    os.symlink(self.tmpfile, symlink)
    RemoveFile(symlink)
    if os.path.exists(symlink):
      self.fail("File '%s' not removed" % symlink)


class TestCheckdict(unittest.TestCase):
  """Test case for the CheckDict function"""

  def testAdd(self):
    """Test that CheckDict adds a missing key with the correct value"""

    tgt = {'a':1}
    tmpl = {'b': 2}
    CheckDict(tgt, tmpl)
    if 'b' not in tgt or tgt['b'] != 2:
      self.fail("Failed to update dict")


  def testNoUpdate(self):
    """Test that CheckDict does not overwrite an existing key"""
    tgt = {'a':1, 'b': 3}
    tmpl = {'b': 2}
    CheckDict(tgt, tmpl)
    self.failUnlessEqual(tgt['b'], 3)


class TestMatchNameComponent(unittest.TestCase):
  """Test case for the MatchNameComponent function"""

  def testEmptyList(self):
    """Test that there is no match against an empty list"""

    self.failUnlessEqual(MatchNameComponent("", []), None)
    self.failUnlessEqual(MatchNameComponent("test", []), None)

  def testSingleMatch(self):
    """Test that a single match is performed correctly"""
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
    for key in "test2", "test2.example", "test2.example.com":
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])

  def testMultipleMatches(self):
    """Test that a multiple match is returned as None"""
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
    for key in "test1", "test1.example":
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)


class TestFormatUnit(unittest.TestCase):
  """Test case for the FormatUnit function"""

  def testMiB(self):
    self.assertEqual(FormatUnit(1), '1M')
    self.assertEqual(FormatUnit(100), '100M')
    self.assertEqual(FormatUnit(1023), '1023M')

  def testGiB(self):
    self.assertEqual(FormatUnit(1024), '1.0G')
    self.assertEqual(FormatUnit(1536), '1.5G')
    self.assertEqual(FormatUnit(17133), '16.7G')
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')

  def testTiB(self):
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')


class TestParseUnit(unittest.TestCase):
  """Test case for the ParseUnit function"""

  SCALES = (('', 1),
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))

  def testRounding(self):
    self.assertEqual(ParseUnit('0'), 0)
    self.assertEqual(ParseUnit('1'), 4)
    self.assertEqual(ParseUnit('2'), 4)
    self.assertEqual(ParseUnit('3'), 4)

    self.assertEqual(ParseUnit('124'), 124)
    self.assertEqual(ParseUnit('125'), 128)
    self.assertEqual(ParseUnit('126'), 128)
    self.assertEqual(ParseUnit('127'), 128)
    self.assertEqual(ParseUnit('128'), 128)
    self.assertEqual(ParseUnit('129'), 132)
    self.assertEqual(ParseUnit('130'), 132)

  def testFloating(self):
    self.assertEqual(ParseUnit('0'), 0)
    self.assertEqual(ParseUnit('0.5'), 4)
    self.assertEqual(ParseUnit('1.75'), 4)
    self.assertEqual(ParseUnit('1.99'), 4)
    self.assertEqual(ParseUnit('2.00'), 4)
    self.assertEqual(ParseUnit('2.01'), 4)
    self.assertEqual(ParseUnit('3.99'), 4)
    self.assertEqual(ParseUnit('4.00'), 4)
    self.assertEqual(ParseUnit('4.01'), 8)
    self.assertEqual(ParseUnit('1.5G'), 1536)
    self.assertEqual(ParseUnit('1.8G'), 1844)
    self.assertEqual(ParseUnit('8.28T'), 8682212)

  def testSuffixes(self):
    for sep in ('', ' ', '   ', "\t", "\t "):
      for suffix, scale in TestParseUnit.SCALES:
        for func in (lambda x: x, str.lower, str.upper):
353
354
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
                           1024 * scale)
Iustin Pop's avatar
Iustin Pop committed
355
356
357
358
359
360
361
362
363
364

  def testInvalidInput(self):
    for sep in ('-', '_', ',', 'a'):
      for suffix, _ in TestParseUnit.SCALES:
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)

    for suffix, _ in TestParseUnit.SCALES:
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)


365
class TestSshKeys(GanetiTestCase):
Iustin Pop's avatar
Iustin Pop committed
366
367
368
369
370
371
  """Test case for the AddAuthorizedKey function"""

  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')

372
373
  def setUp(self):
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
Iustin Pop's avatar
Iustin Pop committed
374
    try:
375
376
377
378
379
380
381
382
383
      handle = os.fdopen(fd, 'w')
      try:
        handle.write("%s\n" % TestSshKeys.KEY_A)
        handle.write("%s\n" % TestSshKeys.KEY_B)
      finally:
        handle.close()
    except:
      utils.RemoveFile(self.tmpname)
      raise
Iustin Pop's avatar
Iustin Pop committed
384

385
386
387
  def tearDown(self):
    utils.RemoveFile(self.tmpname)
    del self.tmpname
Iustin Pop's avatar
Iustin Pop committed
388
389

  def testAddingNewKey(self):
390
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
Iustin Pop's avatar
Iustin Pop committed
391

392
393
394
395
396
    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
Iustin Pop's avatar
Iustin Pop committed
397

398
  def testAddingAlmostButNotCompletelyTheSameKey(self):
399
400
401
402
403
404
405
406
    AddAuthorizedKey(self.tmpname,
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
Iustin Pop's avatar
Iustin Pop committed
407
408

  def testAddingExistingKeyWithSomeMoreSpaces(self):
409
410
    AddAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
Iustin Pop's avatar
Iustin Pop committed
411

412
413
414
415
    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
Iustin Pop's avatar
Iustin Pop committed
416
417

  def testRemovingExistingKeyWithSomeMoreSpaces(self):
418
419
    RemoveAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
Iustin Pop's avatar
Iustin Pop committed
420

421
422
423
    self.assertFileContent(self.tmpname,
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
Iustin Pop's avatar
Iustin Pop committed
424
425

  def testRemovingNonExistingKey(self):
426
427
    RemoveAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
Iustin Pop's avatar
Iustin Pop committed
428

429
430
431
432
    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
Iustin Pop's avatar
Iustin Pop committed
433
434


435
class TestEtcHosts(GanetiTestCase):
436
437
  """Test functions modifying /etc/hosts"""

438
439
  def setUp(self):
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
440
    try:
441
442
443
444
445
446
447
448
449
450
      handle = os.fdopen(fd, 'w')
      try:
        handle.write('# This is a test file for /etc/hosts\n')
        handle.write('127.0.0.1\tlocalhost\n')
        handle.write('192.168.1.1 router gw\n')
      finally:
        handle.close()
    except:
      utils.RemoveFile(self.tmpname)
      raise
451

452
453
454
  def tearDown(self):
    utils.RemoveFile(self.tmpname)
    del self.tmpname
455

456
  def testSettingNewIp(self):
457
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
458

459
460
461
462
463
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router gw\n"
      "1.2.3.4\tmyhost.domain.tld myhost\n")
464

465
  def testSettingExistingIp(self):
466
467
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
                     ['myhost'])
468

469
470
471
472
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1\tmyhost.domain.tld myhost\n")
473

474
475
476
477
478
479
480
481
482
  def testSettingDuplicateName(self):
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])

    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router gw\n"
      "1.2.3.4\tmyhost\n")

483
  def testRemovingExistingHost(self):
484
    RemoveEtcHostsEntry(self.tmpname, 'router')
485

486
487
488
489
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 gw\n")
490
491

  def testRemovingSingleExistingHost(self):
492
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
493

494
495
496
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "192.168.1.1 router gw\n")
497
498

  def testRemovingNonExistingHost(self):
499
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
500

501
502
503
504
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router gw\n")
505

506
  def testRemovingAlias(self):
507
    RemoveEtcHostsEntry(self.tmpname, 'gw')
508

509
510
511
512
    self.assertFileContent(self.tmpname,
      "# This is a test file for /etc/hosts\n"
      "127.0.0.1\tlocalhost\n"
      "192.168.1.1 router\n")
513

514

Iustin Pop's avatar
Iustin Pop committed
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    self.assertEqual(ShellQuote('abc'), "abc")
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")

  def testShellQuoteArgs(self):
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")


531
532
533
534
535
class TestTcpPing(unittest.TestCase):
  """Testcase for TCP version of ping - against listen(2)ing port"""

  def setUp(self):
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
536
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
537
538
539
540
541
542
543
544
545
    self.listenerport = self.listener.getsockname()[1]
    self.listener.listen(1)

  def tearDown(self):
    self.listener.shutdown(socket.SHUT_RDWR)
    del self.listener
    del self.listenerport

  def testTcpPingToLocalHostAccept(self):
546
547
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         constants.LOCALHOST_IP_ADDRESS,
548
549
550
551
552
553
554
555
556
557
558
                         self.listenerport,
                         timeout=10,
                         live_port_needed=True),
                 "failed to connect to test listener")


class TestTcpPingDeaf(unittest.TestCase):
  """Testcase for TCP version of ping - against non listen(2)ing port"""

  def setUp(self):
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
559
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
560
561
562
563
564
565
566
    self.deaflistenerport = self.deaflistener.getsockname()[1]

  def tearDown(self):
    del self.deaflistener
    del self.deaflistenerport

  def testTcpPingToLocalHostAcceptDeaf(self):
567
568
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        constants.LOCALHOST_IP_ADDRESS,
569
                        self.deaflistenerport,
570
                        timeout=constants.TCP_PING_TIMEOUT,
571
572
573
574
                        live_port_needed=True), # need successful connect(2)
                "successfully connected to deaf listener")

  def testTcpPingToLocalHostNoAccept(self):
575
576
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         constants.LOCALHOST_IP_ADDRESS,
577
                         self.deaflistenerport,
578
                         timeout=constants.TCP_PING_TIMEOUT,
579
580
581
582
                         live_port_needed=False), # ECONNREFUSED is OK
                 "failed to ping alive host on deaf port")


583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
class TestListVisibleFiles(unittest.TestCase):
  """Test case for ListVisibleFiles"""

  def setUp(self):
    self.path = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.path)

  def _test(self, files, expected):
    # Sort a copy
    expected = expected[:]
    expected.sort()

    for name in files:
      f = open(os.path.join(self.path, name), 'w')
      try:
        f.write("Test\n")
      finally:
        f.close()

    found = ListVisibleFiles(self.path)
    found.sort()

    self.assertEqual(found, expected)

  def testAllVisible(self):
    files = ["a", "b", "c"]
    expected = files
    self._test(files, expected)

  def testNoneVisible(self):
    files = [".a", ".b", ".c"]
    expected = []
    self._test(files, expected)

  def testSomeVisible(self):
    files = ["a", "b", ".c"]
    expected = ["a", "b"]
    self._test(files, expected)


625
626
class TestNewUUID(unittest.TestCase):
  """Test case for NewUUID"""
627
628
629
630
631

  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
                        '[a-f0-9]{4}-[a-f0-9]{12}$')

  def runTest(self):
632
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
633
634


635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
class TestUniqueSequence(unittest.TestCase):
  """Test case for UniqueSequence"""

  def _test(self, input, expected):
    self.assertEqual(utils.UniqueSequence(input), expected)

  def runTest(self):
    # Ordered input
    self._test([1, 2, 3], [1, 2, 3])
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
    self._test([1, 2, 2, 3], [1, 2, 3])
    self._test([1, 2, 3, 3], [1, 2, 3])

    # Unordered input
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])

    # Strings
    self._test(["a", "a"], ["a"])
    self._test(["a", "b"], ["a", "b"])
    self._test(["a", "b", "a"], ["a", "b"])


Iustin Pop's avatar
Iustin Pop committed
658
659
if __name__ == '__main__':
  unittest.main()