/pyzmq-2.2.0/zmq/web/proxy.py
# · Python · 247 lines · 145 code · 23 blank · 79 comment · 28 complexity · 24a6a405d43bc0d77096d31ae96f1550 MD5 · raw file
- """Proxy classes for forwarding tornado handlers to be run in separate processes.
- This module uses ZeroMQ/PyZMQ sockets (DEALER/ROUTER) to enable individual
- Tornado handlers to be run in a separate backend process. Through the
- usage of DEALER/ROUTER sockets, multiple backend processes for a given
- handler can be started and requests will be load balanced among the backend
- processes.
-
- Authors:
- * Brian Granger
- """
- #-----------------------------------------------------------------------------
- # Copyright (c) 2012 Brian Granger, Min Ragan-Kelley
- #
- # This file is part of pyzmq
- #
- # Distributed under the terms of the New BSD License. The full license is in
- # the file COPYING.BSD, distributed as part of this software.
- #-----------------------------------------------------------------------------
- #-----------------------------------------------------------------------------
- # Imports
- #-----------------------------------------------------------------------------
- import logging
- import socket
- import time
- import uuid
- from tornado import web
- from tornado import stack_context
- import zmq
- from zmq.eventloop.zmqstream import ZMQStream
- from zmq.eventloop.ioloop import IOLoop, DelayedCallback
- from zmq.utils import jsonapi
- from .zmqweb import ZMQHTTPRequest
- #-----------------------------------------------------------------------------
- # Service client
- #-----------------------------------------------------------------------------
class ZMQApplicationProxy(object):
    """A proxy for a ZeroMQ based ZMQApplication that is using ZMQHTTPRequest.

    This class is a proxy for a backend that is running a
    ZMQApplication and MUST be used with the ZMQHTTPRequest class. This
    version sends the reply parts (each generated by RequestHandler.flush) as
    a single multipart message for low latency replies. See
    ZMQStreamingApplicationProxy, for a version that has higher latency, but
    which sends each reply part as a separate zmq message.
    """

    def __init__(self, loop=None, context=None):
        """Create the DEALER socket and register the reply callback.

        Parameters
        ----------
        loop : IOLoop, optional
            Event loop to attach the stream to; defaults to
            ``IOLoop.instance()``.
        context : zmq.Context, optional
            Context used to create the socket; defaults to
            ``zmq.Context.instance()``.
        """
        self.loop = loop if loop is not None else IOLoop.instance()
        self.context = context if context is not None else zmq.Context.instance()
        # Maps msg_id -> (handler, DelayedCallback or None) for every request
        # that is still waiting for a reply.
        self._callbacks = {}
        self.socket = self.context.socket(zmq.DEALER)
        self.stream = ZMQStream(self.socket, self.loop)
        self.stream.on_recv(self._handle_reply)
        self.urls = []

    def connect(self, url):
        """Connect the service client to the proto://ip:port given in the url."""
        self.urls.append(url)
        self.socket.connect(url)

    def bind(self, url):
        """Bind the service client to the proto://ip:port given in the url."""
        self.urls.append(url)
        self.socket.bind(url)

    @staticmethod
    def _new_msg_id():
        """Return a fresh, unique message id as ASCII bytes.

        ``bytes(uuid.uuid4())`` only works on Python 2 (where ``bytes`` is
        ``str``); on Python 3 it raises TypeError. Encoding the string form
        yields the same value on Python 2 and a valid bytes frame on Python 3.
        """
        return str(uuid.uuid4()).encode('ascii')

    def send_request(self, request, args, kwargs, handler, timeout):
        """Send a request to the service.

        Parameters
        ----------
        request : HTTP request object
            The incoming request; its method, uri, version, headers, body,
            remote_ip, protocol, host, files and arguments are forwarded.
        args, kwargs :
            Positional and keyword arguments forwarded to the backend
            alongside the request.
        handler : RequestHandler
            The handler that will be used to write the reply (or a 504 on
            timeout) back to the client.
        timeout : int
            Timeout in milliseconds. If positive, a 504 is sent when the
            timeout fires; otherwise no timeout is scheduled.

        Returns
        -------
        msg_id : bytes
            The id under which the pending request is registered.
        """
        req = {
            'method': request.method,
            'uri': request.uri,
            'version': request.version,
            'headers': dict(request.headers),
            'remote_ip': request.remote_ip,
            'protocol': request.protocol,
            'host': request.host,
            'files': request.files,
            'arguments': request.arguments,
            'args': args,
            'kwargs': kwargs,
        }
        body = request.body
        msg_id = self._new_msg_id()
        # Frame layout: delimiter, message id, JSON request, optional raw body.
        msg_list = [b'|', msg_id, jsonapi.dumps(req)]
        if body:
            msg_list.append(body)
        logging.debug('Sending request: %r', msg_list)
        self.stream.send_multipart(msg_list)

        if timeout > 0:
            def _handle_timeout():
                # Drop the pending entry first so it cannot leak if
                # send_error raises (e.g. the client already went away).
                try:
                    self._callbacks.pop(msg_id)
                except KeyError:
                    logging.error('Unexpected error removing callbacks')
                handler.send_error(504)  # Gateway timeout
            dc = DelayedCallback(_handle_timeout, timeout, self.loop)
            dc.start()
        else:
            dc = None
        self._callbacks[msg_id] = (handler, dc)
        return msg_id

    def _handle_reply(self, msg_list):
        """Write a complete multipart reply to the waiting handler.

        Expects ``[b'|', msg_id, part, ...]``. Malformed messages are logged
        and dropped; replies for an unknown msg_id are ignored.
        """
        logging.debug('Handling reply: %r', msg_list)
        if len(msg_list) < 3 or msg_list[0] != b'|':
            logging.error('Unexpected reply in ZMQApplicationProxy._handle_reply')
            return
        msg_id = msg_list[1]
        replies = msg_list[2:]
        cb = self._callbacks.pop(msg_id, None)
        if cb is None:
            return
        handler, dc = cb
        if dc is not None:
            # The reply arrived in time, so cancel the pending timeout.
            dc.stop()
        try:
            for reply in replies:
                handler.write(reply)
            # The backend has already processed the headers and they are
            # included in the above write calls, so we manually tell the
            # handler that the headers are already written.
            handler._headers_written = True
            # We set transforms to an empty list because the backend
            # has already applied all of the transforms.
            handler._transforms = []
            handler.finish()
        except Exception:
            # Catch Exception (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate out of the event loop callback.
            logging.error('Unexpected error in ZMQApplicationProxy._handle_reply', exc_info=True)
