Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../handy.h"
10 : #include "../../hexmap.h"
11 : #include "../../iobuf.h"
12 : #include "../../log.h"
13 : #include "../../server/manio.h"
14 : #include "../../protocol2/blist.h"
15 : #include "../../slist.h"
16 : #include "../manios.h"
17 : #include "../resume.h"
18 : #include "champ_chooser/champ_server.h"
19 : #include "dpth.h"
20 :
21 : #define END_SIGS 0x01
22 : #define END_BACKUP 0x02
23 : #define END_REQUESTS 0x04
24 : #define END_BLK_REQUESTS 0x08
25 :
26 : static int breaking=0;
27 : static int breakcount=0;
28 :
29 : static int data_needed(struct sbuf *sb)
30 : {
31 33720 : if(sb->path.cmd==CMD_FILE)
32 : return 1;
33 16674 : if(sb->path.cmd==CMD_METADATA)
34 : return 1;
35 : return 0;
36 : }
37 :
38 4 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
39 : {
40 : struct iobuf wbuf;
41 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
42 4 : return chfd->write(chfd, &wbuf);
43 : }
44 :
45 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
46 : struct blk **blk, struct manios *manios, struct asfd *chfd)
47 : {
48 0 : int ret=-1;
49 0 : char *fpath=NULL;
50 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
51 : goto end;
52 0 : if(manio_copy_entry(csb, sb, blk,
53 0 : manios->current, manios->unchanged)<0)
54 : goto end;
55 0 : if(strcmp(fpath, manios->changed->offset->fpath))
56 : {
57 : // If the copy crossed a manio boundary, we should tell the
58 : // champ server to load the previous one as a candidate.
59 0 : if(manio_component_to_chfd(chfd, fpath))
60 : goto end;
61 : }
62 : ret=0;
63 : end:
64 0 : free_w(&fpath);
65 0 : return ret;
66 : }
67 :
68 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
69 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
70 : struct manios *manios, struct blk **blk, struct asfd *chfd)
71 : {
72 : // Located the entry in the current manifest.
73 : // If the file type changed, I think it is time to back it up again
74 : // (for example, EFS changing to normal file, or back again).
75 0 : if(csb->path.cmd!=sb->path.cmd)
76 : {
77 0 : if(manio_forward_through_sigs(csb, blk, manios->current)<0)
78 : return -1;
79 0 : return 1;
80 : }
81 :
82 : // mtime is the actual file data.
83 : // ctime is the attributes or meta data.
84 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
85 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
86 : {
87 : // Got an unchanged file.
88 0 : return unchanged(csb, sb, blk, manios, chfd);
89 : }
90 :
91 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
92 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
93 : {
94 : // FIX THIS:
95 : // File data stayed the same, but attributes or meta data
96 : // changed. We already have the attributes, but may need to
97 : // get extra meta data.
98 0 : return unchanged(csb, sb, blk, manios, chfd);
99 : }
100 :
101 : // File data changed.
102 0 : if(manio_forward_through_sigs(csb, blk, manios->current)<0)
103 : return -1;
104 0 : return 1;
105 : }
106 :
107 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
108 33720 : static int entry_changed(struct sbuf *sb,
109 : struct manios *manios, struct asfd *chfd, struct sbuf **csb)
110 : {
111 : static int finished=0;
112 : static struct blk *blk=NULL;
113 :
114 33720 : if(finished) return 1;
115 :
116 10 : if((*csb)->path.buf)
117 : {
118 : // Already have an entry.
119 : }
120 : else
121 : {
122 : // Need to read another.
123 10 : if(!blk && !(blk=blk_alloc())) return -1;
124 10 : switch(manio_read_with_blk(manios->current,
125 10 : *csb, blk, NULL))
126 : {
127 : case 1: // Reached the end.
128 10 : sbuf_free(csb);
129 10 : blk_free(&blk);
130 10 : finished=1;
131 10 : return 1;
132 : case -1: return -1;
133 : }
134 0 : if(!(*csb)->path.buf)
135 : {
136 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
137 0 : return -1;
138 : }
139 : // Got an entry.
140 : }
141 :
142 : while(1)
143 : {
144 0 : switch(sbuf_pathcmp(*csb, sb))
145 : {
146 : case 0: return found_in_current_manifest(*csb, sb,
147 0 : manios, &blk, chfd);
148 : case 1: return 1;
149 : case -1:
150 : // Behind - need to read more data from the old
151 : // manifest.
152 0 : switch(manio_read_with_blk(manios->current,
153 0 : *csb, blk, NULL))
154 : {
155 : case 1: // Reached the end.
156 0 : sbuf_free(csb);
157 0 : blk_free(&blk);
158 0 : return 1;
159 : case -1: return -1;
160 : }
161 : // Got something, go back around the loop.
162 : }
163 : }
164 :
165 : return 0;
166 : }
167 :
168 12535 : static int add_data_to_store(struct cntr *cntr,
169 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
170 : {
171 : static struct blk *blk=NULL;
172 :
173 : // Find the first one in the list that was requested.
174 : // FIX THIS: Going up the list here, and then later
175 : // when writing to the manifest is not efficient.
176 12535 : for(blk=slist->blist->head;
177 12534 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
178 : {
179 : // logp("try: %d %d\n", blk->index, blk->got);
180 : }
181 12535 : if(!blk)
182 : {
183 1 : logp("Received data but could not find next requested block.\n");
184 1 : if(!slist->blist->head)
185 1 : logp("and slist->blist->head is null\n");
186 : else
187 0 : logp("head index: %" PRIu64 "\n", slist->blist->head->index);
188 : return -1;
189 : }
190 :
191 : // Add it to the data store straight away.
192 12534 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
193 :
194 12534 : cntr_add(cntr, CMD_DATA, 0);
195 12534 : cntr_add_recvbytes(cntr, blk->length);
196 :
197 12534 : blk->got=BLK_GOT;
198 12534 : blk=blk->next;
199 :
200 : return 0;
201 : }
202 :
203 17034 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
204 : uint64_t index)
205 : {
206 : struct sbuf *sb;
207 :
208 50701 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
209 : {
210 50701 : if(!sb->protocol2->index)
211 : continue;
212 34059 : if(index==sb->protocol2->index)
213 : break;
214 : }
215 17034 : if(!sb)
216 : {
217 0 : logp("Could not find %" PRIu64 " in request list\n", index);
218 : return -1;
219 : }
220 : // Replace the attribs with the more recent values.
221 17034 : iobuf_free_content(&sb->attr);
222 17034 : iobuf_move(&sb->attr, attr);
223 :
224 : // Mark the end of the previous file.
225 17034 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
226 :
227 17034 : slist->add_sigs_here=sb;
228 :
229 : // Incoming sigs now need to get added to 'add_sigs_here'
230 : return 0;
231 : }
232 :
233 : /*
234 : static void dump_blks(const char *msg, struct blk *b)
235 : {
236 : struct blk *xx;
237 : for(xx=b; xx; xx=xx->next)
238 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
239 : }
240 : */
241 :
242 25058 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
243 : {
244 : // Goes on slist->add_sigs_here
245 : struct blk *blk;
246 : struct protocol2 *protocol2;
247 :
248 25058 : if(!(blk=blk_alloc())) return -1;
249 25058 : blist_add_blk(slist->blist, blk);
250 :
251 25058 : protocol2=slist->add_sigs_here->protocol2;
252 25058 : if(!protocol2->bstart) protocol2->bstart=blk;
253 25058 : if(!protocol2->bsighead) protocol2->bsighead=blk;
254 :
255 25058 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
256 :
257 : // Need to send sigs to champ chooser, therefore need to point
258 : // to the oldest unsent one if nothing is pointed to yet.
259 25058 : if(!slist->blist->blk_for_champ_chooser)
260 16490 : slist->blist->blk_for_champ_chooser=blk;
261 :
262 : return 0;
263 : }
264 :
265 54659 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
266 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
267 : {
268 54659 : int ret=0;
269 :
270 54659 : switch(rbuf->cmd)
271 : {
272 : /* Incoming block data. */
273 : case CMD_DATA:
274 12535 : if(add_data_to_store(cntr, slist, rbuf, dpth))
275 : goto error;
276 : goto end;
277 :
278 : /* Incoming block signatures. */
279 : case CMD_ATTRIBS_SIGS:
280 : static struct iobuf attr;
281 : static uint64_t index;
282 :
283 17034 : iobuf_init(&attr);
284 17034 : iobuf_move(&attr, rbuf);
285 17034 : index=decode_file_no(&attr);
286 :
287 : // Need to go through slist to find the matching
288 : // entry.
289 17034 : if(set_up_for_sig_info(slist, &attr, index))
290 : goto error;
291 : return 0;
292 : case CMD_SIG:
293 25058 : if(add_to_sig_list(slist, rbuf))
294 : goto error;
295 : goto end;
296 :
297 : /* Incoming control/message stuff. */
298 : case CMD_MESSAGE:
299 : case CMD_WARNING:
300 : {
301 6 : struct cntr *cntr=NULL;
302 6 : log_recvd(rbuf, cntr, 0);
303 6 : goto end;
304 : }
305 : case CMD_GEN:
306 25 : if(!strcmp(rbuf->buf, "sigs_end"))
307 : {
308 13 : (*end_flags)|=END_SIGS;
309 13 : goto end;
310 : }
311 12 : else if(!strcmp(rbuf->buf, "backup_end"))
312 : {
313 12 : (*end_flags)|=END_BACKUP;
314 12 : goto end;
315 : }
316 : break;
317 : case CMD_INTERRUPT:
318 : {
319 : uint64_t file_no;
320 1 : file_no=base64_to_uint64(rbuf->buf);
321 1 : if(slist_del_sbuf_by_index(slist, file_no))
322 : goto error;
323 : goto end;
324 : }
325 : default:
326 : break;
327 : }
328 :
329 0 : iobuf_log_unexpected(rbuf, __func__);
330 : error:
331 : ret=-1;
332 : end:
333 37625 : iobuf_free_content(rbuf);
334 37625 : return ret;
335 : }
336 :
337 220994 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
338 : uint8_t *end_flags)
339 : {
340 : static char req[32]="";
341 220994 : struct sbuf *sb=slist->blks_to_request;
342 :
343 13387 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
344 :
345 220994 : if(!sb)
346 : {
347 16 : slist->blks_to_request=NULL;
348 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
349 : {
350 : iobuf_from_str(wbuf,
351 3 : CMD_GEN, (char *)"blk_requests_end");
352 3 : (*end_flags)|=END_BLK_REQUESTS;
353 : }
354 : return 0;
355 : }
356 220978 : if(!sb->protocol2->bsighead)
357 : {
358 : // Trying to move onto the next file.
359 : // ??? Does this really work?
360 133039 : if(sb->protocol2->bend)
361 : {
362 0 : slist->blks_to_request=sb->next;
363 : printf("move to next\n");
364 : }
365 133039 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
366 : {
367 : iobuf_from_str(wbuf,
368 10 : CMD_GEN, (char *)"blk_requests_end");
369 10 : (*end_flags)|=END_BLK_REQUESTS;
370 : }
371 : return 0;
372 : }
373 :
374 87939 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
375 : return 0;
376 :
377 12538 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
378 : {
379 12534 : base64_from_uint64(sb->protocol2->bsighead->index, req);
380 12534 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
381 12534 : sb->protocol2->bsighead->requested=1;
382 : }
383 :
384 : // Move on.
385 12538 : if(sb->protocol2->bsighead==sb->protocol2->bend)
386 : {
387 8517 : slist->blks_to_request=sb->next;
388 8517 : sb->protocol2->bsighead=sb->protocol2->bstart;
389 : }
390 : else
391 : {
392 4021 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
393 : }
394 : return 0;
395 : }
396 :
397 208447 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
398 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
399 : {
400 208447 : struct sbuf *sb=slist->last_requested;
401 208447 : if(!sb)
402 : {
403 157689 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
404 : {
405 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
406 18 : (*end_flags)|=END_REQUESTS;
407 : }
408 : return;
409 : }
410 :
411 50758 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
412 : {
413 33712 : slist->last_requested=sb->next;
414 : return;
415 : }
416 :
417 : // Only need to request the path at this stage.
418 17046 : iobuf_copy(wbuf, &sb->path);
419 17046 : sb->flags |= SBUF_SENT_PATH;
420 17046 : sb->protocol2->index=(*file_no)++;
421 : }
422 :
423 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
424 : {
425 : static char *p;
426 : static char tmp[32];
427 2 : p=tmp;
428 2 : p+=to_base64(index, tmp);
429 2 : *p='\0';
430 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
431 2 : }
432 :
433 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
434 : {
435 : struct iobuf endfile;
436 :
437 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
438 : return 0;
439 33704 : if(!iobuf_is_filedata(&sb->path))
440 : return 0;
441 :
442 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
443 : // FIX THIS: Should give a proper length and md5sum.
444 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
445 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
446 : }
447 :
448 238004 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
449 : {
450 : struct blk *b;
451 263062 : while(blist->head!=sb->protocol2->bstart)
452 : {
453 25058 : b=blist->head->next;
454 25058 : if(blist->head==blist->blk_from_champ_chooser)
455 8508 : blist->blk_from_champ_chooser=b;
456 25058 : blk_free(&blist->head);
457 25058 : blist->head=b;
458 : }
459 238004 : if(!blist->head)
460 51771 : blist->tail=NULL;
461 238004 : }
462 :
463 238004 : static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
464 : struct asfd *chfd, struct manios *manios,
465 : struct slist *slist, int end_flags)
466 : {
467 238004 : int ret=-1;
468 : struct blk *blk;
469 : static struct iobuf wbuf;
470 238004 : struct blist *blist=slist->blist;
471 :
472 238004 : if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
473 : {
474 17036 : if(manio_write_sbuf(manios->changed, sb)) goto end;
475 17036 : sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
476 : }
477 :
478 246037 : while((blk=sb->protocol2->bstart)
479 194266 : && blk->got==BLK_GOT
480 305450 : && (blk->next || end_flags&END_BACKUP))
481 : {
482 25058 : if(blk->got_save_path
483 25058 : && !blk_is_zero_length(blk))
484 : {
485 25058 : if(breaking && breakcount--==0)
486 : {
487 0 : breakpoint(breaking, __func__);
488 0 : goto end;
489 : }
490 25058 : if(manio_write_sig_and_path(manios->changed, blk))
491 : goto end;
492 25058 : if(manios->changed->sig_count==0)
493 : {
494 : // Have finished a manifest file. Want to start
495 : // using it as a dedup candidate now.
496 4 : if(manio_component_to_chfd(chfd,
497 4 : manios->changed->offset->ppath))
498 : goto end;
499 :
500 4 : if(!blk->requested)
501 : {
502 : // Also let the client know, so that it
503 : // can free memory if there was a long
504 : // consecutive number of unrequested
505 : // blocks.
506 2 : get_wbuf_from_index(&wbuf, blk->index);
507 2 : if(asfd->write(asfd, &wbuf)) goto end;
508 : }
509 : }
510 : }
511 :
512 25058 : if(blk==sb->protocol2->bend)
513 : {
514 17025 : blist_adjust_head(blist, sb);
515 17025 : if(write_endfile(sb, manios)) return -1;
516 17025 : slist_advance(slist);
517 17025 : return 1;
518 : }
519 :
520 8033 : if(sb->protocol2->bsighead==sb->protocol2->bstart)
521 8020 : sb->protocol2->bsighead=blk->next;
522 8033 : sb->protocol2->bstart=blk->next;
523 8033 : if(blk==blist->blk_from_champ_chooser)
524 4021 : blist->blk_from_champ_chooser=blk->next;
525 : }
526 220979 : if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
527 : {
528 : // Write endfile for the very last file.
529 11 : if(write_endfile(sb, manios)) return -1;
530 : }
531 : ret=0;
532 : end:
533 220979 : blist_adjust_head(blist, sb);
534 220979 : return ret;
535 : }
536 :
537 220996 : static int write_to_changed_file(struct asfd *asfd,
538 : struct asfd *chfd, struct manios *manios,
539 : struct slist *slist, int end_flags)
540 : {
541 : struct sbuf *sb;
542 220996 : if(!slist) return 0;
543 :
544 254689 : while((sb=slist->head))
545 : {
546 254672 : if(sb->flags & SBUF_NEED_DATA)
547 : {
548 238004 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
549 : end_flags))
550 : {
551 : case 0: return 0;
552 : case 1: continue;
553 0 : default: return -1;
554 : }
555 :
556 : }
557 : else
558 : {
559 : // No change, can go straight in.
560 16668 : if(manio_write_sbuf(manios->changed, sb)) return -1;
561 16668 : if(write_endfile(sb, manios)) return -1;
562 :
563 : // Move along.
564 16668 : slist_advance(slist);
565 : }
566 : }
567 : return 0;
568 : }
569 :
570 220994 : static int maybe_add_from_scan(struct manios *manios,
571 : struct slist *slist, struct asfd *chfd, struct sbuf **csb)
572 : {
573 220994 : int ret=-1;
574 220994 : struct sbuf *snew=NULL;
575 :
576 : while(1)
577 : {
578 254714 : sbuf_free(&snew);
579 254714 : if(!manios->phase1) return 0;
580 : // Limit the amount loaded into memory at any one time.
581 33738 : if(slist && slist->head)
582 : {
583 33720 : if(slist->head->protocol2->index
584 33720 : - slist->tail->protocol2->index>4096)
585 : return 0;
586 : }
587 67458 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
588 :
589 33738 : switch(manio_read(manios->phase1, snew))
590 : {
591 : case 0: break;
592 18 : case 1: manio_close(&manios->phase1);
593 18 : ret=0; // Finished.
594 : default: goto end;
595 : }
596 :
597 33720 : switch(entry_changed(snew, manios, chfd, csb))
598 : {
599 : case 0: continue; // No change.
600 : case 1: break;
601 : default: goto end; // Error.
602 : }
603 :
604 101160 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
605 :
606 33720 : slist_add_sbuf(slist, snew);
607 33720 : snew=NULL;
608 : }
609 : return 0;
610 : end:
611 18 : sbuf_free(&snew);
612 18 : return ret;
613 : }
614 :
615 220993 : static int append_for_champ_chooser(struct asfd *chfd,
616 : struct blist *blist, int end_flags)
617 : {
618 : static int finished_sending=0;
619 : static struct iobuf wbuf;
620 : static struct blk *blk=NULL;
621 :
622 246051 : while(blist->blk_for_champ_chooser)
623 : {
624 114082 : blk=blist->blk_for_champ_chooser;
625 : // If we send too many blocks to the champ chooser at once,
626 : // it can go faster than we can send paths to completed
627 : // manifests to it. This means that deduplication efficiency
628 : // is reduced (although speed may be faster).
629 : // So limit the sending.
630 114082 : if(blk->index
631 114082 : - blist->head->index > MANIFEST_SIG_MAX)
632 : return 0;
633 :
634 25058 : blk_to_iobuf_sig(blk, &wbuf);
635 :
636 25058 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
637 : {
638 : case APPEND_OK: break;
639 : case APPEND_BLOCKED:
640 : return 0; // Try again later.
641 : default: return -1;
642 : }
643 25058 : blist->blk_for_champ_chooser=blk->next;
644 : }
645 131969 : if(end_flags&END_SIGS
646 52730 : && !finished_sending && !blist->blk_for_champ_chooser)
647 : {
648 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
649 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
650 : {
651 : case APPEND_OK: break;
652 : case APPEND_BLOCKED:
653 : return 0; // Try again later.
654 : default: return -1;
655 : }
656 12 : finished_sending++;
657 : }
658 : return 0;
659 : }
660 :
661 25063 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
662 : {
663 : const char *path;
664 :
665 25063 : if(blk->got!=BLK_INCOMING) return 0;
666 12534 : blk->got=BLK_NOT_GOT;
667 :
668 : // Need to get the data for this blk from the client.
669 : // Set up the details of where it will be saved.
670 12534 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
671 :
672 : // FIX THIS: make dpth give us the path in a uint8 array.
673 12534 : blk->savepath=savepathstr_with_sig_to_uint64(path);
674 12534 : blk->got_save_path=1;
675 12534 : if(dpth_protocol2_incr_sig(dpth)) return -1;
676 12534 : return 0;
677 : }
678 :
679 25058 : static int mark_up_to_index(struct blist *blist,
680 : uint64_t index, struct dpth *dpth)
681 : {
682 : struct blk *blk;
683 :
684 : // Mark everything that was not got, up to the given index.
685 37587 : for(blk=blist->blk_from_champ_chooser;
686 37587 : blk && blk->index!=index; blk=blk->next)
687 12529 : if(mark_not_got(blk, dpth))
688 : return -1;
689 25058 : if(!blk)
690 : {
691 0 : logp("Could not find index from champ chooser: %" PRIu64 "\n", index);
692 : return -1;
693 : }
694 : //logp("Found index from champ chooser: %lu\n", index);
695 : //printf("index from cc: %d\n", index);
696 25058 : blist->blk_from_champ_chooser=blk;
697 : return 0;
698 : }
699 :
700 12524 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
701 : struct dpth *dpth)
702 : {
703 : static struct blk b;
704 12524 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
705 : return -1;
706 :
707 12524 : if(mark_up_to_index(blist, b.index, dpth))
708 : return -1;
709 12524 : blist->blk_from_champ_chooser->savepath=b.savepath;
710 12524 : blist->blk_from_champ_chooser->got=BLK_GOT;
711 12524 : blist->blk_from_champ_chooser->got_save_path=1;
712 12524 : return 0;
713 : }
714 :
715 12534 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
716 : struct dpth *dpth)
717 : {
718 : static struct blk b;
719 12534 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
720 : return -1;
721 :
722 12534 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
723 12534 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
724 12534 : return 0;
725 : }
726 :
727 25059 : static int deal_with_read_from_chfd(struct asfd *chfd,
728 : struct blist *blist, uint64_t *wrap_up, struct dpth *dpth,
729 : struct cntr *cntr)
730 : {
731 25059 : int ret=-1;
732 :
733 : // Deal with champ chooser read here.
734 : //printf("read from cc: %s\n", chfd->rbuf->buf);
735 25059 : switch(chfd->rbuf->cmd)
736 : {
737 : case CMD_SIG:
738 : // Get these for blks that the champ chooser has found.
739 12524 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
740 : goto end;
741 12524 : cntr_add_same(cntr, CMD_DATA);
742 : break;
743 : case CMD_WRAP_UP:
744 12534 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
745 : goto end;
746 : break;
747 : default:
748 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
749 : goto end;
750 : }
751 : ret=0;
752 : end:
753 25059 : iobuf_free_content(chfd->rbuf);
754 25059 : return ret;
755 : }
756 :
757 : static struct asfd *get_asfd_from_list_by_fdtype(struct async *as,
758 : enum asfd_fdtype fdtype)
759 : {
760 : struct asfd *a;
761 19 : for(a=as->asfd; a; a=a->next)
762 37 : if(a->fdtype==fdtype) return a;
763 : return NULL;
764 : }
765 :
766 12 : static int check_for_missing_work_in_slist(struct slist *slist)
767 : {
768 12 : struct sbuf *sb=NULL;
769 :
770 12 : if(slist->blist->head)
771 : {
772 : logp("ERROR: finishing but still want block: %" PRIu64 "\n",
773 0 : slist->blist->head->index);
774 : return -1;
775 : }
776 :
777 15 : for(sb=slist->head; sb; sb=sb->next)
778 : {
779 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
780 : {
781 : logp("ERROR: finishing but still waiting for: %c:%s\n",
782 1 : slist->head->path.cmd, slist->head->path.buf);
783 : return -1;
784 : }
785 : }
786 : return 0;
787 : }
788 :
789 24 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
790 : int resume, struct conf **confs)
791 : {
792 24 : int ret=-1;
793 24 : uint8_t end_flags=0;
794 24 : struct slist *slist=NULL;
795 : struct iobuf wbuf;
796 24 : struct dpth *dpth=NULL;
797 24 : man_off_t *p1pos=NULL;
798 24 : struct manios *manios=NULL;
799 : // This is used to tell the client that a number of consecutive blocks
800 : // have been found and can be freed.
801 24 : uint64_t wrap_up=0;
802 24 : struct asfd *asfd=NULL;
803 24 : struct asfd *chfd=NULL;
804 24 : struct cntr *cntr=NULL;
805 24 : struct sbuf *csb=NULL;
806 24 : uint64_t file_no=1;
807 :
808 24 : if(!as)
809 : {
810 1 : logp("async not provided to %s()\n", __func__);
811 1 : goto end;
812 : }
813 23 : if(!sdirs)
814 : {
815 2 : logp("sdirs not provided to %s()\n", __func__);
816 2 : goto end;
817 : }
818 21 : if(!confs)
819 : {
820 1 : logp("confs not provided to %s()\n", __func__);
821 1 : goto end;
822 : }
823 20 : asfd=as->asfd;
824 20 : if(!asfd)
825 : {
826 1 : logp("asfd not provided to %s()\n", __func__);
827 1 : goto end;
828 : }
829 19 : chfd=get_asfd_from_list_by_fdtype(as, ASFD_FD_SERVER_TO_CHAMP_CHOOSER);
830 19 : if(!chfd)
831 : {
832 1 : logp("chfd not provided to %s()\n", __func__);
833 1 : goto end;
834 : }
835 18 : cntr=get_cntr(confs);
836 36 : if(get_int(confs[OPT_BREAKPOINT])>=2000
837 18 : && get_int(confs[OPT_BREAKPOINT])<3000)
838 : {
839 0 : breaking=get_int(confs[OPT_BREAKPOINT]);
840 0 : breakcount=breaking-2000;
841 : }
842 :
843 18 : logp("Phase 2 begin (recv backup data)\n");
844 :
845 36 : if(!(dpth=dpth_alloc())
846 36 : || dpth_protocol2_init(dpth,
847 : sdirs->data,
848 18 : get_string(confs[OPT_CNAME]),
849 : sdirs->cfiles,
850 36 : get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
851 : goto end;
852 18 : if(resume && !(p1pos=do_resume(sdirs, dpth, confs)))
853 : goto end;
854 :
855 36 : if(!(manios=manios_open_phase2(sdirs, p1pos, PROTO_2))
856 42 : || !(slist=slist_alloc())
857 36 : || !(csb=sbuf_alloc(PROTO_2)))
858 : goto end;
859 :
860 18 : iobuf_free_content(asfd->rbuf);
861 :
862 : memset(&wbuf, 0, sizeof(struct iobuf));
863 221006 : while(!(end_flags&END_BACKUP))
864 : {
865 220994 : if(maybe_add_from_scan(manios, slist, chfd, &csb))
866 : goto end;
867 :
868 220994 : if(!wbuf.len)
869 : {
870 220994 : if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
871 : goto end;
872 220994 : if(!wbuf.len)
873 : {
874 : get_wbuf_from_files(&wbuf, slist,
875 208447 : manios, &end_flags, &file_no);
876 : }
877 : }
878 :
879 220994 : if(wbuf.len
880 220994 : && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
881 : goto end;
882 :
883 220993 : if(append_for_champ_chooser(chfd, slist->blist, end_flags))
884 : goto end;
885 :
886 220992 : if(as->read_write(as))
887 : {
888 2 : logp("error from as->read_write in %s\n", __func__);
889 2 : goto end;
890 : }
891 :
892 275648 : while(asfd->rbuf->buf)
893 : {
894 54659 : if(deal_with_read(asfd->rbuf, slist, cntr,
895 54659 : &end_flags, dpth))
896 : goto end;
897 : // Get as much out of the readbuf as possible.
898 54658 : if(asfd->parse_readbuf(asfd))
899 : goto end;
900 : }
901 246047 : while(chfd->rbuf->buf)
902 : {
903 25059 : if(deal_with_read_from_chfd(chfd,
904 25059 : slist->blist, &wrap_up, dpth, cntr))
905 : goto end;
906 : // Get as much out of the readbuf as possible.
907 25058 : if(chfd->parse_readbuf(chfd))
908 : goto end;
909 : }
910 :
911 220988 : if(write_to_changed_file(asfd, chfd, manios,
912 220988 : slist, end_flags))
913 : goto end;
914 : }
915 :
916 : // Hack: If there are some entries left after the last entry that
917 : // contains block data, it will not be written to the changed file
918 : // yet because the last entry of block data has not had
919 : // sb->protocol2->bend set.
920 12 : if(slist->head && slist->head->next)
921 : {
922 8 : struct sbuf *sb=NULL;
923 8 : sb=slist->head;
924 8 : slist->head=sb->next;
925 8 : sbuf_free(&sb);
926 8 : if(write_to_changed_file(asfd, chfd, manios,
927 8 : slist, end_flags))
928 : goto end;
929 : }
930 :
931 12 : if(manios_close(&manios))
932 : goto end;
933 :
934 36 : if(check_for_missing_work_in_slist(slist))
935 : goto end;
936 :
937 : // Need to release the last left. There should be one at most.
938 11 : if(dpth->head && dpth->head->next)
939 : {
940 : logp("ERROR: More data locks remaining after: %s\n",
941 0 : dpth->head->save_path);
942 0 : goto end;
943 : }
944 11 : if(dpth_release_all(dpth)) goto end;
945 :
946 11 : ret=0;
947 : end:
948 24 : logp("End backup\n");
949 24 : sbuf_free(&csb);
950 24 : slist_free(&slist);
951 24 : if(asfd) iobuf_free_content(asfd->rbuf);
952 24 : if(chfd) iobuf_free_content(chfd->rbuf);
953 24 : dpth_free(&dpth);
954 24 : manios_close(&manios);
955 24 : man_off_t_free(&p1pos);
956 24 : return ret;
957 : }
|