Commit b3711969 authored by Vangelis Koukis's avatar Vangelis Koukis

Merge pull request #19 from philipgian/release-0.4

Various libxeg improvements
parents 7e6e14ee 744c4fda
......@@ -83,17 +83,17 @@ struct xlock {
_Static_assert(PID_BITS + TID_BITS + PC_BITS == sizeof(xlock_owner_t),
"Invalid bit definitions")
_Static_assert(PID_BITS < sizeof(pid_t) * 8,
"PID_BITS must be lower than the bits of pid_t")
_Static_assert(PID_BITS <= sizeof(pid_t) * 8,
"PID_BITS must be lower or equal than the bits of pid_t")
_Static_assert(TID_BITS < sizeof(pid_t) * 8,
"TID_BITS must be lower than the bits of pid_t")
_Static_assert(TID_BITS <= sizeof(pid_t) * 8,
"TID_BITS must be lower or equal than the bits of pid_t")
_Static_assert(PC_BITS < sizeof(void *) * 8,
"PC_BITS must be lower than the bits of a (void *)")
_Static_assert(PC_BITS <= sizeof(void *) * 8,
"PC_BITS must be lower or equal than the bits of a (void *)")
*/
static inline xlock_owner_t pack_owner(pid_t pid, pid_t tid, void *ip)
static inline xlock_owner_t xlock_pack_owner(pid_t pid, pid_t tid, void *ip)
{
xlock_owner_t owner = 0;
unsigned long pc = (unsigned long)ip;
......@@ -111,15 +111,34 @@ static inline xlock_owner_t pack_owner(pid_t pid, pid_t tid, void *ip)
return owner;
}
static inline void unpack_owner(uint64_t owner, pid_t *pid, pid_t *tid, void **ip)
static inline void xlock_unpack_owner(uint64_t owner, pid_t *pid, pid_t *tid, void **ip)
{
unsigned long pc;
pc = owner & (((xlock_owner_t)1 << PC_BITS) -1);
*ip = (void *)pc;
if (ip) {
pc = owner & (((xlock_owner_t)1 << PC_BITS) -1);
*ip = (void *)pc;
}
owner >>= PC_BITS;
*tid = owner & (((xlock_owner_t)1 << TID_BITS) -1);
if (tid) {
*tid = owner & (((xlock_owner_t)1 << TID_BITS) -1);
}
owner >>= TID_BITS;
*pid = owner & (((xlock_owner_t)1 << PID_BITS) -1);
if (pid) {
*pid = owner & (((xlock_owner_t)1 << PID_BITS) -1);
}
}
/* x86_64 specific
* TODO: Move to an arch specific directory
*/
static inline void * __get_pc()
{
void * rip;
__asm__ volatile ("lea (%%rip, 1), %0" : "=r"(rip));
return rip;
}
__attribute__((always_inline)) static inline unsigned long xlock_acquire(struct xlock *lock)
......@@ -132,18 +151,17 @@ __attribute__((always_inline)) static inline unsigned long xlock_acquire(struct
unsigned long shift = MIN_SHIFT;
#endif /* XLOCK_CONGESTION_NOTIFY */
xlock_acquire_label:
pid = getpid();
tid = gettid();
pc = &&xlock_acquire_label;
pc = __get_pc();
who = pack_owner(pid, tid, pc);
who = xlock_pack_owner(pid, tid, pc);
for (;;) {
for (; (owner = *(volatile xlock_owner_t*)(&lock->owner)) != XLOCK_NOONE;) {
#ifdef XLOCK_CONGESTION_NOTIFY
if (!(times & ((1<<shift) -1))) {
unpack_owner(owner, &opid, &otid, &opc);
xlock_unpack_owner(owner, &opid, &otid, &opc);
XSEGLOG("xlock %p spinned for %llu times"
"\n\t who: (%d, %d, %p), "
"owner: (%d, %d, %p) (full pc: %p)",
......@@ -178,12 +196,11 @@ __attribute__((always_inline)) static inline unsigned long xlock_try_lock(struct
pid_t pid, tid;
void *pc;
xlock_try_lock_label:
pid = getpid();
tid = gettid();
pc = &&xlock_try_lock_label;
pc = __get_pc();
who = pack_owner(pid, tid, pc);
who = xlock_pack_owner(pid, tid, pc);
owner = *(volatile xlock_owner_t*)(&lock->owner);
if (owner == XLOCK_NOONE &&
......
......@@ -215,6 +215,7 @@ struct xseg_task {
#define X_CREATE 18
#define X_RENAME 19
#define X_FLUSH 20
#define X_UPDATE 21
/* REQ FLAGS */
#define XF_NOSYNC (1 << 0)
......
......@@ -25,6 +25,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <xseg/xhash.h>
#include <xseg/xobj.h>
......@@ -43,7 +46,7 @@ int help(void)
" signal <portno>\n"
" bridge <portno1> <portno2> <logfile> {full|summary|stats}\n"
" recoverport <portno>\n"
" recoverlocks <portno>\n"
" recoverlocks <pid>\n"
" verify\n"
" verify-fix\n"
"port commands:\n"
......@@ -97,6 +100,42 @@ uint64_t reqs;
xport sport = NoPort;
void *sd;
time_t previous_intr = 0;
void handler(int signal)
{
int r;
time_t t = time(NULL);
if (previous_intr && t - previous_intr < 2) {
exit(1);
}
char *msg = "You should not interrupt this program, as this may cause "
"irreversible segment corruption\n."
"If you really want to terminate this program, hit "
"Ctrl-C again withing 2 seconds\n";
write(2, msg, strlen(msg));
previous_intr = t;
}
int install_signal_handler()
{
int r;
struct sigaction sa;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sa.sa_handler = handler;
r = sigaction(SIGINT, &sa, NULL);
if (r < 0)
return -errno;
return 0;
}
static void init_local_signal()
{
......@@ -1228,7 +1267,7 @@ static void lock_status(struct xlock *lock, char *buf, int len)
if (lock->owner == XLOCK_NOONE) {
r = snprintf(buf, len, "Locked: No");
} else {
unpack_owner(lock->owner, &pid, &tid, &pc);
xlock_unpack_owner(lock->owner, &pid, &tid, &pc);
full_pc = (void *)lock->pc;
r = snprintf(buf, len, "Locked: Yes (Owner: %d, %d, %p[%p])",
pid, tid, pc, full_pc);
......@@ -1462,43 +1501,35 @@ int prompt_user(char *msg)
return r;
}
static void verify_lock(struct xlock *lock, char *name, int fix)
{
char buf[64];
if (lock->owner == XLOCK_NOONE) {
return;
}
lock_status(lock, buf, 64);
fprintf(stdout, "%s: %s\n", name, buf);
if (fix && prompt_user("Unlock it ?")) {
xlock_release(lock);
}
}
//FIXME this should be in xseg lib?
int cmd_verify(int fix)
{
char buf[64];
if (cmd_join())
return -1;
//segment lock
if (xseg->shared->flags & XSEG_F_LOCK){
fprintf(stderr, "Segment lock: Locked\n");
if (fix && prompt_user("Unlock it ?"))
xseg->shared->flags &= ~XSEG_F_LOCK;
}
//heap lock
if (xseg->heap->lock.owner != XLOCK_NOONE){
fprintf(stderr, "Heap lock: Locked (Owner: %llu)\n",
(unsigned long long)xseg->heap->lock.owner);
if (fix && prompt_user("Unlock it ?"))
xlock_release(&xseg->heap->lock);
}
//obj_h locks
if (xseg->request_h->lock.owner != XLOCK_NOONE){
fprintf(stderr, "Requests handler lock: Locked (Owner: %llu)\n",
(unsigned long long)xseg->request_h->lock.owner);
if (fix && prompt_user("Unlock it ?"))
xlock_release(&xseg->request_h->lock);
}
if (xseg->port_h->lock.owner != XLOCK_NOONE){
fprintf(stderr, "Ports handler lock: Locked (Owner: %llu)\n",
(unsigned long long)xseg->port_h->lock.owner);
if (fix && prompt_user("Unlock it ?"))
xlock_release(&xseg->port_h->lock);
}
if (xseg->object_handlers->lock.owner != XLOCK_NOONE){
fprintf(stderr, "Objects handler lock: Locked (Owner: %llu)\n",
(unsigned long long)xseg->object_handlers->lock.owner);
if (fix && prompt_user("Unlock it ?"))
xlock_release(&xseg->object_handlers->lock);
}
verify_lock(&xseg->shared->segment_lock, "Segment lock", fix);
verify_lock(&xseg->heap->lock, "Heap lock", fix);
verify_lock(&xseg->request_h->lock, "Request handler lock", fix);
verify_lock(&xseg->port_h->lock, "Port handler lock", fix);
verify_lock(&xseg->object_handlers->lock, "Objects handler lock", fix);
//take segment lock?
xport i;
struct xseg_port *port;
......@@ -1509,24 +1540,15 @@ int cmd_verify(int fix)
fprintf(stderr, "Inconsisten port <-> portno mapping %u", i);
continue;
}
if (port->fq_lock.owner != XLOCK_NOONE) {
fprintf(stderr, "Free queue lock of port %u locked (Owner %llu)\n",
i, (unsigned long long)port->fq_lock.owner);
if (fix && prompt_user("Unlock it ?"))
xlock_release(&port->fq_lock);
}
if (port->rq_lock.owner != XLOCK_NOONE) {
fprintf(stderr, "Request queue lock of port %u locked (Owner %llu)\n",
i, (unsigned long long)port->rq_lock.owner);
if (fix && prompt_user("Unlock it ?"))
xlock_release(&port->rq_lock);
}
if (port->pq_lock.owner != XLOCK_NOONE) {
fprintf(stderr, "Reply queue lock of port %u locked (Owner %llu)\n",
i, (unsigned long long)port->pq_lock.owner);
if (fix && prompt_user("Unlock it ?"))
xlock_release(&port->pq_lock);
}
snprintf(buf, 64, "Free queue lock of port %u", i);
verify_lock(&port->fq_lock, buf, fix);
snprintf(buf, 64, "Request queue lock of port %u", i);
verify_lock(&port->rq_lock, buf, fix);
snprintf(buf, 64, "Reply queue lock of port %u", i);
verify_lock(&port->pq_lock, buf, fix);
}
}
......@@ -1552,64 +1574,51 @@ int cmd_verify(int fix)
return 0;
}
static void check_and_unlock(struct xlock *lock, char *name, pid_t pid)
{
pid_t owner_pid;
xlock_unpack_owner(lock->owner, &owner_pid, NULL, NULL);
if (owner_pid == pid) {
fprintf(stdout, "%s locked by %d. Unlocking..\n", name, pid);
xlock_release(lock);
}
}
int cmd_recoverlocks(int pid)
{
xport i;
struct xseg_port *port;
pid_t p = (pid_t)pid;
char buf[64];
if (cmd_join())
return -1;
if (xseg->shared->segment_lock.owner == p){
fprintf(stdout, "Segment lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->shared->segment_lock);
}
if (xseg->heap->lock.owner == p){
fprintf(stdout, "Heap lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->heap->lock);
}
/* obj_h locks */
if (xseg->request_h->lock.owner == p){
fprintf(stdout, "Request handler lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->request_h->lock);
}
if (xseg->port_h->lock.owner == p){
fprintf(stdout, "Port handler lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->port_h->lock);
}
if (xseg->object_handlers->lock.owner == p){
fprintf(stdout, "Object handler lock locked by %d. Unlocking..\n", p);
xlock_release(&xseg->object_handlers->lock);
}
check_and_unlock(&xseg->shared->segment_lock, "Segment lock", p);
check_and_unlock(&xseg->heap->lock, "Heap lock", p);
check_and_unlock(&xseg->request_h->lock, "Request handler lock", p);
check_and_unlock(&xseg->port_h->lock, "Port handler lock", p);
check_and_unlock(&xseg->object_handlers->lock, "Object handler lock", p);
for (i = 0; i < xseg->config.nr_ports; i++) {
if (xseg->ports[i]){
port = xseg_get_port(xseg, i);
if (!port){
fprintf(stdout, "Inconsisten port <-> portno mapping %u", i);
fprintf(stdout, "Consider rebooting the node\n");
return -1;
}
if (port->fq_lock.owner == p) {
fprintf(stdout, "Free queue lock of port %u "
"locked by %d. Unlocking...\n",
i, p);
xlock_release(&port->fq_lock);
}
if (port->rq_lock.owner == p) {
fprintf(stdout, "Request queue lock of port %u "
"locked by %d. Unlocking...\n",
i, p);
xlock_release(&port->rq_lock);
}
if (port->pq_lock.owner == p) {
fprintf(stdout, "Reply queue lock of port %u "
"locked by %d. Unlocking...\n",
i, p);
xlock_release(&port->pq_lock);
}
if (!xseg->ports[i])
continue;
port = xseg_get_port(xseg, i);
if (!port){
fprintf(stdout, "Inconsisten port <-> portno mapping %u", i);
fprintf(stdout, "Consider rebooting the node\n");
return -1;
}
snprintf(buf, 64, "Free queue lock of port %u", i);
check_and_unlock(&port->fq_lock, buf, p);
snprintf(buf, 64, "Request queue lock of port %u", i);
check_and_unlock(&port->rq_lock, buf, p);
snprintf(buf, 64, "Reply queue lock of port %u", i);
check_and_unlock(&port->pq_lock, buf, p);
}
return 0;
}
......@@ -2016,6 +2025,8 @@ int main(int argc, char **argv)
return -1;
}
install_signal_handler();
for (i = 2; i < argc; i++) {
if (!strcmp(argv[i], "create")) {
......
......@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <xseg/util.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#ifndef NULL
#define NULL ((void *)0)
......@@ -812,7 +813,7 @@ struct xseg *xseg_join(const char *segtypename,
struct xseg_private *priv;
struct xseg_operations *xops;
struct xseg_peer_operations *pops;
int r;
int r, err_no;
pthread_mutex_lock(&xseg_joinref_mutex);
xseg_join_ref++;
......@@ -823,6 +824,7 @@ struct xseg *xseg_join(const char *segtypename,
if (!peertype) {
XSEGLOG("Peer type '%s' not found\n", peertypename);
__unlock_domain();
err_no = EINVAL;
goto err;
}
......@@ -830,6 +832,7 @@ struct xseg *xseg_join(const char *segtypename,
if (!segtype) {
XSEGLOG("Segment type '%s' not found\n", segtypename);
__unlock_domain();
err_no = EINVAL;
goto err;
}
......@@ -840,23 +843,27 @@ struct xseg *xseg_join(const char *segtypename,
xseg = pops->malloc(sizeof(struct xseg));
if (!xseg) {
err_no = errno;
XSEGLOG("Cannot allocate memory");
goto err;
}
priv = pops->malloc(sizeof(struct xseg_private));
if (!priv) {
err_no = errno;
XSEGLOG("Cannot allocate memory");
goto err_seg;
}
__xseg = xops->map(segname, XSEG_MIN_PAGE_SIZE, NULL);
if (!__xseg) {
err_no = errno;
XSEGLOG("Cannot map segment");
goto err_priv;
}
if (!(__xseg->version == XSEG_VERSION)) {
err_no = EPROTO;
XSEGLOG("Version mismatch. Expected %llu, segment version %llu",
XSEG_VERSION, __xseg->version);
goto err_priv;
......@@ -868,6 +875,7 @@ struct xseg *xseg_join(const char *segtypename,
__xseg = xops->map(segname, size, xseg);
if (!__xseg) {
err_no = errno;
XSEGLOG("Cannot map segment");
goto err_priv;
}
......@@ -876,20 +884,24 @@ struct xseg *xseg_join(const char *segtypename,
priv->peer_type = *peertype;
priv->wakeup = wakeup;
priv->req_data = xhash_new(3, 0, XHASH_INTEGER); //FIXME should be relative to XSEG_DEF_REQS
if (!priv->req_data)
if (!priv->req_data) {
errno = ENOMEM;
goto err_priv;
}
xlock_release(&priv->reqdatalock);
xseg->max_peer_types = __xseg->max_peer_types;
priv->peer_types = pops->malloc(sizeof(void *) * xseg->max_peer_types);
if (!priv->peer_types) {
err_no = errno;
XSEGLOG("Cannot allocate memory");
goto err_unmap;
}
memset(priv->peer_types, 0, sizeof(void *) * xseg->max_peer_types);
priv->peer_type_data = pops->malloc(sizeof(void *) * xseg->max_peer_types);
if (!priv->peer_types) {
err_no = errno;
XSEGLOG("Cannot allocate memory");
//FIXME wrong err handling
goto err_unmap;
......@@ -913,6 +925,7 @@ struct xseg *xseg_join(const char *segtypename,
r = xseg_validate_pointers(xseg);
if (r) {
err_no = EFAULT;
XSEGLOG("found %d invalid xseg pointers!\n", r);
goto err_free_types;
}
......@@ -940,6 +953,7 @@ err_seg:
pops->mfree(xseg);
err:
pthread_mutex_unlock(&xseg_joinref_mutex);
errno = err_no;
return NULL;
}
......
......@@ -36,34 +36,45 @@ char errbuf[ERRSIZE];
static long posix_allocate(const char *name, uint64_t size)
{
long ret = 0;
int fd, r;
off_t lr;
int err_no = 0;
fd = shm_open(name, O_RDWR | O_CREAT | O_EXCL, 0770);
if (fd < 0) {
err_no = errno;
XSEGLOG("Cannot create shared segment: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
return fd;
ret = fd;
goto exit;
}
lr = lseek(fd, size -1, SEEK_SET);
if (lr == (off_t)-1) {
err_no = errno;
close(fd);
XSEGLOG("Cannot seek into segment file: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
return lr;
ret = lr;
goto exit;
}
errbuf[0] = 0;
r = write(fd, errbuf, 1);
if (r != 1) {
err_no = errno;
close(fd);
XSEGLOG("Failed to set segment size: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
return r;
ret = r;
goto exit;
}
close(fd);
return 0;
exit:
errno = err_no;
return ret;
}
static long posix_deallocate(const char *name)
......@@ -74,15 +85,14 @@ static long posix_deallocate(const char *name)
static void *posix_map(const char *name, uint64_t size, struct xseg *seg)
{
struct xseg *xseg;
int fd;
// if (seg)
// XSEGLOG("struct xseg * is not NULL. Ignoring...\n");
int fd, err_no = 0;
fd = shm_open(name, O_RDWR, 0000);
if (fd < 0) {
err_no = errno;
XSEGLOG("Failed to open '%s' for mapping: %s\n",
name, strerror_r(errno, errbuf, ERRSIZE));
errno = err_no;
return NULL;
}
......@@ -93,12 +103,16 @@ static void *posix_map(const char *name, uint64_t size, struct xseg *seg)
fd, 0 );
if (xseg == MAP_FAILED) {
err_no = errno;
XSEGLOG("Could not map segment: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
errno = err_no;
return NULL;
}
close(fd);
err_no = errno;
return xseg;
}
......
......@@ -36,77 +36,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#define ERRSIZE 512
char errbuf[ERRSIZE];
static long posixfd_allocate(const char *name, uint64_t size)
{
int fd, r;
off_t lr;
fd = shm_open(name, O_RDWR | O_CREAT | O_EXCL, 0770);
if (fd < 0) {
XSEGLOG("Cannot create shared segment: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
return fd;
}
lr = lseek(fd, size -1, SEEK_SET);
if (lr == (off_t)-1) {
close(fd);
XSEGLOG("Cannot seek into segment file: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
return lr;
}
errbuf[0] = 0;
r = write(fd, errbuf, 1);
if (r != 1) {
close(fd);
XSEGLOG("Failed to set segment size: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
return r;
}
close(fd);
return 0;
}
static long posixfd_deallocate(const char *name)
{
return shm_unlink(name);
}
static void *posixfd_map(const char *name, uint64_t size, struct xseg *seg)
{
struct xseg *xseg;
int fd;
fd = shm_open(name, O_RDWR, 0000);
if (fd < 0) {
XSEGLOG("Failed to open '%s' for mapping: %s\n",
name, strerror_r(errno, errbuf, ERRSIZE));
return NULL;
}
xseg = mmap ( XSEG_BASE_AS_PTR,
size,
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_FIXED /* | MAP_LOCKED */,
fd, 0 );
if (xseg == MAP_FAILED) {
XSEGLOG("Could not map segment: %s\n",
strerror_r(errno, errbuf, ERRSIZE));
return NULL;
}
close(fd);
return xseg;
}
static void posixfd_unmap(void *ptr, uint64_t size)
{
struct xseg *xseg = ptr;
(void)munmap(xseg, size);
}
static struct posixfd_signal_desc * __get_signal_desc(struct xseg *xseg, xport portno)
{
struct xseg_port *port = xseg_get_port(xseg, portno);
......@@ -139,6 +68,7 @@ static int posixfd_local_signal_init(struct xseg *xseg, xport portno)
/* create or truncate POSIXFD+portno file */
int r, fd;
char filename[POSIXFD_DIR_LEN + POSIXFD_FILENAME_LEN + 1];
mode_t old_mode;
struct posixfd_signal_desc *psd = __get_signal_desc(xseg, portno);
if (!psd) {
......@@ -147,7 +77,7 @@ static int posixfd_local_signal_init(struct xseg *xseg, xport portno)
__get_filename(psd, filename);
retry:
r = mkfifo(filename, S_IRUSR|S_IWUSR);
r = mkfifo(filename, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
if (r < 0) {
if (errno == EEXIST) {
unlink(filename);
......@@ -188,19 +118,29 @@ static void posixfd_local_signal_quit(struct xseg *xseg, xport portno)
/*
* When this peer type is initialized, we must make sure the directory where the
* named pipes will be created, exist.
* named pipes will be created, exist. Also make sure that th setgid bit is set.
*/
static int posixfd_remote_signal_init(void)
{
int r;
mode_t oldumask;
oldumask = umask(0000);
r = mkdir(POSIXFD_DIR, 01777);
umask(oldumask);
struct stat st;
r = stat(POSIXFD_DIR, &st);
if (r < 0) {
if (errno != EEXIST) // && isdir(POSIXFD_DIR)
return -1;
return -1;
}
if (!S_ISDIR(st.st_mode)) {