PageRenderTime 57ms CodeModel.GetById 13ms RepoModel.GetById 1ms app.codeStats 0ms

/pyzmq-2.2.0/zmq/web/proxy.py

#
Python | 247 lines | 145 code | 23 blank | 79 comment | 27 complexity | 24a6a405d43bc0d77096d31ae96f1550 MD5 | raw file
Possible License(s): LGPL-3.0, BSD-3-Clause
  1. """Proxy classes for forwarding tornado handlers to be run in separate processes.
  2. This module uses ZeroMQ/PyZMQ sockets (DEALER/ROUTER) to enable individual
  3. Tornado handlers to be run in a separate backend process. Through the
  4. usage of DEALER/ROUTER sockets, multiple backend processes for a given
  5. handler can be started and requests will be load balanced among the backend
  6. processes.
  7. Authors:
  8. * Brian Granger
  9. """
  10. #-----------------------------------------------------------------------------
  11. # Copyright (c) 2012 Brian Granger, Min Ragan-Kelley
  12. #
  13. # This file is part of pyzmq
  14. #
  15. # Distributed under the terms of the New BSD License. The full license is in
  16. # the file COPYING.BSD, distributed as part of this software.
  17. #-----------------------------------------------------------------------------
  18. #-----------------------------------------------------------------------------
  19. # Imports
  20. #-----------------------------------------------------------------------------
  21. import logging
  22. import socket
  23. import time
  24. import uuid
  25. from tornado import web
  26. from tornado import stack_context
  27. import zmq
  28. from zmq.eventloop.zmqstream import ZMQStream
  29. from zmq.eventloop.ioloop import IOLoop, DelayedCallback
  30. from zmq.utils import jsonapi
  31. from .zmqweb import ZMQHTTPRequest
  32. #-----------------------------------------------------------------------------
  33. # Service client
  34. #-----------------------------------------------------------------------------
  35. class ZMQApplicationProxy(object):
  36. """A proxy for a ZeroMQ based ZMQApplication that is using ZMQHTTPRequest.
  37. This class is a proxy for a backend that is running a
  38. ZMQApplication and MUST be used with the ZMQHTTPRequest class. This
  39. version sends the reply parts (each generated by RequestHandler.flush) as
  40. a single multipart message for low latency replies. See
  41. ZMQStreamingApplicationProxy, for a version that has higher latency, but
  42. which sends each reply part as a separate zmq message.
  43. """
  44. def __init__(self, loop=None, context=None):
  45. self.loop = loop if loop is not None else IOLoop.instance()
  46. self.context = context if context is not None else zmq.Context.instance()
  47. self._callbacks = {}
  48. self.socket = self.context.socket(zmq.DEALER)
  49. self.stream = ZMQStream(self.socket, self.loop)
  50. self.stream.on_recv(self._handle_reply)
  51. self.urls = []
  52. def connect(self, url):
  53. """Connect the service client to the proto://ip:port given in the url."""
  54. self.urls.append(url)
  55. self.socket.connect(url)
  56. def bind(self, url):
  57. """Bind the service client to the proto://ip:port given in the url."""
  58. self.urls.append(url)
  59. self.socket.bind(url)
  60. def send_request(self, request, args, kwargs, handler, timeout):
  61. """Send a request to the service."""
  62. req = {}
  63. req['method'] = request.method
  64. req['uri'] = request.uri
  65. req['version'] = request.version
  66. req['headers'] = dict(request.headers)
  67. body = request.body
  68. req['remote_ip'] = request.remote_ip
  69. req['protocol'] = request.protocol
  70. req['host'] = request.host
  71. req['files'] = request.files
  72. req['arguments'] = request.arguments
  73. req['args'] = args
  74. req['kwargs'] = kwargs
  75. msg_id = bytes(uuid.uuid4())
  76. msg_list = [b'|', msg_id, jsonapi.dumps(req)]
  77. if body:
  78. msg_list.append(body)
  79. logging.debug('Sending request: %r', msg_list)
  80. self.stream.send_multipart(msg_list)
  81. if timeout > 0:
  82. def _handle_timeout():
  83. handler.send_error(504) # Gateway timeout
  84. try:
  85. self._callbacks.pop(msg_id)
  86. except KeyError:
  87. logging.error('Unexpected error removing callbacks')
  88. dc = DelayedCallback(_handle_timeout, timeout, self.loop)
  89. dc.start()
  90. else:
  91. dc = None
  92. self._callbacks[msg_id] = (handler, dc)
  93. return msg_id
  94. def _handle_reply(self, msg_list):
  95. logging.debug('Handling reply: %r', msg_list)
  96. len_msg_list = len(msg_list)
  97. if len_msg_list < 3 or not msg_list[0] == b'|':
  98. logging.error('Unexpected reply in ZMQApplicationProxy._handle_reply')
  99. return
  100. msg_id = msg_list[1]
  101. replies = msg_list[2:]
  102. cb = self._callbacks.pop(msg_id, None)
  103. if cb is not None:
  104. handler, dc = cb
  105. if dc is not None:
  106. dc.stop()
  107. try:
  108. for reply in replies:
  109. handler.write(reply)
  110. # The backend has already processed the headers and they are
  111. # included in the above write calls, so we manually tell the
  112. # handler that the headers are already written.
  113. handler._headers_written = True
  114. # We set transforms to an empty list because the backend
  115. # has already applied all of the transforms.
  116. handler._transforms = []
  117. handler.finish()
  118. except:
  119. logging.error('Unexpected error in ZMQApplicationProxy._handle_reply', exc_info=True)
  120. class ZMQStreamingApplicationProxy(ZMQApplicationProxy):
  121. """A proxy for a ZeroMQ based ZMQApplication that is using ZMQStreamingHTTPRequest.
  122. This class is a proxy for a backend that is running a
  123. ZMQApplication and MUST be used with the ZMQStreamingHTTPRequest class.
  124. This version sends the reply parts (each generated by RequestHandler.flush)
  125. as separate zmq messages to enable streaming replies. See
  126. ZMQApplicationProxy, for a version that has lower latency, but which sends
  127. all reply parts as a single zmq message.
  128. """
  129. def _handle_reply(self, msg_list):
  130. logging.debug('Handling reply: %r', msg_list)
  131. len_msg_list = len(msg_list)
  132. if len_msg_list < 3 or not msg_list[0] == b'|':
  133. logging.error('Unexpected reply in ZMQStreamingApplicationProxy._handle_reply')
  134. return
  135. msg_id = msg_list[1]
  136. reply = msg_list[2]
  137. cb = self._callbacks.get(msg_id)
  138. if cb is not None:
  139. handler, dc = cb
  140. if reply == b'DATA' and len_msg_list == 4:
  141. if dc is not None:
  142. # Stop the timeout DelayedCallback and set it to None.
  143. dc.stop()
  144. self._callbacks[msg_id] = (handler, None)
  145. try:
  146. handler.write(msg_list[3])
  147. # The backend has already processed the headers and they are
  148. # included in the above write calls, so we manually tell the
  149. # handler that the headers are already written.
  150. handler._headers_written = True
  151. # We set transforms to an empty list because the backend
  152. # has already applied all of the transforms.
  153. handler._transforms = []
  154. handler.flush()
  155. except socket.error:
  156. # socket.error is raised if the client disconnects while
  157. # we are sending.
  158. pass
  159. except:
  160. logging.error('Unexpected write error', exc_info=True)
  161. elif reply == b'FINISH':
  162. # We are done so we can get rid of the callbacks for this msg_id.
  163. self._callbacks.pop(msg_id)
  164. try:
  165. handler.finish()
  166. except socket.error:
  167. # socket.error is raised if the client disconnects while
  168. # we are sending.
  169. pass
  170. except:
  171. logging.error('Unexpected finish error', exc_info=True)
  172. class ZMQRequestHandlerProxy(web.RequestHandler):
  173. """A handler for use with a ZeroMQ backend service client."""
  174. SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS")
  175. def initialize(self, proxy, timeout=0):
  176. """Initialize with a proxy and timeout.
  177. Parameters
  178. ----------
  179. proxy : ZMQApplicationProxy. ZMQStreamingApplicationProxy
  180. A proxy instance that will be used to send requests to a backend
  181. process.
  182. timeout : int
  183. The timeout, in milliseconds. If this timeout is reached
  184. before the backend's first reply, then the server is sent a
  185. status code of 504 to the browser to indicate a gateway/proxy
  186. timeout. Set to 0 or a negative number to disable (infinite
  187. timeout).
  188. """
  189. # zmqweb Note: This method is empty in the base class.
  190. self.proxy = proxy
  191. self.timeout = timeout
  192. def _execute(self, transforms, *args, **kwargs):
  193. """Executes this request with the given output transforms."""
  194. # ZMQWEB NOTE: Transforms should be applied in the backend service so
  195. # we null any transforms passed in here. This may be a little too
  196. # silent, but there may be other handlers that do need the transforms.
  197. self._transforms = []
  198. # ZMQWEB NOTE: This following try/except block is taken from the base
  199. # class, but is modified to send the request to the proxy.
  200. try:
  201. if self.request.method not in self.SUPPORTED_METHODS:
  202. raise web.HTTPError(405)
  203. # ZMQWEB NOTE: We have removed the XSRF cookie handling from here
  204. # as it will be handled in the backend.
  205. self.prepare()
  206. if not self._finished:
  207. # ZMQWEB NOTE: Here is where we send the request to the proxy.
  208. # We don't decode args or kwargs as that will be done in the
  209. # backen.
  210. self.proxy.send_request(
  211. self.request, args, kwargs, self, self.timeout
  212. )
  213. except Exception:
  214. # ZMQWEB NOTE: We don't call the usual error handling logic
  215. # as that will be called by the backend process.
  216. logging.error('Unexpected error in _execute', exc_info=True)