?????????? ????????? - ??????????????? - /home/agenciai/public_html/cd38d8/multiprocessing.tar
???????
connection.py 0000644 00000076160 15125160144 0007266 0 ustar 00 # # A higher level module for using sockets (or Windows named pipes) # # multiprocessing/connection.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] import io import os import sys import socket import struct import time import tempfile import itertools import _multiprocessing from . import util from . import AuthenticationError, BufferTooShort from .context import reduction _ForkingPickler = reduction.ForkingPickler try: import _winapi from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE except ImportError: if sys.platform == 'win32': raise _winapi = None # # # BUFSIZE = 8192 # A very generous timeout when it comes to local connections... CONNECTION_TIMEOUT = 20. # The hmac module implicitly defaults to using MD5. # Support using a stronger algorithm for the challenge/response code: HMAC_DIGEST_NAME='sha256' _mmap_counter = itertools.count() default_family = 'AF_INET' families = ['AF_INET'] if hasattr(socket, 'AF_UNIX'): default_family = 'AF_UNIX' families += ['AF_UNIX'] if sys.platform == 'win32': default_family = 'AF_PIPE' families += ['AF_PIPE'] def _init_timeout(timeout=CONNECTION_TIMEOUT): return time.monotonic() + timeout def _check_timeout(t): return time.monotonic() > t # # # def arbitrary_address(family): ''' Return an arbitrary free address for the given family ''' if family == 'AF_INET': return ('localhost', 0) elif family == 'AF_UNIX': return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir()) elif family == 'AF_PIPE': return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % (os.getpid(), next(_mmap_counter)), dir="") else: raise ValueError('unrecognized family') def _validate_family(family): ''' Checks if the family is valid for the current environment. ''' if sys.platform != 'win32' and family == 'AF_PIPE': raise ValueError('Family %s is not recognized.' % family) if sys.platform == 'win32' and family == 'AF_UNIX': # double check if not hasattr(socket, family): raise ValueError('Family %s is not recognized.' % family) def address_type(address): ''' Return the types of the address This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' ''' if type(address) == tuple: return 'AF_INET' elif type(address) is str and address.startswith('\\\\'): return 'AF_PIPE' elif type(address) is str or util.is_abstract_socket_namespace(address): return 'AF_UNIX' else: raise ValueError('address type of %r unrecognized' % address) # # Connection classes # class _ConnectionBase: _handle = None def __init__(self, handle, readable=True, writable=True): handle = handle.__index__() if handle < 0: raise ValueError("invalid handle") if not readable and not writable: raise ValueError( "at least one of `readable` and `writable` must be True") self._handle = handle self._readable = readable self._writable = writable # XXX should we use util.Finalize instead of a __del__? def __del__(self): if self._handle is not None: self._close() def _check_closed(self): if self._handle is None: raise OSError("handle is closed") def _check_readable(self): if not self._readable: raise OSError("connection is write-only") def _check_writable(self): if not self._writable: raise OSError("connection is read-only") def _bad_message_length(self): if self._writable: self._readable = False else: self.close() raise OSError("bad message length") @property def closed(self): """True if the connection is closed""" return self._handle is None @property def readable(self): """True if the connection is readable""" return self._readable @property def writable(self): """True if the connection is writable""" return self._writable def fileno(self): """File descriptor or handle of the connection""" self._check_closed() return self._handle def close(self): """Close the connection""" if self._handle is not None: try: self._close() finally: self._handle = None def send_bytes(self, buf, offset=0, size=None): """Send the bytes data from a bytes-like object""" self._check_closed() self._check_writable() m = memoryview(buf) # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) if m.itemsize > 1: m = memoryview(bytes(m)) n = len(m) if offset < 0: raise ValueError("offset is negative") if n < offset: raise ValueError("buffer length < offset") if size is None: size = n - offset elif size < 0: raise ValueError("size is negative") elif offset + size > n: raise ValueError("buffer length < offset + size") self._send_bytes(m[offset:offset + size]) def send(self, obj): """Send a (picklable) object""" self._check_closed() self._check_writable() self._send_bytes(_ForkingPickler.dumps(obj)) def recv_bytes(self, maxlength=None): """ Receive bytes data as a bytes object. """ self._check_closed() self._check_readable() if maxlength is not None and maxlength < 0: raise ValueError("negative maxlength") buf = self._recv_bytes(maxlength) if buf is None: self._bad_message_length() return buf.getvalue() def recv_bytes_into(self, buf, offset=0): """ Receive bytes data into a writeable bytes-like object. Return the number of bytes read. """ self._check_closed() self._check_readable() with memoryview(buf) as m: # Get bytesize of arbitrary buffer itemsize = m.itemsize bytesize = itemsize * len(m) if offset < 0: raise ValueError("negative offset") elif offset > bytesize: raise ValueError("offset too large") result = self._recv_bytes() size = result.tell() if bytesize < offset + size: raise BufferTooShort(result.getvalue()) # Message can fit in dest result.seek(0) result.readinto(m[offset // itemsize : (offset + size) // itemsize]) return size def recv(self): """Receive a (picklable) object""" self._check_closed() self._check_readable() buf = self._recv_bytes() return _ForkingPickler.loads(buf.getbuffer()) def poll(self, timeout=0.0): """Whether there is any input available to be read""" self._check_closed() self._check_readable() return self._poll(timeout) def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_tb): self.close() if _winapi: class PipeConnection(_ConnectionBase): """ Connection class based on a Windows named pipe. Overlapped I/O is used, so the handles must have been created with FILE_FLAG_OVERLAPPED. """ _got_empty_message = False def _close(self, _CloseHandle=_winapi.CloseHandle): _CloseHandle(self._handle) def _send_bytes(self, buf): ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) try: if err == _winapi.ERROR_IO_PENDING: waitres = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) assert waitres == WAIT_OBJECT_0 except: ov.cancel() raise finally: nwritten, err = ov.GetOverlappedResult(True) assert err == 0 assert nwritten == len(buf) def _recv_bytes(self, maxsize=None): if self._got_empty_message: self._got_empty_message = False return io.BytesIO() else: bsize = 128 if maxsize is None else min(maxsize, 128) try: ov, err = _winapi.ReadFile(self._handle, bsize, overlapped=True) try: if err == _winapi.ERROR_IO_PENDING: waitres = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) assert waitres == WAIT_OBJECT_0 except: ov.cancel() raise finally: nread, err = ov.GetOverlappedResult(True) if err == 0: f = io.BytesIO() f.write(ov.getbuffer()) return f elif err == _winapi.ERROR_MORE_DATA: return self._get_more_data(ov, maxsize) except OSError as e: if e.winerror == _winapi.ERROR_BROKEN_PIPE: raise EOFError else: raise raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") def _poll(self, timeout): if (self._got_empty_message or _winapi.PeekNamedPipe(self._handle)[0] != 0): return True return bool(wait([self], timeout)) def _get_more_data(self, ov, maxsize): buf = ov.getbuffer() f = io.BytesIO() f.write(buf) left = _winapi.PeekNamedPipe(self._handle)[1] assert left > 0 if maxsize is not None and len(buf) + left > maxsize: self._bad_message_length() ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) rbytes, err = ov.GetOverlappedResult(True) assert err == 0 assert rbytes == left f.write(ov.getbuffer()) return f class Connection(_ConnectionBase): """ Connection class based on an arbitrary file descriptor (Unix only), or a socket handle (Windows). """ if _winapi: def _close(self, _close=_multiprocessing.closesocket): _close(self._handle) _write = _multiprocessing.send _read = _multiprocessing.recv else: def _close(self, _close=os.close): _close(self._handle) _write = os.write _read = os.read def _send(self, buf, write=_write): remaining = len(buf) while True: n = write(self._handle, buf) remaining -= n if remaining == 0: break buf = buf[n:] def _recv(self, size, read=_read): buf = io.BytesIO() handle = self._handle remaining = size while remaining > 0: chunk = read(handle, remaining) n = len(chunk) if n == 0: if remaining == size: raise EOFError else: raise OSError("got end of file during message") buf.write(chunk) remaining -= n return buf def _send_bytes(self, buf): n = len(buf) if n > 0x7fffffff: pre_header = struct.pack("!i", -1) header = struct.pack("!Q", n) self._send(pre_header) self._send(header) self._send(buf) else: # For wire compatibility with 3.7 and lower header = struct.pack("!i", n) if n > 16384: # The payload is large so Nagle's algorithm won't be triggered # and we'd better avoid the cost of concatenation. self._send(header) self._send(buf) else: # Issue #20540: concatenate before sending, to avoid delays due # to Nagle's algorithm on a TCP socket. # Also note we want to avoid sending a 0-length buffer separately, # to avoid "broken pipe" errors if the other end closed the pipe. self._send(header + buf) def _recv_bytes(self, maxsize=None): buf = self._recv(4) size, = struct.unpack("!i", buf.getvalue()) if size == -1: buf = self._recv(8) size, = struct.unpack("!Q", buf.getvalue()) if maxsize is not None and size > maxsize: return None return self._recv(size) def _poll(self, timeout): r = wait([self], timeout) return bool(r) # # Public functions # class Listener(object): ''' Returns a listener object. This is a wrapper for a bound socket which is 'listening' for connections, or for a Windows named pipe. ''' def __init__(self, address=None, family=None, backlog=1, authkey=None): family = family or (address and address_type(address)) \ or default_family address = address or arbitrary_address(family) _validate_family(family) if family == 'AF_PIPE': self._listener = PipeListener(address, backlog) else: self._listener = SocketListener(address, family, backlog) if authkey is not None and not isinstance(authkey, bytes): raise TypeError('authkey should be a byte string') self._authkey = authkey def accept(self): ''' Accept a connection on the bound socket or named pipe of `self`. Returns a `Connection` object. ''' if self._listener is None: raise OSError('listener is closed') c = self._listener.accept() if self._authkey: deliver_challenge(c, self._authkey) answer_challenge(c, self._authkey) return c def close(self): ''' Close the bound socket or named pipe of `self`. ''' listener = self._listener if listener is not None: self._listener = None listener.close() @property def address(self): return self._listener._address @property def last_accepted(self): return self._listener._last_accepted def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_tb): self.close() def Client(address, family=None, authkey=None): ''' Returns a connection to the address of a `Listener` ''' family = family or address_type(address) _validate_family(family) if family == 'AF_PIPE': c = PipeClient(address) else: c = SocketClient(address) if authkey is not None and not isinstance(authkey, bytes): raise TypeError('authkey should be a byte string') if authkey is not None: answer_challenge(c, authkey) deliver_challenge(c, authkey) return c if sys.platform != 'win32': def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = Connection(s1.detach()) c2 = Connection(s2.detach()) else: fd1, fd2 = os.pipe() c1 = Connection(fd1, writable=False) c2 = Connection(fd2, readable=False) return c1, c2 else: def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' address = arbitrary_address('AF_PIPE') if duplex: openmode = _winapi.PIPE_ACCESS_DUPLEX access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE obsize, ibsize = BUFSIZE, BUFSIZE else: openmode = _winapi.PIPE_ACCESS_INBOUND access = _winapi.GENERIC_WRITE obsize, ibsize = 0, BUFSIZE h1 = _winapi.CreateNamedPipe( address, openmode | _winapi.FILE_FLAG_OVERLAPPED | _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | _winapi.PIPE_WAIT, 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, # default security descriptor: the handle cannot be inherited _winapi.NULL ) h2 = _winapi.CreateFile( address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) _winapi.SetNamedPipeHandleState( h2, _winapi.PIPE_READMODE_MESSAGE, None, None ) overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) _, err = overlapped.GetOverlappedResult(True) assert err == 0 c1 = PipeConnection(h1, writable=duplex) c2 = PipeConnection(h2, readable=duplex) return c1, c2 # # Definitions for connections based on sockets # class SocketListener(object): ''' Representation of a socket which is bound to an address and listening ''' def __init__(self, address, family, backlog=1): self._socket = socket.socket(getattr(socket, family)) try: # SO_REUSEADDR has different semantics on Windows (issue #2550). if os.name == 'posix': self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.setblocking(True) self._socket.bind(address) self._socket.listen(backlog) self._address = self._socket.getsockname() except OSError: self._socket.close() raise self._family = family self._last_accepted = None if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address): # Linux abstract socket namespaces do not need to be explicitly unlinked self._unlink = util.Finalize( self, os.unlink, args=(address,), exitpriority=0 ) else: self._unlink = None def accept(self): s, self._last_accepted = self._socket.accept() s.setblocking(True) return Connection(s.detach()) def close(self): try: self._socket.close() finally: unlink = self._unlink if unlink is not None: self._unlink = None unlink() def SocketClient(address): ''' Return a connection object connected to the socket given by `address` ''' family = address_type(address) with socket.socket( getattr(socket, family) ) as s: s.setblocking(True) s.connect(address) return Connection(s.detach()) # # Definitions for connections based on named pipes # if sys.platform == 'win32': class PipeListener(object): ''' Representation of a named pipe ''' def __init__(self, address, backlog=None): self._address = address self._handle_queue = [self._new_handle(first=True)] self._last_accepted = None util.sub_debug('listener created with address=%r', self._address) self.close = util.Finalize( self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) def _new_handle(self, first=False): flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED if first: flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE return _winapi.CreateNamedPipe( self._address, flags, _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | _winapi.PIPE_WAIT, _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) def accept(self): self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) try: ov = _winapi.ConnectNamedPipe(handle, overlapped=True) except OSError as e: if e.winerror != _winapi.ERROR_NO_DATA: raise # ERROR_NO_DATA can occur if a client has already connected, # written data and then disconnected -- see Issue 14725. else: try: res = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) except: ov.cancel() _winapi.CloseHandle(handle) raise finally: _, err = ov.GetOverlappedResult(True) assert err == 0 return PipeConnection(handle) @staticmethod def _finalize_pipe_listener(queue, address): util.sub_debug('closing listener with address=%r', address) for handle in queue: _winapi.CloseHandle(handle) def PipeClient(address): ''' Return a connection object connected to the pipe given by `address` ''' t = _init_timeout() while 1: try: _winapi.WaitNamedPipe(address, 1000) h = _winapi.CreateFile( address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, 0, _winapi.NULL, _winapi.OPEN_EXISTING, _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) except OSError as e: if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, _winapi.ERROR_PIPE_BUSY) or _check_timeout(t): raise else: break else: raise _winapi.SetNamedPipeHandleState( h, _winapi.PIPE_READMODE_MESSAGE, None, None ) return PipeConnection(h) # # Authentication stuff # MESSAGE_LENGTH = 20 CHALLENGE = b'#CHALLENGE#' WELCOME = b'#WELCOME#' FAILURE = b'#FAILURE#' def deliver_challenge(connection, authkey): import hmac if not isinstance(authkey, bytes): raise ValueError( "Authkey must be bytes, not {0!s}".format(type(authkey))) message = os.urandom(MESSAGE_LENGTH) connection.send_bytes(CHALLENGE + message) digest = hmac.new(authkey, message, HMAC_DIGEST_NAME).digest() response = connection.recv_bytes(256) # reject large message if response == digest: connection.send_bytes(WELCOME) else: connection.send_bytes(FAILURE) raise AuthenticationError('digest received was wrong') def answer_challenge(connection, authkey): import hmac if not isinstance(authkey, bytes): raise ValueError( "Authkey must be bytes, not {0!s}".format(type(authkey))) message = connection.recv_bytes(256) # reject large message assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message message = message[len(CHALLENGE):] digest = hmac.new(authkey, message, HMAC_DIGEST_NAME).digest() connection.send_bytes(digest) response = connection.recv_bytes(256) # reject large message if response != WELCOME: raise AuthenticationError('digest sent was rejected') # # Support for using xmlrpclib for serialization # class ConnectionWrapper(object): def __init__(self, conn, dumps, loads): self._conn = conn self._dumps = dumps self._loads = loads for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): obj = getattr(conn, attr) setattr(self, attr, obj) def send(self, obj): s = self._dumps(obj) self._conn.send_bytes(s) def recv(self): s = self._conn.recv_bytes() return self._loads(s) def _xml_dumps(obj): return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): def accept(self): global xmlrpclib import xmlrpc.client as xmlrpclib obj = Listener.accept(self) return ConnectionWrapper(obj, _xml_dumps, _xml_loads) def XmlClient(*args, **kwds): global xmlrpclib import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) # # Wait # if sys.platform == 'win32': def _exhaustive_wait(handles, timeout): # Return ALL handles which are currently signalled. (Only # returning the first signalled might create starvation issues.) L = list(handles) ready = [] while L: res = _winapi.WaitForMultipleObjects(L, False, timeout) if res == WAIT_TIMEOUT: break elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): res -= WAIT_OBJECT_0 elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): res -= WAIT_ABANDONED_0 else: raise RuntimeError('Should not get here') ready.append(L[res]) L = L[res+1:] timeout = 0 return ready _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} def wait(object_list, timeout=None): ''' Wait till an object in object_list is ready/readable. Returns list of those objects in object_list which are ready/readable. ''' if timeout is None: timeout = INFINITE elif timeout < 0: timeout = 0 else: timeout = int(timeout * 1000 + 0.5) object_list = list(object_list) waithandle_to_obj = {} ov_list = [] ready_objects = set() ready_handles = set() try: for o in object_list: try: fileno = getattr(o, 'fileno') except AttributeError: waithandle_to_obj[o.__index__()] = o else: # start an overlapped read of length zero try: ov, err = _winapi.ReadFile(fileno(), 0, True) except OSError as e: ov, err = None, e.winerror if err not in _ready_errors: raise if err == _winapi.ERROR_IO_PENDING: ov_list.append(ov) waithandle_to_obj[ov.event] = o else: # If o.fileno() is an overlapped pipe handle and # err == 0 then there is a zero length message # in the pipe, but it HAS NOT been consumed... if ov and sys.getwindowsversion()[:2] >= (6, 2): # ... except on Windows 8 and later, where # the message HAS been consumed. try: _, err = ov.GetOverlappedResult(False) except OSError as e: err = e.winerror if not err and hasattr(o, '_got_empty_message'): o._got_empty_message = True ready_objects.add(o) timeout = 0 ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) finally: # request that overlapped reads stop for ov in ov_list: ov.cancel() # wait for all overlapped reads to stop for ov in ov_list: try: _, err = ov.GetOverlappedResult(True) except OSError as e: err = e.winerror if err not in _ready_errors: raise if err != _winapi.ERROR_OPERATION_ABORTED: o = waithandle_to_obj[ov.event] ready_objects.add(o) if err == 0: # If o.fileno() is an overlapped pipe handle then # a zero length message HAS been consumed. if hasattr(o, '_got_empty_message'): o._got_empty_message = True ready_objects.update(waithandle_to_obj[h] for h in ready_handles) return [o for o in object_list if o in ready_objects] else: import selectors # poll/select have the advantage of not requiring any extra file # descriptor, contrarily to epoll/kqueue (also, they require a single # syscall). if hasattr(selectors, 'PollSelector'): _WaitSelector = selectors.PollSelector else: _WaitSelector = selectors.SelectSelector def wait(object_list, timeout=None): ''' Wait till an object in object_list is ready/readable. Returns list of those objects in object_list which are ready/readable. ''' with _WaitSelector() as selector: for obj in object_list: selector.register(obj, selectors.EVENT_READ) if timeout is not None: deadline = time.monotonic() + timeout while True: ready = selector.select(timeout) if ready: return [key.fileobj for (key, events) in ready] else: if timeout is not None: timeout = deadline - time.monotonic() if timeout < 0: return ready # # Make connection and socket objects sharable if possible # if sys.platform == 'win32': def reduce_connection(conn): handle = conn.fileno() with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: from . import resource_sharer ds = resource_sharer.DupSocket(s) return rebuild_connection, (ds, conn.readable, conn.writable) def rebuild_connection(ds, readable, writable): sock = ds.detach() return Connection(sock.detach(), readable, writable) reduction.register(Connection, reduce_connection) def reduce_pipe_connection(conn): access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) dh = reduction.DupHandle(conn.fileno(), access) return rebuild_pipe_connection, (dh, conn.readable, conn.writable) def rebuild_pipe_connection(dh, readable, writable): handle = dh.detach() return PipeConnection(handle, readable, writable) reduction.register(PipeConnection, reduce_pipe_connection) else: def reduce_connection(conn): df = reduction.DupFd(conn.fileno()) return rebuild_connection, (df, conn.readable, conn.writable) def rebuild_connection(df, readable, writable): fd = df.detach() return Connection(fd, readable, writable) reduction.register(Connection, reduce_connection) forkserver.py 0000644 00000027556 15125160144 0007324 0 ustar 00 import errno import os import selectors import signal import socket import struct import sys import threading import warnings from . import connection from . import process from .context import reduction from . import resource_tracker from . import spawn from . import util __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process', 'set_forkserver_preload'] # # # MAXFDS_TO_SEND = 256 SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t # # Forkserver class # class ForkServer(object): def __init__(self): self._forkserver_address = None self._forkserver_alive_fd = None self._forkserver_pid = None self._inherited_fds = None self._lock = threading.Lock() self._preload_modules = ['__main__'] def _stop(self): # Method used by unit tests to stop the server with self._lock: self._stop_unlocked() def _stop_unlocked(self): if self._forkserver_pid is None: return # close the "alive" file descriptor asks the server to stop os.close(self._forkserver_alive_fd) self._forkserver_alive_fd = None os.waitpid(self._forkserver_pid, 0) self._forkserver_pid = None if not util.is_abstract_socket_namespace(self._forkserver_address): os.unlink(self._forkserver_address) self._forkserver_address = None def set_forkserver_preload(self, modules_names): '''Set list of module names to try to load in forkserver process.''' if not all(type(mod) is str for mod in self._preload_modules): raise TypeError('module_names must be a list of strings') self._preload_modules = modules_names def get_inherited_fds(self): '''Return list of fds inherited from parent process. This returns None if the current process was not started by fork server. ''' return self._inherited_fds def connect_to_new_process(self, fds): '''Request forkserver to create a child process. Returns a pair of fds (status_r, data_w). The calling process can read the child process's pid and (eventually) its returncode from status_r. The calling process should write to data_w the pickled preparation and process data. ''' self.ensure_running() if len(fds) + 4 >= MAXFDS_TO_SEND: raise ValueError('too many fds') with socket.socket(socket.AF_UNIX) as client: client.connect(self._forkserver_address) parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() allfds = [child_r, child_w, self._forkserver_alive_fd, resource_tracker.getfd()] allfds += fds try: reduction.sendfds(client, allfds) return parent_r, parent_w except: os.close(parent_r) os.close(parent_w) raise finally: os.close(child_r) os.close(child_w) def ensure_running(self): '''Make sure that a fork server is running. This can be called from any process. Note that usually a child process will just reuse the forkserver started by its parent, so ensure_running() will do nothing. ''' with self._lock: resource_tracker.ensure_running() if self._forkserver_pid is not None: # forkserver was launched before, is it still running? pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) if not pid: # still alive return # dead, launch it again os.close(self._forkserver_alive_fd) self._forkserver_address = None self._forkserver_alive_fd = None self._forkserver_pid = None cmd = ('from multiprocessing.forkserver import main; ' + 'main(%d, %d, %r, **%r)') if self._preload_modules: desired_keys = {'main_path', 'sys_path'} data = spawn.get_preparation_data('ignore') data = {x: y for x, y in data.items() if x in desired_keys} else: data = {} with socket.socket(socket.AF_UNIX) as listener: address = connection.arbitrary_address('AF_UNIX') listener.bind(address) if not util.is_abstract_socket_namespace(address): os.chmod(address, 0o600) listener.listen() # all client processes own the write end of the "alive" pipe; # when they all terminate the read end becomes ready. alive_r, alive_w = os.pipe() try: fds_to_pass = [listener.fileno(), alive_r] cmd %= (listener.fileno(), alive_r, self._preload_modules, data) exe = spawn.get_executable() args = [exe] + util._args_from_interpreter_flags() args += ['-c', cmd] pid = util.spawnv_passfds(exe, args, fds_to_pass) except: os.close(alive_w) raise finally: os.close(alive_r) self._forkserver_address = address self._forkserver_alive_fd = alive_w self._forkserver_pid = pid # # # def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): '''Run forkserver.''' if preload: if '__main__' in preload and main_path is not None: process.current_process()._inheriting = True try: spawn.import_main_path(main_path) finally: del process.current_process()._inheriting for modname in preload: try: __import__(modname) except ImportError: pass util._close_stdin() sig_r, sig_w = os.pipe() os.set_blocking(sig_r, False) os.set_blocking(sig_w, False) def sigchld_handler(*_unused): # Dummy signal handler, doesn't do anything pass handlers = { # unblocking SIGCHLD allows the wakeup fd to notify our event loop signal.SIGCHLD: sigchld_handler, # protect the process from ^C signal.SIGINT: signal.SIG_IGN, } old_handlers = {sig: signal.signal(sig, val) for (sig, val) in handlers.items()} # calling os.write() in the Python signal handler is racy signal.set_wakeup_fd(sig_w) # map child pids to client fds pid_to_fd = {} with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \ selectors.DefaultSelector() as selector: _forkserver._forkserver_address = listener.getsockname() selector.register(listener, selectors.EVENT_READ) selector.register(alive_r, selectors.EVENT_READ) selector.register(sig_r, selectors.EVENT_READ) while True: try: while True: rfds = [key.fileobj for (key, events) in selector.select()] if rfds: break if alive_r in rfds: # EOF because no more client processes left assert os.read(alive_r, 1) == b'', "Not at EOF?" raise SystemExit if sig_r in rfds: # Got SIGCHLD os.read(sig_r, 65536) # exhaust while True: # Scan for child processes try: pid, sts = os.waitpid(-1, os.WNOHANG) except ChildProcessError: break if pid == 0: break child_w = pid_to_fd.pop(pid, None) if child_w is not None: returncode = os.waitstatus_to_exitcode(sts) # Send exit code to client process try: write_signed(child_w, returncode) except BrokenPipeError: # client vanished pass os.close(child_w) else: # This shouldn't happen really warnings.warn('forkserver: waitpid returned ' 'unexpected pid %d' % pid) if listener in rfds: # Incoming fork request with listener.accept()[0] as s: # Receive fds from client fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) if len(fds) > MAXFDS_TO_SEND: raise RuntimeError( "Too many ({0:n}) fds to send".format( len(fds))) child_r, child_w, *fds = fds s.close() pid = os.fork() if pid == 0: # Child code = 1 try: listener.close() selector.close() unused_fds = [alive_r, child_w, sig_r, sig_w] unused_fds.extend(pid_to_fd.values()) code = _serve_one(child_r, fds, unused_fds, old_handlers) except Exception: sys.excepthook(*sys.exc_info()) sys.stderr.flush() finally: os._exit(code) else: # Send pid to client process try: write_signed(child_w, pid) except BrokenPipeError: # client vanished pass pid_to_fd[pid] = child_w os.close(child_r) for fd in fds: os.close(fd) except OSError as e: if e.errno != errno.ECONNABORTED: raise def _serve_one(child_r, fds, unused_fds, handlers): # close unnecessary stuff and reset signal handlers signal.set_wakeup_fd(-1) for sig, val in handlers.items(): signal.signal(sig, val) for fd in unused_fds: os.close(fd) (_forkserver._forkserver_alive_fd, resource_tracker._resource_tracker._fd, *_forkserver._inherited_fds) = fds # Run process object received over pipe parent_sentinel = os.dup(child_r) code = spawn._main(child_r, parent_sentinel) return code # # Read and write signed numbers # def read_signed(fd): data = b'' length = SIGNED_STRUCT.size while len(data) < length: s = os.read(fd, length - len(data)) if not s: raise EOFError('unexpected EOF') data += s return SIGNED_STRUCT.unpack(data)[0] def write_signed(fd, n): msg = SIGNED_STRUCT.pack(n) while msg: nbytes = os.write(fd, msg) if nbytes == 0: raise RuntimeError('should not get here') msg = msg[nbytes:] # # # _forkserver = ForkServer() ensure_running = _forkserver.ensure_running get_inherited_fds = _forkserver.get_inherited_fds connect_to_new_process = _forkserver.connect_to_new_process set_forkserver_preload = _forkserver.set_forkserver_preload __pycache__/shared_memory.cpython-39.opt-1.pyc 0000644 00000033630 15125160144 0015206 0 ustar 00 a �i�G � @ s� d Z ddgZddlmZ ddlZddlZddlZddlZddlZddl Z ej dkr`ddlZdZnddl Z dZejejB Zd Zer�d ZndZdd � ZG dd� d�ZdZG dd� d�ZdS )z�Provides shared memory for direct access across processes. The API of this package is currently provisional. Refer to the documentation for details. �SharedMemory� ShareableList� )�partialN�ntFT� z/psm_Zwnsm_c C s"