Line data Source code
1 : #include "../../burp.h"
2 : #include "../../alloc.h"
3 : #include "../../asfd.h"
4 : #include "../../async.h"
5 : #include "../../attribs.h"
6 : #include "../../base64.h"
7 : #include "../../cmd.h"
8 : #include "../../cntr.h"
9 : #include "../../handy.h"
10 : #include "../../hexmap.h"
11 : #include "../../iobuf.h"
12 : #include "../../log.h"
13 : #include "../../server/manio.h"
14 : #include "../../protocol2/blist.h"
15 : #include "../../slist.h"
16 : #include "../manios.h"
17 : #include "../resume.h"
18 : #include "champ_chooser/champ_server.h"
19 : #include "dpth.h"
20 :
// Bits kept in the phase 2 'end_flags' byte. Each one records that a
// particular end-of-stream marker has been seen or queued.
#define END_SIGS		(1<<0)	// Got "sigs_end" from the client.
#define END_BACKUP		(1<<1)	// Got "backup_end" from the client.
#define END_REQUESTS		(1<<2)	// "requests_end" was queued for the client.
#define END_BLK_REQUESTS	(1<<3)	// "blk_requests_end" was queued for the client.

// Breakpoint support. 'breaking' holds the configured breakpoint number
// (0 means disabled) and 'breakcount' is decremented for each block written
// to the changed manifest until the breakpoint is triggered.
static int breaking=0;
static int breakcount=0;
28 :
29 : static int data_needed(struct sbuf *sb)
30 : {
31 33720 : if(sb->path.cmd==CMD_FILE) return 1;
32 : return 0;
33 : }
34 :
35 4 : static int manio_component_to_chfd(struct asfd *chfd, char *path)
36 : {
37 : struct iobuf wbuf;
38 4 : iobuf_from_str(&wbuf, CMD_MANIFEST, path);
39 4 : return chfd->write(chfd, &wbuf);
40 : }
41 :
42 0 : static int unchanged(struct sbuf *csb, struct sbuf *sb,
43 : struct blk **blk, struct manios *manios, struct asfd *chfd)
44 : {
45 0 : int ret=-1;
46 0 : char *fpath=NULL;
47 0 : if(!(fpath=strdup_w(manios->changed->offset->fpath, __func__)))
48 : goto end;
49 0 : if(manio_copy_entry(csb, sb, blk,
50 0 : manios->current, manios->unchanged)<0)
51 : goto end;
52 0 : if(strcmp(fpath, manios->changed->offset->fpath))
53 : {
54 : // If the copy crossed a manio boundary, we should tell the
55 : // champ server to load the previous one as a candidate.
56 0 : if(manio_component_to_chfd(chfd, fpath))
57 : goto end;
58 : }
59 : ret=0;
60 : end:
61 0 : free_w(&fpath);
62 0 : return ret;
63 : }
64 :
65 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
66 0 : static int found_in_current_manifest(struct sbuf *csb, struct sbuf *sb,
67 : struct manios *manios, struct blk **blk, struct asfd *chfd)
68 : {
69 : // Located the entry in the current manifest.
70 : // If the file type changed, I think it is time to back it up again
71 : // (for example, EFS changing to normal file, or back again).
72 0 : if(csb->path.cmd!=sb->path.cmd)
73 : {
74 0 : if(manio_forward_through_sigs(csb, blk, manios->current)<0)
75 : return -1;
76 0 : return 1;
77 : }
78 :
79 : // mtime is the actual file data.
80 : // ctime is the attributes or meta data.
81 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
82 0 : && csb->statp.st_ctime==sb->statp.st_ctime)
83 : {
84 : // Got an unchanged file.
85 0 : return unchanged(csb, sb, blk, manios, chfd);
86 : }
87 :
88 0 : if(csb->statp.st_mtime==sb->statp.st_mtime
89 0 : && csb->statp.st_ctime!=sb->statp.st_ctime)
90 : {
91 : // FIX THIS:
92 : // File data stayed the same, but attributes or meta data
93 : // changed. We already have the attributes, but may need to
94 : // get extra meta data.
95 0 : return unchanged(csb, sb, blk, manios, chfd);
96 : }
97 :
98 : // File data changed.
99 0 : if(manio_forward_through_sigs(csb, blk, manios->current)<0)
100 : return -1;
101 0 : return 1;
102 : }
103 :
104 : // Return -1 for error, 0 for entry not changed, 1 for entry changed (or new).
105 33720 : static int entry_changed(struct sbuf *sb,
106 : struct manios *manios, struct asfd *chfd, struct sbuf **csb)
107 : {
108 : static int finished=0;
109 : static struct blk *blk=NULL;
110 :
111 33720 : if(finished) return 1;
112 :
113 10 : if((*csb)->path.buf)
114 : {
115 : // Already have an entry.
116 : }
117 : else
118 : {
119 : // Need to read another.
120 10 : if(!blk && !(blk=blk_alloc())) return -1;
121 10 : switch(manio_read_with_blk(manios->current,
122 10 : *csb, blk, NULL))
123 : {
124 : case 1: // Reached the end.
125 10 : sbuf_free(csb);
126 10 : blk_free(&blk);
127 10 : finished=1;
128 10 : return 1;
129 : case -1: return -1;
130 : }
131 0 : if(!(*csb)->path.buf)
132 : {
133 0 : logp("Should have a path at this point, but do not, in %s\n", __func__);
134 0 : return -1;
135 : }
136 : // Got an entry.
137 : }
138 :
139 : while(1)
140 : {
141 0 : switch(sbuf_pathcmp(*csb, sb))
142 : {
143 : case 0: return found_in_current_manifest(*csb, sb,
144 0 : manios, &blk, chfd);
145 : case 1: return 1;
146 : case -1:
147 : // Behind - need to read more data from the old
148 : // manifest.
149 0 : switch(manio_read_with_blk(manios->current,
150 0 : *csb, blk, NULL))
151 : {
152 : case 1: // Reached the end.
153 0 : sbuf_free(csb);
154 0 : blk_free(&blk);
155 0 : return 1;
156 : case -1: return -1;
157 : }
158 : // Got something, go back around the loop.
159 : }
160 : }
161 :
162 : return 0;
163 : }
164 :
165 9993 : static int add_data_to_store(struct cntr *cntr,
166 : struct slist *slist, struct iobuf *rbuf, struct dpth *dpth)
167 : {
168 : static struct blk *blk=NULL;
169 :
170 : // Find the first one in the list that was requested.
171 : // FIX THIS: Going up the list here, and then later
172 : // when writing to the manifest is not efficient.
173 9993 : for(blk=slist->blist->head;
174 9992 : blk && (!blk->requested || blk->got==BLK_GOT); blk=blk->next)
175 : {
176 : // logp("try: %d %d\n", blk->index, blk->got);
177 : }
178 9993 : if(!blk)
179 : {
180 1 : logp("Received data but could not find next requested block.\n");
181 1 : if(!slist->blist->head)
182 1 : logp("and slist->blist->head is null\n");
183 : else
184 0 : logp("head index: %d\n", slist->blist->head->index);
185 : return -1;
186 : }
187 :
188 : // Add it to the data store straight away.
189 9992 : if(dpth_protocol2_fwrite(dpth, rbuf, blk)) return -1;
190 :
191 9992 : cntr_add(cntr, CMD_DATA, 0);
192 9992 : cntr_add_recvbytes(cntr, blk->length);
193 :
194 9992 : blk->got=BLK_GOT;
195 9992 : blk=blk->next;
196 :
197 : return 0;
198 : }
199 :
200 13596 : static int set_up_for_sig_info(struct slist *slist, struct iobuf *attr,
201 : uint64_t index)
202 : {
203 : struct sbuf *sb;
204 :
205 47263 : for(sb=slist->add_sigs_here; sb; sb=sb->next)
206 : {
207 47263 : if(!sb->protocol2->index)
208 : continue;
209 27183 : if(index==sb->protocol2->index)
210 : break;
211 : }
212 13596 : if(!sb)
213 : {
214 0 : logp("Could not find %lu in request list\n", index);
215 : return -1;
216 : }
217 : // Replace the attribs with the more recent values.
218 13596 : iobuf_free_content(&sb->attr);
219 13596 : iobuf_move(&sb->attr, attr);
220 :
221 : // Mark the end of the previous file.
222 13596 : slist->add_sigs_here->protocol2->bend=slist->blist->tail;
223 :
224 13596 : slist->add_sigs_here=sb;
225 :
226 : // Incoming sigs now need to get added to 'add_sigs_here'
227 : return 0;
228 : }
229 :
230 : /*
231 : static void dump_blks(const char *msg, struct blk *b)
232 : {
233 : struct blk *xx;
234 : for(xx=b; xx; xx=xx->next)
235 : printf("%s: %d %d %p\n", msg, xx->index, xx->got, xx);
236 : }
237 : */
238 :
239 19976 : static int add_to_sig_list(struct slist *slist, struct iobuf *rbuf)
240 : {
241 : // Goes on slist->add_sigs_here
242 : struct blk *blk;
243 : struct protocol2 *protocol2;
244 :
245 19976 : if(!(blk=blk_alloc())) return -1;
246 19976 : blist_add_blk(slist->blist, blk);
247 :
248 19976 : protocol2=slist->add_sigs_here->protocol2;
249 19976 : if(!protocol2->bstart) protocol2->bstart=blk;
250 19976 : if(!protocol2->bsighead) protocol2->bsighead=blk;
251 :
252 19976 : if(blk_set_from_iobuf_sig(blk, rbuf)) return -1;
253 :
254 : // Need to send sigs to champ chooser, therefore need to point
255 : // to the oldest unsent one if nothing is pointed to yet.
256 19976 : if(!slist->blist->blk_for_champ_chooser)
257 16472 : slist->blist->blk_for_champ_chooser=blk;
258 :
259 : return 0;
260 : }
261 :
262 43597 : static int deal_with_read(struct iobuf *rbuf, struct slist *slist,
263 : struct cntr *cntr, uint8_t *end_flags, struct dpth *dpth)
264 : {
265 43597 : int ret=0;
266 :
267 43597 : switch(rbuf->cmd)
268 : {
269 : /* Incoming block data. */
270 : case CMD_DATA:
271 9993 : if(add_data_to_store(cntr, slist, rbuf, dpth))
272 : goto error;
273 : goto end;
274 :
275 : /* Incoming block signatures. */
276 : case CMD_ATTRIBS_SIGS:
277 : static struct iobuf attr;
278 : static uint64_t index;
279 :
280 13596 : iobuf_init(&attr);
281 13596 : iobuf_move(&attr, rbuf);
282 13596 : index=decode_file_no(&attr);
283 :
284 : // Need to go through slist to find the matching
285 : // entry.
286 13596 : if(set_up_for_sig_info(slist, &attr, index))
287 : goto error;
288 : return 0;
289 : case CMD_SIG:
290 19976 : if(add_to_sig_list(slist, rbuf))
291 : goto error;
292 : goto end;
293 :
294 : /* Incoming control/message stuff. */
295 : case CMD_MESSAGE:
296 : case CMD_WARNING:
297 : {
298 6 : struct cntr *cntr=NULL;
299 6 : log_recvd(rbuf, cntr, 0);
300 6 : goto end;
301 : }
302 : case CMD_GEN:
303 25 : if(!strcmp(rbuf->buf, "sigs_end"))
304 : {
305 13 : (*end_flags)|=END_SIGS;
306 13 : goto end;
307 : }
308 12 : else if(!strcmp(rbuf->buf, "backup_end"))
309 : {
310 12 : (*end_flags)|=END_BACKUP;
311 12 : goto end;
312 : }
313 : break;
314 : case CMD_INTERRUPT:
315 : {
316 : uint64_t file_no;
317 1 : file_no=base64_to_uint64(rbuf->buf);
318 1 : if(slist_del_sbuf_by_index(slist, file_no))
319 : goto error;
320 : goto end;
321 : }
322 : default:
323 : break;
324 : }
325 :
326 0 : iobuf_log_unexpected(rbuf, __func__);
327 : error:
328 : ret=-1;
329 : end:
330 30001 : iobuf_free_content(rbuf);
331 30001 : return ret;
332 : }
333 :
334 199902 : static int get_wbuf_from_sigs(struct iobuf *wbuf, struct slist *slist,
335 : uint8_t *end_flags)
336 : {
337 : static char req[32]="";
338 199902 : struct sbuf *sb=slist->blks_to_request;
339 :
340 77342 : while(sb && !(sb->flags & SBUF_NEED_DATA)) sb=sb->next;
341 :
342 199902 : if(!sb)
343 : {
344 16 : slist->blks_to_request=NULL;
345 16 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
346 : {
347 : iobuf_from_str(wbuf,
348 3 : CMD_GEN, (char *)"blk_requests_end");
349 3 : (*end_flags)|=END_BLK_REQUESTS;
350 : }
351 : return 0;
352 : }
353 199886 : if(!sb->protocol2->bsighead)
354 : {
355 : // Trying to move onto the next file.
356 : // ??? Does this really work?
357 127029 : if(sb->protocol2->bend)
358 : {
359 0 : slist->blks_to_request=sb->next;
360 : printf("move to next\n");
361 : }
362 127029 : if((*end_flags)&END_SIGS && !((*end_flags)&END_BLK_REQUESTS))
363 : {
364 : iobuf_from_str(wbuf,
365 10 : CMD_GEN, (char *)"blk_requests_end");
366 10 : (*end_flags)|=END_BLK_REQUESTS;
367 : }
368 : return 0;
369 : }
370 :
371 72857 : if(sb->protocol2->bsighead->got==BLK_INCOMING)
372 : return 0;
373 :
374 9996 : if(sb->protocol2->bsighead->got==BLK_NOT_GOT)
375 : {
376 9992 : base64_from_uint64(sb->protocol2->bsighead->index, req);
377 9992 : iobuf_from_str(wbuf, CMD_DATA_REQ, req);
378 9992 : sb->protocol2->bsighead->requested=1;
379 : }
380 :
381 : // Move on.
382 9996 : if(sb->protocol2->bsighead==sb->protocol2->bend)
383 : {
384 6797 : slist->blks_to_request=sb->next;
385 6797 : sb->protocol2->bsighead=sb->protocol2->bstart;
386 : }
387 : else
388 : {
389 3199 : sb->protocol2->bsighead=sb->protocol2->bsighead->next;
390 : }
391 : return 0;
392 : }
393 :
394 189897 : static void get_wbuf_from_files(struct iobuf *wbuf, struct slist *slist,
395 : struct manios *manios, uint8_t *end_flags, uint64_t *file_no)
396 : {
397 189897 : struct sbuf *sb=slist->last_requested;
398 189897 : if(!sb)
399 : {
400 142579 : if(!manios->phase1 && !((*end_flags)&END_REQUESTS))
401 : {
402 18 : iobuf_from_str(wbuf, CMD_GEN, (char *)"requests_end");
403 18 : (*end_flags)|=END_REQUESTS;
404 : }
405 : return;
406 : }
407 :
408 47318 : if(sb->flags & SBUF_SENT_PATH || !(sb->flags & SBUF_NEED_DATA))
409 : {
410 33712 : slist->last_requested=sb->next;
411 : return;
412 : }
413 :
414 : // Only need to request the path at this stage.
415 13606 : iobuf_copy(wbuf, &sb->path);
416 13606 : sb->flags |= SBUF_SENT_PATH;
417 13606 : sb->protocol2->index=(*file_no)++;
418 : }
419 :
420 2 : static void get_wbuf_from_index(struct iobuf *wbuf, uint64_t index)
421 : {
422 : static char *p;
423 : static char tmp[32];
424 2 : p=tmp;
425 2 : p+=to_base64(index, tmp);
426 2 : *p='\0';
427 2 : iobuf_from_str(wbuf, CMD_WRAP_UP, tmp);
428 2 : }
429 :
430 33704 : static int write_endfile(struct sbuf *sb, struct manios *manios)
431 : {
432 : struct iobuf endfile;
433 :
434 33704 : if(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST)
435 : return 0;
436 33704 : if(!iobuf_is_filedata(&sb->path))
437 : return 0;
438 :
439 17036 : sb->flags |= SBUF_END_WRITTEN_TO_MANIFEST;
440 : // FIX THIS: Should give a proper length and md5sum.
441 17036 : iobuf_from_str(&endfile, CMD_END_FILE, (char *)"0:0");
442 17036 : return iobuf_send_msg_fzp(&endfile, manios->changed->fzp);
443 : }
444 :
445 213474 : static void blist_adjust_head(struct blist *blist, struct sbuf *sb)
446 : {
447 : struct blk *b;
448 233450 : while(blist->head!=sb->protocol2->bstart)
449 : {
450 19976 : b=blist->head->next;
451 19976 : if(blist->head==blist->blk_from_champ_chooser)
452 6790 : blist->blk_from_champ_chooser=b;
453 19976 : blk_free(&blist->head);
454 19976 : blist->head=b;
455 : }
456 213474 : if(!blist->head)
457 51741 : blist->tail=NULL;
458 213474 : }
459 :
460 213474 : static int sbuf_needs_data(struct sbuf *sb, struct asfd *asfd,
461 : struct asfd *chfd, struct manios *manios,
462 : struct slist *slist, int end_flags)
463 : {
464 213474 : int ret=-1;
465 : struct blk *blk;
466 : static struct iobuf wbuf;
467 213474 : struct blist *blist=slist->blist;
468 :
469 213474 : if(!(sb->flags & SBUF_HEADER_WRITTEN_TO_MANIFEST))
470 : {
471 13598 : if(manio_write_sbuf(manios->changed, sb)) goto end;
472 13598 : sb->flags |= SBUF_HEADER_WRITTEN_TO_MANIFEST;
473 : }
474 :
475 219863 : while((blk=sb->protocol2->bstart)
476 168122 : && blk->got==BLK_GOT
477 272476 : && (blk->next || end_flags&END_BACKUP))
478 : {
479 19976 : if(blk->got_save_path
480 19976 : && !blk_is_zero_length(blk))
481 : {
482 19976 : if(breaking && breakcount--==0)
483 : {
484 0 : breakpoint(breaking, __func__);
485 0 : goto end;
486 : }
487 19976 : if(manio_write_sig_and_path(manios->changed, blk))
488 : goto end;
489 19976 : if(manios->changed->sig_count==0)
490 : {
491 : // Have finished a manifest file. Want to start
492 : // using it as a dedup candidate now.
493 4 : if(manio_component_to_chfd(chfd,
494 4 : manios->changed->offset->ppath))
495 : goto end;
496 :
497 4 : if(!blk->requested)
498 : {
499 : // Also let the client know, so that it
500 : // can free memory if there was a long
501 : // consecutive number of unrequested
502 : // blocks.
503 2 : get_wbuf_from_index(&wbuf, blk->index);
504 2 : if(asfd->write(asfd, &wbuf)) goto end;
505 : }
506 : }
507 : }
508 :
509 19976 : if(blk==sb->protocol2->bend)
510 : {
511 13587 : blist_adjust_head(blist, sb);
512 13587 : if(write_endfile(sb, manios)) return -1;
513 13587 : slist_advance(slist);
514 13587 : return 1;
515 : }
516 :
517 6389 : if(sb->protocol2->bsighead==sb->protocol2->bstart)
518 6376 : sb->protocol2->bsighead=blk->next;
519 6389 : sb->protocol2->bstart=blk->next;
520 6389 : if(blk==blist->blk_from_champ_chooser)
521 3199 : blist->blk_from_champ_chooser=blk->next;
522 : }
523 199887 : if(!blk && sb && !sb->protocol2->bend && (end_flags&END_BACKUP))
524 : {
525 : // Write endfile for the very last file.
526 11 : if(write_endfile(sb, manios)) return -1;
527 : }
528 : ret=0;
529 : end:
530 199887 : blist_adjust_head(blist, sb);
531 199887 : return ret;
532 : }
533 :
534 199904 : static int write_to_changed_file(struct asfd *asfd,
535 : struct asfd *chfd, struct manios *manios,
536 : struct slist *slist, int end_flags)
537 : {
538 : struct sbuf *sb;
539 199904 : if(!slist) return 0;
540 :
541 233597 : while((sb=slist->head))
542 : {
543 233580 : if(sb->flags & SBUF_NEED_DATA)
544 : {
545 213474 : switch(sbuf_needs_data(sb, asfd, chfd, manios, slist,
546 : end_flags))
547 : {
548 : case 0: return 0;
549 : case 1: continue;
550 0 : default: return -1;
551 : }
552 :
553 : }
554 : else
555 : {
556 : // No change, can go straight in.
557 20106 : if(manio_write_sbuf(manios->changed, sb)) return -1;
558 20106 : if(write_endfile(sb, manios)) return -1;
559 :
560 : // Move along.
561 20106 : slist_advance(slist);
562 : }
563 : }
564 : return 0;
565 : }
566 :
567 199902 : static int maybe_add_from_scan(struct manios *manios,
568 : struct slist *slist, struct asfd *chfd, struct sbuf **csb)
569 : {
570 199902 : int ret=-1;
571 199902 : struct sbuf *snew=NULL;
572 :
573 : while(1)
574 : {
575 233622 : sbuf_free(&snew);
576 233622 : if(!manios->phase1) return 0;
577 : // Limit the amount loaded into memory at any one time.
578 33738 : if(slist && slist->head)
579 : {
580 33720 : if(slist->head->protocol2->index
581 33720 : - slist->tail->protocol2->index>4096)
582 : return 0;
583 : }
584 67458 : if(!(snew=sbuf_alloc(PROTO_2))) goto end;
585 :
586 33738 : switch(manio_read(manios->phase1, snew))
587 : {
588 : case 0: break;
589 18 : case 1: manio_close(&manios->phase1);
590 18 : ret=0; // Finished.
591 : default: goto end;
592 : }
593 :
594 33720 : switch(entry_changed(snew, manios, chfd, csb))
595 : {
596 : case 0: continue; // No change.
597 : case 1: break;
598 : default: goto end; // Error.
599 : }
600 :
601 101160 : if(data_needed(snew)) snew->flags|=SBUF_NEED_DATA;
602 :
603 33720 : slist_add_sbuf(slist, snew);
604 33720 : snew=NULL;
605 : }
606 : return 0;
607 : end:
608 18 : sbuf_free(&snew);
609 18 : return ret;
610 : }
611 :
612 199901 : static int append_for_champ_chooser(struct asfd *chfd,
613 : struct blist *blist, int end_flags)
614 : {
615 : static int finished_sending=0;
616 : static struct iobuf wbuf;
617 : static struct blk *blk=NULL;
618 :
619 219877 : while(blist->blk_for_champ_chooser)
620 : {
621 89690 : blk=blist->blk_for_champ_chooser;
622 : // If we send too many blocks to the champ chooser at once,
623 : // it can go faster than we can send paths to completed
624 : // manifests to it. This means that deduplication efficiency
625 : // is reduced (although speed may be faster).
626 : // So limit the sending.
627 89690 : if(blk->index
628 89690 : - blist->head->index > MANIFEST_SIG_MAX)
629 : return 0;
630 :
631 19976 : blk_to_iobuf_sig(blk, &wbuf);
632 :
633 19976 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
634 : {
635 : case APPEND_OK: break;
636 : case APPEND_BLOCKED:
637 : return 0; // Try again later.
638 : default: return -1;
639 : }
640 19976 : blist->blk_for_champ_chooser=blk->next;
641 : }
642 130187 : if(end_flags&END_SIGS
643 51006 : && !finished_sending && !blist->blk_for_champ_chooser)
644 : {
645 13 : iobuf_from_str(&wbuf, CMD_GEN, (char *)"sigs_end");
646 13 : switch(chfd->append_all_to_write_buffer(chfd, &wbuf))
647 : {
648 : case APPEND_OK: break;
649 : case APPEND_BLOCKED:
650 : return 0; // Try again later.
651 : default: return -1;
652 : }
653 12 : finished_sending++;
654 : }
655 : return 0;
656 : }
657 :
658 19979 : static int mark_not_got(struct blk *blk, struct dpth *dpth)
659 : {
660 : const char *path;
661 :
662 19979 : if(blk->got!=BLK_INCOMING) return 0;
663 9992 : blk->got=BLK_NOT_GOT;
664 :
665 : // Need to get the data for this blk from the client.
666 : // Set up the details of where it will be saved.
667 9992 : if(!(path=dpth_protocol2_mk(dpth))) return -1;
668 :
669 : // FIX THIS: make dpth give us the path in a uint8 array.
670 9992 : blk->savepath=savepathstr_with_sig_to_uint64(path);
671 9992 : blk->got_save_path=1;
672 9992 : if(dpth_protocol2_incr_sig(dpth)) return -1;
673 9992 : return 0;
674 : }
675 :
676 19976 : static int mark_up_to_index(struct blist *blist,
677 : uint64_t index, struct dpth *dpth)
678 : {
679 : struct blk *blk;
680 :
681 : // Mark everything that was not got, up to the given index.
682 29963 : for(blk=blist->blk_from_champ_chooser;
683 29963 : blk && blk->index!=index; blk=blk->next)
684 9987 : if(mark_not_got(blk, dpth))
685 : return -1;
686 19976 : if(!blk)
687 : {
688 0 : logp("Could not find index from champ chooser: %lu\n", index);
689 : return -1;
690 : }
691 : //logp("Found index from champ chooser: %lu\n", index);
692 : //printf("index from cc: %d\n", index);
693 19976 : blist->blk_from_champ_chooser=blk;
694 : return 0;
695 : }
696 :
697 9984 : static int deal_with_sig_from_chfd(struct iobuf *rbuf, struct blist *blist,
698 : struct dpth *dpth)
699 : {
700 : static struct blk b;
701 9984 : if(blk_set_from_iobuf_index_and_savepath(&b, rbuf))
702 : return -1;
703 :
704 9984 : if(mark_up_to_index(blist, b.index, dpth))
705 : return -1;
706 9984 : blist->blk_from_champ_chooser->savepath=b.savepath;
707 9984 : blist->blk_from_champ_chooser->got=BLK_GOT;
708 9984 : blist->blk_from_champ_chooser->got_save_path=1;
709 9984 : return 0;
710 : }
711 :
712 9992 : static int deal_with_wrap_up_from_chfd(struct iobuf *rbuf, struct blist *blist,
713 : struct dpth *dpth)
714 : {
715 : static struct blk b;
716 9992 : if(blk_set_from_iobuf_wrap_up(&b, rbuf))
717 : return -1;
718 :
719 9992 : if(mark_up_to_index(blist, b.index, dpth)) return -1;
720 9992 : if(mark_not_got(blist->blk_from_champ_chooser, dpth)) return -1;
721 9992 : return 0;
722 : }
723 :
724 19977 : static int deal_with_read_from_chfd(struct asfd *chfd,
725 : struct blist *blist, uint64_t *wrap_up, struct dpth *dpth,
726 : struct cntr *cntr)
727 : {
728 19977 : int ret=-1;
729 :
730 : // Deal with champ chooser read here.
731 : //printf("read from cc: %s\n", chfd->rbuf->buf);
732 19977 : switch(chfd->rbuf->cmd)
733 : {
734 : case CMD_SIG:
735 : // Get these for blks that the champ chooser has found.
736 9984 : if(deal_with_sig_from_chfd(chfd->rbuf, blist, dpth))
737 : goto end;
738 9984 : cntr_add_same(cntr, CMD_DATA);
739 : break;
740 : case CMD_WRAP_UP:
741 9992 : if(deal_with_wrap_up_from_chfd(chfd->rbuf, blist, dpth))
742 : goto end;
743 : break;
744 : default:
745 1 : iobuf_log_unexpected(chfd->rbuf, __func__);
746 : goto end;
747 : }
748 : ret=0;
749 : end:
750 19977 : iobuf_free_content(chfd->rbuf);
751 19977 : return ret;
752 : }
753 :
754 : static struct asfd *get_asfd_from_list_by_fdtype(struct async *as,
755 : enum asfd_fdtype fdtype)
756 : {
757 : struct asfd *a;
758 19 : for(a=as->asfd; a; a=a->next)
759 37 : if(a->fdtype==fdtype) return a;
760 : return NULL;
761 : }
762 :
763 12 : static int check_for_missing_work_in_slist(struct slist *slist)
764 : {
765 12 : struct sbuf *sb=NULL;
766 :
767 12 : if(slist->blist->head)
768 : {
769 : logp("ERROR: finishing but still want block: %lu\n",
770 0 : slist->blist->head->index);
771 : return -1;
772 : }
773 :
774 15 : for(sb=slist->head; sb; sb=sb->next)
775 : {
776 4 : if(!(sb->flags & SBUF_END_WRITTEN_TO_MANIFEST))
777 : {
778 : logp("ERROR: finishing but still waiting for: %c:%s\n",
779 1 : slist->head->path.cmd, slist->head->path.buf);
780 : return -1;
781 : }
782 : }
783 : return 0;
784 : }
785 :
786 24 : int backup_phase2_server_protocol2(struct async *as, struct sdirs *sdirs,
787 : int resume, struct conf **confs)
788 : {
789 24 : int ret=-1;
790 24 : uint8_t end_flags=0;
791 24 : struct slist *slist=NULL;
792 : struct iobuf wbuf;
793 24 : struct dpth *dpth=NULL;
794 24 : man_off_t *p1pos=NULL;
795 24 : struct manios *manios=NULL;
796 : // This is used to tell the client that a number of consecutive blocks
797 : // have been found and can be freed.
798 24 : uint64_t wrap_up=0;
799 24 : struct asfd *asfd=NULL;
800 24 : struct asfd *chfd=NULL;
801 24 : struct cntr *cntr=NULL;
802 24 : struct sbuf *csb=NULL;
803 24 : uint64_t file_no=1;
804 :
805 24 : if(!as)
806 : {
807 1 : logp("async not provided to %s()\n", __func__);
808 1 : goto end;
809 : }
810 23 : if(!sdirs)
811 : {
812 2 : logp("sdirs not provided to %s()\n", __func__);
813 2 : goto end;
814 : }
815 21 : if(!confs)
816 : {
817 1 : logp("confs not provided to %s()\n", __func__);
818 1 : goto end;
819 : }
820 20 : asfd=as->asfd;
821 20 : if(!asfd)
822 : {
823 1 : logp("asfd not provided to %s()\n", __func__);
824 1 : goto end;
825 : }
826 19 : chfd=get_asfd_from_list_by_fdtype(as, ASFD_FD_SERVER_TO_CHAMP_CHOOSER);
827 19 : if(!chfd)
828 : {
829 1 : logp("chfd not provided to %s()\n", __func__);
830 1 : goto end;
831 : }
832 18 : cntr=get_cntr(confs);
833 36 : if(get_int(confs[OPT_BREAKPOINT])>=2000
834 18 : && get_int(confs[OPT_BREAKPOINT])<3000)
835 : {
836 0 : breaking=get_int(confs[OPT_BREAKPOINT]);
837 0 : breakcount=breaking-2000;
838 : }
839 :
840 18 : logp("Phase 2 begin (recv backup data)\n");
841 :
842 36 : if(!(dpth=dpth_alloc())
843 36 : || dpth_protocol2_init(dpth,
844 18 : sdirs->data, get_int(confs[OPT_MAX_STORAGE_SUBDIRS])))
845 : goto end;
846 18 : if(resume && !(p1pos=do_resume(sdirs, dpth, confs)))
847 : goto end;
848 :
849 36 : if(!(manios=manios_open_phase2(sdirs, p1pos, PROTO_2))
850 42 : || !(slist=slist_alloc())
851 36 : || !(csb=sbuf_alloc(PROTO_2)))
852 : goto end;
853 :
854 18 : iobuf_free_content(asfd->rbuf);
855 :
856 : memset(&wbuf, 0, sizeof(struct iobuf));
857 199914 : while(!(end_flags&END_BACKUP))
858 : {
859 199902 : if(maybe_add_from_scan(manios, slist, chfd, &csb))
860 : goto end;
861 :
862 199902 : if(!wbuf.len)
863 : {
864 199902 : if(get_wbuf_from_sigs(&wbuf, slist, &end_flags))
865 : goto end;
866 199902 : if(!wbuf.len)
867 : {
868 : get_wbuf_from_files(&wbuf, slist,
869 189897 : manios, &end_flags, &file_no);
870 : }
871 : }
872 :
873 199902 : if(wbuf.len
874 199902 : && asfd->append_all_to_write_buffer(asfd, &wbuf)==APPEND_ERROR)
875 : goto end;
876 :
877 199901 : if(append_for_champ_chooser(chfd, slist->blist, end_flags))
878 : goto end;
879 :
880 199900 : if(as->read_write(as))
881 : {
882 2 : logp("error from as->read_write in %s\n", __func__);
883 2 : goto end;
884 : }
885 :
886 243494 : while(asfd->rbuf->buf)
887 : {
888 43597 : if(deal_with_read(asfd->rbuf, slist, cntr,
889 43597 : &end_flags, dpth))
890 : goto end;
891 : // Get as much out of the readbuf as possible.
892 43596 : if(asfd->parse_readbuf(asfd))
893 : goto end;
894 : }
895 219873 : while(chfd->rbuf->buf)
896 : {
897 19977 : if(deal_with_read_from_chfd(chfd,
898 19977 : slist->blist, &wrap_up, dpth, cntr))
899 : goto end;
900 : // Get as much out of the readbuf as possible.
901 19976 : if(chfd->parse_readbuf(chfd))
902 : goto end;
903 : }
904 :
905 199896 : if(write_to_changed_file(asfd, chfd, manios,
906 199896 : slist, end_flags))
907 : goto end;
908 : }
909 :
910 : // Hack: If there are some entries left after the last entry that
911 : // contains block data, it will not be written to the changed file
912 : // yet because the last entry of block data has not had
913 : // sb->protocol2->bend set.
914 12 : if(slist->head && slist->head->next)
915 : {
916 8 : struct sbuf *sb=NULL;
917 8 : sb=slist->head;
918 8 : slist->head=sb->next;
919 8 : sbuf_free(&sb);
920 8 : if(write_to_changed_file(asfd, chfd, manios,
921 8 : slist, end_flags))
922 : goto end;
923 : }
924 :
925 12 : if(manios_close(&manios))
926 : goto end;
927 :
928 36 : if(check_for_missing_work_in_slist(slist))
929 : goto end;
930 :
931 : // Need to release the last left. There should be one at most.
932 11 : if(dpth->head && dpth->head->next)
933 : {
934 : logp("ERROR: More data locks remaining after: %s\n",
935 0 : dpth->head->save_path);
936 0 : goto end;
937 : }
938 11 : if(dpth_release_all(dpth)) goto end;
939 :
940 11 : ret=0;
941 : end:
942 24 : logp("End backup\n");
943 24 : sbuf_free(&csb);
944 24 : slist_free(&slist);
945 24 : if(asfd) iobuf_free_content(asfd->rbuf);
946 24 : if(chfd) iobuf_free_content(chfd->rbuf);
947 24 : dpth_free(&dpth);
948 24 : manios_close(&manios);
949 24 : man_off_t_free(&p1pos);
950 24 : return ret;
951 : }
|