import os, sys, subprocess, argparse, time, psutil, signal, errno
from subprocess import call, check_call
-BIN_PATH="/usr/bin"
-PIDFILE_PATH="/var/run/archipelago"
+DEFAULTS='/etc/default/archipelago'
+#system defaults
+PIDFILE_PATH="/var/run/archipelago"
+LOGS_PATH="/var/log/archipelago"
DEVICE_PREFIX="/dev/xsegbd"
XSEGBD_SYSFS="/sys/bus/xsegbd/"
CHARDEV_MAJOR=60
CHARDEV_MINOR=0
-SPEC="segdev:xsegbd:512:1024:12"
-#REQS=512
-#PORTS=512
+FILE_BLOCKER='mt-pfiled'
+RADOS_BLOCKER='mt-sosd'
+MAPPER='mt-mapperd'
+VLMC='st-vlmcd'
+BLOCKER=''
+
+available_storage = {'files': FILE_BLOCKER, 'rados': RADOS_BLOCKER}
+
+peers = []
+modules = ["xseg", "segdev", "xseg_posix", "xseg_pthread", "xseg_segdev"]
+xsegbd = "xsegbd"
-BPORT=""
+BPORT=0
MPORT=1
MBPORT=2
VTOOL=3
VPORT_START=204
VPORT_END=403
-LOGS_PATH=""
-
-BLOCKER=""
-MAPPER=""
-VLMC=""
+#default config
+#Every key read by check_conf()/construct_peers() gets a default here, so a
+#key missing from the config file is reported by check_conf() instead of
+#raising NameError.
+SPEC="segdev:xsegbd:512:1024:12"
NR_OPS_BLOCKERB=""
NR_OPS_BLOCKERM=""
+NR_OPS_VLMC=""
+NR_OPS_MAPPER=""
VERBOSITY_MAPPER=""
VERBOSITY_VLMC=""
+VERBOSITY_BLOCKERB=""
+VERBOSITY_BLOCKERM=""
+
+#storage backend, one of the keys of available_storage ('files', 'rados');
+#check_conf() maps it to BLOCKER and rejects anything else.
+STORAGE=""
+
+#mt-pfiled specific options
FILED_IMAGES=""
FILED_MAPS=""
-
PITHOS=""
PITHOSMAPS=""
+#mt-sosd specific options
RADOS_POOL_MAPS=""
RADOS_POOL_BLOCKS=""
-STORAGE="File"
-
-try:
- execfile(os.path.expanduser("/etc/default/archipelago"), globals())
-except:
- print "Cannot open defaults"
- sys.exit(1)
-
-if BLOCKER == "pfiled":
- peer_blockerb = [BLOCKER,
- ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
- str(PITHOS), str(FILED_IMAGES), "-d",
- "-f", os.path.join(PIDFILE_PATH, "blockerb.pid")],
- "blockerb"]
- peer_blockerm = [BLOCKER,
- ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
- str(PITHOSMAPS), str(FILED_MAPS), "-d",
- "-f", os.path.join(PIDFILE_PATH, "blockerm.pid")],
- "blockerm" ]
-elif BLOCKER == "mt-sosd":
- peer_blockerb = [BLOCKER,
- ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
- "--pool", str(RADOS_POOL_BLOCKS), "-v", str(VERBOSITY_BLOCKERB),
- "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
- "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
- "-t", "3"],
- "blockerb"]
- peer_blockerm = [BLOCKER,
- ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
- "--pool", str(RADOS_POOL_MAPS), "-v", str(VERBOSITY_BLOCKERM),
- "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
- "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
- "-t", "3"],
- "blockerm"]
-elif BLOCKER == "mt-pfiled":
- peer_blockerb = [BLOCKER,
- ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
- "--pithos", str(PITHOS), "--archip", str(FILED_IMAGES),
- "-v", str(VERBOSITY_BLOCKERB),
- "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
- "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
- "-t", str(NR_OPS_BLOCKERB), "--prefix", "archip_"],
- "blockerb"]
- peer_blockerm = [BLOCKER,
- ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
- "--pithos", str(PITHOSMAPS), "--archip", str(FILED_MAPS),
- "-v", str(VERBOSITY_BLOCKERM),
- "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
- "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
- "-t", str(NR_OPS_BLOCKERM), "--prefix", "archip_"],
- "blockerm"]
-else:
- sys.exit(-1)
-
-peer_vlmcd = [VLMC,
- ["-t" , "1", "-sp", str(VPORT_START), "-ep", str(VPORT_END),
- "-g", str(SPEC), "-n", str(NR_OPS_VLMC), "-bp", str(BPORT),
- "-mp", str(MPORT), "-d", "-v", str(VERBOSITY_VLMC),
- "--pidfile", os.path.join(PIDFILE_PATH, "vlmcd.pid"),
- "-l", os.path.join(str(LOGS_PATH), "vlmcd.log")
- ], "vlmcd"]
-peer_mapperd = [MAPPER,
- ["-t" , "1", "-p", str(MPORT), "-mbp", str(MBPORT),
- "-g", str(SPEC), "-n", str(NR_OPS_MAPPER), "-bp", str(BPORT), "-d",
- "--pidfile", os.path.join(PIDFILE_PATH, "mapperd.pid"),
- "-v", str(VERBOSITY_MAPPER),
- "-l", os.path.join(str(LOGS_PATH), "mapperd.log")
- ], "mapperd"]
-
-peers = [peer_blockerb, peer_blockerm, peer_vlmcd, peer_mapperd]
-modules = ["xseg", "segdev", "xseg_posix", "xseg_pthread", "xseg_segdev"]
-xsegbd = "xsegbd"
+
+
def check_conf():
def isExec(file_path):
return True
- for p in [BLOCKER, MAPPER, VLMC]:
- if not validExec(p):
- print p + "is not a valid executable"
- return False
-
if not LOGS_PATH:
print "LOGS_PATH is not set"
return False
print "LOGS_PATH "+str(LOGS_PATH)+" does not exist"
return False
except:
- print "LOGS_PATH doesn't exist or is not set"
+ print "LOGS_PATH doesn't exist or is not a directory"
return False
try:
print "Cannot create " + str(PIDFILE_PATH)
return False
except:
- print "PIDFILE_PATH or is not set"
+ print "PIDFILE_PATH is not set"
return False
splitted_spec = str(SPEC).split(':')
print "Verbosity missing"
try:
if (int(v) > 3 or int(v) < 0):
- print "Invalid verboisity " + str(v)
+ print "Invalid verbosity " + str(v)
return False
except:
- print "Invalid verboisity " + str(v)
+ print "Invalid verbosity " + str(v)
return False
for n in [NR_OPS_BLOCKERB, NR_OPS_BLOCKERM, NR_OPS_VLMC, NR_OPS_MAPPER]:
return False
if not validPort(VPORT_END, xseg_ports, "VPORT_END"):
return False
-#if filed
- if STORAGE == "File":
+
+ global BLOCKER
+ try:
+ BLOCKER = available_storage[str(STORAGE)]
+ except:
+ print "Invalid storage " + str(STORAGE)
+ print "Available storage: \"" + ', "'.join(available_storage) + "\""
+ return False
+
+ if STORAGE=="files":
if FILED_IMAGES and not os.path.isdir(str(FILED_IMAGES)):
print "FILED_IMAGES invalid"
return False
print "PITHOSMAPS invalid"
return False
+ for p in [BLOCKER, MAPPER, VLMC]:
+ if not validExec(p):
+ print p + "is not a valid executable"
+ return False
return True
-if not check_conf():
- sys.exit(1)
+def construct_peers():
+    """Build the peer table from the loaded configuration.
+
+    Returns a list of [executable, args, name] entries, in the order the
+    peers are started: block blocker, map blocker, vlmcd, mapperd.  The
+    blocker command lines depend on BLOCKER, which check_conf() derives from
+    STORAGE.  Each peer's pid file is <PIDFILE_PATH>/<name>.pid and, for the
+    peers that take one, the log file is <LOGS_PATH>/<name>.log.
+
+    Prints a diagnostic and exits with status -1 if BLOCKER is not one of
+    the supported blockers.
+    """
+    def pidfile(name):
+        return os.path.join(PIDFILE_PATH, name + ".pid")
+
+    def logfile(name):
+        return os.path.join(str(LOGS_PATH), name + ".log")
+
+    if BLOCKER == "pfiled":
+        # pfiled takes its storage directories positionally and no log file.
+        peer_blockerb = [BLOCKER,
+            ["-p", str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
+             str(PITHOS), str(FILED_IMAGES), "-d",
+             "-f", pidfile("blockerb")],
+            "blockerb"]
+        peer_blockerm = [BLOCKER,
+            ["-p", str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
+             str(PITHOSMAPS), str(FILED_MAPS), "-d",
+             "-f", pidfile("blockerm")],
+            "blockerm"]
+    elif BLOCKER == "mt-sosd":
+        peer_blockerb = [BLOCKER,
+            ["-p", str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
+             "--pool", str(RADOS_POOL_BLOCKS), "-v", str(VERBOSITY_BLOCKERB),
+             "-d", "--pidfile", pidfile("blockerb"),
+             "-l", logfile("blockerb"),
+             "-t", "3"],
+            "blockerb"]
+        peer_blockerm = [BLOCKER,
+            ["-p", str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
+             "--pool", str(RADOS_POOL_MAPS), "-v", str(VERBOSITY_BLOCKERM),
+             "-d", "--pidfile", pidfile("blockerm"),
+             "-l", logfile("blockerm"),
+             "-t", "3"],
+            "blockerm"]
+    elif BLOCKER == "mt-pfiled":
+        peer_blockerb = [BLOCKER,
+            ["-p", str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
+             "--pithos", str(PITHOS), "--archip", str(FILED_IMAGES),
+             "-v", str(VERBOSITY_BLOCKERB),
+             "-d", "--pidfile", pidfile("blockerb"),
+             "-l", logfile("blockerb"),
+             "-t", str(NR_OPS_BLOCKERB), "--prefix", "archip_"],
+            "blockerb"]
+        peer_blockerm = [BLOCKER,
+            ["-p", str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
+             "--pithos", str(PITHOSMAPS), "--archip", str(FILED_MAPS),
+             "-v", str(VERBOSITY_BLOCKERM),
+             "-d", "--pidfile", pidfile("blockerm"),
+             "-l", logfile("blockerm"),
+             "-t", str(NR_OPS_BLOCKERM), "--prefix", "archip_"],
+            "blockerm"]
+    else:
+        # Previously exited silently, leaving no hint of what was wrong.
+        print "Unknown blocker " + str(BLOCKER)
+        sys.exit(-1)
+
+    peer_vlmcd = [VLMC,
+        ["-t", "1", "-sp", str(VPORT_START), "-ep", str(VPORT_END),
+         "-g", str(SPEC), "-n", str(NR_OPS_VLMC), "-bp", str(BPORT),
+         "-mp", str(MPORT), "-d", "-v", str(VERBOSITY_VLMC),
+         "--pidfile", pidfile("vlmcd"),
+         "-l", logfile("vlmcd")],
+        "vlmcd"]
+    peer_mapperd = [MAPPER,
+        ["-t", "1", "-p", str(MPORT), "-mbp", str(MBPORT),
+         "-g", str(SPEC), "-n", str(NR_OPS_MAPPER), "-bp", str(BPORT),
+         "--pidfile", pidfile("mapperd"),
+         "-v", str(VERBOSITY_MAPPER), "-d",
+         "-l", logfile("mapperd")],
+        "mapperd"]
+
+    return [peer_blockerb, peer_blockerm, peer_vlmcd, peer_mapperd]
+
+
def exclusive(fn):
def exclusive_args(args):
def loadrc(rc):
try:
if rc == None:
- execfile(os.path.expanduser("/etc/default/archipelago"), globals())
+ execfile(os.path.expanduser(DEFAULTS), globals())
else:
execfile(rc, globals())
except:
if name in modules:
return 0
cmd = ["modprobe -v %s" % name]
- print cmd
try:
check_call(cmd, shell=True);
except Exception:
except:
pass
cmd = ["mknod", str(CHARDEV_NAME), "c", str(CHARDEV_MAJOR), str(CHARDEV_MINOR)]
- print cmd
+ print ' '.join(cmd)
try:
check_call(cmd, shell=False);
except Exception:
if make_segdev() < 0:
stop(args)
return -1
-
+
time.sleep(0.5)
-
+
if create_segment() < 0:
stop(args)
return -1
-
+
time.sleep(0.5)
-
+
for p in peers:
if start_peer(p) < 0:
stop(args)
stop_peer(p)
remove_segdev()
-
+
for m in reversed(modules):
unload_module(m)
return 0
print p[0] + " running"
r += 1
return r
-
+
def restart(args):
- stop(args)
- start(args)
+ """Stop archipelago, then start it again.
+
+ If stop() returns a negative value it is returned as-is and start() is
+ not attempted; otherwise the result of start() is returned.
+ """
+ r = stop(args)
+ if r < 0:
+ return r
+ return start(args)
if __name__ == "__main__":
# parse arguments and discpatch to the correct func
- parser = argparse.ArgumentParser(description='vlmc tool')
+ parser = argparse.ArgumentParser(description='Archipelago tool')
parser.add_argument('-c', '--config', type=str, nargs='?', help='config file')
subparsers = parser.add_subparsers()
start_parser = subparsers.add_parser('start', help='Start archipelago')
start_parser.set_defaults(func=start)
-
+
stop_parser = subparsers.add_parser('stop', help='Stop archipelago')
stop_parser.set_defaults(func=stop)
-
+
status_parser = subparsers.add_parser('status', help='Archipelago status')
status_parser.set_defaults(func=status)
-
+
restart_parser = subparsers.add_parser('restart', help='Restart archipelago')
restart_parser.set_defaults(func=restart)
args = parser.parse_args()
+ # Load the config (DEFAULTS when -c is not given) before building the
+ # peer table: construct_peers() reads the configuration globals, and the
+ # module-level `peers` it rebinds is what the subcommands iterate over.
+ loadrc(args.config)
+ peers = construct_peers()
sys.exit(args.func(args))
#include <xseg/protocol.h>
#include <sys/stat.h>
#include <fcntl.h>
-//#include <gcrypt.h>
#include <errno.h>
#include <sched.h>
#include <sys/syscall.h>
+#include <openssl/sha.h>
-//GCRY_THREAD_OPTION_PTHREAD_IMPL;
+/* general mapper flags */
#define MF_LOAD (1 << 0)
#define MF_EXCLUSIVE (1 << 1)
#define MF_FORCE (1 << 2)
#define MF_ARCHIP (1 << 3)
+#ifndef SHA256_DIGEST_SIZE
#define SHA256_DIGEST_SIZE 32
+#endif
/* hex representation of sha256 value takes up double the sha256 size */
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
+#define block_size (1<<22) //FIXME this should be defined here?
+
+/* transparency byte + max object len in disk */
+#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
+
+/* Map header contains:
+ * map version
+ * volume size
+ */
+#define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
+
+
#define MAPPER_PREFIX "archip_"
#define MAPPER_PREFIX_LEN 7
-#define block_size (1<<22) //FIXME this should be defined here?
+#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
+#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
-/* transparency byte + max object len */
-#define objectsize_in_map (1 + XSEG_MAX_TARGETLEN)
+#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
+#error "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
+#endif
-/* volume size */
-#define mapheader_size (sizeof(uint64_t))
+#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
-/* reserve enough space to hold _%u object idx */
-#define MAX_VOLUME_LEN (XSEG_MAX_TARGETLEN - 20)
+#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
+#error "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
+#endif
-#define MF_OBJECT_EXIST (1 << 0)
-#define MF_OBJECT_COPYING (1 << 1)
-#define MF_OBJECT_WRITING (1 << 2)
-#define MF_OBJECT_DELETING (1 << 3)
-#define MF_OBJECT_DELETED (1 << 4)
-#define MF_OBJECT_DESTROYED (1 << 5)
-
-#define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
+#define MAX_VOLUME_SIZE \
+((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
-//char *magic_string = "This a magic string. Please hash me";
-//unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
-//char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
-char *zero_block="zeroblock";
-#define ZERO_BLOCK_LEN 9 /* strlen(zero_block) */
+char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
+#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
-//dispatch_internal mapper states
+/* dispatch_internal mapper states */
enum mapper_state {
ACCEPTED = 0,
WRITING = 1,
typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
+
+/* mapper object flags */
+#define MF_OBJECT_EXIST (1 << 0)
+#define MF_OBJECT_COPYING (1 << 1)
+#define MF_OBJECT_WRITING (1 << 2)
+#define MF_OBJECT_DELETING (1 << 3)
+#define MF_OBJECT_DESTROYED (1 << 5)
+
+#define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
+ MF_OBJECT_DELETING)
struct map_node {
uint32_t flags;
uint32_t objectidx;
uint32_t objectlen;
- char object[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
+ char object[MAX_OBJECT_LEN + 1]; /* NULL terminated string */
struct map *map;
uint32_t ref;
uint32_t waiters;
st_cond_t cond;
};
-#define MF_MAP_LOADING (1 << 0)
-#define MF_MAP_DESTROYED (1 << 1)
-#define MF_MAP_WRITING (1 << 2)
-#define MF_MAP_DELETING (1 << 3)
-#define MF_MAP_DROPPING_CACHE (1 << 4)
-#define MF_MAP_EXCLUSIVE (1 << 5)
-#define MF_MAP_OPENING (1 << 6)
-#define MF_MAP_CLOSING (1 << 7)
-
-#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
- MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
#define wait_on_pr(__pr, __condition__) \
while (__condition__){ \
while (__condition__){ \
ta--; \
__mn->waiters++; \
- XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
+ XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
+ ta: %u", __mn, __mn->object, __mn->waiters, ta); \
st_cond_wait(__mn->cond); \
}
while (__condition__){ \
ta--; \
__map->waiters++; \
- XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u", __map, __map->volume, __map->waiters, ta); \
+ XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
+ __map, __map->volume, __map->waiters, ta); \
st_cond_wait(__map->cond); \
}
do { \
if (!__get_mapper_io(pr)->active){\
ta++; \
- XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta); \
+ XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta);\
__get_mapper_io(pr)->active = 1;\
st_cond_signal(__pr->cond); \
} \
do { \
if (__map->waiters) { \
ta += 1; \
- XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u", __map, __map->volume, __map->waiters, ta); \
+ XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
+ ta: %u", __map, __map->volume, __map->waiters, ta); \
__map->waiters--; \
st_cond_signal(__map->cond); \
} \
do { \
if (__mn->waiters) { \
ta += __mn->waiters; \
- XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
+ XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
+ %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
__mn->waiters = 0; \
st_cond_broadcast(__mn->cond); \
} \
}while(0)
+/* map flags */
+#define MF_MAP_LOADING (1 << 0)
+#define MF_MAP_DESTROYED (1 << 1)
+#define MF_MAP_WRITING (1 << 2)
+#define MF_MAP_DELETING (1 << 3)
+#define MF_MAP_DROPPING_CACHE (1 << 4)
+#define MF_MAP_EXCLUSIVE (1 << 5)
+#define MF_MAP_OPENING (1 << 6)
+#define MF_MAP_CLOSING (1 << 7)
+#define MF_MAP_DELETED (1 << 8)
+
+#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
+ MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
+
struct map {
+ uint32_t version;
uint32_t flags;
uint64_t size;
uint32_t volumelen;
- char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
+ char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
xhash_t *objects; /* obj_index --> map_node */
uint32_t ref;
uint32_t waiters;
return r;
}
+/* hexlify - write the lowercase hex representation of a SHA256 digest.
+ * @data: SHA256_DIGEST_SIZE bytes of binary digest.
+ * @hex:  output buffer with room for HEXLIFIED_SHA256_DIGEST_SIZE characters
+ *        plus a terminating NUL, which is always written.
+ */
+static void hexlify(unsigned char *data, char *hex)
+{
+	static const char digits[] = "0123456789abcdef";
+	int i;
+
+	for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
+		hex[2*i]     = digits[data[i] >> 4];
+		hex[2*i + 1] = digits[data[i] & 0x0F];
+	}
+	hex[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+}
+
+/* hexnibble - value of a single hex digit (either case); any other
+ * character decodes as 0. Locale-independent and safe for negative chars,
+ * unlike passing a plain char to the <ctype.h> classifiers.
+ */
+static inline unsigned char hexnibble(char c)
+{
+	if (c >= '0' && c <= '9')
+		return (unsigned char)(c - '0');
+	if (c >= 'a' && c <= 'f')
+		return (unsigned char)(c - 'a' + 10);
+	if (c >= 'A' && c <= 'F')
+		return (unsigned char)(c - 'A' + 10);
+	return 0;
+}
+
+/* unhexlify - decode a hexlified SHA256 digest back to binary.
+ * Reads exactly HEXLIFIED_SHA256_DIGEST_SIZE characters from @hex (no NUL
+ * terminator required) and writes SHA256_DIGEST_SIZE bytes to @data.
+ * Non-hex characters decode as 0.
+ */
+static void unhexlify(char *hex, unsigned char *data)
+{
+	int i;
+
+	for (i = 0; i < SHA256_DIGEST_SIZE; i++)
+		data[i] = (unsigned char)((hexnibble(hex[2*i]) << 4) |
+					  hexnibble(hex[2*i + 1]));
+}
/*
* Maps handling functions
*/
goto out_err;
}
- r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
+ r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
if (r < 0){
XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
map->volume);
goto out_put;
strncpy(reqtarget, map->volume, req->targetlen);
req->op = X_CLOSE;
- req->size = block_size;
+ req->size = 0;
req->offset = 0;
r = xseg_set_req_data(peer->xseg, req, pr);
if (r < 0){
*/
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
{
- int i;
- //hexlify sha256 value
- for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
- sprintf(mn->object+2*i, "%02x", buf[i]);
- }
-
- mn->object[SHA256_DIGEST_SIZE * 2] = 0;
- mn->objectlen = SHA256_DIGEST_SIZE * 2;
+ /* object name is the bare hex digest from the pithos map entry: unlike
+ * archip objects, no MAPPER_PREFIX is prepended, yet it is marked as
+ * existing. */
+ hexlify(buf, mn->object);
+ mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+ mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
mn->flags = MF_OBJECT_EXIST;
}
-static inline void map_to_object(struct map_node *mn, char *buf)
+/* map_to_object - decode one on-disk map entry (objectsize_in_map bytes:
+ * existence byte followed by a binary SHA256 digest) into @mn.
+ * Existing objects are named MAPPER_PREFIX + hex digest (the prefix is not
+ * stored on disk); non-existing ones are named by the bare hex digest.
+ */
+static inline void map_to_object(struct map_node *mn, unsigned char *buf)
{
char c = buf[0];
mn->flags = 0;
- if (c)
+ if (c){
mn->flags |= MF_OBJECT_EXIST;
- memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
- mn->object[XSEG_MAX_TARGETLEN] = 0;
- mn->objectlen = strlen(mn->object);
+ strcpy(mn->object, MAPPER_PREFIX);
+ hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
+ mn->object[MAX_OBJECT_LEN] = 0;
+ mn->objectlen = strlen(mn->object);
+ }
+ else {
+ hexlify(buf+1, mn->object);
+ mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+ mn->objectlen = strlen(mn->object);
+ }
+
}
static inline void object_to_map(char* buf, struct map_node *mn)
{
+ /* Encode @mn as an on-disk map entry: existence byte, then the binary
+ * SHA256 digest decoded from the object name (inverse of map_to_object).
+ * NOTE(review): for existing objects this assumes the name starts with
+ * MAPPER_PREFIX; nodes built by pithosmap_to_object are MF_OBJECT_EXIST
+ * but unprefixed and would be mis-encoded here -- confirm such nodes are
+ * never written through this path. */
buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
- memcpy(buf+1, mn->object, mn->objectlen);
- /* zero out the rest of the buffer */
- memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen);
+ if (buf[0]){
+ /* strip common prefix */
+ unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
+ }
+ else {
+ unhexlify(mn->object, (unsigned char *)(buf+1));
+ }
}
static inline void mapheader_to_map(struct map *m, char *buf)
{
+ /* Serialize the header of @m INTO @buf (map -> buffer, despite the
+ * name): version (uint32_t) then size (uint64_t), mapheader_size bytes,
+ * in the same order read_map() parses them. */
uint64_t pos = 0;
-// memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
-// pos += SHA256_DIGEST_SIZE;
+ memcpy(buf + pos, &m->version, sizeof(m->version));
+ pos += sizeof(m->version);
memcpy(buf + pos, &m->size, sizeof(m->size));
pos += sizeof(m->size);
}
-static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
+static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
struct map *map, struct map_node *mn)
{
void *dummy;
return NULL;
}
-static int read_map (struct map *map, char *buf)
+static int read_map (struct map *map, unsigned char *buf)
{
char nulls[SHA256_DIGEST_SIZE];
memset(nulls, 0, SHA256_DIGEST_SIZE);
//read error;
return -1;
}
- //type 1, our type, type 0 pithos map
+ //type 1, archip type, type 0 pithos map
int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
uint64_t pos;
struct map_node *map_node;
if (type) {
uint64_t pos = 0;
+ map->version = *(uint32_t *) (buf + pos);
+ pos += sizeof(uint32_t);
map->size = *(uint64_t *) (buf + pos);
pos += sizeof(uint64_t);
nr_objs = map->size / block_size;
xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
- r = read_map(map, xseg_get_data(peer->xseg, req));
+ r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
xseg_put_request(peer->xseg, req, pr->portno);
return r;
}
map->flags |= MF_MAP_OPENING;
req = __open_map(pr, map, flags);
- if (!req)
+ if (!req){
+ map->flags &= ~MF_MAP_OPENING;
return -1;
+ }
wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
map->flags &= ~MF_MAP_OPENING;
err = req->state & XS_FAILED;
{
int r = 0;
if (mn){
+ XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
+ req, mn, mio);
r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
if (r == -XHASH_ERESIZE) {
xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
mio->copyups_nodes = new_hashmap;
r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
}
+ if (r < 0)
+ XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
+ req, mn, mio);
}
else {
+ XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
+ req, mio);
r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
if (r == -XHASH_ERESIZE) {
xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
mio->copyups_nodes = new_hashmap;
r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
}
+ if (r < 0)
+ XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
+ req, mio);
}
out:
return r;
{
struct map_node *mn;
int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
- if (r < 0)
+ if (r < 0){
+ XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
return NULL;
+ }
+ XSEGLOG2(&lc, I, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
return mn;
}
xport p;
uint32_t newtargetlen;
- char new_target[XSEG_MAX_TARGETLEN + 1];
- strncpy(new_target, map->volume, map->volumelen);
- sprintf(new_target + map->volumelen, "_%u", mn->objectidx);
- new_target[XSEG_MAX_TARGETLEN] = 0;
- newtargetlen = strlen(new_target);
+ char new_target[MAX_OBJECT_LEN + 1];
+ unsigned char sha[SHA256_DIGEST_SIZE];
+
+ strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
+
+ char tmp[XSEG_MAX_TARGETLEN + 1];
+ uint32_t tmplen;
+ strncpy(tmp, map->volume, map->volumelen);
+ sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
+ tmp[XSEG_MAX_TARGETLEN] = 0;
+ tmplen = strlen(tmp);
+ SHA256((unsigned char *)tmp, tmplen, sha);
+ hexlify(sha, new_target+MAPPER_PREFIX_LEN);
+ newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
+
if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
goto copyup_zeroblock;
goto out_put;
}
r = __set_copyup_node(mio, req, mn);
+ if (r < 0)
+ goto out_unset;
p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort) {
XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
- goto out_unset;
+ goto out_mapper_unset;
}
xseg_signal(peer->xseg, p);
// mio->copyups++;
XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
return req;
+out_mapper_unset:
+ __set_copyup_node(mio, req, NULL);
out_unset:
- r = __set_copyup_node(mio, req, NULL);
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
xseg_put_request(peer->xseg, req, pr->portno);
newmn.objectidx = mn->objectidx;
req = object_write(peer, pr, map, &newmn);
r = __set_copyup_node(mio, req, mn);
+ if (r < 0)
+ return NULL;
if (!req){
XSEGLOG2(&lc, E, "Object write returned error for object %s"
"\n\t of map %s [%llu]",
XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
goto out_put;
}
- __set_copyup_node(mio, req, mn);
+ r = __set_copyup_node(mio, req, mn);
+ if (r < 0)
+ goto out_unset;
xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
- goto out_unset;
+ goto out_mapper_unset;
}
r = xseg_signal(peer->xseg, p);
XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
return req;
+out_mapper_unset:
+ __set_copyup_node(mio, req, NULL);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
goto out_put;
}
+ /* do not check return value. just make sure there is no node set */
__set_copyup_node(mio, req, NULL);
xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
m->volumelen = MAPPER_PREFIX_LEN + namelen;
+ m->version = 1; /* keep this hardcoded for now */
}
else {
strncpy(m->volume, name, namelen);
m->volume[namelen] = 0;
m->volumelen = namelen;
+ m->version = 0; /* version 0 should be pithos maps */
}
m->flags = 0;
m->objects = xhash_new(3, INTEGER);
struct mapper_io *mio = __get_mapper_io(pr);
struct map_node *mn = __get_copyup_node(mio, req);
+ __set_copyup_node(mio, req, NULL);
+
+ XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
mio->del_pending--;
if (req->state & XS_FAILED){
mio->err = 1;
}
- signal_mapnode(mn);
+ if (mn){
+ XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
+ mn, mn->object, mio, req);
+ signal_mapnode(mn);
+ }
+ else {
+ XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
+ mio, req);
+ }
xseg_put_request(peer->xseg, req, pr->portno);
signal_pr(pr);
}
struct map_node tmp;
char *data = xseg_get_data(peer->xseg, req);
- map_to_object(&tmp, data);
+ map_to_object(&tmp, (unsigned char *) data);
mn->flags |= MF_OBJECT_EXIST;
if (mn->flags != MF_OBJECT_EXIST){
XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
if (mn->flags & MF_OBJECT_NOT_READY) {
if (can_wait){
wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
- if (mn->flags & MF_OBJECT_DELETED){
+ if (mn->flags & MF_OBJECT_DESTROYED){
mio->err = 1;
}
if (mio->err){
+/* do_close - close @map if held exclusively, then do_dropcache() it.
+ * Returns -1 (keeping the map cached) when close_map() fails on a map that
+ * is not MF_MAP_DELETED; otherwise returns do_dropcache()'s result.
+ */
static int do_close(struct peer_req *pr, struct map *map)
{
-// struct peerd *peer = pr->peer;
-// struct xseg_request *req;
- if (map->flags & MF_MAP_EXCLUSIVE)
- close_map(pr, map);
+ if (map->flags & MF_MAP_EXCLUSIVE){
+ /* do not drop cache if close failed and map not deleted */
+ if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
+ return -1;
+ }
return do_dropcache(pr, map);
}
}
mio->cb = NULL;
map->flags &= ~MF_MAP_DELETING;
+ map->flags |= MF_MAP_DELETED;
XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
return do_close(pr, map);
}
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
int i;
-// unsigned char buf[SHA256_DIGEST_SIZE];
-// unsigned char *zero;
-
- //gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
-
- /* Version check should be the very first call because it
- makes sure that important subsystems are intialized. */
-// gcry_check_version (NULL);
-
- /* Disable secure memory. */
-// gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
-
- /* Tell Libgcrypt that initialization has completed. */
-// gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
-
- /* calculate out magic sha hash value */
-// gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
-
- /* calculate zero block */
- //FIXME check hash value
-// zero = malloc(block_size);
-// memset(zero, 0, block_size);
-// gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
-// for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
-// sprintf(zero_block + 2*i, "%02x", buf[i]);
-// printf("%s \n", zero_block);
-// free(zero);
//FIXME error checks
struct mapperd *mapperd = malloc(sizeof(struct mapperd));
peer->priv = mapperd;
mapper = mapperd;
mapper->hashmaps = xhash_new(3, STRING);
-
+
+ printf("%llu \n", MAX_VOLUME_SIZE);
for (i = 0; i < peer->nr_ops; i++) {
struct mapper_io *mio = malloc(sizeof(struct mapper_io));
mio->copyups_nodes = xhash_new(3, INTEGER);
return 0;
}
+/* FIXME this should not be here */
+/* wait_reply - synchronously wait until @expected_req is received on one of
+ * the peer's ports (portno_start..portno_end).
+ * Any other request received meanwhile, or one without peer_req data, is
+ * treated as stale: it is responded from portno_start, or put back if no
+ * port can take it. Always returns 0; the caller still owns @expected_req.
+ */
+int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
+{
+	struct xseg *xseg = peer->xseg;
+	xport portno_start = peer->portno_start;
+	xport portno_end = peer->portno_end;
+	struct peer_req *pr;
+	struct xseg_request *received;
+	xport i, p;
+	int r, c;
+
+	xseg_prepare_wait(xseg, portno_start);
+	while (1) {
+		XSEGLOG2(&lc, D, "Attempting to check for reply");
+		/* drain all ports until a full pass receives nothing */
+		do {
+			c = 0;
+			for (i = portno_start; i <= portno_end; i++) {
+				received = xseg_receive(xseg, i, 0);
+				if (!received)
+					continue;
+				c = 1;
+				r = xseg_get_req_data(xseg, received, (void **) &pr);
+				if (r >= 0 && pr && received == expected_req) {
+					xseg_cancel_wait(xseg, portno_start);
+					return 0;
+				}
+				/* report the actual reason this request is stale */
+				if (received != expected_req)
+					XSEGLOG2(&lc, W, "Received unexpected request %lx",
+							received);
+				else
+					XSEGLOG2(&lc, W, "Received request with no pr data");
+				p = xseg_respond(xseg, received, portno_start, X_ALLOC);
+				if (p == NoPort) {
+					XSEGLOG2(&lc, W, "Could not respond stale request");
+					xseg_put_request(xseg, received, portno_start);
+				} else {
+					xseg_signal(xseg, p);
+				}
+			}
+		} while (c);
+		xseg_wait_signal(xseg, 1000000UL);
+	}
+}
+
+
+/* custom_peer_finalize - on peer shutdown, close every cached map that this
+ * mapper holds with MF_MAP_EXCLUSIVE, waiting synchronously (wait_reply) for
+ * each close reply. Failures are logged and do not stop the loop.
+ */
+void custom_peer_finalize(struct peerd *peer)
+{
+	struct mapperd *mapper = __get_mapperd(peer);
+	struct peer_req *pr;
+	struct map *map;
+	struct xseg_request *req;
+	xhash_iter_t it;
+	xhashidx key, val;
+
+	pr = alloc_peer_req(peer);
+	if (!pr) {
+		XSEGLOG2(&lc, E, "Cannot get peer request");
+		return;
+	}
+
+	xhash_iter_init(mapper->hashmaps, &it);
+	while (xhash_iterate(mapper->hashmaps, &it, &key, &val)) {
+		map = (struct map *) val;
+		if (!(map->flags & MF_MAP_EXCLUSIVE))
+			continue;
+		map->flags |= MF_MAP_CLOSING;
+		req = __close_map(pr, map);
+		if (!req) {
+			/* do not leave the map flagged as closing on failure */
+			XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
+			map->flags &= ~MF_MAP_CLOSING;
+			continue;
+		}
+		wait_reply(peer, req);
+		if (!(req->state & XS_SERVED))
+			XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
+		map->flags &= ~MF_MAP_CLOSING;
+		xseg_put_request(peer->xseg, req, pr->portno);
+	}
+	/* NOTE(review): pr is never released back to the peer; presumably
+	 * harmless at shutdown, but confirm whether a matching release for
+	 * alloc_peer_req() should be called here. */
+}
+
void print_obj(struct map_node *mn)
{
fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
uint64_t nr_objs = m->size/block_size;
if (m->size % block_size)
nr_objs++;
- fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
+ fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n",
m->volume, m->volumelen,
(unsigned long long) m->size,
- (unsigned long long) nr_objs);
+ (unsigned long long) nr_objs,
+ m->version);
uint64_t i;
struct map_node *mn;
if (nr_objs > 1000000) //FIXME to protect against invalid volume size