blob: 572ba1d7da83eae93f01a27fe2fef88cf5c91630 [file] [log] [blame]
/*
* Copyright (C) 2011, 2014, 2015 Filip Pizlo. All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY FILIP PIZLO ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FILIP PIZLO OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "tsf_internal.h"
#include "tsf_format.h"
#include "tsf_fsdb_protocol.h"
#include <time.h>
#include <dirent.h>
#include <sys/types.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#ifdef HAVE_BSDSOCKS
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netdb.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#endif
#ifdef HAVE_BSDSOCKS
static tsf_fsdb_response_t *connection_read(tsf_fsdb_connection_t *con);
static tsf_bool_t open_connection(int domain,
int type,
int protocol,
const struct sockaddr *addr,
socklen_t addrlen,
int *fd) {
int flag;
*fd=socket(domain,type,protocol);
if (*fd<0) {
tsf_set_errno("Could not create socket with domain %d, type %d, protocol %d",
domain,type,protocol);
return tsf_false;
}
if (connect(*fd,addr,addrlen)<0) {
tsf_set_errno("Could not establish connection");
close(*fd);
return tsf_false;
}
flag=1;
if (setsockopt(*fd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag))<0) {
tsf_set_errno("Could not disable Nagle's algorithm");
close(*fd);
return tsf_false;
}
return tsf_true;
}
static tsf_fsdb_connection_t *connection_create(int fd,
tsf_fsdb_t *fsdb) {
tsf_fsdb_connection_t *result;
tsf_fsdb_response_t *rsp;
tsf_assert(fd>=0);
result=malloc(sizeof(tsf_fsdb_connection_t));
if (result==NULL) {
tsf_set_errno("Could not malloc tsf_fsdb_connection_t");
goto error1;
}
result->buf_size=0;
result->buf_cursor=0;
result->buf=NULL;
result->fd=fd;
result->in=tsf_stream_file_input_fd_open(
fd,fsdb->limits,&fsdb->rdr_attr,tsf_false,TSF_DEF_NET_BUF_SIZE);
if (result->in==NULL) {
goto error2;
}
result->out=tsf_stream_file_output_fd_open(
fd,&fsdb->wtr_attr,tsf_false,TSF_DEF_NET_BUF_SIZE);
if (result->out==NULL) {
goto error3;
}
rsp=connection_read(result);
if (rsp==NULL) {
goto error4;
}
if (rsp->payload.value!=tsf_fsdb_response__payload__welcome) {
tsf_set_error(TSF_E_PARSE_ERROR,
"Bad response from FSDB server: response %s with state %s",
tsf_fsdb_response__payload__to_str(rsp->payload.value),
tsf_fsdb_state__to_str(rsp->state.value));
tsf_region_free(rsp);
goto error4;
}
tsf_region_free(rsp);
return result;
error4:
tsf_stream_file_output_close(result->out);
error3:
tsf_stream_file_input_close(result->in);
error2:
free(result);
error1:
close(fd);
return NULL;
}
static void connection_destroy(tsf_fsdb_connection_t *con) {
if (con->buf!=NULL) {
free(con->buf);
}
tsf_stream_file_input_close(con->in);
tsf_stream_file_output_close(con->out);
close(con->fd);
free(con);
}
static tsf_fsdb_response_t *connection_read(tsf_fsdb_connection_t *con) {
tsf_fsdb_response_t *rsp=tsf_fsdb_response__read(con->in);
if (rsp==NULL) {
return NULL;
}
con->clear=(rsp->state.value==tsf_fsdb_state__clear);
return rsp;
}
static void connection_return(tsf_fsdb_t *fsdb,
tsf_fsdb_connection_t *con) {
if (con->clear) {
tsf_mutex_lock(&fsdb->lock);
if (fsdb->u.remote.connection==NULL) {
fsdb->u.remote.connection=con;
con=NULL;
}
tsf_mutex_unlock(&fsdb->lock);
}
if (con!=NULL) {
connection_destroy(con);
}
}
static tsf_fsdb_connection_t *create_first_connection(tsf_fsdb_t *fsdb,
const char *hostname,
int port) {
int fd=-1;
tsf_bool_t connected=tsf_false;
#ifdef HAVE_GETADDRINFO
struct addrinfo hints;
struct addrinfo *result,*cur;
char buf[32];
bzero(&hints,sizeof(hints));
hints.ai_family=AF_INET;
hints.ai_socktype=SOCK_STREAM;
snprintf(buf,sizeof(buf),"%d",port);
if (getaddrinfo(hostname,buf,&hints,&result)!=0) {
tsf_set_error(TSF_E_HOSTNAME,
"Could not lookup hostname %s",
hostname);
return NULL;
}
for (cur=result;cur!=NULL;cur=cur->ai_next) {
if (open_connection(cur->ai_family,
cur->ai_socktype,
cur->ai_protocol,
cur->ai_addr,
cur->ai_addrlen,
&fd)) {
connected=tsf_true;
fsdb->u.remote.result=result;
fsdb->u.remote.cur=cur;
break;
}
}
if (!connected) {
freeaddrinfo(result);
}
#else
struct hostent *hp = gethostbyname(hostname);
if (hp == NULL) {
tsf_set_error(TSF_E_HOSTNAME,
"Could not lookup hostname %s",
hostname);
return NULL;
} else {
unsigned i;
for (i=0;hp->h_addr_list[i]!=NULL;++i) {
struct sockaddr_in addr;
socklen_t slen=sizeof(addr);
bzero(&addr,slen);
addr.sin_family=AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=*(struct in_addr*)(hp->h_addr_list[i]);
if (open_connection(AF_INET,
SOCK_STREAM,
0,
&addr,
slen,
&fd)) {
connected=tsf_true;
fsdb->u.remote.addr=*(uint32_t*)(hp->h_addr_list[i]);
fsdb->u.remote.port=port;
break;
}
}
}
#endif
if (!connected) {
return NULL;
}
return connection_create(fd,fsdb);
}
static tsf_fsdb_connection_t *create_connection(tsf_fsdb_t *fsdb) {
int fd;
#ifdef HAVE_GETADDRINFO
if (!open_connection(fsdb->u.remote.cur->ai_family,
fsdb->u.remote.cur->ai_socktype,
fsdb->u.remote.cur->ai_protocol,
fsdb->u.remote.cur->ai_addr,
fsdb->u.remote.cur->ai_addrlen,
&fd)) {
return NULL;
}
#else
struct sockaddr_in addr;
socklen_t slen=sizeof(addr);
bzero(&addr,slen);
addr.sin_family=AF_INET;
addr.sin_port=htons(fsdb->u.remote.port);
addr.sin_addr.s_addr=*(struct in_addr*)(&fsdb->u.remote.addr);
if (!open_connection(AF_INET,
SOCK_STREAM,
0,
&addr,
slen,
&fd)) {
return NULL;
}
#endif
return connection_create(fd,fsdb);
}
static tsf_fsdb_response_t *first_action(tsf_fsdb_t *fsdb,
tsf_fsdb_connection_t **connection,
tsf_fsdb_command_t *cmd) {
unsigned i;
tsf_mutex_lock(&fsdb->lock);
*connection=fsdb->u.remote.connection;
fsdb->u.remote.connection=NULL;
tsf_mutex_unlock(&fsdb->lock);
for (i=0;;++i) {
if (*connection!=NULL &&
tsf_fsdb_command__write((*connection)->out,cmd) &&
tsf_stream_file_output_flush((*connection)->out)) {
tsf_fsdb_response_t *response=connection_read(*connection);
if (response!=NULL) {
return response;
}
}
if (i==0) {
/* retry - the connection may have gone stale */
if (*connection!=NULL) {
connection_destroy(*connection);
}
*connection=create_connection(fsdb);
if (*connection==NULL) {
return NULL;
}
} else {
/* already tried before, now fail permanently and destroy the connection. */
tsf_assert(*connection!=NULL);
connection_destroy(*connection);
*connection=NULL;
return NULL;
}
}
}
static void set_error(tsf_fsdb_response_t *rsp) {
switch (rsp->payload.value) {
case tsf_fsdb_response__payload__sys_error:
tsf_set_error(TSF_E_EXTERNAL,
"FSDB server responded with system error: %s",
rsp->payload.u.sys_error.msg);
break;
case tsf_fsdb_response__payload__proto_error:
tsf_set_error(TSF_E_EXTERNAL,
"FSDB server responded with protocol error: %s",
rsp->payload.u.proto_error.msg);
break;
case tsf_fsdb_response__payload__not_found:
tsf_set_error(TSF_E_ELE_NOT_FOUND,
"FSDB server could not find the requested element");
break;
case tsf_fsdb_response__payload__in_eof:
tsf_set_error(TSF_E_EOF,
"End of file from FSDB server");
break;
default:
tsf_set_error(TSF_E_PARSE_ERROR,
"Unexpected or unrecognized response from FSDB server; "
"state = %s, response = %s",
tsf_fsdb_state__to_str(rsp->state.value),
tsf_fsdb_response__payload__to_str(rsp->payload.value));
break;
}
}
static tsf_bool_t network_writer_flush(tsf_fsdb_out_t *out) {
tsf_fsdb_command_t cmd;
tsf_fsdb_response_t *rsp;
cmd.value=tsf_fsdb_command__out_write;
cmd.u.out_write.data.data=out->u.remote.connection->buf;
cmd.u.out_write.data.len=out->u.remote.connection->buf_cursor;
if (!tsf_fsdb_command__write(out->u.remote.connection->out,
&cmd) ||
!tsf_stream_file_output_flush(out->u.remote.connection->out)) {
return tsf_false;
}
rsp=connection_read(out->u.remote.connection);
if (rsp==NULL) {
return tsf_false;
}
if (rsp->state.value!=tsf_fsdb_state__put ||
rsp->payload.value!=tsf_fsdb_response__payload__ok) {
set_error(rsp);
tsf_region_free(rsp);
return tsf_false;
}
tsf_region_free(rsp);
out->u.remote.connection->buf_cursor=0;
return tsf_true;
}
static tsf_bool_t network_writer(void *arg,
const void *buf_,
uint32_t len) {
tsf_fsdb_out_t *out=(tsf_fsdb_out_t*)arg;
uint8_t *buf = (uint8_t*)buf_;
uint32_t towrite;
while (len>0) {
towrite=tsf_min(len,
out->u.remote.connection->buf_size-
out->u.remote.connection->buf_cursor);
memcpy(out->u.remote.connection->buf+
out->u.remote.connection->buf_cursor,
buf,
towrite);
out->u.remote.connection->buf_cursor+=towrite;
buf+=towrite;
len-=towrite;
if (len==0) {
break;
}
if (!network_writer_flush(out)) {
return tsf_false;
}
}
return tsf_true;
}
static tsf_bool_t network_reader(void *arg,
void *buf_,
uint32_t len) {
tsf_fsdb_in_t *in=(tsf_fsdb_in_t*)arg;
uint8_t *buf=(uint8_t*)buf_;
while (len>0) {
if (in->u.remote.rsp!=NULL) {
uint32_t toread=
tsf_min(len,
in->u.remote.rsp->payload.u.in_data.data.len-
in->u.remote.rsp_cursor);
memcpy(buf,
in->u.remote.rsp->payload.u.in_data.data.data+in->u.remote.rsp_cursor,
toread);
in->u.remote.rsp_cursor+=toread;
len-=toread;
buf+=toread;
if (len==0) {
break;
}
}
if (in->u.remote.rsp!=NULL &&
in->u.remote.rsp_cursor==in->u.remote.rsp->payload.u.in_data.data.len) {
tsf_region_free(in->u.remote.rsp);
in->u.remote.rsp=NULL;
in->u.remote.rsp_cursor=0;
}
if (in->u.remote.rsp==NULL) {
in->u.remote.rsp_cursor=0;
in->u.remote.rsp=
connection_read(in->u.remote.connection);
switch (in->u.remote.rsp->payload.value) {
case tsf_fsdb_response__payload__in_data:
/* ok! */
break;
case tsf_fsdb_response__payload__in_eof:
tsf_set_error((void*)buf==buf_?
TSF_E_UNEXPECTED_EOF:TSF_E_ERRONEOUS_EOF,
"EOF in response from FSDB server");
tsf_region_free(in->u.remote.rsp);
in->u.remote.rsp=NULL;
return tsf_false;
default:
set_error(in->u.remote.rsp);
tsf_region_free(in->u.remote.rsp);
in->u.remote.rsp=NULL;
return tsf_false;
}
}
}
return tsf_true;
}
#endif
static void copy_params(tsf_fsdb_t *fsdb,
tsf_limits_t *limits,
tsf_zip_rdr_attr_t *rdr_attr,
tsf_zip_wtr_attr_t *wtr_attr) {
fsdb->limits=limits;
if (rdr_attr==NULL) {
tsf_zip_rdr_attr_get_default(&fsdb->rdr_attr);
} else {
fsdb->rdr_attr=*rdr_attr;
}
if (wtr_attr==NULL) {
tsf_zip_wtr_attr_get_default(&fsdb->wtr_attr);
} else {
fsdb->wtr_attr=*wtr_attr;
}
}
tsf_fsdb_t *tsf_fsdb_open_local(const char *dirname,
tsf_limits_t *limits,
tsf_zip_rdr_attr_t *rdr_attr,
tsf_zip_wtr_attr_t *wtr_attr,
tsf_bool_t update,
tsf_bool_t truncate) {
tsf_fsdb_t *result;
char *marker_flnm;
FILE *fin;
char buf[9];
if (!update && truncate) {
tsf_set_error(TSF_E_INVALID_ARG,
"Cannot specify the truncate without the update "
"flag");
return NULL;
}
result=malloc(sizeof(tsf_fsdb_t));
if (result==NULL) {
tsf_set_errno("Could not malloc tsf_fsdb_t");
return NULL;
}
result->kind=TSF_FSDB_LOCAL;
result->u.local.dirname=strdup(dirname);
if (result->u.local.dirname==NULL) {
tsf_set_errno("Could not strdup dirname in tsf_fsdb_open_local");
goto error1;
}
if (update) {
if (mkdir(result->u.local.dirname,0777)<0) {
if (errno==EEXIST) {
/* all good. */
} else {
tsf_set_errno("Could not create fsdb directory %s",result->u.local.dirname);
goto error2;
}
} else {
/* directory created; now create the marker file. */
FILE *fout;
marker_flnm=tsf_asprintf("%s/TSF_FSDB",result->u.local.dirname);
if (marker_flnm==NULL) {
tsf_set_errno("Could not allocate string");
goto error2;
}
fout=fopen(marker_flnm,"w");
if (fout==NULL) {
tsf_set_errno("Could not open %s",marker_flnm);
free(marker_flnm);
goto error2;
}
free(marker_flnm);
fputs("TSF_FSDB",fout); /* FIXME: error checking? */
fclose(fout);
}
}
marker_flnm=tsf_asprintf("%s/TSF_FSDB",result->u.local.dirname);
if (marker_flnm==NULL) {
tsf_set_errno("Could not allocate string");
goto error2;
}
fin=fopen(marker_flnm,"r");
if (fin==NULL) {
tsf_set_errno("Could not open %s",marker_flnm);
free(marker_flnm);
goto error2;
}
free(marker_flnm);
bzero(buf,sizeof(buf));
fgets(buf,sizeof(buf),fin);
fclose(fin);
if (strcmp(buf,"TSF_FSDB")) {
tsf_set_error(TSF_E_PARSE_ERROR,
"Bad FSDB database: the marker file contains %s "
"instead of TSF_FSDB",
buf);
goto error2;
}
/* make sure that it exists and is a directory, and keep a handle to it
just for good measure. */
result->u.local.dirhandle=opendir(result->u.local.dirname);
if (result->u.local.dirhandle==NULL) {
tsf_set_errno("Could not open fsdb directory %s",result->u.local.dirname);
goto error2;
}
/* if we're truncating, then delete all of the files. */
if (truncate) {
struct dirent *entry;
#if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF)
struct dirent *entrybuf;
long limit;
size_t len;
limit=pathconf(result->u.local.dirname,_PC_PATH_MAX);
if (limit<0) {
tsf_set_errno("Could not get pathconf _PC_PATH_MAX for %s",
result->u.local.dirname);
goto error3;
}
len=tsf_offsetof(struct dirent,d_name)+limit+1;
entrybuf=malloc(len);
if (entrybuf==NULL) {
tsf_set_errno("Could not malloc struct dirent with size " fsz,
len);
goto error3;
}
#endif
for (;;) {
char *flnm;
#if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF)
entry=NULL;
if (readdir_r(result->u.local.dirhandle,entrybuf,&entry)!=0) {
tsf_set_errno("Could not readdir_r in %s",
result->u.local.dirname);
free(entrybuf);
goto error3;
}
#else
errno=0;
entry=readdir(result->u.local.dirhandle);
if (entry==NULL && errno!=0) {
tsf_set_errno("Could not readdir in %s",
result->u.local.dirname);
goto error3;
}
#endif
if (entry==NULL) {
break;
}
if (strcmp(entry->d_name,".") &&
strcmp(entry->d_name,"..") &&
strcmp(entry->d_name,"TSF_FSDB")) {
flnm=tsf_asprintf("%s/%s",result->u.local.dirname,entry->d_name);
if (flnm==NULL) {
tsf_set_errno("Could not tsf_asprintf");
#if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF)
free(entrybuf);
#endif
goto error3;
}
if (unlink(flnm)<0) {
tsf_set_errno("Could not unlink %s",flnm);
#if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF)
free(entrybuf);
#endif
free(flnm);
goto error3;
}
free(flnm);
}
}
#if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF)
free(entrybuf);
#endif
}
copy_params(result,limits,rdr_attr,wtr_attr);
result->u.local.update=update;
result->u.local.truncate=truncate;
result->u.local.create_cnt=(((int64_t)time(NULL))<<32)+(((int64_t)getpid())<<48);
tsf_mutex_init(&result->lock);
return result;
error3:
closedir(result->u.local.dirhandle);
error2:
free((void*)result->u.local.dirname);
error1:
free(result);
return NULL;
}
tsf_fsdb_t *tsf_fsdb_open_remote(const char *host,
int port,
tsf_limits_t *limits,
tsf_zip_rdr_attr_t *rdr_attr,
tsf_zip_wtr_attr_t *wtr_attr) {
#ifdef HAVE_BSDSOCKS
tsf_fsdb_t *result;
result=malloc(sizeof(tsf_fsdb_t));
if (result==NULL) {
tsf_set_errno("Could not malloc tsf_fsdb_t");
return NULL;
}
result->kind=TSF_FSDB_REMOTE;
copy_params(result,limits,rdr_attr,wtr_attr);
result->u.remote.connection=create_first_connection(result,host,port);
if (result->u.remote.connection==NULL) {
free(result);
return NULL;
}
tsf_mutex_init(&result->lock);
return result;
#else
tsf_set_error(TSF_E_NOT_SUPPORTED,
"BSD sockets support was not found on your system");
return NULL;
#endif
}
void tsf_fsdb_close(tsf_fsdb_t *fsdb) {
tsf_mutex_destroy(&fsdb->lock);
switch (fsdb->kind) {
case TSF_FSDB_LOCAL:
closedir(fsdb->u.local.dirhandle);
free((void*)fsdb->u.local.dirname);
break;
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE:
#ifdef HAVE_GETADDRINFO
freeaddrinfo(fsdb->u.remote.result);
#endif
if (fsdb->u.remote.connection!=NULL) {
connection_destroy(fsdb->u.remote.connection);
}
break;
#endif
default:
tsf_abort("bad FSDB kind");
}
free(fsdb);
}
tsf_bool_t tsf_fsdb_guts_put(tsf_fsdb_t *fsdb,
tsf_buffer_t *key,
int *fd,
char **partflnm) {
tsf_stream_file_output_t *out;
if (fsdb==NULL || key==NULL) {
return tsf_false;
}
tsf_assert(fsdb->kind==TSF_FSDB_LOCAL);
if (!fsdb->u.local.update) {
tsf_set_error(TSF_E_NOT_ALLOWED,
"Cannot put into FSDB because update flag was not set.");
return tsf_false;
}
for (;;) {
uint64_t my_id;
tsf_mutex_lock(&fsdb->lock);
my_id=fsdb->u.local.create_cnt;
fsdb->u.local.create_cnt+=(uint64_t)(uintptr_t)fd;
tsf_mutex_unlock(&fsdb->lock);
*partflnm=tsf_asprintf("%s/part-" fui64,fsdb->u.local.dirname,my_id);
if (*partflnm==NULL) {
tsf_set_errno("Could not tsf_asprintf in tsf_fsdb_put");
return tsf_false;
}
*fd=open(*partflnm,O_CREAT|O_EXCL|O_WRONLY|O_TRUNC,0666);
if (*fd<0) {
if (errno==EEXIST) {
free(*partflnm);
continue;
}
tsf_set_errno("Could not open %s",*partflnm);
free(*partflnm);
return tsf_false;
}
break;
}
/* ok, now write the key as the first record of the file. */
out=tsf_stream_file_output_fd_open(*fd,NULL,tsf_false,0);
if (out==NULL) {
goto abort;
}
if (!tsf_stream_file_output_write(out,key)) {
tsf_stream_file_output_close(out);
goto abort;
}
if (!tsf_stream_file_output_close(out)) {
goto abort;
}
return tsf_true;
abort:
tsf_fsdb_guts_abort(fsdb,*partflnm);
free(*partflnm);
close(*fd);
return tsf_false;
}
tsf_bool_t tsf_fsdb_guts_commit(tsf_fsdb_t *fsdb,
char *partflnm,
tsf_buffer_t *key,
char **dataflnm) {
char *name;
char *flnm;
uint32_t sha1sum[5];
if (fsdb==NULL || partflnm==NULL || key==NULL) {
return tsf_false;
}
tsf_assert(fsdb->kind==TSF_FSDB_LOCAL);
if (!fsdb->u.local.update) {
tsf_set_error(TSF_E_NOT_ALLOWED,
"Cannot commit into FSDB because update flag was not set.");
return tsf_false;
}
if (!tsf_buffer_calc_sha1(key,sha1sum)) {
return tsf_false;
}
name=tsf_sha1_sum_to_str(sha1sum);
if (name==NULL) {
return tsf_false;
}
flnm=tsf_asprintf("%s/data-%s",fsdb->u.local.dirname,name);
if (flnm==NULL) {
tsf_set_errno("Could not asprintf in tsf_fsdb_guts_commit");
free(name);
return tsf_false;
}
free(name);
if (rename(partflnm,flnm)<0) {
tsf_set_errno("Could not rename %s to %s",
partflnm,flnm);
free(flnm);
return tsf_false;
}
if (dataflnm!=NULL) {
*dataflnm=flnm;
} else {
free(flnm);
}
return tsf_true;
}
tsf_bool_t tsf_fsdb_guts_abort(tsf_fsdb_t *fsdb,
char *partflnm) {
if (fsdb==NULL || partflnm==NULL) {
return tsf_false;
}
tsf_assert(fsdb->kind==TSF_FSDB_LOCAL);
if (!fsdb->u.local.update) {
tsf_set_error(TSF_E_NOT_ALLOWED,
"Cannot abort FSDB file because update flag was not set.");
return tsf_false;
}
unlink(partflnm);
return tsf_true;
}
tsf_bool_t tsf_fsdb_guts_get(tsf_fsdb_t *fsdb,
tsf_buffer_t *key,
int *fd,
char **dataflnm) {
uint32_t sha1sum[5];
char *name;
char *flnm;
tsf_serial_in_man_t *in_man;
tsf_buf_rdr_t *reader;
tsf_buffer_t *read_key;
uint32_t num_remaining;
if (fsdb==NULL || key==NULL) {
/* propagate error */
return tsf_false;
}
tsf_assert(fsdb->kind==TSF_FSDB_LOCAL);
if (!tsf_buffer_calc_sha1(key,sha1sum)) {
return tsf_false;
}
name=tsf_sha1_sum_to_str(sha1sum);
if (name==NULL) {
return tsf_false;
}
flnm=tsf_asprintf("%s/data-%s",fsdb->u.local.dirname,name);
if (flnm==NULL) {
tsf_set_errno("Could not asprintf in tsf_fsdb_get");
free(name);
return tsf_false;
}
free(name);
*fd=open(flnm,O_RDONLY);
if (*fd<0) {
if (errno==ENOENT) {
tsf_set_error(TSF_E_ELE_NOT_FOUND,
"The requested key was not found in the FSDB database");
} else {
tsf_set_errno("Could not open %s for reading",flnm);
}
free(flnm);
return tsf_false;
}
if (dataflnm!=NULL) {
*dataflnm=flnm;
} else {
free(flnm);
}
/* now verify that the key is what it is supposed to be. use a lower-level
approach to reading so we can control how much buffering is done. */
reader=tsf_buf_reader_create(*fd,128,tsf_false);
if (reader==NULL) {
goto fail;
}
in_man=tsf_serial_in_man_create(tsf_buf_reader_read,reader,fsdb->limits);
if (in_man==NULL) {
tsf_buf_reader_destroy(reader);
goto fail;
}
read_key=tsf_serial_in_man_read(in_man);
if (read_key==NULL) {
tsf_serial_in_man_destroy(in_man);
tsf_buf_reader_destroy(reader);
goto fail;
}
tsf_serial_in_man_destroy(in_man);
num_remaining=tsf_buf_reader_get_remaining(reader);
tsf_buf_reader_destroy(reader);
if (lseek(*fd,-((off_t)num_remaining),SEEK_CUR)<0) {
tsf_set_errno("Could not unseek buffered data");
tsf_buffer_destroy(read_key);
goto fail;
}
if (!tsf_buffer_compare(key,read_key)) {
tsf_set_error(TSF_E_PARSE_ERROR,
"Key mismatch on entry in FSDB database");
tsf_buffer_destroy(read_key);
goto fail;
}
tsf_buffer_destroy(read_key);
return tsf_true;
fail:
if (dataflnm!=NULL) {
free(*dataflnm);
}
close(*fd);
return tsf_false;
}
tsf_bool_t tsf_fsdb_guts_rm(tsf_fsdb_t *fsdb,
tsf_buffer_t *key,
char **dataflnm) {
uint32_t sha1sum[5];
char *name;
char *flnm;
if (fsdb==NULL || key==NULL) {
/* propagate error */
return tsf_false;
}
tsf_assert(fsdb->kind==TSF_FSDB_LOCAL);
if (!fsdb->u.local.update) {
tsf_set_error(TSF_E_NOT_ALLOWED,
"Cannot rm from FSDB because update flag was not set.");
return tsf_false;
}
if (!tsf_buffer_calc_sha1(key,sha1sum)) {
return tsf_false;
}
name=tsf_sha1_sum_to_str(sha1sum);
if (name==NULL) {
return tsf_false;
}
flnm=tsf_asprintf("%s/data-%s",fsdb->u.local.dirname,name);
if (flnm==NULL) {
tsf_set_errno("Could not asprintf in tsf_fsdb_get");
free(name);
return tsf_false;
}
free(name);
if (unlink(flnm)<0) {
if (errno==ENOENT) {
tsf_set_error(TSF_E_ELE_NOT_FOUND,
"The requested key was not found in the FSDB database");
} else {
tsf_set_errno("Could not unlink %s",flnm);
}
free(flnm);
return tsf_false;
}
if (dataflnm!=NULL) {
*dataflnm=flnm;
} else {
free(flnm);
}
return tsf_true;
}
tsf_fsdb_out_t *tsf_fsdb_put(tsf_fsdb_t *fsdb,
tsf_buffer_t *key) {
if (fsdb==NULL || key==NULL) {
return NULL;
}
switch (fsdb->kind) {
case TSF_FSDB_LOCAL: {
int fd;
char *flnm;
tsf_fsdb_out_t *result;
if (!tsf_fsdb_guts_put(fsdb,key,&fd,&flnm)) {
return NULL;
}
result=malloc(sizeof(tsf_fsdb_out_t));
if (result==NULL) {
tsf_set_errno("Could not malloc in tsf_fsdb_put");
close(fd);
tsf_fsdb_guts_abort(fsdb,flnm);
free(flnm);
return NULL;
}
result->db=fsdb;
result->u.local.partname=flnm;
result->u.local.out=tsf_stream_file_output_fd_open(fd,
&fsdb->wtr_attr,
tsf_true,
0);
if (result->u.local.out==NULL) {
close(fd);
tsf_fsdb_guts_abort(fsdb,flnm);
free(flnm);
free(result);
return NULL;
}
result->u.local.key=tsf_buffer_dup(key);
if (result->u.local.key==NULL) {
tsf_stream_file_output_close(result->u.local.out);
tsf_fsdb_guts_abort(fsdb,flnm);
free(flnm);
free(result);
return NULL;
}
return result;
}
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE: {
tsf_fsdb_connection_t *connection;
tsf_fsdb_command_t cmd;
tsf_fsdb_response_t *rsp;
tsf_fsdb_out_t *result;
cmd.value=tsf_fsdb_command__put;
tsf_assert(key!=NULL);
cmd.u.put.key=key;
rsp=first_action(fsdb,&connection,&cmd);
if (rsp==NULL) {
return NULL;
}
if (rsp->state.value!=tsf_fsdb_state__put ||
rsp->payload.value!=tsf_fsdb_response__payload__ok) {
set_error(rsp);
connection_destroy(connection);
tsf_region_free(rsp);
return NULL;
}
tsf_region_free(rsp);
result=malloc(sizeof(tsf_fsdb_out_t));
if (result==NULL) {
tsf_set_errno("Could not malloc in tsf_fsdb_put");
connection_destroy(connection);
return NULL;
}
result->db=fsdb;
result->u.remote.connection=connection;
if (connection->buf==NULL) {
connection->buf_size=TSF_DEF_NET_BUF_SIZE;
connection->buf_cursor=0;
connection->buf=malloc(connection->buf_size);
if (connection->buf==NULL) {
connection_destroy(connection);
free(result);
return NULL;
}
}
result->u.remote.out_man=
tsf_serial_out_man_create(network_writer, result);
if (result->u.remote.out_man==NULL) {
connection_destroy(connection);
free(result);
return NULL;
}
return result;
}
#endif
default: tsf_abort("bad FSDB kind");
}
}
tsf_fsdb_in_t *tsf_fsdb_get(tsf_fsdb_t *fsdb,
tsf_buffer_t *key) {
if (fsdb==NULL || key==NULL) {
return NULL;
}
switch (fsdb->kind) {
case TSF_FSDB_LOCAL: {
int fd;
tsf_fsdb_in_t *result;
if (!tsf_fsdb_guts_get(fsdb,key,&fd,NULL)) {
return NULL;
}
result=malloc(sizeof(tsf_fsdb_in_t));
if (result==NULL) {
tsf_set_errno("Could not malloc in tsf_fsdb_get");
close(fd);
return NULL;
}
result->db=fsdb;
result->u.local.in=tsf_stream_file_input_fd_open(fd,
fsdb->limits,
&fsdb->rdr_attr,
tsf_true,
0);
if (result->u.local.in==NULL) {
close(fd);
free(result);
return NULL;
}
result->u.local.key=tsf_buffer_dup(key);
if (result->u.local.key==NULL) {
tsf_stream_file_input_close(result->u.local.in);
free(result);
return NULL;
}
return result;
}
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE: {
tsf_fsdb_connection_t *connection;
tsf_fsdb_command_t cmd;
tsf_fsdb_response_t *rsp;
tsf_fsdb_in_t *result;
cmd.value=tsf_fsdb_command__get;
cmd.u.get.key=key;
rsp=first_action(fsdb,&connection,&cmd);
if (rsp==NULL) {
return NULL;
}
if (rsp->state.value!=tsf_fsdb_state__get ||
rsp->payload.value!=tsf_fsdb_response__payload__ok) {
set_error(rsp);
if (rsp->payload.value==tsf_fsdb_response__payload__not_found) {
connection_return(fsdb,connection);
} else {
connection_destroy(connection);
}
tsf_region_free(rsp);
return NULL;
}
tsf_region_free(rsp);
result=malloc(sizeof(tsf_fsdb_in_t));
if (result==NULL) {
tsf_set_errno("Could not malloc in tsf_fsdb_get");
connection_destroy(connection);
return NULL;
}
result->db=fsdb;
result->u.remote.connection=connection;
result->u.remote.rsp=NULL;
result->u.remote.rsp_cursor=0;
result->u.remote.in_man=
tsf_serial_in_man_create(network_reader,
result,
fsdb->limits);
if (result->u.remote.in_man==NULL) {
connection_destroy(connection);
free(result);
return NULL;
}
return result;
}
#endif
default: tsf_abort("bad FSDB kind");
}
}
tsf_bool_t tsf_fsdb_rm(tsf_fsdb_t *fsdb,
tsf_buffer_t *key) {
if (fsdb==NULL || key==NULL) {
return tsf_false;
}
switch (fsdb->kind) {
case TSF_FSDB_LOCAL:
return tsf_fsdb_guts_rm(fsdb,key,NULL);
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE: {
tsf_fsdb_connection_t *connection;
tsf_fsdb_command_t cmd;
tsf_fsdb_response_t *rsp;
cmd.value=tsf_fsdb_command__rm;
cmd.u.rm.key=key;
rsp=first_action(fsdb,&connection,&cmd);
if (rsp==NULL) {
return tsf_false;
}
if (rsp->state.value!=tsf_fsdb_state__clear ||
rsp->payload.value!=tsf_fsdb_response__payload__ok) {
set_error(rsp);
if (rsp->payload.value==tsf_fsdb_response__payload__not_found) {
connection_return(fsdb,connection);
} else {
connection_destroy(connection);
}
tsf_region_free(rsp);
return tsf_false;
}
tsf_region_free(rsp);
connection_return(fsdb,connection);
return tsf_true;
}
#endif
default: tsf_abort("bad FSDB kind");
}
}
void tsf_fsdb_in_close(tsf_fsdb_in_t *in) {
switch (in->db->kind) {
case TSF_FSDB_LOCAL:
tsf_stream_file_input_close(in->u.local.in);
tsf_buffer_destroy(in->u.local.key);
break;
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE:
connection_return(in->db,in->u.remote.connection);
if (in->u.remote.rsp!=NULL) {
tsf_region_free(in->u.remote.rsp);
}
tsf_serial_in_man_destroy(in->u.remote.in_man);
break;
#endif
default: tsf_abort("bad FSDB kind");
}
free(in);
}
tsf_buffer_t *tsf_fsdb_in_read(tsf_fsdb_in_t *in) {
switch (in->db->kind) {
case TSF_FSDB_LOCAL:
return tsf_stream_file_input_read(in->u.local.in);
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE:
return tsf_serial_in_man_read(in->u.remote.in_man);
#endif
default: tsf_abort("bad FSDB kind");
}
}
tsf_bool_t tsf_fsdb_out_commit(tsf_fsdb_out_t *out) {
if (out==NULL) {
return tsf_false;
}
switch (out->db->kind) {
case TSF_FSDB_LOCAL:
if (!tsf_stream_file_output_close(out->u.local.out)) {
goto abort_local;
}
if (!tsf_fsdb_guts_commit(out->db,
out->u.local.partname,
out->u.local.key,
NULL)) {
goto abort_local;
}
free(out->u.local.partname);
tsf_buffer_destroy(out->u.local.key);
free(out);
return tsf_true;
abort_local:
tsf_fsdb_guts_abort(out->db,out->u.local.partname);
free(out->u.local.partname);
tsf_buffer_destroy(out->u.local.key);
free(out);
return tsf_false;
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE: {
tsf_fsdb_command_t cmd;
tsf_fsdb_response_t *rsp;
/* this part could be optimized - we're currently sending the commit command
* as a separate packet. it would be best to try to send it as part of the
* last packet of the flush. */
if (!network_writer_flush(out)) {
tsf_fsdb_out_abort(out);
return tsf_false;
}
cmd.value=tsf_fsdb_command__out_commit;
if (!tsf_fsdb_command__write(out->u.remote.connection->out,
&cmd) ||
!tsf_stream_file_output_flush(out->u.remote.connection->out)) {
tsf_fsdb_out_abort(out);
return tsf_false;
}
rsp=connection_read(out->u.remote.connection);
if (rsp==NULL) {
tsf_fsdb_out_abort(out);
return tsf_false;
}
if (rsp->state.value!=tsf_fsdb_state__clear ||
rsp->payload.value!=tsf_fsdb_response__payload__ok) {
set_error(rsp);
tsf_region_free(rsp);
tsf_fsdb_out_abort(out);
return tsf_false;
}
tsf_region_free(rsp);
connection_return(out->db,out->u.remote.connection);
tsf_serial_out_man_destroy(out->u.remote.out_man);
free(out);
return tsf_true;
}
#endif
default: tsf_abort("bad FSDB kind");
}
}
void tsf_fsdb_out_abort(tsf_fsdb_out_t *out) {
switch (out->db->kind) {
case TSF_FSDB_LOCAL:
tsf_stream_file_output_close(out->u.local.out);
tsf_fsdb_guts_abort(out->db,out->u.local.partname);
tsf_buffer_destroy(out->u.local.key);
free(out->u.local.partname);
break;
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE:
connection_destroy(out->u.remote.connection);
tsf_serial_out_man_destroy(out->u.remote.out_man);
break;
#endif
default: tsf_abort("bad FSDB kind");
}
free(out);
}
tsf_bool_t tsf_fsdb_out_write(tsf_fsdb_out_t *out,
tsf_buffer_t *buf) {
if (out==NULL || buf==NULL) {
return tsf_false;
}
switch (out->db->kind) {
case TSF_FSDB_LOCAL:
return tsf_stream_file_output_write(out->u.local.out,buf);
#ifdef HAVE_BSDSOCKS
case TSF_FSDB_REMOTE:
return tsf_serial_out_man_write(out->u.remote.out_man,buf);
#endif
default: tsf_abort("bad FSDB kind");
}
}