Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../handy.h"
10 : #include "../../hexmap.h"
11 : #include "../../iobuf.h"
12 : #include "../../log.h"
13 : #include "../../server/manio.h"
14 : #include "../../protocol2/blist.h"
15 : #include "../../protocol2/rabin/rabin.h"
16 : #include "../../slist.h"
17 : #include "../manios.h"
18 : #include "../resume.h"
19 : #include "champ_chooser/champ_client.h"
20 : #include "champ_chooser/champ_server.h"
21 : #include "champ_chooser/hash.h"
22 : #include "dpth.h"
23 : #include "backup_phase2.h"
24 :
// Bit flags recording which end-of-stream markers have been exchanged so far
// in phase 2. They are OR-ed together into a single 'end_flags' byte.
#define END_SIGS		(1<<0) // "sigs_end" received from the client.
#define END_BACKUP		(1<<1) // "backup_end" received from the client.
#define END_REQUESTS		(1<<2) // "requests_end" queued for the client.
#define END_BLK_REQUESTS	(1<<3) // "blk_requests_end" queued for the client.

// Breakpoint support. 'breaking' holds the configured breakpoint number when
// it lies in the 2000-2999 range (otherwise 0), and 'breakcount' is counted
// down once per signature written to the changed manifest until it triggers.
static int breaking=0;
static int breakcount=0;
32 :
33 : static int data_needed(struct sbuf *sb)
34 : {
35 33720 : if(sb->path.cmd==CMD_FILE)
36 : return 1;
37 16674 : if(sb->path.cmd==CMD_METADATA)
38 : return 1;
39 : return 0;
40 : }
41 :
42 4 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
43 : {
44 : struct iobuf wbuf;
45 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
46 4 : return chfd->write(chfd, &wbuf);
47 : }
48 :
49 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
50 : struct manios *manios, struct asfd *chfd)
51 : {
52 0 : int ret=-1;
53 0 : char *fpath=NULL;
54 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
55 : goto end;
56 0 : if(manio_copy_entry(csb, sb,
57 : manios->current, manios->unchanged)<0)
58 : goto end;
59 0 : if(strcmp(fpath, manios->changed->offset->fpath))
60 : {
61 : // If the copy crossed a manio boundary, we should tell the
62 : // champ server to load the previous one as a candidate.
63 0 : if(manio_component_to_chfd(chfd, fpath))
64 : goto end;
65 : }
66 : ret=0;
67 : end:
68 0 : free_w(&fpath);
69 0 : return ret;
70 : }
71 :
72 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
73 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
74 : struct manios *manios, struct asfd *chfd,
75 : struct cntr *cntr)
76 : {
77 : // Located the entry in the current manifest.
78 : // If the file type changed, I think it is time to back it up again
79 : // (for example, EFS changing to normal file, or back again).
80 0 : if(csb->path.cmd!=sb->path.cmd)
81 : {
82 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
83 : return -1;
84 0 : return 1;
85 : }
86 :
87 : // mtime is the actual file data.
88 : // ctime is the attributes or meta data.
89 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
90 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
91 : {
92 : // Got an unchanged file.
93 0 : cntr_add_same(cntr, sb->path.cmd);
94 0 : return unchanged(csb, sb, manios, chfd);
95 : }
96 :
97 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
98 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
99 : {
100 : // FIX THIS:
101 : // File data stayed the same, but attributes or meta data
102 : // changed. We already have the attributes, but may need to
103 : // get extra meta data.
104 0 : cntr_add_same(cntr, sb->path.cmd);
105 0 : return unchanged(csb, sb, manios, chfd);
106 : }
107 :
108 : // File data changed.
109 0 : cntr_add_changed(cntr, sb->path.cmd);
110 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
111 : return -1;
112 0 : return 1;
113 : }
114 :
115 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
116 33720 : static int entry_changed(struct sbuf *sb,
117 : struct manios *manios, struct asfd *chfd, struct sbuf **csb,
118 : struct cntr *cntr)
119 : {
120 : static int finished=0;
121 : int pcmp;
122 :
123 33720 : if(finished)
124 : {
125 33710 : cntr_add_new(cntr, sb->path.cmd);
126 33710 : return 1;
127 : }
128 :
129 10 : if((*csb)->path.buf)
130 : {
131 : // Already have an entry.
132 : }
133 : else
134 : {
135 : // Need to read another.
136 10 : switch(manio_read(manios->current, *csb))
137 : {
138 : case 1: // Reached the end.
139 10 : sbuf_free(csb);
140 10 : finished=1;
141 10 : cntr_add_new(cntr, sb->path.cmd);
142 10 : return 1;
143 : case -1: return -1;
144 : }
145 0 : if(!(*csb)->path.buf)
146 : {
147 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
148 0 : return -1;
149 : }
150 : // Got an entry.
151 : }
152 :
153 : while(1)
154 : {
155 0 : if(!(pcmp=sbuf_pathcmp(*csb, sb)))
156 0 : return found_in_current_manifest(*csb, sb,
157 : manios, chfd, cntr);
158 0 : else if(pcmp>0)
159 : {
160 0 : cntr_add_new(cntr, sb->path.cmd);
161 0 : return 1;
162 : }
163 : // cntr_add_deleted(cntr, (*csb)->path.cmd);
164 : // Behind - need to read more data from the old manifest.
165 0 : switch(manio_read(manios->current, *csb))
166 : {
167 : case 1: // Reached the end.
168 0 : sbuf_free(csb);
169 0 : cntr_add_new(cntr, sb->path.cmd);
170 0 : return 1;
171 : case -1: return -1;
172 : }
173 : // Got something, go back around the loop.
174 : }
175 :
176 : return 0;
177 : }
178 :
179 8523 : static int add_data_to_store(struct cntr *cntr,
180 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
181 : {
182 : static struct blk *blk=NULL;
183 :
184 : // Find the first one in the list that was requested.
185 : // FIX THIS: Going up the list here, and then later
186 : // when writing to the manifest is not efficient.
187 17046 : for(blk=slist->blist->head;
188 8522 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
189 : {
190 : // logp("try: %d %d\n", blk->index, blk->got);
191 : }
192 8523 : if(!blk)
193 : {
194 1 : logp("Received data but could not find next requested block.\n");
195 1 : if(!slist->blist->head)
196 1 : logp("and slist->blist->head is null\n");
197 : else
198 0 : logp("head index: %" PRIu64 "\n", slist->blist->head->index);
199 : return -1;
200 : }
201 :
202 : // FIX THIS
203 : #ifndef UTEST
204 : if(blk_verify(blk->fingerprint, blk->md5sum, rbuf->buf, rbuf->len)<=0)
205 : {
206 : logp("ERROR: Block %" PRIu64 " from client did not verify.\n",
207 : blk->index);
208 : return -1;
209 : }
210 : #endif
211 :
212 : // Add it to the data store straight away.
213 8522 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
214 :
215 8522 : cntr_add(cntr, CMD_DATA, 0);
216 :
217 8522 : blk->got=BLK_GOT;
218 8522 : blk=blk->next;
219 :
220 : return 0;
221 : }
222 :
223 17034 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
224 : uint64_t index)
225 : {
226 : struct sbuf *sb;
227 :
228 50701 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
229 : {
230 50701 : if(!sb->protocol2->index)
231 16642 : continue;
232 34059 : if(index==sb->protocol2->index)
233 : break;
234 : }
235 17034 : if(!sb)
236 : {
237 0 : logp("Could not find %" PRIu64 " in request list\n", index);
238 : return -1;
239 : }
240 : // Replace the attribs with the more recent values.
241 17034 : iobuf_free_content(&sb->attr);
242 17034 : iobuf_move(&sb->attr, attr);
243 :
244 : // Mark the end of the previous file.
245 17034 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
246 :
247 17034 : slist->add_sigs_here=sb;
248 :
249 : // Incoming sigs now need to get added to 'add_sigs_here'
250 : return 0;
251 : }
252 :
253 : /*
254 : static void dump_blks(const char *msg, struct blk *b)
255 : {
256 : struct blk *xx;
257 : for(xx=b; xx; xx=xx->next)
258 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
259 : }
260 : */
261 :
262 25058 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
263 : {
264 : // Goes on slist->add_sigs_here
265 : struct blk *blk;
266 : struct protocol2 *protocol2;
267 :
268 25058 : if(!(blk=blk_alloc())) return -1;
269 25058 : blist_add_blk(slist->blist, blk);
270 :
271 25058 : protocol2=slist->add_sigs_here->protocol2;
272 25058 : if(!protocol2->bstart) protocol2->bstart=blk;
273 25058 : if(!protocol2->bsighead) protocol2->bsighead=blk;
274 :
275 25058 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
276 :
277 : // Need to send sigs to champ chooser, therefore need to point
278 : // to the oldest unsent one if nothing is pointed to yet.
279 25058 : if(!slist->blist->blk_for_champ_chooser)
280 16490 : slist->blist->blk_for_champ_chooser=blk;
281 :
282 : return 0;
283 : }
284 :
285 50647 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
286 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
287 : {
288 50647 : int ret=0;
289 : static struct iobuf attr;
290 : static uint64_t index;
291 :
292 50647 : switch(rbuf->cmd)
293 : {
294 : /* Incoming block data. */
295 : case CMD_DATA:
296 8523 : if(add_data_to_store(cntr, slist, rbuf, dpth))
297 : goto error;
298 : goto end;
299 :
300 : /* Incoming block signatures. */
301 : case CMD_ATTRIBS_SIGS:
302 :
303 17034 : iobuf_init(&attr);
304 17034 : iobuf_move(&attr, rbuf);
305 17034 : index=decode_file_no(&attr);
306 :
307 : // Need to go through slist to find the matching
308 : // entry.
309 17034 : if(set_up_for_sig_info(slist, &attr, index))
310 : goto error;
311 : return 0;
312 : case CMD_SIG:
313 25058 : if(add_to_sig_list(slist, rbuf))
314 : goto error;
315 : goto end;
316 :
317 : /* Incoming control/message stuff. */
318 : case CMD_MESSAGE:
319 : case CMD_WARNING:
320 : {
321 6 : struct cntr *cntr=NULL;
322 6 : log_recvd(rbuf, cntr, 0);
323 6 : goto end;
324 : }
325 : case CMD_GEN:
326 25 : if(!strcmp(rbuf->buf, "sigs_end"))
327 : {
328 13 : (*end_flags)|=END_SIGS;
329 13 : goto end;
330 : }
331 12 : else if(!strcmp(rbuf->buf, "backup_end"))
332 : {
333 12 : (*end_flags)|=END_BACKUP;
334 12 : goto end;
335 : }
336 : break;
337 : case CMD_INTERRUPT:
338 : {
339 : uint64_t file_no;
340 1 : file_no=base64_to_uint64(rbuf->buf);
341 1 : if(slist_del_sbuf_by_index(slist, file_no))
342 : goto error;
343 : goto end;
344 : }
345 : default:
346 : break;
347 : }
348 :
349 0 : iobuf_log_unexpected(rbuf, __func__);
350 : error:
351 : ret=-1;
352 : end:
353 33613 : iobuf_free_content(rbuf);
354 33613 : return ret;
355 : }
356 :
357 216982 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
358 : uint8_t *end_flags)
359 : {
360 : static char req[32]="";
361 216982 : struct sbuf *sb=slist->blks_to_request;
362 :
363 13327 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
364 :
365 216982 : if(!sb)
366 : {
367 16 : slist->blks_to_request=NULL;
368 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
369 : {
370 3 : iobuf_from_str(wbuf,
371 : CMD_GEN, (char *)"blk_requests_end");
372 3 : (*end_flags)|=END_BLK_REQUESTS;
373 : }
374 : return 0;
375 : }
376 216966 : if(!sb->protocol2->bsighead)
377 : {
378 : // Trying to move onto the next file.
379 : // ??? Does this really work?
380 129027 : if(sb->protocol2->bend)
381 : {
382 0 : slist->blks_to_request=sb->next;
383 : printf("move to next\n");
384 : }
385 129027 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
386 : {
387 10 : iobuf_from_str(wbuf,
388 : CMD_GEN, (char *)"blk_requests_end");
389 10 : (*end_flags)|=END_BLK_REQUESTS;
390 : }
391 : return 0;
392 : }
393 :
394 87939 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
395 : return 0;
396 :
397 12538 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
398 : {
399 8522 : base64_from_uint64(sb->protocol2->bsighead->index, req);
400 8522 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
401 8522 : sb->protocol2->bsighead->requested=1;
402 : }
403 :
404 : // Move on.
405 12538 : if(sb->protocol2->bsighead==sb->protocol2->bend)
406 : {
407 8517 : slist->blks_to_request=sb->next;
408 8517 : sb->protocol2->bsighead=sb->protocol2->bstart;
409 : }
410 : else
411 : {
412 4021 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
413 : }
414 : return 0;
415 : }
416 :
417 208447 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
418 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
419 : {
420 208447 : struct sbuf *sb=slist->last_requested;
421 208447 : if(!sb)
422 : {
423 157689 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
424 : {
425 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
426 18 : (*end_flags)|=END_REQUESTS;
427 : }
428 : return;
429 : }
430 :
431 50758 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
432 : {
433 33712 : slist->last_requested=sb->next;
434 : return;
435 : }
436 :
437 : // Only need to request the path at this stage.
438 17046 : iobuf_copy(wbuf, &sb->path);
439 17046 : sb->flags |= SBUF_SENT_PATH;
440 17046 : sb->protocol2->index=(*file_no)++;
441 : }
442 :
443 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
444 : {
445 : static char *p;
446 : static char tmp[32];
447 2 : p=tmp;
448 2 : p+=to_base64(index, tmp);
449 2 : *p='\0';
450 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
451 2 : }
452 :
453 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
454 : {
455 : struct iobuf endfile;
456 :
457 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
458 : return 0;
459 33704 : if(!iobuf_is_filedata(&sb->path))
460 : return 0;
461 :
462 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
463 : // FIX THIS: Should give a proper length and md5sum.
464 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
465 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
466 : }
467 :
468 233992 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
469 : {
470 : struct blk *b;
471 259050 : while(blist->head!=sb->protocol2->bstart)
472 : {
473 25058 : b=blist->head->next;
474 25058 : if(blist->head==blist->blk_from_champ_chooser)
475 8508 : blist->blk_from_champ_chooser=b;
476 25058 : blk_free(&blist->head);
477 25058 : blist->head=b;
478 : }
479 233992 : if(!blist->head)
480 51771 : blist->tail=NULL;
481 233992 : }
482 :
483 233992 : static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
484 : struct asfd *chfd, struct manios *manios,
485 : struct slist *slist, int end_flags)
486 : {
487 233992 : int ret=-1;
488 : struct blk *blk;
489 : static struct iobuf wbuf;
490 233992 : struct blist *blist=slist->blist;
491 :
492 233992 : if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
493 : {
494 17036 : if(manio_write_sbuf(manios->changed, sb)) goto end;
495 17036 : sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
496 : }
497 :
498 242025 : while((blk=sb->protocol2->bstart)
499 190254 : && blk->got==BLK_GOT
500 59413 : && (blk->next || end_flags&END_BACKUP))
501 : {
502 25058 : if(blk->got_save_path
503 25058 : && !blk_is_zero_length(blk))
504 : {
505 25058 : if(breaking && breakcount--==0)
506 : {
507 0 : breakpoint(breaking, __func__);
508 0 : goto end;
509 : }
510 25058 : if(manio_write_sig_and_path(manios->changed, blk))
511 : goto end;
512 25058 : if(manios->changed->sig_count==0)
513 : {
514 : // Have finished a manifest file. Want to start
515 : // using it as a dedup candidate now.
516 4 : if(manio_component_to_chfd(chfd,
517 4 : manios->changed->offset->ppath))
518 : goto end;
519 :
520 : // The champ chooser has the candidate. Now,
521 : // empty our local hash table.
522 4 : hash_delete_all();
523 : // Add the most recent block, so identical
524 : // adjacent blocks are deduplicated well.
525 4 : if(hash_load_blk(blk))
526 : goto end;
527 :
528 4 : if(!blk->requested)
529 : {
530 : // Also let the client know, so that it
531 : // can free memory if there was a long
532 : // consecutive number of unrequested
533 : // blocks.
534 2 : get_wbuf_from_index(&wbuf, blk->index);
535 2 : if(asfd->write(asfd, &wbuf))
536 : goto end;
537 : }
538 : }
539 : }
540 :
541 25058 : if(blk==sb->protocol2->bend)
542 : {
543 17025 : blist_adjust_head(blist, sb);
544 17025 : if(write_endfile(sb, manios)) return -1;
545 17025 : slist_advance(slist);
546 17025 : return 1;
547 : }
548 :
549 8033 : if(sb->protocol2->bsighead==sb->protocol2->bstart)
550 8020 : sb->protocol2->bsighead=blk->next;
551 8033 : sb->protocol2->bstart=blk->next;
552 8033 : if(blk==blist->blk_from_champ_chooser)
553 4021 : blist->blk_from_champ_chooser=blk->next;
554 : }
555 216967 : if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
556 : {
557 : // Write endfile for the very last file.
558 11 : if(write_endfile(sb, manios)) return -1;
559 : }
560 : ret=0;
561 : end:
562 216967 : blist_adjust_head(blist, sb);
563 216967 : return ret;
564 : }
565 :
566 216984 : static int write_to_changed_file(struct asfd *asfd,
567 : struct asfd *chfd, struct manios *manios,
568 : struct slist *slist, int end_flags)
569 : {
570 : struct sbuf *sb;
571 216984 : if(!slist) return 0;
572 :
573 250677 : while((sb=slist->head))
574 : {
575 250660 : if(sb->flags & SBUF_NEED_DATA)
576 : {
577 233992 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
578 : end_flags))
579 : {
580 : case 0: return 0;
581 17025 : case 1: continue;
582 0 : default: return -1;
583 : }
584 :
585 : }
586 : else
587 : {
588 : // No change, can go straight in.
589 16668 : if(manio_write_sbuf(manios->changed, sb)) return -1;
590 16668 : if(write_endfile(sb, manios)) return -1;
591 :
592 : // Move along.
593 16668 : slist_advance(slist);
594 : }
595 : }
596 : return 0;
597 : }
598 :
599 216982 : static int maybe_add_from_scan(struct manios *manios,
600 : struct slist *slist, struct asfd *chfd, struct sbuf **csb,
601 : struct cntr *cntr)
602 : {
603 216982 : int ret=-1;
604 216982 : struct sbuf *snew=NULL;
605 :
606 : while(1)
607 : {
608 250702 : sbuf_free(&snew);
609 250702 : if(!manios->phase1) return 0;
610 : // Limit the amount loaded into memory at any one time.
611 33738 : if(slist && slist->head)
612 : {
613 67440 : if(slist->head->protocol2->index
614 33720 : - slist->tail->protocol2->index>4096)
615 : return 0;
616 : }
617 67458 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
618 :
619 33738 : switch(manio_read(manios->phase1, snew))
620 : {
621 : case 0: break;
622 18 : case 1: manio_close(&manios->phase1);
623 18 : ret=0; // Finished.
624 : default: goto end;
625 : }
626 :
627 33720 : switch(entry_changed(snew, manios, chfd, csb, cntr))
628 : {
629 0 : case 0: continue; // No change.
630 : case 1: break;
631 : default: goto end; // Error.
632 : }
633 :
634 101160 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
635 :
636 33720 : slist_add_sbuf(slist, snew);
637 33720 : snew=NULL;
638 : }
639 : return 0;
640 : end:
641 18 : sbuf_free(&snew);
642 18 : return ret;
643 : }
644 :
645 216981 : static int append_for_champ_chooser(struct asfd *chfd,
646 : struct blist *blist, int end_flags)
647 : {
648 : static int finished_sending=0;
649 : static struct iobuf wbuf;
650 : static struct blk *blk=NULL;
651 :
652 242039 : while(blist->blk_for_champ_chooser)
653 : {
654 112824 : blk=blist->blk_for_champ_chooser;
655 : // If we send too many blocks to the champ chooser at once,
656 : // it can go faster than we can send paths to completed
657 : // manifests to it. This means that deduplication efficiency
658 : // is reduced (although speed may be faster).
659 : // So limit the sending.
660 225648 : if(blk->index
661 112824 : - blist->head->index > MANIFEST_SIG_MAX)
662 : return 0;
663 :
664 25058 : blk_to_iobuf_sig(blk, &wbuf);
665 :
666 25058 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
667 : {
668 : case APPEND_OK: break;
669 : case APPEND_BLOCKED:
670 : return 0; // Try again later.
671 : default: return -1;
672 : }
673 25058 : blist->blk_for_champ_chooser=blk->next;
674 : }
675 129215 : if(end_flags&END_SIGS
676 49976 : && !finished_sending && !blist->blk_for_champ_chooser)
677 : {
678 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
679 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
680 : {
681 : case APPEND_OK: break;
682 : case APPEND_BLOCKED:
683 : return 0; // Try again later.
684 : default: return -1;
685 : }
686 12 : finished_sending++;
687 : }
688 : return 0;
689 : }
690 :
691 25063 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
692 : {
693 : const char *path;
694 :
695 25063 : if(blk->got!=BLK_INCOMING) return 0;
696 8522 : blk->got=BLK_NOT_GOT;
697 :
698 : // Need to get the data for this blk from the client.
699 : // Set up the details of where it will be saved.
700 8522 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
701 :
702 : // FIX THIS: make dpth give us the path in a uint8 array.
703 8522 : blk->savepath=savepathstr_with_sig_to_uint64(path);
704 8522 : blk->got_save_path=1;
705 : // Load it into our local hash table.
706 8522 : if(hash_load_blk(blk))
707 : return -1;
708 8522 : if(dpth_protocol2_incr_sig(dpth))
709 : return -1;
710 8522 : return 0;
711 : }
712 :
713 25058 : static struct hash_strong *in_local_hash(struct blk *blk)
714 : {
715 : static struct hash_weak *hash_weak;
716 :
717 25058 : if(!(hash_weak=hash_weak_find(blk->fingerprint)))
718 : return NULL;
719 4014 : return hash_strong_find(hash_weak, blk->md5sum);
720 : }
721 :
722 37587 : static int simple_deduplicate_blk(struct blk *blk)
723 : {
724 : static struct hash_strong *hash_strong;
725 37587 : if(blk->got!=BLK_INCOMING)
726 : return 0;
727 25058 : if((hash_strong=in_local_hash(blk)))
728 : {
729 4014 : blk->savepath=hash_strong->savepath;
730 4014 : blk->got_save_path=1;
731 4014 : blk->got=BLK_GOT;
732 4014 : return 1;
733 : }
734 : return 0;
735 : }
736 :
737 25058 : static int mark_up_to_index(struct blist *blist,
738 : uint64_t index, struct dpth *dpth)
739 : {
740 : struct blk *blk;
741 :
742 : // Mark everything that was not got, up to the given index.
743 62645 : for(blk=blist->blk_from_champ_chooser;
744 50116 : blk && blk->index!=index; blk=blk->next)
745 : {
746 12529 : if(simple_deduplicate_blk(blk))
747 0 : continue;
748 12529 : if(mark_not_got(blk, dpth))
749 : return -1;
750 : }
751 25058 : if(!blk)
752 : {
753 0 : logp("Could not find index from champ chooser: %" PRIu64 "\n",
754 : index);
755 : return -1;
756 : }
757 25058 : simple_deduplicate_blk(blk);
758 :
759 : //logp("Found index from champ chooser: %lu\n", index);
760 : //printf("index from cc: %d\n", index);
761 25058 : blist->blk_from_champ_chooser=blk;
762 : return 0;
763 : }
764 :
765 12524 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
766 : struct dpth *dpth)
767 : {
768 : static struct blk b;
769 12524 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
770 : return -1;
771 :
772 12524 : if(mark_up_to_index(blist, b.index, dpth))
773 : return -1;
774 12524 : blist->blk_from_champ_chooser->savepath=b.savepath;
775 12524 : blist->blk_from_champ_chooser->got=BLK_GOT;
776 12524 : blist->blk_from_champ_chooser->got_save_path=1;
777 12524 : return 0;
778 : }
779 :
780 12534 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
781 : struct dpth *dpth)
782 : {
783 : static struct blk b;
784 12534 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
785 : return -1;
786 :
787 12534 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
788 12534 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
789 12534 : return 0;
790 : }
791 :
792 25059 : static int deal_with_read_from_chfd(struct asfd *chfd,
793 : struct blist *blist, struct dpth *dpth, struct cntr *cntr)
794 : {
795 25059 : int ret=-1;
796 :
797 : // Deal with champ chooser read here.
798 : //printf("read from cc: %s\n", chfd->rbuf->buf);
799 25059 : switch(chfd->rbuf->cmd)
800 : {
801 : case CMD_SIG:
802 : // Get these for blks that the champ chooser has found.
803 12524 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
804 : goto end;
805 12524 : cntr_add_same(cntr, CMD_DATA);
806 : break;
807 : case CMD_WRAP_UP:
808 12534 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
809 : goto end;
810 : break;
811 : default:
812 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
813 : goto end;
814 : }
815 : ret=0;
816 : end:
817 25059 : iobuf_free_content(chfd->rbuf);
818 25059 : return ret;
819 : }
820 :
821 12 : static int check_for_missing_work_in_slist(struct slist *slist)
822 : {
823 12 : struct sbuf *sb=NULL;
824 :
825 12 : if(slist->blist->head)
826 : {
827 0 : logp("ERROR: finishing but still want block: %" PRIu64 "\n",
828 : slist->blist->head->index);
829 : return -1;
830 : }
831 :
832 15 : for(sb=slist->head; sb; sb=sb->next)
833 : {
834 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
835 : {
836 2 : logp("ERROR: finishing but still waiting for: %c:%s\n",
837 1 : slist->head->path.cmd, slist->head->path.buf);
838 : return -1;
839 : }
840 : }
841 : return 0;
842 : }
843 :
844 : #ifndef UTEST
845 : static
846 : #endif
847 24 : int do_backup_phase2_server_protocol2(struct async *as, struct asfd *chfd,
848 : struct sdirs *sdirs, int resume, struct conf **confs)
849 : {
850 24 : int ret=-1;
851 24 : uint8_t end_flags=0;
852 24 : struct slist *slist=NULL;
853 : struct iobuf wbuf;
854 24 : struct dpth *dpth=NULL;
855 24 : man_off_t *p1pos=NULL;
856 24 : struct manios *manios=NULL;
857 : // This is used to tell the client that a number of consecutive blocks
858 : // have been found and can be freed.
859 24 : struct asfd *asfd=NULL;
860 24 : struct cntr *cntr=NULL;
861 24 : struct sbuf *csb=NULL;
862 24 : uint64_t file_no=1;
863 :
864 24 : if(!as)
865 : {
866 1 : logp("async not provided to %s()\n", __func__);
867 1 : goto end;
868 : }
869 23 : if(!sdirs)
870 : {
871 2 : logp("sdirs not provided to %s()\n", __func__);
872 2 : goto end;
873 : }
874 21 : if(!confs)
875 : {
876 1 : logp("confs not provided to %s()\n", __func__);
877 1 : goto end;
878 : }
879 20 : asfd=as->asfd;
880 20 : if(!asfd)
881 : {
882 1 : logp("asfd not provided to %s()\n", __func__);
883 1 : goto end;
884 : }
885 19 : if(!chfd)
886 : {
887 1 : logp("chfd not provided to %s()\n", __func__);
888 1 : goto end;
889 : }
890 18 : cntr=get_cntr(confs);
891 18 : if(get_int(confs[OPT_BREAKPOINT])>=2000
892 0 : && get_int(confs[OPT_BREAKPOINT])<3000)
893 : {
894 0 : breaking=get_int(confs[OPT_BREAKPOINT]);
895 0 : breakcount=breaking-2000;
896 : }
897 :
898 18 : blks_generate_init();
899 :
900 18 : logp("Phase 2 begin (recv backup data)\n");
901 :
902 18 : if(!(dpth=dpth_alloc())
903 36 : || dpth_protocol2_init(dpth,
904 18 : sdirs->data,
905 18 : get_string(confs[OPT_CNAME]),
906 18 : sdirs->cfiles,
907 : get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
908 : goto end;
909 18 : if(resume)
910 : {
911 0 : if(!(p1pos=do_resume(sdirs, dpth, confs)))
912 : goto end;
913 0 : if(cntr_send_sdirs(asfd, sdirs, confs))
914 : goto end;
915 : }
916 :
917 18 : if(!(manios=manios_open_phase2(sdirs, p1pos, PROTO_2))
918 42 : || !(slist=slist_alloc())
919 18 : || !(csb=sbuf_alloc(PROTO_2)))
920 : goto end;
921 :
922 18 : iobuf_free_content(asfd->rbuf);
923 :
924 : memset(&wbuf, 0, sizeof(struct iobuf));
925 216994 : while(!(end_flags&END_BACKUP))
926 : {
927 216982 : if(maybe_add_from_scan(manios, slist, chfd, &csb, cntr))
928 : goto end;
929 :
930 216982 : if(!wbuf.len)
931 : {
932 216982 : if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
933 : goto end;
934 216982 : if(!wbuf.len)
935 : {
936 208447 : get_wbuf_from_files(&wbuf, slist,
937 : manios, &end_flags, &file_no);
938 : }
939 : }
940 :
941 216982 : if(wbuf.len
942 25599 : && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
943 : goto end;
944 :
945 216981 : if(append_for_champ_chooser(chfd, slist->blist, end_flags))
946 : goto end;
947 :
948 216980 : if(as->read_write(as))
949 : {
950 2 : logp("error from as->read_write in %s\n", __func__);
951 2 : goto end;
952 : }
953 :
954 267624 : while(asfd->rbuf->buf)
955 : {
956 50647 : if(deal_with_read(asfd->rbuf, slist, cntr,
957 : &end_flags, dpth))
958 : goto end;
959 : // Get as much out of the readbuf as possible.
960 50646 : if(asfd->parse_readbuf(asfd))
961 : goto end;
962 : }
963 242035 : while(chfd->rbuf->buf)
964 : {
965 50118 : if(deal_with_read_from_chfd(chfd,
966 25059 : slist->blist, dpth, cntr))
967 : goto end;
968 : // Get as much out of the readbuf as possible.
969 25058 : if(chfd->parse_readbuf(chfd))
970 : goto end;
971 : }
972 :
973 216976 : if(write_to_changed_file(asfd, chfd, manios,
974 : slist, end_flags))
975 : goto end;
976 : }
977 :
978 : // Hack: If there are some entries left after the last entry that
979 : // contains block data, it will not be written to the changed file
980 : // yet because the last entry of block data has not had
981 : // sb->protocol2->bend set.
982 12 : if(slist->head && slist->head->next)
983 : {
984 8 : struct sbuf *sb=NULL;
985 8 : sb=slist->head;
986 8 : slist->head=sb->next;
987 8 : sbuf_free(&sb);
988 8 : if(write_to_changed_file(asfd, chfd, manios,
989 : slist, end_flags))
990 : goto end;
991 : }
992 :
993 12 : if(manios_close(&manios))
994 : goto end;
995 :
996 36 : if(check_for_missing_work_in_slist(slist))
997 : goto end;
998 :
999 : // Need to release the last left. There should be one at most.
1000 11 : if(dpth->head && dpth->head->next)
1001 : {
1002 0 : logp("ERROR: More data locks remaining after: %s\n",
1003 0 : dpth->head->save_path);
1004 0 : goto end;
1005 : }
1006 11 : if(dpth_release_all(dpth)) goto end;
1007 :
1008 11 : ret=0;
1009 : end:
1010 24 : logp("End backup\n");
1011 24 : sbuf_free(&csb);
1012 24 : slist_free(&slist);
1013 24 : if(asfd) iobuf_free_content(asfd->rbuf);
1014 24 : if(chfd) iobuf_free_content(chfd->rbuf);
1015 24 : dpth_free(&dpth);
1016 24 : manios_close(&manios);
1017 24 : man_off_t_free(&p1pos);
1018 24 : blks_generate_free();
1019 24 : hash_delete_all();
1020 24 : return ret;
1021 : }
1022 :
1023 0 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
1024 : int resume, struct conf **confs)
1025 : {
1026 0 : int ret=-1;
1027 0 : struct asfd *chfd=NULL;
1028 0 : if(!(chfd=champ_chooser_connect(as, sdirs, confs, resume)))
1029 : {
1030 0 : logp("problem connecting to champ chooser\n");
1031 0 : goto end;
1032 : }
1033 0 : ret=do_backup_phase2_server_protocol2(as, chfd, sdirs, resume, confs);
1034 : end:
1035 0 : if(chfd) as->asfd_remove(as, chfd);
1036 0 : asfd_free(&chfd);
1037 0 : return ret;
1038 : }
|