Merge branch 'xseg-refactor' into debian
authorFilippos Giannakos <philipgian@grnet.gr>
Wed, 28 Nov 2012 14:03:17 +0000 (16:03 +0200)
committerFilippos Giannakos <philipgian@grnet.gr>
Wed, 28 Nov 2012 14:03:17 +0000 (16:03 +0200)
13 files changed:
xseg/archipelago
xseg/peers/user/Makefile
xseg/peers/user/dummy.c
xseg/peers/user/monitor.c
xseg/peers/user/mt-mapperd.c
xseg/peers/user/mt-pfiled.c
xseg/peers/user/mt-sosd.c
xseg/peers/user/mt-vlmcd.c
xseg/peers/user/peer.c
xseg/peers/user/peer.h
xseg/peers/user/xseg-tool.c
xseg/xseg/protocol.h
xseg/xseg/xseg.h

index 5b4ce88..3704048 100755 (executable)
@@ -5,9 +5,11 @@
 import os, sys, subprocess, argparse, time, psutil, signal, errno
 from subprocess import call, check_call
 
-BIN_PATH="/usr/bin"
-PIDFILE_PATH="/var/run/archipelago"
+DEFAULTS='/etc/default/archipelago'
 
+#system defaults
+PIDFILE_PATH="/var/run/archipelago"
+LOGS_PATH="/var/log/archipelago"
 DEVICE_PREFIX="/dev/xsegbd"
 XSEGBD_SYSFS="/sys/bus/xsegbd/"
 
@@ -15,22 +17,27 @@ CHARDEV_NAME="/dev/segdev"
 CHARDEV_MAJOR=60
 CHARDEV_MINOR=0
 
-SPEC="segdev:xsegbd:512:1024:12"
-#REQS=512
-#PORTS=512
+FILE_BLOCKER='mt-pfiled'
+RADOS_BLOCKER='mt-sosd'
+MAPPER='mt-mapperd'
+VLMC='st-vlmcd'
+BLOCKER=''
+
+available_storage = {'files': FILE_BLOCKER, 'rados': RADOS_BLOCKER}
+
+peers = []
+modules = ["xseg", "segdev", "xseg_posix", "xseg_pthread", "xseg_segdev"]
+xsegbd = "xsegbd"
 
-BPORT=""
+BPORT=0
 MPORT=1
 MBPORT=2
 VTOOL=3
 VPORT_START=204
 VPORT_END=403
 
-LOGS_PATH=""
-
-BLOCKER=""
-MAPPER=""
-VLMC=""
+#default config
+SPEC="segdev:xsegbd:512:1024:12"
 
 NR_OPS_BLOCKERB=""
 NR_OPS_BLOCKERM=""
@@ -42,86 +49,18 @@ VERBOSITY_BLOCKERM=""
 VERBOSITY_MAPPER=""
 VERBOSITY_VLMC=""
 
+
+#mt-pfiled specific options
 FILED_IMAGES=""
 FILED_MAPS=""
-
 PITHOS=""
 PITHOSMAPS=""
 
+#mt-sosd specific options
 RADOS_POOL_MAPS=""
 RADOS_POOL_BLOCKS=""
-STORAGE="File"
-
-try:
-    execfile(os.path.expanduser("/etc/default/archipelago"), globals())
-except:
-    print "Cannot open defaults"
-    sys.exit(1)
-
-if BLOCKER == "pfiled":
-    peer_blockerb = [BLOCKER,
-            ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
-             str(PITHOS), str(FILED_IMAGES), "-d",
-            "-f", os.path.join(PIDFILE_PATH, "blockerb.pid")],
-            "blockerb"]
-    peer_blockerm = [BLOCKER,
-            ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
-            str(PITHOSMAPS), str(FILED_MAPS), "-d",
-            "-f", os.path.join(PIDFILE_PATH, "blockerm.pid")],
-            "blockerm" ]
-elif BLOCKER == "mt-sosd":
-    peer_blockerb = [BLOCKER,
-            ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
-             "--pool", str(RADOS_POOL_BLOCKS), "-v", str(VERBOSITY_BLOCKERB),
-             "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
-             "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
-             "-t", "3"],
-             "blockerb"]
-    peer_blockerm = [BLOCKER,
-            ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
-             "--pool", str(RADOS_POOL_MAPS), "-v", str(VERBOSITY_BLOCKERM),
-             "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
-             "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
-             "-t", "3"],
-             "blockerm"]
-elif BLOCKER == "mt-pfiled":
-    peer_blockerb = [BLOCKER,
-            ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
-             "--pithos", str(PITHOS), "--archip", str(FILED_IMAGES),
-            "-v", str(VERBOSITY_BLOCKERB),
-             "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
-             "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
-             "-t", str(NR_OPS_BLOCKERB), "--prefix", "archip_"],
-             "blockerb"]
-    peer_blockerm = [BLOCKER,
-            ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
-             "--pithos", str(PITHOSMAPS), "--archip", str(FILED_MAPS),
-            "-v", str(VERBOSITY_BLOCKERM),
-             "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
-             "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
-             "-t", str(NR_OPS_BLOCKERM), "--prefix", "archip_"],
-             "blockerm"]
-else:
-        sys.exit(-1)
-
-peer_vlmcd = [VLMC,
-         ["-t" , "1", "-sp",  str(VPORT_START), "-ep", str(VPORT_END),
-          "-g", str(SPEC), "-n", str(NR_OPS_VLMC), "-bp", str(BPORT),
-          "-mp", str(MPORT), "-d", "-v", str(VERBOSITY_VLMC),
-          "--pidfile", os.path.join(PIDFILE_PATH, "vlmcd.pid"),
-          "-l", os.path.join(str(LOGS_PATH), "vlmcd.log")
-          ], "vlmcd"]
-peer_mapperd = [MAPPER,
-         ["-t" , "1", "-p",  str(MPORT), "-mbp", str(MBPORT),
-          "-g", str(SPEC), "-n", str(NR_OPS_MAPPER), "-bp", str(BPORT), "-d",
-          "--pidfile", os.path.join(PIDFILE_PATH, "mapperd.pid"),
-          "-v", str(VERBOSITY_MAPPER),
-          "-l", os.path.join(str(LOGS_PATH), "mapperd.log")
-          ], "mapperd"]
-
-peers = [peer_blockerb, peer_blockerm, peer_vlmcd, peer_mapperd]
-modules = ["xseg", "segdev", "xseg_posix", "xseg_pthread", "xseg_segdev"]
-xsegbd = "xsegbd"
+
+
 
 def check_conf():
     def isExec(file_path):
