messaging.py 7.06 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
import Queue

from agkyra.syncer import utils


class Messager(object):
    """Wrapper around a single ``Queue.Queue`` instance.

    Constructor arguments are accepted and ignored.
    """

    def __init__(self, *args, **kwargs):
        self.queue = Queue.Queue()

    def put(self, obj):
        """Enqueue ``obj``; return whatever the queue's ``put`` returns."""
        outcome = self.queue.put(obj)
        return outcome

    def get(self, **kwargs):
        """Dequeue one item, forwarding ``kwargs`` to the queue's ``get``.

        Returns ``None`` rather than raising when the queue reports empty.
        """
        try:
            item = self.queue.get(**kwargs)
        except Queue.Empty:
            item = None
        return item


class Message(object):
    """Base class of all messages in this module.

    On construction it stores the value returned by ``utils.time_stamp()``
    as ``tstamp``, the ``logger`` keyword argument as ``logger`` and the
    concrete class name as ``name``.

    Raises ``KeyError`` when no ``logger`` keyword argument is given.
    Positional arguments are accepted but not used.
    """

    def __init__(self, *args, **kwargs):
        self.tstamp = utils.time_stamp()
        self.logger = kwargs["logger"]
        # Subclasses get their own class name here, not "Message".
        self.name = type(self).__name__


class UpdateMessage(Message):
    """Message that records an update and logs it at info level.

    Required keyword arguments, in addition to those of ``Message``:
    ``archive``, ``objname``, ``old_serial`` and ``serial``. A missing one
    raises ``KeyError``. ``old_serial`` is stored but not part of the
    logged line.
    """

    def __init__(self, *args, **kwargs):
        Message.__init__(self, *args, **kwargs)
        self.archive = kwargs["archive"]
        self.objname = kwargs["objname"]
        self.old_serial = kwargs["old_serial"]
        self.serial = kwargs["serial"]
        self.logger.info("Updating archive: %s, object: '%s', serial: %s" %
                         (self.archive, self.objname, self.serial))


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
53 54 55 56 57 58 59 60 61
class IgnoreProbeMessage(Message):
    """Message that logs a warning that a probe is being ignored.

    Required keyword arguments, in addition to those of ``Message``:
    ``archive`` and ``objname``.
    """

    def __init__(self, *args, **kwargs):
        super(IgnoreProbeMessage, self).__init__(*args, **kwargs)
        for key in ("archive", "objname"):
            setattr(self, key, kwargs[key])
        text = "Ignoring probe archive: %s, object: %s" % (
            self.archive, self.objname)
        self.logger.warning(text)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
class AlreadyProbedMessage(Message):
    """Message that logs a warning about a serial mismatch while probing.

    Required keyword arguments, in addition to those of ``Message``:
    ``archive``, ``objname`` and ``serial``. ``serial`` is stored but not
    included in the logged line.
    """

    def __init__(self, *args, **kwargs):
        super(AlreadyProbedMessage, self).__init__(*args, **kwargs)
        for key in ("archive", "objname", "serial"):
            setattr(self, key, kwargs[key])
        template = ("Serial mismatch in probing archive: %s, "
                    "object: '%s'")
        self.logger.warning(template % (self.archive, self.objname))


class HeartbeatNoProbeMessage(Message):
    """Message that logs a warning that a probe was aborted.

    Required keyword arguments, in addition to those of ``Message``:
    ``archive``, ``objname`` and ``heartbeat``. ``heartbeat`` is stored
    but not included in the logged line.
    """

    def __init__(self, *args, **kwargs):
        super(HeartbeatNoProbeMessage, self).__init__(*args, **kwargs)
        for key in ("archive", "objname", "heartbeat"):
            setattr(self, key, kwargs[key])
        template = ("Object '%s' is being synced; "
                    "Probe in archive %s aborted.")
        # Note the argument order: object name first, then archive.
        self.logger.warning(template % (self.objname, self.archive))


class HeartbeatNoDecideMessage(Message):
    """Message that logs, at debug level, that deciding was aborted.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname`` and ``heartbeat``. A missing one raises ``KeyError``.
    ``heartbeat`` is stored but not included in the logged line.
    """

    def __init__(self, *args, **kwargs):
        Message.__init__(self, *args, **kwargs)
        self.objname = kwargs["objname"]
        self.heartbeat = kwargs["heartbeat"]
        self.logger.debug("Object '%s' already handled; aborting deciding."
                          % self.objname)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
90 91


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
class HeartbeatReplayDecideMessage(Message):
    """Message that logs, at info level, the ident of a found heartbeat.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname`` and ``heartbeat``; the latter must support lookup of the
    ``"ident"`` key.
    """

    def __init__(self, *args, **kwargs):
        super(HeartbeatReplayDecideMessage, self).__init__(*args, **kwargs)
        for key in ("objname", "heartbeat"):
            setattr(self, key, kwargs[key])
        template = "Found heartbeat with current ident %s"
        self.logger.info(template % self.heartbeat["ident"])


class FailedSyncIgnoreDecisionMessage(Message):
    """Message that logs a warning that a failed decision is ignored.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname`` and ``serial``. The serial is what the logged line
    reports as the decision.
    """

    def __init__(self, *args, **kwargs):
        super(FailedSyncIgnoreDecisionMessage, self).__init__(
            *args, **kwargs)
        for key in ("objname", "serial"):
            setattr(self, key, kwargs[key])
        template = "Ignoring failed decision for: '%s', decision: %s"
        self.logger.warning(template % (self.objname, self.serial))


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
111 112 113 114 115 116 117 118 119 120
class LiveInfoUpdateMessage(Message):
    """Message that logs a warning that actual info differs.

    Required keyword arguments, in addition to those of ``Message``:
    ``archive``, ``objname`` and ``info``. ``info`` is stored but not
    included in the logged line.
    """

    def __init__(self, *args, **kwargs):
        super(LiveInfoUpdateMessage, self).__init__(*args, **kwargs)
        for key in ("archive", "objname", "info"):
            setattr(self, key, kwargs[key])
        template = ("Actual info differs in %s for object: '%s'; "
                    "updating...")
        self.logger.warning(template % (self.archive, self.objname))


