Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../cstat.h"
10 : #include "../../handy.h"
11 : #include "../../hexmap.h"
12 : #include "../../iobuf.h"
13 : #include "../../log.h"
14 : #include "../../server/manio.h"
15 : #include "../../protocol2/blist.h"
16 : #include "../../protocol2/rabin/rabin.h"
17 : #include "../../slist.h"
18 : #include "../child.h"
19 : #include "../manios.h"
20 : #include "../resume.h"
21 : #include "champ_chooser/champ_client.h"
22 : #include "champ_chooser/champ_server.h"
23 : #include "champ_chooser/hash.h"
24 : #include "dpth.h"
25 : #include "backup_phase2.h"
26 :
27 : #define END_SIGS 0x01
28 : #define END_BACKUP 0x02
29 : #define END_REQUESTS 0x04
30 : #define END_BLK_REQUESTS 0x08
31 :
32 : #define BLKS_MAX_UNREQUESTED BLKS_MAX_IN_MEM/4
33 : // This needs to be greater than BLKS_MAX_UNREQUESTED
34 : #define SLIST_MAX_IN_MEM BLKS_MAX_IN_MEM
35 :
36 : static int breaking=0;
37 : static int breakcount=0;
38 :
39 : static int data_needed(struct sbuf *sb)
40 : {
41 33720 : if(sb->path.cmd==CMD_FILE)
42 : return 1;
43 16674 : if(sb->path.cmd==CMD_METADATA)
44 : return 1;
45 : return 0;
46 : }
47 :
48 4 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
49 : {
50 : struct iobuf wbuf;
51 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
52 4 : return chfd->write(chfd, &wbuf);
53 : }
54 :
55 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
56 : struct manios *manios, struct asfd *chfd)
57 : {
58 0 : int ret=-1;
59 0 : char *fpath=NULL;
60 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
61 : goto end;
62 0 : if(manio_copy_entry(csb, sb,
63 : manios->current, manios->unchanged)<0)
64 : goto end;
65 0 : if(strcmp(fpath, manios->changed->offset->fpath))
66 : {
67 : // If the copy crossed a manio boundary, we should tell the
68 : // champ server to load the previous one as a candidate.
69 0 : if(manio_component_to_chfd(chfd, fpath))
70 : goto end;
71 : }
72 : ret=0;
73 : end:
74 0 : free_w(&fpath);
75 0 : return ret;
76 : }
77 :
78 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
79 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
80 : struct manios *manios, struct asfd *chfd,
81 : struct cntr *cntr)
82 : {
83 : // Located the entry in the current manifest.
84 : // If the file type changed, I think it is time to back it up again
85 : // (for example, EFS changing to normal file, or back again).
86 0 : if(csb->path.cmd!=sb->path.cmd)
87 : {
88 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
89 : return -1;
90 0 : return 1;
91 : }
92 :
93 : // mtime is the actual file data.
94 : // ctime is the attributes or meta data.
95 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
96 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
97 : {
98 : // Got an unchanged file.
99 0 : cntr_add_same(cntr, sb->path.cmd);
100 0 : return unchanged(csb, sb, manios, chfd);
101 : }
102 :
103 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
104 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
105 : {
106 : // FIX THIS:
107 : // File data stayed the same, but attributes or meta data
108 : // changed. We already have the attributes, but may need to
109 : // get extra meta data.
110 0 : cntr_add_same(cntr, sb->path.cmd);
111 0 : return unchanged(csb, sb, manios, chfd);
112 : }
113 :
114 : // File data changed.
115 0 : cntr_add_changed(cntr, sb->path.cmd);
116 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
117 : return -1;
118 0 : return 1;
119 : }
120 :
121 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
122 33720 : static int entry_changed(struct sbuf *sb,
123 : struct manios *manios, struct asfd *chfd, struct sbuf **csb,
124 : struct cntr *cntr)
125 : {
126 : static int finished=0;
127 : int pcmp;
128 :
129 33720 : if(finished)
130 : {
131 33710 : cntr_add_new(cntr, sb->path.cmd);
132 33710 : return 1;
133 : }
134 :
135 10 : if(*csb && (*csb)->path.buf)
136 : {
137 : // Already have an entry.
138 : }
139 : else
140 : {
141 : // Need to read another.
142 10 : switch(manio_read(manios->current, *csb))
143 : {
144 : case 1: // Reached the end.
145 10 : sbuf_free(csb);
146 10 : finished=1;
147 10 : cntr_add_new(cntr, sb->path.cmd);
148 10 : return 1;
149 : case -1: return -1;
150 : }
151 0 : if(!(*csb)->path.buf)
152 : {
153 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
154 0 : return -1;
155 : }
156 : // Got an entry.
157 : }
158 :
159 : while(1)
160 : {
161 0 : if(!(pcmp=sbuf_pathcmp(*csb, sb)))
162 0 : return found_in_current_manifest(*csb, sb,
163 : manios, chfd, cntr);
164 0 : else if(pcmp>0)
165 : {
166 0 : cntr_add_new(cntr, sb->path.cmd);
167 0 : return 1;
168 : }
169 : // cntr_add_deleted(cntr, (*csb)->path.cmd);
170 : // Behind - need to read more data from the old manifest.
171 0 : switch(manio_read(manios->current, *csb))
172 : {
173 : case 1: // Reached the end.
174 0 : sbuf_free(csb);
175 0 : cntr_add_new(cntr, sb->path.cmd);
176 0 : return 1;
177 : case -1: return -1;
178 : }
179 : // Got something, go back around the loop.
180 : }
181 :
182 : return 0;
183 : }
184 :
185 8523 : static int add_data_to_store(struct cntr *cntr,
186 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
187 : {
188 : static struct blk *blk=NULL;
189 :
190 : // Find the first one in the list that was requested.
191 : // FIX THIS: Going up the list here, and then later
192 : // when writing to the manifest is not efficient.
193 17046 : for(blk=slist->blist->head;
194 8522 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
195 : {
196 : // logp("try: %d %d\n", blk->index, blk->got);
197 : }
198 8523 : if(!blk)
199 : {
200 1 : logp("Received data but could not find next requested block.\n");
201 1 : if(!slist->blist->head)
202 1 : logp("and slist->blist->head is null\n");
203 : else
204 0 : logp("head index: %" PRIu64 "\n", slist->blist->head->index);
205 : return -1;
206 : }
207 :
208 : // FIX THIS
209 : #ifndef UTEST
210 : if(blk_verify(blk->fingerprint, blk->md5sum, rbuf->buf, rbuf->len)<=0)
211 : {
212 : logp("ERROR: Block %" PRIu64 " from client did not verify.\n",
213 : blk->index);
214 : return -1;
215 : }
216 : #endif
217 :
218 : // Add it to the data store straight away.
219 8522 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
220 :
221 8522 : cntr_add(cntr, CMD_DATA, 0);
222 :
223 8522 : blk->got=BLK_GOT;
224 8522 : blk=blk->next;
225 :
226 : return 0;
227 : }
228 :
229 17034 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
230 : uint64_t index)
231 : {
232 : struct sbuf *sb;
233 :
234 50701 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
235 : {
236 50701 : if(!sb->protocol2->index)
237 16642 : continue;
238 34059 : if(index==sb->protocol2->index)
239 : break;
240 : }
241 17034 : if(!sb)
242 : {
243 0 : logp("Could not find %" PRIu64 " in request list\n", index);
244 : return -1;
245 : }
246 : // Replace the attribs with the more recent values.
247 17034 : iobuf_free_content(&sb->attr);
248 17034 : iobuf_move(&sb->attr, attr);
249 :
250 : // Mark the end of the previous file.
251 17034 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
252 :
253 17034 : slist->add_sigs_here=sb;
254 :
255 : // Incoming sigs now need to get added to 'add_sigs_here'
256 : return 0;
257 : }
258 :
259 : /*
260 : static void dump_blks(const char *msg, struct blk *b)
261 : {
262 : struct blk *xx;
263 : for(xx=b; xx; xx=xx->next)
264 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
265 : }
266 : */
267 :
268 25058 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
269 : {
270 : // Goes on slist->add_sigs_here
271 : struct blk *blk;
272 : struct protocol2 *protocol2;
273 :
274 25058 : if(!(blk=blk_alloc())) return -1;
275 25058 : blist_add_blk(slist->blist, blk);
276 :
277 25058 : protocol2=slist->add_sigs_here->protocol2;
278 25058 : if(!protocol2->bstart) protocol2->bstart=blk;
279 25058 : if(!protocol2->bsighead) protocol2->bsighead=blk;
280 :
281 25058 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
282 :
283 : // Need to send sigs to champ chooser, therefore need to point
284 : // to the oldest unsent one if nothing is pointed to yet.
285 25058 : if(!slist->blist->blk_for_champ_chooser)
286 16490 : slist->blist->blk_for_champ_chooser=blk;
287 :
288 : return 0;
289 : }
290 :
291 50647 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
292 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
293 : {
294 50647 : int ret=0;
295 : static struct iobuf attr;
296 : static uint64_t index;
297 :
298 50647 : switch(rbuf->cmd)
299 : {
300 : /* Incoming block data. */
301 : case CMD_DATA:
302 8523 : if(add_data_to_store(cntr, slist, rbuf, dpth))
303 : goto error;
304 : goto end;
305 :
306 : /* Incoming block signatures. */
307 : case CMD_ATTRIBS_SIGS:
308 :
309 17034 : iobuf_init(&attr);
310 17034 : iobuf_move(&attr, rbuf);
311 17034 : index=decode_file_no(&attr);
312 :
313 : // Need to go through slist to find the matching
314 : // entry.
315 17034 : if(set_up_for_sig_info(slist, &attr, index))
316 : goto error;
317 : return 0;
318 : case CMD_SIG:
319 25058 : if(add_to_sig_list(slist, rbuf))
320 : goto error;
321 : goto end;
322 :
323 : /* Incoming control/message stuff. */
324 : case CMD_MESSAGE:
325 : case CMD_WARNING:
326 : {
327 6 : log_recvd(rbuf, cntr, 0);
328 6 : goto end;
329 : }
330 : case CMD_GEN:
331 25 : if(!strcmp(rbuf->buf, "sigs_end"))
332 : {
333 13 : (*end_flags)|=END_SIGS;
334 13 : goto end;
335 : }
336 12 : else if(!strcmp(rbuf->buf, "backup_end"))
337 : {
338 12 : (*end_flags)|=END_BACKUP;
339 12 : goto end;
340 : }
341 : break;
342 : case CMD_INTERRUPT:
343 : {
344 : uint64_t file_no;
345 1 : file_no=base64_to_uint64(rbuf->buf);
346 1 : if(slist_del_sbuf_by_index(slist, file_no))
347 : goto error;
348 : goto end;
349 : }
350 : default:
351 : break;
352 : }
353 :
354 0 : iobuf_log_unexpected(rbuf, __func__);
355 : error:
356 : ret=-1;
357 : end:
358 33613 : iobuf_free_content(rbuf);
359 33613 : return ret;
360 : }
361 :
362 216982 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
363 : uint8_t *end_flags)
364 : {
365 : static char req[32]="";
366 216982 : struct sbuf *sb=slist->blks_to_request;
367 :
368 13327 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
369 :
370 216982 : if(!sb)
371 : {
372 16 : slist->blks_to_request=NULL;
373 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
374 : {
375 3 : iobuf_from_str(wbuf,
376 : CMD_GEN, (char *)"blk_requests_end");
377 3 : (*end_flags)|=END_BLK_REQUESTS;
378 : }
379 : return 0;
380 : }
381 216966 : if(!sb->protocol2->bsighead)
382 : {
383 : // Trying to move onto the next file.
384 : // ??? Does this really work?
385 129027 : if(sb->protocol2->bend)
386 : {
387 0 : slist->blks_to_request=sb->next;
388 : printf("move to next\n");
389 : }
390 129027 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
391 : {
392 10 : iobuf_from_str(wbuf,
393 : CMD_GEN, (char *)"blk_requests_end");
394 10 : (*end_flags)|=END_BLK_REQUESTS;
395 : }
396 : return 0;
397 : }
398 :
399 87939 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
400 : return 0;
401 :
402 12538 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
403 : {
404 8522 : base64_from_uint64(sb->protocol2->bsighead->index, req);
405 8522 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
406 8522 : sb->protocol2->bsighead->requested=1;
407 : }
408 :
409 : // Move on.
410 12538 : if(sb->protocol2->bsighead==sb->protocol2->bend)
411 : {
412 8517 : slist->blks_to_request=sb->next;
413 8517 : sb->protocol2->bsighead=sb->protocol2->bstart;
414 : }
415 : else
416 : {
417 4021 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
418 : }
419 : return 0;
420 : }
421 :
422 208447 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
423 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
424 : {
425 208447 : struct sbuf *sb=slist->last_requested;
426 208447 : if(!sb)
427 : {
428 157689 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
429 : {
430 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
431 18 : (*end_flags)|=END_REQUESTS;
432 : }
433 : return;
434 : }
435 :
436 50758 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
437 : {
438 33712 : slist->last_requested=sb->next;
439 : return;
440 : }
441 :
442 : // Only need to request the path at this stage.
443 17046 : iobuf_copy(wbuf, &sb->path);
444 17046 : sb->flags |= SBUF_SENT_PATH;
445 17046 : sb->protocol2->index=(*file_no)++;
446 : }
447 :
448 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
449 : {
450 : static char *p;
451 : static char tmp[32];
452 2 : p=tmp;
453 2 : p+=to_base64(index, tmp);
454 2 : *p='\0';
455 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
456 2 : }
457 :
458 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
459 : {
460 : struct iobuf endfile;
461 :
462 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
463 : return 0;
464 33704 : if(!iobuf_is_filedata(&sb->path))
465 : return 0;
466 :
467 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
468 : // FIX THIS: Should give a proper length and md5sum.
469 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
470 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
471 : }
472 :
473 233992 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
474 : {
475 : struct blk *b;
476 259050 : while(blist->head!=sb->protocol2->bstart)
477 : {
478 25058 : b=blist->head->next;
479 25058 : if(blist->head==blist->blk_from_champ_chooser)
480 8508 : blist->blk_from_champ_chooser=b;
481 25058 : blk_free(&blist->head);
482 25058 : blist->head=b;
483 : }
484 233992 : if(!blist->head)
485 51771 : blist->tail=NULL;
486 233992 : }
487 :
488 233992 : static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
489 : struct asfd *chfd, struct manios *manios,
490 : struct slist *slist, int end_flags)
491 : {
492 233992 : int ret=-1;
493 : struct blk *blk;
494 : static struct iobuf wbuf;
495 233992 : struct blist *blist=slist->blist;
496 : static int unrequested=0;
497 :
498 233992 : if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
499 : {
500 17036 : if(manio_write_sbuf(manios->changed, sb)) goto end;
501 17036 : sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
502 : }
503 :
504 242025 : while((blk=sb->protocol2->bstart)
505 190254 : && blk->got==BLK_GOT
506 59413 : && (blk->next || end_flags&END_BACKUP))
507 : {
508 25058 : if(blk->got_save_path
509 25058 : && !blk_is_zero_length(blk))
510 : {
511 25058 : if(breaking && breakcount--==0)
512 : {
513 0 : breakpoint(breaking, __func__);
514 0 : goto end;
515 : }
516 25058 : if(manio_write_sig_and_path(manios->changed, blk))
517 : goto end;
518 25058 : if(manios->changed->sig_count==0)
519 : {
520 : // Have finished a manifest file. Want to start
521 : // using it as a dedup candidate now.
522 4 : if(manio_component_to_chfd(chfd,
523 4 : manios->changed->offset->ppath))
524 : goto end;
525 :
526 : // The champ chooser has the candidate. Now,
527 : // empty our local hash table.
528 4 : hash_delete_all();
529 : // Add the most recent block, so identical
530 : // adjacent blocks are deduplicated well.
531 4 : if(hash_load_blk(blk))
532 : goto end;
533 : }
534 : }
535 :
536 25058 : if(!blk->requested)
537 16536 : unrequested++;
538 :
539 25058 : if(unrequested>BLKS_MAX_UNREQUESTED)
540 : {
541 2 : unrequested=0;
542 : // Let the client know that it can free memory if there
543 : // was a long consecutive number of unrequested blocks.
544 2 : get_wbuf_from_index(&wbuf, blk->index);
545 2 : if(asfd->write(asfd, &wbuf))
546 : goto end;
547 : }
548 :
549 25058 : if(blk==sb->protocol2->bend)
550 : {
551 17025 : blist_adjust_head(blist, sb);
552 17025 : if(write_endfile(sb, manios)) return -1;
553 17025 : slist_advance(slist);
554 17025 : return 1;
555 : }
556 :
557 8033 : if(sb->protocol2->bsighead==sb->protocol2->bstart)
558 8020 : sb->protocol2->bsighead=blk->next;
559 8033 : sb->protocol2->bstart=blk->next;
560 8033 : if(blk==blist->blk_from_champ_chooser)
561 4021 : blist->blk_from_champ_chooser=blk->next;
562 : }
563 216967 : if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
564 : {
565 : // Write endfile for the very last file.
566 11 : if(write_endfile(sb, manios)) return -1;
567 : }
568 : ret=0;
569 : end:
570 216967 : blist_adjust_head(blist, sb);
571 216967 : return ret;
572 : }
573 :
574 216984 : static int write_to_changed_file(struct asfd *asfd,
575 : struct asfd *chfd, struct manios *manios,
576 : struct slist *slist, int end_flags)
577 : {
578 : struct sbuf *sb;
579 216984 : if(!slist) return 0;
580 :
581 250677 : while((sb=slist->head))
582 : {
583 250660 : if(sb->flags & SBUF_NEED_DATA)
584 : {
585 233992 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
586 : end_flags))
587 : {
588 : case 0: return 0;
589 17025 : case 1: continue;
590 0 : default: return -1;
591 : }
592 :
593 : }
594 : else
595 : {
596 : // No change, can go straight in.
597 16668 : if(manio_write_sbuf(manios->changed, sb)) return -1;
598 16668 : if(write_endfile(sb, manios)) return -1;
599 :
600 : // Move along.
601 16668 : slist_advance(slist);
602 : }
603 : }
604 : return 0;
605 : }
606 :
607 216982 : static int maybe_add_from_scan(struct manios *manios,
608 : struct slist *slist, struct asfd *chfd, struct sbuf **csb,
609 : struct cntr *cntr)
610 : {
611 216982 : int ret=-1;
612 216982 : struct sbuf *snew=NULL;
613 :
614 : while(1)
615 : {
616 250702 : sbuf_free(&snew);
617 250702 : if(!manios->phase1) return 0;
618 : // Limit the amount loaded into memory at any one time.
619 33738 : if(slist->count>SLIST_MAX_IN_MEM)
620 : return 0;
621 67458 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
622 :
623 33738 : switch(manio_read(manios->phase1, snew))
624 : {
625 : case 0: break;
626 18 : case 1: manio_close(&manios->phase1);
627 18 : ret=0; // Finished.
628 : default: goto end;
629 : }
630 :
631 33720 : switch(entry_changed(snew, manios, chfd, csb, cntr))
632 : {
633 0 : case 0: continue; // No change.
634 : case 1: break;
635 : default: goto end; // Error.
636 : }
637 :
638 101160 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
639 :
640 33720 : slist_add_sbuf(slist, snew);
641 33720 : snew=NULL;
642 : }
643 : return 0;
644 : end:
645 18 : sbuf_free(&snew);
646 18 : return ret;
647 : }
648 :
649 216981 : static int append_for_champ_chooser(struct asfd *chfd,
650 : struct blist *blist, int end_flags)
651 : {
652 : static int finished_sending=0;
653 : static struct iobuf wbuf;
654 : static struct blk *blk=NULL;
655 :
656 242039 : while(blist->blk_for_champ_chooser)
657 : {
658 112824 : blk=blist->blk_for_champ_chooser;
659 : // If we send too many blocks to the champ chooser at once,
660 : // it can go faster than we can send paths to completed
661 : // manifests to it. This means that deduplication efficiency
662 : // is reduced (although speed may be faster).
663 : // So limit the sending.
664 225648 : if(blk->index
665 112824 : - blist->head->index > MANIFEST_SIG_MAX)
666 : return 0;
667 :
668 25058 : blk_to_iobuf_sig(blk, &wbuf);
669 :
670 25058 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
671 : {
672 : case APPEND_OK: break;
673 : case APPEND_BLOCKED:
674 : return 0; // Try again later.
675 : default: return -1;
676 : }
677 25058 : blist->blk_for_champ_chooser=blk->next;
678 : }
679 129215 : if(end_flags&END_SIGS
680 49976 : && !finished_sending && !blist->blk_for_champ_chooser)
681 : {
682 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
683 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
684 : {
685 : case APPEND_OK: break;
686 : case APPEND_BLOCKED:
687 : return 0; // Try again later.
688 : default: return -1;
689 : }
690 12 : finished_sending++;
691 : }
692 : return 0;
693 : }
694 :
695 25063 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
696 : {
697 : const char *path;
698 :
699 25063 : if(blk->got!=BLK_INCOMING) return 0;
700 8522 : blk->got=BLK_NOT_GOT;
701 :
702 : // Need to get the data for this blk from the client.
703 : // Set up the details of where it will be saved.
704 8522 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
705 :
706 : // FIX THIS: make dpth give us the path in a uint8 array.
707 8522 : blk->savepath=savepathstr_with_sig_to_uint64(path);
708 8522 : blk->got_save_path=1;
709 : // Load it into our local hash table.
710 8522 : if(hash_load_blk(blk))
711 : return -1;
712 8522 : if(dpth_protocol2_incr_sig(dpth))
713 : return -1;
714 8522 : return 0;
715 : }
716 :
717 25058 : static struct hash_strong *in_local_hash(struct blk *blk)
718 : {
719 : static struct hash_weak *hash_weak;
720 :
721 25058 : if(!(hash_weak=hash_weak_find(blk->fingerprint)))
722 : return NULL;
723 4014 : return hash_strong_find(hash_weak, blk->md5sum);
724 : }
725 :
726 37587 : static int simple_deduplicate_blk(struct blk *blk)
727 : {
728 : static struct hash_strong *hash_strong;
729 37587 : if(blk->got!=BLK_INCOMING)
730 : return 0;
731 25058 : if((hash_strong=in_local_hash(blk)))
732 : {
733 4014 : blk->savepath=hash_strong->savepath;
734 4014 : blk->got_save_path=1;
735 4014 : blk->got=BLK_GOT;
736 4014 : return 1;
737 : }
738 : return 0;
739 : }
740 :
741 25058 : static int mark_up_to_index(struct blist *blist,
742 : uint64_t index, struct dpth *dpth)
743 : {
744 : struct blk *blk;
745 :
746 : // Mark everything that was not got, up to the given index.
747 62645 : for(blk=blist->blk_from_champ_chooser;
748 50116 : blk && blk->index!=index; blk=blk->next)
749 : {
750 12529 : if(simple_deduplicate_blk(blk))
751 0 : continue;
752 12529 : if(mark_not_got(blk, dpth))
753 : return -1;
754 : }
755 25058 : if(!blk)
756 : {
757 0 : logp("Could not find index from champ chooser: %" PRIu64 "\n",
758 : index);
759 : return -1;
760 : }
761 25058 : simple_deduplicate_blk(blk);
762 :
763 : //logp("Found index from champ chooser: %lu\n", index);
764 : //printf("index from cc: %d\n", index);
765 25058 : blist->blk_from_champ_chooser=blk;
766 : return 0;
767 : }
768 :
769 12524 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
770 : struct dpth *dpth)
771 : {
772 : static struct blk b;
773 12524 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
774 : return -1;
775 :
776 12524 : if(mark_up_to_index(blist, b.index, dpth))
777 : return -1;
778 12524 : blist->blk_from_champ_chooser->savepath=b.savepath;
779 12524 : blist->blk_from_champ_chooser->got=BLK_GOT;
780 12524 : blist->blk_from_champ_chooser->got_save_path=1;
781 12524 : return 0;
782 : }
783 :
784 12534 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
785 : struct dpth *dpth)
786 : {
787 : static struct blk b;
788 12534 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
789 : return -1;
790 :
791 12534 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
792 12534 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
793 12534 : return 0;
794 : }
795 :
796 25059 : static int deal_with_read_from_chfd(struct asfd *chfd,
797 : struct blist *blist, struct dpth *dpth, struct cntr *cntr)
798 : {
799 25059 : int ret=-1;
800 :
801 : // Deal with champ chooser read here.
802 : //printf("read from cc: %s\n", chfd->rbuf->buf);
803 25059 : switch(chfd->rbuf->cmd)
804 : {
805 : case CMD_SIG:
806 : // Get these for blks that the champ chooser has found.
807 12524 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
808 : goto end;
809 12524 : cntr_add_same(cntr, CMD_DATA);
810 : break;
811 : case CMD_WRAP_UP:
812 12534 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
813 : goto end;
814 : break;
815 : default:
816 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
817 : goto end;
818 : }
819 : ret=0;
820 : end:
821 25059 : iobuf_free_content(chfd->rbuf);
822 25059 : return ret;
823 : }
824 :
825 12 : static int check_for_missing_work_in_slist(struct slist *slist)
826 : {
827 12 : struct sbuf *sb=NULL;
828 :
829 12 : if(slist->blist->head)
830 : {
831 0 : logp("ERROR: finishing but still want block: %" PRIu64 "\n",
832 : slist->blist->head->index);
833 : return -1;
834 : }
835 :
836 15 : for(sb=slist->head; sb; sb=sb->next)
837 : {
838 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
839 : {
840 1 : logp("ERROR: finishing but still waiting for: %s\n",
841 : iobuf_to_printable(&slist->head->path));
842 : return -1;
843 : }
844 : }
845 : return 0;
846 : }
847 :
848 : #ifndef UTEST
849 : static
850 : #endif
851 24 : int do_backup_phase2_server_protocol2(struct async *as, struct asfd *chfd,
852 : struct sdirs *sdirs, int resume, struct conf **confs)
853 : {
854 24 : int ret=-1;
855 24 : uint8_t end_flags=0;
856 24 : struct slist *slist=NULL;
857 : struct iobuf wbuf;
858 24 : struct dpth *dpth=NULL;
859 24 : man_off_t *p1pos=NULL;
860 24 : struct manios *manios=NULL;
861 : // This is used to tell the client that a number of consecutive blocks
862 : // have been found and can be freed.
863 24 : struct asfd *asfd=NULL;
864 24 : struct cntr *cntr=NULL;
865 24 : struct sbuf *csb=NULL;
866 24 : uint64_t file_no=1;
867 :
868 24 : if(!as)
869 : {
870 1 : logp("async not provided to %s()\n", __func__);
871 1 : goto end;
872 : }
873 23 : if(!sdirs)
874 : {
875 2 : logp("sdirs not provided to %s()\n", __func__);
876 2 : goto end;
877 : }
878 21 : if(!confs)
879 : {
880 1 : logp("confs not provided to %s()\n", __func__);
881 1 : goto end;
882 : }
883 20 : asfd=as->asfd;
884 20 : if(!asfd)
885 : {
886 1 : logp("asfd not provided to %s()\n", __func__);
887 1 : goto end;
888 : }
889 19 : if(!chfd)
890 : {
891 1 : logp("chfd not provided to %s()\n", __func__);
892 1 : goto end;
893 : }
894 18 : cntr=get_cntr(confs);
895 18 : if(get_int(confs[OPT_BREAKPOINT])>=2000
896 0 : && get_int(confs[OPT_BREAKPOINT])<3000)
897 : {
898 0 : breaking=get_int(confs[OPT_BREAKPOINT]);
899 0 : breakcount=breaking-2000;
900 : }
901 :
902 18 : blks_generate_init();
903 :
904 18 : logp("Phase 2 begin (recv backup data)\n");
905 :
906 18 : if(!(dpth=dpth_alloc())
907 36 : || dpth_protocol2_init(dpth,
908 18 : sdirs->data,
909 18 : get_string(confs[OPT_CNAME]),
910 18 : sdirs->cfiles,
911 : get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
912 : goto end;
913 18 : if(resume)
914 : {
915 0 : if(!(p1pos=do_resume(sdirs, dpth, confs)))
916 : goto end;
917 0 : if(cntr_send_sdirs(asfd, sdirs, confs, CNTR_STATUS_BACKUP))
918 : goto end;
919 : }
920 :
921 18 : if(!(manios=manios_open_phase2(sdirs, p1pos, PROTO_2))
922 42 : || !(slist=slist_alloc())
923 18 : || !(csb=sbuf_alloc(PROTO_2)))
924 : goto end;
925 :
926 18 : iobuf_free_content(asfd->rbuf);
927 :
928 : memset(&wbuf, 0, sizeof(struct iobuf));
929 216994 : while(!(end_flags&END_BACKUP))
930 : {
931 216982 : if(write_status(CNTR_STATUS_BACKUP,
932 217008 : csb && csb->path.buf?csb->path.buf:"", cntr))
933 : goto end;
934 :
935 216982 : if(maybe_add_from_scan(manios, slist, chfd, &csb, cntr))
936 : goto end;
937 :
938 216982 : if(!wbuf.len)
939 : {
940 216982 : if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
941 : goto end;
942 216982 : if(!wbuf.len)
943 : {
944 208447 : get_wbuf_from_files(&wbuf, slist,
945 : manios, &end_flags, &file_no);
946 : }
947 : }
948 :
949 216982 : if(wbuf.len
950 25599 : && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
951 : goto end;
952 :
953 216981 : if(append_for_champ_chooser(chfd, slist->blist, end_flags))
954 : goto end;
955 :
956 216980 : if(as->read_write(as))
957 : {
958 2 : logp("error from as->read_write in %s\n", __func__);
959 2 : goto end;
960 : }
961 :
962 267624 : while(asfd->rbuf->buf)
963 : {
964 50647 : if(deal_with_read(asfd->rbuf, slist, cntr,
965 : &end_flags, dpth))
966 : goto end;
967 : // Get as much out of the readbuf as possible.
968 50646 : if(asfd->parse_readbuf(asfd))
969 : goto end;
970 : }
971 242035 : while(chfd->rbuf->buf)
972 : {
973 50118 : if(deal_with_read_from_chfd(chfd,
974 25059 : slist->blist, dpth, cntr))
975 : goto end;
976 : // Get as much out of the readbuf as possible.
977 25058 : if(chfd->parse_readbuf(chfd))
978 : goto end;
979 : }
980 :
981 216976 : if(write_to_changed_file(asfd, chfd, manios,
982 : slist, end_flags))
983 : goto end;
984 : }
985 :
986 : // Hack: If there are some entries left after the last entry that
987 : // contains block data, it will not be written to the changed file
988 : // yet because the last entry of block data has not had
989 : // sb->protocol2->bend set.
990 12 : if(slist->head && slist->head->next)
991 : {
992 8 : struct sbuf *sb=NULL;
993 8 : sb=slist->head;
994 8 : slist->head=sb->next;
995 8 : sbuf_free(&sb);
996 8 : if(write_to_changed_file(asfd, chfd, manios,
997 : slist, end_flags))
998 : goto end;
999 : }
1000 :
1001 12 : if(manios_close(&manios))
1002 : goto end;
1003 :
1004 36 : if(check_for_missing_work_in_slist(slist))
1005 : goto end;
1006 :
1007 : // Need to release the last left. There should be one at most.
1008 11 : if(dpth->head && dpth->head->next)
1009 : {
1010 0 : logp("ERROR: More data locks remaining after: %s\n",
1011 0 : dpth->head->save_path);
1012 0 : goto end;
1013 : }
1014 11 : if(dpth_release_all(dpth)) goto end;
1015 :
1016 11 : ret=0;
1017 : end:
1018 24 : logp("End backup\n");
1019 24 : sbuf_free(&csb);
1020 24 : slist_free(&slist);
1021 24 : if(asfd) iobuf_free_content(asfd->rbuf);
1022 24 : if(chfd) iobuf_free_content(chfd->rbuf);
1023 24 : dpth_free(&dpth);
1024 24 : manios_close(&manios);
1025 24 : man_off_t_free(&p1pos);
1026 24 : blks_generate_free();
1027 24 : hash_delete_all();
1028 24 : return ret;
1029 : }
1030 :
1031 0 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
1032 : int resume, struct conf **confs)
1033 : {
1034 0 : int ret=-1;
1035 0 : struct asfd *chfd=NULL;
1036 0 : if(!(chfd=champ_chooser_connect(as, sdirs, confs, resume)))
1037 : {
1038 0 : logp("problem connecting to champ chooser\n");
1039 0 : goto end;
1040 : }
1041 0 : ret=do_backup_phase2_server_protocol2(as, chfd, sdirs, resume, confs);
1042 : end:
1043 0 : if(chfd) as->asfd_remove(as, chfd);
1044 0 : asfd_free(&chfd);
1045 0 : return ret;
1046 : }
|