@@ -147,11 +86,6 @@ def check_conf():
         return True
 
 
-    for p in [BLOCKER, MAPPER, VLMC]:
-        if not validExec(p):
-            print p + "is not a valid executable"
-            return False
-
     if not LOGS_PATH:
         print "LOGS_PATH is not set"
         return False
@@ -164,7 +98,7 @@ def check_conf():
             print "LOGS_PATH "+str(LOGS_PATH)+" does not exist"
             return False
     except:
-        print "LOGS_PATH doesn't exist or is not set"
+        print "LOGS_PATH doesn't exist or is not a directory"
         return False
 
     try:
@@ -180,7 +114,7 @@ def check_conf():
             print "Cannot create " + str(PIDFILE_PATH)
             return False
     except:
-        print "PIDFILE_PATH or is not set"
+        print "PIDFILE_PATH is not set"
         return False
 
     splitted_spec = str(SPEC).split(':')
@@ -210,10 +144,10 @@ def check_conf():
              print "Verbosity missing"
          try:
              if (int(v) > 3 or int(v) < 0):
-                 print "Invalid verboisity " + str(v)
+                 print "Invalid verbosity " + str(v)
                  return False
          except:
-             print "Invalid verboisity " + str(v)
+             print "Invalid verbosity " + str(v)
              return False
 
     for n in [NR_OPS_BLOCKERB, NR_OPS_BLOCKERM, NR_OPS_VLMC, NR_OPS_MAPPER]:
@@ -239,8 +173,16 @@ def check_conf():
         return False
     if not validPort(VPORT_END, xseg_ports, "VPORT_END"):
         return False
-#if filed
-    if STORAGE == "File":
+
+    global BLOCKER
+    try:
+        BLOCKER = available_storage[str(STORAGE)]
+    except:
+        print "Invalid storage " + str(STORAGE)
+        print "Available storage: \"" + ', "'.join(available_storage) + "\""
+        return False
+
+    if STORAGE=="files":
         if FILED_IMAGES and not os.path.isdir(str(FILED_IMAGES)):
              print "FILED_IMAGES invalid"
              return False
@@ -254,11 +196,83 @@ def check_conf():
              print "PITHOSMAPS invalid"
              return False
 
+    for p in [BLOCKER, MAPPER, VLMC]:
+        if not validExec(p):
+            print p + "is not a valid executable"
+            return False
 
     return True
 
-if not check_conf():
-    sys.exit(1)
+def construct_peers():
+    if BLOCKER == "pfiled":
+        peer_blockerb = [BLOCKER,
+                ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
+                 str(PITHOS), str(FILED_IMAGES), "-d",
+                "-f", os.path.join(PIDFILE_PATH, "blockerb.pid")],
+                "blockerb"]
+        peer_blockerm = [BLOCKER,
+                ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
+                str(PITHOSMAPS), str(FILED_MAPS), "-d",
+                "-f", os.path.join(PIDFILE_PATH, "blockerm.pid")],
+                "blockerm" ]
+    elif BLOCKER == "mt-sosd":
+        peer_blockerb = [BLOCKER,
+                ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
+                 "--pool", str(RADOS_POOL_BLOCKS), "-v", str(VERBOSITY_BLOCKERB),
+                 "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
+                 "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
+                 "-t", "3"],
+                 "blockerb"]
+        peer_blockerm = [BLOCKER,
+                ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
+                 "--pool", str(RADOS_POOL_MAPS), "-v", str(VERBOSITY_BLOCKERM),
+                 "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
+                 "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
+                 "-t", "3"],
+                 "blockerm"]
+    elif BLOCKER == "mt-pfiled":
+        peer_blockerb = [BLOCKER,
+                ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
+                 "--pithos", str(PITHOS), "--archip", str(FILED_IMAGES),
+             "-v", str(VERBOSITY_BLOCKERB),
+                 "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
+                 "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
+                 "-t", str(NR_OPS_BLOCKERB), "--prefix", "archip_"],
+                 "blockerb"]
+        peer_blockerm = [BLOCKER,
+                ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
+                 "--pithos", str(PITHOSMAPS), "--archip", str(FILED_MAPS),
+             "-v", str(VERBOSITY_BLOCKERM),
+                 "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
+                 "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
+                 "-t", str(NR_OPS_BLOCKERM), "--prefix", "archip_"],
+                 "blockerm"]
+    else:
+            sys.exit(-1)
+
+    peer_vlmcd = [VLMC,
+             ["-t" , "1", "-sp",  str(VPORT_START), "-ep", str(VPORT_END),
+              "-g", str(SPEC), "-n", str(NR_OPS_VLMC), "-bp", str(BPORT),
+              "-mp", str(MPORT), "-d", "-v", str(VERBOSITY_VLMC),
+              "--pidfile", os.path.join(PIDFILE_PATH, "vlmcd.pid"),
+              "-l", os.path.join(str(LOGS_PATH), "vlmcd.log")
+              ], "vlmcd"]
+    peer_mapperd = [MAPPER,
+             ["-t" , "1", "-p",  str(MPORT), "-mbp", str(MBPORT),
+              "-g", str(SPEC), "-n", str(NR_OPS_MAPPER), "-bp", str(BPORT),
+              "--pidfile", os.path.join(PIDFILE_PATH, "mapperd.pid"),
+              "-v", str(VERBOSITY_MAPPER), "-d",
+              "-l", os.path.join(str(LOGS_PATH), "mapperd.log")
+              ], "mapperd"]
+
+    peers = []
+    peers.append(peer_blockerb)
+    peers.append(peer_blockerm)
+    peers.append(peer_vlmcd)
+    peers.append(peer_mapperd)
+
+    return peers
+
 
 def exclusive(fn):
     def exclusive_args(args):
