LCOV - code coverage report
Current view: top level - src/server/protocol2/champ_chooser - champ_server.c (source / functions) Hit Total Coverage
Test: burp-coverage-clean.info Lines: 15 161 9.3 %
Date: 2016-05-30 Functions: 2 7 28.6 %

          Line data    Source code
       1             : #include "../../../burp.h"
       2             : #include "../../../alloc.h"
       3             : #include "../../../asfd.h"
       4             : #include "../../../async.h"
       5             : #include "../../../cmd.h"
       6             : #include "../../../conf.h"
       7             : #include "../../../conffile.h"
       8             : #include "../../../fsops.h"
       9             : #include "../../../handy.h"
      10             : #include "../../../iobuf.h"
      11             : #include "../../../lock.h"
      12             : #include "../../../log.h"
      13             : #include "../../../protocol2/blist.h"
      14             : #include "../../../protocol2/blk.h"
      15             : #include "../../sdirs.h"
      16             : #include "candidate.h"
      17             : #include "champ_chooser.h"
      18             : #include "champ_server.h"
      19             : #include "dindex.h"
      20             : #include "incoming.h"
      21             : #include "scores.h"
      22             : 
      23             : #include <sys/un.h>
      24             : 
      25           0 : static int champ_chooser_new_client(struct async *as, struct conf **confs)
      26             : {
      27           0 :         int fd=-1;
      28             :         socklen_t t;
      29           0 :         struct asfd *newfd=NULL;
      30             :         struct sockaddr_un remote;
      31           0 :         struct blist *blist=NULL;
      32             : 
      33           0 :         t=sizeof(remote);
      34           0 :         if((fd=accept(as->asfd->fd, (struct sockaddr *)&remote, &t))<0)
      35             :         {
      36           0 :                 logp("accept error in %s: %s\n", __func__, strerror(errno));
      37           0 :                 goto error;
      38             :         }
      39             : 
      40           0 :         if(!(blist=blist_alloc())
      41           0 :           || !(newfd=setup_asfd(as, "(unknown)", &fd)))
      42             :                 goto error;
      43           0 :         newfd->blist=blist;
      44           0 :         newfd->set_timeout(newfd, get_int(confs[OPT_NETWORK_TIMEOUT]));
      45             : 
      46           0 :         logp("Connected to fd %d\n", newfd->fd);
      47             : 
      48           0 :         return 0;
      49             : error:
      50           0 :         close_fd(&fd);
      51           0 :         blist_free(&blist);
      52           0 :         return -1;
      53             : }
      54             : 
      55           0 : static int results_to_fd(struct asfd *asfd)
      56             : {
      57             :         static struct iobuf wbuf;
      58             :         struct blk *b;
      59             :         struct blk *l;
      60             : 
      61           0 :         if(!asfd->blist->last_index) return 0;
      62             : 
      63             :         // Need to start writing the results down the fd.
      64           0 :         for(b=asfd->blist->head; b && b!=asfd->blist->blk_to_dedup; b=l)
      65             :         {
      66           0 :                 if(b->got==BLK_GOT)
      67             :                 {
      68             :                         // Need to write to fd.
      69           0 :                         blk_to_iobuf_index_and_savepath(b, &wbuf);
      70             : 
      71           0 :                         switch(asfd->append_all_to_write_buffer(asfd, &wbuf))
      72             :                         {
      73             :                                 case APPEND_OK: break;
      74             :                                 case APPEND_BLOCKED:
      75           0 :                                         asfd->blist->head=b;
      76           0 :                                         return 0; // Try again later.
      77             :                                 default: return -1;
      78             :                         }
      79             :                 }
      80             :                 else
      81             :                 {
      82             :                         // If the last in the sequence is BLK_NOT_GOT,
      83             :                         // Send a 'wrap_up' message.
      84           0 :                         if(!b->next || b->next==asfd->blist->blk_to_dedup)
      85             :                         {
      86           0 :                                 blk_to_iobuf_wrap_up(b, &wbuf);
      87           0 :                                 switch(asfd->append_all_to_write_buffer(asfd,
      88             :                                         &wbuf))
      89             :                                 {
      90             :                                         case APPEND_OK: break;
      91             :                                         case APPEND_BLOCKED:
      92           0 :                                                 asfd->blist->head=b;
      93           0 :                                                 return 0; // Try again later.
      94             :                                         default: return -1;
      95             :                                 }
      96             :                         }
      97             :                 }
      98           0 :                 l=b->next;
      99           0 :                 blk_free(&b);
     100             :         }
     101             : 
     102           0 :         asfd->blist->head=b;
     103           0 :         if(!b) asfd->blist->tail=NULL;
     104             :         return 0;
     105             : }
     106             : 
     107        4096 : static int deduplicate_maybe(struct asfd *asfd,
     108             :         struct blk *blk, const char *directory, struct scores *scores)
     109             : {
     110        4096 :         if(!asfd->in && !(asfd->in=incoming_alloc())) return -1;
     111             : 
     112        4096 :         if(blk_fingerprint_is_hook(blk))
     113             :         {
     114        4096 :                 if(incoming_grow_maybe(asfd->in)) return -1;
     115        4096 :                 asfd->in->fingerprints[asfd->in->size-1]=blk->fingerprint;
     116             :         }
     117        4096 :         if(++(asfd->blkcnt)<MANIFEST_SIG_MAX) return 0;
     118           1 :         asfd->blkcnt=0;
     119             : 
     120           1 :         if(deduplicate(asfd, directory, scores)<0)
     121             :                 return -1;
     122             : 
     123           1 :         return 0;
     124             : }
     125             : 
     126             : #ifndef UTEST
     127             : static
     128             : #endif
     129        4097 : int champ_server_deal_with_rbuf_sig(struct asfd *asfd,
     130             :         const char *directory, struct scores *scores)
     131             : {
     132             :         struct blk *blk;
     133        4097 :         if(!(blk=blk_alloc())) return -1;
     134             : 
     135        4097 :         blist_add_blk(asfd->blist, blk);
     136             : 
     137        4097 :         if(!asfd->blist->blk_to_dedup) asfd->blist->blk_to_dedup=blk;
     138             : 
     139        4097 :         if(blk_set_from_iobuf_sig(blk, asfd->rbuf)) return -1;
     140             : 
     141             :         //printf("Got weak/strong from %d: %lu - %s %s\n",
     142             :         //      asfd->fd, blk->index, blk->weak, blk->strong);
     143             : 
     144        4096 :         return deduplicate_maybe(asfd, blk, directory, scores);
     145             : }
     146             : 
     147           0 : static int deal_with_client_rbuf(struct asfd *asfd, const char *directory,
     148             :         struct scores *scores)
     149             : {
     150           0 :         if(asfd->rbuf->cmd==CMD_GEN)
     151             :         {
     152           0 :                 if(!strncmp_w(asfd->rbuf->buf, "cname:"))
     153             :                 {
     154             :                         struct iobuf wbuf;
     155           0 :                         free_w(&asfd->desc);
     156           0 :                         if(!(asfd->desc=strdup_w(asfd->rbuf->buf
     157           0 :                                 +strlen("cname:"), __func__)))
     158             :                                         goto error;
     159           0 :                         logp("%s: fd %d\n", asfd->desc, asfd->fd);
     160             :                         iobuf_set(&wbuf, CMD_GEN,
     161           0 :                                 (char *)"cname ok", strlen("cname ok"));
     162             : 
     163           0 :                         if(asfd->write(asfd, &wbuf))
     164             :                                 goto error;
     165             :                 }
     166           0 :                 else if(!strncmp_w(asfd->rbuf->buf, "sigs_end"))
     167             :                 {
     168             :                         //printf("Was told no more sigs\n");
     169           0 :                         if(deduplicate(asfd, directory, scores)<0)
     170             :                                 goto error;
     171             :                 }
     172             :                 else
     173             :                 {
     174           0 :                         iobuf_log_unexpected(asfd->rbuf, __func__);
     175           0 :                         goto error;
     176             :                 }
     177             :         }
     178           0 :         else if(asfd->rbuf->cmd==CMD_SIG)
     179             :         {
     180           0 :                 if(champ_server_deal_with_rbuf_sig(asfd, directory, scores))
     181             :                         goto error;
     182             :         }
     183           0 :         else if(asfd->rbuf->cmd==CMD_MANIFEST)
     184             :         {
     185             :                 // Client has completed a manifest file. Want to start using
     186             :                 // it as a dedup candidate now.
     187           0 :                 if(candidate_add_fresh(asfd->rbuf->buf, directory, scores))
     188             :                         goto error;
     189             :         }
     190             :         else
     191             :         {
     192           0 :                 iobuf_log_unexpected(asfd->rbuf, __func__);
     193           0 :                 goto error;
     194             :         }
     195           0 :         iobuf_free_content(asfd->rbuf);
     196           0 :         return 0;
     197             : error:
     198           0 :         iobuf_free_content(asfd->rbuf);
     199           0 :         return -1;
     200             : }
     201             : 
     202           0 : int champ_chooser_server(struct sdirs *sdirs, struct conf **confs,
     203             :         int resume)
     204             : {
     205             :         int s;
     206           0 :         int ret=-1;
     207             :         int len;
     208           0 :         struct asfd *asfd=NULL;
     209             :         struct sockaddr_un local;
     210           0 :         struct lock *lock=NULL;
     211           0 :         struct async *as=NULL;
     212           0 :         int started=0;
     213           0 :         struct scores *scores=NULL;
     214           0 :         const char *directory=get_string(confs[OPT_DIRECTORY]);
     215             : 
     216           0 :         if(!(lock=lock_alloc_and_init(sdirs->champlock))
     217           0 :           || build_path_w(sdirs->champlock))
     218             :                 goto end;
     219           0 :         lock_get(lock);
     220           0 :         switch(lock->status)
     221             :         {
     222             :                 case GET_LOCK_GOT:
     223           0 :                         log_fzp_set(sdirs->champlog, confs);
     224             :                         logp("Got champ lock for dedup_group: %s\n",
     225           0 :                                 get_string(confs[OPT_DEDUP_GROUP]));
     226             :                         break;
     227             :                 case GET_LOCK_NOT_GOT:
     228             :                 case GET_LOCK_ERROR:
     229             :                 default:
     230             :                         //logp("Did not get champ lock\n");
     231             :                         goto end;
     232             :         }
     233             : 
     234           0 :         if((s=socket(AF_UNIX, SOCK_STREAM, 0))<0)
     235             :         {
     236           0 :                 logp("socket error in %s: %s\n", __func__, strerror(errno));
     237           0 :                 goto end;
     238             :         }
     239             : 
     240             :         memset(&local, 0, sizeof(struct sockaddr_un));
     241           0 :         local.sun_family=AF_UNIX;
     242             :         snprintf(local.sun_path, sizeof(local.sun_path),
     243           0 :                 "%s", sdirs->champsock);
     244           0 :         len=strlen(local.sun_path)+sizeof(local.sun_family)+1;
     245           0 :         unlink(sdirs->champsock);
     246           0 :         if(bind(s, (struct sockaddr *)&local, len)<0)
     247             :         {
     248           0 :                 logp("bind error in %s: %s\n", __func__, strerror(errno));
     249           0 :                 goto end;
     250             :         }
     251             : 
     252           0 :         if(listen(s, 5)<0)
     253             :         {
     254           0 :                 logp("listen error in %s: %s\n", __func__, strerror(errno));
     255           0 :                 goto end;
     256             :         }
     257             : 
     258           0 :         if(!(as=async_alloc())
     259           0 :           || as->init(as, 0)
     260           0 :           || !(asfd=setup_asfd(as, "champ chooser main socket", &s)))
     261             :                 goto end;
     262           0 :         asfd->fdtype=ASFD_FD_SERVER_LISTEN_MAIN;
     263           0 :         asfd->set_timeout(asfd, get_int(confs[OPT_NETWORK_TIMEOUT]));
     264             : 
     265             :         // I think that this is probably the best point at which to run a
     266             :         // cleanup job to delete unused data files, because no other process
     267             :         // can fiddle with the dedup_group at this point.
     268             :         // Cannot do it on a resume, or it will delete files that are
     269             :         // referenced in the backup we are resuming.
     270           0 :         if(delete_unused_data_files(sdirs, resume))
     271             :                 goto end;
     272             : 
     273             :         // Load the sparse indexes for this dedup group.
     274           0 :         if(!(scores=champ_chooser_init(sdirs->data)))
     275             :                 goto end;
     276             : 
     277             :         while(1)
     278             :         {
     279           0 :                 for(asfd=as->asfd->next; asfd; asfd=asfd->next)
     280             :                 {
     281           0 :                         if(!asfd->blist->head
     282           0 :                           || asfd->blist->head->got==BLK_INCOMING) continue;
     283           0 :                         if(results_to_fd(asfd)) goto end;
     284             :                 }
     285             : 
     286           0 :                 switch(as->read_write(as))
     287             :                 {
     288             :                         case 0:
     289             :                                 // Check the main socket last, as it might add
     290             :                                 // a new client to the list.
     291           0 :                                 for(asfd=as->asfd->next; asfd; asfd=asfd->next)
     292             :                                 {
     293           0 :                                         while(asfd->rbuf->buf)
     294             :                                         {
     295           0 :                                                 if(deal_with_client_rbuf(asfd,
     296           0 :                                                         directory, scores))
     297             :                                                                 goto end;
     298             :                                                 // Get as much out of the
     299             :                                                 // readbuf as possible.
     300           0 :                                                 if(asfd->parse_readbuf(asfd))
     301             :                                                         goto end;
     302             :                                         }
     303             :                                 }
     304           0 :                                 if(as->asfd->new_client)
     305             :                                 {
     306             :                                         // Incoming client.
     307           0 :                                         as->asfd->new_client=0;
     308           0 :                                         if(champ_chooser_new_client(as, confs))
     309             :                                                 goto end;
     310             :                                         started=1;
     311             :                                 }
     312             :                                 break;
     313             :                         default:
     314           0 :                                 int removed=0;
     315             :                                 // Maybe one of the fds had a problem.
     316             :                                 // Find and remove it and carry on if possible.
     317           0 :                                 for(asfd=as->asfd->next; asfd; )
     318             :                                 {
     319             :                                         struct asfd *a;
     320           0 :                                         if(!asfd->want_to_remove)
     321             :                                         {
     322           0 :                                                 asfd=asfd->next;
     323           0 :                                                 continue;
     324             :                                         }
     325           0 :                                         as->asfd_remove(as, asfd);
     326             :                                         logp("%s: disconnected fd %d\n",
     327           0 :                                                 asfd->desc, asfd->fd);
     328           0 :                                         a=asfd->next;
     329           0 :                                         asfd_free(&asfd);
     330           0 :                                         asfd=a;
     331           0 :                                         removed++;
     332             :                                 }
     333           0 :                                 if(removed) break;
     334             :                                 // If we got here, there was no fd to remove.
     335             :                                 // It is a fatal error.
     336             :                                 goto end;
     337             :                 }
     338             :                                 
     339           0 :                 if(started && !as->asfd->next)
     340             :                 {
     341           0 :                         logp("All clients disconnected.\n");
     342           0 :                         ret=0;
     343           0 :                         break;
     344             :                 }
     345             :         }
     346             : 
     347             : end:
     348           0 :         logp("champ chooser exiting: %d\n", ret);
     349           0 :         champ_chooser_free(&scores);
     350           0 :         log_fzp_set(NULL, confs);
     351           0 :         async_free(&as);
     352           0 :         asfd_free(&asfd); // This closes s for us.
     353           0 :         close_fd(&s);
     354           0 :         unlink(sdirs->champsock);
     355             : // FIX THIS: free asfds.
     356           0 :         lock_release(lock);
     357           0 :         lock_free(&lock);
     358           0 :         return ret;
     359             : }
     360             : 
     361             : // The return code of this is the return code of the standalone process.
     362           0 : int champ_chooser_server_standalone(struct conf **globalcs)
     363             : {
     364           0 :         int ret=1;
     365           0 :         struct sdirs *sdirs=NULL;
     366           0 :         struct conf **cconfs=NULL;
     367           0 :         const char *orig_client=get_string(globalcs[OPT_ORIG_CLIENT]);
     368             : 
     369           0 :         if(!(cconfs=confs_alloc()))
     370             :                 goto end;
     371           0 :         confs_init(cconfs);
     372             :         // We need to be given a client name and load the relevant server side
     373             :         // clientconfdir file, because various settings may be overridden
     374             :         // there.
     375           0 :         if(set_string(cconfs[OPT_CNAME], orig_client)
     376           0 :           || conf_load_clientconfdir(globalcs, cconfs)
     377           0 :           || !(sdirs=sdirs_alloc())
     378           0 :           || sdirs_init_from_confs(sdirs, cconfs)
     379           0 :           || champ_chooser_server(sdirs, cconfs, 0 /* resume */))
     380             :                 goto end;
     381           0 :         ret=0;
     382             : end:
     383           0 :         confs_free(&cconfs);
     384           0 :         sdirs_free(&sdirs);
     385           0 :         return ret;
     386             : }

Generated by: LCOV version 1.10