Commit d0f70ed5 authored by Vangelis Koukis's avatar Vangelis Koukis

Merge pull request #18 from philipgian/release-0.4-locks

Improve xlock
parents f9f55b09 4f550235
......@@ -17,7 +17,8 @@ cmake_minimum_required(VERSION 2.8)
project (xseg)
SET(MAJOR 0)
SET(MINOR 2)
SET(MINOR 4)
SET(REVISION 1)
FIND_PROGRAM(H2XML h2xml)
......
......@@ -2,7 +2,6 @@
CUR_SOURCE_DIR=$1
CUR_BINARY_DIR=$2
echo "AAAAA"
cd $CUR_SOURCE_DIR
mkdir -p $CUR_BINARY_DIR/xseg
if [ -f xseg/version.py ] ; then
......
......@@ -4,7 +4,8 @@
#define XSEG_MAJOR ((uint64_t)${MAJOR})
#define XSEG_MINOR ((uint64_t)${MINOR})
#define XSEG_REVISION ((uint64_t)${REVISION})
#define XSEG_VERSION ((uint64_t)((XSEG_MAJOR << 48) + (XSEG_MINOR << 16)) + 0)
#define XSEG_VERSION ((uint64_t)((XSEG_MAJOR << 48) + (XSEG_MINOR << 32) + (XSEG_REVISION)))
#endif /* XSEG_VERSION_H */
......@@ -126,6 +126,7 @@ static void *xcache_get_entry(struct xcache *cache, xcache_handler h)
* Return a pointer to a NULL terminated string holding the name of the
* associated cache entry.
*/
/*
static char *xcache_get_name(struct xcache *cache, xcache_handler h)
{
xqindex idx = (xqindex)h;
......@@ -135,6 +136,7 @@ static char *xcache_get_name(struct xcache *cache, xcache_handler h)
return cache->nodes[idx].name;
}
*/
int xcache_init(struct xcache *cache, uint32_t xcache_size,
struct xcache_ops *ops, uint32_t flags, void *priv);
......
......@@ -19,6 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#define _XLOCK_H
#include <xseg/util.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#define MFENCE() __sync_synchronize()
#define BARRIER() __asm__ __volatile__ ("" ::: "memory")
......@@ -26,111 +29,181 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#undef __pause
#define __pause()
#define XLOCK_NOONE ((unsigned long)-1)
#define XLOCK_UNKNOWN_OWNER ((unsigned long)-2)
#define XLOCK_XSEGTOOL ((unsigned long)-3)
#define XLOCK_SANITY_CHECKS
#define XLOCK_CONGESTION_NOTIFY
typedef uint64_t xlock_owner_t;
#define XLOCK_NOONE ((xlock_owner_t)0)
#ifdef XLOCK_SANITY_CHECKS
#define MAX_VALID_OWNER 65536 /* we are not gonna have more ports than that */
#endif /* XLOCK_SANITY_CHECKS */
#define XLOCK_CONGESTION_NOTIFY
#ifdef XLOCK_CONGESTION_NOTIFY
/* When XLOCK_CONGESTION_NOTIFY is defined, xlock_acquire will start printing
* congestion warning messages after it has spun for a minimum of 2^MIN_SHIFT
* times. After that, each time the spin count doubles, a new message will be
* logged.
* At STACKTRACE_SHIFT, the backtrace of the process trying to acquire the lock
* will be logged, to better identify the code path that led to congestion (or
* even deadlock).
*/
#define MIN_SHIFT 20
#define STACKTRACE_SHIFT 23
#define MAX_SHIFT ((sizeof(unsigned long) * 8) -1)
#endif /* XLOCK_CONGESTION_NOTIFY */
/* Implement our gettid wrapper to cache gettid results */
/* Per-thread cache of the kernel thread id; 0 means "not fetched yet". */
static __thread pid_t tid=0;
/*
 * Return the calling thread's kernel TID via syscall(SYS_gettid), caching
 * the result in thread-local storage so the syscall happens once per thread.
 * NOTE(review): the cached value is inherited across fork(); a forked child
 * would report the parent's TID until it resets -- confirm forking processes
 * are not expected to hold xlocks here.
 */
