Commit 1e4ece58 by Sheng

Added functions for validating ip and port

parent babd9bd2
import unittest
from webssh.utils import (is_valid_ipv4_address, is_valid_ipv6_address,
is_valid_port, to_str)
class TestUitls(unittest.TestCase):
def test_to_str(self):
b = b'hello'
u = u'hello'
self.assertEqual(to_str(b), u)
self.assertEqual(to_str(u), u)
def test_is_valid_ipv4_address(self):
self.assertFalse(is_valid_ipv4_address('127.0.0'))
self.assertFalse(is_valid_ipv4_address(b'127.0.0'))
self.assertTrue(is_valid_ipv4_address('127.0.0.1'))
self.assertTrue(is_valid_ipv4_address(b'127.0.0.1'))
def test_is_valid_ipv6_address(self):
self.assertFalse(is_valid_ipv6_address('abc'))
self.assertFalse(is_valid_ipv6_address(b'abc'))
self.assertTrue(is_valid_ipv6_address('::1'))
self.assertTrue(is_valid_ipv6_address(b'::1'))
def test_is_valid_port(self):
self.assertTrue(is_valid_port(80))
self.assertFalse(is_valid_port(0))
self.assertFalse(is_valid_port(65536))
...@@ -12,6 +12,8 @@ import tornado.web ...@@ -12,6 +12,8 @@ import tornado.web
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tornado.util import basestring_type from tornado.util import basestring_type
from webssh.worker import Worker, recycle_worker, workers from webssh.worker import Worker, recycle_worker, workers
from webssh.utils import (is_valid_ipv4_address, is_valid_ipv6_address,
is_valid_port)
try: try:
from concurrent.futures import Future from concurrent.futures import Future
...@@ -40,15 +42,16 @@ class MixinHandler(object): ...@@ -40,15 +42,16 @@ class MixinHandler(object):
ip = self.request.headers.get('X-Real-Ip') ip = self.request.headers.get('X-Real-Ip')
port = self.request.headers.get('X-Real-Port') port = self.request.headers.get('X-Real-Port')
if ip is None and port is None: if ip is None and port is None: # suppose the server doesn't use nginx
return return
if is_valid_ipv4_address(ip) or is_valid_ipv6_address(ip):
try: try:
port = int(port) port = int(port)
except (TypeError, ValueError): except (TypeError, ValueError):
pass pass
else: else:
if ip: # does not validate ip and port here if is_valid_port(port):
return (ip, port) return (ip, port)
logging.warning('Bad nginx configuration.') logging.warning('Bad nginx configuration.')
...@@ -101,9 +104,9 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler): ...@@ -101,9 +104,9 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
try: try:
port = int(value) port = int(value)
except ValueError: except ValueError:
port = 0 pass
else:
if 0 < port < 65536: if is_valid_port(port):
return port return port
raise ValueError('Invalid port {}'.format(value)) raise ValueError('Invalid port {}'.format(value))
...@@ -135,7 +138,7 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler): ...@@ -135,7 +138,7 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
except paramiko.SSHException: except paramiko.SSHException:
result = None result = None
else: else:
data = stdout.read().decode() data = stdout.read().decode('utf-8')
result = parse_encoding(data) result = parse_encoding(data)
return result if result else 'utf-8' return result if result else 'utf-8'
......
import ipaddress
def to_str(s):
if isinstance(s, bytes):
return s.decode('utf-8')
return s
def is_valid_ipv4_address(ipstr):
ipstr = to_str(ipstr)
try:
ipaddress.IPv4Address(ipstr)
except ipaddress.AddressValueError:
return False
return True
def is_valid_ipv6_address(ipstr):
ipstr = to_str(ipstr)
try:
ipaddress.IPv6Address(ipstr)
except ipaddress.AddressValueError:
return False
return True
def is_valid_port(port):
return 0 < port < 65536
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment