Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../cstat.h"
10 : #include "../../handy.h"
11 : #include "../../hexmap.h"
12 : #include "../../iobuf.h"
13 : #include "../../log.h"
14 : #include "../../server/manio.h"
15 : #include "../../protocol2/blist.h"
16 : #include "../../protocol2/rabin/rabin.h"
17 : #include "../../slist.h"
18 : #include "../child.h"
19 : #include "../manios.h"
20 : #include "../resume.h"
21 : #include "champ_chooser/champ_client.h"
22 : #include "champ_chooser/champ_server.h"
23 : #include "champ_chooser/hash.h"
24 : #include "dpth.h"
25 : #include "backup_phase2.h"
26 :
// Bits of the 'end_flags' byte used throughout phase 2.
// END_SIGS:         the client has sent "sigs_end".
// END_BACKUP:       the client has sent "backup_end".
// END_REQUESTS:     "requests_end" has been queued for the client.
// END_BLK_REQUESTS: "blk_requests_end" has been queued for the client.
#define END_SIGS		0x01
#define END_BACKUP		0x02
#define END_REQUESTS		0x04
#define END_BLK_REQUESTS	0x08

// Number of unrequested blocks written to the manifest before the client
// is sent a wrap-up message.
// The replacement list is parenthesised so that the division cannot
// re-associate with operators at the point of use.
#define BLKS_MAX_UNREQUESTED	(BLKS_MAX_IN_MEM/4)
// This needs to be greater than BLKS_MAX_UNREQUESTED
#define SLIST_MAX_IN_MEM	BLKS_MAX_IN_MEM

// Breakpoint support: 'breaking' holds the configured breakpoint number
// (when it is in the 2000-2999 range) and 'breakcount' is the number of
// signature writes to allow before the breakpoint fires.
static int breaking=0;
static int breakcount=0;
38 :
39 : static int data_needed(struct sbuf *sb)
40 : {
41 33720 : if(sb->path.cmd==CMD_FILE)
42 : return 1;
43 16674 : if(sb->path.cmd==CMD_METADATA)
44 : return 1;
45 : return 0;
46 : }
47 :
48 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
49 : {
50 : struct iobuf wbuf;
51 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
52 4 : return chfd->write(chfd, &wbuf);
53 : }
54 :
55 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
56 : struct manios *manios, struct asfd *chfd)
57 : {
58 0 : int ret=-1;
59 0 : char *fpath=NULL;
60 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
61 : goto end;
62 0 : if(manio_copy_entry(csb, sb,
63 : manios->current, manios->unchanged,
64 : /*seed_src*/NULL, /*seed_dst*/NULL)<0)
65 : goto end;
66 0 : if(strcmp(fpath, manios->changed->offset->fpath))
67 : {
68 : // If the copy crossed a manio boundary, we should tell the
69 : // champ server to load the previous one as a candidate.
70 0 : if(manio_component_to_chfd(chfd, fpath))
71 : goto end;
72 : }
73 : ret=0;
74 : end:
75 0 : free_w(&fpath);
76 0 : return ret;
77 : }
78 :
79 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
80 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
81 : struct manios *manios, struct asfd *chfd,
82 : struct cntr *cntr)
83 : {
84 : // Located the entry in the current manifest.
85 : // If the file type changed, I think it is time to back it up again
86 : // (for example, EFS changing to normal file, or back again).
87 0 : if(csb->path.cmd!=sb->path.cmd)
88 : {
89 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
90 : return -1;
91 0 : return 1;
92 : }
93 :
94 : // mtime is the actual file data.
95 : // ctime is the attributes or meta data.
96 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
97 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
98 : {
99 : // Got an unchanged file.
100 0 : cntr_add_same(cntr, sb->path.cmd);
101 0 : return unchanged(csb, sb, manios, chfd);
102 : }
103 :
104 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
105 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
106 : {
107 : // FIX THIS:
108 : // File data stayed the same, but attributes or meta data
109 : // changed. We already have the attributes, but may need to
110 : // get extra meta data.
111 0 : cntr_add_same(cntr, sb->path.cmd);
112 0 : return unchanged(csb, sb, manios, chfd);
113 : }
114 :
115 : // File data changed.
116 0 : cntr_add_changed(cntr, sb->path.cmd);
117 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
118 : return -1;
119 0 : return 1;
120 : }
121 :
122 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
123 33720 : static int entry_changed(struct sbuf *sb,
124 : struct manios *manios, struct asfd *chfd, struct sbuf **csb,
125 : struct cntr *cntr)
126 : {
127 : static int finished=0;
128 : int pcmp;
129 :
130 33720 : if(finished)
131 : {
132 33710 : cntr_add_new(cntr, sb->path.cmd);
133 33710 : return 1;
134 : }
135 :
136 10 : if(*csb && (*csb)->path.buf)
137 : {
138 : // Already have an entry.
139 : }
140 : else
141 : {
142 : // Need to read another.
143 10 : switch(manio_read(manios->current, *csb))
144 : {
145 : case 1: // Reached the end.
146 10 : sbuf_free(csb);
147 10 : finished=1;
148 10 : cntr_add_new(cntr, sb->path.cmd);
149 10 : return 1;
150 : case -1: return -1;
151 : }
152 0 : if(!(*csb)->path.buf)
153 : {
154 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
155 0 : return -1;
156 : }
157 : // Got an entry.
158 : }
159 :
160 : while(1)
161 : {
162 0 : if(!(pcmp=sbuf_pathcmp(*csb, sb)))
163 0 : return found_in_current_manifest(*csb, sb,
164 : manios, chfd, cntr);
165 0 : else if(pcmp>0)
166 : {
167 0 : cntr_add_new(cntr, sb->path.cmd);
168 0 : return 1;
169 : }
170 : // cntr_add_deleted(cntr, (*csb)->path.cmd);
171 : // Behind - need to read more data from the old manifest.
172 0 : switch(manio_read(manios->current, *csb))
173 : {
174 : case 1: // Reached the end.
175 0 : sbuf_free(csb);
176 0 : cntr_add_new(cntr, sb->path.cmd);
177 0 : return 1;
178 : case -1: return -1;
179 : }
180 : // Got something, go back around the loop.
181 : }
182 :
183 : return 0;
184 : }
185 :
186 8523 : static int add_data_to_store(struct cntr *cntr,
187 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
188 : {
189 : static struct blk *blk=NULL;
190 :
191 : // Find the first one in the list that was requested.
192 : // FIX THIS: Going up the list here, and then later
193 : // when writing to the manifest is not efficient.
194 17046 : for(blk=slist->blist->head;
195 8522 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
196 : {
197 : // logp("try: %d %d\n", blk->index, blk->got);
198 : }
199 8523 : if(!blk)
200 : {
201 1 : logp("Received data but could not find next requested block.\n");
202 1 : if(!slist->blist->head)
203 1 : logp("and slist->blist->head is null\n");
204 : else
205 0 : logp("head index: %" PRIu64 "\n", slist->blist->head->index);
206 : return -1;
207 : }
208 :
209 : // FIX THIS
210 : #ifndef UTEST
211 : if(blk_verify(blk->fingerprint, blk->md5sum, rbuf->buf, rbuf->len)<=0)
212 : {
213 : logp("ERROR: Block %" PRIu64 " from client did not verify.\n",
214 : blk->index);
215 : return -1;
216 : }
217 : #endif
218 :
219 : // Add it to the data store straight away.
220 8522 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
221 :
222 8522 : cntr_add(cntr, CMD_DATA, 0);
223 :
224 8522 : blk->got=BLK_GOT;
225 8522 : blk=blk->next;
226 :
227 : return 0;
228 : }
229 :
230 17034 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
231 : uint64_t index)
232 : {
233 : struct sbuf *sb;
234 :
235 50701 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
236 : {
237 50701 : if(!sb->protocol2->index)
238 16642 : continue;
239 34059 : if(index==sb->protocol2->index)
240 : break;
241 : }
242 17034 : if(!sb)
243 : {
244 0 : logp("Could not find %" PRIu64 " in request list\n", index);
245 : return -1;
246 : }
247 : // Replace the attribs with the more recent values.
248 17034 : iobuf_free_content(&sb->attr);
249 17034 : iobuf_move(&sb->attr, attr);
250 :
251 : // Mark the end of the previous file.
252 17034 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
253 :
254 17034 : slist->add_sigs_here=sb;
255 :
256 : // Incoming sigs now need to get added to 'add_sigs_here'
257 : return 0;
258 : }
259 :
260 : /*
261 : static void dump_blks(const char *msg, struct blk *b)
262 : {
263 : struct blk *xx;
264 : for(xx=b; xx; xx=xx->next)
265 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
266 : }
267 : */
268 :
269 25058 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
270 : {
271 : // Goes on slist->add_sigs_here
272 : struct blk *blk;
273 : struct protocol2 *protocol2;
274 :
275 25058 : if(!(blk=blk_alloc())) return -1;
276 25058 : blist_add_blk(slist->blist, blk);
277 :
278 25058 : protocol2=slist->add_sigs_here->protocol2;
279 25058 : if(!protocol2->bstart) protocol2->bstart=blk;
280 25058 : if(!protocol2->bsighead) protocol2->bsighead=blk;
281 :
282 25058 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
283 :
284 : // Need to send sigs to champ chooser, therefore need to point
285 : // to the oldest unsent one if nothing is pointed to yet.
286 25058 : if(!slist->blist->blk_for_champ_chooser)
287 16490 : slist->blist->blk_for_champ_chooser=blk;
288 :
289 : return 0;
290 : }
291 :
292 50647 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
293 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
294 : {
295 50647 : int ret=0;
296 : static struct iobuf attr;
297 : static uint64_t index;
298 :
299 50647 : switch(rbuf->cmd)
300 : {
301 : /* Incoming block data. */
302 : case CMD_DATA:
303 8523 : if(add_data_to_store(cntr, slist, rbuf, dpth))
304 : goto error;
305 : goto end;
306 :
307 : /* Incoming block signatures. */
308 : case CMD_ATTRIBS_SIGS:
309 :
310 17034 : iobuf_init(&attr);
311 17034 : iobuf_move(&attr, rbuf);
312 17034 : index=decode_file_no(&attr);
313 :
314 : // Need to go through slist to find the matching
315 : // entry.
316 17034 : if(set_up_for_sig_info(slist, &attr, index))
317 : goto error;
318 : return 0;
319 : case CMD_SIG:
320 25058 : if(add_to_sig_list(slist, rbuf))
321 : goto error;
322 : goto end;
323 :
324 : /* Incoming control/message stuff. */
325 : case CMD_MESSAGE:
326 : case CMD_WARNING:
327 : {
328 6 : log_recvd(rbuf, cntr, 0);
329 6 : goto end;
330 : }
331 : case CMD_GEN:
332 25 : if(!strcmp(rbuf->buf, "sigs_end"))
333 : {
334 13 : (*end_flags)|=END_SIGS;
335 13 : goto end;
336 : }
337 12 : else if(!strcmp(rbuf->buf, "backup_end"))
338 : {
339 12 : (*end_flags)|=END_BACKUP;
340 12 : goto end;
341 : }
342 : break;
343 : case CMD_INTERRUPT:
344 : {
345 : uint64_t file_no;
346 1 : file_no=base64_to_uint64(rbuf->buf);
347 1 : if(slist_del_sbuf_by_index(slist, file_no))
348 : goto error;
349 : goto end;
350 : }
351 : default:
352 : break;
353 : }
354 :
355 0 : iobuf_log_unexpected(rbuf, __func__);
356 : error:
357 : ret=-1;
358 : end:
359 33613 : iobuf_free_content(rbuf);
360 33613 : return ret;
361 : }
362 :
363 216982 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
364 : uint8_t *end_flags)
365 : {
366 : static char req[32]="";
367 216982 : struct sbuf *sb=slist->blks_to_request;
368 :
369 13327 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
370 :
371 216982 : if(!sb)
372 : {
373 16 : slist->blks_to_request=NULL;
374 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
375 : {
376 3 : iobuf_from_str(wbuf,
377 : CMD_GEN, (char *)"blk_requests_end");
378 3 : (*end_flags)|=END_BLK_REQUESTS;
379 : }
380 : return 0;
381 : }
382 216966 : if(!sb->protocol2->bsighead)
383 : {
384 : // Trying to move onto the next file.
385 : // ??? Does this really work?
386 129027 : if(sb->protocol2->bend)
387 : {
388 0 : slist->blks_to_request=sb->next;
389 0 : printf("move to next\n");
390 : }
391 129027 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
392 : {
393 10 : iobuf_from_str(wbuf,
394 : CMD_GEN, (char *)"blk_requests_end");
395 10 : (*end_flags)|=END_BLK_REQUESTS;
396 : }
397 : return 0;
398 : }
399 :
400 87939 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
401 : return 0;
402 :
403 12538 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
404 : {
405 8522 : base64_from_uint64(sb->protocol2->bsighead->index, req);
406 8522 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
407 8522 : sb->protocol2->bsighead->requested=1;
408 : }
409 :
410 : // Move on.
411 12538 : if(sb->protocol2->bsighead==sb->protocol2->bend)
412 : {
413 8517 : slist->blks_to_request=sb->next;
414 8517 : sb->protocol2->bsighead=sb->protocol2->bstart;
415 : }
416 : else
417 : {
418 4021 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
419 : }
420 : return 0;
421 : }
422 :
423 208447 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
424 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
425 : {
426 208447 : struct sbuf *sb=slist->last_requested;
427 208447 : if(!sb)
428 : {
429 157689 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
430 : {
431 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
432 18 : (*end_flags)|=END_REQUESTS;
433 : }
434 : return;
435 : }
436 :
437 50758 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
438 : {
439 33712 : slist->last_requested=sb->next;
440 : return;
441 : }
442 :
443 : // Only need to request the path at this stage.
444 17046 : iobuf_copy(wbuf, &sb->path);
445 17046 : sb->flags |= SBUF_SENT_PATH;
446 17046 : sb->protocol2->index=(*file_no)++;
447 : }
448 :
449 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
450 : {
451 : static char *p;
452 : static char tmp[32];
453 2 : p=tmp;
454 2 : p+=to_base64(index, tmp);
455 2 : *p='\0';
456 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
457 2 : }
458 :
459 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
460 : {
461 : struct iobuf endfile;
462 :
463 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
464 : return 0;
465 33704 : if(!iobuf_is_filedata(&sb->path))
466 : return 0;
467 :
468 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
469 : // FIX THIS: Should give a proper length and md5sum.
470 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
471 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
472 : }
473 :
474 233992 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
475 : {
476 : struct blk *b;
477 259050 : while(blist->head!=sb->protocol2->bstart)
478 : {
479 25058 : b=blist->head->next;
480 25058 : if(blist->head==blist->blk_from_champ_chooser)
481 8508 : blist->blk_from_champ_chooser=b;
482 25058 : blk_free(&blist->head);
483 25058 : blist->head=b;
484 : }
485 233992 : if(!blist->head)
486 51771 : blist->tail=NULL;
487 233992 : }
488 :
// Write as much as possible of an entry that needs data to the changed
// manifest: its header (once), then the signature and save path of each of
// its leading blocks that have been got.
// Returns 1 when the entry was completed and the slist was advanced, 0 when
// the entry cannot be completed yet, and -1 on error (including when the
// configured breakpoint fires).
static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
	struct asfd *chfd, struct manios *manios,
	struct slist *slist, int end_flags)
{
	int ret=-1;
	struct blk *blk;
	static struct iobuf wbuf;
	struct blist *blist=slist->blist;
	// Counts blocks written without having been requested from the
	// client. It persists between calls and is only reset when a wrap-up
	// message is sent.
	static int unrequested=0;

	if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
	{
		if(manio_write_sbuf(manios->changed, sb)) goto end;
		sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
	}

	// Work through the blocks that have been got. The last block in the
	// list (no 'next') is only processed once the client has sent
	// "backup_end".
	while((blk=sb->protocol2->bstart)
		&& blk->got==BLK_GOT
		&& (blk->next || end_flags&END_BACKUP))
	{
		if(blk->got_save_path
		  && !blk_is_zero_length(blk))
		{
			// Breakpoint support: fail on purpose after
			// 'breakcount' signature writes.
			if(breaking && breakcount--==0)
			{
				breakpoint(breaking, __func__);
				goto end;
			}
			if(manio_write_sig_and_path(manios->changed, blk))
				goto end;
			if(manios->changed->sig_count==0)
			{
				// Have finished a manifest file. Want to start
				// using it as a dedup candidate now.
				if(manio_component_to_chfd(chfd,
					manios->changed->offset->ppath))
						goto end;

				// The champ chooser has the candidate. Now,
				// empty our local hash table.
				hash_delete_all();
				// Add the most recent block, so identical
				// adjacent blocks are deduplicated well.
				if(hash_load_blk(blk))
					goto end;
			}
		}

		if(!blk->requested)
			unrequested++;

		if(unrequested>BLKS_MAX_UNREQUESTED)
		{
			unrequested=0;
			// Let the client know that it can free memory if there
			// was a long consecutive number of unrequested blocks.
			// NOTE(review): the counter is not reset when a
			// requested block is seen, so the run is not strictly
			// consecutive - confirm whether that matters.
			get_wbuf_from_index(&wbuf, blk->index);
			if(asfd->write(asfd, &wbuf))
				goto end;
		}

		if(blk==sb->protocol2->bend)
		{
			// That was the last block of this entry: release the
			// blocks in front of it, finish the entry off and
			// move the slist along.
			blist_adjust_head(blist, sb);
			if(write_endfile(sb, manios)) return -1;
			slist_advance(slist);
			return 1;
		}

		// Step to the next block, keeping bsighead and
		// blk_from_champ_chooser from pointing at a block that is
		// about to fall behind bstart.
		if(sb->protocol2->bsighead==sb->protocol2->bstart)
			sb->protocol2->bsighead=blk->next;
		sb->protocol2->bstart=blk->next;
		if(blk==blist->blk_from_champ_chooser)
			blist->blk_from_champ_chooser=blk->next;
	}
	if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
	{
		// Write endfile for the very last file.
		if(write_endfile(sb, manios)) return -1;
	}
	ret=0;
end:
	// Free the blocks that are now in front of this entry's first block.
	blist_adjust_head(blist, sb);
	return ret;
}
574 :
575 216984 : static int write_to_changed_file(struct asfd *asfd,
576 : struct asfd *chfd, struct manios *manios,
577 : struct slist *slist, int end_flags)
578 : {
579 : struct sbuf *sb;
580 216984 : if(!slist) return 0;
581 :
582 250677 : while((sb=slist->head))
583 : {
584 250660 : if(sb->flags & SBUF_NEED_DATA)
585 : {
586 233992 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
587 : end_flags))
588 : {
589 : case 0: return 0;
590 17025 : case 1: continue;
591 0 : default: return -1;
592 : }
593 :
594 : }
595 : else
596 : {
597 : // No change, can go straight in.
598 16668 : if(manio_write_sbuf(manios->changed, sb)) return -1;
599 16668 : if(write_endfile(sb, manios)) return -1;
600 :
601 : // Move along.
602 16668 : slist_advance(slist);
603 : }
604 : }
605 : return 0;
606 : }
607 :
608 216982 : static int maybe_add_from_scan(struct manios *manios,
609 : struct slist *slist, struct asfd *chfd, struct sbuf **csb,
610 : struct cntr *cntr)
611 : {
612 216982 : int ret=-1;
613 216982 : struct sbuf *snew=NULL;
614 :
615 : while(1)
616 : {
617 250702 : sbuf_free(&snew);
618 250702 : if(!manios->phase1) return 0;
619 : // Limit the amount loaded into memory at any one time.
620 33738 : if(slist->count>SLIST_MAX_IN_MEM)
621 : return 0;
622 33738 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
623 :
624 33738 : switch(manio_read(manios->phase1, snew))
625 : {
626 : case 0: break;
627 18 : case 1: manio_close(&manios->phase1);
628 18 : ret=0; // Finished.
629 : default: goto end;
630 : }
631 :
632 33720 : switch(entry_changed(snew, manios, chfd, csb, cntr))
633 : {
634 0 : case 0: continue; // No change.
635 : case 1: break;
636 : default: goto end; // Error.
637 : }
638 :
639 67440 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
640 :
641 33720 : slist_add_sbuf(slist, snew);
642 33720 : snew=NULL;
643 : }
644 : return 0;
645 : end:
646 18 : sbuf_free(&snew);
647 18 : return ret;
648 : }
649 :
650 216981 : static int append_for_champ_chooser(struct asfd *chfd,
651 : struct blist *blist, int end_flags)
652 : {
653 : static int finished_sending=0;
654 : static struct iobuf wbuf;
655 : static struct blk *blk=NULL;
656 :
657 242039 : while(blist->blk_for_champ_chooser)
658 : {
659 112824 : blk=blist->blk_for_champ_chooser;
660 : // If we send too many blocks to the champ chooser at once,
661 : // it can go faster than we can send paths to completed
662 : // manifests to it. This means that deduplication efficiency
663 : // is reduced (although speed may be faster).
664 : // So limit the sending.
665 225648 : if(blk->index
666 112824 : - blist->head->index > MANIFEST_SIG_MAX)
667 : return 0;
668 :
669 25058 : blk_to_iobuf_sig(blk, &wbuf);
670 :
671 25058 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
672 : {
673 : case APPEND_OK: break;
674 : case APPEND_BLOCKED:
675 : return 0; // Try again later.
676 : default: return -1;
677 : }
678 25058 : blist->blk_for_champ_chooser=blk->next;
679 : }
680 129215 : if(end_flags&END_SIGS
681 49976 : && !finished_sending && !blist->blk_for_champ_chooser)
682 : {
683 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
684 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
685 : {
686 : case APPEND_OK: break;
687 : case APPEND_BLOCKED:
688 : return 0; // Try again later.
689 : default: return -1;
690 : }
691 12 : finished_sending++;
692 : }
693 : return 0;
694 : }
695 :
696 25063 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
697 : {
698 : const char *path;
699 :
700 25063 : if(blk->got!=BLK_INCOMING) return 0;
701 8522 : blk->got=BLK_NOT_GOT;
702 :
703 : // Need to get the data for this blk from the client.
704 : // Set up the details of where it will be saved.
705 8522 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
706 :
707 : // FIX THIS: make dpth give us the path in a uint8 array.
708 8522 : blk->savepath=savepathstr_with_sig_to_uint64(path);
709 8522 : blk->got_save_path=1;
710 : // Load it into our local hash table.
711 8522 : if(hash_load_blk(blk))
712 : return -1;
713 8522 : if(dpth_protocol2_incr_sig(dpth))
714 : return -1;
715 8522 : return 0;
716 : }
717 :
718 25058 : static struct hash_strong *in_local_hash(struct blk *blk)
719 : {
720 : static struct hash_weak *hash_weak;
721 :
722 25058 : if(!(hash_weak=hash_weak_find(blk->fingerprint)))
723 : return NULL;
724 4014 : return hash_strong_find(hash_weak, blk->md5sum);
725 : }
726 :
727 37587 : static int simple_deduplicate_blk(struct blk *blk)
728 : {
729 : static struct hash_strong *hash_strong;
730 37587 : if(blk->got!=BLK_INCOMING)
731 : return 0;
732 25058 : if((hash_strong=in_local_hash(blk)))
733 : {
734 4014 : blk->savepath=hash_strong->savepath;
735 4014 : blk->got_save_path=1;
736 4014 : blk->got=BLK_GOT;
737 4014 : return 1;
738 : }
739 : return 0;
740 : }
741 :
742 25058 : static int mark_up_to_index(struct blist *blist,
743 : uint64_t index, struct dpth *dpth)
744 : {
745 : struct blk *blk;
746 :
747 : // Mark everything that was not got, up to the given index.
748 62645 : for(blk=blist->blk_from_champ_chooser;
749 50116 : blk && blk->index!=index; blk=blk->next)
750 : {
751 12529 : if(simple_deduplicate_blk(blk))
752 0 : continue;
753 12529 : if(mark_not_got(blk, dpth))
754 : return -1;
755 : }
756 25058 : if(!blk)
757 : {
758 0 : logp("Could not find index from champ chooser: %" PRIu64 "\n",
759 : index);
760 : return -1;
761 : }
762 25058 : simple_deduplicate_blk(blk);
763 :
764 : //logp("Found index from champ chooser: %lu\n", index);
765 : //printf("index from cc: %d\n", index);
766 25058 : blist->blk_from_champ_chooser=blk;
767 : return 0;
768 : }
769 :
770 12524 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
771 : struct dpth *dpth)
772 : {
773 : static struct blk b;
774 12524 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
775 : return -1;
776 :
777 12524 : if(mark_up_to_index(blist, b.index, dpth))
778 : return -1;
779 12524 : blist->blk_from_champ_chooser->savepath=b.savepath;
780 12524 : blist->blk_from_champ_chooser->got=BLK_GOT;
781 12524 : blist->blk_from_champ_chooser->got_save_path=1;
782 12524 : return 0;
783 : }
784 :
785 12534 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
786 : struct dpth *dpth)
787 : {
788 : static struct blk b;
789 12534 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
790 : return -1;
791 :
792 12534 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
793 12534 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
794 12534 : return 0;
795 : }
796 :
797 25059 : static int deal_with_read_from_chfd(struct asfd *chfd,
798 : struct blist *blist, struct dpth *dpth, struct cntr *cntr)
799 : {
800 25059 : int ret=-1;
801 :
802 : // Deal with champ chooser read here.
803 : //printf("read from cc: %s\n", chfd->rbuf->buf);
804 25059 : switch(chfd->rbuf->cmd)
805 : {
806 : case CMD_SIG:
807 : // Get these for blks that the champ chooser has found.
808 12524 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
809 : goto end;
810 12524 : cntr_add_same(cntr, CMD_DATA);
811 : break;
812 : case CMD_WRAP_UP:
813 12534 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
814 : goto end;
815 : break;
816 : default:
817 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
818 : goto end;
819 : }
820 : ret=0;
821 : end:
822 25059 : iobuf_free_content(chfd->rbuf);
823 25059 : return ret;
824 : }
825 :
826 12 : static int check_for_missing_work_in_slist(struct slist *slist)
827 : {
828 12 : struct sbuf *sb=NULL;
829 :
830 12 : if(slist->blist->head)
831 : {
832 0 : logp("ERROR: finishing but still want block: %" PRIu64 "\n",
833 : slist->blist->head->index);
834 : return -1;
835 : }
836 :
837 15 : for(sb=slist->head; sb; sb=sb->next)
838 : {
839 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
840 : {
841 1 : logp("ERROR: finishing but still waiting for: %s\n",
842 : iobuf_to_printable(&slist->head->path));
843 : return -1;
844 : }
845 : }
846 : return 0;
847 : }
848 :
// The main loop of the server side of a protocol2 backup, phase 2.
// Entries are loaded from the phase1 scan, file and block requests are sent
// to the client, the attribs, signatures and data that come back are
// processed, signatures are exchanged with the champ chooser over 'chfd',
// and finished entries are written to the changed manifest. The loop runs
// until the client sends "backup_end".
// All of 'as' (with its asfd), 'chfd', 'sdirs' and 'confs' are required.
// Returns 0 on success, -1 on error.
// The function is only static outside of unit test builds.
#ifndef UTEST
static
#endif
int do_backup_phase2_server_protocol2(struct async *as, struct asfd *chfd,
	struct sdirs *sdirs, int resume, struct conf **confs)
{
	int ret=-1;
	// Bitmask of the END_* flags.
	uint8_t end_flags=0;
	struct slist *slist=NULL;
	// The next message to be queued for the client.
	struct iobuf wbuf;
	struct dpth *dpth=NULL;
	man_off_t *p1pos=NULL;
	struct manios *manios=NULL;
	// This is used to tell the client that a number of consecutive blocks
	// have been found and can be freed.
	struct asfd *asfd=NULL;
	struct cntr *cntr=NULL;
	// The most recently read entry of the current manifest.
	struct sbuf *csb=NULL;
	// The index that will be given to the next file requested.
	uint64_t file_no=1;
	int fail_on_warning=0;
	struct cntr_ent *warn_ent=NULL;

	// Argument checks. Note that the cleanup at 'end' runs for these
	// early exits too.
	if(!as)
	{
		logp("async not provided to %s()\n", __func__);
		goto end;
	}
	if(!sdirs)
	{
		logp("sdirs not provided to %s()\n", __func__);
		goto end;
	}
	if(!confs)
	{
		logp("confs not provided to %s()\n", __func__);
		goto end;
	}
	asfd=as->asfd;
	if(!asfd)
	{
		logp("asfd not provided to %s()\n", __func__);
		goto end;
	}
	if(!chfd)
	{
		logp("chfd not provided to %s()\n", __func__);
		goto end;
	}
	cntr=get_cntr(confs);
	fail_on_warning=get_int(confs[OPT_FAIL_ON_WARNING]);
	if(cntr)
		warn_ent=cntr->ent[CMD_WARNING];

	// Breakpoints 2000 to 2999 belong to this phase. The last three
	// digits give the number of signature writes to allow before
	// breaking.
	if(get_int(confs[OPT_BREAKPOINT])>=2000
	  && get_int(confs[OPT_BREAKPOINT])<3000)
	{
		breaking=get_int(confs[OPT_BREAKPOINT]);
		breakcount=breaking-2000;
	}

	blks_generate_init();

	logp("Phase 2 begin (recv backup data)\n");

	if(!(dpth=dpth_alloc())
	  || dpth_protocol2_init(dpth,
		sdirs->data,
		get_string(confs[OPT_CNAME]),
		sdirs->cfiles,
		get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
			goto end;
	if(resume)
	{
		if(!(p1pos=do_resume(sdirs, dpth, confs)))
			goto end;
		if(cntr_send_sdirs(asfd, sdirs, confs, CNTR_STATUS_BACKUP))
			goto end;
	}

	if(!(manios=manios_open_phase2(sdirs, p1pos, PROTO_2))
	  || !(slist=slist_alloc())
	  || !(csb=sbuf_alloc(PROTO_2)))
		goto end;

	iobuf_free_content(asfd->rbuf);

	memset(&wbuf, 0, sizeof(struct iobuf));
	while(!(end_flags&END_BACKUP))
	{
		if(check_fail_on_warning(fail_on_warning, warn_ent))
			goto end;

		if(write_status(CNTR_STATUS_BACKUP,
			csb && csb->path.buf?csb->path.buf:"", cntr))
				goto end;

		// Top up the list of entries from the phase1 scan.
		if(maybe_add_from_scan(manios, slist, chfd, &csb, cntr))
			goto end;

		// If nothing is waiting to be sent, find something: block
		// requests take priority over file requests.
		// NOTE(review): this relies on append_all_to_write_buffer()
		// clearing wbuf.len once the message has been taken -
		// confirm.
		if(!wbuf.len)
		{
			if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
				goto end;
			if(!wbuf.len)
			{
				get_wbuf_from_files(&wbuf, slist,
					manios, &end_flags, &file_no);
			}
		}

		if(wbuf.len
		 && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
			goto end;

		if(append_for_champ_chooser(chfd, slist->blist, end_flags))
			goto end;

		if(as->read_write(as))
		{
			logp("error from as->read_write in %s\n", __func__);
			goto end;
		}

		// Messages from the client.
		while(asfd->rbuf->buf)
		{
			if(deal_with_read(asfd->rbuf, slist, cntr,
				&end_flags, dpth))
					goto end;
			// Get as much out of the readbuf as possible.
			if(asfd->parse_readbuf(asfd))
				goto end;
		}
		// Messages from the champ chooser.
		while(chfd->rbuf->buf)
		{
			if(deal_with_read_from_chfd(chfd,
				slist->blist, dpth, cntr))
					goto end;
			// Get as much out of the readbuf as possible.
			if(chfd->parse_readbuf(chfd))
				goto end;
		}

		if(write_to_changed_file(asfd, chfd, manios,
			slist, end_flags))
				goto end;
	}

	// Hack: If there are some entries left after the last entry that
	// contains block data, it will not be written to the changed file
	// yet because the last entry of block data has not had
	// sb->protocol2->bend set.
	if(slist->head && slist->head->next)
	{
		struct sbuf *sb=NULL;
		// Drop the head entry by hand, then write what follows it.
		sb=slist->head;
		slist->head=sb->next;
		sbuf_free(&sb);
		if(write_to_changed_file(asfd, chfd, manios,
			slist, end_flags))
				goto end;
	}

	if(manios_close(&manios))
		goto end;

	if(check_for_missing_work_in_slist(slist))
		goto end;

	// Need to release the last left. There should be one at most.
	if(dpth->head && dpth->head->next)
	{
		logp("ERROR: More data locks remaining after: %s\n",
			dpth->head->save_path);
		goto end;
	}
	if(dpth_release_all(dpth)) goto end;

	if(check_fail_on_warning(fail_on_warning, warn_ent))
		goto end;

	ret=0;
end:
	if(ret)
	{
		if(slist && slist->head)
			logp(" last tried file: %s\n",
				iobuf_to_printable(&slist->head->path));
	}
	logp("End backup\n");
	// Cleanup, for the success and the error paths alike.
	sbuf_free(&csb);
	slist_free(&slist);
	if(asfd) iobuf_free_content(asfd->rbuf);
	if(chfd) iobuf_free_content(chfd->rbuf);
	dpth_free(&dpth);
	manios_close(&manios);
	man_off_t_free(&p1pos);
	blks_generate_free();
	hash_delete_all();
	return ret;
}
1049 :
1050 0 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
1051 : int resume, struct conf **confs)
1052 : {
1053 0 : int ret=-1;
1054 0 : struct asfd *chfd=NULL;
1055 0 : if(!(chfd=champ_chooser_connect(as, sdirs, confs, resume)))
1056 : {
1057 0 : logp("problem connecting to champ chooser\n");
1058 0 : goto end;
1059 : }
1060 0 : ret=do_backup_phase2_server_protocol2(as, chfd, sdirs, resume, confs);
1061 : end:
1062 0 : if(chfd) as->asfd_remove(as, chfd);
1063 0 : asfd_free(&chfd);
1064 0 : return ret;
1065 : }
|