static pid_t gettid()
{
if (tid)
return tid;
tid = syscall(SYS_gettid);
return tid;
}
/*
 * Spinlock word plus a debug program counter.
 * NOTE(review): two 'owner' members appear below -- a struct cannot have two
 * members with the same name. This looks like diff residue (the 'unsigned
 * long' one presumably replaced by the 'xlock_owner_t' one); confirm against
 * version control.
 */
struct xlock {
unsigned long owner;
/* Packed (PID, TID, PC) of the current holder; XLOCK_NOONE when free. */
xlock_owner_t owner;
/* Full (untruncated) PC of the acquirer, stored non-atomically for debugging. */
unsigned long pc;
};
//} __attribute__ ((aligned (16))); /* support up to 128bit longs */
#ifdef XLOCK_SANITY_CHECKS
static inline int __is_valid_owner(unsigned long owner)
/*
 * The owner value consists of three fields: the PID of the owner, the TID of
 * the owner, and the PC at which the lock was acquired.
 * In order for them to fit into an xlock_owner_t, the 16 LSBs of the PID, the
 * 16 LSBs of the TID and the 32 LSBs of the PC are kept.
 *
 * |-----16-----||-----16-----||------------32------------|
 * |----PID-----||----TID-----||------------PC------------|
 */
#define PID_BITS 16
#define TID_BITS 16
#define PC_BITS 32
/*
_Static_assert(PID_BITS + TID_BITS + PC_BITS == sizeof(xlock_owner_t),
"Invalid bit definitions")
_Static_assert(PID_BITS < sizeof(pid_t) * 8,
"PID_BITS must be lower than the bits of pid_t")
_Static_assert(TID_BITS < sizeof(pid_t) * 8,
"TID_BITS must be lower than the bits of pid_t")
_Static_assert(PC_BITS < sizeof(void *) * 8,
"PC_BITS must be lower than the bits of a (void *)")
*/
/*
 * Pack the lock-owner identity (PID, TID, program counter) into a single
 * xlock_owner_t: 16 PID LSBs | 16 TID LSBs | 32 PC LSBs.
 */
static inline xlock_owner_t pack_owner(pid_t pid, pid_t tid, void *ip)
{
/*
 * NOTE(review): the next four lines reference 'owner' before it is declared
 * and appear to be leftovers of the removed __is_valid_owner() body -- they
 * cannot compile here. Also note the leftover condition ends with
 * '|| owner != XLOCK_XSEGTOOL', which makes the whole test almost always
 * true; confirm against version control.
 */
if (owner == XLOCK_UNKNOWN_OWNER || owner <= MAX_VALID_OWNER
|| owner != XLOCK_XSEGTOOL)
return 1;
return 0;
xlock_owner_t owner = 0;
unsigned long pc = (unsigned long)ip;
/* Truncate each component to its field width before packing. */
pid &= ((xlock_owner_t)1 << PID_BITS) -1;
tid &= ((xlock_owner_t)1 << TID_BITS) -1;
pc &= ((xlock_owner_t)1 << PC_BITS) -1;
owner |= pid;
owner <<= TID_BITS;
owner |= tid;
owner <<= PC_BITS;
owner |= pc;
return owner;
}
#endif /* XLOCK_SANITY_CHECKS */
static inline unsigned long xlock_acquire(struct xlock *lock, unsigned long who)
/*
 * Reverse pack_owner(): extract the truncated PID, TID and PC fields from a
 * packed owner value into the caller-provided out parameters.
 */
static inline void unpack_owner(uint64_t owner, pid_t *pid, pid_t *tid, void **ip)
{
/*
 * NOTE(review): this line illegally redeclares the 'owner' parameter; it
 * looks like diff residue from the old xlock_acquire() locals -- confirm
 * against version control.
 */
unsigned long owner;
unsigned long pc;
/* The low PC_BITS hold the truncated program counter. */
pc = owner & (((xlock_owner_t)1 << PC_BITS) -1);
*ip = (void *)pc;
owner >>= PC_BITS;
*tid = owner & (((xlock_owner_t)1 << TID_BITS) -1);
owner >>= TID_BITS;
*pid = owner & (((xlock_owner_t)1 << PID_BITS) -1);
}
/*
 * Spin until the lock is acquired, tagging it with the caller's packed
 * (PID, TID, PC) identity; returns on success.
 * NOTE(review): this span interleaves an old and a new implementation
 * (duplicate inner loop headers, duplicate XSEGLOG calls, duplicate final
 * returns) -- it reads like unresolved diff residue; confirm against version
 * control before touching.
 */
