source: tags/ms_r16q3/DBSERVER/db_server.cxx

Last change on this file was 15176, checked in by westram, 8 years ago
  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 17.5 KB
Line 
1// ============================================================= //
2//                                                               //
3//   File      : db_server.cxx                                   //
4//   Purpose   : CL ARB database server                          //
5//                                                               //
6//   Institute of Microbiology (Technical University Munich)     //
7//   http://www.arb-home.de/                                     //
8//                                                               //
9// ============================================================= //
10
11#include <arbdb.h>
12#include <ad_cb.h>
13#include <servercntrl.h>
14#include <ut_valgrinded.h>
15#include <arb_file.h>
16#include <arb_sleep.h>
17#include <arb_diff.h>
18
19#define TIMEOUT 1000*60*2       // save every 2 minutes
20#define LOOPS   30              // wait 30*TIMEOUT (1 hour) till shutdown
21
22inline GBDATA *dbserver_container(GBDATA *gb_main) {
23    return GB_search(gb_main, "/tmp/dbserver", GB_CREATE_CONTAINER);
24}
25inline GBDATA *dbserver_entry(GBDATA *gb_main, const char *entry) {
26    GBDATA *gb_entry = GB_searchOrCreate_string(dbserver_container(gb_main), entry, "<undefined>");
27#if defined(DEBUG)
28    if (gb_entry) {
29        GBDATA *gb_brother = GB_nextEntry(gb_entry);
30        arb_assert(!gb_brother);
31    }
32#endif
33    return gb_entry;
34}
35
36inline GBDATA *get_command_entry(GBDATA *gb_main) { return dbserver_entry(gb_main, "cmd"); }
37inline GBDATA *get_param_entry(GBDATA *gb_main) { return dbserver_entry(gb_main, "param"); }
38inline GBDATA *get_result_entry(GBDATA *gb_main) { return dbserver_entry(gb_main, "result"); }
39
40inline GB_ERROR init_data(GBDATA *gb_main) {
41    GB_ERROR error = NULL;
42
43    if (!get_command_entry(gb_main) || !get_param_entry(gb_main) || !get_result_entry(gb_main)) {
44        error = GB_await_error();
45    }
46   
47    return error;
48}
49
50inline bool served(GBDATA *gb_main) {
51    GB_begin_transaction(gb_main);
52    GB_commit_transaction(gb_main);
53    return GBCMS_accept_calls(gb_main, false);
54}
55
56static bool do_shutdown       = false;
57static bool command_triggered = false;
58
59static const char *savemode = "b";
60
61static void command_cb() {
62    command_triggered = true;
63}
64
65static void react_to_command(GBDATA *gb_main) {
66    static bool in_reaction = false;
67
68    if (!in_reaction) {
69        LocallyModify<bool> flag(in_reaction, true);
70
71        command_triggered = false;
72
73        GB_ERROR  error        = NULL;
74
75        GB_begin_transaction(gb_main);
76        GBDATA   *gb_cmd_entry = get_command_entry(gb_main);
77        GB_commit_transaction(gb_main);
78
79        char *command;
80        {
81            GB_transaction ta(gb_main);
82
83            command = GB_read_string(gb_cmd_entry);
84            if (command[0]) {
85                error             = GB_write_string(gb_cmd_entry, "");
86                if (!error) error = GB_write_string(get_result_entry(gb_main), "busy");
87            }
88        }
89
90        if (command[0]) {
91            if      (strcmp(command, "shutdown") == 0) do_shutdown = true;
92            else if (strcmp(command, "ping")     == 0) {} // ping is a noop
93            else if (strcmp(command, "save")     == 0) {
94                fprintf(stdout, "arb_db_server: save requested (savemode is \"%s\")\n", savemode);
95
96                char *param = NULL;
97                {
98                    GB_transaction  ta(gb_main);
99                    GBDATA         *gb_param = get_param_entry(gb_main);
100                    if (!gb_param) error     = GB_await_error();
101                    else {
102                        param             = GB_read_string(gb_param);
103                        if (!param) error = GB_await_error();
104                        else error        = GB_write_string(gb_param, "");
105                    }
106                }
107
108                if (!error) {
109                    if (param[0]) error = GB_save(gb_main, param, savemode); // @@@ support compression here?
110                    else error          = "No savename specified";
111                }
112
113                printf("arb_db_server: save returns '%s'\n", error);
114
115                free(param);
116            }
117            else {
118                error = GBS_global_string("Unknown command '%s'", command);
119            }
120        }
121        {
122            GB_transaction ta(gb_main);
123
124            GB_ERROR err2             = GB_write_string(get_result_entry(gb_main), error ? error : "ok");
125            if (!error && err2) error = GBS_global_string("could not write result (reason: %s)", err2);
126            if (error) fprintf(stderr, "arb_db_server: failed to react to command '%s' (reason: %s)\n", command, error);
127        }
128        free(command);
129    }
130}
131
132static GB_ERROR server_main_loop(GBDATA *gb_main) {
133    GB_ERROR error = NULL;
134    {
135        GB_transaction ta(gb_main);
136
137        GBDATA *cmd_entry = get_command_entry(gb_main);
138        error             = GB_add_callback(cmd_entry, GB_CB_CHANGED, makeDatabaseCallback(command_cb));
139    }
140
141    while (!error) {
142        served(gb_main);
143        if (command_triggered) {
144            react_to_command(gb_main);
145        }
146        if (do_shutdown) {
147            int clients;
148            do {
149                clients = GB_read_clients(gb_main);
150                if (clients>0) {
151                    fprintf(stdout, "arb_db_server: shutdown requested (waiting for %i clients)\n", clients);
152                    served(gb_main);
153                }
154            }
155            while (clients>0);
156            fputs("arb_db_server: all clients left, performing shutdown\n", stdout);
157            break;
158        }
159    }
160    return error;
161}
162
163static GB_ERROR check_socket_available(const arb_params& params) {
164    GBDATA *gb_extern = GB_open(params.tcp, "rwc");
165    if (gb_extern) {
166        GB_close(gb_extern);
167        return GBS_global_string("socket '%s' already in use", params.tcp);
168    }
169    GB_clear_error();
170    return NULL;
171}
172
173static GB_ERROR run_server(const arb_params& params) {
174    GB_shell shell;
175    GB_ERROR error = check_socket_available(params);
176    if (!error) {
177        printf("Loading '%s'...\n", params.default_file);
178        GBDATA *gb_main     = GB_open(params.default_file, "rw");
179        if (!gb_main) error = GBS_global_string("Can't open DB '%s' (reason: %s)", params.default_file, GB_await_error());
180        else {
181            error = GBCMS_open(params.tcp, TIMEOUT, gb_main);
182            if (error) error = GBS_global_string("Error starting server: %s", error);
183        }
184
185        if (!error) {
186            GB_transaction ta(gb_main);
187            error = init_data(gb_main);
188        }
189        if (!error) {
190            error = server_main_loop(gb_main);
191            fprintf(stderr, "server_main_loop returns with '%s'\n", error);
192        }
193
194        if (gb_main) {
195            GBCMS_shutdown(gb_main);
196            GB_close(gb_main);
197        }
198    }
199    return error;
200}
201
202static GB_ERROR run_command(const arb_params& params, const char *command) {
203    GB_ERROR  error     = NULL;
204    GB_shell  shell;
205    GBDATA   *gb_main   = GB_open(params.tcp, "rw");
206    if (!gb_main) error = GB_await_error();
207    else {
208        {
209            GB_transaction ta(gb_main);
210
211            GBDATA *gb_cmd_entry = get_command_entry(gb_main);
212            error                = gb_cmd_entry ? GB_write_string(gb_cmd_entry, command) : GB_await_error();
213
214            if (!error) {
215                if (strcmp(command, "save") == 0) {
216                    GBDATA *gb_param     = get_param_entry(gb_main);
217                    if (!gb_param) error = GB_await_error();
218                    else    error        = GB_write_string(gb_param, params.default_file); // send save-name
219                }
220            }
221        }
222
223        if (!error) {
224            GB_transaction ta(gb_main);
225
226            GBDATA *gb_result_entry     = get_result_entry(gb_main);
227            if (!gb_result_entry) error = GB_await_error();
228            else {
229                const char *result = GB_read_char_pntr(gb_result_entry);
230
231                if (strcmp(result, "ok") != 0) {
232                    error = GBS_global_string("Error from server: %s", result);
233                }
234            }
235        }
236
237        GB_close(gb_main);
238    }
239    return error;
240}
241
242static void show_help() {
243    fputs("arb_db_server 2.0 -- ARB-database server\n", stdout);
244    fputs("options:\n", stdout);
245    fputs("    -h         show this help\n", stdout);
246    fputs("    -A         use ASCII-DB-version\n", stdout);
247    fputs("    -Ccmd      execute command 'cmd' on running server\n", stdout);
248    fputs("               known command are:\n", stdout);
249    fputs("                   ping      test if server is up (crash or failure if not)\n", stdout);
250    fputs("                   save      save the database (use -d to change name)\n", stdout);
251    fputs("                   shutdown  shutdown running arb_db_server\n", stdout);
252    arb_print_server_params();
253}
254
255int ARB_main(int argc, char *argv[]) {
256    arb_params *params = arb_trace_argv(&argc, (const char **)argv);
257
258    bool        help  = false;
259    const char *cmd   = NULL;  // run server command
260
261    GB_ERROR error = NULL;
262    while (argc>1 && !error) {
263        const char *arg = argv[1];
264        if (arg[0] == '-') {
265            char sw_char = arg[1];
266            switch (sw_char) {
267                case 'h': help     = true; break;
268                case 'C': cmd      = arg+2; break;
269                case 'A': savemode = "a"; break;
270
271                default:
272                    error = GBS_global_string("Unknown switch '-%c'", sw_char);
273                    break;
274            }
275        }
276        else {
277            error = GBS_global_string("Unknown argument '%s'", arg);
278        }
279        argc--; argv++;
280    }
281
282    if (!error) {
283        if (help) show_help();
284        else if (cmd) error = run_command(*params, cmd);
285        else          error = run_server(*params);
286    }
287
288    free_arb_params(params);
289    if (error) {
290        fprintf(stderr, "Error in arb_db_server: %s\n", error);
291        return EXIT_FAILURE;
292    }
293    return EXIT_SUCCESS;
294}
295
296// --------------------------------------------------------------------------------
297
298#ifdef UNIT_TESTS
299#ifndef TEST_UNIT_H
300#include <test_unit.h>
301#endif
302
303#include <unistd.h>
304#include <sys/wait.h>
305
306inline GB_ERROR valgrinded_system(const char *cmdline) {
307    char *cmddup = ARB_strdup(cmdline);
308    make_valgrinded_call(cmddup);
309
310    GB_ERROR error = GBK_system(cmddup);
311    free(cmddup);
312    return error;
313}
314
315#define RUN_TOOL(cmdline)            valgrinded_system(cmdline)
316#define TEST_RUN_TOOL(cmdline)       TEST_EXPECT_NO_ERROR(RUN_TOOL(cmdline))
317#define TEST_RUN_TOOL_FAILS(cmdline) TEST_EXPECT_ERROR_CONTAINS(RUN_TOOL(cmdline), "System call failed")
318
319inline bool server_is_down(const char *tcp) {
320    char     *ping_cmd = ARB_strdup(GBS_global_string("arb_db_server -T%s -Cping", tcp));
321    GB_ERROR  error    = GBK_system(ping_cmd); // causes a crash in called command (as long as server is not up)
322    free(ping_cmd);
323    return error;
324}
325
326static int entry_changed_cb_called = 0;
327static void entry_changed_cb() {
328    entry_changed_cb_called++;
329}
330
331void TEST_SLOW_dbserver() {
332    // MISSING_TEST(TEST_dbserver);
333    TEST_RUN_TOOL("arb_db_server -h");
334    TEST_RUN_TOOL_FAILS("arb_db_server -X");
335    TEST_RUN_TOOL_FAILS("arb_db_server brzl");
336
337    char *sock = ARB_strdup(GB_path_in_ARBHOME("UNIT_TESTER/sockets/dbserver.socket"));
338    char *tcp  = GBS_global_string_copy(":%s", sock);
339    char *db   = ARB_strdup(GB_path_in_ARBHOME("UNIT_TESTER/run/TEST_loadsave.arb"));
340
341    char *shutdown_cmd = ARB_strdup(GBS_global_string("arb_db_server -T%s -Cshutdown", tcp));
342
343// #define DEBUG_SERVER // uncomment when debugging server manually
344
345#if !defined(DEBUG_SERVER)
346    TEST_EXPECT(server_is_down(tcp));
347#endif
348
349    pid_t child_pid = fork();
350    if (child_pid) { // parent ("the client")
351        bool down = true;
352        int max_wait = (60*1000)/25; // set timeout to ~60 seconds
353        while (down) {
354            GB_sleep(25, MS);
355            down = server_is_down(tcp);
356            TEST_EXPECT(max_wait-->0);
357        }
358        // ok - server is up
359
360        {
361            char *bad_cmd = ARB_strdup(GBS_global_string("arb_db_server -T%s -Cbad", tcp));
362            TEST_RUN_TOOL_FAILS(bad_cmd);
363            free(bad_cmd);
364        }
365
366        { // now directly connect to server
367            GB_shell  shell;
368            GB_ERROR  error      = NULL;
369            GBDATA   *gb_main1   = GB_open(tcp, "rw"); // first client
370            if (!gb_main1) error = GB_await_error();
371            else {
372                GBDATA *gb_main2     = GB_open(tcp, "rw"); // second client
373                if (!gb_main2) error = GB_await_error();
374                else {
375                    // test existing entries
376                    {
377                        GB_transaction  ta(gb_main1);
378                        GBDATA         *gb_ecoli = GB_search(gb_main1, "/extended_data/extended/ali_16s/data", GB_FIND);
379
380                        TEST_REJECT_NULL(gb_ecoli);
381
382                        const char *ecoli = GB_read_char_pntr(gb_ecoli);
383                        if (!ecoli) error = GB_await_error();
384                        else        TEST_EXPECT_EQUAL(GBS_checksum(ecoli, 0, NULL), 0x3558760cU);
385                    }
386                    {
387                        GB_transaction  ta(gb_main2);
388                        GBDATA         *gb_alitype = GB_search(gb_main2, "/presets/alignment/alignment_type", GB_FIND);
389
390                        TEST_REJECT_NULL(gb_alitype);
391
392                        const char *alitype = GB_read_char_pntr(gb_alitype);
393                        if (!alitype) error = GB_await_error();
394                        else        TEST_EXPECT_EQUAL(alitype, "rna");
395                    }
396
397                    // test value written in client1 is changed in client2
398                    const char *test_entry   = "/tmp/TEST_SLOW_dbserver";
399                    const char *test_content = "hello world!";
400
401                    GBDATA *gb_entry1;
402                    GBDATA *gb_entry2;
403
404                    if (!error) {
405                        GB_transaction ta(gb_main1);
406
407                        gb_entry1             = GB_search(gb_main1, test_entry, GB_STRING);
408                        if (!gb_entry1) error = GB_await_error();
409                        else error            = GB_write_string(gb_entry1, test_content);
410                    }
411
412                    if (!error) { // test value arrived in other client
413                        GB_transaction ta(gb_main2);
414
415                        gb_entry2             = GB_search(gb_main2, test_entry, GB_STRING);
416                        if (!gb_entry2) error = GB_await_error();
417                        else {
418                            const char *read_content = GB_read_char_pntr(gb_entry2);
419                            if (!read_content) {
420                                error = GB_await_error();
421                            }
422                            else {
423                                TEST_EXPECT_EQUAL(read_content, test_content);
424                            }
425                        }
426                    }
427
428                    // test change-callback gets triggered
429                    if (!error) {
430
431                        TEST_EXPECT_EQUAL(entry_changed_cb_called, 0);
432                        {
433                            GB_transaction ta(gb_entry1);
434                            error = GB_add_callback(gb_entry1, GB_CB_CHANGED, makeDatabaseCallback(entry_changed_cb));
435                        }
436                        TEST_EXPECT_EQUAL(entry_changed_cb_called, 0);
437                        {
438                            GB_transaction ta(gb_entry2);
439                            GB_touch(gb_entry2);
440                        }
441                        TEST_EXPECT_EQUAL(entry_changed_cb_called, 0); // client1 did not transact yet
442                        delete new GB_transaction(gb_main1);
443                        TEST_EXPECT_EQUAL(entry_changed_cb_called, 1);
444                    }
445
446                    GB_close(gb_main2);
447                }
448                GB_close(gb_main1);
449            }
450
451            TEST_EXPECT_NO_ERROR(error);
452        }
453
454        // test remote save
455        {
456            char *savename = ARB_strdup(GB_path_in_ARBHOME("UNIT_TESTER/run/TEST_arbdbserver_save.arb"));
457            char *expected = ARB_strdup(GB_path_in_ARBHOME("UNIT_TESTER/run/TEST_arbdbserver_save_expected.arb"));
458            char *save_cmd = ARB_strdup(GBS_global_string("arb_db_server -T%s -Csave -d%s", tcp, savename));
459            char *bad_savecmd = ARB_strdup(GBS_global_string("arb_db_server -T%s -Csave", tcp));
460
461            TEST_RUN_TOOL(save_cmd);
462            TEST_EXPECT(GB_is_regularfile(savename));
463
464// #define TEST_AUTO_UPDATE
465#if defined(TEST_AUTO_UPDATE)
466            TEST_COPY_FILE(savename, expected);
467#else // !defined(TEST_AUTO_UPDATE)
468            TEST_EXPECT_FILES_EQUAL(savename, expected);
469#endif
470            TEST_EXPECT_ZERO_OR_SHOW_ERRNO(GB_unlink(savename));
471
472            TEST_RUN_TOOL_FAILS(bad_savecmd);
473           
474            free(bad_savecmd);
475            free(save_cmd);
476            free(expected);
477            free(savename);
478        }
479
480        // stop the server
481        TEST_RUN_TOOL(shutdown_cmd);
482        while (child_pid != wait(NULL)) {} // wait for child to finish = wait for server to terminate
483    }
484    else { // child ("the server")
485#if !defined(DEBUG_SERVER)
486        GB_sleep(100, MS);
487        TEST_RUN_TOOL(GBS_global_string("arb_db_server -T%s -d%s -A", tcp, db)); // start the server (in ASCII-mode)
488#endif
489        exit(EXIT_SUCCESS);
490    }
491
492    TEST_EXPECT(server_is_down(tcp));
493   
494#define socket_disappeared(sock) (GB_time_of_file(sock) == 0)
495
496    TEST_EXPECT(socket_disappeared(sock));
497
498    free(shutdown_cmd);
499    free(db);
500    free(tcp);
501    free(sock);
502}
503
504#endif // UNIT_TESTS
505
506// --------------------------------------------------------------------------------
Note: See TracBrowser for help on using the repository browser.