diff --git a/lib/http/auth.py b/lib/http/auth.py index 670b897b3a8d1fc6253a158f398cbd14999ead7c..aa1fe7b5e770689a0d5e409be6e04ceb00054676 100644 --- a/lib/http/auth.py +++ b/lib/http/auth.py @@ -32,6 +32,11 @@ from ganeti import http from cStringIO import StringIO +try: + from hashlib import md5 +except ImportError: + from md5 import new as md5 + # Digest types from RFC2617 HTTP_BASIC_AUTH = "Basic" @@ -75,6 +80,10 @@ class HttpServerRequestAuthentication(object): # Default authentication realm AUTH_REALM = None + # Schemes for passwords + _CLEARTEXT_SCHEME = "{CLEARTEXT}" + _HA1_SCHEME = "{HA1}" + def GetAuthRealm(self, req): """Returns the authentication realm for a request. @@ -198,6 +207,63 @@ class HttpServerRequestAuthentication(object): """ raise NotImplementedError() + def VerifyBasicAuthPassword(self, req, username, password, expected): + """Checks the password for basic authentication. + + As long as they don't start with an opening brace ("{"), old passwords are + supported. A new scheme uses H(A1) from RFC2617, where H is MD5 and A1 + consists of the username, the authentication realm and the actual password. + + @type req: L{http.server._HttpServerRequest} + @param req: HTTP request context + @type username: string + @param username: Username from HTTP headers + @type password: string + @param password: Password from HTTP headers + @type expected: string + @param expected: Expected password with optional scheme prefix (e.g. from + users file) + + """ + # Backwards compatibility for old-style passwords without a scheme + if not expected.startswith("{"): + expected = self._CLEARTEXT_SCHEME + expected + + # Check again, just to be sure + if not expected.startswith("{"): + raise AssertionError("Invalid scheme") + + scheme_end_idx = expected.find("}", 1) + + # Ensure scheme has a length of at least one character + if scheme_end_idx <= 1: + logging.warning("Invalid scheme in password for user '%s'", username) + return False + + scheme = expected[:scheme_end_idx + 1].upper() + expected_password = expected[scheme_end_idx + 1:] + + # Good old plain text password + if scheme == self._CLEARTEXT_SCHEME: + return password == expected_password + + # H(A1) as described in RFC2617 + if scheme == self._HA1_SCHEME: + realm = self.GetAuthRealm(req) + if not realm: + # There can not be a valid password for this case + return False + + expha1 = md5() + expha1.update("%s:%s:%s" % (username, realm, password)) + + return (expected_password.lower() == expha1.hexdigest().lower()) + + logging.warning("Unknown scheme '%s' in password for user '%s'", + scheme, username) + + return False + class PasswordFileUser(object): """Data structure for users from password file. diff --git a/test/ganeti.http_unittest.py b/test/ganeti.http_unittest.py index 1394f3ddfc2f86fa21ad4a36a4df568da5d2e853..6e4f9dc0b19d672e4de39922802565655800f098 100755 --- a/test/ganeti.http_unittest.py +++ b/test/ganeti.http_unittest.py @@ -30,6 +30,7 @@ from ganeti import http import ganeti.http.server import ganeti.http.client +import ganeti.http.auth class TestStartLines(unittest.TestCase): @@ -93,5 +94,73 @@ class TestMisc(unittest.TestCase): self.assert_(message_reader_class.HEADER_LENGTH_MAX > 0) +class _FakeRequestAuth(http.auth.HttpServerRequestAuthentication): + def __init__(self, realm): + http.auth.HttpServerRequestAuthentication.__init__(self) + + self.realm = realm + + def GetAuthRealm(self, req): + return self.realm + + +class TestAuth(unittest.TestCase): + """Authentication tests""" + + hsra = http.auth.HttpServerRequestAuthentication + + def testConstants(self): + self.assertEqual(self.hsra._CLEARTEXT_SCHEME, + self.hsra._CLEARTEXT_SCHEME.upper()) + self.assertEqual(self.hsra._HA1_SCHEME, + self.hsra._HA1_SCHEME.upper()) + + def _testVerifyBasicAuthPassword(self, realm, user, password, expected): + ra = _FakeRequestAuth(realm) + + return ra.VerifyBasicAuthPassword(None, user, password, expected) + + + def testVerifyBasicAuthPassword(self): + tvbap = self._testVerifyBasicAuthPassword + + good_pws = ["pw", "pw{", "pw}", "pw{}", "pw{x}y", "}pw", + "0", "123", "foo...:xyz", "TeST"] + + for pw in good_pws: + # Try cleartext passwords + self.assert_(tvbap("abc", "user", pw, pw)) + self.assert_(tvbap("abc", "user", pw, "{cleartext}" + pw)) + self.assert_(tvbap("abc", "user", pw, "{ClearText}" + pw)) + self.assert_(tvbap("abc", "user", pw, "{CLEARTEXT}" + pw)) + + # Try with invalid password + self.failIf(tvbap("abc", "user", pw, "something")) + + # Try with invalid scheme + self.failIf(tvbap("abc", "user", pw, "{000}" + pw)) + self.failIf(tvbap("abc", "user", pw, "{unk}" + pw)) + self.failIf(tvbap("abc", "user", pw, "{Unk}" + pw)) + self.failIf(tvbap("abc", "user", pw, "{UNK}" + pw)) + + # Try with invalid scheme format + self.failIf(tvbap("abc", "user", "pw", "{something")) + + # Hash is MD5("user:This is only a test:pw") + self.assert_(tvbap("This is only a test", "user", "pw", + "{ha1}92ea58ae804481498c257b2f65561a17")) + self.assert_(tvbap("This is only a test", "user", "pw", + "{HA1}92ea58ae804481498c257b2f65561a17")) + + self.failIf(tvbap(None, "user", "pw", + "{HA1}92ea58ae804481498c257b2f65561a17")) + self.failIf(tvbap("Admin area", "user", "pw", + "{HA1}92ea58ae804481498c257b2f65561a17")) + self.failIf(tvbap("This is only a test", "someone", "pw", + "{HA1}92ea58ae804481498c257b2f65561a17")) + self.failIf(tvbap("This is only a test", "user", "something", + "{HA1}92ea58ae804481498c257b2f65561a17")) + + if __name__ == '__main__': unittest.main()