__attribute__((always_inline)) static inline unsigned long xlock_acquire(struct xlock *lock)
{
xlock_owner_t owner, who;
pid_t pid, tid, opid, otid;
void *pc, *opc;
#ifdef XLOCK_CONGESTION_NOTIFY
/* Spin counter and the power-of-two threshold for the next warning. */
unsigned long times = 1;
unsigned long shift = MIN_SHIFT;
#endif /* XLOCK_CONGESTION_NOTIFY */
/* GCC "labels as values": &&label yields this code address as the PC tag. */
xlock_acquire_label:
pid = getpid();
tid = gettid();
pc = &&xlock_acquire_label;
who = pack_owner(pid, tid, pc);
for (;;) {
/*
 * NOTE(review): precedence bug in the loop header below -- '!=' binds
 * tighter than '=', so 'owner' receives the comparison result (0/1),
 * not the owner word. The second loop header further down carries the
 * corrected parenthesization.
 */
for (; (owner = *(volatile unsigned long *)(&lock->owner) != XLOCK_NOONE);){
#ifdef XLOCK_SANITY_CHECKS
if (!__is_valid_owner(owner)) {
XSEGLOG("xlock %lx corrupted. Lock owner %lu",
(unsigned long) lock, owner);
XSEGLOG("Resetting xlock %lx to XLOCK_NOONE",
(unsigned long) lock);
lock->owner = XLOCK_NOONE;
}
#endif /* XLOCK_SANITY_CHECKS */
for (; (owner = *(volatile xlock_owner_t*)(&lock->owner)) != XLOCK_NOONE;) {
#ifdef XLOCK_CONGESTION_NOTIFY
/*
 * NOTE(review): '1<<shift' shifts an int; shift may grow up to
 * MAX_SHIFT (63), and any shift >= 31 is undefined behavior --
 * this should be '1UL << shift'.
 */
if (!(times & ((1<<shift) -1))){
XSEGLOG("xlock %lx spinned for %llu times"
"\n\t who: %lu, owner: %lu",
if (!(times & ((1<<shift) -1))) {
unpack_owner(owner, &opid, &otid, &opc);
XSEGLOG("xlock %p spinned for %llu times"
"\n\t who: (%d, %d, %p), "
"owner: (%d, %d, %p) (full pc: %p)",
(unsigned long) lock, times,
who, owner);
pid, tid, pc, opid, otid, opc,
lock->pc);
if (shift == STACKTRACE_SHIFT)
xseg_printtrace();
if (shift < MAX_SHIFT)
shift++;
// xseg_printtrace();
}
times++;
#endif /* XLOCK_CONGESTION_NOTIFY */
__pause();
}
if (__sync_bool_compare_and_swap(&lock->owner, XLOCK_NOONE, who)) {
/* Store full program counter in a non-atomic manner,
* for debugging reasons only.
*/
lock->pc = (unsigned long)pc;
break;
}
}
#ifdef XLOCK_SANITY_CHECKS
if (!__is_valid_owner(lock->owner)) {
XSEGLOG("xlock %lx locked with INVALID lock owner %lu",
(unsigned long) lock, lock->owner);
}
#endif /* XLOCK_SANITY_CHECKS */
return who;
return 1;
}
static inline unsigned long xlock_try_lock(struct xlock *lock, unsigned long who)
/*
 * Single non-blocking acquisition attempt. Returns 1 if the lock was taken
 * (tagging it with the caller's packed identity and recording the full debug
 * PC), 0 if it was already held.
 */
__attribute__((always_inline)) static inline unsigned long xlock_try_lock(struct xlock *lock)
{
/*
 * NOTE(review): the next four lines duplicate the declarations and early
 * return of the previous (pre-xlock_owner_t) implementation and conflict
 * with the declarations below -- apparent diff residue; confirm against
 * version control.
 */
unsigned long owner;
owner = *(volatile unsigned long *)(&lock->owner);
if (owner == XLOCK_NOONE)
return __sync_bool_compare_and_swap(&lock->owner, XLOCK_NOONE, who);
xlock_owner_t owner, who;
pid_t pid, tid;
void *pc;
/* GCC "labels as values": &&label yields this code address as the PC tag. */
xlock_try_lock_label:
pid = getpid();
tid = gettid();
pc = &&xlock_try_lock_label;
who = pack_owner(pid, tid, pc);
owner = *(volatile xlock_owner_t*)(&lock->owner);
if (owner == XLOCK_NOONE &&
__sync_bool_compare_and_swap(&lock->owner, XLOCK_NOONE, who)) {
/* Full PC stored non-atomically; for debugging only. */
lock->pc = (unsigned long)pc;
return 1;
}
return 0;
}
/*
 * Release an xlock previously taken with xlock_acquire()/xlock_try_lock().
 * The compiler barrier keeps the critical-section stores ordered before the
 * debug PC is cleared and the owner word is reset to XLOCK_NOONE, which is
 * what actually frees the lock for other spinners.
 */
static inline void xlock_release(struct xlock *lock)
{
BARRIER();
lock->pc = 0;
lock->owner = XLOCK_NOONE;
}
static inline unsigned long xlock_get_owner(struct xlock *lock)
/*
 * Return the current packed owner word (XLOCK_NOONE when the lock is free).
 * Plain volatile read -- no fence; the value may be stale by the time the
 * caller inspects it, so it is suitable for reporting/diagnostics only.
 * Fix: the previous body contained a duplicate, stale 'return' that read the
 * word through 'unsigned long' (pre-xlock_owner_t diff residue) and shadowed
 * the intended read; only the xlock_owner_t read is kept.
 */
static inline xlock_owner_t xlock_get_owner(struct xlock *lock)
{
return *(volatile xlock_owner_t*)(&lock->owner);
}
#endif
......@@ -45,13 +45,13 @@ struct xpool {
};
void xpool_init(struct xpool *xp, uint64_t size, struct xpool_node* mem);
void xpool_clear(struct xpool *xp, uint32_t who);
xpool_index xpool_add(struct xpool *xp, xpool_data data, uint32_t who);
xpool_index xpool_remove(struct xpool *xp, xpool_index idx, xpool_data *data, uint32_t who);
xpool_index xpool_peek(struct xpool *xp, xpool_data *data, uint32_t who);
xpool_index xpool_peek_idx(struct xpool *xp, xpool_index idx, xpool_data *data, uint32_t who);
xpool_index xpool_peek_and_fwd(struct xpool *xp, xpool_data *data, uint32_t who);
xpool_index xpool_set_idx(struct xpool *xp, xpool_index idx, xpool_data data, uint32_t who);
void xpool_clear(struct xpool *xp);
xpool_index xpool_add(struct xpool *xp, xpool_data data);
xpool_index xpool_remove(struct xpool *xp, xpool_index idx, xpool_data *data);
xpool_index xpool_peek(struct xpool *xp, xpool_data *data);
xpool_index xpool_peek_idx(struct xpool *xp, xpool_index idx, xpool_data *data);
xpool_index xpool_peek_and_fwd(struct xpool *xp, xpool_data *data);
xpool_index xpool_set_idx(struct xpool *xp, xpool_index idx, xpool_data data);
void __xpool_clear(struct xpool *xp);
xpool_index __xpool_add(struct xpool *xp, xpool_data data);
......
......@@ -64,40 +64,33 @@ xqindex __xq_append_head( struct xq * xq,
xqindex xqi );
xqindex xq_append_head ( struct xq * xq,
xqindex xqi,
unsigned long who);
xqindex xqi );
xqindex __xq_pop_head ( struct xq * xq );
xqindex xq_pop_head ( struct xq * xq,
unsigned long who);
xqindex __xq_pop_head ( struct xq * xq );
xqindex xq_pop_head ( struct xq * xq );
xqindex __xq_append_tail( struct xq * xq,
xqindex xqi );
xqindex xq_append_tail ( struct xq * xq,
xqindex xqi,
unsigned long who);
xqindex xqi );
xqindex __xq_peek_head ( struct xq * xq);
xqindex xq_peek_head ( struct xq * xq,
unsigned long who);
xqindex xq_peek_head ( struct xq * xq );
xqindex __xq_peek_tail ( struct xq * xq);
xqindex xq_peek_tail ( struct xq * xq,
unsigned long who);
xqindex xq_peek_tail ( struct xq * xq );
xqindex __xq_pop_tail ( struct xq * xq );
xqindex xq_pop_tail ( struct xq * xq,
unsigned long who);
xqindex xq_pop_tail ( struct xq * xq );
int xq_head_to_tail ( struct xq * hq,
struct xq * tq,
xqindex nr ,
unsigned long who);
xqindex nr );
xqindex xq_size ( struct xq * xq );
......@@ -109,14 +102,12 @@ int __xq_check ( struct xq * xq,
xqindex idx );
int xq_check ( struct xq * xq,
xqindex idx,
unsigned long who );
xqindex idx );
xqindex __xq_resize ( struct xq * xq,
struct xq * newxq);
xqindex xq_resize ( struct xq * xq,
struct xq * newxq,
unsigned long who );
struct xq * newxq );
#endif
......@@ -34,6 +34,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <sys/time.h>
#include <xseg/version.h>
#include <xseg/util.h>
#include <xseg/xlock.h>
#include <xseg/xq.h>
#include <xseg/xobj.h>
#include <xseg/xhash.h>
......@@ -214,7 +215,6 @@ struct xseg_task {
#define X_CREATE 18
#define X_RENAME 19
#define X_FLUSH 20
#define X_TRUNCATE 21
/* REQ FLAGS */
#define XF_NOSYNC (1 << 0)
......@@ -270,6 +270,7 @@ struct xseg_shared {
char (*peer_types)[XSEG_TNAMESIZE]; /* alignment? */
xptr *peer_type_data;
uint32_t nr_peer_types;
struct xlock segment_lock;
};
struct xseg_private {
......
......@@ -1223,10 +1223,16 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op)
/*
 * Format a human-readable lock status line into buf (at most len bytes,
 * always NUL-terminated). When locked, the owner is unpacked into
 * (PID, TID, truncated PC) and the full debug PC is appended in brackets.
 */
static void lock_status(struct xlock *lock, char *buf, int len)
{
int r;
/*
 * NOTE(review): an old brace-less 'if' and its 'else' branch (the
 * "Owner: %lu" snprintf) are interleaved with the new braced version
 * below -- apparent diff residue; confirm against version control.
 */
if (lock->owner == XLOCK_NOONE)
pid_t pid, tid;
void *pc, *full_pc;
if (lock->owner == XLOCK_NOONE) {
r = snprintf(buf, len, "Locked: No");
else
r = snprintf(buf, len, "Locked: Yes (Owner: %lu)", lock->owner);
} else {
unpack_owner(lock->owner, &pid, &tid, &pc);
full_pc = (void *)lock->pc;
r = snprintf(buf, len, "Locked: Yes (Owner: %d, %d, %p[%p])",
pid, tid, pc, full_pc);
}
/* snprintf reports desired length; force termination on truncation. */
if (r >= len)
buf[len-1] = 0;
}
......@@ -1262,7 +1268,7 @@ int cmd_report(uint32_t portno)
(unsigned long long)port->max_alloc_reqs,
xseg->path_next[portno],
xseg->dst_gw[portno],
port->owner,
(unsigned long long)port->owner,
(void *)fq, (unsigned long long)xq_count(fq), fls,
(void *)rq, (unsigned long long)xq_count(rq), rls,
(void *)pq, (unsigned long long)xq_count(pq), pls);
......@@ -1413,19 +1419,19 @@ static int isDangling(struct xseg_request *req)
fq = xseg_get_queue(xseg, port, free_queue);
rq = xseg_get_queue(xseg, port, request_queue);
pq = xseg_get_queue(xseg, port, reply_queue);
xlock_acquire(&port->fq_lock, XLOCK_XSEGTOOL);
xlock_acquire(&port->fq_lock);
if (__xq_check(fq, XPTR_MAKE(req, xseg->segment))){
xlock_release(&port->fq_lock);
return 0;
}
xlock_release(&port->fq_lock);
xlock_acquire(&port->rq_lock, XLOCK_XSEGTOOL);
xlock_acquire(&port->rq_lock);
if (__xq_check(rq, XPTR_MAKE(req, xseg->segment))){
xlock_release(&port->rq_lock);
return 0;
}
xlock_release(&port->rq_lock);
xlock_acquire(&port->pq_lock, XLOCK_XSEGTOOL);
xlock_acquire(&port->pq_lock);
if (__xq_check(pq, XPTR_MAKE(req, xseg->segment))){
xlock_release(&port->pq_lock);
return 0;
......@@ -1528,7 +1534,7 @@ int cmd_verify(int fix)
struct xobject_iter it;
struct xseg_request *req;
xlock_acquire(&obj_h->lock, XLOCK_XSEGTOOL);
xlock_acquire(&obj_h->lock);
xobj_iter_init(obj_h, &it);
while (xobj_iterate(obj_h, &it, (void **)&req)){
//FIXME this will not work cause obj->magic - req->serial is not
......@@ -1546,42 +1552,37 @@ int cmd_verify(int fix)
return 0;
}
int cmd_recoverlocks(long p)
int cmd_recoverlocks(int pid)
{
xport portno = (xport)p;
xport i;
struct xseg_port *port;
pid_t p = (pid_t)pid;
if (cmd_join())
return -1;
if (xseg->shared->flags & XSEG_F_LOCK){
fprintf(stdout, "Segment lock: Locked\n");
fprintf(stdout, "Consider rebooting the node\n");
return -1;
if (xseg->shared->segment_lock.owner == p){
fprintf(stdout, "Segment lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->shared->segment_lock);
}
if (xseg->heap->lock.owner != XLOCK_NOONE){
fprintf(stdout, "Heap lock: Locked\n");
fprintf(stdout, "Consider rebooting the node\n");
return -1;
if (xseg->heap->lock.owner == p){
fprintf(stdout, "Heap lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->heap->lock);
}
//obj_h locks
if (xseg->request_h->lock.owner != XLOCK_NOONE){
fprintf(stdout, "Requests handler lock: Locked\n");
fprintf(stdout, "Consider rebooting the node\n");
return -1;
/* obj_h locks */
if (xseg->request_h->lock.owner == p){
fprintf(stdout, "Request handler lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->request_h->lock);
}
if (xseg->port_h->lock.owner != XLOCK_NOONE){
fprintf(stdout, "Ports handler lock: Locked\n");
fprintf(stdout, "Consider rebooting the node\n");
return -1;
if (xseg->port_h->lock.owner == p){
fprintf(stdout, "Port handler lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->port_h->lock);
}
if (xseg->object_handlers->lock.owner != XLOCK_NOONE){
fprintf(stdout, "Objects handler lock: Locked\n");
fprintf(stdout, "Consider rebooting the node\n");
return -1;
if (xseg->object_handlers->lock.owner == p){
fprintf(stdout, "Object handler lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->object_handlers->lock);
}
//take segment lock?
xport i;
struct xseg_port *port;
for (i = 0; i < xseg->config.nr_ports; i++) {
if (xseg->ports[i]){
port = xseg_get_port(xseg, i);
......@@ -1590,16 +1591,22 @@ int cmd_recoverlocks(long p)
fprintf(stdout, "Consider rebooting the node\n");
return -1;
}
if (port->fq_lock.owner == portno) {
fprintf(stdout, "Free queue lock of port %u locked. Unlocking...\n", i);
if (port->fq_lock.owner == p) {
fprintf(stdout, "Free queue lock of port %u "
"locked by %d. Unlocking...\n",
i, p);
xlock_release(&port->fq_lock);
}
if (port->rq_lock.owner == portno) {
fprintf(stdout, "Request queue lock of port %u locked. Unlocking...\n", i);
if (port->rq_lock.owner == p) {
fprintf(stdout, "Request queue lock of port %u "
"locked by %d. Unlocking...\n",
i, p);
xlock_release(&port->rq_lock);
}
if (port->pq_lock.owner == portno) {
fprintf(stdout, "Reply queue lock of port %u locked. Unlocking...\n", i);
if (port->pq_lock.owner == p) {
fprintf(stdout, "Reply queue lock of port %u "
"locked by %d. Unlocking...\n",
i, p);
xlock_release(&port->pq_lock);
}
}
......@@ -1616,12 +1623,12 @@ int cmd_recoverport(long portno)
struct xobject_iter it;
struct xseg_request *req;
xlock_acquire(&obj_h->lock, XLOCK_XSEGTOOL);
xlock_acquire(&obj_h->lock);
xobj_iter_init(obj_h, &it);
while (xobj_iterate(obj_h, &it, (void **)&req)){
/* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
//FIXME this will not work cause obj->magic - req->serial is not
//touched when a request is get
/* if (obj->magic != MAGIC_REQ && t->src_portno == portno){ */
if (isDangling(req) && !__xobj_isFree(obj_h, req)){
if (req->transit_portno == (uint32_t)portno) {
report_request(req);
......@@ -1659,7 +1666,7 @@ int cmd_inspectq(xport portno, enum queue qt)
}
else
return -1;
xlock_acquire(l, XLOCK_XSEGTOOL);
xlock_acquire(l);
xqindex i,c = xq_count(q);
if (c) {
struct xseg_request *req;
......@@ -2043,7 +2050,7 @@ int main(int argc, char **argv)
}
if (!strcmp(argv[i], "recoverlocks") && (i + 1 < argc)) {
ret = cmd_recoverlocks(atol(argv[i+1]));
ret = cmd_recoverlocks(atoi(argv[i+1]));
i += 1;
continue;
}
......
......@@ -41,16 +41,12 @@ static int xseg_join_ref;
/*
 * Take the global segment lock.
 * NOTE(review): both the old implementation (spin on shared->flags with
 * __sync_fetch_and_or of XSEG_F_LOCK) and the new one (xlock_acquire on
 * shared->segment_lock) appear below -- apparent diff residue; only one of
 * the two locking schemes should remain. Confirm against version control.
 */
static void __lock_segment(struct xseg *xseg)
{
volatile uint64_t *flags;
flags = &xseg->shared->flags;
while (__sync_fetch_and_or(flags, XSEG_F_LOCK));
xlock_acquire(&xseg->shared->segment_lock);
}
/*
 * Drop the global segment lock.
 * NOTE(review): as with __lock_segment, the old flags-based release
 * (__sync_fetch_and_and clearing XSEG_F_LOCK) and the new xlock_release both
 * appear -- apparent diff residue; confirm against version control.
 */
static void __unlock_segment(struct xseg *xseg)
{
volatile uint64_t *flags;
flags = &xseg->shared->flags;
__sync_fetch_and_and(flags, ~XSEG_F_LOCK);
xlock_release(&xseg->shared->segment_lock);
}
static struct xseg_type *__find_type(const char *name, long *index)
......@@ -660,6 +656,7 @@ static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
shared = (struct xseg_shared *) mem;
shared->flags = 0;
shared->nr_peer_types = 0;
xlock_release(&shared->segment_lock);
xseg->shared = (struct xseg_shared *) XPTR_MAKE(mem, segment);
mem = xheap_allocate(heap, page_size);
......@@ -1245,7 +1242,7 @@ int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
if (!port)
return -1;
xlock_acquire(&port->fq_lock, portno);
xlock_acquire(&port->fq_lock);
q = XPTR_TAKE(port->free_queue, xseg->segment);
while ((req = xobj_get_obj(xseg->request_h, X_ALLOC)) != NULL && i < nr) {
xqi = XPTR_MAKE(req, xseg->segment);
......@@ -1280,7 +1277,7 @@ int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
if (!port)
return -1;
xlock_acquire(&port->fq_lock, portno);
xlock_acquire(&port->fq_lock);
q = XPTR_TAKE(port->free_queue, xseg->segment);
while ((xqi = __xq_pop_head(q)) != Noneidx && i < nr) {
req = XPTR_TAKE(xqi, xseg->segment);
......@@ -1291,7 +1288,7 @@ int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
return -1;
xlock_release(&port->fq_lock);
xlock_acquire(&port->port_lock, portno);