@@ -304,7 +318,7 @@ def vlmc_showmapped(args):
 def loadrc(rc):
     try:
         if rc == None:
-            execfile(os.path.expanduser("/etc/default/archipelago"), globals())
+            execfile(os.path.expanduser(DEFAULTS), globals())
         else:
             execfile(rc, globals())
     except:
@@ -327,7 +341,6 @@ def load_module(name):
     if name in modules:
         return 0
     cmd = ["modprobe -v %s" % name]
-    print cmd
     try:
         check_call(cmd, shell=True);
     except Exception:
@@ -435,7 +448,7 @@ def make_segdev():
     except:
         pass
     cmd = ["mknod", str(CHARDEV_NAME), "c", str(CHARDEV_MAJOR), str(CHARDEV_MINOR)]
-    print cmd
+    print ' '.join(cmd)
     try:
         check_call(cmd, shell=False);
     except Exception:
@@ -468,15 +481,15 @@ def start(args):
     if make_segdev() < 0:
         stop(args)
         return -1
-    
+
     time.sleep(0.5)
-    
+
     if create_segment() < 0:
         stop(args)
         return -1
-    
+
     time.sleep(0.5)
-    
+
     for p in peers:
         if start_peer(p) < 0:
             stop(args)
@@ -500,7 +513,7 @@ def stop(args):
         stop_peer(p)
 
     remove_segdev()
-    
+
     for m in reversed(modules):
         unload_module(m)
     return 0
@@ -527,28 +540,32 @@ def status(args):
             print p[0] + " running"
             r += 1
     return r
-  
+
 def restart(args):
-    stop(args)
-    start(args)
+    r = stop(args)
+    if r < 0:
+        return r
+    return start(args)
 
 if __name__ == "__main__":
     # parse arguments and discpatch to the correct func
-    parser = argparse.ArgumentParser(description='vlmc tool')
+    parser = argparse.ArgumentParser(description='Archipelago tool')
     parser.add_argument('-c', '--config', type=str, nargs='?', help='config file')
     subparsers = parser.add_subparsers()
 
     start_parser = subparsers.add_parser('start', help='Start archipelago')
     start_parser.set_defaults(func=start)
-    
+
     stop_parser = subparsers.add_parser('stop', help='Stop archipelago')
     stop_parser.set_defaults(func=stop)
-    
+
     status_parser = subparsers.add_parser('status', help='Archipelago status')
     status_parser.set_defaults(func=status)
-    
+
     restart_parser = subparsers.add_parser('restart', help='Restart archipelago')
     restart_parser.set_defaults(func=restart)
 
     args = parser.parse_args()
+    loadrc(args.config)
+    peers = construct_peers()
     sys.exit(args.func(args))
index 9f26d82..ae68b94 100644 (file)
@@ -43,7 +43,7 @@ st-vlmcd: mt-vlmcd.c peer.c peer.h $(BASE)/xseg/protocol.h
        $(CC) $(CFLAGS) -o $@ $< peer.c $(INC) -L$(LIB) -lxseg 
 
 mt-mapperd: mt-mapperd.c peer.c peer.h $(BASE)/xseg/protocol.h 
-       $(CC) $(CFLAGS) -o $@ $< peer.c $(INC) -L$(LIB) -DST_THREADS -lxseg -lst
+       $(CC) $(CFLAGS) -o $@ $< peer.c $(INC) -L$(LIB) -DST_THREADS -lxseg -lst -lcrypto
 
 pfiled: pfiled.c common.c $(BASE)/xseg/xseg.h $(BASE)/xseg/protocol.h common.h
        $(CC) $(CFLAGS) -o $@ $< common.c  $(INC) -L$(LIB) -lxseg -lpthread
index fd512b6..05e3d6e 100644 (file)
@@ -19,6 +19,11 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        return 0;
 }
 
+void custom_peer_finalize(struct peerd *peer)
+{
+       return;
+}
+
 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                enum dispatch_reason reason)
 {
index 75b044a..9e5b962 100644 (file)
@@ -184,3 +184,8 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
 
        return 0;
 }
+
+void custom_peer_finalize(struct peerd *peer)
+{
+       return;
+}
index c9f2465..065ab53 100644 (file)
 #include <xseg/protocol.h>
 #include <sys/stat.h>
 #include <fcntl.h>
-//#include <gcrypt.h>
 #include <errno.h>
 #include <sched.h>
 #include <sys/syscall.h>
+#include <openssl/sha.h>
 
-//GCRY_THREAD_OPTION_PTHREAD_IMPL;
+/* general mapper flags */
 #define MF_LOAD        (1 << 0)
 #define MF_EXCLUSIVE   (1 << 1)
 #define MF_FORCE       (1 << 2)
 #define MF_ARCHIP      (1 << 3)
 
+#ifndef SHA256_DIGEST_SIZE
 #define SHA256_DIGEST_SIZE 32
+#endif
 /* hex representation of sha256 value takes up double the sha256 size */
 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
 
+#define block_size (1<<22) //FIXME this should be defined here?
+
+/* transparency byte + max object len in disk */
+#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
+
+/* Map header contains:
+ *     map version
+ *     volume size
+ */
+#define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
+
+
 #define MAPPER_PREFIX "archip_"
 #define MAPPER_PREFIX_LEN 7
 
-#define block_size (1<<22) //FIXME this should be defined here?
+#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
+#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
 
-/* transparency byte + max object len */
-#define objectsize_in_map (1 + XSEG_MAX_TARGETLEN)
+#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
+#error         "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
+#endif
 
-/* volume size */
-#define mapheader_size (sizeof(uint64_t))
+#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
 
-/* reserve enough space to hold _%u object idx */
-#define MAX_VOLUME_LEN (XSEG_MAX_TARGETLEN - 20)
+#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
+#error         "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
+#endif
 
-#define MF_OBJECT_EXIST                (1 << 0)
-#define MF_OBJECT_COPYING      (1 << 1)
-#define MF_OBJECT_WRITING      (1 << 2)
-#define MF_OBJECT_DELETING     (1 << 3)
-#define MF_OBJECT_DELETED      (1 << 4)
-#define MF_OBJECT_DESTROYED    (1 << 5)
-
-#define MF_OBJECT_NOT_READY    (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
+#define MAX_VOLUME_SIZE \
+((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
 
-//char *magic_string = "This a magic string. Please hash me";
-//unsigned char magic_sha256[SHA256_DIGEST_SIZE];      /* sha256 hash value of magic string */
-//char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
 
-char *zero_block="zeroblock";
-#define ZERO_BLOCK_LEN 9 /* strlen(zero_block) */
+char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
+#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
 
-//dispatch_internal mapper states
+/* dispatch_internal mapper states */
 enum mapper_state {
        ACCEPTED = 0,
        WRITING = 1,
@@ -66,28 +73,27 @@ enum mapper_state {
 
 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
 
+
+/* mapper object flags */
+#define MF_OBJECT_EXIST                (1 << 0)
+#define MF_OBJECT_COPYING      (1 << 1)
+#define MF_OBJECT_WRITING      (1 << 2)
+#define MF_OBJECT_DELETING     (1 << 3)
+#define MF_OBJECT_DESTROYED    (1 << 5)
+
+#define MF_OBJECT_NOT_READY    (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
+                                       MF_OBJECT_DELETING)
 struct map_node {
        uint32_t flags;
        uint32_t objectidx;
        uint32_t objectlen;
-       char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
+       char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
        struct map *map;
        uint32_t ref;
        uint32_t waiters;
        st_cond_t cond;
 };
 
-#define MF_MAP_LOADING         (1 << 0)
-#define MF_MAP_DESTROYED       (1 << 1)
-#define MF_MAP_WRITING         (1 << 2)
-#define MF_MAP_DELETING                (1 << 3)
-#define MF_MAP_DROPPING_CACHE  (1 << 4)
-#define MF_MAP_EXCLUSIVE       (1 << 5)
-#define MF_MAP_OPENING         (1 << 6)
-#define MF_MAP_CLOSING         (1 << 7)
-
-#define MF_MAP_NOT_READY       (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
-                                       MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
 
 #define wait_on_pr(__pr, __condition__)        \
        while (__condition__){                  \
@@ -101,7 +107,8 @@ struct map_node {
        while (__condition__){                  \
                ta--;                           \
                __mn->waiters++;                \
-               XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
+               XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
+                       ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
                st_cond_wait(__mn->cond);       \
        }
 
@@ -109,7 +116,8 @@ struct map_node {
        while (__condition__){                  \
                ta--;                           \
                __map->waiters++;               \
-               XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
+               XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
+                                  __map, __map->volume, __map->waiters, ta); \
                st_cond_wait(__map->cond);      \
        }
 
@@ -117,7 +125,7 @@ struct map_node {
        do {                                    \
                if (!__get_mapper_io(pr)->active){\
                        ta++;                   \
-                       XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta); \
+                       XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
                        __get_mapper_io(pr)->active = 1;\
                        st_cond_signal(__pr->cond);     \
                }                               \
@@ -127,7 +135,8 @@ struct map_node {
        do {                                    \
                if (__map->waiters) {           \
                        ta += 1;                \
-                       XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
+                       XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
+                       ta: %u",  __map, __map->volume, __map->waiters, ta); \
                        __map->waiters--;       \
                        st_cond_signal(__map->cond);    \
                }                               \
@@ -137,18 +146,34 @@ struct map_node {
        do {                                    \
                if (__mn->waiters) {            \
                        ta += __mn->waiters;    \
-                       XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
+                       XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
+                       %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
                        __mn->waiters = 0;      \
                        st_cond_broadcast(__mn->cond);  \
                }                               \
        }while(0)
 
 
+/* map flags */
+#define MF_MAP_LOADING         (1 << 0)
+#define MF_MAP_DESTROYED       (1 << 1)
+#define MF_MAP_WRITING         (1 << 2)
+#define MF_MAP_DELETING                (1 << 3)
+#define MF_MAP_DROPPING_CACHE  (1 << 4)
+#define MF_MAP_EXCLUSIVE       (1 << 5)
+#define MF_MAP_OPENING         (1 << 6)
+#define MF_MAP_CLOSING         (1 << 7)
+#define MF_MAP_DELETED         (1 << 8)
+
+#define MF_MAP_NOT_READY       (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
+                                       MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
+
 struct map {
+       uint32_t version;
        uint32_t flags;
        uint64_t size;
        uint32_t volumelen;
-       char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
+       char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
        xhash_t *objects;       /* obj_index --> map_node */
        uint32_t ref;
        uint32_t waiters;
@@ -229,6 +254,52 @@ static uint32_t calc_nr_obj(struct xseg_request *req)
        return r;
 }
 
+/* hexlify function. 
+ * Unsafe. Doesn't check if data length is odd!
+ */
+static void hexlify(unsigned char *data, char *hex)
+{
+       int i;
+       for (i=0; i<SHA256_DIGEST_LENGTH; i++)
+               sprintf(hex+2*i, "%02x", data[i]);
+}
+
+static void unhexlify(char *hex, unsigned char *data)
+{
+       int i;
+       char c;
+       for (i=0; i<SHA256_DIGEST_LENGTH; i++){
+               data[i] = 0;
+               c = hex[2*i];
+               if (isxdigit(c)){
+                       if (isdigit(c)){
+                               c-= '0';
+                       }
+                       else {
+                               c = tolower(c);
+                               c = c-'a' + 10;
+                       }
+               }
+               else {
+                       c = 0;
+               }
+               data[i] |= (c << 4) & 0xF0;
+               c = hex[2*i+1];
+               if (isxdigit(c)){
+                       if (isdigit(c)){
+                               c-= '0';
+                       }
+                       else {
+                               c = tolower(c);
+                               c = c-'a' + 10;
+                       }
+               }
+               else {
+                       c = 0;
+               }
+               data[i] |= c & 0x0F;
+       }
+}
 /*
  * Maps handling functions
  */
@@ -337,7 +408,7 @@ static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
                goto out_err;
        }
 
-       r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
+       r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
                                map->volume);
@@ -349,7 +420,7 @@ static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
                goto out_put;
        strncpy(reqtarget, map->volume, req->targetlen);
        req->op = X_CLOSE;
-       req->size = block_size;
+       req->size = 0;
        req->offset = 0;
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r < 0){
@@ -455,47 +526,54 @@ static int insert_object(struct map *map, struct map_node *mn)
  */
 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
 {
-       int i;
-       //hexlify sha256 value
-       for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
-               sprintf(mn->object+2*i, "%02x", buf[i]);
-       }
-
-       mn->object[SHA256_DIGEST_SIZE * 2] = 0;
-       mn->objectlen = SHA256_DIGEST_SIZE * 2;
+       hexlify(buf, mn->object);
+       mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+       mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
        mn->flags = MF_OBJECT_EXIST;
 }
 
-static inline void map_to_object(struct map_node *mn, char *buf)
+static inline void map_to_object(struct map_node *mn, unsigned char *buf)
 {
        char c = buf[0];
        mn->flags = 0;
-       if (c)
+       if (c){
                mn->flags |= MF_OBJECT_EXIST;
-       memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
-       mn->object[XSEG_MAX_TARGETLEN] = 0;
-       mn->objectlen = strlen(mn->object);
+               strcpy(mn->object, MAPPER_PREFIX);
+               hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
+               mn->object[MAX_OBJECT_LEN] = 0;
+               mn->objectlen = strlen(mn->object);
+       }
+       else {
+               hexlify(buf+1, mn->object);
+               mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+               mn->objectlen = strlen(mn->object);
+       }
+
 }
 
 static inline void object_to_map(char* buf, struct map_node *mn)
 {
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
-       memcpy(buf+1, mn->object, mn->objectlen);
-       /* zero out the rest of the buffer */
-       memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen);
+       if (buf[0]){
+               /* strip common prefix */
+               unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
+       }
+       else {
+               unhexlify(mn->object, (unsigned char *)(buf+1));
+       }
 }
 
 static inline void mapheader_to_map(struct map *m, char *buf)
 {
        uint64_t pos = 0;
-//     memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
-//     pos += SHA256_DIGEST_SIZE;
+       memcpy(buf + pos, &m->version, sizeof(m->version));
+       pos += sizeof(m->version);
        memcpy(buf + pos, &m->size, sizeof(m->size));
        pos += sizeof(m->size);
 }
 
 
-static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
+static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
                                struct map *map, struct map_node *mn)
 {
        void *dummy;
@@ -700,7 +778,7 @@ out_fail:
        return NULL;
 }
 
-static int read_map (struct map *map, char *buf)
+static int read_map (struct map *map, unsigned char *buf)
 {
        char nulls[SHA256_DIGEST_SIZE];
        memset(nulls, 0, SHA256_DIGEST_SIZE);
@@ -711,7 +789,7 @@ static int read_map (struct map *map, char *buf)
                //read error;
                return -1;
        }
-       //type 1, our type, type 0 pithos map
+       //type 1, archip type, type 0 pithos map
        int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
        XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
        uint64_t pos;
@@ -719,6 +797,8 @@ static int read_map (struct map *map, char *buf)
        struct map_node *map_node;
        if (type) {
                uint64_t pos = 0;
+               map->version = *(uint32_t *) (buf + pos);
+               pos += sizeof(uint32_t);
                map->size = *(uint64_t *) (buf + pos);
                pos += sizeof(uint64_t);
                nr_objs = map->size / block_size;
@@ -781,7 +861,7 @@ static int load_map(struct peer_req *pr, struct map *map)
                xseg_put_request(peer->xseg, req, pr->portno);
                return -1;
        }
-       r = read_map(map, xseg_get_data(peer->xseg, req));
+       r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
        xseg_put_request(peer->xseg, req, pr->portno);
        return r;
 }
@@ -854,8 +934,10 @@ static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
 
        map->flags |= MF_MAP_OPENING;
        req = __open_map(pr, map, flags);
-       if (!req)
+       if (!req){
+               map->flags &= ~MF_MAP_OPENING;
                return -1;
+       }
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
        map->flags &= ~MF_MAP_OPENING;
        err = req->state & XS_FAILED;
@@ -875,6 +957,8 @@ static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, st
 {
        int r = 0;
        if (mn){
+               XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
+                               req, mn, mio);
                r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
                if (r == -XHASH_ERESIZE) {
                        xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
@@ -884,8 +968,13 @@ static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, st
                        mio->copyups_nodes = new_hashmap;
                        r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
                }
+               if (r < 0)
+                       XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
+                                       req, mn, mio);
        }
        else {
+               XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
+                               req, mio);
                r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
                if (r == -XHASH_ERESIZE) {
                        xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
@@ -895,6 +984,9 @@ static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, st
                        mio->copyups_nodes = new_hashmap;
                        r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
                }
+               if (r < 0)
+                       XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
+                                       req, mio);
        }
 out:
        return r;