class ZMQStreamingApplicationProxy(ZMQApplicationProxy):
    """A proxy for a ZeroMQ based ZMQApplication that is using ZMQStreamingHTTPRequest.

    This class is a proxy for a backend that is running a
    ZMQApplication and MUST be used with the ZMQStreamingHTTPRequest class.
    This version sends the reply parts (each generated by RequestHandler.flush)
    as separate zmq messages to enable streaming replies. See
    ZMQApplicationProxy, for a version that has lower latency, but which sends
    all reply parts as a single zmq message.
    """

    def _handle_reply(self, msg_list):
        """Handle one streamed reply frame for a pending request.

        Expects either ``[b'|', msg_id, b'DATA', chunk]`` (write and flush the
        chunk) or ``[b'|', msg_id, b'FINISH', ...]`` (finish the request and
        forget it). Malformed messages are logged and dropped; frames for an
        unknown msg_id are ignored.
        """
        logging.debug('Handling reply: %r', msg_list)
        len_msg_list = len(msg_list)
        if len_msg_list < 3 or msg_list[0] != b'|':
            logging.error('Unexpected reply in ZMQStreamingApplicationProxy._handle_reply')
            return
        msg_id = msg_list[1]
        reply = msg_list[2]
        # Use get(), not pop(): the entry must survive until FINISH arrives.
        cb = self._callbacks.get(msg_id)
        if cb is None:
            return
        handler, dc = cb

        if reply == b'DATA' and len_msg_list == 4:
            if dc is not None:
                # Stop the timeout DelayedCallback and set it to None.
                dc.stop()
                self._callbacks[msg_id] = (handler, None)
            try:
                handler.write(msg_list[3])
                # The backend has already processed the headers and they are
                # included in the above write calls, so we manually tell the
                # handler that the headers are already written.
                handler._headers_written = True
                # We set transforms to an empty list because the backend
                # has already applied all of the transforms.
                handler._transforms = []
                handler.flush()
            except socket.error:
                # socket.error is raised if the client disconnects while
                # we are sending.
                pass
            except Exception:
                # Narrowed from a bare except so KeyboardInterrupt and
                # SystemExit are not swallowed.
                logging.error('Unexpected write error', exc_info=True)
        elif reply == b'FINISH':
            # We are done so we can get rid of the callbacks for this msg_id.
            self._callbacks.pop(msg_id)
            try:
                handler.finish()
            except socket.error:
                # socket.error is raised if the client disconnects while
                # we are sending.
                pass
            except Exception:
                logging.error('Unexpected finish error', exc_info=True)
        else:
            # Neither a well-formed DATA frame nor FINISH: report it instead
            # of dropping it silently.
            logging.error(
                'Unexpected reply type in ZMQStreamingApplicationProxy._handle_reply: %r',
                reply
            )
