Ticket #4586: sync_using_youngest_rev_metadata-r4871.diff
| File sync_using_youngest_rev_metadata-r4871.diff, 22.2 KB (added by cboos, 21 months ago) |
|---|
-
trac/versioncontrol/api.py
136 136 """Close the connection to the repository.""" 137 137 raise NotImplementedError 138 138 139 def clear(self): 140 """Clear any data that may have been cached in instance properties.""" 139 def clear(self, youngest_rev=None): 140 """Clear any data that may have been cached in instance properties. 141 142 `youngest_rev` can be specified as a way to force the value 143 of the `youngest_rev` property (''will change in 0.12''). 144 """ 141 145 pass 142 146 143 147 def get_quickjump_entries(self, rev): … … 221 225 The way revisions are sequenced is version control specific. 222 226 By default, one assumes that the revisions are sequenced in time 223 227 (... which is ''not'' correct for most VCS, including Subversion). 228 229 (Deprecated, will not be used anymore in Trac 0.12) 224 230 """ 225 231 cursor = db.cursor() 226 232 cursor.execute("SELECT rev FROM revision ORDER BY time DESC LIMIT 1") -
trac/versioncontrol/tests/cache.py
17 17 from datetime import datetime 18 18 19 19 from trac.log import logger_factory 20 from trac.test import Mock, InMemoryDatabase 20 from trac.test import Mock, InMemoryDatabase, EnvironmentStub 21 21 from trac.util.datefmt import to_timestamp, utc 22 22 from trac.versioncontrol import Repository, Changeset, Node 23 23 from trac.versioncontrol.cache import CachedRepository … … 29 29 class CacheTestCase(unittest.TestCase): 30 30 31 31 def setUp(self): 32 self.db = InMemoryDatabase() 32 self.env = EnvironmentStub() 33 self.db = self.env.get_db_cnx() 33 34 self.log = logger_factory('test') 35 cursor = self.db.cursor() 36 cursor.execute("INSERT INTO system (name, value) VALUES (%s,%s)", 37 ('youngest_rev', '')) 34 38 35 39 def test_initial_sync_with_empty_repos(self): 36 40 t = datetime(2001, 1, 1, 1, 1, 1, 0, utc) … … 42 46 get_youngest_rev=lambda: 0, 43 47 normalize_rev=lambda x: x, 44 48 next_rev=lambda x: None) 45 cache = CachedRepository(self. db, repos, None, self.log)49 cache = CachedRepository(self.env, repos, None, self.log) 46 50 cache.sync() 47 51 48 52 cursor = self.db.cursor() … … 66 70 get_youngest_rev=lambda: 1, 67 71 normalize_rev=lambda x: x, 68 72 next_rev=lambda x: int(x) == 0 and 1 or None) 69 cache = CachedRepository(self. db, repos, None, self.log)73 cache = CachedRepository(self.env, repos, None, self.log) 70 74 cache.sync() 71 75 72 76 cursor = self.db.cursor() … … 96 100 "VALUES ('1',%s,%s,%s,%s,%s)", 97 101 [('trunk', 'D', 'A', None, None), 98 102 ('trunk/README', 'F', 'A', None, None)]) 103 cursor.execute("UPDATE system SET value='1' WHERE name='youngest_rev'") 99 104 100 105 changes = [('trunk/README', Node.FILE, Changeset.EDIT, 'trunk/README', 1)] 101 106 changeset = Mock(Changeset, 2, 'Update', 'joe', t3, … … 103 108 repos = Mock(Repository, 'test-repos', None, self.log, 104 109 get_changeset=lambda x: changeset, 105 110 get_youngest_rev=lambda: 2, 106 next_rev=lambda x: int(x) == 1 and 2 or None) 107 cache = CachedRepository(self.db, repos, None, self.log) 111 get_oldest_rev=lambda: 0, 112 normalize_rev=lambda x: x, 113 next_rev=lambda x: x and int(x) == 1 and 2 or None) 114 cache = CachedRepository(self.env, repos, None, self.log) 108 115 cache.sync() 109 116 110 117 cursor = self.db.cursor() … … 130 137 "VALUES ('1',%s,%s,%s,%s,%s)", 131 138 [('trunk', 'D', 'A', None, None), 132 139 ('trunk/README', 'F', 'A', None, None)]) 140 cursor.execute("UPDATE system SET value='1' WHERE name='youngest_rev'") 133 141 134 142 repos = Mock(Repository, 'test-repos', None, self.log, 135 143 get_changeset=lambda x: None, 136 144 get_youngest_rev=lambda: 1, 137 next_rev=lambda x: None, normalize_rev=lambda rev: rev) 138 cache = CachedRepository(self.db, repos, None, self.log) 145 get_oldest_rev=lambda: 0, 146 next_rev=lambda x: None, 147 normalize_rev=lambda rev: rev) 148 cache = CachedRepository(self.env, repos, None, self.log) 139 149 self.assertEqual('1', cache.youngest_rev) 140 150 changeset = cache.get_changeset(1) 141 151 self.assertEqual('joe', changeset.author) -
trac/versioncontrol/svn_fs.py
276 276 repos = SubversionRepository(dir, None, self.log, 277 277 {'tags': self.tags, 278 278 'branches': self.branches}) 279 crepos = CachedRepository(self.env .get_db_cnx(), repos, None, self.log)279 crepos = CachedRepository(self.env, repos, None, self.log) 280 280 if authname: 281 281 authz = SubversionAuthorizer(self.env, crepos, authname) 282 282 repos.authz = crepos.authz = authz … … 392 392 assert self.scope[0] == '/' 393 393 self.clear() 394 394 395 def clear(self ):395 def clear(self, youngest_rev=None): 396 396 self.youngest = None 397 if youngest_rev is not None: 398 self.youngest = self.normalize_rev(youngest_rev) 397 399 self.oldest = None 398 400 399 401 def __del__(self): -
trac/versioncontrol/cache.py
16 16 17 17 from datetime import datetime 18 18 19 from trac.core import TracError 19 from trac.core import * 20 from trac.env import IEnvironmentSetupParticipant 20 21 from trac.util.datefmt import utc, to_timestamp 21 22 from trac.versioncontrol import Changeset, Node, Repository, Authorizer, \ 22 23 NoSuchChangeset … … 27 28 'D': Changeset.DELETE, 'E': Changeset.EDIT, 28 29 'M': Changeset.MOVE} 29 30 31 CACHE_REPOSITORY_DIR = 'repository_dir' 32 CACHE_YOUNGEST_REV = 'youngest_rev' 30 33 34 def get_cache_metadata(db): 35 """Retrieve the repository cache metadata from 'system' table.""" 36 cursor = db.cursor() 37 cursor.execute("SELECT name, value FROM system " 38 "WHERE name IN ('%s', '%s')" % 39 (CACHE_REPOSITORY_DIR, CACHE_YOUNGEST_REV)) 40 metadata = {} 41 for name, value in cursor: 42 metadata[name] = value 43 return metadata 44 45 31 46 class CachedRepository(Repository): 32 47 33 def __init__(self, db, repos, authz, log): 48 def __init__(self, env, repos, authz, log): 49 # Note: don't store the db connection anymore, 50 # since CachedRepository are themselves cached 51 self.env = env 34 52 Repository.__init__(self, repos.name, authz, log) 35 self.db = db36 53 self.repos = repos 37 try: 54 if CacheSetup(self.env).upgrade_in_progress: 55 self.log.info("Skipping sync during upgrade") 56 else: 38 57 self.sync() 39 except TracError:40 raise41 except Exception, e: # most probably 2 concurrent resync attempts42 log.warning('Error during sync(): %s' % e)43 58 44 59 def close(self): 45 60 self.repos.close() … … 50 65 51 66 def get_changeset(self, rev): 52 67 return CachedChangeset(self.repos, self.repos.normalize_rev(rev), 53 self. db, self.authz)68 self.env, self.authz) 54 69 55 70 def get_changesets(self, start, stop): 56 cursor = self.db.cursor() 71 db = self.env.get_db_cnx() 72 cursor = db.cursor() 57 73 cursor.execute("SELECT rev FROM revision " 58 74 "WHERE time >= %s AND time < %s " 59 "ORDER BY time", (to_timestamp(start), to_timestamp(stop))) 75 "ORDER BY time", 76 (to_timestamp(start), to_timestamp(stop))) 60 77 for rev, in cursor: 61 78 if self.authz.has_permission_for_changeset(rev): 62 79 yield self.get_changeset(rev) 63 80 64 81 def sync(self): 65 cursor = self.db.cursor() 66 67 # -- repository used for populating the cache 68 cursor.execute("SELECT value FROM system WHERE name='repository_dir'") 69 for previous_repository_dir, in cursor: 70 if previous_repository_dir != self.name: 82 db = self.env.get_db_cnx() 83 metadata = get_cache_metadata(db) 84 cursor = db.cursor() 85 86 # -- check that we're populating the cache for the correct repository 87 repository_dir = metadata.get(CACHE_REPOSITORY_DIR) 88 if repository_dir: 89 if repository_dir != self.name: 71 90 raise TracError("The 'repository_dir' has changed, " 72 91 "a 'trac-admin resync' operation is needed.") 73 break74 92 else: # no 'repository_dir' stored yet, assume everything's OK 75 cursor.execute("INSERT INTO system (name,value) "76 "VALUES ('repository_dir',%s)", (self.name,))93 cursor.execute("INSERT INTO system (name,value) VALUES (%s,%s)", 94 (CACHE_REPOSITORY_DIR, self.name,)) 77 95 96 # -- check the latest version stored against the latest in repository 97 if CACHE_YOUNGEST_REV not in metadata: 98 raise TracError("The repository cache metadata has changed, " 99 " a 'trac-admin upgrade' operation is needed.") 100 101 self.youngest = metadata[CACHE_YOUNGEST_REV] 78 102 self.repos.clear() 79 youngest_stored = self.repos.get_youngest_rev_in_cache(self.db)103 repos_youngest = self.repos.youngest_rev 80 104 81 if youngest_stored != str(self.repos.youngest_rev): 105 if self.youngest != str(repos_youngest): # must try to resync 106 if self.youngest: 107 next_youngest = self.repos.next_rev(self.youngest) 108 else: 109 next_youngest = None 110 try: 111 next_youngest = self.repos.oldest_rev 112 next_youngest = self.repos.normalize_rev(next_youngest) 113 except TracError: 114 pass 115 116 if next_youngest is None: # nothing to cache yet 117 return 118 119 # 0. first check if there's no (obvious) resync in progress 120 cursor.execute("SELECT rev FROM revision WHERE rev=%s", 121 (str(next_youngest),)) 122 for rev, in cursor: 123 # already there, but in progress, so keep ''previous'' 124 # notion of 'youngest' 125 self.repos.clear(youngest_rev=self.youngest) 126 return 127 128 # 1. prepare for resyncing 129 # (there still might be a race condition at this point) 130 82 131 authz = self.repos.authz 83 132 self.repos.authz = Authorizer() # remove permission checking 84 133 85 134 kindmap = dict(zip(_kindmap.values(), _kindmap.keys())) 86 135 actionmap = dict(zip(_actionmap.values(), _actionmap.keys())) 87 self.log.info("Syncing with repository (%s to %s)"88 % (youngest_stored, self.repos.youngest_rev))89 if youngest_stored:90 current_rev = self.repos.next_rev(youngest_stored)91 else:92 try:93 current_rev = self.repos.oldest_rev94 current_rev = self.repos.normalize_rev(current_rev)95 except TracError:96 current_rev = None97 while current_rev is not None:98 changeset = self.repos.get_changeset(current_rev)99 cursor.execute("INSERT INTO revision (rev,time,author,message) "100 "VALUES (%s,%s,%s,%s)", (str(current_rev),101 to_timestamp(changeset.date),102 changeset.author,103 changeset.message))104 for path,kind,action,base_path,base_rev in changeset.get_changes():105 self.log.debug("Caching node change in [%s]: %s"106 % (current_rev, (path, kind, action,107 base_path, base_rev)))108 kind = kindmap[kind]109 action = actionmap[action]110 cursor.execute("INSERT INTO node_change (rev,path,"111 "node_type,change_type,base_path,base_rev) "112 "VALUES (%s,%s,%s,%s,%s,%s)",113 (str(current_rev), path, kind, action,114 base_path, base_rev))115 current_rev = self.repos.next_rev(current_rev)116 self.db.commit()117 self.repos.authz = authz # restore permission checking118 136 137 try: 138 while next_youngest is not None: 139 140 # 1.1 Attempt to resync the 'revision' table 141 self.log.info("Trying to sync revision [%s]" % 142 next_youngest) 143 cset = self.repos.get_changeset(next_youngest) 144 try: 145 cursor.execute("INSERT INTO revision " 146 " (rev,time,author,message) " 147 "VALUES (%s,%s,%s,%s)", 148 (str(next_youngest), 149 to_timestamp(cset.date), 150 cset.author, cset.message)) 151 db.commit() 152 except Exception, e: # *another* 1.1. resync attempt won 153 log.warning('Revision %s already cached: %s' % e) 154 # also potentially in progress, so keep ''previous'' 155 # notion of 'youngest' 156 return 157 158 # 1.2. now *only* one process was able to get there 159 # (i.e. there *shouldn't* be any race condition here) 160 161 self.youngest = str(next_youngest) 162 163 for path,kind,action,bpath,brev in cset.get_changes(): 164 self.log.debug("Caching node change in [%s]: %s" 165 % (next_youngest, 166 (path,kind,action,bpath,brev))) 167 kind = kindmap[kind] 168 action = actionmap[action] 169 cursor.execute("INSERT INTO node_change " 170 " (rev,path,node_type,change_type, " 171 " base_path,base_rev) " 172 "VALUES (%s,%s,%s,%s,%s,%s)", 173 (self.youngest, 174 path, kind, action, bpath, brev)) 175 176 # 1.3. iterate (1.1 should always succeed now) 177 next_youngest = self.repos.next_rev(next_youngest) 178 179 # 2. update 'youngest_rev' metadata (minimize failures at 0.) 180 cursor.execute("UPDATE system SET value=%s WHERE name=%s", 181 (self.youngest, CACHE_YOUNGEST_REV)) 182 db.commit() 183 finally: 184 # 3. restore permission checking (after 1.) 185 self.repos.authz = authz 186 119 187 def get_node(self, path, rev=None): 120 188 return self.repos.get_node(path, rev) 121 189 … … 126 194 return self.repos.oldest_rev 127 195 128 196 def get_youngest_rev(self): 129 return self. repos.get_youngest_rev_in_cache(self.db)197 return self.youngest 130 198 131 199 def previous_rev(self, rev): 132 200 return self.repos.previous_rev(rev) … … 146 214 def normalize_rev(self, rev): 147 215 return self.repos.normalize_rev(rev) 148 216 149 def get_changes(self, old_path, old_rev, new_path, new_rev, ignore_ancestry=1): 150 return self.repos.get_changes(old_path, old_rev, new_path, new_rev, ignore_ancestry) 217 def get_changes(self, old_path, old_rev, new_path, new_rev, 218 ignore_ancestry=1): 219 return self.repos.get_changes(old_path, old_rev, new_path, new_rev, 220 ignore_ancestry) 151 221 152 222 153 223 class CachedChangeset(Changeset): 154 224 155 def __init__(self, repos, rev, db, authz):225 def __init__(self, repos, rev, env, authz): 156 226 self.repos = repos 157 self.db = db227 self.db = env.get_db_cnx() 158 228 self.authz = authz 159 229 cursor = self.db.cursor() 160 230 cursor.execute("SELECT time,author,message FROM revision " … … 182 252 183 253 def get_properties(self): 184 254 return self.repos.get_changeset(self.rev).get_properties() 255 256 257 class CacheSetup(Component): 258 implements(IEnvironmentSetupParticipant) 259 260 upgrade_in_progress = False 261 262 # IEnvironmentSetupParticipant methods 263 264 def environment_created(self): 265 pass 266 267 def environment_needs_upgrade(self, db): 268 metadata = get_cache_metadata(db) 269 return CACHE_REPOSITORY_DIR in metadata and \ 270 CACHE_YOUNGEST_REV not in metadata 271 272 def upgrade_environment(self, db): 273 self.upgrade_in_progress = True 274 repos = self.env.get_repository() 275 self.upgrade_in_progress = False 276 value = repos.get_youngest_rev_in_cache(db) or '' 277 cursor = db.cursor() 278 cursor.execute("INSERT INTO system (name, value) VALUES (%s, %s)", 279 (CACHE_YOUNGEST_REV, value)) 280 self.log.info('Upgraded cache metadata (youngest_rev=%s)' % value) -
trac/admin/console.py
600 600 cursor.execute("DELETE FROM revision") 601 601 cursor.execute("DELETE FROM node_change") 602 602 cursor.execute("DELETE FROM system WHERE name='repository_dir'") 603 cursor.execute("DELETE FROM system WHERE name='youngest_rev'") 604 cursor.execute("INSERT INTO system (name, value) " 605 "VALUES ('youngest_rev', '')") 603 606 repos = self.__env.get_repository() # this will do the sync() 604 607 print 'Done.' 605 608 -
trac/web/api.py
324 324 data = self.hdf.render(template) 325 325 326 326 if template.endswith('.html'): 327 from trac.web.chrome import Chrome 328 data = Chrome(env).render_template(self, template, data, 329 'text/html') 327 if env: 328 from trac.web.chrome import Chrome 329 data = Chrome(env).render_template(self, template, data, 330 'text/html') 331 else: 332 content_type = 'text/plain' 333 data = '%s\n\n%s: %s' % (data.get('title'), 334 data.get('type'), 335 data.get('message')) 330 336 except: # failed to render 331 337 data = get_last_traceback() 332 338 content_type = 'text/plain' -
trac/web/main.py
181 181 182 182 # Select the component that should handle the request 183 183 chosen_handler = None 184 if not req.path_info or req.path_info == '/': 185 chosen_handler = self.default_handler 186 else: 187 for handler in self.handlers: 188 if handler.match_request(req): 189 chosen_handler = handler 190 break 191 chosen_handler = self._pre_process_request(req, chosen_handler) 184 try: 185 if not req.path_info or req.path_info == '/': 186 chosen_handler = self.default_handler 187 else: 188 for handler in self.handlers: 189 if handler.match_request(req): 190 chosen_handler = handler 191 break 192 chosen_handler = self._pre_process_request(req, chosen_handler) 193 except TracError, e: 194 chosen_handler = None 192 195 if not chosen_handler: 193 196 raise HTTPNotFound('No handler matched request to %s', 194 197 req.path_info) … … 407 410 'missing. Trac requires one of these options ' 408 411 'to locate the Trac environment(s).') 409 412 run_once = environ['wsgi.run_once'] 410 env = _open_environment(env_path, run_once=run_once)411 413 412 if env.base_url: 413 environ['trac.base_url'] = env.base_url 414 env = env_error = None 415 try: 416 env = _open_environment(env_path, run_once=run_once) 417 if env.base_url: 418 environ['trac.base_url'] = env.base_url 419 except TracError, e: 420 env_error = e 414 421 415 422 req = Request(environ, start_response) 416 423 try: 424 if not env and env_error: 425 raise HTTPInternalError(env_error.message) 417 426 try: 418 427 try: 419 428 dispatcher = RequestDispatcher(env) … … 426 435 env.shutdown(threading._get_ident()) 427 436 428 437 except HTTPException, e: 429 env.log.warn(e) 438 if env: 439 env.log.warn(e) 430 440 title = e.reason or 'Error' 431 441 data = {'title': title, 'type': 'TracError', 'message': e.message} 432 442 try:
