/* * Copyright (C) 2011, 2014, 2015 Filip Pizlo. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY FILIP PIZLO ``AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FILIP PIZLO OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "tsf_internal.h" #include "tsf_format.h" #include "tsf_fsdb_protocol.h" #include #include #include #include #include #include #include #include #include #ifdef HAVE_BSDSOCKS #include #include #include #include #include #include #include #include #include #endif #ifdef HAVE_BSDSOCKS static tsf_fsdb_response_t *connection_read(tsf_fsdb_connection_t *con); static tsf_bool_t open_connection(int domain, int type, int protocol, const struct sockaddr *addr, socklen_t addrlen, int *fd) { int flag; *fd=socket(domain,type,protocol); if (*fd<0) { tsf_set_errno("Could not create socket with domain %d, type %d, protocol %d", domain,type,protocol); return tsf_false; } if (connect(*fd,addr,addrlen)<0) { tsf_set_errno("Could not establish connection"); close(*fd); return tsf_false; } flag=1; if (setsockopt(*fd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag))<0) { tsf_set_errno("Could not disable Nagle's algorithm"); close(*fd); return tsf_false; } return tsf_true; } static tsf_fsdb_connection_t *connection_create(int fd, tsf_fsdb_t *fsdb) { tsf_fsdb_connection_t *result; tsf_fsdb_response_t *rsp; tsf_assert(fd>=0); result=malloc(sizeof(tsf_fsdb_connection_t)); if (result==NULL) { tsf_set_errno("Could not malloc tsf_fsdb_connection_t"); goto error1; } result->buf_size=0; result->buf_cursor=0; result->buf=NULL; result->fd=fd; result->in=tsf_stream_file_input_fd_open( fd,fsdb->limits,&fsdb->rdr_attr,tsf_false,TSF_DEF_NET_BUF_SIZE); if (result->in==NULL) { goto error2; } result->out=tsf_stream_file_output_fd_open( fd,&fsdb->wtr_attr,tsf_false,TSF_DEF_NET_BUF_SIZE); if (result->out==NULL) { goto error3; } rsp=connection_read(result); if (rsp==NULL) { goto error4; } if (rsp->payload.value!=tsf_fsdb_response__payload__welcome) { tsf_set_error(TSF_E_PARSE_ERROR, "Bad response from FSDB server: response %s with state %s", tsf_fsdb_response__payload__to_str(rsp->payload.value), tsf_fsdb_state__to_str(rsp->state.value)); tsf_region_free(rsp); goto error4; } tsf_region_free(rsp); return result; error4: tsf_stream_file_output_close(result->out); error3: tsf_stream_file_input_close(result->in); error2: free(result); error1: close(fd); return NULL; } static void connection_destroy(tsf_fsdb_connection_t *con) { if (con->buf!=NULL) { free(con->buf); } tsf_stream_file_input_close(con->in); tsf_stream_file_output_close(con->out); close(con->fd); free(con); } static tsf_fsdb_response_t *connection_read(tsf_fsdb_connection_t *con) { tsf_fsdb_response_t *rsp=tsf_fsdb_response__read(con->in); if (rsp==NULL) { return NULL; } con->clear=(rsp->state.value==tsf_fsdb_state__clear); return rsp; } static void connection_return(tsf_fsdb_t *fsdb, tsf_fsdb_connection_t *con) { if (con->clear) { tsf_mutex_lock(&fsdb->lock); if (fsdb->u.remote.connection==NULL) { fsdb->u.remote.connection=con; con=NULL; } tsf_mutex_unlock(&fsdb->lock); } if (con!=NULL) { connection_destroy(con); } } static tsf_fsdb_connection_t *create_first_connection(tsf_fsdb_t *fsdb, const char *hostname, int port) { int fd=-1; tsf_bool_t connected=tsf_false; #ifdef HAVE_GETADDRINFO struct addrinfo hints; struct addrinfo *result,*cur; char buf[32]; bzero(&hints,sizeof(hints)); hints.ai_family=AF_INET; hints.ai_socktype=SOCK_STREAM; snprintf(buf,sizeof(buf),"%d",port); if (getaddrinfo(hostname,buf,&hints,&result)!=0) { tsf_set_error(TSF_E_HOSTNAME, "Could not lookup hostname %s", hostname); return NULL; } for (cur=result;cur!=NULL;cur=cur->ai_next) { if (open_connection(cur->ai_family, cur->ai_socktype, cur->ai_protocol, cur->ai_addr, cur->ai_addrlen, &fd)) { connected=tsf_true; fsdb->u.remote.result=result; fsdb->u.remote.cur=cur; break; } } if (!connected) { freeaddrinfo(result); } #else struct hostent *hp = gethostbyname(hostname); if (hp == NULL) { tsf_set_error(TSF_E_HOSTNAME, "Could not lookup hostname %s", hostname); return NULL; } else { unsigned i; for (i=0;hp->h_addr_list[i]!=NULL;++i) { struct sockaddr_in addr; socklen_t slen=sizeof(addr); bzero(&addr,slen); addr.sin_family=AF_INET; addr.sin_port=htons(port); addr.sin_addr.s_addr=*(struct in_addr*)(hp->h_addr_list[i]); if (open_connection(AF_INET, SOCK_STREAM, 0, &addr, slen, &fd)) { connected=tsf_true; fsdb->u.remote.addr=*(uint32_t*)(hp->h_addr_list[i]); fsdb->u.remote.port=port; break; } } } #endif if (!connected) { return NULL; } return connection_create(fd,fsdb); } static tsf_fsdb_connection_t *create_connection(tsf_fsdb_t *fsdb) { int fd; #ifdef HAVE_GETADDRINFO if (!open_connection(fsdb->u.remote.cur->ai_family, fsdb->u.remote.cur->ai_socktype, fsdb->u.remote.cur->ai_protocol, fsdb->u.remote.cur->ai_addr, fsdb->u.remote.cur->ai_addrlen, &fd)) { return NULL; } #else struct sockaddr_in addr; socklen_t slen=sizeof(addr); bzero(&addr,slen); addr.sin_family=AF_INET; addr.sin_port=htons(fsdb->u.remote.port); addr.sin_addr.s_addr=*(struct in_addr*)(&fsdb->u.remote.addr); if (!open_connection(AF_INET, SOCK_STREAM, 0, &addr, slen, &fd)) { return NULL; } #endif return connection_create(fd,fsdb); } static tsf_fsdb_response_t *first_action(tsf_fsdb_t *fsdb, tsf_fsdb_connection_t **connection, tsf_fsdb_command_t *cmd) { unsigned i; tsf_mutex_lock(&fsdb->lock); *connection=fsdb->u.remote.connection; fsdb->u.remote.connection=NULL; tsf_mutex_unlock(&fsdb->lock); for (i=0;;++i) { if (*connection!=NULL && tsf_fsdb_command__write((*connection)->out,cmd) && tsf_stream_file_output_flush((*connection)->out)) { tsf_fsdb_response_t *response=connection_read(*connection); if (response!=NULL) { return response; } } if (i==0) { /* retry - the connection may have gone stale */ if (*connection!=NULL) { connection_destroy(*connection); } *connection=create_connection(fsdb); if (*connection==NULL) { return NULL; } } else { /* already tried before, now fail permanently and destroy the connection. */ tsf_assert(*connection!=NULL); connection_destroy(*connection); *connection=NULL; return NULL; } } } static void set_error(tsf_fsdb_response_t *rsp) { switch (rsp->payload.value) { case tsf_fsdb_response__payload__sys_error: tsf_set_error(TSF_E_EXTERNAL, "FSDB server responded with system error: %s", rsp->payload.u.sys_error.msg); break; case tsf_fsdb_response__payload__proto_error: tsf_set_error(TSF_E_EXTERNAL, "FSDB server responded with protocol error: %s", rsp->payload.u.proto_error.msg); break; case tsf_fsdb_response__payload__not_found: tsf_set_error(TSF_E_ELE_NOT_FOUND, "FSDB server could not find the requested element"); break; case tsf_fsdb_response__payload__in_eof: tsf_set_error(TSF_E_EOF, "End of file from FSDB server"); break; default: tsf_set_error(TSF_E_PARSE_ERROR, "Unexpected or unrecognized response from FSDB server; " "state = %s, response = %s", tsf_fsdb_state__to_str(rsp->state.value), tsf_fsdb_response__payload__to_str(rsp->payload.value)); break; } } static tsf_bool_t network_writer_flush(tsf_fsdb_out_t *out) { tsf_fsdb_command_t cmd; tsf_fsdb_response_t *rsp; cmd.value=tsf_fsdb_command__out_write; cmd.u.out_write.data.data=out->u.remote.connection->buf; cmd.u.out_write.data.len=out->u.remote.connection->buf_cursor; if (!tsf_fsdb_command__write(out->u.remote.connection->out, &cmd) || !tsf_stream_file_output_flush(out->u.remote.connection->out)) { return tsf_false; } rsp=connection_read(out->u.remote.connection); if (rsp==NULL) { return tsf_false; } if (rsp->state.value!=tsf_fsdb_state__put || rsp->payload.value!=tsf_fsdb_response__payload__ok) { set_error(rsp); tsf_region_free(rsp); return tsf_false; } tsf_region_free(rsp); out->u.remote.connection->buf_cursor=0; return tsf_true; } static tsf_bool_t network_writer(void *arg, const void *buf_, uint32_t len) { tsf_fsdb_out_t *out=(tsf_fsdb_out_t*)arg; uint8_t *buf = (uint8_t*)buf_; uint32_t towrite; while (len>0) { towrite=tsf_min(len, out->u.remote.connection->buf_size- out->u.remote.connection->buf_cursor); memcpy(out->u.remote.connection->buf+ out->u.remote.connection->buf_cursor, buf, towrite); out->u.remote.connection->buf_cursor+=towrite; buf+=towrite; len-=towrite; if (len==0) { break; } if (!network_writer_flush(out)) { return tsf_false; } } return tsf_true; } static tsf_bool_t network_reader(void *arg, void *buf_, uint32_t len) { tsf_fsdb_in_t *in=(tsf_fsdb_in_t*)arg; uint8_t *buf=(uint8_t*)buf_; while (len>0) { if (in->u.remote.rsp!=NULL) { uint32_t toread= tsf_min(len, in->u.remote.rsp->payload.u.in_data.data.len- in->u.remote.rsp_cursor); memcpy(buf, in->u.remote.rsp->payload.u.in_data.data.data+in->u.remote.rsp_cursor, toread); in->u.remote.rsp_cursor+=toread; len-=toread; buf+=toread; if (len==0) { break; } } if (in->u.remote.rsp!=NULL && in->u.remote.rsp_cursor==in->u.remote.rsp->payload.u.in_data.data.len) { tsf_region_free(in->u.remote.rsp); in->u.remote.rsp=NULL; in->u.remote.rsp_cursor=0; } if (in->u.remote.rsp==NULL) { in->u.remote.rsp_cursor=0; in->u.remote.rsp= connection_read(in->u.remote.connection); switch (in->u.remote.rsp->payload.value) { case tsf_fsdb_response__payload__in_data: /* ok! */ break; case tsf_fsdb_response__payload__in_eof: tsf_set_error((void*)buf==buf_? TSF_E_UNEXPECTED_EOF:TSF_E_ERRONEOUS_EOF, "EOF in response from FSDB server"); tsf_region_free(in->u.remote.rsp); in->u.remote.rsp=NULL; return tsf_false; default: set_error(in->u.remote.rsp); tsf_region_free(in->u.remote.rsp); in->u.remote.rsp=NULL; return tsf_false; } } } return tsf_true; } #endif static void copy_params(tsf_fsdb_t *fsdb, tsf_limits_t *limits, tsf_zip_rdr_attr_t *rdr_attr, tsf_zip_wtr_attr_t *wtr_attr) { fsdb->limits=limits; if (rdr_attr==NULL) { tsf_zip_rdr_attr_get_default(&fsdb->rdr_attr); } else { fsdb->rdr_attr=*rdr_attr; } if (wtr_attr==NULL) { tsf_zip_wtr_attr_get_default(&fsdb->wtr_attr); } else { fsdb->wtr_attr=*wtr_attr; } } tsf_fsdb_t *tsf_fsdb_open_local(const char *dirname, tsf_limits_t *limits, tsf_zip_rdr_attr_t *rdr_attr, tsf_zip_wtr_attr_t *wtr_attr, tsf_bool_t update, tsf_bool_t truncate) { tsf_fsdb_t *result; char *marker_flnm; FILE *fin; char buf[9]; if (!update && truncate) { tsf_set_error(TSF_E_INVALID_ARG, "Cannot specify the truncate without the update " "flag"); return NULL; } result=malloc(sizeof(tsf_fsdb_t)); if (result==NULL) { tsf_set_errno("Could not malloc tsf_fsdb_t"); return NULL; } result->kind=TSF_FSDB_LOCAL; result->u.local.dirname=strdup(dirname); if (result->u.local.dirname==NULL) { tsf_set_errno("Could not strdup dirname in tsf_fsdb_open_local"); goto error1; } if (update) { if (mkdir(result->u.local.dirname,0777)<0) { if (errno==EEXIST) { /* all good. */ } else { tsf_set_errno("Could not create fsdb directory %s",result->u.local.dirname); goto error2; } } else { /* directory created; now create the marker file. */ FILE *fout; marker_flnm=tsf_asprintf("%s/TSF_FSDB",result->u.local.dirname); if (marker_flnm==NULL) { tsf_set_errno("Could not allocate string"); goto error2; } fout=fopen(marker_flnm,"w"); if (fout==NULL) { tsf_set_errno("Could not open %s",marker_flnm); free(marker_flnm); goto error2; } free(marker_flnm); fputs("TSF_FSDB",fout); /* FIXME: error checking? */ fclose(fout); } } marker_flnm=tsf_asprintf("%s/TSF_FSDB",result->u.local.dirname); if (marker_flnm==NULL) { tsf_set_errno("Could not allocate string"); goto error2; } fin=fopen(marker_flnm,"r"); if (fin==NULL) { tsf_set_errno("Could not open %s",marker_flnm); free(marker_flnm); goto error2; } free(marker_flnm); bzero(buf,sizeof(buf)); fgets(buf,sizeof(buf),fin); fclose(fin); if (strcmp(buf,"TSF_FSDB")) { tsf_set_error(TSF_E_PARSE_ERROR, "Bad FSDB database: the marker file contains %s " "instead of TSF_FSDB", buf); goto error2; } /* make sure that it exists and is a directory, and keep a handle to it just for good measure. */ result->u.local.dirhandle=opendir(result->u.local.dirname); if (result->u.local.dirhandle==NULL) { tsf_set_errno("Could not open fsdb directory %s",result->u.local.dirname); goto error2; } /* if we're truncating, then delete all of the files. */ if (truncate) { struct dirent *entry; #if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF) struct dirent *entrybuf; long limit; size_t len; limit=pathconf(result->u.local.dirname,_PC_PATH_MAX); if (limit<0) { tsf_set_errno("Could not get pathconf _PC_PATH_MAX for %s", result->u.local.dirname); goto error3; } len=tsf_offsetof(struct dirent,d_name)+limit+1; entrybuf=malloc(len); if (entrybuf==NULL) { tsf_set_errno("Could not malloc struct dirent with size " fsz, len); goto error3; } #endif for (;;) { char *flnm; #if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF) entry=NULL; if (readdir_r(result->u.local.dirhandle,entrybuf,&entry)!=0) { tsf_set_errno("Could not readdir_r in %s", result->u.local.dirname); free(entrybuf); goto error3; } #else errno=0; entry=readdir(result->u.local.dirhandle); if (entry==NULL && errno!=0) { tsf_set_errno("Could not readdir in %s", result->u.local.dirname); goto error3; } #endif if (entry==NULL) { break; } if (strcmp(entry->d_name,".") && strcmp(entry->d_name,"..") && strcmp(entry->d_name,"TSF_FSDB")) { flnm=tsf_asprintf("%s/%s",result->u.local.dirname,entry->d_name); if (flnm==NULL) { tsf_set_errno("Could not tsf_asprintf"); #if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF) free(entrybuf); #endif goto error3; } if (unlink(flnm)<0) { tsf_set_errno("Could not unlink %s",flnm); #if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF) free(entrybuf); #endif free(flnm); goto error3; } free(flnm); } } #if defined(HAVE_READDIR_R) && defined(HAVE_PATHCONF) free(entrybuf); #endif } copy_params(result,limits,rdr_attr,wtr_attr); result->u.local.update=update; result->u.local.truncate=truncate; result->u.local.create_cnt=(((int64_t)time(NULL))<<32)+(((int64_t)getpid())<<48); tsf_mutex_init(&result->lock); return result; error3: closedir(result->u.local.dirhandle); error2: free((void*)result->u.local.dirname); error1: free(result); return NULL; } tsf_fsdb_t *tsf_fsdb_open_remote(const char *host, int port, tsf_limits_t *limits, tsf_zip_rdr_attr_t *rdr_attr, tsf_zip_wtr_attr_t *wtr_attr) { #ifdef HAVE_BSDSOCKS tsf_fsdb_t *result; result=malloc(sizeof(tsf_fsdb_t)); if (result==NULL) { tsf_set_errno("Could not malloc tsf_fsdb_t"); return NULL; } result->kind=TSF_FSDB_REMOTE; copy_params(result,limits,rdr_attr,wtr_attr); result->u.remote.connection=create_first_connection(result,host,port); if (result->u.remote.connection==NULL) { free(result); return NULL; } tsf_mutex_init(&result->lock); return result; #else tsf_set_error(TSF_E_NOT_SUPPORTED, "BSD sockets support was not found on your system"); return NULL; #endif } void tsf_fsdb_close(tsf_fsdb_t *fsdb) { tsf_mutex_destroy(&fsdb->lock); switch (fsdb->kind) { case TSF_FSDB_LOCAL: closedir(fsdb->u.local.dirhandle); free((void*)fsdb->u.local.dirname); break; #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: #ifdef HAVE_GETADDRINFO freeaddrinfo(fsdb->u.remote.result); #endif if (fsdb->u.remote.connection!=NULL) { connection_destroy(fsdb->u.remote.connection); } break; #endif default: tsf_abort("bad FSDB kind"); } free(fsdb); } tsf_bool_t tsf_fsdb_guts_put(tsf_fsdb_t *fsdb, tsf_buffer_t *key, int *fd, char **partflnm) { tsf_stream_file_output_t *out; if (fsdb==NULL || key==NULL) { return tsf_false; } tsf_assert(fsdb->kind==TSF_FSDB_LOCAL); if (!fsdb->u.local.update) { tsf_set_error(TSF_E_NOT_ALLOWED, "Cannot put into FSDB because update flag was not set."); return tsf_false; } for (;;) { uint64_t my_id; tsf_mutex_lock(&fsdb->lock); my_id=fsdb->u.local.create_cnt; fsdb->u.local.create_cnt+=(uint64_t)(uintptr_t)fd; tsf_mutex_unlock(&fsdb->lock); *partflnm=tsf_asprintf("%s/part-" fui64,fsdb->u.local.dirname,my_id); if (*partflnm==NULL) { tsf_set_errno("Could not tsf_asprintf in tsf_fsdb_put"); return tsf_false; } *fd=open(*partflnm,O_CREAT|O_EXCL|O_WRONLY|O_TRUNC,0666); if (*fd<0) { if (errno==EEXIST) { free(*partflnm); continue; } tsf_set_errno("Could not open %s",*partflnm); free(*partflnm); return tsf_false; } break; } /* ok, now write the key as the first record of the file. */ out=tsf_stream_file_output_fd_open(*fd,NULL,tsf_false,0); if (out==NULL) { goto abort; } if (!tsf_stream_file_output_write(out,key)) { tsf_stream_file_output_close(out); goto abort; } if (!tsf_stream_file_output_close(out)) { goto abort; } return tsf_true; abort: tsf_fsdb_guts_abort(fsdb,*partflnm); free(*partflnm); close(*fd); return tsf_false; } tsf_bool_t tsf_fsdb_guts_commit(tsf_fsdb_t *fsdb, char *partflnm, tsf_buffer_t *key, char **dataflnm) { char *name; char *flnm; uint32_t sha1sum[5]; if (fsdb==NULL || partflnm==NULL || key==NULL) { return tsf_false; } tsf_assert(fsdb->kind==TSF_FSDB_LOCAL); if (!fsdb->u.local.update) { tsf_set_error(TSF_E_NOT_ALLOWED, "Cannot commit into FSDB because update flag was not set."); return tsf_false; } if (!tsf_buffer_calc_sha1(key,sha1sum)) { return tsf_false; } name=tsf_sha1_sum_to_str(sha1sum); if (name==NULL) { return tsf_false; } flnm=tsf_asprintf("%s/data-%s",fsdb->u.local.dirname,name); if (flnm==NULL) { tsf_set_errno("Could not asprintf in tsf_fsdb_guts_commit"); free(name); return tsf_false; } free(name); if (rename(partflnm,flnm)<0) { tsf_set_errno("Could not rename %s to %s", partflnm,flnm); free(flnm); return tsf_false; } if (dataflnm!=NULL) { *dataflnm=flnm; } else { free(flnm); } return tsf_true; } tsf_bool_t tsf_fsdb_guts_abort(tsf_fsdb_t *fsdb, char *partflnm) { if (fsdb==NULL || partflnm==NULL) { return tsf_false; } tsf_assert(fsdb->kind==TSF_FSDB_LOCAL); if (!fsdb->u.local.update) { tsf_set_error(TSF_E_NOT_ALLOWED, "Cannot abort FSDB file because update flag was not set."); return tsf_false; } unlink(partflnm); return tsf_true; } tsf_bool_t tsf_fsdb_guts_get(tsf_fsdb_t *fsdb, tsf_buffer_t *key, int *fd, char **dataflnm) { uint32_t sha1sum[5]; char *name; char *flnm; tsf_serial_in_man_t *in_man; tsf_buf_rdr_t *reader; tsf_buffer_t *read_key; uint32_t num_remaining; if (fsdb==NULL || key==NULL) { /* propagate error */ return tsf_false; } tsf_assert(fsdb->kind==TSF_FSDB_LOCAL); if (!tsf_buffer_calc_sha1(key,sha1sum)) { return tsf_false; } name=tsf_sha1_sum_to_str(sha1sum); if (name==NULL) { return tsf_false; } flnm=tsf_asprintf("%s/data-%s",fsdb->u.local.dirname,name); if (flnm==NULL) { tsf_set_errno("Could not asprintf in tsf_fsdb_get"); free(name); return tsf_false; } free(name); *fd=open(flnm,O_RDONLY); if (*fd<0) { if (errno==ENOENT) { tsf_set_error(TSF_E_ELE_NOT_FOUND, "The requested key was not found in the FSDB database"); } else { tsf_set_errno("Could not open %s for reading",flnm); } free(flnm); return tsf_false; } if (dataflnm!=NULL) { *dataflnm=flnm; } else { free(flnm); } /* now verify that the key is what it is supposed to be. use a lower-level approach to reading so we can control how much buffering is done. */ reader=tsf_buf_reader_create(*fd,128,tsf_false); if (reader==NULL) { goto fail; } in_man=tsf_serial_in_man_create(tsf_buf_reader_read,reader,fsdb->limits); if (in_man==NULL) { tsf_buf_reader_destroy(reader); goto fail; } read_key=tsf_serial_in_man_read(in_man); if (read_key==NULL) { tsf_serial_in_man_destroy(in_man); tsf_buf_reader_destroy(reader); goto fail; } tsf_serial_in_man_destroy(in_man); num_remaining=tsf_buf_reader_get_remaining(reader); tsf_buf_reader_destroy(reader); if (lseek(*fd,-((off_t)num_remaining),SEEK_CUR)<0) { tsf_set_errno("Could not unseek buffered data"); tsf_buffer_destroy(read_key); goto fail; } if (!tsf_buffer_compare(key,read_key)) { tsf_set_error(TSF_E_PARSE_ERROR, "Key mismatch on entry in FSDB database"); tsf_buffer_destroy(read_key); goto fail; } tsf_buffer_destroy(read_key); return tsf_true; fail: if (dataflnm!=NULL) { free(*dataflnm); } close(*fd); return tsf_false; } tsf_bool_t tsf_fsdb_guts_rm(tsf_fsdb_t *fsdb, tsf_buffer_t *key, char **dataflnm) { uint32_t sha1sum[5]; char *name; char *flnm; if (fsdb==NULL || key==NULL) { /* propagate error */ return tsf_false; } tsf_assert(fsdb->kind==TSF_FSDB_LOCAL); if (!fsdb->u.local.update) { tsf_set_error(TSF_E_NOT_ALLOWED, "Cannot rm from FSDB because update flag was not set."); return tsf_false; } if (!tsf_buffer_calc_sha1(key,sha1sum)) { return tsf_false; } name=tsf_sha1_sum_to_str(sha1sum); if (name==NULL) { return tsf_false; } flnm=tsf_asprintf("%s/data-%s",fsdb->u.local.dirname,name); if (flnm==NULL) { tsf_set_errno("Could not asprintf in tsf_fsdb_get"); free(name); return tsf_false; } free(name); if (unlink(flnm)<0) { if (errno==ENOENT) { tsf_set_error(TSF_E_ELE_NOT_FOUND, "The requested key was not found in the FSDB database"); } else { tsf_set_errno("Could not unlink %s",flnm); } free(flnm); return tsf_false; } if (dataflnm!=NULL) { *dataflnm=flnm; } else { free(flnm); } return tsf_true; } tsf_fsdb_out_t *tsf_fsdb_put(tsf_fsdb_t *fsdb, tsf_buffer_t *key) { if (fsdb==NULL || key==NULL) { return NULL; } switch (fsdb->kind) { case TSF_FSDB_LOCAL: { int fd; char *flnm; tsf_fsdb_out_t *result; if (!tsf_fsdb_guts_put(fsdb,key,&fd,&flnm)) { return NULL; } result=malloc(sizeof(tsf_fsdb_out_t)); if (result==NULL) { tsf_set_errno("Could not malloc in tsf_fsdb_put"); close(fd); tsf_fsdb_guts_abort(fsdb,flnm); free(flnm); return NULL; } result->db=fsdb; result->u.local.partname=flnm; result->u.local.out=tsf_stream_file_output_fd_open(fd, &fsdb->wtr_attr, tsf_true, 0); if (result->u.local.out==NULL) { close(fd); tsf_fsdb_guts_abort(fsdb,flnm); free(flnm); free(result); return NULL; } result->u.local.key=tsf_buffer_dup(key); if (result->u.local.key==NULL) { tsf_stream_file_output_close(result->u.local.out); tsf_fsdb_guts_abort(fsdb,flnm); free(flnm); free(result); return NULL; } return result; } #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: { tsf_fsdb_connection_t *connection; tsf_fsdb_command_t cmd; tsf_fsdb_response_t *rsp; tsf_fsdb_out_t *result; cmd.value=tsf_fsdb_command__put; tsf_assert(key!=NULL); cmd.u.put.key=key; rsp=first_action(fsdb,&connection,&cmd); if (rsp==NULL) { return NULL; } if (rsp->state.value!=tsf_fsdb_state__put || rsp->payload.value!=tsf_fsdb_response__payload__ok) { set_error(rsp); connection_destroy(connection); tsf_region_free(rsp); return NULL; } tsf_region_free(rsp); result=malloc(sizeof(tsf_fsdb_out_t)); if (result==NULL) { tsf_set_errno("Could not malloc in tsf_fsdb_put"); connection_destroy(connection); return NULL; } result->db=fsdb; result->u.remote.connection=connection; if (connection->buf==NULL) { connection->buf_size=TSF_DEF_NET_BUF_SIZE; connection->buf_cursor=0; connection->buf=malloc(connection->buf_size); if (connection->buf==NULL) { connection_destroy(connection); free(result); return NULL; } } result->u.remote.out_man= tsf_serial_out_man_create(network_writer, result); if (result->u.remote.out_man==NULL) { connection_destroy(connection); free(result); return NULL; } return result; } #endif default: tsf_abort("bad FSDB kind"); } } tsf_fsdb_in_t *tsf_fsdb_get(tsf_fsdb_t *fsdb, tsf_buffer_t *key) { if (fsdb==NULL || key==NULL) { return NULL; } switch (fsdb->kind) { case TSF_FSDB_LOCAL: { int fd; tsf_fsdb_in_t *result; if (!tsf_fsdb_guts_get(fsdb,key,&fd,NULL)) { return NULL; } result=malloc(sizeof(tsf_fsdb_in_t)); if (result==NULL) { tsf_set_errno("Could not malloc in tsf_fsdb_get"); close(fd); return NULL; } result->db=fsdb; result->u.local.in=tsf_stream_file_input_fd_open(fd, fsdb->limits, &fsdb->rdr_attr, tsf_true, 0); if (result->u.local.in==NULL) { close(fd); free(result); return NULL; } result->u.local.key=tsf_buffer_dup(key); if (result->u.local.key==NULL) { tsf_stream_file_input_close(result->u.local.in); free(result); return NULL; } return result; } #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: { tsf_fsdb_connection_t *connection; tsf_fsdb_command_t cmd; tsf_fsdb_response_t *rsp; tsf_fsdb_in_t *result; cmd.value=tsf_fsdb_command__get; cmd.u.get.key=key; rsp=first_action(fsdb,&connection,&cmd); if (rsp==NULL) { return NULL; } if (rsp->state.value!=tsf_fsdb_state__get || rsp->payload.value!=tsf_fsdb_response__payload__ok) { set_error(rsp); if (rsp->payload.value==tsf_fsdb_response__payload__not_found) { connection_return(fsdb,connection); } else { connection_destroy(connection); } tsf_region_free(rsp); return NULL; } tsf_region_free(rsp); result=malloc(sizeof(tsf_fsdb_in_t)); if (result==NULL) { tsf_set_errno("Could not malloc in tsf_fsdb_get"); connection_destroy(connection); return NULL; } result->db=fsdb; result->u.remote.connection=connection; result->u.remote.rsp=NULL; result->u.remote.rsp_cursor=0; result->u.remote.in_man= tsf_serial_in_man_create(network_reader, result, fsdb->limits); if (result->u.remote.in_man==NULL) { connection_destroy(connection); free(result); return NULL; } return result; } #endif default: tsf_abort("bad FSDB kind"); } } tsf_bool_t tsf_fsdb_rm(tsf_fsdb_t *fsdb, tsf_buffer_t *key) { if (fsdb==NULL || key==NULL) { return tsf_false; } switch (fsdb->kind) { case TSF_FSDB_LOCAL: return tsf_fsdb_guts_rm(fsdb,key,NULL); #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: { tsf_fsdb_connection_t *connection; tsf_fsdb_command_t cmd; tsf_fsdb_response_t *rsp; cmd.value=tsf_fsdb_command__rm; cmd.u.rm.key=key; rsp=first_action(fsdb,&connection,&cmd); if (rsp==NULL) { return tsf_false; } if (rsp->state.value!=tsf_fsdb_state__clear || rsp->payload.value!=tsf_fsdb_response__payload__ok) { set_error(rsp); if (rsp->payload.value==tsf_fsdb_response__payload__not_found) { connection_return(fsdb,connection); } else { connection_destroy(connection); } tsf_region_free(rsp); return tsf_false; } tsf_region_free(rsp); connection_return(fsdb,connection); return tsf_true; } #endif default: tsf_abort("bad FSDB kind"); } } void tsf_fsdb_in_close(tsf_fsdb_in_t *in) { switch (in->db->kind) { case TSF_FSDB_LOCAL: tsf_stream_file_input_close(in->u.local.in); tsf_buffer_destroy(in->u.local.key); break; #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: connection_return(in->db,in->u.remote.connection); if (in->u.remote.rsp!=NULL) { tsf_region_free(in->u.remote.rsp); } tsf_serial_in_man_destroy(in->u.remote.in_man); break; #endif default: tsf_abort("bad FSDB kind"); } free(in); } tsf_buffer_t *tsf_fsdb_in_read(tsf_fsdb_in_t *in) { switch (in->db->kind) { case TSF_FSDB_LOCAL: return tsf_stream_file_input_read(in->u.local.in); #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: return tsf_serial_in_man_read(in->u.remote.in_man); #endif default: tsf_abort("bad FSDB kind"); } } tsf_bool_t tsf_fsdb_out_commit(tsf_fsdb_out_t *out) { if (out==NULL) { return tsf_false; } switch (out->db->kind) { case TSF_FSDB_LOCAL: if (!tsf_stream_file_output_close(out->u.local.out)) { goto abort_local; } if (!tsf_fsdb_guts_commit(out->db, out->u.local.partname, out->u.local.key, NULL)) { goto abort_local; } free(out->u.local.partname); tsf_buffer_destroy(out->u.local.key); free(out); return tsf_true; abort_local: tsf_fsdb_guts_abort(out->db,out->u.local.partname); free(out->u.local.partname); tsf_buffer_destroy(out->u.local.key); free(out); return tsf_false; #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: { tsf_fsdb_command_t cmd; tsf_fsdb_response_t *rsp; /* this part could be optimized - we're currently sending the commit command * as a separate packet. it would be best to try to send it as part of the * last packet of the flush. */ if (!network_writer_flush(out)) { tsf_fsdb_out_abort(out); return tsf_false; } cmd.value=tsf_fsdb_command__out_commit; if (!tsf_fsdb_command__write(out->u.remote.connection->out, &cmd) || !tsf_stream_file_output_flush(out->u.remote.connection->out)) { tsf_fsdb_out_abort(out); return tsf_false; } rsp=connection_read(out->u.remote.connection); if (rsp==NULL) { tsf_fsdb_out_abort(out); return tsf_false; } if (rsp->state.value!=tsf_fsdb_state__clear || rsp->payload.value!=tsf_fsdb_response__payload__ok) { set_error(rsp); tsf_region_free(rsp); tsf_fsdb_out_abort(out); return tsf_false; } tsf_region_free(rsp); connection_return(out->db,out->u.remote.connection); tsf_serial_out_man_destroy(out->u.remote.out_man); free(out); return tsf_true; } #endif default: tsf_abort("bad FSDB kind"); } } void tsf_fsdb_out_abort(tsf_fsdb_out_t *out) { switch (out->db->kind) { case TSF_FSDB_LOCAL: tsf_stream_file_output_close(out->u.local.out); tsf_fsdb_guts_abort(out->db,out->u.local.partname); tsf_buffer_destroy(out->u.local.key); free(out->u.local.partname); break; #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: connection_destroy(out->u.remote.connection); tsf_serial_out_man_destroy(out->u.remote.out_man); break; #endif default: tsf_abort("bad FSDB kind"); } free(out); } tsf_bool_t tsf_fsdb_out_write(tsf_fsdb_out_t *out, tsf_buffer_t *buf) { if (out==NULL || buf==NULL) { return tsf_false; } switch (out->db->kind) { case TSF_FSDB_LOCAL: return tsf_stream_file_output_write(out->u.local.out,buf); #ifdef HAVE_BSDSOCKS case TSF_FSDB_REMOTE: return tsf_serial_out_man_write(out->u.remote.out_man,buf); #endif default: tsf_abort("bad FSDB kind"); } }