Line data Source code
1 : #include "../../../burp.h"
2 : #include "../../../alloc.h"
3 : #include "../../../asfd.h"
4 : #include "../../../async.h"
5 : #include "../../../cmd.h"
6 : #include "../../../conf.h"
7 : #include "../../../conffile.h"
8 : #include "../../../fsops.h"
9 : #include "../../../handy.h"
10 : #include "../../../iobuf.h"
11 : #include "../../../lock.h"
12 : #include "../../../log.h"
13 : #include "../../../protocol2/blist.h"
14 : #include "../../../protocol2/blk.h"
15 : #include "../../sdirs.h"
16 : #include "candidate.h"
17 : #include "champ_chooser.h"
18 : #include "champ_server.h"
19 : #include "dindex.h"
20 : #include "incoming.h"
21 : #include "scores.h"
22 :
23 : #include <sys/un.h>
24 :
25 0 : static int champ_chooser_new_client(struct async *as, struct conf **confs)
26 : {
27 0 : int fd=-1;
28 : socklen_t t;
29 0 : struct asfd *newfd=NULL;
30 : struct sockaddr_un remote;
31 0 : struct blist *blist=NULL;
32 :
33 0 : t=sizeof(remote);
34 0 : if((fd=accept(as->asfd->fd, (struct sockaddr *)&remote, &t))<0)
35 : {
36 0 : logp("accept error in %s: %s\n", __func__, strerror(errno));
37 0 : goto error;
38 : }
39 :
40 0 : if(!(blist=blist_alloc())
41 0 : || !(newfd=setup_asfd(as, "(unknown)", &fd, /*port*/-1)))
42 : goto error;
43 0 : newfd->blist=blist;
44 0 : newfd->set_timeout(newfd, get_int(confs[OPT_NETWORK_TIMEOUT]));
45 :
46 0 : logp("Connected to fd %d\n", newfd->fd);
47 :
48 0 : return 0;
49 : error:
50 0 : close_fd(&fd);
51 0 : blist_free(&blist);
52 0 : return -1;
53 : }
54 :
55 0 : static int results_to_fd(struct asfd *asfd)
56 : {
57 : static struct iobuf wbuf;
58 : struct blk *b;
59 : struct blk *l;
60 :
61 0 : if(!asfd->blist->last_index) return 0;
62 :
63 : // Need to start writing the results down the fd.
64 0 : for(b=asfd->blist->head; b && b!=asfd->blist->blk_to_dedup; b=l)
65 : {
66 0 : if(b->got==BLK_GOT)
67 : {
68 : // Need to write to fd.
69 0 : blk_to_iobuf_index_and_savepath(b, &wbuf);
70 :
71 0 : switch(asfd->append_all_to_write_buffer(asfd, &wbuf))
72 : {
73 : case APPEND_OK: break;
74 : case APPEND_BLOCKED:
75 0 : asfd->blist->head=b;
76 0 : return 0; // Try again later.
77 : default: return -1;
78 : }
79 : }
80 : else
81 : {
82 : // If the last in the sequence is BLK_NOT_GOT,
83 : // Send a 'wrap_up' message.
84 0 : if(!b->next || b->next==asfd->blist->blk_to_dedup)
85 : {
86 0 : blk_to_iobuf_wrap_up(b, &wbuf);
87 0 : switch(asfd->append_all_to_write_buffer(asfd,
88 : &wbuf))
89 : {
90 : case APPEND_OK: break;
91 : case APPEND_BLOCKED:
92 0 : asfd->blist->head=b;
93 0 : return 0; // Try again later.
94 : default: return -1;
95 : }
96 : }
97 : }
98 0 : l=b->next;
99 0 : blk_free(&b);
100 : }
101 :
102 0 : asfd->blist->head=b;
103 0 : if(!b) asfd->blist->tail=NULL;
104 : return 0;
105 : }
106 :
107 4096 : static int deduplicate_maybe(struct asfd *asfd,
108 : struct blk *blk, const char *directory, struct scores *scores)
109 : {
110 4096 : if(!asfd->in && !(asfd->in=incoming_alloc()))
111 : return -1;
112 :
113 4096 : if(blk_fingerprint_is_hook(blk))
114 : {
115 4096 : if(incoming_grow_maybe(asfd->in))
116 : return -1;
117 4096 : asfd->in->fingerprints[asfd->in->size-1]=blk->fingerprint;
118 : }
119 4096 : if(++(asfd->blkcnt)<MANIFEST_SIG_MAX)
120 : return 0;
121 1 : asfd->blkcnt=0;
122 :
123 1 : if(deduplicate(asfd, directory, scores)<0)
124 : return -1;
125 :
126 1 : return 0;
127 : }
128 :
129 : #ifndef UTEST
130 : static
131 : #endif
132 4097 : int champ_server_deal_with_rbuf_sig(struct asfd *asfd,
133 : const char *directory, struct scores *scores)
134 : {
135 : struct blk *blk;
136 4097 : if(!(blk=blk_alloc())) return -1;
137 :
138 4097 : blist_add_blk(asfd->blist, blk);
139 :
140 4097 : if(!asfd->blist->blk_to_dedup)
141 0 : asfd->blist->blk_to_dedup=blk;
142 :
143 4097 : if(blk_set_from_iobuf_sig(blk, asfd->rbuf))
144 : return -1;
145 :
146 : //logp("Got fingerprint from %d: %lu - %lu\n",
147 : // asfd->fd, blk->index, blk->fingerprint);
148 :
149 4096 : return deduplicate_maybe(asfd, blk, directory, scores);
150 : }
151 :
152 0 : static int deal_with_client_rbuf(struct asfd *asfd, const char *directory,
153 : struct scores *scores)
154 : {
155 0 : if(asfd->rbuf->cmd==CMD_GEN)
156 : {
157 0 : if(!strncmp_w(asfd->rbuf->buf, "cname:"))
158 : {
159 : struct iobuf wbuf;
160 0 : free_w(&asfd->desc);
161 0 : if(!(asfd->desc=strdup_w(asfd->rbuf->buf
162 0 : +strlen("cname:"), __func__)))
163 : goto error;
164 0 : logp("%s: fd %d\n", asfd->desc, asfd->fd);
165 0 : iobuf_set(&wbuf, CMD_GEN,
166 : (char *)"cname ok", strlen("cname ok"));
167 :
168 0 : if(asfd->write(asfd, &wbuf))
169 : goto error;
170 : }
171 0 : else if(!strncmp_w(asfd->rbuf->buf, "sigs_end"))
172 : {
173 : //printf("Was told no more sigs\n");
174 0 : if(deduplicate(asfd, directory, scores)<0)
175 : goto error;
176 : }
177 : else
178 : {
179 0 : iobuf_log_unexpected(asfd->rbuf, __func__);
180 0 : goto error;
181 : }
182 : }
183 0 : else if(asfd->rbuf->cmd==CMD_SIG)
184 : {
185 0 : if(champ_server_deal_with_rbuf_sig(asfd, directory, scores))
186 : goto error;
187 : }
188 0 : else if(asfd->rbuf->cmd==CMD_MANIFEST)
189 : {
190 : // Client has completed a manifest file. Want to start using
191 : // it as a dedup candidate now.
192 0 : if(candidate_add_fresh(asfd->rbuf->buf, directory, scores))
193 : goto error;
194 : }
195 : else
196 : {
197 0 : iobuf_log_unexpected(asfd->rbuf, __func__);
198 0 : goto error;
199 : }
200 0 : iobuf_free_content(asfd->rbuf);
201 0 : return 0;
202 : error:
203 0 : iobuf_free_content(asfd->rbuf);
204 0 : return -1;
205 : }
206 :
207 0 : int champ_chooser_server(struct sdirs *sdirs, struct conf **confs,
208 : int resume)
209 : {
210 : int s;
211 0 : int ret=-1;
212 : int len;
213 0 : struct asfd *asfd=NULL;
214 : struct sockaddr_un local;
215 0 : struct lock *lock=NULL;
216 0 : struct async *as=NULL;
217 0 : int started=0;
218 0 : struct scores *scores=NULL;
219 0 : const char *directory=get_string(confs[OPT_DIRECTORY]);
220 :
221 0 : if(!(lock=lock_alloc_and_init(sdirs->champlock))
222 0 : || build_path_w(sdirs->champlock))
223 : goto end;
224 0 : lock_get(lock);
225 0 : switch(lock->status)
226 : {
227 : case GET_LOCK_GOT:
228 0 : log_fzp_set(sdirs->champlog, confs);
229 0 : logp("Got champ lock for dedup_group: %s\n",
230 : get_string(confs[OPT_DEDUP_GROUP]));
231 : break;
232 : case GET_LOCK_NOT_GOT:
233 : case GET_LOCK_ERROR:
234 : default:
235 : //logp("Did not get champ lock\n");
236 : goto end;
237 : }
238 :
239 0 : if((s=socket(AF_UNIX, SOCK_STREAM, 0))<0)
240 : {
241 0 : logp("socket error in %s: %s\n", __func__, strerror(errno));
242 0 : goto end;
243 : }
244 :
245 0 : memset(&local, 0, sizeof(struct sockaddr_un));
246 0 : local.sun_family=AF_UNIX;
247 0 : snprintf(local.sun_path, sizeof(local.sun_path),
248 : "%s", sdirs->champsock);
249 0 : len=strlen(local.sun_path)+sizeof(local.sun_family)+1;
250 0 : unlink(sdirs->champsock);
251 0 : if(bind(s, (struct sockaddr *)&local, len)<0)
252 : {
253 0 : logp("bind error in %s: %s\n", __func__, strerror(errno));
254 0 : goto end;
255 : }
256 :
257 0 : if(listen(s, 5)<0)
258 : {
259 0 : logp("listen error in %s: %s\n", __func__, strerror(errno));
260 0 : goto end;
261 : }
262 :
263 0 : if(!(as=async_alloc())
264 0 : || as->init(as, 0)
265 0 : || !(asfd=setup_asfd(as, "champ chooser main socket", &s,
266 : /*port*/-1)))
267 : goto end;
268 0 : asfd->fdtype=ASFD_FD_SERVER_LISTEN_MAIN;
269 :
270 : // I think that this is probably the best point at which to run a
271 : // cleanup job to delete unused data files, because no other process
272 : // can fiddle with the dedup_group at this point.
273 : // Cannot do it on a resume, or it will delete files that are
274 : // referenced in the backup we are resuming.
275 0 : if(delete_unused_data_files(sdirs, resume))
276 : goto end;
277 :
278 : // Load the sparse indexes for this dedup group.
279 0 : if(!(scores=champ_chooser_init(sdirs->data)))
280 : goto end;
281 :
282 : while(1)
283 : {
284 0 : for(asfd=as->asfd->next; asfd; asfd=asfd->next)
285 : {
286 0 : if(!asfd->blist->head
287 0 : || asfd->blist->head->got==BLK_INCOMING) continue;
288 0 : if(results_to_fd(asfd)) goto end;
289 : }
290 :
291 : int removed;
292 :
293 0 : switch(as->read_write(as))
294 : {
295 : case 0:
296 : // Check the main socket last, as it might add
297 : // a new client to the list.
298 0 : for(asfd=as->asfd->next; asfd; asfd=asfd->next)
299 : {
300 0 : while(asfd->rbuf->buf)
301 : {
302 0 : if(deal_with_client_rbuf(asfd,
303 : directory, scores))
304 : goto end;
305 : // Get as much out of the
306 : // readbuf as possible.
307 0 : if(asfd->parse_readbuf(asfd))
308 : goto end;
309 : }
310 : }
311 0 : if(as->asfd->new_client)
312 : {
313 : // Incoming client.
314 0 : as->asfd->new_client=0;
315 0 : if(champ_chooser_new_client(as, confs))
316 : goto end;
317 : started=1;
318 : }
319 : break;
320 : default:
321 0 : removed=0;
322 : // Maybe one of the fds had a problem.
323 : // Find and remove it and carry on if possible.
324 0 : for(asfd=as->asfd->next; asfd; )
325 : {
326 : struct asfd *a;
327 0 : if(!asfd->want_to_remove)
328 : {
329 0 : asfd=asfd->next;
330 0 : continue;
331 : }
332 0 : as->asfd_remove(as, asfd);
333 0 : logp("%s: disconnected fd %d\n",
334 0 : asfd->desc, asfd->fd);
335 0 : a=asfd->next;
336 0 : asfd_free(&asfd);
337 0 : asfd=a;
338 0 : removed++;
339 : }
340 0 : if(removed) break;
341 : // If we got here, there was no fd to remove.
342 : // It is a fatal error.
343 : goto end;
344 : }
345 :
346 0 : if(started && !as->asfd->next)
347 : {
348 0 : logp("All clients disconnected.\n");
349 0 : ret=0;
350 0 : break;
351 : }
352 : }
353 :
354 : end:
355 0 : logp("champ chooser exiting: %d\n", ret);
356 0 : champ_chooser_free(&scores);
357 0 : log_fzp_set(NULL, confs);
358 0 : async_free(&as);
359 0 : asfd_free(&asfd); // This closes s for us.
360 0 : close_fd(&s);
361 0 : unlink(sdirs->champsock);
362 : // FIX THIS: free asfds.
363 0 : lock_release(lock);
364 0 : lock_free(&lock);
365 0 : return ret;
366 : }
367 :
368 : // The return code of this is the return code of the standalone process.
369 0 : int champ_chooser_server_standalone(struct conf **globalcs)
370 : {
371 0 : int ret=1;
372 0 : struct sdirs *sdirs=NULL;
373 0 : struct conf **cconfs=NULL;
374 0 : const char *orig_client=get_string(globalcs[OPT_ORIG_CLIENT]);
375 :
376 0 : if(!(cconfs=confs_alloc()))
377 : goto end;
378 0 : confs_init(cconfs);
379 : // We need to be given a client name and load the relevant server side
380 : // clientconfdir file, because various settings may be overridden
381 : // there.
382 0 : if(set_string(cconfs[OPT_CNAME], orig_client)
383 0 : || conf_load_clientconfdir(globalcs, cconfs)
384 0 : || !(sdirs=sdirs_alloc())
385 0 : || sdirs_init_from_confs(sdirs, cconfs)
386 0 : || champ_chooser_server(sdirs, cconfs, 0 /* resume */))
387 : goto end;
388 0 : ret=0;
389 : end:
390 0 : confs_free(&cconfs);
391 0 : sdirs_free(&sdirs);
392 0 : return ret;
393 : }
|