@@ -904,8 +996,11 @@ static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_re
 {
        struct map_node *mn;
        int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
-       if (r < 0)
+       if (r < 0){
+               XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
                return NULL;
+       }
+       XSEGLOG2(&lc, I, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
        return mn;
 }
 
@@ -919,11 +1014,21 @@ static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *
        xport p;
 
        uint32_t newtargetlen;
-       char new_target[XSEG_MAX_TARGETLEN + 1];
-       strncpy(new_target, map->volume, map->volumelen);
-       sprintf(new_target + map->volumelen, "_%u", mn->objectidx);
-       new_target[XSEG_MAX_TARGETLEN] = 0;
-       newtargetlen = strlen(new_target);
+       char new_target[MAX_OBJECT_LEN + 1];
+       unsigned char sha[SHA256_DIGEST_SIZE];
+
+       strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
+
+       char tmp[XSEG_MAX_TARGETLEN + 1];
+       uint32_t tmplen;
+       strncpy(tmp, map->volume, map->volumelen);
+       sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
+       tmp[XSEG_MAX_TARGETLEN] = 0;
+       tmplen = strlen(tmp);
+       SHA256((unsigned char *)tmp, tmplen, sha);
+       hexlify(sha, new_target+MAPPER_PREFIX_LEN);
+       newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
+
 
        if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
                goto copyup_zeroblock;
@@ -957,10 +1062,12 @@ static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *
                goto out_put;
        }
        r = __set_copyup_node(mio, req, mn);
+       if (r < 0)
+               goto out_unset;
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
-               goto out_unset;
+               goto out_mapper_unset;
        }
        xseg_signal(peer->xseg, p);
 //     mio->copyups++;
@@ -969,8 +1076,9 @@ static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
        return req;
 
+out_mapper_unset:
+       __set_copyup_node(mio, req, NULL);
 out_unset:
-       r = __set_copyup_node(mio, req, NULL);
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
        xseg_put_request(peer->xseg, req, pr->portno);
@@ -990,6 +1098,8 @@ copyup_zeroblock:
        newmn.objectidx = mn->objectidx; 
        req = object_write(peer, pr, map, &newmn);
        r = __set_copyup_node(mio, req, mn);
+       if (r < 0)
+               return NULL;
        if (!req){
                XSEGLOG2(&lc, E, "Object write returned error for object %s"
                                "\n\t of map %s [%llu]",
@@ -1029,16 +1139,20 @@ static struct xseg_request * delete_object(struct peer_req *pr, struct map_node
                XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
                goto out_put;
        }
-       __set_copyup_node(mio, req, mn);
+       r = __set_copyup_node(mio, req, mn);
+       if (r < 0)
+               goto out_unset;
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
-               goto out_unset;
+               goto out_mapper_unset;
        }
        r = xseg_signal(peer->xseg, p);
        XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
        return req;
 
+out_mapper_unset:
+       __set_copyup_node(mio, req, NULL);
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
@@ -1076,6 +1190,7 @@ static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
                XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
                goto out_put;
        }
+       /* do not check return value. just make sure there is no node set */
        __set_copyup_node(mio, req, NULL);
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
@@ -1169,11 +1284,13 @@ static struct map * create_map(struct mapperd *mapper, char *name,
                strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
                m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
                m->volumelen = MAPPER_PREFIX_LEN + namelen;
+               m->version = 1; /* keep this hardcoded for now */
        }
        else {
                strncpy(m->volume, name, namelen);
                m->volume[namelen] = 0;
                m->volumelen = namelen;
+               m->version = 0; /* version 0 should be pithos maps */
        }
        m->flags = 0;
        m->objects = xhash_new(3, INTEGER); 
@@ -1212,11 +1329,22 @@ void deletion_cb(struct peer_req *pr, struct xseg_request *req)
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node *mn = __get_copyup_node(mio, req);
 
+       __set_copyup_node(mio, req, NULL);
+
+       XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
        mio->del_pending--;
        if (req->state & XS_FAILED){
                mio->err = 1;
        }
-       signal_mapnode(mn);
+       if (mn){
+               XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
+                               mn, mn->object, mio, req);
+               signal_mapnode(mn);
+       }
+       else {
+               XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
+                               mio, req);
+       }
        xseg_put_request(peer->xseg, req, pr->portno);
        signal_pr(pr);
 }
@@ -1250,7 +1378,7 @@ void copyup_cb(struct peer_req *pr, struct xseg_request *req)
 
                struct map_node tmp;
                char *data = xseg_get_data(peer->xseg, req);
-               map_to_object(&tmp, data);
+               map_to_object(&tmp, (unsigned char *) data);
                mn->flags |= MF_OBJECT_EXIST;
                if (mn->flags != MF_OBJECT_EXIST){
                        XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
@@ -1380,7 +1508,7 @@ static int req2objs(struct peer_req *pr, struct map *map, int write)
                                if (mn->flags & MF_OBJECT_NOT_READY) {
                                        if (can_wait){
                                                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
-                                               if (mn->flags & MF_OBJECT_DELETED){
+                                               if (mn->flags & MF_OBJECT_DESTROYED){
                                                        mio->err = 1;
                                                }
                                                if (mio->err){
@@ -1491,10 +1619,11 @@ static int do_info(struct peer_req *pr, struct map *map)
 
 static int do_close(struct peer_req *pr, struct map *map)
 {
-//     struct peerd *peer = pr->peer;
-//     struct xseg_request *req;
-       if (map->flags & MF_MAP_EXCLUSIVE) 
-               close_map(pr, map);
+       if (map->flags & MF_MAP_EXCLUSIVE){
+               /* do not drop cache if close failed and map not deleted */
+               if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
+                       return -1;
+       }
        return do_dropcache(pr, map);
 }
 
@@ -1555,6 +1684,7 @@ wait_pending:
        }
        mio->cb = NULL;
        map->flags &= ~MF_MAP_DELETING;
+       map->flags |= MF_MAP_DELETED;
        XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
        return do_close(pr, map);
 }
@@ -1989,40 +2119,14 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
 {
        int i;
-//     unsigned char buf[SHA256_DIGEST_SIZE];
-//     unsigned char *zero;
-
-       //gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
-
-               /* Version check should be the very first call because it
-          makes sure that important subsystems are intialized. */
-//             gcry_check_version (NULL);
-
-               /* Disable secure memory.  */
-//             gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
-
-               /* Tell Libgcrypt that initialization has completed. */
-//             gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
-
-       /* calculate out magic sha hash value */
-//     gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
-
-       /* calculate zero block */
-       //FIXME check hash value
-//     zero = malloc(block_size);
-//     memset(zero, 0, block_size);
-//     gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
-//     for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
-//             sprintf(zero_block + 2*i, "%02x", buf[i]);
-//     printf("%s \n", zero_block);
-//     free(zero);
 
        //FIXME error checks
        struct mapperd *mapperd = malloc(sizeof(struct mapperd));
        peer->priv = mapperd;
        mapper = mapperd;
        mapper->hashmaps = xhash_new(3, STRING);
-       
+
+       printf("%llu \n", MAX_VOLUME_SIZE);
        for (i = 0; i < peer->nr_ops; i++) {
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
                mio->copyups_nodes = xhash_new(3, INTEGER);
@@ -2064,6 +2168,82 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        return 0;
 }
 
+/* FIXME this should not be here */
+int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
+{
+       struct xseg *xseg = peer->xseg;
+       xport portno_start = peer->portno_start;
+       xport portno_end = peer->portno_end;
+       struct peer_req *pr;
+       xport i;
+       int  r, c = 0;
+       struct xseg_request *req, *received;
+       xseg_prepare_wait(xseg, portno_start);
+       while(1) {
+               XSEGLOG2(&lc, D, "Attempting to check for reply");
+               c = 1;
+               while (c){
+                       c = 0;
+                       for (i = portno_start; i <= portno_end; i++) {
+                               received = xseg_receive(xseg, i, 0);
+                               if (received) {
+                                       c = 1;
+                                       r =  xseg_get_req_data(xseg, received, (void **) &pr);
+                                       if (r < 0 || !pr || received != expected_req){
+                                               XSEGLOG2(&lc, W, "Received request with no pr data\n");
+                                               xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+                                               if (p == NoPort){
+                                                       XSEGLOG2(&lc, W, "Could not respond stale request");
+                                                       xseg_put_request(xseg, received, portno_start);
+                                                       continue;
+                                               } else {
+                                                       xseg_signal(xseg, p);
+                                               }
+                                       } else {
+                                               xseg_cancel_wait(xseg, portno_start);
+                                               return 0;
+                                       }
+                               }
+                       }
+               }
+               xseg_wait_signal(xseg, 1000000UL);
+       }
+}
+
+
+void custom_peer_finalize(struct peerd *peer)
+{
+       struct mapperd *mapper = __get_mapperd(peer);
+       struct peer_req *pr = alloc_peer_req(peer);
+       if (!pr){
+               XSEGLOG2(&lc, E, "Cannot get peer request");
+               return;
+       }
+       int r;
+       struct map *map;
+       struct xseg_request *req;
+       xhash_iter_t it;
+       xhashidx key, val;
+       xhash_iter_init(mapper->hashmaps, &it);
+       while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
+               map = (struct map *)val;
+               if (!(map->flags & MF_MAP_EXCLUSIVE))
+                       continue;
+               map->flags |= MF_MAP_CLOSING;
+               req = __close_map(pr, map);
+               if (!req)
+                       continue;
+               wait_reply(peer, req);
+               if (!(req->state & XS_SERVED))
+                       XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
+               map->flags &= ~MF_MAP_CLOSING;
+               xseg_put_request(peer->xseg, req, pr->portno);
+       }
+       return;
+
+
+}
+
 void print_obj(struct map_node *mn)
 {
        fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
@@ -2077,10 +2257,11 @@ void print_map(struct map *m)
        uint64_t nr_objs = m->size/block_size;
        if (m->size % block_size)
                nr_objs++;
-       fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
+       fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
                        m->volume, m->volumelen, 
                        (unsigned long long) m->size, 
-                       (unsigned long long) nr_objs);
+                       (unsigned long long) nr_objs,
+                       m->version);
        uint64_t i;
        struct map_node *mn;
        if (nr_objs > 1000000) //FIXME to protect against invalid volume size
index 446452b..12a81d5 100644 (file)
@@ -755,6 +755,11 @@ out:
        return ret;
 }
 
+void custom_peer_finalize(struct peerd *peer)
+{
+       return;
+}
+
 static int safe_atoi(char *s)
 {
        long l;
index bffc26b..ad54788 100644 (file)
@@ -9,6 +9,9 @@
 
 #define MAX_POOL_NAME 64
 #define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
+#define RADOS_LOCK_NAME "RadosLock"
+//#define RADOS_LOCK_COOKIE "Cookie"
+#define RADOS_LOCK_COOKIE "foo"
 
 void custom_peer_usage()
 {
@@ -482,7 +485,8 @@ void * lock_op(void *arg)
                }
        }
 
-       while(rados_lock(rados->ioctx, rio->obj_name) < 0){
+       while(rados_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
+               C_LOCK_EXCLUSIVE, RADOS_LOCK_COOKIE, "", "", NULL, 0) < 0){
                if (pr->req->flags & XF_NOSYNC){
                        XSEGLOG2(&lc, E, "Rados lock failed for %s",
                                        rio->obj_name);
@@ -517,9 +521,14 @@ void * unlock_op(void *arg)
        struct rados_io *rio = (struct rados_io *) (pr->priv);
        int r;
        XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
-       r = rados_unlock(rados->ioctx, rio->obj_name);
+       if (pr->req->flags & XF_FORCE)
+               r = rados_break_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
+                       RADOS_LOCK_COOKIE);
+       else
+               r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
+                       RADOS_LOCK_COOKIE);
        if (r < 0){
-               XSEGLOG2(&lc, E, "Rados unlock failed for %s", rio->obj_name);
+               XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
                fail(pr->peer, pr);
        }
        else {
@@ -635,6 +644,11 @@ int custom_arg_parse(int argc, const char *argv[])
        return 0;
 }
 
+void custom_peer_finalize(struct peerd *peer)
+{
+       return;
+}
+
 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                enum dispatch_reason reason)
 {
index 56aee24..8d86123 100644 (file)
@@ -374,3 +374,8 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
 
        return 0;
 }
+
+void custom_peer_finalize(struct peerd *peer)
+{
+       return;
+}
index bea851f..2d2647e 100644 (file)
@@ -426,6 +426,7 @@ static void* thread_loop(void *arg)
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
        }
        wake_up_next_thread(peer);
+       custom_peer_finalize(peer);
        return NULL;
 }
 
@@ -447,6 +448,7 @@ int peerd_start_threads(struct peerd *peer)
 }
 #endif
 
+
 void defer_request(struct peerd *peer, struct peer_req *pr)
 {
        // assert canDefer(peer);
@@ -494,6 +496,7 @@ static int peerd_loop(struct peerd *peer)
                }
 #endif
        }
+       custom_peer_finalize(peer);
        xseg_quit_local_signal(xseg, peer->portno_start);
 #endif
        return 0;
index 2fe9c59..53335b0 100644 (file)
@@ -109,6 +109,7 @@ extern uint32_t ta;
 
 /* peer main function */
 int custom_peer_init(struct peerd *peer, int argc, char *argv[]);
+void custom_peer_finalize(struct peerd *peer);
 
 /* dispatch function */
 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
index 4b0c712..bb5c300 100644 (file)
@@ -452,6 +452,7 @@ int cmd_release(char *target)
        req->offset = 0;
        req->size = 0;
        req->op = X_CLOSE;
+       req->flags = XF_FORCE;
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
        if (p == NoPort)
                return -1;
index f65f506..7314c19 100644 (file)
@@ -18,13 +18,11 @@ struct xseg_reply_info {
 
 #define XSEG_MAX_TARGETLEN 256
 
-/*
 #if (XSEG_MAX_TARGETLEN < 64)
-#pragma message("XSEG_MAX_TARGETLEN should be at least 64!")
+#warning "XSEG_MAX_TARGETLEN should be at least 64!"
 #undef XSEG_MAX_TARGETLEN
 #define XSEG_MAX_TARGETLEN 64
 #endif
-*/
 
 struct xseg_reply_map_scatterlist {
        char target[XSEG_MAX_TARGETLEN];
index cd778f4..208e14b 100644 (file)
@@ -176,6 +176,7 @@ struct xseg_task {
 #define XF_NOSYNC (1 << 0)
 #define XF_FLUSH  (1 << 1)
 #define XF_FUA    (1 << 2)
+#define XF_FORCE  (1 << 3)
 
 /* STATES */
 #define XS_SERVED      (1 << 0)