Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../handy.h"
10 : #include "../../hexmap.h"
11 : #include "../../iobuf.h"
12 : #include "../../log.h"
13 : #include "../../server/manio.h"
14 : #include "../../protocol2/blist.h"
15 : #include "../../slist.h"
16 : #include "../manios.h"
17 : #include "../resume.h"
18 : #include "champ_chooser/champ_client.h"
19 : #include "champ_chooser/champ_server.h"
20 : #include "dpth.h"
21 :
/* Bits kept in the phase 2 'end_flags' byte. */
/* Set when "sigs_end" has been received from the client. */
#define END_SIGS		(1<<0)
/* Set when "backup_end" has been received from the client. */
#define END_BACKUP		(1<<1)
/* Set once "requests_end" has been queued for the client. */
#define END_REQUESTS		(1<<2)
/* Set once "blk_requests_end" has been queued for the client. */
#define END_BLK_REQUESTS	(1<<3)

/* Breakpoint state, armed from OPT_BREAKPOINT when phase 2 starts. */
static int breaking=0;
static int breakcount=0;
29 :
30 : static int data_needed(struct sbuf *sb)
31 : {
32 33720 : if(sb->path.cmd==CMD_FILE)
33 : return 1;
34 16674 : if(sb->path.cmd==CMD_METADATA)
35 : return 1;
36 : return 0;
37 : }
38 :
39 4 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
40 : {
41 : struct iobuf wbuf;
42 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
43 4 : return chfd->write(chfd, &wbuf);
44 : }
45 :
46 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
47 : struct blk **blk, struct manios *manios, struct asfd *chfd)
48 : {
49 0 : int ret=-1;
50 0 : char *fpath=NULL;
51 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
52 : goto end;
53 0 : if(manio_copy_entry(csb, sb, blk,
54 0 : manios->current, manios->unchanged)<0)
55 : goto end;
56 0 : if(strcmp(fpath, manios->changed->offset->fpath))
57 : {
58 : // If the copy crossed a manio boundary, we should tell the
59 : // champ server to load the previous one as a candidate.
60 0 : if(manio_component_to_chfd(chfd, fpath))
61 : goto end;
62 : }
63 : ret=0;
64 : end:
65 0 : free_w(&fpath);
66 0 : return ret;
67 : }
68 :
69 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
70 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
71 : struct manios *manios, struct blk **blk, struct asfd *chfd)
72 : {
73 : // Located the entry in the current manifest.
74 : // If the file type changed, I think it is time to back it up again
75 : // (for example, EFS changing to normal file, or back again).
76 0 : if(csb->path.cmd!=sb->path.cmd)
77 : {
78 0 : if(manio_forward_through_sigs(csb, blk, manios->current)<0)
79 : return -1;
80 0 : return 1;
81 : }
82 :
83 : // mtime is the actual file data.
84 : // ctime is the attributes or meta data.
85 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
86 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
87 : {
88 : // Got an unchanged file.
89 0 : return unchanged(csb, sb, blk, manios, chfd);
90 : }
91 :
92 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
93 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
94 : {
95 : // FIX THIS:
96 : // File data stayed the same, but attributes or meta data
97 : // changed. We already have the attributes, but may need to
98 : // get extra meta data.
99 0 : return unchanged(csb, sb, blk, manios, chfd);
100 : }
101 :
102 : // File data changed.
103 0 : if(manio_forward_through_sigs(csb, blk, manios->current)<0)
104 : return -1;
105 0 : return 1;
106 : }
107 :
108 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
109 33720 : static int entry_changed(struct sbuf *sb,
110 : struct manios *manios, struct asfd *chfd, struct sbuf **csb)
111 : {
112 : static int finished=0;
113 : static struct blk *blk=NULL;
114 : int pcmp;
115 :
116 33720 : if(finished) return 1;
117 :
118 10 : if((*csb)->path.buf)
119 : {
120 : // Already have an entry.
121 : }
122 : else
123 : {
124 : // Need to read another.
125 10 : if(!blk && !(blk=blk_alloc())) return -1;
126 10 : switch(manio_read_with_blk(manios->current,
127 10 : *csb, blk, NULL))
128 : {
129 : case 1: // Reached the end.
130 10 : sbuf_free(csb);
131 10 : blk_free(&blk);
132 10 : finished=1;
133 10 : return 1;
134 : case -1: return -1;
135 : }
136 0 : if(!(*csb)->path.buf)
137 : {
138 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
139 0 : return -1;
140 : }
141 : // Got an entry.
142 : }
143 :
144 : while(1)
145 : {
146 0 : if(!(pcmp=sbuf_pathcmp(*csb, sb)))
147 : return found_in_current_manifest(*csb, sb,
148 0 : manios, &blk, chfd);
149 0 : else if(pcmp>0)
150 : return 1;
151 : // Behind - need to read more data from the old manifest.
152 0 : switch(manio_read_with_blk(manios->current, *csb, blk, NULL))
153 : {
154 : case 1: // Reached the end.
155 0 : sbuf_free(csb);
156 0 : blk_free(&blk);
157 0 : return 1;
158 : case -1: return -1;
159 : }
160 : // Got something, go back around the loop.
161 : }
162 :
163 : return 0;
164 : }
165 :
166 12535 : static int add_data_to_store(struct cntr *cntr,
167 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
168 : {
169 : static struct blk *blk=NULL;
170 :
171 : // Find the first one in the list that was requested.
172 : // FIX THIS: Going up the list here, and then later
173 : // when writing to the manifest is not efficient.
174 12535 : for(blk=slist->blist->head;
175 12534 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
176 : {
177 : // logp("try: %d %d\n", blk->index, blk->got);
178 : }
179 12535 : if(!blk)
180 : {
181 1 : logp("Received data but could not find next requested block.\n");
182 1 : if(!slist->blist->head)
183 1 : logp("and slist->blist->head is null\n");
184 : else
185 0 : logp("head index: %" PRIu64 "\n", slist->blist->head->index);
186 : return -1;
187 : }
188 :
189 : // Add it to the data store straight away.
190 12534 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
191 :
192 12534 : cntr_add(cntr, CMD_DATA, 0);
193 12534 : cntr_add_recvbytes(cntr, blk->length);
194 :
195 12534 : blk->got=BLK_GOT;
196 12534 : blk=blk->next;
197 :
198 : return 0;
199 : }
200 :
201 17034 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
202 : uint64_t index)
203 : {
204 : struct sbuf *sb;
205 :
206 50701 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
207 : {
208 50701 : if(!sb->protocol2->index)
209 : continue;
210 34059 : if(index==sb->protocol2->index)
211 : break;
212 : }
213 17034 : if(!sb)
214 : {
215 0 : logp("Could not find %" PRIu64 " in request list\n", index);
216 : return -1;
217 : }
218 : // Replace the attribs with the more recent values.
219 17034 : iobuf_free_content(&sb->attr);
220 17034 : iobuf_move(&sb->attr, attr);
221 :
222 : // Mark the end of the previous file.
223 17034 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
224 :
225 17034 : slist->add_sigs_here=sb;
226 :
227 : // Incoming sigs now need to get added to 'add_sigs_here'
228 : return 0;
229 : }
230 :
231 : /*
232 : static void dump_blks(const char *msg, struct blk *b)
233 : {
234 : struct blk *xx;
235 : for(xx=b; xx; xx=xx->next)
236 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
237 : }
238 : */
239 :
240 25058 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
241 : {
242 : // Goes on slist->add_sigs_here
243 : struct blk *blk;
244 : struct protocol2 *protocol2;
245 :
246 25058 : if(!(blk=blk_alloc())) return -1;
247 25058 : blist_add_blk(slist->blist, blk);
248 :
249 25058 : protocol2=slist->add_sigs_here->protocol2;
250 25058 : if(!protocol2->bstart) protocol2->bstart=blk;
251 25058 : if(!protocol2->bsighead) protocol2->bsighead=blk;
252 :
253 25058 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
254 :
255 : // Need to send sigs to champ chooser, therefore need to point
256 : // to the oldest unsent one if nothing is pointed to yet.
257 25058 : if(!slist->blist->blk_for_champ_chooser)
258 16490 : slist->blist->blk_for_champ_chooser=blk;
259 :
260 : return 0;
261 : }
262 :
263 54659 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
264 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
265 : {
266 54659 : int ret=0;
267 :
268 54659 : switch(rbuf->cmd)
269 : {
270 : /* Incoming block data. */
271 : case CMD_DATA:
272 12535 : if(add_data_to_store(cntr, slist, rbuf, dpth))
273 : goto error;
274 : goto end;
275 :
276 : /* Incoming block signatures. */
277 : case CMD_ATTRIBS_SIGS:
278 : static struct iobuf attr;
279 : static uint64_t index;
280 :
281 17034 : iobuf_init(&attr);
282 17034 : iobuf_move(&attr, rbuf);
283 17034 : index=decode_file_no(&attr);
284 :
285 : // Need to go through slist to find the matching
286 : // entry.
287 17034 : if(set_up_for_sig_info(slist, &attr, index))
288 : goto error;
289 : return 0;
290 : case CMD_SIG:
291 25058 : if(add_to_sig_list(slist, rbuf))
292 : goto error;
293 : goto end;
294 :
295 : /* Incoming control/message stuff. */
296 : case CMD_MESSAGE:
297 : case CMD_WARNING:
298 : {
299 6 : struct cntr *cntr=NULL;
300 6 : log_recvd(rbuf, cntr, 0);
301 6 : goto end;
302 : }
303 : case CMD_GEN:
304 25 : if(!strcmp(rbuf->buf, "sigs_end"))
305 : {
306 13 : (*end_flags)|=END_SIGS;
307 13 : goto end;
308 : }
309 12 : else if(!strcmp(rbuf->buf, "backup_end"))
310 : {
311 12 : (*end_flags)|=END_BACKUP;
312 12 : goto end;
313 : }
314 : break;
315 : case CMD_INTERRUPT:
316 : {
317 : uint64_t file_no;
318 1 : file_no=base64_to_uint64(rbuf->buf);
319 1 : if(slist_del_sbuf_by_index(slist, file_no))
320 : goto error;
321 : goto end;
322 : }
323 : default:
324 : break;
325 : }
326 :
327 0 : iobuf_log_unexpected(rbuf, __func__);
328 : error:
329 : ret=-1;
330 : end:
331 37625 : iobuf_free_content(rbuf);
332 37625 : return ret;
333 : }
334 :
335 220994 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
336 : uint8_t *end_flags)
337 : {
338 : static char req[32]="";
339 220994 : struct sbuf *sb=slist->blks_to_request;
340 :
341 13387 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
342 :
343 220994 : if(!sb)
344 : {
345 16 : slist->blks_to_request=NULL;
346 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
347 : {
348 : iobuf_from_str(wbuf,
349 3 : CMD_GEN, (char *)"blk_requests_end");
350 3 : (*end_flags)|=END_BLK_REQUESTS;
351 : }
352 : return 0;
353 : }
354 220978 : if(!sb->protocol2->bsighead)
355 : {
356 : // Trying to move onto the next file.
357 : // ??? Does this really work?
358 133039 : if(sb->protocol2->bend)
359 : {
360 0 : slist->blks_to_request=sb->next;
361 : printf("move to next\n");
362 : }
363 133039 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
364 : {
365 : iobuf_from_str(wbuf,
366 10 : CMD_GEN, (char *)"blk_requests_end");
367 10 : (*end_flags)|=END_BLK_REQUESTS;
368 : }
369 : return 0;
370 : }
371 :
372 87939 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
373 : return 0;
374 :
375 12538 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
376 : {
377 12534 : base64_from_uint64(sb->protocol2->bsighead->index, req);
378 12534 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
379 12534 : sb->protocol2->bsighead->requested=1;
380 : }
381 :
382 : // Move on.
383 12538 : if(sb->protocol2->bsighead==sb->protocol2->bend)
384 : {
385 8517 : slist->blks_to_request=sb->next;
386 8517 : sb->protocol2->bsighead=sb->protocol2->bstart;
387 : }
388 : else
389 : {
390 4021 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
391 : }
392 : return 0;
393 : }
394 :
395 208447 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
396 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
397 : {
398 208447 : struct sbuf *sb=slist->last_requested;
399 208447 : if(!sb)
400 : {
401 157689 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
402 : {
403 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
404 18 : (*end_flags)|=END_REQUESTS;
405 : }
406 : return;
407 : }
408 :
409 50758 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
410 : {
411 33712 : slist->last_requested=sb->next;
412 : return;
413 : }
414 :
415 : // Only need to request the path at this stage.
416 17046 : iobuf_copy(wbuf, &sb->path);
417 17046 : sb->flags |= SBUF_SENT_PATH;
418 17046 : sb->protocol2->index=(*file_no)++;
419 : }
420 :
421 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
422 : {
423 : static char *p;
424 : static char tmp[32];
425 2 : p=tmp;
426 2 : p+=to_base64(index, tmp);
427 2 : *p='\0';
428 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
429 2 : }
430 :
431 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
432 : {
433 : struct iobuf endfile;
434 :
435 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
436 : return 0;
437 33704 : if(!iobuf_is_filedata(&sb->path))
438 : return 0;
439 :
440 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
441 : // FIX THIS: Should give a proper length and md5sum.
442 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
443 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
444 : }
445 :
446 238004 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
447 : {
448 : struct blk *b;
449 263062 : while(blist->head!=sb->protocol2->bstart)
450 : {
451 25058 : b=blist->head->next;
452 25058 : if(blist->head==blist->blk_from_champ_chooser)
453 8508 : blist->blk_from_champ_chooser=b;
454 25058 : blk_free(&blist->head);
455 25058 : blist->head=b;
456 : }
457 238004 : if(!blist->head)
458 51771 : blist->tail=NULL;
459 238004 : }
460 :
// Write as much as possible of one data-bearing entry to the changed
// manifest: the entry header (once), then a signature-and-path record for
// each leading block that has been got.
// Returns 1 when the entry was completed and the slist advanced (the caller
// moves on to the next entry), 0 when no more can be written for now, and
// -1 on error.
461 238004 : static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
462 : struct asfd *chfd, struct manios *manios,
463 : struct slist *slist, int end_flags)
464 : {
465 238004 : int ret=-1;
466 : struct blk *blk;
467 : static struct iobuf wbuf;
468 238004 : struct blist *blist=slist->blist;
469 :
// The header goes into the manifest only the first time round.
470 238004 : if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
471 : {
472 17036 : if(manio_write_sbuf(manios->changed, sb)) goto end;
473 17036 : sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
474 : }
475 :
// Consume blocks from the start of this entry while they are BLK_GOT.
// A block with no successor is only consumed once END_BACKUP is set.
476 246037 : while((blk=sb->protocol2->bstart)
477 194266 : && blk->got==BLK_GOT
478 305450 : && (blk->next || end_flags&END_BACKUP))
479 : {
480 25058 : if(blk->got_save_path
481 25058 : && !blk_is_zero_length(blk))
482 : {
// Breakpoint support: fail here once breakcount runs out.
483 25058 : if(breaking && breakcount--==0)
484 : {
485 0 : breakpoint(breaking, __func__);
486 0 : goto end;
487 : }
488 25058 : if(manio_write_sig_and_path(manios->changed, blk))
489 : goto end;
490 25058 : if(manios->changed->sig_count==0)
491 : {
492 : // Have finished a manifest file. Want to start
493 : // using it as a dedup candidate now.
494 4 : if(manio_component_to_chfd(chfd,
495 4 : manios->changed->offset->ppath))
496 : goto end;
497 :
498 4 : if(!blk->requested)
499 : {
500 : // Also let the client know, so that it
501 : // can free memory if there was a long
502 : // consecutive number of unrequested
503 : // blocks.
504 2 : get_wbuf_from_index(&wbuf, blk->index);
505 2 : if(asfd->write(asfd, &wbuf)) goto end;
506 : }
507 : }
508 : }
509 :
// Last block of this entry: free the consumed blocks, write the end-of-file
// record and advance the slist. These returns bypass the 'end' label.
510 25058 : if(blk==sb->protocol2->bend)
511 : {
512 17025 : blist_adjust_head(blist, sb);
513 17025 : if(write_endfile(sb, manios)) return -1;
514 17025 : slist_advance(slist);
515 17025 : return 1;
516 : }
517 :
// Step this entry's start (and its request cursor and the champ chooser
// cursor, if they pointed at this block) on to the next block.
518 8033 : if(sb->protocol2->bsighead==sb->protocol2->bstart)
519 8020 : sb->protocol2->bsighead=blk->next;
520 8033 : sb->protocol2->bstart=blk->next;
521 8033 : if(blk==blist->blk_from_champ_chooser)
522 4021 : blist->blk_from_champ_chooser=blk->next;
523 : }
// NOTE(review): 'sb' has already been dereferenced above, so the 'sb' test
// in this condition cannot fail.
524 220979 : if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
525 : {
526 : // Write endfile for the very last file.
// This error return also bypasses the 'end' label.
527 11 : if(write_endfile(sb, manios)) return -1;
528 : }
529 : ret=0;
530 : end:
// Free the blocks that have been passed, on success and on 'goto end' errors.
531 220979 : blist_adjust_head(blist, sb);
532 220979 : return ret;
533 : }
534 :
535 220996 : static int write_to_changed_file(struct asfd *asfd,
536 : struct asfd *chfd, struct manios *manios,
537 : struct slist *slist, int end_flags)
538 : {
539 : struct sbuf *sb;
540 220996 : if(!slist) return 0;
541 :
542 254689 : while((sb=slist->head))
543 : {
544 254672 : if(sb->flags & SBUF_NEED_DATA)
545 : {
546 238004 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
547 : end_flags))
548 : {
549 : case 0: return 0;
550 : case 1: continue;
551 0 : default: return -1;
552 : }
553 :
554 : }
555 : else
556 : {
557 : // No change, can go straight in.
558 16668 : if(manio_write_sbuf(manios->changed, sb)) return -1;
559 16668 : if(write_endfile(sb, manios)) return -1;
560 :
561 : // Move along.
562 16668 : slist_advance(slist);
563 : }
564 : }
565 : return 0;
566 : }
567 :
568 220994 : static int maybe_add_from_scan(struct manios *manios,
569 : struct slist *slist, struct asfd *chfd, struct sbuf **csb)
570 : {
571 220994 : int ret=-1;
572 220994 : struct sbuf *snew=NULL;
573 :
574 : while(1)
575 : {
576 254714 : sbuf_free(&snew);
577 254714 : if(!manios->phase1) return 0;
578 : // Limit the amount loaded into memory at any one time.
579 33738 : if(slist && slist->head)
580 : {
581 33720 : if(slist->head->protocol2->index
582 33720 : - slist->tail->protocol2->index>4096)
583 : return 0;
584 : }
585 67458 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
586 :
587 33738 : switch(manio_read(manios->phase1, snew))
588 : {
589 : case 0: break;
590 18 : case 1: manio_close(&manios->phase1);
591 18 : ret=0; // Finished.
592 : default: goto end;
593 : }
594 :
595 33720 : switch(entry_changed(snew, manios, chfd, csb))
596 : {
597 : case 0: continue; // No change.
598 : case 1: break;
599 : default: goto end; // Error.
600 : }
601 :
602 101160 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
603 :
604 33720 : slist_add_sbuf(slist, snew);
605 33720 : snew=NULL;
606 : }
607 : return 0;
608 : end:
609 18 : sbuf_free(&snew);
610 18 : return ret;
611 : }
612 :
613 220993 : static int append_for_champ_chooser(struct asfd *chfd,
614 : struct blist *blist, int end_flags)
615 : {
616 : static int finished_sending=0;
617 : static struct iobuf wbuf;
618 : static struct blk *blk=NULL;
619 :
620 246051 : while(blist->blk_for_champ_chooser)
621 : {
622 114082 : blk=blist->blk_for_champ_chooser;
623 : // If we send too many blocks to the champ chooser at once,
624 : // it can go faster than we can send paths to completed
625 : // manifests to it. This means that deduplication efficiency
626 : // is reduced (although speed may be faster).
627 : // So limit the sending.
628 114082 : if(blk->index
629 114082 : - blist->head->index > MANIFEST_SIG_MAX)
630 : return 0;
631 :
632 25058 : blk_to_iobuf_sig(blk, &wbuf);
633 :
634 25058 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
635 : {
636 : case APPEND_OK: break;
637 : case APPEND_BLOCKED:
638 : return 0; // Try again later.
639 : default: return -1;
640 : }
641 25058 : blist->blk_for_champ_chooser=blk->next;
642 : }
643 131969 : if(end_flags&END_SIGS
644 52730 : && !finished_sending && !blist->blk_for_champ_chooser)
645 : {
646 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
647 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
648 : {
649 : case APPEND_OK: break;
650 : case APPEND_BLOCKED:
651 : return 0; // Try again later.
652 : default: return -1;
653 : }
654 12 : finished_sending++;
655 : }
656 : return 0;
657 : }
658 :
659 25063 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
660 : {
661 : const char *path;
662 :
663 25063 : if(blk->got!=BLK_INCOMING) return 0;
664 12534 : blk->got=BLK_NOT_GOT;
665 :
666 : // Need to get the data for this blk from the client.
667 : // Set up the details of where it will be saved.
668 12534 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
669 :
670 : // FIX THIS: make dpth give us the path in a uint8 array.
671 12534 : blk->savepath=savepathstr_with_sig_to_uint64(path);
672 12534 : blk->got_save_path=1;
673 12534 : if(dpth_protocol2_incr_sig(dpth)) return -1;
674 12534 : return 0;
675 : }
676 :
677 25058 : static int mark_up_to_index(struct blist *blist,
678 : uint64_t index, struct dpth *dpth)
679 : {
680 : struct blk *blk;
681 :
682 : // Mark everything that was not got, up to the given index.
683 37587 : for(blk=blist->blk_from_champ_chooser;
684 37587 : blk && blk->index!=index; blk=blk->next)
685 12529 : if(mark_not_got(blk, dpth))
686 : return -1;
687 25058 : if(!blk)
688 : {
689 0 : logp("Could not find index from champ chooser: %" PRIu64 "\n", index);
690 : return -1;
691 : }
692 : //logp("Found index from champ chooser: %lu\n", index);
693 : //printf("index from cc: %d\n", index);
694 25058 : blist->blk_from_champ_chooser=blk;
695 : return 0;
696 : }
697 :
698 12524 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
699 : struct dpth *dpth)
700 : {
701 : static struct blk b;
702 12524 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
703 : return -1;
704 :
705 12524 : if(mark_up_to_index(blist, b.index, dpth))
706 : return -1;
707 12524 : blist->blk_from_champ_chooser->savepath=b.savepath;
708 12524 : blist->blk_from_champ_chooser->got=BLK_GOT;
709 12524 : blist->blk_from_champ_chooser->got_save_path=1;
710 12524 : return 0;
711 : }
712 :
713 12534 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
714 : struct dpth *dpth)
715 : {
716 : static struct blk b;
717 12534 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
718 : return -1;
719 :
720 12534 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
721 12534 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
722 12534 : return 0;
723 : }
724 :
725 25059 : static int deal_with_read_from_chfd(struct asfd *chfd,
726 : struct blist *blist, uint64_t *wrap_up, struct dpth *dpth,
727 : struct cntr *cntr)
728 : {
729 25059 : int ret=-1;
730 :
731 : // Deal with champ chooser read here.
732 : //printf("read from cc: %s\n", chfd->rbuf->buf);
733 25059 : switch(chfd->rbuf->cmd)
734 : {
735 : case CMD_SIG:
736 : // Get these for blks that the champ chooser has found.
737 12524 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
738 : goto end;
739 12524 : cntr_add_same(cntr, CMD_DATA);
740 : break;
741 : case CMD_WRAP_UP:
742 12534 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
743 : goto end;
744 : break;
745 : default:
746 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
747 : goto end;
748 : }
749 : ret=0;
750 : end:
751 25059 : iobuf_free_content(chfd->rbuf);
752 25059 : return ret;
753 : }
754 :
755 12 : static int check_for_missing_work_in_slist(struct slist *slist)
756 : {
757 12 : struct sbuf *sb=NULL;
758 :
759 12 : if(slist->blist->head)
760 : {
761 : logp("ERROR: finishing but still want block: %" PRIu64 "\n",
762 0 : slist->blist->head->index);
763 : return -1;
764 : }
765 :
766 15 : for(sb=slist->head; sb; sb=sb->next)
767 : {
768 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
769 : {
770 : logp("ERROR: finishing but still waiting for: %c:%s\n",
771 1 : slist->head->path.cmd, slist->head->path.buf);
772 : return -1;
773 : }
774 : }
775 : return 0;
776 : }
777 :
// Server side of backup phase 2 for protocol 2 ("recv backup data").
// Runs the main loop until the client sends "backup_end": entries are taken
// from the phase1 scan, requests are queued for the client, signatures are
// queued for the champ chooser (chfd), incoming messages from both are
// processed, and finished entries are written to the changed manifest.
// 'resume' non-zero makes it call do_resume() first.
// Returns 0 on success, -1 on error (including any NULL as/sdirs/confs/
// as->asfd/chfd argument).
// The function is static unless UTEST is defined.
778 : #ifndef UTEST
779 : static
780 : #endif
781 24 : int do_backup_phase2_server_protocol2(struct async *as, struct asfd *chfd,
782 : struct sdirs *sdirs, int resume, struct conf **confs)
783 : {
784 24 : int ret=-1;
785 24 : uint8_t end_flags=0;
786 24 : struct slist *slist=NULL;
787 : struct iobuf wbuf;
788 24 : struct dpth *dpth=NULL;
789 24 : man_off_t *p1pos=NULL;
790 24 : struct manios *manios=NULL;
791 : // This is used to tell the client that a number of consecutive blocks
792 : // have been found and can be freed.
// NOTE(review): wrap_up is only passed to deal_with_read_from_chfd(), which
// does not use it.
793 24 : uint64_t wrap_up=0;
794 24 : struct asfd *asfd=NULL;
795 24 : struct cntr *cntr=NULL;
796 24 : struct sbuf *csb=NULL;
// File numbers handed out to requested paths start at 1.
797 24 : uint64_t file_no=1;
798 :
// Argument checks. Each failure is logged and leads to the common cleanup.
799 24 : if(!as)
800 : {
801 1 : logp("async not provided to %s()\n", __func__);
802 1 : goto end;
803 : }
804 23 : if(!sdirs)
805 : {
806 2 : logp("sdirs not provided to %s()\n", __func__);
807 2 : goto end;
808 : }
809 21 : if(!confs)
810 : {
811 1 : logp("confs not provided to %s()\n", __func__);
812 1 : goto end;
813 : }
814 20 : asfd=as->asfd;
815 20 : if(!asfd)
816 : {
817 1 : logp("asfd not provided to %s()\n", __func__);
818 1 : goto end;
819 : }
820 19 : if(!chfd)
821 : {
822 1 : logp("chfd not provided to %s()\n", __func__);
823 1 : goto end;
824 : }
825 18 : cntr=get_cntr(confs);
// An OPT_BREAKPOINT value of 2000 to 2999 arms the breakpoint in
// sbuf_needs_data(), with the amount above 2000 as the countdown.
826 36 : if(get_int(confs[OPT_BREAKPOINT])>=2000
827 18 : && get_int(confs[OPT_BREAKPOINT])<3000)
828 : {
829 0 : breaking=get_int(confs[OPT_BREAKPOINT]);
830 0 : breakcount=breaking-2000;
831 : }
832 :
833 18 : logp("Phase 2 begin (recv backup data)\n");
834 :
// Set up the data store paths.
835 36 : if(!(dpth=dpth_alloc())
836 36 : || dpth_protocol2_init(dpth,
837 : sdirs->data,
838 18 : get_string(confs[OPT_CNAME]),
839 : sdirs->cfiles,
840 36 : get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
841 : goto end;
// When resuming, do_resume() supplies the phase1 position to continue from.
842 18 : if(resume && !(p1pos=do_resume(sdirs, dpth, confs)))
843 : goto end;
844 :
// Open the manifests, and allocate the entry list and the sbuf that holds
// the most recently read entry of the current manifest.
845 36 : if(!(manios=manios_open_phase2(sdirs, p1pos, PROTO_2))
846 42 : || !(slist=slist_alloc())
847 36 : || !(csb=sbuf_alloc(PROTO_2)))
848 : goto end;
849 :
850 18 : iobuf_free_content(asfd->rbuf);
851 :
852 : memset(&wbuf, 0, sizeof(struct iobuf));
// Main loop: runs until the client has sent "backup_end".
853 221006 : while(!(end_flags&END_BACKUP))
854 : {
// Take more entries from the phase1 scan.
855 220994 : if(maybe_add_from_scan(manios, slist, chfd, &csb))
856 : goto end;
857 :
// If nothing is waiting to be sent to the client, look for a block request
// first, then for a file request.
858 220994 : if(!wbuf.len)
859 : {
860 220994 : if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
861 : goto end;
862 220994 : if(!wbuf.len)
863 : {
864 : get_wbuf_from_files(&wbuf, slist,
865 208447 : manios, &end_flags, &file_no);
866 : }
867 : }
868 :
// Only APPEND_ERROR ends the backup here.
869 220994 : if(wbuf.len
870 220994 : && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
871 : goto end;
872 :
// Pass pending signatures on to the champ chooser.
873 220993 : if(append_for_champ_chooser(chfd, slist->blist, end_flags))
874 : goto end;
875 :
// Do the actual I/O.
876 220992 : if(as->read_write(as))
877 : {
878 2 : logp("error from as->read_write in %s\n", __func__);
879 2 : goto end;
880 : }
881 :
// Process everything that arrived from the client.
882 275648 : while(asfd->rbuf->buf)
883 : {
884 54659 : if(deal_with_read(asfd->rbuf, slist, cntr,
885 54659 : &end_flags, dpth))
886 : goto end;
887 : // Get as much out of the readbuf as possible.
888 54658 : if(asfd->parse_readbuf(asfd))
889 : goto end;
890 : }
// Process everything that arrived from the champ chooser.
891 246047 : while(chfd->rbuf->buf)
892 : {
893 25059 : if(deal_with_read_from_chfd(chfd,
894 25059 : slist->blist, &wrap_up, dpth, cntr))
895 : goto end;
896 : // Get as much out of the readbuf as possible.
897 25058 : if(chfd->parse_readbuf(chfd))
898 : goto end;
899 : }
900 :
// Write whatever entries are now complete to the changed manifest.
901 220988 : if(write_to_changed_file(asfd, chfd, manios,
902 220988 : slist, end_flags))
903 : goto end;
904 : }
905 :
906 : // Hack: If there are some entries left after the last entry that
907 : // contains block data, it will not be written to the changed file
908 : // yet because the last entry of block data has not had
909 : // sb->protocol2->bend set.
// NOTE(review): the head is unlinked and freed directly here. Only
// slist->head is updated, so confirm that no other slist pointer still
// refers to the freed entry.
910 12 : if(slist->head && slist->head->next)
911 : {
912 8 : struct sbuf *sb=NULL;
913 8 : sb=slist->head;
914 8 : slist->head=sb->next;
915 8 : sbuf_free(&sb);
916 8 : if(write_to_changed_file(asfd, chfd, manios,
917 8 : slist, end_flags))
918 : goto end;
919 : }
920 :
921 12 : if(manios_close(&manios))
922 : goto end;
923 :
// Nothing may be left unfinished at this point.
924 36 : if(check_for_missing_work_in_slist(slist))
925 : goto end;
926 :
927 : // Need to release the last left. There should be one at most.
928 11 : if(dpth->head && dpth->head->next)
929 : {
930 : logp("ERROR: More data locks remaining after: %s\n",
931 0 : dpth->head->save_path);
932 0 : goto end;
933 : }
934 11 : if(dpth_release_all(dpth)) goto end;
935 :
936 11 : ret=0;
937 : end:
// Common cleanup for success and failure. "End backup" is logged either way.
938 24 : logp("End backup\n");
939 24 : sbuf_free(&csb);
940 24 : slist_free(&slist);
941 24 : if(asfd) iobuf_free_content(asfd->rbuf);
942 24 : if(chfd) iobuf_free_content(chfd->rbuf);
943 24 : dpth_free(&dpth);
944 24 : manios_close(&manios);
945 24 : man_off_t_free(&p1pos);
946 24 : return ret;
947 : }
948 :
949 0 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
950 : int resume, struct conf **confs)
951 : {
952 0 : int ret=-1;
953 0 : struct asfd *chfd=NULL;
954 0 : if(!(chfd=champ_chooser_connect(as, sdirs, confs, resume)))
955 : {
956 0 : logp("problem connecting to champ chooser\n");
957 0 : goto end;
958 : }
959 0 : ret=do_backup_phase2_server_protocol2(as, chfd, sdirs, resume, confs);
960 : end:
961 0 : if(chfd) as->asfd_remove(as, chfd);
962 0 : asfd_free(&chfd);
963 0 : return ret;
964 : }
|