Commit 2e347029 by Sheng

Changed validation error messages

parent d43a0115
......@@ -64,31 +64,31 @@ class TestApp(AsyncHTTPTestCase):
self.assertEqual(response.code, 200)
body = 'hostname=&port=&username=&password'
response = self.fetch('/', method='POST', body=body)
self.assertIn(b'"status": "The hostname field is required"', response.body) # noqa
self.assertIn(b'The hostname field is required', response.body)
body = 'hostname=127.0.0.1&port=&username=&password'
response = self.fetch('/', method='POST', body=body)
self.assertIn(b'"status": "The port field is required"', response.body)
self.assertIn(b'The port field is required', response.body)
body = 'hostname=127.0.0&port=22&username=&password'
response = self.fetch('/', method='POST', body=body)
self.assertIn(b'"status": "Invalid hostname', response.body)
self.assertIn(b'Invalid hostname', response.body)
body = 'hostname=http://www.googe.com&port=22&username=&password'
response = self.fetch('/', method='POST', body=body)
self.assertIn(b'"status": "Invalid hostname', response.body)
self.assertIn(b'Invalid hostname', response.body)
body = 'hostname=127.0.0.1&port=port&username=&password'
response = self.fetch('/', method='POST', body=body)
self.assertIn(b'"status": "Invalid port', response.body)
self.assertIn(b'Invalid port', response.body)
body = 'hostname=127.0.0.1&port=70000&username=&password'
response = self.fetch('/', method='POST', body=body)
self.assertIn(b'"status": "Invalid port', response.body)
self.assertIn(b'Invalid port', response.body)
body = 'hostname=127.0.0.1&port=7000&username=&password'
response = self.fetch('/', method='POST', body=body)
self.assertIn(b'"status": "The username field is required"', response.body) # noqa
self.assertIn(b'The username field is required', response.body) # noqa
def test_app_with_wrong_credentials(self):
response = self.fetch('/')
......@@ -173,7 +173,7 @@ class TestApp(AsyncHTTPTestCase):
data = json.loads(to_str(response.body))
self.assertIsNone(data['id'])
self.assertIsNone(data['encoding'])
self.assertEqual(data['status'], 'Not a valid private key or wrong password for decrypting the key.') # noqa
self.assertTrue(data['status'].startswith('Invalid private key'))
@tornado.testing.gen_test
def test_app_auth_with_pubkey_exceeds_key_max_size(self):
......@@ -194,7 +194,7 @@ class TestApp(AsyncHTTPTestCase):
data = json.loads(to_str(response.body))
self.assertIsNone(data['id'])
self.assertIsNone(data['encoding'])
self.assertEqual(data['status'], 'Not a valid private key.')
self.assertTrue(data['status'].startswith('Invalid private key'))
@tornado.testing.gen_test
def test_app_auth_with_pubkey_cannot_be_decoded(self):
......@@ -218,7 +218,7 @@ class TestApp(AsyncHTTPTestCase):
data = json.loads(to_str(response.body))
self.assertIsNone(data['id'])
self.assertIsNone(data['encoding'])
self.assertEqual(data['status'], 'Not a valid private key.')
self.assertTrue(data['status'].startswith('Invalid private key'))
@tornado.testing.gen_test
def test_app_post_form_with_large_body_size(self):
......
......@@ -79,21 +79,24 @@ class TestIndexHandler(unittest.TestCase):
fname = 'test_ed25519.key'
cls = paramiko.Ed25519Key
key = read_file(os.path.join(base_dir, 'tests', fname))
pkey = IndexHandler.get_pkey_obj(key, None)
pkey = IndexHandler.get_pkey_obj(key, None, fname)
self.assertIsInstance(pkey, cls)
pkey = IndexHandler.get_pkey_obj(key, 'iginored')
pkey = IndexHandler.get_pkey_obj(key, 'iginored', fname)
self.assertIsInstance(pkey, cls)
with self.assertRaises(ValueError):
pkey = IndexHandler.get_pkey_obj('x'+key, None)
with self.assertRaises(ValueError) as exc:
pkey = IndexHandler.get_pkey_obj('x'+key, None, fname)
self.assertIn('Invalid private key', str(exc))
def test_get_pkey_obj_with_encrypted_key(self):
fname = 'test_ed25519_password.key'
password = 'abc123'
cls = paramiko.Ed25519Key
key = read_file(os.path.join(base_dir, 'tests', fname))
pkey = IndexHandler.get_pkey_obj(key, password)
pkey = IndexHandler.get_pkey_obj(key, password, fname)
self.assertIsInstance(pkey, cls)
with self.assertRaises(ValueError):
pkey = IndexHandler.get_pkey_obj(key, 'wrongpass')
with self.assertRaises(ValueError):
pkey = IndexHandler.get_pkey_obj('x'+key, password)
with self.assertRaises(ValueError) as exc:
pkey = IndexHandler.get_pkey_obj(key, 'wrongpass', fname)
self.assertIn('Wrong password', str(exc))
with self.assertRaises(ValueError) as exc:
pkey = IndexHandler.get_pkey_obj('x'+key, password, fname)
self.assertIn('Invalid private key', str(exc))
......@@ -68,18 +68,19 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
self.host_keys_settings = host_keys_settings
def get_privatekey(self):
try:
data = self.request.files.get('privatekey')[0]['body']
except TypeError: # no privatekey provided
lst = self.request.files.get('privatekey')
if not lst: # no privatekey provided
return
self.filename = lst[0]['filename']
data = lst[0]['body']
if len(data) < KEY_MAX_SIZE:
try:
return to_str(data)
except UnicodeDecodeError:
except (UnicodeDecodeError, ValueError, SyntaxError):
pass
raise ValueError('Not a valid private key.')
raise ValueError('Invalid private key: {}'.format(self.filename))
@classmethod
def get_specific_pkey(cls, pkeycls, privatekey, password):
......@@ -95,24 +96,30 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
return pkey
@classmethod
def get_pkey_obj(cls, privatekey, password):
password = to_bytes(password)
pkey = cls.get_specific_pkey(paramiko.RSAKey, privatekey, password)\
or cls.get_specific_pkey(paramiko.DSSKey, privatekey, password)\
or cls.get_specific_pkey(paramiko.ECDSAKey, privatekey, password)\
or cls.get_specific_pkey(paramiko.Ed25519Key, privatekey,
password)
def get_pkey_obj(cls, privatekey, password, filename):
bpass = to_bytes(password)
pkey = cls.get_specific_pkey(paramiko.RSAKey, privatekey, bpass)\
or cls.get_specific_pkey(paramiko.DSSKey, privatekey, bpass)\
or cls.get_specific_pkey(paramiko.ECDSAKey, privatekey, bpass)\
or cls.get_specific_pkey(paramiko.Ed25519Key, privatekey, bpass)
if not pkey:
raise ValueError('Not a valid private key or wrong password '
'for decrypting the key.')
if not password:
error = 'Invalid private key: {}'.format(filename)
else:
error = (
'Wrong password {!r} for decrypting the private key.'
) .format(password)
raise ValueError(error)
return pkey
def get_hostname(self):
value = self.get_value('hostname')
if not (is_valid_hostname(value) | is_valid_ipv4_address(value) |
is_valid_ipv6_address(value)):
raise ValueError('Invalid hostname {}'.format(value))
raise ValueError('Invalid hostname: {}.'.format(value))
return value
def get_port(self):
......@@ -125,12 +132,12 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
if is_valid_port(port):
return port
raise ValueError('Invalid port {}'.format(value))
raise ValueError('Invalid port: {}.'.format(value))
def get_value(self, name):
value = self.get_argument(name)
if not value:
raise ValueError('The {} field is required'.format(name))
raise ValueError('The {} field is required.'.format(name))
return value
def get_args(self):
......@@ -139,7 +146,8 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
username = self.get_value('username')
password = self.get_argument('password')
privatekey = self.get_privatekey()
pkey = self.get_pkey_obj(privatekey, password) if privatekey else None
pkey = self.get_pkey_obj(privatekey, password, self.filename) \
if privatekey else None
args = (hostname, port, username, password, pkey)
logging.debug(args)
return args
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment