Line data Source code
1 : #include "../../../burp.h"
2 : #include "../../../alloc.h"
3 : #include "../../../asfd.h"
4 : #include "../../../async.h"
5 : #include "../../../cmd.h"
6 : #include "../../../conf.h"
7 : #include "../../../conffile.h"
8 : #include "../../../fsops.h"
9 : #include "../../../handy.h"
10 : #include "../../../iobuf.h"
11 : #include "../../../lock.h"
12 : #include "../../../log.h"
13 : #include "../../../protocol2/blist.h"
14 : #include "../../../protocol2/blk.h"
15 : #include "../../sdirs.h"
16 : #include "candidate.h"
17 : #include "champ_chooser.h"
18 : #include "champ_server.h"
19 : #include "dindex.h"
20 : #include "incoming.h"
21 : #include "scores.h"
22 :
23 : #include <sys/un.h>
24 :
25 : // FIX THIS: test error conditions.
26 0 : static int champ_chooser_new_client(struct async *as, struct conf **confs)
27 : {
28 : socklen_t t;
29 0 : int fd=-1;
30 0 : struct asfd *newfd=NULL;
31 : struct sockaddr_un remote;
32 :
33 0 : t=sizeof(remote);
34 0 : if((fd=accept(as->asfd->fd, (struct sockaddr *)&remote, &t))<0)
35 : {
36 0 : logp("accept error in %s: %s\n", __func__, strerror(errno));
37 0 : goto error;
38 : }
39 0 : set_non_blocking(fd);
40 :
41 0 : if(!(newfd=asfd_alloc())
42 0 : || newfd->init(newfd, "(unknown)", as, fd, NULL,
43 0 : ASFD_STREAM_STANDARD, confs)
44 0 : || !(newfd->blist=blist_alloc()))
45 0 : goto error;
46 0 : as->asfd_add(as, newfd);
47 :
48 0 : logp("Connected to fd %d\n", newfd->fd);
49 :
50 0 : return 0;
51 : error:
52 0 : asfd_free(&newfd);
53 0 : return -1;
54 : }
55 :
56 0 : static int results_to_fd(struct asfd *asfd)
57 : {
58 : static struct iobuf wbuf;
59 : struct blk *b;
60 : struct blk *l;
61 :
62 0 : if(!asfd->blist->last_index) return 0;
63 :
64 : // Need to start writing the results down the fd.
65 0 : for(b=asfd->blist->head; b && b!=asfd->blist->blk_to_dedup; b=l)
66 : {
67 0 : if(b->got==BLK_GOT)
68 : {
69 : // Need to write to fd.
70 0 : blk_to_iobuf_index_and_savepath(b, &wbuf);
71 :
72 0 : switch(asfd->append_all_to_write_buffer(asfd, &wbuf))
73 : {
74 0 : case APPEND_OK: break;
75 : case APPEND_BLOCKED:
76 0 : asfd->blist->head=b;
77 0 : return 0; // Try again later.
78 0 : default: return -1;
79 : }
80 : }
81 : else
82 : {
83 : // If the last in the sequence is BLK_NOT_GOT,
84 : // Send a 'wrap_up' message.
85 0 : if(!b->next || b->next==asfd->blist->blk_to_dedup)
86 : {
87 0 : blk_to_iobuf_wrap_up(b, &wbuf);
88 0 : switch(asfd->append_all_to_write_buffer(asfd,
89 : &wbuf))
90 : {
91 0 : case APPEND_OK: break;
92 : case APPEND_BLOCKED:
93 0 : asfd->blist->head=b;
94 0 : return 0; // Try again later.
95 0 : default: return -1;
96 : }
97 : }
98 : }
99 0 : l=b->next;
100 0 : blk_free(&b);
101 : }
102 :
103 0 : asfd->blist->head=b;
104 0 : if(!b) asfd->blist->tail=NULL;
105 0 : return 0;
106 : }
107 :
108 0 : static int deduplicate_maybe(struct asfd *asfd,
109 : struct blk *blk, const char *directory, struct scores *scores)
110 : {
111 0 : if(!asfd->in && !(asfd->in=incoming_alloc())) return -1;
112 :
113 0 : if(blk_fingerprint_is_hook(blk))
114 : {
115 0 : if(incoming_grow_maybe(asfd->in)) return -1;
116 0 : asfd->in->fingerprints[asfd->in->size-1]=blk->fingerprint;
117 : }
118 0 : if(++(asfd->blkcnt)<MANIFEST_SIG_MAX) return 0;
119 0 : asfd->blkcnt=0;
120 :
121 0 : if(deduplicate(asfd, directory, scores)<0)
122 0 : return -1;
123 :
124 0 : return 0;
125 : }
126 :
127 0 : static int deal_with_rbuf_sig(struct asfd *asfd, const char *directory,
128 : struct scores *scores)
129 : {
130 : struct blk *blk;
131 0 : if(!(blk=blk_alloc())) return -1;
132 :
133 0 : blist_add_blk(asfd->blist, blk);
134 :
135 0 : if(!asfd->blist->blk_to_dedup) asfd->blist->blk_to_dedup=blk;
136 :
137 0 : if(blk_set_from_iobuf_sig(blk, asfd->rbuf)) return -1;
138 :
139 : //printf("Got weak/strong from %d: %lu - %s %s\n",
140 : // asfd->fd, blk->index, blk->weak, blk->strong);
141 :
142 0 : return deduplicate_maybe(asfd, blk, directory, scores);
143 : }
144 :
145 0 : static int deal_with_client_rbuf(struct asfd *asfd, const char *directory,
146 : struct scores *scores)
147 : {
148 0 : if(asfd->rbuf->cmd==CMD_GEN)
149 : {
150 0 : if(!strncmp_w(asfd->rbuf->buf, "cname:"))
151 : {
152 : struct iobuf wbuf;
153 0 : free_w(&asfd->desc);
154 0 : if(!(asfd->desc=strdup_w(asfd->rbuf->buf
155 0 : +strlen("cname:"), __func__)))
156 0 : goto error;
157 0 : logp("%s: fd %d\n", asfd->desc, asfd->fd);
158 : iobuf_set(&wbuf, CMD_GEN,
159 0 : (char *)"cname ok", strlen("cname ok"));
160 :
161 0 : if(asfd->write(asfd, &wbuf))
162 0 : goto error;
163 : }
164 0 : else if(!strncmp_w(asfd->rbuf->buf, "sigs_end"))
165 : {
166 : //printf("Was told no more sigs\n");
167 0 : if(deduplicate(asfd, directory, scores)<0)
168 0 : goto error;
169 : }
170 : else
171 : {
172 0 : iobuf_log_unexpected(asfd->rbuf, __func__);
173 0 : goto error;
174 : }
175 : }
176 0 : else if(asfd->rbuf->cmd==CMD_SIG)
177 : {
178 0 : if(deal_with_rbuf_sig(asfd, directory, scores))
179 0 : goto error;
180 : }
181 0 : else if(asfd->rbuf->cmd==CMD_MANIFEST)
182 : {
183 : // Client has completed a manifest file. Want to start using
184 : // it as a dedup candidate now.
185 0 : if(candidate_add_fresh(asfd->rbuf->buf, directory, scores))
186 0 : goto error;
187 : }
188 : else
189 : {
190 0 : iobuf_log_unexpected(asfd->rbuf, __func__);
191 0 : goto error;
192 : }
193 0 : iobuf_free_content(asfd->rbuf);
194 0 : return 0;
195 : error:
196 0 : iobuf_free_content(asfd->rbuf);
197 0 : return -1;
198 : }
199 :
200 0 : int champ_chooser_server(struct sdirs *sdirs, struct conf **confs)
201 : {
202 : int s;
203 0 : int ret=-1;
204 : int len;
205 0 : struct asfd *asfd=NULL;
206 : struct sockaddr_un local;
207 0 : struct lock *lock=NULL;
208 0 : struct async *as=NULL;
209 0 : int started=0;
210 0 : struct scores *scores=NULL;
211 0 : const char *directory=get_string(confs[OPT_DIRECTORY]);
212 :
213 0 : if(!(lock=lock_alloc_and_init(sdirs->champlock))
214 0 : || build_path_w(sdirs->champlock))
215 0 : goto end;
216 0 : lock_get(lock);
217 0 : switch(lock->status)
218 : {
219 : case GET_LOCK_GOT:
220 0 : log_fzp_set(sdirs->champlog, confs);
221 : logp("Got champ lock for dedup_group: %s\n",
222 0 : get_string(confs[OPT_DEDUP_GROUP]));
223 0 : break;
224 : case GET_LOCK_NOT_GOT:
225 : case GET_LOCK_ERROR:
226 : default:
227 : //logp("Did not get champ lock\n");
228 0 : goto end;
229 : }
230 :
231 0 : unlink(local.sun_path);
232 0 : if((s=socket(AF_UNIX, SOCK_STREAM, 0))<0)
233 : {
234 0 : logp("socket error in %s: %s\n", __func__, strerror(errno));
235 0 : goto end;
236 : }
237 :
238 0 : memset(&local, 0, sizeof(struct sockaddr_un));
239 0 : local.sun_family=AF_UNIX;
240 0 : strcpy(local.sun_path, sdirs->champsock);
241 0 : len=strlen(local.sun_path)+sizeof(local.sun_family);
242 0 : unlink(sdirs->champsock);
243 0 : if(bind(s, (struct sockaddr *)&local, len)<0)
244 : {
245 0 : logp("bind error in %s: %s\n", __func__, strerror(errno));
246 0 : goto end;
247 : }
248 :
249 0 : if(listen(s, 5)<0)
250 : {
251 0 : logp("listen error in %s: %s\n", __func__, strerror(errno));
252 0 : goto end;
253 : }
254 0 : set_non_blocking(s);
255 :
256 0 : if(!(as=async_alloc())
257 0 : || !(asfd=asfd_alloc())
258 0 : || as->init(as, 0)
259 0 : || asfd->init(asfd, "champ chooser main socket", as, s, NULL,
260 0 : ASFD_STREAM_STANDARD, confs))
261 0 : goto end;
262 0 : as->asfd_add(as, asfd);
263 0 : asfd->fdtype=ASFD_FD_SERVER_LISTEN_MAIN;
264 :
265 : // I think that this is probably the best point at which to run a
266 : // cleanup job to delete unused data files, because no other process
267 : // can fiddle with the dedup_group at this point.
268 0 : if(delete_unused_data_files(sdirs))
269 0 : goto end;
270 :
271 : // Load the sparse indexes for this dedup group.
272 0 : if(!(scores=champ_chooser_init(sdirs->data)))
273 0 : goto end;
274 :
275 : while(1)
276 : {
277 0 : for(asfd=as->asfd->next; asfd; asfd=asfd->next)
278 : {
279 0 : if(!asfd->blist->head
280 0 : || asfd->blist->head->got==BLK_INCOMING) continue;
281 0 : if(results_to_fd(asfd)) goto end;
282 : }
283 :
284 0 : switch(as->read_write(as))
285 : {
286 : case 0:
287 : // Check the main socket last, as it might add
288 : // a new client to the list.
289 0 : for(asfd=as->asfd->next; asfd; asfd=asfd->next)
290 : {
291 0 : while(asfd->rbuf->buf)
292 : {
293 0 : if(deal_with_client_rbuf(asfd,
294 0 : directory, scores))
295 0 : goto end;
296 : // Get as much out of the
297 : // readbuf as possible.
298 0 : if(asfd->parse_readbuf(asfd))
299 0 : goto end;
300 : }
301 : }
302 0 : if(as->asfd->new_client)
303 : {
304 : // Incoming client.
305 0 : as->asfd->new_client=0;
306 0 : if(champ_chooser_new_client(as, confs))
307 0 : goto end;
308 0 : started=1;
309 : }
310 0 : break;
311 : default:
312 0 : int removed=0;
313 : // Maybe one of the fds had a problem.
314 : // Find and remove it and carry on if possible.
315 0 : for(asfd=as->asfd->next; asfd; )
316 : {
317 : struct asfd *a;
318 0 : if(!asfd->want_to_remove)
319 : {
320 0 : asfd=asfd->next;
321 0 : continue;
322 : }
323 0 : as->asfd_remove(as, asfd);
324 : logp("%s: disconnected fd %d\n",
325 0 : asfd->desc, asfd->fd);
326 0 : a=asfd->next;
327 0 : asfd_free(&asfd);
328 0 : asfd=a;
329 0 : removed++;
330 : }
331 0 : if(removed) break;
332 : // If we got here, there was no fd to remove.
333 : // It is a fatal error.
334 0 : goto end;
335 : }
336 :
337 0 : if(started && !as->asfd->next)
338 : {
339 0 : logp("All clients disconnected.\n");
340 0 : ret=0;
341 0 : break;
342 : }
343 : }
344 :
345 : end:
346 0 : logp("champ chooser exiting: %d\n", ret);
347 0 : log_fzp_set(NULL, confs);
348 0 : async_free(&as);
349 0 : asfd_free(&asfd); // This closes s for us.
350 0 : close_fd(&s);
351 0 : unlink(sdirs->champsock);
352 : // FIX THIS: free asfds.
353 0 : lock_release(lock);
354 0 : lock_free(&lock);
355 0 : scores_free(&scores);
356 0 : return ret;
357 : }
358 :
359 : // The return code of this is the return code of the standalone process.
360 0 : int champ_chooser_server_standalone(struct conf **globalcs)
361 : {
362 0 : int ret=1;
363 0 : struct sdirs *sdirs=NULL;
364 0 : struct conf **cconfs=NULL;
365 0 : const char *orig_client=get_string(globalcs[OPT_ORIG_CLIENT]);
366 :
367 0 : if(!(cconfs=confs_alloc()))
368 0 : goto end;
369 0 : confs_init(cconfs);
370 : // We need to be given a client name and load the relevant server side
371 : // clientconfdir file, because various settings may be overridden
372 : // there.
373 0 : if(set_string(cconfs[OPT_CNAME], orig_client)
374 0 : || conf_load_clientconfdir(globalcs, cconfs)
375 0 : || !(sdirs=sdirs_alloc())
376 0 : || sdirs_init_from_confs(sdirs, cconfs)
377 0 : || champ_chooser_server(sdirs, cconfs))
378 0 : goto end;
379 0 : ret=0;
380 : end:
381 0 : confs_free(&cconfs);
382 0 : sdirs_free(&sdirs);
383 0 : return ret;
384 : }
|