Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../handy.h"
10 : #include "../../hexmap.h"
11 : #include "../../iobuf.h"
12 : #include "../../log.h"
13 : #include "../../server/manio.h"
14 : #include "../../protocol2/blist.h"
15 : #include "../../protocol2/rabin/rabin.h"
16 : #include "../../slist.h"
17 : #include "../manios.h"
18 : #include "../resume.h"
19 : #include "champ_chooser/champ_client.h"
20 : #include "champ_chooser/champ_server.h"
21 : #include "champ_chooser/hash.h"
22 : #include "dpth.h"
23 : #include "backup_phase2.h"
24 :
// Bits of the end_flags bitmask used throughout this file.
// END_SIGS:         the client has sent "sigs_end".
// END_BACKUP:       the client has sent "backup_end".
// END_REQUESTS:     "requests_end" has been queued for the client.
// END_BLK_REQUESTS: "blk_requests_end" has been queued for the client.
#define END_SIGS		0x01
#define END_BACKUP		0x02
#define END_REQUESTS		0x04
#define END_BLK_REQUESTS	0x08

// Number of consecutive unrequested blocks after which the client is told
// that it can free memory. The replacement list is parenthesised so that the
// macro expands correctly inside larger expressions.
#define BLKS_MAX_UNREQUESTED	(BLKS_MAX_IN_MEM/4)
// This needs to be greater than BLKS_MAX_UNREQUESTED
#define SLIST_MAX_IN_MEM	BLKS_MAX_IN_MEM