121 122 123 124 125 126
class SyncMessage(Message):
    """Message that records a sync and logs it at info level.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname``, ``archive``, ``serial`` and ``info``. A missing one
    raises ``KeyError``. ``info`` is stored but not included in the
    logged line.
    """

    def __init__(self, *args, **kwargs):
        Message.__init__(self, *args, **kwargs)
        self.objname = kwargs["objname"]
        self.archive = kwargs["archive"]
        self.serial = kwargs["serial"]
        self.info = kwargs["info"]
        self.logger.info("Syncing archive: %s, object: '%s', serial: %s"
                         % (self.archive, self.objname, self.serial))
130 131 132 133 134 135 136 137


class AckSyncMessage(Message):
    """Message that records a sync acknowledgement, logged at info level.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname``, ``archive`` and ``serial``. A missing one raises
    ``KeyError``.
    """

    def __init__(self, *args, **kwargs):
        Message.__init__(self, *args, **kwargs)
        self.objname = kwargs["objname"]
        self.archive = kwargs["archive"]
        self.serial = kwargs["serial"]
        self.logger.info("Acked archive: %s, object: '%s', serial: %s" %
                         (self.archive, self.objname, self.serial))


142 143 144 145 146 147 148 149 150 151 152
class SyncErrorMessage(Message):
    """Message that logs a warning describing a failed sync.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname``, ``serial`` and ``exception``; all three appear in the
    logged line.
    """

    def __init__(self, *args, **kwargs):
        super(SyncErrorMessage, self).__init__(*args, **kwargs)
        for key in ("objname", "serial", "exception"):
            setattr(self, key, kwargs[key])
        template = "Sync failed; object: '%s' serial: %s error: %s"
        details = (self.objname, self.serial, self.exception)
        self.logger.warning(template % details)


153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
class CollisionMessage(Message):
    """Message that logs a warning about an upload that collided upstream.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname`` and ``etag``.
    """

    def __init__(self, *args, **kwargs):
        super(CollisionMessage, self).__init__(*args, **kwargs)
        for key in ("objname", "etag"):
            setattr(self, key, kwargs[key])
        template = ("Failed to upload; object: '%s' with etag: %s "
                    "collided with upstream")
        self.logger.warning(template % (self.objname, self.etag))


class ConflictStashMessage(Message):
    """Message that logs a warning that a file is being stashed.

    Required keyword arguments, in addition to those of ``Message``:
    ``objname`` and ``stash_name``.
    """

    def __init__(self, *args, **kwargs):
        super(ConflictStashMessage, self).__init__(*args, **kwargs)
        for key in ("objname", "stash_name"):
            setattr(self, key, kwargs[key])
        text = "Stashing file '%s' to '%s'" % (
            self.objname, self.stash_name)
        self.logger.warning(text)
170 171 172 173 174 175 176 177 178 179 180 181


class LocalfsSyncDisabled(Message):
    """Message that logs a warning that localfs sync is disabled."""

    def __init__(self, *args, **kwargs):
        super(LocalfsSyncDisabled, self).__init__(*args, **kwargs)
        notice = "Localfs sync is disabled"
        self.logger.warning(notice)


class PithosSyncDisabled(Message):
    """Message that logs a warning that Pithos sync is disabled."""

    def __init__(self, *args, **kwargs):
        super(PithosSyncDisabled, self).__init__(*args, **kwargs)
        notice = "Pithos sync is disabled"
        self.logger.warning(notice)
182 183 184 185 186 187 188 189 190 191 192 193


class LocalfsSyncEnabled(Message):
    """Message that logs, at info level, that localfs sync is enabled."""

    def __init__(self, *args, **kwargs):
        super(LocalfsSyncEnabled, self).__init__(*args, **kwargs)
        notice = "Localfs sync is enabled"
        self.logger.info(notice)


class PithosSyncEnabled(Message):
    """Message that logs, at info level, that Pithos sync is enabled."""

    def __init__(self, *args, **kwargs):
        super(PithosSyncEnabled, self).__init__(*args, **kwargs)
        notice = "Pithos sync is enabled"
        self.logger.info(notice)
194 195 196 197 198 199 200 201 202 203 204 205 206 207


class PithosGenericError(Message):
    """Message that stores an error object and logs it at error level.

    Requires the ``exc`` keyword argument in addition to those of
    ``Message``; the same object is kept as ``exc`` and handed to the
    logger.
    """

    def __init__(self, *args, **kwargs):
        super(PithosGenericError, self).__init__(*args, **kwargs)
        error = kwargs["exc"]
        self.exc = error
        self.logger.error(error)


class PithosAuthTokenError(Message):
    """Message that stores an error object and logs it at error level.

    Requires the ``exc`` keyword argument in addition to those of
    ``Message``; the same object is kept as ``exc`` and handed to the
    logger.
    """

    def __init__(self, *args, **kwargs):
        super(PithosAuthTokenError, self).__init__(*args, **kwargs)
        error = kwargs["exc"]
        self.exc = error
        self.logger.error(error)