Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
itminedu
agkyra
Commits
1cbf0b27
Commit
1cbf0b27
authored
May 21, 2015
by
Giorgos Korfiatis
Browse files
messaging and testing more
parent
36f5d30f
Changes
3
Hide whitespace changes
Inline
Side-by-side
agkyra/syncer/messaging.py
View file @
1cbf0b27
...
...
@@ -44,6 +44,7 @@ class UpdateMessage(Message):
Message
.
__init__
(
self
,
*
args
,
**
kwargs
)
self
.
archive
=
kwargs
[
"archive"
]
self
.
objname
=
kwargs
[
"objname"
]
self
.
old_serial
=
kwargs
[
"old_serial"
]
self
.
serial
=
kwargs
[
"serial"
]
self
.
logger
.
info
(
"Updating archive: %s, object: '%s', serial: %s"
%
(
self
.
archive
,
self
.
objname
,
self
.
serial
))
...
...
agkyra/syncer/syncer.py
View file @
1cbf0b27
...
...
@@ -127,7 +127,8 @@ class FileSyncer(object):
return
if
db_state
.
serial
!=
ref_state
.
serial
:
msg
=
messaging
.
AlreadyProbedMessage
(
archive
=
archive
,
objname
=
objname
,
serial
=
serial
,
logger
=
logger
)
archive
=
archive
,
objname
=
objname
,
serial
=
db_state
.
serial
,
logger
=
logger
)
self
.
messager
.
put
(
msg
)
return
live_state
=
client
.
probe_file
(
objname
,
db_state
,
ref_state
,
ident
)
...
...
@@ -139,9 +140,6 @@ class FileSyncer(object):
archive
=
live_state
.
archive
objname
=
live_state
.
objname
serial
=
live_state
.
serial
msg
=
messaging
.
UpdateMessage
(
archive
=
archive
,
objname
=
objname
,
serial
=
serial
,
logger
=
logger
)
self
.
messager
.
put
(
msg
)
db_state
=
db
.
get_state
(
archive
,
objname
)
if
db_state
and
db_state
.
serial
!=
serial
:
logger
.
warning
(
...
...
@@ -153,6 +151,10 @@ class FileSyncer(object):
new_serial
=
db
.
new_serial
(
objname
)
new_state
=
live_state
.
set
(
serial
=
new_serial
)
db
.
put_state
(
new_state
)
msg
=
messaging
.
UpdateMessage
(
archive
=
archive
,
objname
=
objname
,
serial
=
new_serial
,
old_serial
=
serial
,
logger
=
logger
)
self
.
messager
.
put
(
msg
)
if
new_serial
==
0
:
sync_state
=
common
.
FileState
(
archive
=
self
.
SYNC
,
objname
=
objname
,
serial
=-
1
,
...
...
test.py
View file @
1cbf0b27
...
...
@@ -105,7 +105,7 @@ class AgkyraTest(unittest.TestCase):
self
.
s
.
probe_file
(
self
.
s
.
MASTER
,
f1
)
m
=
self
.
assert_message
(
messaging
.
UpdateMessage
)
self
.
assertEqual
(
m
.
archive
,
self
.
s
.
MASTER
)
self
.
assertEqual
(
m
.
serial
,
-
1
)
self
.
assertEqual
(
m
.
serial
,
0
)
state
=
self
.
db
.
get_state
(
self
.
s
.
MASTER
,
f1
)
self
.
assertEqual
(
state
.
serial
,
0
)
...
...
@@ -138,12 +138,12 @@ class AgkyraTest(unittest.TestCase):
def
test_002_conflict
(
self
):
fil
=
"f002"
#
update
local file
# local file
fil_local_content
=
"local"
with
open
(
self
.
get_path
(
fil
),
"w"
)
as
f
:
f
.
write
(
fil_local_content
)
#
update
upstream
# upstream
fil_upstream_content
=
"upstream"
r
=
self
.
pithos
.
upload_from_string
(
fil
,
fil_upstream_content
)
...
...
@@ -250,13 +250,24 @@ class AgkyraTest(unittest.TestCase):
fil
=
"f005"
f_path
=
self
.
get_path
(
fil
)
open
(
f_path
,
'a'
).
close
()
self
.
s
.
probe_file
(
self
.
s
.
SLAVE
,
fil
)
self
.
s
.
decide_file_sync
(
fil
)
self
.
assert_message
(
messaging
.
UpdateMessage
)
self
.
assert_message
(
messaging
.
SyncMessage
)
self
.
assert_message
(
messaging
.
AckSyncMessage
)
r
=
self
.
pithos
.
object_put
(
fil
,
content_type
=
'application/directory'
,
content_length
=
0
)
inner_fil
=
"f005/in005"
inner_fil_content
=
"ff1 in dir "
r1
=
self
.
pithos
.
upload_from_string
(
inner_fil
,
inner_fil_content
)
inner_fil2
=
"f005/in2005"
inner_fil2_content
=
"inner2 in dir "
r1
=
self
.
pithos
.
upload_from_string
(
inner_fil2
,
inner_fil2_content
)
self
.
s
.
probe_file
(
self
.
s
.
MASTER
,
fil
)
self
.
s
.
probe_file
(
self
.
s
.
MASTER
,
inner_fil
)
self
.
s
.
probe_file
(
self
.
s
.
MASTER
,
inner_fil2
)
self
.
assert_message
(
messaging
.
UpdateMessage
)
self
.
assert_message
(
messaging
.
UpdateMessage
)
self
.
assert_message
(
messaging
.
UpdateMessage
)
...
...
@@ -275,6 +286,14 @@ class AgkyraTest(unittest.TestCase):
self
.
assert_message
(
messaging
.
SyncMessage
)
self
.
assert_message
(
messaging
.
SyncErrorMessage
)
# but if we fist sync the dir, it's ok
self
.
s
.
decide_file_sync
(
fil
)
self
.
assert_message
(
messaging
.
SyncMessage
)
self
.
assert_message
(
messaging
.
AckSyncMessage
)
self
.
s
.
decide_file_sync
(
inner_fil
)
self
.
assert_message
(
messaging
.
SyncMessage
)
self
.
assert_message
(
messaging
.
AckSyncMessage
)
def
test_006_heartbeat
(
self
):
fil
=
"f006"
f_path
=
self
.
get_path
(
fil
)
...
...
@@ -288,6 +307,10 @@ class AgkyraTest(unittest.TestCase):
self
.
s
.
decide_file_sync
(
fil
)
self
.
assert_message
(
messaging
.
HeartbeatNoDecideMessage
)
self
.
assert_message
(
messaging
.
AckSyncMessage
)
with
open
(
f_path
,
'w'
)
as
f
:
f
.
write
(
"new"
)
self
.
s
.
probe_file
(
self
.
s
.
SLAVE
,
fil
)
self
.
assert_message
(
messaging
.
UpdateMessage
)
def
test_007_multiprobe
(
self
):
fil
=
"f007"
...
...
@@ -300,36 +323,38 @@ class AgkyraTest(unittest.TestCase):
self
.
s
.
probe_file
(
self
.
s
.
SLAVE
,
fil
)
self
.
assert_message
(
messaging
.
AlreadyProbedMessage
)
def
test_008_dir_contents
(
self
):
d
=
"d008"
d_path
=
self
.
get_path
(
d
)
r
=
self
.
pithos
.
object_put
(
d
,
content_type
=
'application/directory'
,
content_length
=
0
)
inner_fil
=
"d008/inf008"
inner_fil_content
=
"fil in dir "
r1
=
self
.
pithos
.
upload_from_string
(
inner_fil
,
inner_fil_content
)
self
.
s
.
probe_file
(
self
.
s
.
MASTER
,
d
)
m
=
self
.
assert_message
(
messaging
.
UpdateMessage
)
master_serial
=
m
.
serial
self
.
assertEqual
(
master_serial
,
0
)
self
.
s
.
probe_file
(
self
.
s
.
MASTER
,
inner_fil
)
self
.
assert_message
(
messaging
.
UpdateMessage
)
# this will also make the dir
self
.
s
.
decide_file_sync
(
inner_fil
)
self
.
assert_message
(
messaging
.
SyncMessage
)
self
.
assert_message
(
messaging
.
AckSyncMessage
)
self
.
assertTrue
(
os
.
path
.
isdir
(
d_path
))
self
.
s
.
probe_file
(
self
.
s
.
SLAVE
,
d
)
m
=
self
.
assert_message
(
messaging
.
UpdateMessage
)
slave_serial
=
m
.
serial
self
.
assertEqual
(
slave_serial
,
1
)
self
.
s
.
decide_file_sync
(
d
)
self
.
assert_message
(
messaging
.
SyncMessage
)
self
.
assert_message
(
messaging
.
AckSyncMessage
)
state
=
self
.
db
.
get_state
(
self
.
s
.
SLAVE
,
d
)
self
.
assertEqual
(
state
.
serial
,
master_serial
)
if
__name__
==
'__main__'
:
unittest
.
main
()
print
"SLEEPING 10"
time
.
sleep
(
10
)
# this will fail with serial mismatch
s
.
probe_file
(
s
.
MASTER
,
ff1
)
s
.
decide_file_sync
(
ff1
)
assert_message
(
messaging
.
SyncMessage
)
assert_message
(
messaging
.
SyncErrorMessage
)
print
"SLEEPING 11"
time
.
sleep
(
11
)
# locally remove f1 to allow a dir to be created
os
.
unlink
(
f1_path
)
s
.
decide_file_sync
(
ff1
)
assert_message
(
messaging
.
SyncMessage
)
assert_message
(
messaging
.
AckSyncMessage
)
# also fix the dir
s
.
probe_file
(
s
.
SLAVE
,
f1
)
assert_message
(
messaging
.
UpdateMessage
)
s
.
decide_file_sync
(
f1
)
assert_message
(
messaging
.
SyncMessage
)
assert_message
(
messaging
.
AckSyncMessage
)
# ln1 is a file; let a dir be upstream
r
=
pithos
.
object_put
(
ln1
,
content_type
=
'application/directory'
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment