Edgewall Software

Ticket #4586: sync_using_youngest_rev_metadata-r4871.diff

File sync_using_youngest_rev_metadata-r4871.diff, 22.2 KB (added by cboos, 21 months ago)

Reworked CachedRepository.sync in order to make it faster and more robust in case of concurrent resync attempts.

  • trac/versioncontrol/api.py

     
    136136        """Close the connection to the repository.""" 
    137137        raise NotImplementedError 
    138138 
    139     def clear(self): 
    140         """Clear any data that may have been cached in instance properties.""" 
     139    def clear(self, youngest_rev=None): 
     140        """Clear any data that may have been cached in instance properties. 
     141 
     142        `youngest_rev` can be specified as a way to force the value 
     143        of the `youngest_rev` property (''will change in 0.12''). 
     144        """ 
    141145        pass 
    142146 
    143147    def get_quickjump_entries(self, rev): 
     
    221225        The way revisions are sequenced is version control specific. 
    222226        By default, one assumes that the revisions are sequenced in time 
    223227        (... which is ''not'' correct for most VCS, including Subversion). 
     228 
     229        (Deprecated, will not be used anymore in Trac 0.12) 
    224230        """ 
    225231        cursor = db.cursor() 
    226232        cursor.execute("SELECT rev FROM revision ORDER BY time DESC LIMIT 1") 
  • trac/versioncontrol/tests/cache.py

     
    1717from datetime import datetime 
    1818 
    1919from trac.log import logger_factory 
    20 from trac.test import Mock, InMemoryDatabase 
     20from trac.test import Mock, InMemoryDatabase, EnvironmentStub 
    2121from trac.util.datefmt import to_timestamp, utc 
    2222from trac.versioncontrol import Repository, Changeset, Node 
    2323from trac.versioncontrol.cache import CachedRepository 
     
    2929class CacheTestCase(unittest.TestCase): 
    3030 
    3131    def setUp(self): 
    32         self.db = InMemoryDatabase() 
     32        self.env = EnvironmentStub() 
     33        self.db = self.env.get_db_cnx() 
    3334        self.log = logger_factory('test') 
     35        cursor = self.db.cursor() 
     36        cursor.execute("INSERT INTO system (name, value) VALUES (%s,%s)", 
     37                       ('youngest_rev', '')) 
    3438 
    3539    def test_initial_sync_with_empty_repos(self): 
    3640        t = datetime(2001, 1, 1, 1, 1, 1, 0, utc) 
     
    4246                     get_youngest_rev=lambda: 0, 
    4347                     normalize_rev=lambda x: x, 
    4448                     next_rev=lambda x: None) 
    45         cache = CachedRepository(self.db, repos, None, self.log) 
     49        cache = CachedRepository(self.env, repos, None, self.log) 
    4650        cache.sync() 
    4751 
    4852        cursor = self.db.cursor() 
     
    6670                     get_youngest_rev=lambda: 1, 
    6771                     normalize_rev=lambda x: x, 
    6872                     next_rev=lambda x: int(x) == 0 and 1 or None) 
    69         cache = CachedRepository(self.db, repos, None, self.log) 
     73        cache = CachedRepository(self.env, repos, None, self.log) 
    7074        cache.sync() 
    7175 
    7276        cursor = self.db.cursor() 
     
    96100                           "VALUES ('1',%s,%s,%s,%s,%s)", 
    97101                           [('trunk', 'D', 'A', None, None), 
    98102                            ('trunk/README', 'F', 'A', None, None)]) 
     103        cursor.execute("UPDATE system SET value='1' WHERE name='youngest_rev'") 
    99104 
    100105        changes = [('trunk/README', Node.FILE, Changeset.EDIT, 'trunk/README', 1)] 
    101106        changeset = Mock(Changeset, 2, 'Update', 'joe', t3, 
     
    103108        repos = Mock(Repository, 'test-repos', None, self.log, 
    104109                     get_changeset=lambda x: changeset, 
    105110                     get_youngest_rev=lambda: 2, 
    106                      next_rev=lambda x: int(x) == 1 and 2 or None) 
    107         cache = CachedRepository(self.db, repos, None, self.log) 
     111                     get_oldest_rev=lambda: 0, 
     112                     normalize_rev=lambda x: x,                     
     113                     next_rev=lambda x: x and int(x) == 1 and 2 or None) 
     114        cache = CachedRepository(self.env, repos, None, self.log) 
    108115        cache.sync() 
    109116 
    110117        cursor = self.db.cursor() 
     
    130137                           "VALUES ('1',%s,%s,%s,%s,%s)", 
    131138                           [('trunk', 'D', 'A', None, None), 
    132139                            ('trunk/README', 'F', 'A', None, None)]) 
     140        cursor.execute("UPDATE system SET value='1' WHERE name='youngest_rev'") 
    133141 
    134142        repos = Mock(Repository, 'test-repos', None, self.log, 
    135143                     get_changeset=lambda x: None, 
    136144                     get_youngest_rev=lambda: 1, 
    137                      next_rev=lambda x: None, normalize_rev=lambda rev: rev) 
    138         cache = CachedRepository(self.db, repos, None, self.log) 
     145                     get_oldest_rev=lambda: 0, 
     146                     next_rev=lambda x: None, 
     147                     normalize_rev=lambda rev: rev) 
     148        cache = CachedRepository(self.env, repos, None, self.log) 
    139149        self.assertEqual('1', cache.youngest_rev) 
    140150        changeset = cache.get_changeset(1) 
    141151        self.assertEqual('joe', changeset.author) 
  • trac/versioncontrol/svn_fs.py

     
    276276        repos = SubversionRepository(dir, None, self.log, 
    277277                                     {'tags': self.tags, 
    278278                                      'branches': self.branches}) 
    279         crepos = CachedRepository(self.env.get_db_cnx(), repos, None, self.log) 
     279        crepos = CachedRepository(self.env, repos, None, self.log) 
    280280        if authname: 
    281281            authz = SubversionAuthorizer(self.env, crepos, authname) 
    282282            repos.authz = crepos.authz = authz 
     
    392392        assert self.scope[0] == '/' 
    393393        self.clear() 
    394394 
    395     def clear(self): 
     395    def clear(self, youngest_rev=None): 
    396396        self.youngest = None 
     397        if youngest_rev is not None: 
     398            self.youngest = self.normalize_rev(youngest_rev) 
    397399        self.oldest = None 
    398400 
    399401    def __del__(self): 
  • trac/versioncontrol/cache.py

     
    1616 
    1717from datetime import datetime 
    1818 
    19 from trac.core import TracError 
     19from trac.core import * 
     20from trac.env import IEnvironmentSetupParticipant 
    2021from trac.util.datefmt import utc, to_timestamp 
    2122from trac.versioncontrol import Changeset, Node, Repository, Authorizer, \ 
    2223                                NoSuchChangeset 
     
    2728              'D': Changeset.DELETE, 'E': Changeset.EDIT, 
    2829              'M': Changeset.MOVE} 
    2930 
     31CACHE_REPOSITORY_DIR = 'repository_dir' 
     32CACHE_YOUNGEST_REV = 'youngest_rev' 
    3033 
     34def get_cache_metadata(db): 
     35    """Retrieve the repository cache metadata from 'system' table.""" 
     36    cursor = db.cursor() 
     37    cursor.execute("SELECT name, value FROM system " 
     38                   "WHERE name IN ('%s', '%s')" % 
     39                   (CACHE_REPOSITORY_DIR, CACHE_YOUNGEST_REV)) 
     40    metadata = {} 
     41    for name, value in cursor: 
     42        metadata[name] = value 
     43    return metadata 
     44    
     45 
    3146class CachedRepository(Repository): 
    3247 
    33     def __init__(self, db, repos, authz, log): 
     48    def __init__(self, env, repos, authz, log): 
     49        # Note: don't store the db connection anymore, 
     50        # since CachedRepository are themselves cached 
     51        self.env = env 
    3452        Repository.__init__(self, repos.name, authz, log) 
    35         self.db = db 
    3653        self.repos = repos 
    37         try: 
     54        if CacheSetup(self.env).upgrade_in_progress: 
     55            self.log.info("Skipping sync during upgrade") 
     56        else: 
    3857            self.sync() 
    39         except TracError: 
    40             raise 
    41         except Exception, e: # most probably 2 concurrent resync attempts 
    42             log.warning('Error during sync(): %s' % e)  
    4358 
    4459    def close(self): 
    4560        self.repos.close() 
     
    5065 
    5166    def get_changeset(self, rev): 
    5267        return CachedChangeset(self.repos, self.repos.normalize_rev(rev), 
    53                                self.db, self.authz) 
     68                               self.env, self.authz) 
    5469 
    5570    def get_changesets(self, start, stop): 
    56         cursor = self.db.cursor() 
     71        db = self.env.get_db_cnx()         
     72        cursor = db.cursor() 
    5773        cursor.execute("SELECT rev FROM revision " 
    5874                       "WHERE time >= %s AND time < %s " 
    59                        "ORDER BY time", (to_timestamp(start), to_timestamp(stop))) 
     75                       "ORDER BY time", 
     76                       (to_timestamp(start), to_timestamp(stop))) 
    6077        for rev, in cursor: 
    6178            if self.authz.has_permission_for_changeset(rev): 
    6279                yield self.get_changeset(rev) 
    6380 
    6481    def sync(self): 
    65         cursor = self.db.cursor() 
    66  
    67         # -- repository used for populating the cache 
    68         cursor.execute("SELECT value FROM system WHERE name='repository_dir'") 
    69         for previous_repository_dir, in cursor: 
    70             if previous_repository_dir != self.name: 
     82        db = self.env.get_db_cnx()         
     83        metadata = get_cache_metadata(db) 
     84        cursor = db.cursor() 
     85         
     86        # -- check that we're populating the cache for the correct repository 
     87        repository_dir = metadata.get(CACHE_REPOSITORY_DIR) 
     88        if repository_dir: 
     89            if repository_dir != self.name: 
    7190                raise TracError("The 'repository_dir' has changed, " 
    7291                                "a 'trac-admin resync' operation is needed.") 
    73             break 
    7492        else: # no 'repository_dir' stored yet, assume everything's OK 
    75             cursor.execute("INSERT INTO system (name,value) " 
    76                            "VALUES ('repository_dir',%s)", (self.name,)) 
     93            cursor.execute("INSERT INTO system (name,value) VALUES (%s,%s)", 
     94                           (CACHE_REPOSITORY_DIR, self.name,)) 
    7795 
     96        # -- check the latest version stored against the latest in repository 
     97        if CACHE_YOUNGEST_REV not in metadata: 
     98            raise TracError("The repository cache metadata has changed, " 
     99                            " a 'trac-admin upgrade' operation is needed.") 
     100 
     101        self.youngest = metadata[CACHE_YOUNGEST_REV] 
    78102        self.repos.clear() 
    79         youngest_stored = self.repos.get_youngest_rev_in_cache(self.db) 
     103        repos_youngest = self.repos.youngest_rev 
    80104 
    81         if youngest_stored != str(self.repos.youngest_rev): 
     105        if self.youngest != str(repos_youngest): # must try to resync 
     106            if self.youngest: 
     107                next_youngest = self.repos.next_rev(self.youngest) 
     108            else: 
     109                next_youngest = None 
     110                try: 
     111                    next_youngest = self.repos.oldest_rev 
     112                    next_youngest = self.repos.normalize_rev(next_youngest) 
     113                except TracError: 
     114                    pass 
     115 
     116            if next_youngest is None: # nothing to cache yet 
     117                return 
     118 
     119            # 0. first check if there's no (obvious) resync in progress 
     120            cursor.execute("SELECT rev FROM revision WHERE rev=%s", 
     121                           (str(next_youngest),)) 
     122            for rev, in cursor: 
     123                # already there, but in progress, so keep ''previous'' 
     124                # notion of 'youngest' 
     125                self.repos.clear(youngest_rev=self.youngest) 
     126                return 
     127 
     128            # 1. prepare for resyncing 
     129            #    (there still might be a race condition at this point) 
     130 
    82131            authz = self.repos.authz 
    83132            self.repos.authz = Authorizer() # remove permission checking 
    84133 
    85134            kindmap = dict(zip(_kindmap.values(), _kindmap.keys())) 
    86135            actionmap = dict(zip(_actionmap.values(), _actionmap.keys())) 
    87             self.log.info("Syncing with repository (%s to %s)" 
    88                           % (youngest_stored, self.repos.youngest_rev)) 
    89             if youngest_stored: 
    90                 current_rev = self.repos.next_rev(youngest_stored) 
    91             else: 
    92                 try: 
    93                     current_rev = self.repos.oldest_rev 
    94                     current_rev = self.repos.normalize_rev(current_rev) 
    95                 except TracError: 
    96                     current_rev = None 
    97             while current_rev is not None: 
    98                 changeset = self.repos.get_changeset(current_rev) 
    99                 cursor.execute("INSERT INTO revision (rev,time,author,message) " 
    100                                "VALUES (%s,%s,%s,%s)", (str(current_rev), 
    101                                                         to_timestamp(changeset.date), 
    102                                                         changeset.author, 
    103                                                         changeset.message)) 
    104                 for path,kind,action,base_path,base_rev in changeset.get_changes(): 
    105                     self.log.debug("Caching node change in [%s]: %s" 
    106                                    % (current_rev, (path, kind, action, 
    107                                       base_path, base_rev))) 
    108                     kind = kindmap[kind] 
    109                     action = actionmap[action] 
    110                     cursor.execute("INSERT INTO node_change (rev,path," 
    111                                    "node_type,change_type,base_path,base_rev) " 
    112                                    "VALUES (%s,%s,%s,%s,%s,%s)", 
    113                                    (str(current_rev), path, kind, action, 
    114                                    base_path, base_rev)) 
    115                 current_rev = self.repos.next_rev(current_rev) 
    116             self.db.commit() 
    117             self.repos.authz = authz # restore permission checking 
    118136 
     137            try: 
     138                while next_youngest is not None: 
     139                     
     140                    # 1.1 Attempt to resync the 'revision' table 
     141                    self.log.info("Trying to sync revision [%s]" % 
     142                                  next_youngest) 
     143                    cset = self.repos.get_changeset(next_youngest) 
     144                    try: 
     145                        cursor.execute("INSERT INTO revision " 
     146                                       " (rev,time,author,message) " 
     147                                       "VALUES (%s,%s,%s,%s)", 
     148                                       (str(next_youngest), 
     149                                        to_timestamp(cset.date), 
     150                                        cset.author, cset.message)) 
     151                        db.commit() 
     152                    except Exception, e: # *another* 1.1. resync attempt won  
     153                        log.warning('Revision %s already cached: %s' % e) 
     154                        # also potentially in progress, so keep ''previous'' 
     155                        # notion of 'youngest' 
     156                        return 
     157 
     158                    # 1.2. now *only* one process was able to get there 
     159                    #      (i.e. there *shouldn't* be any race condition here) 
     160 
     161                    self.youngest = str(next_youngest) 
     162 
     163                    for path,kind,action,bpath,brev in cset.get_changes(): 
     164                        self.log.debug("Caching node change in [%s]: %s" 
     165                                       % (next_youngest, 
     166                                          (path,kind,action,bpath,brev))) 
     167                        kind = kindmap[kind] 
     168                        action = actionmap[action] 
     169                        cursor.execute("INSERT INTO node_change " 
     170                                       " (rev,path,node_type,change_type, " 
     171                                       "  base_path,base_rev) " 
     172                                       "VALUES (%s,%s,%s,%s,%s,%s)", 
     173                                       (self.youngest, 
     174                                        path, kind, action, bpath, brev)) 
     175 
     176                    # 1.3. iterate (1.1 should always succeed now) 
     177                    next_youngest = self.repos.next_rev(next_youngest) 
     178 
     179                # 2. update 'youngest_rev' metadata (minimize failures at 0.) 
     180                cursor.execute("UPDATE system SET value=%s WHERE name=%s", 
     181                               (self.youngest, CACHE_YOUNGEST_REV)) 
     182                db.commit() 
     183            finally: 
     184                # 3. restore permission checking (after 1.) 
     185                self.repos.authz = authz 
     186 
    119187    def get_node(self, path, rev=None): 
    120188        return self.repos.get_node(path, rev) 
    121189 
     
    126194        return self.repos.oldest_rev 
    127195 
    128196    def get_youngest_rev(self): 
    129         return self.repos.get_youngest_rev_in_cache(self.db) 
     197        return self.youngest 
    130198 
    131199    def previous_rev(self, rev): 
    132200        return self.repos.previous_rev(rev) 
     
    146214    def normalize_rev(self, rev): 
    147215        return self.repos.normalize_rev(rev) 
    148216 
    149     def get_changes(self, old_path, old_rev, new_path, new_rev, ignore_ancestry=1): 
    150         return self.repos.get_changes(old_path, old_rev, new_path, new_rev, ignore_ancestry) 
     217    def get_changes(self, old_path, old_rev, new_path, new_rev, 
     218                    ignore_ancestry=1): 
     219        return self.repos.get_changes(old_path, old_rev, new_path, new_rev, 
     220                                      ignore_ancestry) 
    151221 
    152222 
    153223class CachedChangeset(Changeset): 
    154224 
    155     def __init__(self, repos, rev, db, authz): 
     225    def __init__(self, repos, rev, env, authz): 
    156226        self.repos = repos 
    157         self.db = db 
     227        self.db = env.get_db_cnx() 
    158228        self.authz = authz 
    159229        cursor = self.db.cursor() 
    160230        cursor.execute("SELECT time,author,message FROM revision " 
     
    182252 
    183253    def get_properties(self): 
    184254        return self.repos.get_changeset(self.rev).get_properties() 
     255 
     256 
     257class CacheSetup(Component): 
     258    implements(IEnvironmentSetupParticipant) 
     259 
     260    upgrade_in_progress = False 
     261 
     262    # IEnvironmentSetupParticipant methods 
     263 
     264    def environment_created(self): 
     265        pass 
     266 
     267    def environment_needs_upgrade(self, db): 
     268        metadata = get_cache_metadata(db) 
     269        return CACHE_REPOSITORY_DIR in metadata and \ 
     270               CACHE_YOUNGEST_REV not in metadata 
     271 
     272    def upgrade_environment(self, db): 
     273        self.upgrade_in_progress = True 
     274        repos = self.env.get_repository() 
     275        self.upgrade_in_progress = False 
     276        value = repos.get_youngest_rev_in_cache(db) or '' 
     277        cursor = db.cursor() 
     278        cursor.execute("INSERT INTO system (name, value) VALUES (%s, %s)", 
     279                       (CACHE_YOUNGEST_REV, value)) 
     280        self.log.info('Upgraded cache metadata (youngest_rev=%s)' % value) 
  • trac/admin/console.py

     
    600600        cursor.execute("DELETE FROM revision") 
    601601        cursor.execute("DELETE FROM node_change") 
    602602        cursor.execute("DELETE FROM system WHERE name='repository_dir'") 
     603        cursor.execute("DELETE FROM system WHERE name='youngest_rev'") 
     604        cursor.execute("INSERT INTO system (name, value) " 
     605                       "VALUES ('youngest_rev', '')") 
    603606        repos = self.__env.get_repository() # this will do the sync() 
    604607        print 'Done.' 
    605608 
  • trac/web/api.py

     
    324324                    data = self.hdf.render(template) 
    325325 
    326326            if template.endswith('.html'): 
    327                 from trac.web.chrome import Chrome 
    328                 data = Chrome(env).render_template(self, template, data, 
    329                                                    'text/html') 
     327                if env: 
     328                    from trac.web.chrome import Chrome 
     329                    data = Chrome(env).render_template(self, template, data, 
     330                                                       'text/html') 
     331                else: 
     332                    content_type = 'text/plain' 
     333                    data = '%s\n\n%s: %s' % (data.get('title'), 
     334                                             data.get('type'), 
     335                                             data.get('message')) 
    330336        except: # failed to render 
    331337            data = get_last_traceback() 
    332338            content_type = 'text/plain' 
  • trac/web/main.py

     
    181181 
    182182        # Select the component that should handle the request 
    183183        chosen_handler = None 
    184         if not req.path_info or req.path_info == '/': 
    185             chosen_handler = self.default_handler 
    186         else: 
    187             for handler in self.handlers: 
    188                 if handler.match_request(req): 
    189                     chosen_handler = handler 
    190                     break 
    191         chosen_handler = self._pre_process_request(req, chosen_handler) 
     184        try: 
     185            if not req.path_info or req.path_info == '/': 
     186                chosen_handler = self.default_handler 
     187            else: 
     188                for handler in self.handlers: 
     189                    if handler.match_request(req): 
     190                        chosen_handler = handler 
     191                        break 
     192            chosen_handler = self._pre_process_request(req, chosen_handler) 
     193        except TracError, e: 
     194            chosen_handler = None 
    192195        if not chosen_handler: 
    193196            raise HTTPNotFound('No handler matched request to %s', 
    194197                               req.path_info) 
     
    407410                               'missing. Trac requires one of these options ' 
    408411                               'to locate the Trac environment(s).') 
    409412    run_once = environ['wsgi.run_once'] 
    410     env = _open_environment(env_path, run_once=run_once) 
    411413 
    412     if env.base_url: 
    413         environ['trac.base_url'] = env.base_url 
     414    env = env_error = None 
     415    try: 
     416        env = _open_environment(env_path, run_once=run_once) 
     417        if env.base_url: 
     418            environ['trac.base_url'] = env.base_url 
     419    except TracError, e: 
     420        env_error = e 
    414421 
    415422    req = Request(environ, start_response) 
    416423    try: 
     424        if not env and env_error: 
     425            raise HTTPInternalError(env_error.message) 
    417426        try: 
    418427            try: 
    419428                dispatcher = RequestDispatcher(env) 
     
    426435                env.shutdown(threading._get_ident()) 
    427436 
    428437    except HTTPException, e: 
    429         env.log.warn(e) 
     438        if env: 
     439            env.log.warn(e) 
    430440        title = e.reason or 'Error' 
    431441        data = {'title': title, 'type': 'TracError', 'message': e.message} 
    432442        try: