source: tags/ms_r18q1/PROBE/PT_main.cxx

Last change on this file was 16763, checked in by westram, 6 years ago
  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 21.2 KB
Line 
1// =============================================================== //
2//                                                                 //
3//   File      : PT_main.cxx                                       //
4//   Purpose   :                                                   //
5//                                                                 //
6//   Institute of Microbiology (Technical University Munich)       //
7//   http://www.arb-home.de/                                       //
8//                                                                 //
9// =============================================================== //
10
11#include "probe.h"
12#include <PT_server_prototypes.h>
13#include "pt_prototypes.h"
14#include "PT_mem.h"
15
16#include <BI_basepos.hxx>
17
18#include <arbdbt.h>
19#include <arb_file.h>
20#include <arb_defs.h>
21#include <arb_sleep.h>
22#include <servercntrl.h>
23#include <server.h>
24#include <client.h>
25#include <struct_man.h>
26#include <ut_valgrinded.h>
27#include <ptclean.h>
28
29#include <unistd.h>
30#include <sys/stat.h>
31#include <time.h>
32
33#define MAX_TRY 10
34#define TIME_OUT 1000*60*60*24
35
36struct probe_struct_global psg;
37
38// globals of gene-pt-server
39int gene_flag = 0;
40
41PT_main *aisc_main;
42
43static time_t startTime;
44
45static gene_struct_list    all_gene_structs;         // stores all gene_structs
46gene_struct_index_arb      gene_struct_arb2internal; // sorted by arb species+gene name
47gene_struct_index_internal gene_struct_internal2arb; // sorted by internal name
48
49// ----------------------------------------------
50//      global data initialization / cleanup
51
52void probe_statistic_struct::setup() {
53    cut_offs    = 0;
54    single_node = 0;
55    short_node  = 0;
56    long_node   = 0;
57    longs       = 0;
58    shorts      = 0;
59    shorts2     = 0;
60    chars       = 0;
61
62#ifdef ARB_64
63    int_node = 0;
64    ints     = 0;
65    ints2    = 0;
66    maxdiff  = 0;
67#endif
68}
69
70void probe_struct_global::setup() {
71    // init uninitialized data
72
73    gb_shell = NULp;
74    gb_main  = NULp;
75
76    alignment_name = NULp;
77    namehash       = NULp;
78
79    data_count = 0;
80    data       = NULp;
81
82    max_size   = 0;
83    char_count = 0;
84   
85    reversed = 0;
86
87    pos_to_weight = NULp;
88
89    sort_by = 0;
90
91    main_probe  = NULp;
92    server_name = NULp;
93    link        = NULp;
94
95    main.clear();
96
97    com_so = NULp;
98    pt.p1 = NULp;
99
100    stat.setup();
101}
102
103void probe_struct_global::cleanup() {
104    if (gb_main) {
105        delete [] data;
106
107        GB_close(gb_main);
108        gb_main = NULp;
109    }
110
111    if (gb_shell) {
112        delete gb_shell;
113        gb_shell = NULp;
114    }
115
116    if (namehash) GBS_free_hash(namehash);
117
118    free(ecoli);
119    delete bi_ecoli;
120    delete [] pos_to_weight;
121    free(alignment_name);
122
123    pt_assert(!com_so);
124
125    setup();
126}
127
128Memory MEM;
129
130static bool psg_initialized = false;
131void PT_init_psg() {
132    pt_assert(!psg_initialized);
133    psg.setup();
134    psg_initialized = true;
135}
136
137void PT_exit_psg() {
138    pt_assert(psg_initialized);
139    if (psg_initialized) {
140        psg.cleanup();
141        psg_initialized = false;
142    }
143    MEM.clear();
144}
145
146static void PT_exit() { 
147    // unique exit point to ensure cleanup
148    if (aisc_main) destroy_PT_main(aisc_main);
149    if (psg_initialized) PT_exit_psg();
150}
151
152// ----------------------
153//      Communication
154
155static ARB_ERROR pt_init_main_struct(PT_main *, const char *filename) { // __ATTR__USERESULT
156    ARB_ERROR error = probe_read_data_base(filename, true);
157    if (!error) {
158        GB_transaction ta(psg.gb_main);
159        psg.alignment_name = GBT_get_default_alignment(psg.gb_main);
160        error              = GB_incur_error_if(!psg.alignment_name);
161    }
162
163    if (!error) {
164        printf("Building PT-Server for alignment '%s'...\n", psg.alignment_name);
165        error = PT_init_input_data();
166        PT_build_species_hash();
167    }
168    return error;
169}
170
171int server_shutdown(PT_main */*pm*/, aisc_string passwd) {
172    // password check
173    bool authorized = strcmp(passwd, "47@#34543df43%&3667gh") == 0;
174    free(passwd);
175    if (!authorized) return 1;
176
177    fflush_all();
178    fprintf(stderr, "\nARB_PT_SERVER: received shutdown message\n");
179
180    // shutdown clients
181    aisc_broadcast(psg.com_so, 0, "Used PT-server has been shut down");
182
183    // shutdown server
184    aisc_server_shutdown(psg.com_so);
185    PT_exit();
186    fflush_all();
187    exit(EXIT_SUCCESS);
188}
189
190int broadcast(PT_main *main, int) {
191    aisc_broadcast(psg.com_so, main->m_type, main->m_text);
192    return 0;
193}
194
195// ------------------------------------------------------------------------------ name mapping
196// the mapping is generated in ../TOOLS/arb_gene_probe.cxx
197
198inline const char *find_sep(const char *str, char sep) {
199    // sep may occur escaped (by \)
200    const char *found = strchr(str, sep);
201    while (found) {
202        if (found>str && found[-1] == '\\') { // escaped separator
203            found = strchr(found+1, sep);
204        }
205        else {
206            break;              // non-escaped -> report
207        }
208    }
209    return found;
210}
211
212inline bool copy_to_buf(const char *start, const char *behindEnd, int MAXLEN, char *destBuf) {
213    int len = behindEnd-start;
214    if (len>MAXLEN) {
215        return false;
216    }
217    memcpy(destBuf, start, len);
218    destBuf[len] = 0;
219    return true;
220}
221
222__ATTR__USERESULT static ARB_ERROR parse_names_into_gene_struct(const char *map_str, gene_struct_list& listOfGenes) {
223#define MAX_INAME_LEN 30
224#define MAX_ONAME_LEN 30
225#define MAX_GNAME_LEN 1024
226
227    char iname[MAX_INAME_LEN+1]; // internal name
228    char oname[MAX_ONAME_LEN+1]; // organism name
229    char gname[MAX_GNAME_LEN+1]; // gene name
230
231    ARB_ERROR err;
232
233    while (*map_str) {
234        const char *sep1 = strchr(map_str, ';');
235        const char *sep2 = sep1 ? strchr(sep1+1, ';') : NULp;
236        const char *sep3 = sep2 ? find_sep(sep2+1, ';') :  NULp;
237
238        if (sep3) {
239            bool ok = copy_to_buf(map_str, sep1, MAX_INAME_LEN, iname);
240            ok = ok && copy_to_buf(sep1+1, sep2, MAX_ONAME_LEN, oname);
241            ok = ok && copy_to_buf(sep2+1, sep3, MAX_GNAME_LEN, gname);
242
243            if (ok) {
244                char *unesc               = GBS_unescape_string(gname, ";", '\\');
245                listOfGenes.push_back(gene_struct(iname, oname, unesc));
246                free(unesc);
247            }
248            else {
249                err = "buffer overflow (some name too long)";
250                break;
251            }
252        }
253        else {
254            err = GBS_global_string("expected at least 3 ';' in '%s'", map_str);
255            break;
256        }
257        map_str = sep3+1;
258    }
259
260    if (err) err = GBS_global_string("while parsing name-mapping: %s", err.deliver());
261
262    return err;
263}
264
265static GB_ERROR PT_init_map() { // goes to header: __ATTR__USERESULT
266    GB_ERROR error = GB_push_transaction(psg.gb_main);
267    if (!error) {
268        GBDATA *gb_gene_map = GB_entry(psg.gb_main, "gene_map");
269
270        if (gb_gene_map) {
271            gene_flag = 1;
272           
273            GBDATA     *map_ptr_str = GB_entry(gb_gene_map, "map_string");
274            const char *map_str     = GB_read_char_pntr(map_ptr_str);
275
276            error = parse_names_into_gene_struct(map_str, all_gene_structs).deliver();
277
278            // build indices :
279            gene_struct_list::const_iterator end = all_gene_structs.end();
280
281            for (gene_struct_list::const_iterator gs = all_gene_structs.begin(); gs != end; ++gs) {
282                if (gene_struct_internal2arb.find(&*gs) != gene_struct_internal2arb.end()) {
283                    fprintf(stderr, "  Duplicated internal entry for '%s'\n", gs->get_internal_gene_name());
284                }
285                gene_struct_internal2arb.insert(&*gs);
286                if (gene_struct_arb2internal.find(&*gs) != gene_struct_arb2internal.end()) {
287                    fprintf(stderr, "  Duplicated entry for '%s/%s'\n", gs->get_arb_species_name(), gs->get_arb_gene_name());
288                }
289                gene_struct_arb2internal.insert(&*gs);
290            }
291
292            size_t list_size = all_gene_structs.size();
293            if (list_size == 0) {
294                error = "name map is empty";
295            }
296            else if (gene_struct_arb2internal.size() != list_size) {
297                size_t dups = list_size-gene_struct_arb2internal.size();
298                error       = GBS_global_string("detected %zi duplicated 'species/gene' combinations in name mapping", dups);
299            }
300        }
301        else {
302            gene_flag = 0;
303        }
304    }
305
306    return GB_end_transaction(psg.gb_main, error);
307}
308
309__ATTR__USERESULT static ARB_ERROR start_pt_server(const char *socket_name, const char *arbdb_name, const char *pt_name, const char *exename) {
310    ARB_ERROR error;
311
312    fprintf(stdout,
313            "\n"
314            "ARB POS_TREE SERVER v%i (C)1993-2013 by O.Strunk, J.Boehnel, R.Westram\n"
315            "initializing:\n"
316            "- opening connection...\n",
317            PT_SERVER_VERSION);
318    ARB_sleep(1, SEC);
319
320    Hs_struct *so = NULp;
321    {
322        const int MAX_STARTUP_SEC = 100;
323        const int RETRY_AFTER_SEC = 5;
324        for (int i = 0; i<MAX_STARTUP_SEC; i += RETRY_AFTER_SEC) {
325            so = open_aisc_server(socket_name, TIME_OUT, 0);
326            if (so) break;
327
328            printf("  Cannot bind to socket (retry in %i seconds)\n", RETRY_AFTER_SEC);
329            ARB_sleep(RETRY_AFTER_SEC, SEC);
330        }
331    }
332
333    if (!so) error = "can't bind to socket";
334    else {
335        psg.com_so = so;
336
337        struct stat arbdb_stat;
338        if (stat(arbdb_name, &arbdb_stat)) {
339            error = GB_IO_error("stat", arbdb_name);
340            error = GBS_global_string("Source DB missing (%s)", error.deliver());
341        }
342        if (!error) {
343            const char *update_reason = NULp;
344
345            {
346                struct stat pt_stat;
347                if (stat(pt_name, &pt_stat)) {
348                    update_reason = GB_IO_error("stat", pt_name);
349                }
350                else if (arbdb_stat.st_mtime > pt_stat.st_mtime) {
351                    update_reason = GBS_global_string("'%s' has been modified more recently than '%s'", arbdb_name, pt_name);
352                }
353                else if (pt_stat.st_size == 0) {
354                    update_reason = GBS_global_string("'%s' is empty", pt_name);
355                }
356            }
357
358            if (update_reason) {
359                printf("- updating postree (Reason: %s)\n", update_reason);
360
361                char *quotedDatabaseArg = GBK_singlequote(GBS_global_string("-D%s", arbdb_name));
362
363                // run build_clean
364                char *cmd = GBS_global_string_copy("%s -build_clean %s", exename, quotedDatabaseArg);
365                make_valgrinded_call(cmd);
366                error           = GBK_system(cmd);
367                free(cmd);
368
369                // run build
370                if (!error) {
371                    cmd   = GBS_global_string_copy("%s -build %s", exename, quotedDatabaseArg);
372                    make_valgrinded_call(cmd);
373                    error = GBK_system(cmd);
374                    free(cmd);
375                }
376
377                if (error) error = GBS_global_string("Failed to update postree (Reason: %s)", error.deliver());
378
379                free(quotedDatabaseArg);
380            }
381        }
382        if (!error) {
383            // init server main struct
384            fputs("- init internal structs...\n", stdout);
385            {
386                void *reserved_for_mmap = NULp;
387                long  size_of_file      = GB_size_of_file(pt_name);
388                if (size_of_file > 0) {
389                    reserved_for_mmap = malloc(size_of_file);
390                    if (!reserved_for_mmap) {
391                        error = GBS_global_string("cannot reserve enough memory to map postree (needed=%li)", size_of_file);
392                    }
393                }
394
395                if (!error) error = pt_init_main_struct(aisc_main, arbdb_name);
396                free(reserved_for_mmap);
397            }
398
399            if (!error) error = enter_stage_2_load_tree(aisc_main, pt_name);
400            if (!error) error = PT_init_map();
401
402            if (!error) {
403                // all ok -> main "loop"
404                {
405                    time_t now; time(&now);
406                    fflush_all();
407                    printf("[startup took %s]\n", GBS_readable_timediff(difftime(now, startTime)));
408                }
409
410                printf("ok, server is running.\n"); // do NOT change or remove! others depend on it
411
412                fflush_all();
413                aisc_accept_calls(so);
414            }
415        }
416        aisc_server_shutdown(psg.com_so);
417    }
418    return error;
419}
420
421static int get_DB_state(GBDATA *gb_main, ARB_ERROR& error) {
422    pt_assert(!error);
423    error     = GB_push_transaction(gb_main);
424    int state = -1;
425    if (!error) {
426        GBDATA *gb_ptserver = GBT_find_or_create(gb_main, "ptserver", 7);
427        long   *statePtr    = gb_ptserver ? GBT_readOrCreate_int(gb_ptserver, "dbstate", 0) : NULp;
428        if (!statePtr) {
429            error = GB_await_error();
430        }
431        else {
432            state = *statePtr;
433        }
434    }
435    error = GB_end_transaction(gb_main, error);
436    return state;
437}
438
439static GB_ERROR set_DB_state(GBDATA *gb_main, int dbstate) {
440    GB_ERROR error = GB_push_transaction(gb_main);
441    if (!error) {
442        GBDATA *gb_ptserver  = GBT_find_or_create(gb_main, "ptserver", 7);
443        GBDATA *gb_state     = gb_ptserver ? GB_searchOrCreate_int(gb_ptserver, "dbstate", 0) : NULp;
444        if (!gb_state) error = GB_await_error();
445        else    error        = GB_write_int(gb_state, dbstate);
446    }
447    error = GB_end_transaction(gb_main, error);
448    return error;
449}
450
451__ATTR__USERESULT static ARB_ERROR run_command(const char *exename, const char *command, const arb_params *params) {
452    ARB_ERROR  error;
453    char      *msg = NULp;
454
455    // check that arb_pt_server knows its socket
456    const char *socket_name = params->tcp;
457    if (!socket_name) {
458        socket_name = GBS_read_arb_tcp("ARB_PT_SERVER0");
459        if (!socket_name) {
460            error = GB_await_error();
461            error = GBS_global_string("Don't know which socket to use (Reason: %s)", error.deliver());
462        }
463    }
464
465    if (!error) {
466        char *pt_name = GBS_global_string_copy("%s.pt", params->db_server);
467
468        if (strcmp(command, "-build_clean") == 0) {  // cleanup source DB
469            error = probe_read_data_base(params->db_server, false);
470            if (!error) {
471                pt_assert(psg.gb_main);
472
473                error = GB_no_transaction(psg.gb_main); // don't waste memory for transaction (no abort may happen)
474
475                if (!error) {
476                    int dbstate = get_DB_state(psg.gb_main, error);
477                    if (!error && dbstate>0) {
478                        if (dbstate == 1) {
479                            fputs("Warning: database already has been prepared for ptserver\n", stdout);
480                        }
481                        else {
482                            error = GBS_global_string("unexpected dbstate='%i'", dbstate);
483                        }
484                    }
485
486                    if (!error && dbstate == 0) {
487                        GB_push_my_security(psg.gb_main);
488
489                        if (!error) error = cleanup_ptserver_database(psg.gb_main, PTSERVER);
490                        if (!error) error = PT_prepare_data(psg.gb_main);
491
492                        if (!error) error = set_DB_state(psg.gb_main, 1);
493
494                        GB_pop_my_security(psg.gb_main);
495
496                        if (!error) {
497                            const char *mode = GB_supports_mapfile() ? "bfm" : "bf";
498                            error            = GB_save_as(psg.gb_main, params->db_server, mode);
499                        }
500                    }
501                }
502            }
503        }
504        else if (strcmp(command, "-build") == 0) {  // build command
505            if (!error) error = pt_init_main_struct(aisc_main, params->db_server);
506            if (!error) {
507                int dbstate = get_DB_state(psg.gb_main, error);
508                if (!error && dbstate != 1) {
509                    error = "database has not been prepared for ptserver";
510                }
511            }
512            if (error) error = GBS_global_string("Gave up (Reason: %s)", error.deliver());
513
514            if (!error) {
515                ULONG ARM_size_kb = 0;
516                {
517                    const char *mapfile = GB_mapfile(psg.gb_main);
518                    if (GB_is_regularfile(mapfile)) {
519                        ARM_size_kb = GB_size_of_file(mapfile)/1024.0+0.5;
520                    }
521                    else {
522                        fputs("Warning: cannot detect size of mapfile (have none).\n"
523                              "         Ptserver will probably use too much memory.\n",
524                              stderr);
525                    }
526                }
527
528                error = enter_stage_1_build_tree(aisc_main, pt_name, ARM_size_kb);
529                if (error) {
530                    error = GBS_global_string("Failed to build index (Reason: %s)", error.deliver());
531                }
532                else {
533                    msg = GBS_global_string_copy("PT_SERVER database \"%s\" has been created.", params->db_server);
534                }
535            }
536        }
537        else if (strcmp(command, "-QUERY") == 0) {
538            error = pt_init_main_struct(aisc_main, params->db_server);
539            if (error) error = GBS_global_string("Gave up (Reason: %s)", error.deliver());
540            else {
541                error = enter_stage_2_load_tree(aisc_main, pt_name); // now stage 2
542#if defined(CALCULATE_STATS_ON_QUERY)
543                if (!error) {
544                    puts("[index loaded - calculating statistic]");
545                    PT_dump_tree_statistics(pt_name);
546                    puts("[statistic done]");
547                }
548#endif
549            }
550        }
551        else {
552            GB_ERROR openerr = NULp;
553            psg.link         = aisc_open(socket_name, psg.main, AISC_MAGIC_NUMBER, &openerr);
554
555            if (openerr) {
556                error = openerr;
557            }
558            else {
559                bool running = psg.link;
560                bool kill    = false;
561                bool start   = false;
562
563                if      (strcmp(command, "-look") == 0) { start = !running; }
564                else if (strcmp(command, "-boot") == 0) { kill  = running; start = true; }
565                else if (strcmp(command, "-kill") == 0) { kill  = running; }
566                else { error = GBS_global_string("Unknown command '%s'", command); }
567
568                if (!error) {
569                    if (kill) {
570                        pt_assert(running);
571                        fputs("There is another active server. Sending shutdown message..\n", stderr);
572                        if (aisc_nput(psg.link, PT_MAIN, psg.main, MAIN_SHUTDOWN, "47@#34543df43%&3667gh", NULp)) {
573                            fprintf(stderr,
574                                    "%s: Warning: Problem connecting to the running %s\n"
575                                    "             You might need to kill it manually to ensure proper operation\n",
576                                    exename, exename);
577                        }
578                        aisc_close(psg.link, psg.main);
579                        psg.link = NULp;
580                    }
581
582                    if (start) {
583                        error = start_pt_server(socket_name, params->db_server, pt_name, exename); // @@@ does not always return - fix! (see also server_shutdown())
584                    }
585                }
586            }
587        }
588
589        free(pt_name);
590    }
591
592    if (error) msg = ARB_strdup(error.preserve());
593    if (msg) {
594        puts(msg);                      // log to console ..
595        GBS_add_ptserver_logentry(msg); // .. and logfile
596
597        char     *quotedErrorMsg = GBK_singlequote(GBS_global_string("arb_pt_server: %s", msg));
598        GB_ERROR  msgerror       = GBK_system(GBS_global_string("arb_message %s &", quotedErrorMsg));    // send async to avoid deadlock
599        if (msgerror) fprintf(stderr, "Error: %s\n", msgerror);
600        free(quotedErrorMsg);
601        free(msg);
602    }
603
604    return error;
605}
606
607int ARB_main(int argc, char *argv[]) {
608    time(&startTime);
609
610    int         exitcode = EXIT_SUCCESS;
611    arb_params *params   = arb_trace_argv(&argc, (const char **)argv);
612    const char *exename  = argv[0];
613
614    PT_init_psg();
615    GB_install_pid(0);          // not arb_clean able
616
617    extern int aisc_core_on_error;
618    aisc_core_on_error = 0;
619
620    // parse arguments
621    char *command = NULp;
622    {
623        if ((argc>2)                         ||
624            ((argc<2) && !params->db_server) ||
625            (argc >= 2 && strcmp(argv[1], "--help") == 0))
626        {
627            fprintf(stderr, "Syntax: %s [-look/-build/-kill/-QUERY/-boot] -Dfile.arb -TSocketid\n", exename);
628            exitcode = EXIT_FAILURE;
629        }
630        else {
631            if (argc==2) command = ARB_strdup(argv[1]);
632            else command         = ARB_strdup("-boot");
633        }
634    }
635
636    if (!command) {
637        fprintf(stderr, "%s: Error: No command specified on command line", exename);
638    }
639    else {
640        aisc_main = create_PT_main();
641
642        GB_ERROR error = run_command(exename, command, params).deliver();
643        if (error) {
644            fprintf(stderr, "%s: Error: %s\n", exename, error);
645            exitcode = EXIT_FAILURE;
646        }
647        else {
648            time_t now; time(&now);
649            fflush_all();
650            printf("[ptserver '%s' took %s]\n", command, GBS_readable_timediff(difftime(now, startTime)));
651            fflush(stdout);
652        }
653        free(command);
654    }
655
656    free_arb_params(params);
657    PT_exit();
658    fflush_all();
659    return exitcode;
660}
Note: See TracBrowser for help on using the repository browser.