Edgewall Software

Ticket #9111: t9111-wait-if-waiters.diff

File t9111-wait-if-waiters.diff, 5.0 KB (added by cboos, 23 months ago)

pool: fair attribution of Connection. (patch on r9929)

  • trac/db/pool.py

     
    7272        self._pool = [] 
    7373        self._pool_key = [] 
    7474        self._pool_time = [] 
     75        self._waiters = 0 
    7576 
    7677    def get_cnx(self, connector, kwargs, timeout=None): 
    77         num = 1 
    7878        cnx = None 
    7979        log = kwargs.get('log') 
    8080        key = unicode(kwargs) 
     
    8282        tid = threading._get_ident() 
    8383        self._available.acquire() 
    8484        try: 
    85             while True: 
    86                 # First choice: Return the same cnx already used by the thread 
    87                 if (tid, key) in self._active: 
    88                     cnx, num = self._active[(tid, key)] 
    89                     num += 1 
    90                 # Second best option: Reuse a live pooled connection 
    91                 elif key in self._pool_key: 
    92                     idx = self._pool_key.index(key) 
    93                     self._pool_key.pop(idx) 
    94                     self._pool_time.pop(idx) 
    95                     cnx = self._pool.pop(idx) 
    96                     # If possible, verify that the pooled connection is 
    97                     # still available and working. 
    98                     if hasattr(cnx, 'ping'): 
    99                         try: 
    100                             cnx.ping() 
    101                         except: 
    102                             continue 
    103                 # Third best option: Create a new connection 
    104                 elif len(self._active) + len(self._pool) < self._maxsize: 
    105                     cnx = connector.get_connection(**kwargs) 
    106                 # Forth best option: Replace a pooled connection with a new one 
    107                 elif len(self._active)  < self._maxsize: 
    108                     # Remove the LRU connection in the pool 
    109                     self._pool.pop(0).close() 
    110                     self._pool_key.pop(0) 
    111                     self._pool_time.pop(0) 
    112                     cnx = connector.get_connection(**kwargs) 
    113                 if cnx: 
    114                     self._active[(tid, key)] = (cnx, num) 
    115                     return PooledConnection(self, cnx, key, tid, log) 
    116                 # Worst option: wait until a connection pool slot is available 
    117                 if timeout and (time.time() - start) > timeout: 
    118                     raise TimeoutError(_('Unable to get database ' 
    119                                          'connection within %(time)d ' 
    120                                          'seconds', time=timeout)) 
    121                 elif timeout: 
    122                     self._available.wait(timeout) 
    123                 else: 
     85            # First choice: Return the same cnx already used by the thread 
     86            if (tid, key) in self._active: 
     87                cnx, num = self._active[(tid, key)] 
     88                num += 1 
     89            else: 
     90                if self._waiters == 0: 
     91                    cnx = self._take_cnx(connector, kwargs, key, tid) 
     92                if not cnx: 
     93                    self._waiters += 1 
    12494                    self._available.wait() 
     95                    self._waiters -= 1 
     96                    cnx = self._take_cnx(connector, kwargs, key, tid) 
     97                num = 1 
     98            if cnx: 
     99                self._active[(tid, key)] = (cnx, num) 
     100                return PooledConnection(self, cnx, key, tid, log) 
     101            else: 
     102                # if we didn't get a cnx after wait() something's fishy... 
     103                timeout = time.time() - start 
     104                raise TimeoutError(_('Unable to get database ' 
     105                                     'connection within %(time)d ' 
     106                                     'seconds', time=timeout)) 
    125107        finally: 
    126108            self._available.release() 
    127109 
     110    def _take_cnx(self, connector, kwargs, key, tid): 
     111        """Note: _available lock must be held when calling this method.""" 
     112        create = False 
     113        # Second best option: Reuse a live pooled connection 
     114        if key in self._pool_key: 
     115            idx = self._pool_key.index(key) 
     116            self._pool_key.pop(idx) 
     117            self._pool_time.pop(idx) 
     118            cnx = self._pool.pop(idx) 
     119            # If possible, verify that the pooled connection is 
     120            # still available and working. 
     121            if hasattr(cnx, 'ping'): 
     122                try: 
     123                    cnx.ping() 
     124                except: 
     125                    cnx = self._take_cnx(key, tid) 
     126            return cnx 
     127        # Third best option: Create a new connection 
     128        elif len(self._active) + len(self._pool) < self._maxsize: 
     129            create = True 
     130        # Forth best option: Replace a pooled connection with a new one 
     131        elif len(self._active) < self._maxsize: 
     132            # Remove the LRU connection in the pool 
     133            self._pool.pop(0).close() 
     134            self._pool_key.pop(0) 
     135            self._pool_time.pop(0) 
     136            create = True 
     137        if create: 
     138            return connector.get_connection(**kwargs) 
     139 
    128140    def _return_cnx(self, cnx, key, tid): 
    129141        self._available.acquire() 
    130142        try: