Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../cstat.h"
10 : #include "../../handy.h"
11 : #include "../../hexmap.h"
12 : #include "../../iobuf.h"
13 : #include "../../log.h"
14 : #include "../../server/manio.h"
15 : #include "../../protocol2/blist.h"
16 : #include "../../protocol2/rabin/rabin.h"
17 : #include "../../slist.h"
18 : #include "../child.h"
19 : #include "../manios.h"
20 : #include "../resume.h"
21 : #include "champ_chooser/champ_client.h"
22 : #include "champ_chooser/champ_server.h"
23 : #include "champ_chooser/hash.h"
24 : #include "dpth.h"
25 : #include "backup_phase2.h"
26 :
// Bit flags tracking which end-of-stream markers have been seen or sent
// during phase 2:
//   END_SIGS         - "sigs_end" was received from the client.
//   END_BACKUP       - "backup_end" was received from the client.
//   END_REQUESTS     - "requests_end" has been queued for the client.
//   END_BLK_REQUESTS - "blk_requests_end" has been queued for the client.
#define END_SIGS		0x01
#define END_BACKUP		0x02
#define END_REQUESTS		0x04
#define END_BLK_REQUESTS	0x08

// Number of unrequested blocks after which the client is told that it may
// free memory. Fully parenthesised so that the expansion stays one operand
// whatever expression it is used in.
#define BLKS_MAX_UNREQUESTED	((BLKS_MAX_IN_MEM)/4)
// This needs to be greater than BLKS_MAX_UNREQUESTED
#define SLIST_MAX_IN_MEM	(BLKS_MAX_IN_MEM)

// Breakpoint support: 'breaking' holds the configured breakpoint number
// (0 when disabled), 'breakcount' is decremented once per block written to
// the manifest until the breakpoint fires.
static int breaking=0;
static int breakcount=0;
38 :
39 : static int data_needed(struct sbuf *sb)
40 : {
41 33720 : if(sb->path.cmd==CMD_FILE)
42 : return 1;
43 16674 : if(sb->path.cmd==CMD_METADATA)
44 : return 1;
45 : return 0;
46 : }
47 :
48 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
49 : {
50 : struct iobuf wbuf;
51 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
52 4 : return chfd->write(chfd, &wbuf);
53 : }
54 :
55 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
56 : struct manios *manios, struct asfd *chfd)
57 : {
58 0 : int ret=-1;
59 0 : char *fpath=NULL;
60 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
61 : goto end;
62 0 : if(manio_copy_entry(csb, sb,
63 : manios->current, manios->unchanged,
64 : /*seed_src*/NULL, /*seed_dst*/NULL)<0)
65 : goto end;
66 0 : if(strcmp(fpath, manios->changed->offset->fpath))
67 : {
68 : // If the copy crossed a manio boundary, we should tell the
69 : // champ server to load the previous one as a candidate.
70 0 : if(manio_component_to_chfd(chfd, fpath))
71 : goto end;
72 : }
73 : ret=0;
74 : end:
75 0 : free_w(&fpath);
76 0 : return ret;
77 : }
78 :
79 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
80 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
81 : struct manios *manios, struct asfd *chfd,
82 : struct cntr *cntr)
83 : {
84 : // Located the entry in the current manifest.
85 : // If the file type changed, I think it is time to back it up again
86 : // (for example, EFS changing to normal file, or back again).
87 0 : if(csb->path.cmd!=sb->path.cmd)
88 : {
89 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
90 : return -1;
91 0 : return 1;
92 : }
93 :
94 : // mtime is the actual file data.
95 : // ctime is the attributes or meta data.
96 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
97 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
98 : {
99 : // Got an unchanged file.
100 0 : cntr_add_same(cntr, sb->path.cmd);
101 : // No actual data saved - goes in counters_n.
102 0 : if(manio_write_cntr(manios->counters_n, sb, CNTR_MANIO_SAME))
103 : return -1;
104 0 : return unchanged(csb, sb, manios, chfd);
105 : }
106 :
107 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
108 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
109 : {
110 : // FIX THIS:
111 : // File data stayed the same, but attributes or meta data
112 : // changed. We already have the attributes, but may need to
113 : // get extra meta data.
114 0 : cntr_add_same(cntr, sb->path.cmd);
115 : // No actual data saved - goes in counters_n.
116 0 : if(manio_write_cntr(manios->counters_n, sb, CNTR_MANIO_SAME))
117 : return -1;
118 0 : return unchanged(csb, sb, manios, chfd);
119 : }
120 :
121 : // File data changed.
122 0 : cntr_add_changed(cntr, sb->path.cmd);
123 : // Data has been saved - goes in counters_d, or the counters
124 : // will be out of sequence.
125 0 : if(manio_write_cntr(manios->counters_d, sb, CNTR_MANIO_CHANGED))
126 : return -1;
127 :
128 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
129 : return -1;
130 0 : return 1;
131 : }
132 :
133 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
134 33720 : static int entry_changed(struct sbuf *sb,
135 : struct manios *manios, struct asfd *chfd, struct sbuf **csb,
136 : struct cntr *cntr)
137 : {
138 : static int finished=0;
139 : int pcmp;
140 :
141 33720 : if(finished)
142 : {
143 : // Data has been saved - goes in counters_d, or the counters
144 : // will be out of sequence.
145 33710 : if(manio_write_cntr(manios->counters_d, sb, CNTR_MANIO_NEW))
146 : return -1;
147 33710 : cntr_add_new(cntr, sb->path.cmd);
148 33710 : return 1;
149 : }
150 :
151 10 : if(*csb && (*csb)->path.buf)
152 : {
153 : // Already have an entry.
154 : }
155 : else
156 : {
157 : // Need to read another.
158 10 : switch(manio_read(manios->current, *csb))
159 : {
160 : case 1: // Reached the end.
161 10 : sbuf_free(csb);
162 10 : finished=1;
163 10 : cntr_add_new(cntr, sb->path.cmd);
164 : // Data has been saved - goes in counters_d,
165 : // or the counters will be out of sequence.
166 10 : if(manio_write_cntr(manios->counters_d,
167 : sb, CNTR_MANIO_NEW)) return -1;
168 10 : return 1;
169 : case -1: return -1;
170 : }
171 0 : if(!(*csb)->path.buf)
172 : {
173 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
174 0 : return -1;
175 : }
176 : // Got an entry.
177 : }
178 :
179 : while(1)
180 : {
181 0 : if(!(pcmp=sbuf_pathcmp(*csb, sb)))
182 0 : return found_in_current_manifest(*csb, sb,
183 : manios, chfd, cntr);
184 0 : else if(pcmp>0)
185 : {
186 0 : cntr_add_new(cntr, sb->path.cmd);
187 : // Data has been saved - goes in counters_d,
188 : // or the counters will be out of sequence.
189 0 : if(manio_write_cntr(manios->counters_d,
190 : sb, CNTR_MANIO_NEW)) return -1;
191 0 : return 1;
192 : }
193 : // cntr_add_deleted(cntr, (*csb)->path.cmd);
194 : // Behind - need to read more data from the old manifest.
195 0 : switch(manio_read(manios->current, *csb))
196 : {
197 : case 1: // Reached the end.
198 0 : sbuf_free(csb);
199 0 : cntr_add_new(cntr, sb->path.cmd);
200 : // Data has been saved - goes in counters_d,
201 : // or the counters will be out of sequence.
202 0 : if(manio_write_cntr(manios->counters_d,
203 : sb, CNTR_MANIO_NEW)) return -1;
204 0 : return 1;
205 : case -1: return -1;
206 : }
207 : // Got something, go back around the loop.
208 : }
209 :
210 : return 0;
211 : }
212 :
213 8523 : static int add_data_to_store(struct cntr *cntr,
214 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
215 : {
216 : static struct blk *blk=NULL;
217 :
218 : // Find the first one in the list that was requested.
219 : // FIX THIS: Going up the list here, and then later
220 : // when writing to the manifest is not efficient.
221 17046 : for(blk=slist->blist->head;
222 8522 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
223 : {
224 : // logp("try: %d %d\n", blk->index, blk->got);
225 : }
226 8523 : if(!blk)
227 : {
228 1 : logp("Received data but could not find next requested block.\n");
229 1 : if(!slist->blist->head)
230 1 : logp("and slist->blist->head is null\n");
231 : else
232 0 : logp("head index: %" PRIu64 "\n", slist->blist->head->index);
233 : return -1;
234 : }
235 :
236 : // FIX THIS
237 : #ifndef UTEST
238 : if(blk_verify(blk->fingerprint, blk->md5sum, rbuf->buf, rbuf->len)<=0)
239 : {
240 : logp("ERROR: Block %" PRIu64 " from client did not verify.\n",
241 : blk->index);
242 : return -1;
243 : }
244 : #endif
245 :
246 : // Add it to the data store straight away.
247 8522 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
248 :
249 8522 : cntr_add(cntr, CMD_DATA, 0);
250 :
251 8522 : blk->got=BLK_GOT;
252 8522 : blk=blk->next;
253 :
254 : return 0;
255 : }
256 :
257 17034 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
258 : uint64_t index)
259 : {
260 : struct sbuf *sb;
261 :
262 50701 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
263 : {
264 50701 : if(!sb->protocol2->index)
265 16642 : continue;
266 34059 : if(index==sb->protocol2->index)
267 : break;
268 : }
269 17034 : if(!sb)
270 : {
271 0 : logp("Could not find %" PRIu64 " in request list\n", index);
272 : return -1;
273 : }
274 : // Replace the attribs with the more recent values.
275 17034 : iobuf_free_content(&sb->attr);
276 17034 : iobuf_move(&sb->attr, attr);
277 :
278 : // Mark the end of the previous file.
279 17034 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
280 :
281 17034 : slist->add_sigs_here=sb;
282 :
283 : // Incoming sigs now need to get added to 'add_sigs_here'
284 : return 0;
285 : }
286 :
287 : /*
288 : static void dump_blks(const char *msg, struct blk *b)
289 : {
290 : struct blk *xx;
291 : for(xx=b; xx; xx=xx->next)
292 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
293 : }
294 : */
295 :
296 25058 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
297 : {
298 : // Goes on slist->add_sigs_here
299 : struct blk *blk;
300 : struct protocol2 *protocol2;
301 :
302 25058 : if(!(blk=blk_alloc())) return -1;
303 25058 : blist_add_blk(slist->blist, blk);
304 :
305 25058 : protocol2=slist->add_sigs_here->protocol2;
306 25058 : if(!protocol2->bstart) protocol2->bstart=blk;
307 25058 : if(!protocol2->bsighead) protocol2->bsighead=blk;
308 :
309 25058 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
310 :
311 : // Need to send sigs to champ chooser, therefore need to point
312 : // to the oldest unsent one if nothing is pointed to yet.
313 25058 : if(!slist->blist->blk_for_champ_chooser)
314 16490 : slist->blist->blk_for_champ_chooser=blk;
315 :
316 : return 0;
317 : }
318 :
319 50647 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
320 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
321 : {
322 50647 : int ret=0;
323 : static struct iobuf attr;
324 : static uint64_t index;
325 :
326 50647 : switch(rbuf->cmd)
327 : {
328 : /* Incoming block data. */
329 : case CMD_DATA:
330 8523 : if(add_data_to_store(cntr, slist, rbuf, dpth))
331 : goto error;
332 : goto end;
333 :
334 : /* Incoming block signatures. */
335 : case CMD_ATTRIBS_SIGS:
336 :
337 17034 : iobuf_init(&attr);
338 17034 : iobuf_move(&attr, rbuf);
339 17034 : index=decode_file_no(&attr);
340 :
341 : // Need to go through slist to find the matching
342 : // entry.
343 17034 : if(set_up_for_sig_info(slist, &attr, index))
344 : goto error;
345 : return 0;
346 : case CMD_SIG:
347 25058 : if(add_to_sig_list(slist, rbuf))
348 : goto error;
349 : goto end;
350 :
351 : /* Incoming control/message stuff. */
352 : case CMD_MESSAGE:
353 : case CMD_WARNING:
354 : {
355 6 : log_recvd(rbuf, cntr, 0);
356 6 : goto end;
357 : }
358 : case CMD_GEN:
359 25 : if(!strcmp(rbuf->buf, "sigs_end"))
360 : {
361 13 : (*end_flags)|=END_SIGS;
362 13 : goto end;
363 : }
364 12 : else if(!strcmp(rbuf->buf, "backup_end"))
365 : {
366 12 : (*end_flags)|=END_BACKUP;
367 12 : goto end;
368 : }
369 : break;
370 : case CMD_INTERRUPT:
371 : {
372 : uint64_t file_no;
373 1 : file_no=base64_to_uint64(rbuf->buf);
374 1 : if(slist_del_sbuf_by_index(slist, file_no))
375 : goto error;
376 : goto end;
377 : }
378 : default:
379 : break;
380 : }
381 :
382 0 : iobuf_log_unexpected(rbuf, __func__);
383 : error:
384 : ret=-1;
385 : end:
386 33613 : iobuf_free_content(rbuf);
387 33613 : return ret;
388 : }
389 :
390 216982 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
391 : uint8_t *end_flags)
392 : {
393 : static char req[32]="";
394 216982 : struct sbuf *sb=slist->blks_to_request;
395 :
396 13327 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
397 :
398 216982 : if(!sb)
399 : {
400 16 : slist->blks_to_request=NULL;
401 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
402 : {
403 3 : iobuf_from_str(wbuf,
404 : CMD_GEN, (char *)"blk_requests_end");
405 3 : (*end_flags)|=END_BLK_REQUESTS;
406 : }
407 : return 0;
408 : }
409 216966 : if(!sb->protocol2->bsighead)
410 : {
411 : // Trying to move onto the next file.
412 : // ??? Does this really work?
413 129027 : if(sb->protocol2->bend)
414 : {
415 0 : slist->blks_to_request=sb->next;
416 0 : printf("move to next\n");
417 : }
418 129027 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
419 : {
420 10 : iobuf_from_str(wbuf,
421 : CMD_GEN, (char *)"blk_requests_end");
422 10 : (*end_flags)|=END_BLK_REQUESTS;
423 : }
424 : return 0;
425 : }
426 :
427 87939 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
428 : return 0;
429 :
430 12538 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
431 : {
432 8522 : base64_from_uint64(sb->protocol2->bsighead->index, req);
433 8522 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
434 8522 : sb->protocol2->bsighead->requested=1;
435 : }
436 :
437 : // Move on.
438 12538 : if(sb->protocol2->bsighead==sb->protocol2->bend)
439 : {
440 8517 : slist->blks_to_request=sb->next;
441 8517 : sb->protocol2->bsighead=sb->protocol2->bstart;
442 : }
443 : else
444 : {
445 4021 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
446 : }
447 : return 0;
448 : }
449 :
450 208447 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
451 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
452 : {
453 208447 : struct sbuf *sb=slist->last_requested;
454 208447 : if(!sb)
455 : {
456 157689 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
457 : {
458 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
459 18 : (*end_flags)|=END_REQUESTS;
460 : }
461 : return;
462 : }
463 :
464 50758 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
465 : {
466 33712 : slist->last_requested=sb->next;
467 : return;
468 : }
469 :
470 : // Only need to request the path at this stage.
471 17046 : iobuf_copy(wbuf, &sb->path);
472 17046 : sb->flags |= SBUF_SENT_PATH;
473 17046 : sb->protocol2->index=(*file_no)++;
474 : }
475 :
476 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
477 : {
478 : static char *p;
479 : static char tmp[32];
480 2 : p=tmp;
481 2 : p+=to_base64(index, tmp);
482 2 : *p='\0';
483 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
484 2 : }
485 :
486 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
487 : {
488 : struct iobuf endfile;
489 :
490 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
491 : return 0;
492 33704 : if(!iobuf_is_filedata(&sb->path))
493 : return 0;
494 :
495 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
496 : // FIX THIS: Should give a proper length and md5sum.
497 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
498 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
499 : }
500 :
501 233992 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
502 : {
503 : struct blk *b;
504 259050 : while(blist->head!=sb->protocol2->bstart)
505 : {
506 25058 : b=blist->head->next;
507 25058 : if(blist->head==blist->blk_from_champ_chooser)
508 8508 : blist->blk_from_champ_chooser=b;
509 25058 : blk_free(&blist->head);
510 25058 : blist->head=b;
511 : }
512 233992 : if(!blist->head)
513 51771 : blist->tail=NULL;
514 233992 : }
515 :
// Writes as much of 'sb' (an entry flagged SBUF_NEED_DATA) to the changed
// manifest as is currently possible: the header once, then one sig/path line
// for each leading block that is BLK_GOT.
// Returns 1 when the entry was completed and the slist advanced, 0 when it
// has to wait for more blocks, -1 on error.
516 233992 : static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
517 : struct asfd *chfd, struct manios *manios,
518 : struct slist *slist, int end_flags)
519 : {
520 233992 : int ret=-1;
521 : struct blk *blk;
522 : static struct iobuf wbuf;
523 233992 : struct blist *blist=slist->blist;
// Counts written blocks that were never requested from the client. It is
// only reset when the threshold below is crossed.
// NOTE(review): a requested block does not reset it, although the comment
// further down speaks of "consecutive" blocks - confirm this is intended.
524 : static int unrequested=0;
525 :
// The header goes in once per entry.
526 233992 : if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
527 : {
528 17036 : if(manio_write_sbuf(manios->changed, sb)) goto end;
529 17036 : sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
530 : }
531 :
// Consume blocks from the front of this entry while they are BLK_GOT. A block
// with no successor is only consumed once END_BACKUP has been flagged.
532 242025 : while((blk=sb->protocol2->bstart)
533 190254 : && blk->got==BLK_GOT
534 59413 : && (blk->next || end_flags&END_BACKUP))
535 : {
536 25058 : if(blk->got_save_path
537 25058 : && !blk_is_zero_length(blk))
538 : {
// Breakpoint support: fires when breakcount has counted down to zero.
539 25058 : if(breaking && breakcount--==0)
540 : {
541 0 : breakpoint(breaking, __func__);
542 0 : goto end;
543 : }
544 25058 : if(manio_write_sig_and_path(manios->changed, blk))
545 : goto end;
546 25058 : if(manios->changed->sig_count==0)
547 : {
548 : // Have finished a manifest file. Want to start
549 : // using it as a dedup candidate now.
550 8 : if(manio_component_to_chfd(chfd,
551 4 : manios->changed->offset->ppath))
552 : goto end;
553 :
554 : // The champ chooser has the candidate. Now,
555 : // empty our local hash table.
556 4 : hash_delete_all();
557 : // Add the most recent block, so identical
558 : // adjacent blocks are deduplicated well.
559 4 : if(hash_load_blk(blk))
560 : goto end;
561 : }
562 : }
563 :
564 25058 : if(!blk->requested)
565 16536 : unrequested++;
566 :
567 25058 : if(unrequested>BLKS_MAX_UNREQUESTED)
568 : {
569 2 : unrequested=0;
570 : // Let the client know that it can free memory if there
571 : // was a long consecutive number of unrequested blocks.
572 2 : get_wbuf_from_index(&wbuf, blk->index);
573 2 : if(asfd->write(asfd, &wbuf))
574 : goto end;
575 : }
576 :
// Last block of this entry: free the blocks in front of bstart, finish the
// entry and move the slist on. These returns do not go through 'end'.
577 25058 : if(blk==sb->protocol2->bend)
578 : {
579 17025 : blist_adjust_head(blist, sb);
580 17025 : if(write_endfile(sb, manios)) return -1;
581 17025 : slist_advance(slist);
582 17025 : return 1;
583 : }
584 :
// Step this entry's start past the block just written, taking bsighead and
// blk_from_champ_chooser along if they pointed at it.
585 8033 : if(sb->protocol2->bsighead==sb->protocol2->bstart)
586 8020 : sb->protocol2->bsighead=blk->next;
587 8033 : sb->protocol2->bstart=blk->next;
588 8033 : if(blk==blist->blk_from_champ_chooser)
589 4021 : blist->blk_from_champ_chooser=blk->next;
590 : }
591 216967 : if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
592 : {
593 : // Write endfile for the very last file.
594 11 : if(write_endfile(sb, manios)) return -1;
595 : }
596 : ret=0;
597 : end:
// Free the blocks in front of this entry's first block, on success and on
// the 'goto end' error paths alike.
598 216967 : blist_adjust_head(blist, sb);
599 216967 : return ret;
600 : }
601 :
602 216984 : static int write_to_changed_file(struct asfd *asfd,
603 : struct asfd *chfd, struct manios *manios,
604 : struct slist *slist, int end_flags)
605 : {
606 : struct sbuf *sb;
607 216984 : if(!slist) return 0;
608 :
609 250677 : while((sb=slist->head))
610 : {
611 250660 : if(sb->flags & SBUF_NEED_DATA)
612 : {
613 233992 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
614 : end_flags))
615 : {
616 : case 0: return 0;
617 17025 : case 1: continue;
618 0 : default: return -1;
619 : }
620 :
621 : }
622 : else
623 : {
624 : // No change, can go straight in.
625 16668 : if(manio_write_sbuf(manios->changed, sb)) return -1;
626 16668 : if(write_endfile(sb, manios)) return -1;
627 :
628 : // Move along.
629 16668 : slist_advance(slist);
630 : }
631 : }
632 : return 0;
633 : }
634 :
635 216982 : static int maybe_add_from_scan(struct manios *manios,
636 : struct slist *slist, struct asfd *chfd, struct sbuf **csb,
637 : struct cntr *cntr)
638 : {
639 216982 : int ret=-1;
640 216982 : struct sbuf *snew=NULL;
641 :
642 : while(1)
643 : {
644 250702 : sbuf_free(&snew);
645 250702 : if(!manios->phase1) return 0;
646 : // Limit the amount loaded into memory at any one time.
647 33738 : if(slist->count>SLIST_MAX_IN_MEM)
648 : return 0;
649 33738 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
650 :
651 33738 : switch(manio_read(manios->phase1, snew))
652 : {
653 : case 0: break;
654 18 : case 1: manio_close(&manios->phase1);
655 18 : ret=0; // Finished.
656 : default: goto end;
657 : }
658 :
659 33720 : switch(entry_changed(snew, manios, chfd, csb, cntr))
660 : {
661 0 : case 0: continue; // No change.
662 : case 1: break;
663 : default: goto end; // Error.
664 : }
665 :
666 67440 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
667 :
668 33720 : slist_add_sbuf(slist, snew);
669 33720 : snew=NULL;
670 : }
671 : return 0;
672 : end:
673 18 : sbuf_free(&snew);
674 18 : return ret;
675 : }
676 :
677 216981 : static int append_for_champ_chooser(struct asfd *chfd,
678 : struct blist *blist, int end_flags)
679 : {
680 : static int finished_sending=0;
681 : static struct iobuf wbuf;
682 : static struct blk *blk=NULL;
683 :
684 242039 : while(blist->blk_for_champ_chooser)
685 : {
686 112824 : blk=blist->blk_for_champ_chooser;
687 : // If we send too many blocks to the champ chooser at once,
688 : // it can go faster than we can send paths to completed
689 : // manifests to it. This means that deduplication efficiency
690 : // is reduced (although speed may be faster).
691 : // So limit the sending.
692 225648 : if(blk->index
693 112824 : - blist->head->index > MANIFEST_SIG_MAX)
694 : return 0;
695 :
696 25058 : blk_to_iobuf_sig(blk, &wbuf);
697 :
698 25058 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
699 : {
700 : case APPEND_OK: break;
701 : case APPEND_BLOCKED:
702 : return 0; // Try again later.
703 : default: return -1;
704 : }
705 25058 : blist->blk_for_champ_chooser=blk->next;
706 : }
707 129215 : if(end_flags&END_SIGS
708 49976 : && !finished_sending && !blist->blk_for_champ_chooser)
709 : {
710 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
711 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
712 : {
713 : case APPEND_OK: break;
714 : case APPEND_BLOCKED:
715 : return 0; // Try again later.
716 : default: return -1;
717 : }
718 12 : finished_sending++;
719 : }
720 : return 0;
721 : }
722 :
723 25063 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
724 : {
725 : const char *path;
726 :
727 25063 : if(blk->got!=BLK_INCOMING) return 0;
728 8522 : blk->got=BLK_NOT_GOT;
729 :
730 : // Need to get the data for this blk from the client.
731 : // Set up the details of where it will be saved.
732 8522 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
733 :
734 : // FIX THIS: make dpth give us the path in a uint8 array.
735 8522 : blk->savepath=savepathstr_with_sig_to_uint64(path);
736 8522 : blk->got_save_path=1;
737 : // Load it into our local hash table.
738 8522 : if(hash_load_blk(blk))
739 : return -1;
740 8522 : if(dpth_protocol2_incr_sig(dpth))
741 : return -1;
742 8522 : return 0;
743 : }
744 :
745 25058 : static struct hash_strong *in_local_hash(struct blk *blk)
746 : {
747 : static struct hash_weak *hash_weak;
748 :
749 25058 : if(!(hash_weak=hash_weak_find(blk->fingerprint)))
750 : return NULL;
751 4014 : return hash_strong_find(hash_weak, blk->md5sum);
752 : }
753 :
754 37587 : static int simple_deduplicate_blk(struct blk *blk)
755 : {
756 : static struct hash_strong *hash_strong;
757 37587 : if(blk->got!=BLK_INCOMING)
758 : return 0;
759 25058 : if((hash_strong=in_local_hash(blk)))
760 : {
761 4014 : blk->savepath=hash_strong->savepath;
762 4014 : blk->got_save_path=1;
763 4014 : blk->got=BLK_GOT;
764 4014 : return 1;
765 : }
766 : return 0;
767 : }
768 :
769 25058 : static int mark_up_to_index(struct blist *blist,
770 : uint64_t index, struct dpth *dpth)
771 : {
772 : struct blk *blk;
773 :
774 : // Mark everything that was not got, up to the given index.
775 62645 : for(blk=blist->blk_from_champ_chooser;
776 50116 : blk && blk->index!=index; blk=blk->next)
777 : {
778 12529 : if(simple_deduplicate_blk(blk))
779 0 : continue;
780 12529 : if(mark_not_got(blk, dpth))
781 : return -1;
782 : }
783 25058 : if(!blk)
784 : {
785 0 : logp("Could not find index from champ chooser: %" PRIu64 "\n",
786 : index);
787 : return -1;
788 : }
789 25058 : simple_deduplicate_blk(blk);
790 :
791 : //logp("Found index from champ chooser: %lu\n", index);
792 : //printf("index from cc: %d\n", index);
793 25058 : blist->blk_from_champ_chooser=blk;
794 : return 0;
795 : }
796 :
797 12524 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
798 : struct dpth *dpth)
799 : {
800 : static struct blk b;
801 12524 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
802 : return -1;
803 :
804 12524 : if(mark_up_to_index(blist, b.index, dpth))
805 : return -1;
806 12524 : blist->blk_from_champ_chooser->savepath=b.savepath;
807 12524 : blist->blk_from_champ_chooser->got=BLK_GOT;
808 12524 : blist->blk_from_champ_chooser->got_save_path=1;
809 12524 : return 0;
810 : }
811 :
812 12534 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
813 : struct dpth *dpth)
814 : {
815 : static struct blk b;
816 12534 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
817 : return -1;
818 :
819 12534 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
820 12534 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
821 12534 : return 0;
822 : }
823 :
824 25059 : static int deal_with_read_from_chfd(struct asfd *chfd,
825 : struct blist *blist, struct dpth *dpth, struct cntr *cntr)
826 : {
827 25059 : int ret=-1;
828 :
829 : // Deal with champ chooser read here.
830 : //printf("read from cc: %s\n", chfd->rbuf->buf);
831 25059 : switch(chfd->rbuf->cmd)
832 : {
833 : case CMD_SIG:
834 : // Get these for blks that the champ chooser has found.
835 12524 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
836 : goto end;
837 12524 : cntr_add_same(cntr, CMD_DATA);
838 : break;
839 : case CMD_WRAP_UP:
840 12534 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
841 : goto end;
842 : break;
843 : default:
844 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
845 : goto end;
846 : }
847 : ret=0;
848 : end:
849 25059 : iobuf_free_content(chfd->rbuf);
850 25059 : return ret;
851 : }
852 :
853 12 : static int check_for_missing_work_in_slist(struct slist *slist)
854 : {
855 12 : struct sbuf *sb=NULL;
856 :
857 12 : if(slist->blist->head)
858 : {
859 0 : logp("ERROR: finishing but still want block: %" PRIu64 "\n",
860 : slist->blist->head->index);
861 : return -1;
862 : }
863 :
864 15 : for(sb=slist->head; sb; sb=sb->next)
865 : {
866 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
867 : {
868 1 : logp("ERROR: finishing but still waiting for: %s\n",
869 : iobuf_to_printable(&slist->head->path));
870 : return -1;
871 : }
872 : }
873 : return 0;
874 : }
875 :
// Runs phase 2 of a protocol2 backup on the server (logged as "recv backup
// data"). It talks to the client through as->asfd and to the champ chooser
// through 'chfd', and writes manifests through 'manios'.
// The function has external linkage only when UTEST is defined.
// 'resume' non-zero makes it call do_resume() before opening the manifests.
// Returns 0 on success, -1 on error. Everything allocated here is released
// before returning.
876 : #ifndef UTEST
877 : static
878 : #endif
879 24 : int do_backup_phase2_server_protocol2(struct async *as, struct asfd *chfd,
880 : struct sdirs *sdirs, int resume, struct conf **confs)
881 : {
882 24 : int ret=-1;
// Bitmask of the END_* flags.
883 24 : uint8_t end_flags=0;
884 24 : struct slist *slist=NULL;
885 : struct iobuf wbuf;
886 24 : struct dpth *dpth=NULL;
887 24 : man_off_t *pos_phase1=NULL;
888 24 : man_off_t *pos_current=NULL;
889 24 : struct manios *manios=NULL;
890 : // This is used to tell the client that a number of consecutive blocks
891 : // have been found and can be freed.
892 24 : struct asfd *asfd=NULL;
893 24 : struct cntr *cntr=NULL;
// Read-ahead entry from the current manifest, kept between calls to
// maybe_add_from_scan().
894 24 : struct sbuf *csb=NULL;
// Next file number to hand out when a path is requested from the client.
895 24 : uint64_t file_no=1;
896 24 : int fail_on_warning=0;
897 24 : struct cntr_ent *warn_ent=NULL;
898 :
// Argument checks. Each failure is logged and leaves through 'end'.
899 24 : if(!as)
900 : {
901 1 : logp("async not provided to %s()\n", __func__);
902 1 : goto end;
903 : }
904 23 : if(!sdirs)
905 : {
906 2 : logp("sdirs not provided to %s()\n", __func__);
907 2 : goto end;
908 : }
909 21 : if(!confs)
910 : {
911 1 : logp("confs not provided to %s()\n", __func__);
912 1 : goto end;
913 : }
914 20 : asfd=as->asfd;
915 20 : if(!asfd)
916 : {
917 1 : logp("asfd not provided to %s()\n", __func__);
918 1 : goto end;
919 : }
920 19 : if(!chfd)
921 : {
922 1 : logp("chfd not provided to %s()\n", __func__);
923 1 : goto end;
924 : }
925 18 : cntr=get_cntr(confs);
926 18 : fail_on_warning=get_int(confs[OPT_FAIL_ON_WARNING]);
927 18 : if(cntr)
928 0 : warn_ent=cntr->ent[CMD_WARNING];
929 :
// Breakpoint numbers from 2000 to 2999 are handled by this file: the offset
// from 2000 is the number of blocks to write before sbuf_needs_data() fires
// the breakpoint.
930 18 : if(get_int(confs[OPT_BREAKPOINT])>=2000
931 0 : && get_int(confs[OPT_BREAKPOINT])<3000)
932 : {
933 0 : breaking=get_int(confs[OPT_BREAKPOINT]);
934 0 : breakcount=breaking-2000;
935 : }
936 :
937 18 : blks_generate_init();
938 :
939 18 : logp("Phase 2 begin (recv backup data)\n");
940 :
// Set up the data store paths.
941 18 : if(!(dpth=dpth_alloc())
942 36 : || dpth_protocol2_init(dpth,
943 18 : sdirs->data,
944 18 : get_string(confs[OPT_CNAME]),
945 18 : sdirs->cfiles,
946 : get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
947 : goto end;
948 18 : if(resume)
949 : {
950 0 : if(do_resume(&pos_phase1, &pos_current,
951 : sdirs, dpth, confs))
952 : goto end;
953 0 : if(cntr_send_sdirs(asfd, sdirs, confs, CNTR_STATUS_BACKUP))
954 : goto end;
955 : }
956 :
// pos_phase1 and pos_current stay NULL unless do_resume() set them.
957 18 : if(!(manios=manios_open_phase2(sdirs,
958 : pos_phase1, pos_current, PROTO_2))
959 18 : || !(slist=slist_alloc())
960 18 : || !(csb=sbuf_alloc(PROTO_2)))
961 : goto end;
962 :
963 18 : iobuf_free_content(asfd->rbuf);
964 :
965 18 : memset(&wbuf, 0, sizeof(struct iobuf));
// Main loop: runs until the client has sent "backup_end".
966 217012 : while(!(end_flags&END_BACKUP))
967 : {
968 216982 : if(check_fail_on_warning(fail_on_warning, warn_ent))
969 : goto end;
970 :
971 216982 : if(write_status(CNTR_STATUS_BACKUP,
972 217008 : csb && csb->path.buf?csb->path.buf:"", cntr))
973 : goto end;
974 :
// Pull more entries from the scan into the slist.
975 216982 : if(maybe_add_from_scan(manios, slist, chfd, &csb, cntr))
976 : goto end;
977 :
// Choose the next message for the client, unless one is still pending.
// Block requests are tried before file requests.
// NOTE(review): presumably wbuf.len is cleared by
// append_all_to_write_buffer() once the message is queued - confirm.
978 216982 : if(!wbuf.len)
979 : {
980 216982 : if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
981 : goto end;
982 216982 : if(!wbuf.len)
983 : {
984 208447 : get_wbuf_from_files(&wbuf, slist,
985 : manios, &end_flags, &file_no);
986 : }
987 : }
988 :
989 216982 : if(wbuf.len
990 25599 : && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
991 : goto end;
992 :
// Queue signatures for the champ chooser.
993 216981 : if(append_for_champ_chooser(chfd, slist->blist, end_flags))
994 : goto end;
995 :
996 216980 : if(as->read_write(as))
997 : {
998 2 : logp("error from as->read_write in %s\n", __func__);
999 2 : goto end;
1000 : }
1001 :
// Process everything that arrived from the client.
1002 267624 : while(asfd->rbuf->buf)
1003 : {
1004 50647 : if(deal_with_read(asfd->rbuf, slist, cntr,
1005 : &end_flags, dpth))
1006 : goto end;
1007 : // Get as much out of the readbuf as possible.
1008 50646 : if(asfd->parse_readbuf(asfd))
1009 : goto end;
1010 : }
// Process everything that arrived from the champ chooser.
1011 242035 : while(chfd->rbuf->buf)
1012 : {
1013 50118 : if(deal_with_read_from_chfd(chfd,
1014 25059 : slist->blist, dpth, cntr))
1015 : goto end;
1016 : // Get as much out of the readbuf as possible.
1017 25058 : if(chfd->parse_readbuf(chfd))
1018 : goto end;
1019 : }
1020 :
// Write whatever is now complete to the changed manifest.
1021 216976 : if(write_to_changed_file(asfd, chfd, manios,
1022 : slist, end_flags))
1023 : goto end;
1024 : }
1025 :
1026 : // Hack: If there are some entries left after the last entry that
1027 : // contains block data, it will not be written to the changed file
1028 : // yet because the last entry of block data has not had
1029 : // sb->protocol2->bend set.
// The head entry is unlinked and freed here, then the rest is written out.
1030 12 : if(slist->head && slist->head->next)
1031 : {
1032 : struct sbuf *sb=NULL;
1033 8 : sb=slist->head;
1034 8 : slist->head=sb->next;
1035 8 : sbuf_free(&sb);
1036 8 : if(write_to_changed_file(asfd, chfd, manios,
1037 : slist, end_flags))
1038 : goto end;
1039 : }
1040 :
1041 12 : if(manios_close(&manios))
1042 : goto end;
1043 :
1044 12 : if(check_for_missing_work_in_slist(slist))
1045 : goto end;
1046 :
1047 : // Need to release the last left. There should be one at most.
1048 11 : if(dpth->head && dpth->head->next)
1049 : {
1050 0 : logp("ERROR: More data locks remaining after: %s\n",
1051 0 : dpth->head->save_path);
1052 0 : goto end;
1053 : }
1054 11 : if(dpth_release_all(dpth)) goto end;
1055 :
1056 11 : if(check_fail_on_warning(fail_on_warning, warn_ent))
1057 : goto end;
1058 :
1059 11 : ret=0;
1060 : end:
// On failure, name the entry at the head of the slist to help diagnosis.
1061 24 : if(ret)
1062 : {
1063 13 : if(slist && slist->head)
1064 1 : logp(" last tried file: %s\n",
1065 : iobuf_to_printable(&slist->head->path));
1066 : }
1067 24 : logp("End backup\n");
// Cleanup runs on every path, including the early argument-check exits where
// most of these are still NULL.
1068 24 : sbuf_free(&csb);
1069 24 : slist_free(&slist);
1070 24 : if(asfd) iobuf_free_content(asfd->rbuf);
1071 24 : if(chfd) iobuf_free_content(chfd->rbuf);
1072 24 : dpth_free(&dpth);
1073 24 : manios_close(&manios);
1074 24 : man_off_t_free(&pos_phase1);
1075 24 : man_off_t_free(&pos_current);
1076 24 : blks_generate_free();
1077 24 : hash_delete_all();
1078 24 : return ret;
1079 : }
1080 :
1081 0 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
1082 : int resume, struct conf **confs)
1083 : {
1084 0 : int ret=-1;
1085 0 : struct asfd *chfd=NULL;
1086 0 : if(!(chfd=champ_chooser_connect(as, sdirs, confs, resume)))
1087 : {
1088 0 : logp("problem connecting to champ chooser\n");
1089 0 : goto end;
1090 : }
1091 0 : ret=do_backup_phase2_server_protocol2(as, chfd, sdirs, resume, confs);
1092 : end:
1093 0 : if(chfd) as->asfd_remove(as, chfd);
1094 0 : asfd_free(&chfd);
1095 0 : return ret;
1096 : }
|