diff --git a/agkyra/syncer/messaging.py b/agkyra/syncer/messaging.py index 2305532f5acdc352aaf1c4b16fd579376a4e1f5a..f83c123e8e0ad49d79f14b32ebe110d91fa44268 100644 --- a/agkyra/syncer/messaging.py +++ b/agkyra/syncer/messaging.py @@ -44,6 +44,7 @@ class UpdateMessage(Message): Message.__init__(self, *args, **kwargs) self.archive = kwargs["archive"] self.objname = kwargs["objname"] + self.old_serial = kwargs["old_serial"] self.serial = kwargs["serial"] self.logger.info("Updating archive: %s, object: '%s', serial: %s" % (self.archive, self.objname, self.serial)) diff --git a/agkyra/syncer/syncer.py b/agkyra/syncer/syncer.py index 2b94681571f9b7d3daebd7afafc8902ed9779c35..10f31a7378fe7984a91489d0599ae8ae96e186bb 100644 --- a/agkyra/syncer/syncer.py +++ b/agkyra/syncer/syncer.py @@ -127,7 +127,8 @@ class FileSyncer(object): return if db_state.serial != ref_state.serial: msg = messaging.AlreadyProbedMessage( - archive=archive, objname=objname, serial=serial, logger=logger) + archive=archive, objname=objname, serial=db_state.serial, + logger=logger) self.messager.put(msg) return live_state = client.probe_file(objname, db_state, ref_state, ident) @@ -139,9 +140,6 @@ class FileSyncer(object): archive = live_state.archive objname = live_state.objname serial = live_state.serial - msg = messaging.UpdateMessage( - archive=archive, objname=objname, serial=serial, logger=logger) - self.messager.put(msg) db_state = db.get_state(archive, objname) if db_state and db_state.serial != serial: logger.warning( @@ -153,6 +151,10 @@ class FileSyncer(object): new_serial = db.new_serial(objname) new_state = live_state.set(serial=new_serial) db.put_state(new_state) + msg = messaging.UpdateMessage( + archive=archive, objname=objname, + serial=new_serial, old_serial=serial, logger=logger) + self.messager.put(msg) if new_serial == 0: sync_state = common.FileState( archive=self.SYNC, objname=objname, serial=-1, diff --git a/test.py b/test.py index a6e9efa09c168f651ba7cf426525d116b98e4a6c..5c2968d2d92dd7230643c5340cee836957f00e83 100644 --- a/test.py +++ b/test.py @@ -105,7 +105,7 @@ class AgkyraTest(unittest.TestCase): self.s.probe_file(self.s.MASTER, f1) m = self.assert_message(messaging.UpdateMessage) self.assertEqual(m.archive, self.s.MASTER) - self.assertEqual(m.serial, -1) + self.assertEqual(m.serial, 0) state = self.db.get_state(self.s.MASTER, f1) self.assertEqual(state.serial, 0) @@ -138,12 +138,12 @@ class AgkyraTest(unittest.TestCase): def test_002_conflict(self): fil = "f002" - # update local file + # local file fil_local_content = "local" with open(self.get_path(fil), "w") as f: f.write(fil_local_content) - # update upstream + # upstream fil_upstream_content = "upstream" r = self.pithos.upload_from_string( fil, fil_upstream_content) @@ -250,13 +250,24 @@ class AgkyraTest(unittest.TestCase): fil = "f005" f_path = self.get_path(fil) open(f_path, 'a').close() + self.s.probe_file(self.s.SLAVE, fil) + self.s.decide_file_sync(fil) + self.assert_message(messaging.UpdateMessage) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + r = self.pithos.object_put( fil, content_type='application/directory', content_length=0) inner_fil = "f005/in005" inner_fil_content = "ff1 in dir " r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) + inner_fil2 = "f005/in2005" + inner_fil2_content = "inner2 in dir " + r1 = self.pithos.upload_from_string(inner_fil2, inner_fil2_content) self.s.probe_file(self.s.MASTER, fil) self.s.probe_file(self.s.MASTER, inner_fil) + self.s.probe_file(self.s.MASTER, inner_fil2) + self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) self.assert_message(messaging.UpdateMessage) @@ -275,6 +286,14 @@ class AgkyraTest(unittest.TestCase): self.assert_message(messaging.SyncMessage) self.assert_message(messaging.SyncErrorMessage) + # but if we fist sync the dir, it's ok + self.s.decide_file_sync(fil) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + self.s.decide_file_sync(inner_fil) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + def test_006_heartbeat(self): fil = "f006" f_path = self.get_path(fil) @@ -288,6 +307,10 @@ class AgkyraTest(unittest.TestCase): self.s.decide_file_sync(fil) self.assert_message(messaging.HeartbeatNoDecideMessage) self.assert_message(messaging.AckSyncMessage) + with open(f_path, 'w') as f: + f.write("new") + self.s.probe_file(self.s.SLAVE, fil) + self.assert_message(messaging.UpdateMessage) def test_007_multiprobe(self): fil = "f007" @@ -300,36 +323,38 @@ class AgkyraTest(unittest.TestCase): self.s.probe_file(self.s.SLAVE, fil) self.assert_message(messaging.AlreadyProbedMessage) + def test_008_dir_contents(self): + d = "d008" + d_path = self.get_path(d) + r = self.pithos.object_put( + d, content_type='application/directory', content_length=0) + inner_fil = "d008/inf008" + inner_fil_content = "fil in dir " + r1 = self.pithos.upload_from_string(inner_fil, inner_fil_content) + self.s.probe_file(self.s.MASTER, d) + m = self.assert_message(messaging.UpdateMessage) + master_serial = m.serial + self.assertEqual(master_serial, 0) + self.s.probe_file(self.s.MASTER, inner_fil) + self.assert_message(messaging.UpdateMessage) + # this will also make the dir + self.s.decide_file_sync(inner_fil) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + self.assertTrue(os.path.isdir(d_path)) + self.s.probe_file(self.s.SLAVE, d) + m = self.assert_message(messaging.UpdateMessage) + slave_serial = m.serial + self.assertEqual(slave_serial, 1) + self.s.decide_file_sync(d) + self.assert_message(messaging.SyncMessage) + self.assert_message(messaging.AckSyncMessage) + state = self.db.get_state(self.s.SLAVE, d) + self.assertEqual(state.serial, master_serial) + if __name__ == '__main__': unittest.main() -print "SLEEPING 10" -time.sleep(10) - -# this will fail with serial mismatch -s.probe_file(s.MASTER, ff1) -s.decide_file_sync(ff1) - -assert_message(messaging.SyncMessage) -assert_message(messaging.SyncErrorMessage) - -print "SLEEPING 11" -time.sleep(11) - -# locally remove f1 to allow a dir to be created -os.unlink(f1_path) -s.decide_file_sync(ff1) - -assert_message(messaging.SyncMessage) -assert_message(messaging.AckSyncMessage) - -# also fix the dir -s.probe_file(s.SLAVE, f1) -assert_message(messaging.UpdateMessage) -s.decide_file_sync(f1) -assert_message(messaging.SyncMessage) -assert_message(messaging.AckSyncMessage) - # ln1 is a file; let a dir be upstream r = pithos.object_put( ln1, content_type='application/directory',