// Breakpoint support for testing. Both are set from OPT_BREAKPOINT when the
// phase begins, and consumed while writing sigs to the changed manifest.
static int breaking=0;
static int breakcount=0;
36 :
37 : static int data_needed(struct sbuf *sb)
38 : {
39 33720 : if(sb->path.cmd==CMD_FILE)
40 : return 1;
41 16674 : if(sb->path.cmd==CMD_METADATA)
42 : return 1;
43 : return 0;
44 : }
45 :
46 4 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
47 : {
48 : struct iobuf wbuf;
49 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
50 4 : return chfd->write(chfd, &wbuf);
51 : }
52 :
53 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
54 : struct manios *manios, struct asfd *chfd)
55 : {
56 0 : int ret=-1;
57 0 : char *fpath=NULL;
58 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
59 : goto end;
60 0 : if(manio_copy_entry(csb, sb,
61 : manios->current, manios->unchanged)<0)
62 : goto end;
63 0 : if(strcmp(fpath, manios->changed->offset->fpath))
64 : {
65 : // If the copy crossed a manio boundary, we should tell the
66 : // champ server to load the previous one as a candidate.
67 0 : if(manio_component_to_chfd(chfd, fpath))
68 : goto end;
69 : }
70 : ret=0;
71 : end:
72 0 : free_w(&fpath);
73 0 : return ret;
74 : }
75 :
76 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
77 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
78 : struct manios *manios, struct asfd *chfd,
79 : struct cntr *cntr)
80 : {
81 : // Located the entry in the current manifest.
82 : // If the file type changed, I think it is time to back it up again
83 : // (for example, EFS changing to normal file, or back again).
84 0 : if(csb->path.cmd!=sb->path.cmd)
85 : {
86 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
87 : return -1;
88 0 : return 1;
89 : }
90 :
91 : // mtime is the actual file data.
92 : // ctime is the attributes or meta data.
93 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
94 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
95 : {
96 : // Got an unchanged file.
97 0 : cntr_add_same(cntr, sb->path.cmd);
98 0 : return unchanged(csb, sb, manios, chfd);
99 : }
100 :
101 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
102 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
103 : {
104 : // FIX THIS:
105 : // File data stayed the same, but attributes or meta data
106 : // changed. We already have the attributes, but may need to
107 : // get extra meta data.
108 0 : cntr_add_same(cntr, sb->path.cmd);
109 0 : return unchanged(csb, sb, manios, chfd);
110 : }
111 :
112 : // File data changed.
113 0 : cntr_add_changed(cntr, sb->path.cmd);
114 0 : if(manio_forward_through_sigs(csb, manios->current)<0)
115 : return -1;
116 0 : return 1;
117 : }
118 :
119 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
120 33720 : static int entry_changed(struct sbuf *sb,
121 : struct manios *manios, struct asfd *chfd, struct sbuf **csb,
122 : struct cntr *cntr)
123 : {
124 : static int finished=0;
125 : int pcmp;
126 :
127 33720 : if(finished)
128 : {
129 33710 : cntr_add_new(cntr, sb->path.cmd);
130 33710 : return 1;
131 : }
132 :
133 10 : if((*csb)->path.buf)
134 : {
135 : // Already have an entry.
136 : }
137 : else
138 : {
139 : // Need to read another.
140 10 : switch(manio_read(manios->current, *csb))
141 : {
142 : case 1: // Reached the end.
143 10 : sbuf_free(csb);
144 10 : finished=1;
145 10 : cntr_add_new(cntr, sb->path.cmd);
146 10 : return 1;
147 : case -1: return -1;
148 : }
149 0 : if(!(*csb)->path.buf)
150 : {
151 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
152 0 : return -1;
153 : }
154 : // Got an entry.
155 : }
156 :
157 : while(1)
158 : {
159 0 : if(!(pcmp=sbuf_pathcmp(*csb, sb)))
160 0 : return found_in_current_manifest(*csb, sb,
161 : manios, chfd, cntr);
162 0 : else if(pcmp>0)
163 : {
164 0 : cntr_add_new(cntr, sb->path.cmd);
165 0 : return 1;
166 : }
167 : // cntr_add_deleted(cntr, (*csb)->path.cmd);
168 : // Behind - need to read more data from the old manifest.
169 0 : switch(manio_read(manios->current, *csb))
170 : {
171 : case 1: // Reached the end.
172 0 : sbuf_free(csb);
173 0 : cntr_add_new(cntr, sb->path.cmd);
174 0 : return 1;
175 : case -1: return -1;
176 : }
177 : // Got something, go back around the loop.
178 : }
179 :
180 : return 0;
181 : }
182 :
183 8523 : static int add_data_to_store(struct cntr *cntr,
184 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
185 : {
186 : static struct blk *blk=NULL;
187 :
188 : // Find the first one in the list that was requested.
189 : // FIX THIS: Going up the list here, and then later
190 : // when writing to the manifest is not efficient.
191 17046 : for(blk=slist->blist->head;
192 8522 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
193 : {
194 : // logp("try: %d %d\n", blk->index, blk->got);
195 : }
196 8523 : if(!blk)
197 : {
198 1 : logp("Received data but could not find next requested block.\n");
199 1 : if(!slist->blist->head)
200 1 : logp("and slist->blist->head is null\n");
201 : else
202 0 : logp("head index: %" PRIu64 "\n", slist->blist->head->index);
203 : return -1;
204 : }
205 :
206 : // FIX THIS
207 : #ifndef UTEST
208 : if(blk_verify(blk->fingerprint, blk->md5sum, rbuf->buf, rbuf->len)<=0)
209 : {
210 : logp("ERROR: Block %" PRIu64 " from client did not verify.\n",
211 : blk->index);
212 : return -1;
213 : }
214 : #endif
215 :
216 : // Add it to the data store straight away.
217 8522 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
218 :
219 8522 : cntr_add(cntr, CMD_DATA, 0);
220 :
221 8522 : blk->got=BLK_GOT;
222 8522 : blk=blk->next;
223 :
224 : return 0;
225 : }
226 :
227 17034 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
228 : uint64_t index)
229 : {
230 : struct sbuf *sb;
231 :
232 50701 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
233 : {
234 50701 : if(!sb->protocol2->index)
235 16642 : continue;
236 34059 : if(index==sb->protocol2->index)
237 : break;
238 : }
239 17034 : if(!sb)
240 : {
241 0 : logp("Could not find %" PRIu64 " in request list\n", index);
242 : return -1;
243 : }
244 : // Replace the attribs with the more recent values.
245 17034 : iobuf_free_content(&sb->attr);
246 17034 : iobuf_move(&sb->attr, attr);
247 :
248 : // Mark the end of the previous file.
249 17034 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
250 :
251 17034 : slist->add_sigs_here=sb;
252 :
253 : // Incoming sigs now need to get added to 'add_sigs_here'
254 : return 0;
255 : }
256 :
257 : /*
258 : static void dump_blks(const char *msg, struct blk *b)
259 : {
260 : struct blk *xx;
261 : for(xx=b; xx; xx=xx->next)
262 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
263 : }
264 : */
265 :
266 25058 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
267 : {
268 : // Goes on slist->add_sigs_here
269 : struct blk *blk;
270 : struct protocol2 *protocol2;
271 :
272 25058 : if(!(blk=blk_alloc())) return -1;
273 25058 : blist_add_blk(slist->blist, blk);
274 :
275 25058 : protocol2=slist->add_sigs_here->protocol2;
276 25058 : if(!protocol2->bstart) protocol2->bstart=blk;
277 25058 : if(!protocol2->bsighead) protocol2->bsighead=blk;
278 :
279 25058 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
280 :
281 : // Need to send sigs to champ chooser, therefore need to point
282 : // to the oldest unsent one if nothing is pointed to yet.
283 25058 : if(!slist->blist->blk_for_champ_chooser)
284 16490 : slist->blist->blk_for_champ_chooser=blk;
285 :
286 : return 0;
287 : }
288 :
289 50647 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
290 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
291 : {
292 50647 : int ret=0;
293 : static struct iobuf attr;
294 : static uint64_t index;
295 :
296 50647 : switch(rbuf->cmd)
297 : {
298 : /* Incoming block data. */
299 : case CMD_DATA:
300 8523 : if(add_data_to_store(cntr, slist, rbuf, dpth))
301 : goto error;
302 : goto end;
303 :
304 : /* Incoming block signatures. */
305 : case CMD_ATTRIBS_SIGS:
306 :
307 17034 : iobuf_init(&attr);
308 17034 : iobuf_move(&attr, rbuf);
309 17034 : index=decode_file_no(&attr);
310 :
311 : // Need to go through slist to find the matching
312 : // entry.
313 17034 : if(set_up_for_sig_info(slist, &attr, index))
314 : goto error;
315 : return 0;
316 : case CMD_SIG:
317 25058 : if(add_to_sig_list(slist, rbuf))
318 : goto error;
319 : goto end;
320 :
321 : /* Incoming control/message stuff. */
322 : case CMD_MESSAGE:
323 : case CMD_WARNING:
324 : {
325 6 : struct cntr *cntr=NULL;
326 6 : log_recvd(rbuf, cntr, 0);
327 6 : goto end;
328 : }
329 : case CMD_GEN:
330 25 : if(!strcmp(rbuf->buf, "sigs_end"))
331 : {
332 13 : (*end_flags)|=END_SIGS;
333 13 : goto end;
334 : }
335 12 : else if(!strcmp(rbuf->buf, "backup_end"))
336 : {
337 12 : (*end_flags)|=END_BACKUP;
338 12 : goto end;
339 : }
340 : break;
341 : case CMD_INTERRUPT:
342 : {
343 : uint64_t file_no;
344 1 : file_no=base64_to_uint64(rbuf->buf);
345 1 : if(slist_del_sbuf_by_index(slist, file_no))
346 : goto error;
347 : goto end;
348 : }
349 : default:
350 : break;
351 : }
352 :
353 0 : iobuf_log_unexpected(rbuf, __func__);
354 : error:
355 : ret=-1;
356 : end:
357 33613 : iobuf_free_content(rbuf);
358 33613 : return ret;
359 : }
360 :
361 216982 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
362 : uint8_t *end_flags)
363 : {
364 : static char req[32]="";
365 216982 : struct sbuf *sb=slist->blks_to_request;
366 :
367 13327 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
368 :
369 216982 : if(!sb)
370 : {
371 16 : slist->blks_to_request=NULL;
372 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
373 : {
374 3 : iobuf_from_str(wbuf,
375 : CMD_GEN, (char *)"blk_requests_end");
376 3 : (*end_flags)|=END_BLK_REQUESTS;
377 : }
378 : return 0;
379 : }
380 216966 : if(!sb->protocol2->bsighead)
381 : {
382 : // Trying to move onto the next file.
383 : // ??? Does this really work?
384 129027 : if(sb->protocol2->bend)
385 : {
386 0 : slist->blks_to_request=sb->next;
387 : printf("move to next\n");
388 : }
389 129027 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
390 : {
391 10 : iobuf_from_str(wbuf,
392 : CMD_GEN, (char *)"blk_requests_end");
393 10 : (*end_flags)|=END_BLK_REQUESTS;
394 : }
395 : return 0;
396 : }
397 :
398 87939 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
399 : return 0;
400 :
401 12538 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
402 : {
403 8522 : base64_from_uint64(sb->protocol2->bsighead->index, req);
404 8522 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
405 8522 : sb->protocol2->bsighead->requested=1;
406 : }
407 :
408 : // Move on.
409 12538 : if(sb->protocol2->bsighead==sb->protocol2->bend)
410 : {
411 8517 : slist->blks_to_request=sb->next;
412 8517 : sb->protocol2->bsighead=sb->protocol2->bstart;
413 : }
414 : else
415 : {
416 4021 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
417 : }
418 : return 0;
419 : }
420 :
421 208447 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
422 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
423 : {
424 208447 : struct sbuf *sb=slist->last_requested;
425 208447 : if(!sb)
426 : {
427 157689 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
428 : {
429 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
430 18 : (*end_flags)|=END_REQUESTS;
431 : }
432 : return;
433 : }
434 :
435 50758 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
436 : {
437 33712 : slist->last_requested=sb->next;
438 : return;
439 : }
440 :
441 : // Only need to request the path at this stage.
442 17046 : iobuf_copy(wbuf, &sb->path);
443 17046 : sb->flags |= SBUF_SENT_PATH;
444 17046 : sb->protocol2->index=(*file_no)++;
445 : }
446 :
447 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
448 : {
449 : static char *p;
450 : static char tmp[32];
451 2 : p=tmp;
452 2 : p+=to_base64(index, tmp);
453 2 : *p='\0';
454 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
455 2 : }
456 :
457 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
458 : {
459 : struct iobuf endfile;
460 :
461 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
462 : return 0;
463 33704 : if(!iobuf_is_filedata(&sb->path))
464 : return 0;
465 :
466 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
467 : // FIX THIS: Should give a proper length and md5sum.
468 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
469 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
470 : }
471 :
472 233992 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
473 : {
474 : struct blk *b;
475 259050 : while(blist->head!=sb->protocol2->bstart)
476 : {
477 25058 : b=blist->head->next;
478 25058 : if(blist->head==blist->blk_from_champ_chooser)
479 8508 : blist->blk_from_champ_chooser=b;
480 25058 : blk_free(&blist->head);
481 25058 : blist->head=b;
482 : }
483 233992 : if(!blist->head)
484 51771 : blist->tail=NULL;
485 233992 : }
486 :
// Write as much as possible of an entry that needs data to the changed
// manifest: the entry header (once), followed by the sig and save path of
// each block that has been got.
// Returns 1 when the last block of the entry (bend) has been written and the
// slist has been advanced, 0 when the caller has to wait for more blocks,
// and -1 on error.
487 233992 : static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
488 : struct asfd *chfd, struct manios *manios,
489 : struct slist *slist, int end_flags)
490 : {
491 233992 : int ret=-1;
492 : struct blk *blk;
493 : static struct iobuf wbuf;
494 233992 : struct blist *blist=slist->blist;
// Count of written blocks that were never requested from the client. It is
// static, so it carries over between calls; it is only ever reset after a
// wrap-up message has been sent.
495 : static int unrequested=0;
496 :
// Write the entry header the first time through only.
497 233992 : if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
498 : {
499 17036 : if(manio_write_sbuf(manios->changed, sb)) goto end;
500 17036 : sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
501 : }
502 :
// Consume blocks from the start of the entry for as long as they have been
// got. A block with no successor is only consumed once the client has
// signalled the end of the backup.
503 242025 : while((blk=sb->protocol2->bstart)
504 190254 : && blk->got==BLK_GOT
505 59413 : && (blk->next || end_flags&END_BACKUP))
506 : {
507 25058 : if(blk->got_save_path
508 25058 : && !blk_is_zero_length(blk))
509 : {
// Testing hook: stop after a configured number of blocks have been written.
510 25058 : if(breaking && breakcount--==0)
511 : {
512 0 : breakpoint(breaking, __func__);
513 0 : goto end;
514 : }
515 25058 : if(manio_write_sig_and_path(manios->changed, blk))
516 : goto end;
517 25058 : if(manios->changed->sig_count==0)
518 : {
519 : // Have finished a manifest file. Want to start
520 : // using it as a dedup candidate now.
521 4 : if(manio_component_to_chfd(chfd,
522 4 : manios->changed->offset->ppath))
523 : goto end;
524 :
525 : // The champ chooser has the candidate. Now,
526 : // empty our local hash table.
527 4 : hash_delete_all();
528 : // Add the most recent block, so identical
529 : // adjacent blocks are deduplicated well.
530 4 : if(hash_load_blk(blk))
531 : goto end;
532 : }
533 : }
534 :
535 25058 : if(!blk->requested)
536 16536 : unrequested++;
537 :
538 25058 : if(unrequested>BLKS_MAX_UNREQUESTED)
539 : {
540 2 : unrequested=0;
541 : // Let the client know that it can free memory if there
542 : // was a long consecutive number of unrequested blocks.
543 2 : get_wbuf_from_index(&wbuf, blk->index);
544 2 : if(asfd->write(asfd, &wbuf))
545 : goto end;
546 : }
547 :
// That was the last block of this entry: free the blocks that came before
// the start of this entry, finish the entry off and move on to the next one.
548 25058 : if(blk==sb->protocol2->bend)
549 : {
550 17025 : blist_adjust_head(blist, sb);
551 17025 : if(write_endfile(sb, manios)) return -1;
552 17025 : slist_advance(slist);
553 17025 : return 1;
554 : }
555 :
// Step past the block just written, keeping the request position and the
// champ chooser position in step if they were pointing at it.
556 8033 : if(sb->protocol2->bsighead==sb->protocol2->bstart)
557 8020 : sb->protocol2->bsighead=blk->next;
558 8033 : sb->protocol2->bstart=blk->next;
559 8033 : if(blk==blist->blk_from_champ_chooser)
560 4021 : blist->blk_from_champ_chooser=blk->next;
561 : }
562 216967 : if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
563 : {
564 : // Write endfile for the very last file.
565 11 : if(write_endfile(sb, manios)) return -1;
566 : }
567 : ret=0;
568 : end:
// Both the 'wait for more' path and the error paths that jump to 'end' free
// the blocks that are now before the start of this entry.
569 216967 : blist_adjust_head(blist, sb);
570 216967 : return ret;
571 : }
572 :
573 216984 : static int write_to_changed_file(struct asfd *asfd,
574 : struct asfd *chfd, struct manios *manios,
575 : struct slist *slist, int end_flags)
576 : {
577 : struct sbuf *sb;
578 216984 : if(!slist) return 0;
579 :
580 250677 : while((sb=slist->head))
581 : {
582 250660 : if(sb->flags & SBUF_NEED_DATA)
583 : {
584 233992 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
585 : end_flags))
586 : {
587 : case 0: return 0;
588 17025 : case 1: continue;
589 0 : default: return -1;
590 : }
591 :
592 : }
593 : else
594 : {
595 : // No change, can go straight in.
596 16668 : if(manio_write_sbuf(manios->changed, sb)) return -1;
597 16668 : if(write_endfile(sb, manios)) return -1;
598 :
599 : // Move along.
600 16668 : slist_advance(slist);
601 : }
602 : }
603 : return 0;
604 : }
605 :
606 216982 : static int maybe_add_from_scan(struct manios *manios,
607 : struct slist *slist, struct asfd *chfd, struct sbuf **csb,
608 : struct cntr *cntr)
609 : {
610 216982 : int ret=-1;
611 216982 : struct sbuf *snew=NULL;
612 :
613 : while(1)
614 : {
615 250702 : sbuf_free(&snew);
616 250702 : if(!manios->phase1) return 0;
617 : // Limit the amount loaded into memory at any one time.
618 33738 : if(slist->count>SLIST_MAX_IN_MEM)
619 : return 0;
620 67458 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
621 :
622 33738 : switch(manio_read(manios->phase1, snew))
623 : {
624 : case 0: break;
625 18 : case 1: manio_close(&manios->phase1);
626 18 : ret=0; // Finished.
627 : default: goto end;
628 : }
629 :
630 33720 : switch(entry_changed(snew, manios, chfd, csb, cntr))
631 : {
632 0 : case 0: continue; // No change.
633 : case 1: break;
634 : default: goto end; // Error.
635 : }
636 :
637 101160 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
638 :
639 33720 : slist_add_sbuf(slist, snew);
640 33720 : snew=NULL;
641 : }
642 : return 0;
643 : end:
644 18 : sbuf_free(&snew);
645 18 : return ret;
646 : }
647 :
648 216981 : static int append_for_champ_chooser(struct asfd *chfd,
649 : struct blist *blist, int end_flags)
650 : {
651 : static int finished_sending=0;
652 : static struct iobuf wbuf;
653 : static struct blk *blk=NULL;
654 :
655 242039 : while(blist->blk_for_champ_chooser)
656 : {
657 112824 : blk=blist->blk_for_champ_chooser;
658 : // If we send too many blocks to the champ chooser at once,
659 : // it can go faster than we can send paths to completed
660 : // manifests to it. This means that deduplication efficiency
661 : // is reduced (although speed may be faster).
662 : // So limit the sending.
663 225648 : if(blk->index
664 112824 : - blist->head->index > MANIFEST_SIG_MAX)
665 : return 0;
666 :
667 25058 : blk_to_iobuf_sig(blk, &wbuf);
668 :
669 25058 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
670 : {
671 : case APPEND_OK: break;
672 : case APPEND_BLOCKED:
673 : return 0; // Try again later.
674 : default: return -1;
675 : }
676 25058 : blist->blk_for_champ_chooser=blk->next;
677 : }
678 129215 : if(end_flags&END_SIGS
679 49976 : && !finished_sending && !blist->blk_for_champ_chooser)
680 : {
681 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
682 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
683 : {
684 : case APPEND_OK: break;
685 : case APPEND_BLOCKED:
686 : return 0; // Try again later.
687 : default: return -1;
688 : }
689 12 : finished_sending++;
690 : }
691 : return 0;
692 : }
693 :
694 25063 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
695 : {
696 : const char *path;
697 :
698 25063 : if(blk->got!=BLK_INCOMING) return 0;
699 8522 : blk->got=BLK_NOT_GOT;
700 :
701 : // Need to get the data for this blk from the client.
702 : // Set up the details of where it will be saved.
703 8522 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
704 :
705 : // FIX THIS: make dpth give us the path in a uint8 array.
706 8522 : blk->savepath=savepathstr_with_sig_to_uint64(path);
707 8522 : blk->got_save_path=1;
708 : // Load it into our local hash table.
709 8522 : if(hash_load_blk(blk))
710 : return -1;
711 8522 : if(dpth_protocol2_incr_sig(dpth))
712 : return -1;
713 8522 : return 0;
714 : }
715 :
716 25058 : static struct hash_strong *in_local_hash(struct blk *blk)
717 : {
718 : static struct hash_weak *hash_weak;
719 :
720 25058 : if(!(hash_weak=hash_weak_find(blk->fingerprint)))
721 : return NULL;
722 4014 : return hash_strong_find(hash_weak, blk->md5sum);
723 : }
724 :
725 37587 : static int simple_deduplicate_blk(struct blk *blk)
726 : {
727 : static struct hash_strong *hash_strong;
728 37587 : if(blk->got!=BLK_INCOMING)
729 : return 0;
730 25058 : if((hash_strong=in_local_hash(blk)))
731 : {
732 4014 : blk->savepath=hash_strong->savepath;
733 4014 : blk->got_save_path=1;
734 4014 : blk->got=BLK_GOT;
735 4014 : return 1;
736 : }
737 : return 0;
738 : }
739 :
740 25058 : static int mark_up_to_index(struct blist *blist,
741 : uint64_t index, struct dpth *dpth)
742 : {
743 : struct blk *blk;
744 :
745 : // Mark everything that was not got, up to the given index.
746 62645 : for(blk=blist->blk_from_champ_chooser;
747 50116 : blk && blk->index!=index; blk=blk->next)
748 : {
749 12529 : if(simple_deduplicate_blk(blk))
750 0 : continue;
751 12529 : if(mark_not_got(blk, dpth))
752 : return -1;
753 : }
754 25058 : if(!blk)
755 : {
756 0 : logp("Could not find index from champ chooser: %" PRIu64 "\n",
757 : index);
758 : return -1;
759 : }
760 25058 : simple_deduplicate_blk(blk);
761 :
762 : //logp("Found index from champ chooser: %lu\n", index);
763 : //printf("index from cc: %d\n", index);
764 25058 : blist->blk_from_champ_chooser=blk;
765 : return 0;
766 : }
767 :
768 12524 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
769 : struct dpth *dpth)
770 : {
771 : static struct blk b;
772 12524 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
773 : return -1;
774 :
775 12524 : if(mark_up_to_index(blist, b.index, dpth))
776 : return -1;
777 12524 : blist->blk_from_champ_chooser->savepath=b.savepath;
778 12524 : blist->blk_from_champ_chooser->got=BLK_GOT;
779 12524 : blist->blk_from_champ_chooser->got_save_path=1;
780 12524 : return 0;
781 : }
782 :
783 12534 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
784 : struct dpth *dpth)
785 : {
786 : static struct blk b;
787 12534 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
788 : return -1;
789 :
790 12534 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
791 12534 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
792 12534 : return 0;
793 : }
794 :
795 25059 : static int deal_with_read_from_chfd(struct asfd *chfd,
796 : struct blist *blist, struct dpth *dpth, struct cntr *cntr)
797 : {
798 25059 : int ret=-1;
799 :
800 : // Deal with champ chooser read here.
801 : //printf("read from cc: %s\n", chfd->rbuf->buf);
802 25059 : switch(chfd->rbuf->cmd)
803 : {
804 : case CMD_SIG:
805 : // Get these for blks that the champ chooser has found.
806 12524 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
807 : goto end;
808 12524 : cntr_add_same(cntr, CMD_DATA);
809 : break;
810 : case CMD_WRAP_UP:
811 12534 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
812 : goto end;
813 : break;
814 : default:
815 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
816 : goto end;
817 : }
818 : ret=0;
819 : end:
820 25059 : iobuf_free_content(chfd->rbuf);
821 25059 : return ret;
822 : }
823 :
824 12 : static int check_for_missing_work_in_slist(struct slist *slist)
825 : {
826 12 : struct sbuf *sb=NULL;
827 :
828 12 : if(slist->blist->head)
829 : {
830 0 : logp("ERROR: finishing but still want block: %" PRIu64 "\n",
831 : slist->blist->head->index);
832 : return -1;
833 : }
834 :
835 15 : for(sb=slist->head; sb; sb=sb->next)
836 : {
837 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
838 : {
839 2 : logp("ERROR: finishing but still waiting for: %c:%s\n",
840 1 : slist->head->path.cmd, slist->head->path.buf);
841 : return -1;
842 : }
843 : }
844 : return 0;
845 : }
846 :
// Server side of phase 2 (receiving the backup data).
//   as     - the async object; as->asfd is the connection to the client.
//   chfd   - the connection to the champ chooser.
//   sdirs  - the server directories for this backup.
//   resume - non-zero if this is a resume of an earlier backup.
//   confs  - the configuration.
// Returns 0 on success, -1 on error. Everything that gets allocated in here
// is freed before returning, on both the success and the error paths.
// The function is only visible outside this file when UTEST is defined.
847 : #ifndef UTEST
848 : static
849 : #endif
850 24 : int do_backup_phase2_server_protocol2(struct async *as, struct asfd *chfd,
851 : struct sdirs *sdirs, int resume, struct conf **confs)
852 : {
853 24 : int ret=-1;
854 24 : uint8_t end_flags=0;
855 24 : struct slist *slist=NULL;
856 : struct iobuf wbuf;
857 24 : struct dpth *dpth=NULL;
858 24 : man_off_t *p1pos=NULL;
859 24 : struct manios *manios=NULL;
860 : // This is used to tell the client that a number of consecutive blocks
861 : // have been found and can be freed.
862 24 : struct asfd *asfd=NULL;
863 24 : struct cntr *cntr=NULL;
864 24 : struct sbuf *csb=NULL;
// File numbers given to the client start at 1; an index of zero on an entry
// means that it has not been requested yet.
865 24 : uint64_t file_no=1;
866 :
// Argument checks. Each failure is logged and goes through the common
// cleanup, which copes with everything still being NULL.
867 24 : if(!as)
868 : {
869 1 : logp("async not provided to %s()\n", __func__);
870 1 : goto end;
871 : }
872 23 : if(!sdirs)
873 : {
874 2 : logp("sdirs not provided to %s()\n", __func__);
875 2 : goto end;
876 : }
877 21 : if(!confs)
878 : {
879 1 : logp("confs not provided to %s()\n", __func__);
880 1 : goto end;
881 : }
882 20 : asfd=as->asfd;
883 20 : if(!asfd)
884 : {
885 1 : logp("asfd not provided to %s()\n", __func__);
886 1 : goto end;
887 : }
888 19 : if(!chfd)
889 : {
890 1 : logp("chfd not provided to %s()\n", __func__);
891 1 : goto end;
892 : }
893 18 : cntr=get_cntr(confs);
// Breakpoint values from 2000 to 2999 belong to this phase. The amount over
// 2000 is the number of block writes to allow before breaking.
894 18 : if(get_int(confs[OPT_BREAKPOINT])>=2000
895 0 : && get_int(confs[OPT_BREAKPOINT])<3000)
896 : {
897 0 : breaking=get_int(confs[OPT_BREAKPOINT]);
898 0 : breakcount=breaking-2000;
899 : }
900 :
901 18 : blks_generate_init();
902 :
903 18 : logp("Phase 2 begin (recv backup data)\n");
904 :
905 18 : if(!(dpth=dpth_alloc())
906 36 : || dpth_protocol2_init(dpth,
907 18 : sdirs->data,
908 18 : get_string(confs[OPT_CNAME]),
909 18 : sdirs->cfiles,
910 : get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
911 : goto end;
912 18 : if(resume)
913 : {
914 0 : if(!(p1pos=do_resume(sdirs, dpth, confs)))
915 : goto end;
916 0 : if(cntr_send_sdirs(asfd, sdirs, confs))
917 : goto end;
918 : }
919 :
920 18 : if(!(manios=manios_open_phase2(sdirs, p1pos, PROTO_2))
921 42 : || !(slist=slist_alloc())
922 18 : || !(csb=sbuf_alloc(PROTO_2)))
923 : goto end;
924 :
925 18 : iobuf_free_content(asfd->rbuf);
926 :
927 : memset(&wbuf, 0, sizeof(struct iobuf));
// Main loop, which runs until the client has sent "backup_end". Each time
// round: load more of the scan, choose something to send to the client, feed
// sigs to the champ chooser, do the I/O, deal with whatever arrived from
// both peers, and then write whatever is complete to the changed manifest.
928 216994 : while(!(end_flags&END_BACKUP))
929 : {
930 216982 : if(maybe_add_from_scan(manios, slist, chfd, &csb, cntr))
931 : goto end;
932 :
// Block requests take priority over file requests.
933 216982 : if(!wbuf.len)
934 : {
935 216982 : if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
936 : goto end;
937 216982 : if(!wbuf.len)
938 : {
939 208447 : get_wbuf_from_files(&wbuf, slist,
940 : manios, &end_flags, &file_no);
941 : }
942 : }
943 :
// Only APPEND_ERROR is treated as fatal here.
// NOTE(review): presumably wbuf.len is cleared by a successful append and
// left alone when blocked, so that the same message is retried next time
// round - confirm against append_all_to_write_buffer().
944 216982 : if(wbuf.len
945 25599 : && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
946 : goto end;
947 :
948 216981 : if(append_for_champ_chooser(chfd, slist->blist, end_flags))
949 : goto end;
950 :
951 216980 : if(as->read_write(as))
952 : {
953 2 : logp("error from as->read_write in %s\n", __func__);
954 2 : goto end;
955 : }
956 :
957 267624 : while(asfd->rbuf->buf)
958 : {
959 50647 : if(deal_with_read(asfd->rbuf, slist, cntr,
960 : &end_flags, dpth))
961 : goto end;
962 : // Get as much out of the readbuf as possible.
963 50646 : if(asfd->parse_readbuf(asfd))
964 : goto end;
965 : }
966 242035 : while(chfd->rbuf->buf)
967 : {
968 50118 : if(deal_with_read_from_chfd(chfd,
969 25059 : slist->blist, dpth, cntr))
970 : goto end;
971 : // Get as much out of the readbuf as possible.
972 25058 : if(chfd->parse_readbuf(chfd))
973 : goto end;
974 : }
975 :
976 216976 : if(write_to_changed_file(asfd, chfd, manios,
977 : slist, end_flags))
978 : goto end;
979 : }
980 :
981 : // Hack: If there are some entries left after the last entry that
982 : // contains block data, it will not be written to the changed file
983 : // yet because the last entry of block data has not had
984 : // sb->protocol2->bend set.
985 12 : if(slist->head && slist->head->next)
986 : {
987 8 : struct sbuf *sb=NULL;
988 8 : sb=slist->head;
989 8 : slist->head=sb->next;
990 8 : sbuf_free(&sb);
991 8 : if(write_to_changed_file(asfd, chfd, manios,
992 : slist, end_flags))
993 : goto end;
994 : }
995 :
996 12 : if(manios_close(&manios))
997 : goto end;
998 :
999 36 : if(check_for_missing_work_in_slist(slist))
1000 : goto end;
1001 :
1002 : // Need to release the last left. There should be one at most.
1003 11 : if(dpth->head && dpth->head->next)
1004 : {
1005 0 : logp("ERROR: More data locks remaining after: %s\n",
1006 0 : dpth->head->save_path);
1007 0 : goto end;
1008 : }
1009 11 : if(dpth_release_all(dpth)) goto end;
1010 :
1011 11 : ret=0;
1012 : end:
// Common cleanup for both the success and the error paths.
1013 24 : logp("End backup\n");
1014 24 : sbuf_free(&csb);
1015 24 : slist_free(&slist);
1016 24 : if(asfd) iobuf_free_content(asfd->rbuf);
1017 24 : if(chfd) iobuf_free_content(chfd->rbuf);
1018 24 : dpth_free(&dpth);
1019 24 : manios_close(&manios);
1020 24 : man_off_t_free(&p1pos);
1021 24 : blks_generate_free();
1022 24 : hash_delete_all();
1023 24 : return ret;
1024 : }
1025 :
1026 0 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
1027 : int resume, struct conf **confs)
1028 : {
1029 0 : int ret=-1;
1030 0 : struct asfd *chfd=NULL;
1031 0 : if(!(chfd=champ_chooser_connect(as, sdirs, confs, resume)))
1032 : {
1033 0 : logp("problem connecting to champ chooser\n");
1034 0 : goto end;
1035 : }
1036 0 : ret=do_backup_phase2_server_protocol2(as, chfd, sdirs, resume, confs);
1037 : end:
1038 0 : if(chfd) as->asfd_remove(as, chfd);
1039 0 : asfd_free(&chfd);
1040 0 : return ret;
1041 : }
|