LCOV - code coverage report
Current view: top level - src/server/protocol2/champ_chooser - champ_server.c (source / functions) Hit Total Coverage
Test: burp-coverage-clean.info Lines: 15 164 9.1 %
Date: 2016-02-29 Functions: 2 7 28.6 %

          Line data    Source code
       1             : #include "../../../burp.h"
       2             : #include "../../../alloc.h"
       3             : #include "../../../asfd.h"
       4             : #include "../../../async.h"
       5             : #include "../../../cmd.h"
       6             : #include "../../../conf.h"
       7             : #include "../../../conffile.h"
       8             : #include "../../../fsops.h"
       9             : #include "../../../handy.h"
      10             : #include "../../../iobuf.h"
      11             : #include "../../../lock.h"
      12             : #include "../../../log.h"
      13             : #include "../../../protocol2/blist.h"
      14             : #include "../../../protocol2/blk.h"
      15             : #include "../../sdirs.h"
      16             : #include "candidate.h"
      17             : #include "champ_chooser.h"
      18             : #include "champ_server.h"
      19             : #include "dindex.h"
      20             : #include "incoming.h"
      21             : #include "scores.h"
      22             : 
      23             : #include <sys/un.h>
      24             : 
      25             : // FIX THIS: test error conditions.
      26           0 : static int champ_chooser_new_client(struct async *as, struct conf **confs)
      27             : {
      28             :         socklen_t t;
      29           0 :         int fd=-1;
      30           0 :         struct asfd *newfd=NULL;
      31             :         struct sockaddr_un remote;
      32             : 
      33           0 :         t=sizeof(remote);
      34           0 :         if((fd=accept(as->asfd->fd, (struct sockaddr *)&remote, &t))<0)
      35             :         {
      36           0 :                 logp("accept error in %s: %s\n", __func__, strerror(errno));
      37           0 :                 goto error;
      38             :         }
      39           0 :         set_non_blocking(fd);
      40             : 
      41           0 :         if(!(newfd=asfd_alloc())
      42           0 :           || newfd->init(newfd, "(unknown)", as, fd, NULL,
      43           0 :                 ASFD_STREAM_STANDARD, confs)
      44           0 :           || !(newfd->blist=blist_alloc()))
      45             :                 goto error;
      46           0 :         as->asfd_add(as, newfd);
      47             : 
      48           0 :         logp("Connected to fd %d\n", newfd->fd);
      49             : 
      50           0 :         return 0;
      51             : error:
      52           0 :         asfd_free(&newfd);
      53           0 :         return -1;
      54             : }
      55             : 
      56           0 : static int results_to_fd(struct asfd *asfd)
      57             : {
      58             :         static struct iobuf wbuf;
      59             :         struct blk *b;
      60             :         struct blk *l;
      61             : 
      62           0 :         if(!asfd->blist->last_index) return 0;
      63             : 
      64             :         // Need to start writing the results down the fd.
      65           0 :         for(b=asfd->blist->head; b && b!=asfd->blist->blk_to_dedup; b=l)
      66             :         {
      67           0 :                 if(b->got==BLK_GOT)
      68             :                 {
      69             :                         // Need to write to fd.
      70           0 :                         blk_to_iobuf_index_and_savepath(b, &wbuf);
      71             : 
      72           0 :                         switch(asfd->append_all_to_write_buffer(asfd, &wbuf))
      73             :                         {
      74             :                                 case APPEND_OK: break;
      75             :                                 case APPEND_BLOCKED:
      76           0 :                                         asfd->blist->head=b;
      77           0 :                                         return 0; // Try again later.
      78             :                                 default: return -1;
      79             :                         }
      80             :                 }
      81             :                 else
      82             :                 {
      83             :                         // If the last in the sequence is BLK_NOT_GOT,
      84             :                         // Send a 'wrap_up' message.
      85           0 :                         if(!b->next || b->next==asfd->blist->blk_to_dedup)
      86             :                         {
      87           0 :                                 blk_to_iobuf_wrap_up(b, &wbuf);
      88           0 :                                 switch(asfd->append_all_to_write_buffer(asfd,
      89             :                                         &wbuf))
      90             :                                 {
      91             :                                         case APPEND_OK: break;
      92             :                                         case APPEND_BLOCKED:
      93           0 :                                                 asfd->blist->head=b;
      94           0 :                                                 return 0; // Try again later.
      95             :                                         default: return -1;
      96             :                                 }
      97             :                         }
      98             :                 }
      99           0 :                 l=b->next;
     100           0 :                 blk_free(&b);
     101             :         }
     102             : 
     103           0 :         asfd->blist->head=b;
     104           0 :         if(!b) asfd->blist->tail=NULL;
     105             :         return 0;
     106             : }
     107             : 
     108        4096 : static int deduplicate_maybe(struct asfd *asfd,
     109             :         struct blk *blk, const char *directory, struct scores *scores)
     110             : {
     111        4096 :         if(!asfd->in && !(asfd->in=incoming_alloc())) return -1;
     112             : 
     113        4096 :         if(blk_fingerprint_is_hook(blk))
     114             :         {
     115        4096 :                 if(incoming_grow_maybe(asfd->in)) return -1;
     116        4096 :                 asfd->in->fingerprints[asfd->in->size-1]=blk->fingerprint;
     117             :         }
     118        4096 :         if(++(asfd->blkcnt)<MANIFEST_SIG_MAX) return 0;
     119           1 :         asfd->blkcnt=0;
     120             : 
     121           1 :         if(deduplicate(asfd, directory, scores)<0)
     122             :                 return -1;
     123             : 
     124           1 :         return 0;
     125             : }
     126             : 
     127             : #ifndef UTEST
     128             : static
     129             : #endif
     130        4097 : int champ_server_deal_with_rbuf_sig(struct asfd *asfd,
     131             :         const char *directory, struct scores *scores)
     132             : {
     133             :         struct blk *blk;
     134        4097 :         if(!(blk=blk_alloc())) return -1;
     135             : 
     136        4097 :         blist_add_blk(asfd->blist, blk);
     137             : 
     138        4097 :         if(!asfd->blist->blk_to_dedup) asfd->blist->blk_to_dedup=blk;
     139             : 
     140        4097 :         if(blk_set_from_iobuf_sig(blk, asfd->rbuf)) return -1;
     141             : 
     142             :         //printf("Got weak/strong from %d: %lu - %s %s\n",
     143             :         //      asfd->fd, blk->index, blk->weak, blk->strong);
     144             : 
     145        4096 :         return deduplicate_maybe(asfd, blk, directory, scores);
     146             : }
     147             : 
     148           0 : static int deal_with_client_rbuf(struct asfd *asfd, const char *directory,
     149             :         struct scores *scores)
     150             : {
     151           0 :         if(asfd->rbuf->cmd==CMD_GEN)
     152             :         {
     153           0 :                 if(!strncmp_w(asfd->rbuf->buf, "cname:"))
     154             :                 {
     155             :                         struct iobuf wbuf;
     156           0 :                         free_w(&asfd->desc);
     157           0 :                         if(!(asfd->desc=strdup_w(asfd->rbuf->buf
     158           0 :                                 +strlen("cname:"), __func__)))
     159             :                                         goto error;
     160           0 :                         logp("%s: fd %d\n", asfd->desc, asfd->fd);
     161             :                         iobuf_set(&wbuf, CMD_GEN,
     162           0 :                                 (char *)"cname ok", strlen("cname ok"));
     163             : 
     164           0 :                         if(asfd->write(asfd, &wbuf))
     165             :                                 goto error;
     166             :                 }
     167           0 :                 else if(!strncmp_w(asfd->rbuf->buf, "sigs_end"))
     168             :                 {
     169             :                         //printf("Was told no more sigs\n");
     170           0 :                         if(deduplicate(asfd, directory, scores)<0)
     171             :                                 goto error;
     172             :                 }
     173             :                 else
     174             :                 {
     175           0 :                         iobuf_log_unexpected(asfd->rbuf, __func__);
     176           0 :                         goto error;
     177             :                 }
     178             :         }
     179           0 :         else if(asfd->rbuf->cmd==CMD_SIG)
     180             :         {
     181           0 :                 if(champ_server_deal_with_rbuf_sig(asfd, directory, scores))
     182             :                         goto error;
     183             :         }
     184           0 :         else if(asfd->rbuf->cmd==CMD_MANIFEST)
     185             :         {
     186             :                 // Client has completed a manifest file. Want to start using
     187             :                 // it as a dedup candidate now.
     188           0 :                 if(candidate_add_fresh(asfd->rbuf->buf, directory, scores))
     189             :                         goto error;
     190             :         }
     191             :         else
     192             :         {
     193           0 :                 iobuf_log_unexpected(asfd->rbuf, __func__);
     194           0 :                 goto error;
     195             :         }
     196           0 :         iobuf_free_content(asfd->rbuf);
     197           0 :         return 0;
     198             : error:
     199           0 :         iobuf_free_content(asfd->rbuf);
     200           0 :         return -1;
     201             : }
     202             : 
     203           0 : int champ_chooser_server(struct sdirs *sdirs, struct conf **confs)
     204             : {
     205             :         int s;
     206           0 :         int ret=-1;
     207             :         int len;
     208           0 :         struct asfd *asfd=NULL;
     209             :         struct sockaddr_un local;
     210           0 :         struct lock *lock=NULL;
     211           0 :         struct async *as=NULL;
     212           0 :         int started=0;
     213           0 :         struct scores *scores=NULL;
     214           0 :         const char *directory=get_string(confs[OPT_DIRECTORY]);
     215             : 
     216           0 :         if(!(lock=lock_alloc_and_init(sdirs->champlock))
     217           0 :           || build_path_w(sdirs->champlock))
     218             :                 goto end;
     219           0 :         lock_get(lock);
     220           0 :         switch(lock->status)
     221             :         {
     222             :                 case GET_LOCK_GOT:
     223           0 :                         log_fzp_set(sdirs->champlog, confs);
     224             :                         logp("Got champ lock for dedup_group: %s\n",
     225           0 :                                 get_string(confs[OPT_DEDUP_GROUP]));
     226             :                         break;
     227             :                 case GET_LOCK_NOT_GOT:
     228             :                 case GET_LOCK_ERROR:
     229             :                 default:
     230             :                         //logp("Did not get champ lock\n");
     231             :                         goto end;
     232             :         }
     233             : 
     234           0 :         if((s=socket(AF_UNIX, SOCK_STREAM, 0))<0)
     235             :         {
     236           0 :                 logp("socket error in %s: %s\n", __func__, strerror(errno));
     237           0 :                 goto end;
     238             :         }
     239             : 
     240             :         memset(&local, 0, sizeof(struct sockaddr_un));
     241           0 :         local.sun_family=AF_UNIX;
     242             :         snprintf(local.sun_path, sizeof(local.sun_path),
     243           0 :                 "%s", sdirs->champsock);
     244           0 :         len=strlen(local.sun_path)+sizeof(local.sun_family)+1;
     245           0 :         unlink(sdirs->champsock);
     246           0 :         if(bind(s, (struct sockaddr *)&local, len)<0)
     247             :         {
     248           0 :                 logp("bind error in %s: %s\n", __func__, strerror(errno));
     249           0 :                 goto end;
     250             :         }
     251             : 
     252           0 :         if(listen(s, 5)<0)
     253             :         {
     254           0 :                 logp("listen error in %s: %s\n", __func__, strerror(errno));
     255           0 :                 goto end;
     256             :         }
     257           0 :         set_non_blocking(s);
     258             : 
     259           0 :         if(!(as=async_alloc())
     260           0 :           || !(asfd=asfd_alloc())
     261           0 :           || as->init(as, 0)
     262           0 :           || asfd->init(asfd, "champ chooser main socket", as, s, NULL,
     263           0 :                 ASFD_STREAM_STANDARD, confs))
     264             :                         goto end;
     265           0 :         as->asfd_add(as, asfd);
     266           0 :         asfd->fdtype=ASFD_FD_SERVER_LISTEN_MAIN;
     267             : 
     268             :         // I think that this is probably the best point at which to run a
     269             :         // cleanup job to delete unused data files, because no other process
     270             :         // can fiddle with the dedup_group at this point.
     271           0 :         if(delete_unused_data_files(sdirs))
     272             :                 goto end;
     273             : 
     274             :         // Load the sparse indexes for this dedup group.
     275           0 :         if(!(scores=champ_chooser_init(sdirs->data)))
     276             :                 goto end;
     277             : 
     278             :         while(1)
     279             :         {
     280           0 :                 for(asfd=as->asfd->next; asfd; asfd=asfd->next)
     281             :                 {
     282           0 :                         if(!asfd->blist->head
     283           0 :                           || asfd->blist->head->got==BLK_INCOMING) continue;
     284           0 :                         if(results_to_fd(asfd)) goto end;
     285             :                 }
     286             : 
     287           0 :                 switch(as->read_write(as))
     288             :                 {
     289             :                         case 0:
     290             :                                 // Check the main socket last, as it might add
     291             :                                 // a new client to the list.
     292           0 :                                 for(asfd=as->asfd->next; asfd; asfd=asfd->next)
     293             :                                 {
     294           0 :                                         while(asfd->rbuf->buf)
     295             :                                         {
     296           0 :                                                 if(deal_with_client_rbuf(asfd,
     297           0 :                                                         directory, scores))
     298             :                                                                 goto end;
     299             :                                                 // Get as much out of the
     300             :                                                 // readbuf as possible.
     301           0 :                                                 if(asfd->parse_readbuf(asfd))
     302             :                                                         goto end;
     303             :                                         }
     304             :                                 }
     305           0 :                                 if(as->asfd->new_client)
     306             :                                 {
     307             :                                         // Incoming client.
     308           0 :                                         as->asfd->new_client=0;
     309           0 :                                         if(champ_chooser_new_client(as, confs))
     310             :                                                 goto end;
     311             :                                         started=1;
     312             :                                 }
     313             :                                 break;
     314             :                         default:
     315           0 :                                 int removed=0;
     316             :                                 // Maybe one of the fds had a problem.
     317             :                                 // Find and remove it and carry on if possible.
     318           0 :                                 for(asfd=as->asfd->next; asfd; )
     319             :                                 {
     320             :                                         struct asfd *a;
     321           0 :                                         if(!asfd->want_to_remove)
     322             :                                         {
     323           0 :                                                 asfd=asfd->next;
     324           0 :                                                 continue;
     325             :                                         }
     326           0 :                                         as->asfd_remove(as, asfd);
     327             :                                         logp("%s: disconnected fd %d\n",
     328           0 :                                                 asfd->desc, asfd->fd);
     329           0 :                                         a=asfd->next;
     330           0 :                                         asfd_free(&asfd);
     331           0 :                                         asfd=a;
     332           0 :                                         removed++;
     333             :                                 }
     334           0 :                                 if(removed) break;
     335             :                                 // If we got here, there was no fd to remove.
     336             :                                 // It is a fatal error.
     337             :                                 goto end;
     338             :                 }
     339             :                                 
     340           0 :                 if(started && !as->asfd->next)
     341             :                 {
     342           0 :                         logp("All clients disconnected.\n");
     343           0 :                         ret=0;
     344           0 :                         break;
     345             :                 }
     346             :         }
     347             : 
     348             : end:
     349           0 :         logp("champ chooser exiting: %d\n", ret);
     350           0 :         log_fzp_set(NULL, confs);
     351           0 :         async_free(&as);
     352           0 :         asfd_free(&asfd); // This closes s for us.
     353           0 :         close_fd(&s);
     354           0 :         unlink(sdirs->champsock);
     355             : // FIX THIS: free asfds.
     356           0 :         lock_release(lock);
     357           0 :         lock_free(&lock);
     358           0 :         scores_free(&scores);
     359           0 :         return ret;
     360             : }
     361             : 
     362             : // The return code of this is the return code of the standalone process.
     363           0 : int champ_chooser_server_standalone(struct conf **globalcs)
     364             : {
     365           0 :         int ret=1;
     366           0 :         struct sdirs *sdirs=NULL;
     367           0 :         struct conf **cconfs=NULL;
     368           0 :         const char *orig_client=get_string(globalcs[OPT_ORIG_CLIENT]);
     369             : 
     370           0 :         if(!(cconfs=confs_alloc()))
     371             :                 goto end;
     372           0 :         confs_init(cconfs);
     373             :         // We need to be given a client name and load the relevant server side
     374             :         // clientconfdir file, because various settings may be overridden
     375             :         // there.
     376           0 :         if(set_string(cconfs[OPT_CNAME], orig_client)
     377           0 :           || conf_load_clientconfdir(globalcs, cconfs)
     378           0 :           || !(sdirs=sdirs_alloc())
     379           0 :           || sdirs_init_from_confs(sdirs, cconfs)
     380           0 :           || champ_chooser_server(sdirs, cconfs))
     381             :                 goto end;
     382           0 :         ret=0;
     383             : end:
     384           0 :         confs_free(&cconfs);
     385           0 :         sdirs_free(&sdirs);
     386           0 :         return ret;
     387             : }

Generated by: LCOV version 1.10