class ZMQRequestHandlerProxy(web.RequestHandler):
    """A handler for use with a ZeroMQ backend service client."""

    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS")

    def initialize(self, proxy, timeout=0):
        """Initialize with a proxy and timeout.

        Parameters
        ----------
        proxy : ZMQApplicationProxy or ZMQStreamingApplicationProxy
            A proxy instance that will be used to send requests to a backend
            process.
        timeout : int
            The timeout, in milliseconds. If this timeout is reached
            before the backend's first reply, then a status code of 504 is
            sent to the browser to indicate a gateway/proxy timeout. Set to
            0 or a negative number to disable (infinite timeout).
        """
        # zmqweb Note: This method is empty in the base class.
        self.proxy = proxy
        self.timeout = timeout

    def _execute(self, transforms, *args, **kwargs):
        """Executes this request by forwarding it to the backend proxy.

        The ``transforms`` argument is ignored; see the note below.
        """
        # ZMQWEB NOTE: Transforms should be applied in the backend service so
        # we null any transforms passed in here. This may be a little too
        # silent, but there may be other handlers that do need the transforms.
        self._transforms = []
        # ZMQWEB NOTE: This following try/except block is taken from the base
        # class, but is modified to send the request to the proxy.
        try:
            if self.request.method not in self.SUPPORTED_METHODS:
                raise web.HTTPError(405)
            # ZMQWEB NOTE: We have removed the XSRF cookie handling from here
            # as it will be handled in the backend.
            self.prepare()
            if not self._finished:
                # ZMQWEB NOTE: Here is where we send the request to the proxy.
                # We don't decode args or kwargs as that will be done in the
                # backend.
                self.proxy.send_request(
                    self.request, args, kwargs, self, self.timeout
                )
        except Exception as e:
            logging.error('Unexpected error in _execute', exc_info=True)
            # ZMQWEB NOTE: Errors raised by the backend are reported by the
            # backend, but an error raised here (unsupported method, a failure
            # in prepare() or while sending) means no reply is registered for
            # this handler. Answer the client ourselves so the request does
            # not hang.
            if not self._finished:
                if isinstance(e, web.HTTPError):
                    status_code = e.status_code
                else:
                    status_code = 500
                self.send_error(status_code)