Commit b88a159f by Sheng

Added a parameter for worker to close with a reason

parent e8924f1a
...@@ -126,7 +126,7 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler): ...@@ -126,7 +126,7 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
except socket.error: except socket.error:
raise ValueError('Unable to connect to {}:{}'.format(*dst_addr)) raise ValueError('Unable to connect to {}:{}'.format(*dst_addr))
except paramiko.BadAuthenticationType: except paramiko.BadAuthenticationType:
raise ValueError('Authentication failed.') raise ValueError('SSH authentication failed.')
except paramiko.BadHostKeyException: except paramiko.BadHostKeyException:
raise ValueError('Bad host key.') raise ValueError('Bad host key.')
...@@ -190,7 +190,7 @@ class WsockHandler(MixinHandler, tornado.websocket.WebSocketHandler): ...@@ -190,7 +190,7 @@ class WsockHandler(MixinHandler, tornado.websocket.WebSocketHandler):
self.worker_ref = weakref.ref(worker) self.worker_ref = weakref.ref(worker)
self.loop.add_handler(worker.fd, worker, IOLoop.READ) self.loop.add_handler(worker.fd, worker, IOLoop.READ)
else: else:
self.close() self.close(reason='Websocket authentication failed.')
def on_message(self, message): def on_message(self, message):
logging.debug('{!r} from {}:{}'.format(message, *self.src_addr)) logging.debug('{!r} from {}:{}'.format(message, *self.src_addr))
...@@ -202,4 +202,6 @@ class WsockHandler(MixinHandler, tornado.websocket.WebSocketHandler): ...@@ -202,4 +202,6 @@ class WsockHandler(MixinHandler, tornado.websocket.WebSocketHandler):
logging.info('Disconnected from {}:{}'.format(*self.src_addr)) logging.info('Disconnected from {}:{}'.format(*self.src_addr))
worker = self.worker_ref() if self.worker_ref else None worker = self.worker_ref() if self.worker_ref else None
if worker: if worker:
worker.close() if self.close_reason is None:
self.close_reason = 'client disconnected'
worker.close(reason=self.close_reason)
...@@ -13,9 +13,9 @@ workers = {} ...@@ -13,9 +13,9 @@ workers = {}
def recycle_worker(worker): def recycle_worker(worker):
if worker.handler: if worker.handler:
return return
logging.debug('Recycling worker {}'.format(worker.id)) logging.warn('Recycling worker {}'.format(worker.id))
workers.pop(worker.id, None) workers.pop(worker.id, None)
worker.close() worker.close(reason='worker recycled')
class Worker(object): class Worker(object):
...@@ -36,7 +36,7 @@ class Worker(object): ...@@ -36,7 +36,7 @@ class Worker(object):
if events & IOLoop.WRITE: if events & IOLoop.WRITE:
self.on_write() self.on_write()
if events & IOLoop.ERROR: if events & IOLoop.ERROR:
self.close() self.close(reason='error event occurred')
def set_handler(self, handler): def set_handler(self, handler):
if not self.handler: if not self.handler:
...@@ -54,18 +54,18 @@ class Worker(object): ...@@ -54,18 +54,18 @@ class Worker(object):
except (OSError, IOError) as e: except (OSError, IOError) as e:
logging.error(e) logging.error(e)
if errno_from_exception(e) in _ERRNO_CONNRESET: if errno_from_exception(e) in _ERRNO_CONNRESET:
self.close() self.close(reason='chan error on reading')
else: else:
logging.debug('{!r} from {}:{}'.format(data, *self.dst_addr)) logging.debug('{!r} from {}:{}'.format(data, *self.dst_addr))
if not data: if not data:
self.close() self.close(reason='chan closed')
return return
logging.debug('{!r} to {}:{}'.format(data, *self.handler.src_addr)) logging.debug('{!r} to {}:{}'.format(data, *self.handler.src_addr))
try: try:
self.handler.write_message(data) self.handler.write_message(data)
except tornado.websocket.WebSocketClosedError: except tornado.websocket.WebSocketClosedError:
self.close() self.close(reason='websocket closed')
def on_write(self): def on_write(self):
logging.debug('worker {} on write'.format(self.id)) logging.debug('worker {} on write'.format(self.id))
...@@ -80,7 +80,7 @@ class Worker(object): ...@@ -80,7 +80,7 @@ class Worker(object):
except (OSError, IOError) as e: except (OSError, IOError) as e:
logging.error(e) logging.error(e)
if errno_from_exception(e) in _ERRNO_CONNRESET: if errno_from_exception(e) in _ERRNO_CONNRESET:
self.close() self.close(reason='chan error on writing')
else: else:
self.update_handler(IOLoop.WRITE) self.update_handler(IOLoop.WRITE)
else: else:
...@@ -92,8 +92,10 @@ class Worker(object): ...@@ -92,8 +92,10 @@ class Worker(object):
else: else:
self.update_handler(IOLoop.READ) self.update_handler(IOLoop.READ)
def close(self): def close(self, reason=None):
logging.debug('Closing worker {}'.format(self.id)) logging.info(
'Closing worker {} with reason {}'.format(self.id, reason)
)
if self.handler: if self.handler:
self.loop.remove_handler(self.fd) self.loop.remove_handler(self.fd)
self.handler.close() self.handler.close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment