source: trunk/GDE/SINA/builddir/src/query_arb.cpp

Last change on this file was 19806, checked in by westram, 2 weeks ago
File size: 31.2 KB
Line 
1/*
2Copyright (c) 2006-2018 Elmar Pruesse <elmar.pruesse@ucdenver.edu>
3
4This file is part of SINA.
5SINA is free software: you can redistribute it and/or modify it under
6the terms of the GNU General Public License as published by the Free
7Software Foundation, either version 3 of the License, or (at your
8option) any later version.
9
10SINA is distributed in the hope that it will be useful, but WITHOUT ANY
11WARRANTY; without even the implied warranty of MERCHANTABILITY or
12FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13for more details.
14
15You should have received a copy of the GNU General Public License
16along with SINA.  If not, see <http://www.gnu.org/licenses/>.
17
18Additional permission under GNU GPL version 3 section 7
19
20If you modify SINA, or any covered work, by linking or combining it
21with components of ARB (or a modified version of that software),
22containing parts covered by the terms of the
23ARB-public-library-license, the licensors of SINA grant you additional
24permission to convey the resulting work. Corresponding Source for a
25non-source form of such a combination shall include the source code
26for the parts of ARB used as well as that of the covered work.
27*/
28
29/* this file contains the code that communicates with arb
30 * => get data from db, query pt-server */
31
32#include "config.h"
33
34#include "query_arb.h"
35#include "timer.h"
36#include "alignment_stats.h"
37#include "log.h"
38#include "progress.h"
39
40#include <iostream>
41using std::uppercase;
42using std::hex;
43
44#include <string>
45using std::string;
46
47#include <vector>
48using std::vector;
49
50#include <list>
51using std::list;
52
53#include <fstream>
54using std::ifstream;
55using std::ofstream;
56
57#include <cmath>
58#include <exception>
59#include <mutex>
60
61#include <sstream>
62using std::stringstream;
63
64#include <map>
65using std::map;
66using std::pair;
67
68#include <unordered_map>
69#include <tbb/concurrent_unordered_map.h>
70
71#include <cstdio>
72
73#ifdef HAVE_TBB
74#  include "tbb/pipeline.h"
75#endif
76
77// ARB needs either DEBUG or NDEBUG defined
78#ifdef DEBUG
79#  define typeof __typeof__
80#else
81# ifndef NDEBUG
82#  define NDEBUG 1
83# endif
84#endif
85
86#include <arbdbt.h>
87#include <BI_helix.hxx>
88#include <arb_handlers.h>
89
90#ifndef HAVE_GBT_FIND_SEQUENCE
91inline GBDATA* GBT_find_sequence(GBDATA* gbd, const char* ali) {
92    return GBT_read_sequence(gbd, ali);
93}
94#endif
95
96#include <boost/functional/hash/hash.hpp>
97
98#include <boost/archive/binary_iarchive.hpp>
99#include <boost/archive/binary_oarchive.hpp>
100#include <boost/filesystem.hpp>
101
102namespace fs = boost::filesystem;
103
104using namespace sina;
105
106// const fieldnames for arb export
107// (fields get exported for aligned sequences only; use set_attr() to prepare field-data for export)
108const char* query_arb::fn_turn       = "turn";
109const char* query_arb::fn_acc        = "acc";
110const char* query_arb::fn_start      = "start";
111const char* query_arb::fn_used_rels  = "used_rels";
112const char* query_arb::fn_fullname   = "full_name";
113const char* query_arb::fn_nuc        = "nuc";
114
115const char* query_arb::fn_qual       = "align_quality_slv";
116const char* query_arb::fn_head       = "align_cutoff_head_slv";
117const char* query_arb::fn_tail       = "align_cutoff_tail_slv";
118const char* query_arb::fn_date       = "aligned_slv";
119const char* query_arb::fn_astart     = "align_startpos_slv";
120const char* query_arb::fn_astop      = "align_stoppos_slv";
121const char* query_arb::fn_idty       = "align_ident_slv";
122const char* query_arb::fn_nuc_gene   = "nuc_gene_slv";
123const char* query_arb::fn_bpscore    = "align_bp_score_slv";
124const char* query_arb::fn_family     = "align_family_slv";
125const char* query_arb::fn_align_log  = "align_log_slv";
126const char* query_arb::fn_filter     = "align_filter_slv";
127const char* query_arb::fn_nearest    = "nearest_slv";
128
129// Global lock -- ARB database access is not thread safe! Not even between
130//                open datases.
131static std::mutex arb_db_access;
132
133// List of opened ARB databases
134map<fs::path, query_arb*> query_arb::open_arb_dbs;
135
136static auto arb_logger = Log::create_logger("libARBDB");
137static auto logger = Log::create_logger("ARB I/O");
138
139template<typename... Args>
140query_arb_exception make_exception(const char *fmt, const Args&... args) {
141    return query_arb_exception(fmt::format(fmt, args...));
142}
143
144
145/////////////////////   Hidden Implementation Class ///////////////
146
147struct query_arb::priv_data {
148    ~priv_data() {
149        if (default_alignment != nullptr) {
150            free(const_cast<char*>(default_alignment));
151        }
152        if (gbmain != nullptr) {
153            logger->warn("Closing ARB database '{}' ...", filename);
154            GB_close(gbmain);
155        }
156    }
157
158    static GB_shell* the_arb_shell;
159    static string use_default_alignment;
160
161    using sequence_cache_type = tbb::concurrent_unordered_map<std::string, cseq>;
162    using gbdata_cache_type = std::unordered_map<string, GBDATA*, boost::hash<string>>;
163    using error_list_type = list<std::string>;
164
165    sequence_cache_type sequence_cache;
166    gbdata_cache_type gbdata_cache;
167    error_list_type write_errors;
168    const char* default_alignment{nullptr};
169    int alignment_length{0};
170    fs::path filename;
171    GBDATA *gbmain{nullptr}, *gblast{nullptr}, *gbspec{nullptr};
172    int count{0};
173
174    void write(GBDATA* pData, const string& str);
175    GBDATA* getGBDATA(const string& name, bool create = false);
176    const char* getSequence(GBDATA*);
177    void putSequence(const cseq& seq);
178    void get_weights_nolock(vector<float>& res,
179                            const char *name, const char *ali);
180};
181
182GB_shell *query_arb::priv_data::the_arb_shell = nullptr;
183string query_arb::priv_data::use_default_alignment;
184
185inline void log_GB_write_string_failure(const string& str, GB_ERROR err) {
186    string    shortened;
187    const int MAXSHOWN = 50; // truncate overlong values (e.g. sequence) to 50 characters
188    if (str.length()>MAXSHOWN) {
189        shortened = str.substr(0, MAXSHOWN-5)+"[...]";
190    }
191    logger->error("GB_write_string(value = {}) failed. Reason: {}",
192                  shortened.empty() ? str : shortened, err);
193}
194
195void
196query_arb::priv_data::write(GBDATA *pData, const string& str) {
197    if (pData == nullptr) {
198        throw make_exception("GB_write_string pData is null");
199    }
200
201    GB_ERROR err = GB_write_string(pData, str.c_str());
202    if (err != nullptr) {
203        log_GB_write_string_failure(str, err);
204    }
205}
206
207
208GBDATA*
209query_arb::priv_data::getGBDATA(const string& name, bool create) {
210    auto it = gbdata_cache.find(name);
211    if (it != gbdata_cache.end()) {
212        return it->second;
213    }
214    GBDATA* gbd = GBT_find_species(gbmain, name.c_str());
215    if (gbd == nullptr) {
216        if (!create) {
217            throw make_exception("No sequence \"{}\" in {}",
218                                 name, filename.filename());
219        }
220        logger->info("Creating new sequence in {}", filename.filename());
221        gbd = GB_create_container(gbspec, "species");
222        GBDATA *gbname = GB_create(gbd, "name", GB_STRING);
223        ++count;
224        if (name.empty()) {
225            write(gbname, fmt::format("sina_{}", count));
226        } else {
227            write(gbname, name);
228        }
229    }
230    gbdata_cache[name] =  gbd;
231
232    return gbd;
233}
234
235const char*
236query_arb::priv_data::getSequence(GBDATA* gbdata) {
237    gbdata = GBT_find_sequence(gbdata, default_alignment);
238    if (gbdata != nullptr) {
239        const char *res = GB_read_char_pntr(gbdata);
240        if (res != nullptr) {
241            return res;
242        }
243    }
244    return nullptr;
245}
246
247void
248query_arb::priv_data::putSequence(const cseq& seq) {
249    string aseq = seq.getAligned();
250
251    std::lock_guard<std::mutex> lock(arb_db_access);
252    GB_transaction trans(gbmain);
253    GBDATA *gbdata = getGBDATA(seq.getName(), true);
254    GBDATA *gbseq = GBT_find_sequence(gbdata, default_alignment);
255    if (gbseq == nullptr) {
256        gbseq = GBT_create_sequence_data(gbdata, default_alignment, "data", GB_STRING, 0);
257        GBDATA *gbacc = GB_create(gbdata, "acc", GB_STRING);
258        string acc = fmt::format(
259            "ARB_{:X}", GB_checksum(aseq.c_str(), aseq.size(), 1, ".-")
260            );
261        write(gbacc, acc);
262    }
263    write(gbseq, aseq);
264}
265
266/////////////////////  Reading / Writing attributes  ///////////////
267
268/* Loads a metadata attribute from ARB into cseq
269 */
270static void loadKey(cseq& c, const string& key, GBDATA* gbspec, bool warnMissingField = true) {
271    GBDATA *gbd = GB_find(gbspec, key.c_str(), SEARCH_CHILD);
272    if (gbd != nullptr) {
273        int type = GB_read_type(gbd);
274        switch(type) {
275            case GB_STRING:
276                c.set_attr(key, (const char*)GB_read_pntr(gbd));
277                return;
278            case GB_INT:
279                c.set_attr(key, (int)GB_read_int(gbd));
280                return;
281            case GB_FLOAT:
282                c.set_attr(key, (float)GB_read_float(gbd));
283                return;
284            case GB_BYTE:
285            case GB_BITS:
286            default:
287                logger->error("loadKey '{}' from '{}' failed: type {} unsupported", key, c.getName(), type);
288                return;
289        }
290    } else if (warnMissingField) {
291        logger->error("loadKey '{}' from '{}' failed: not found", key, c.getName());
292    }
293}
294
295
296struct storeKey_visitor : public boost::static_visitor<> {
297    storeKey_visitor(GBDATA* gbmain, GBDATA* gbspec, const string& key)
298        : _gbmain(gbmain), _gbspec(gbspec), _key(key)
299    {}
300
301    GBDATA* _gbmain;
302    GBDATA* _gbspec;
303    const string& _key;
304
305    void operator()(const int& i) {
306        GBT_add_new_species_changekey(_gbmain, _key.c_str(), GB_INT);
307        GBDATA *gbd = GB_entry(_gbspec, _key.c_str());
308        if ((gbd != nullptr) && GB_read_type(gbd) != GB_INT) {
309            GB_delete(gbd);
310            gbd = nullptr;
311        }
312
313        if (gbd == nullptr) {
314            gbd = GB_create(_gbspec, _key.c_str(), GB_INT);
315        }
316
317        GB_ERROR err = GB_write_int(gbd, i);
318        if (err != nullptr) {
319            logger->error("GB_write_int(,{}) failed. Reason: {}",
320                          i, err);
321        }
322    }
323    void operator()(const unsigned int& ui) {
324        operator()((int)ui);
325    }
326    void operator()(const bool& b) {
327        operator()((int)b);
328    }
329    void operator()(const float& f) {
330        GBT_add_new_species_changekey(_gbmain, _key.c_str(), GB_FLOAT);
331        GBDATA *gbd = GB_entry(_gbspec, _key.c_str());
332        if ((gbd != nullptr) && GB_read_type(gbd) != GB_FLOAT) {
333            GB_delete(gbd);
334            gbd = nullptr;
335        }
336        if (gbd == nullptr) {
337            gbd = GB_create(_gbspec, _key.c_str(), GB_FLOAT);
338        }
339        GB_ERROR err = GB_write_float(gbd, f);
340        if (err != nullptr) {
341            logger->error("GB_write_float(,{}) failed. Reason: {}",
342                          f, err);
343        }
344    }
345    void operator()(const string& str) {
346        GBT_add_new_species_changekey(_gbmain, _key.c_str(), GB_STRING);
347        GBDATA *gbd = GB_entry(_gbspec, _key.c_str());
348        if ((gbd != nullptr) && GB_read_type(gbd) != GB_STRING) {
349            GB_delete(gbd);
350            gbd = nullptr;
351        }
352        if (gbd == nullptr) {
353            gbd = GB_create(_gbspec, _key.c_str(), GB_STRING);
354        }
355        GB_ERROR err = GB_write_string(gbd, str.c_str());
356
357        if (err != nullptr) {
358            log_GB_write_string_failure(str, err);
359        }
360    }
361
362    void operator()(const vector<cseq>& /*vc*/) {
363    }
364};
365
366
367/////////////////////   Outside Class   ///////////////
368
369void
370query_arb::storeKey(cseq& c, const std::string& key) {
371    // Note: only used in tests (keys are written to ARB database via putCseq)
372
373    std::lock_guard<std::mutex> lock(arb_db_access);
374    GB_transaction trans(data->gbmain);
375    GBDATA *gbspec = data->getGBDATA(c.getName());
376    storeKey_visitor vis(data->gbmain, gbspec, key);
377
378    cseq::variant V = c.get_attr<cseq::variant>(key);
379    boost::apply_visitor(vis, V);
380}
381
382void
383query_arb::loadKey(const cseq& c, const string& key, bool reload, bool warnMissingField) {
384    if (!reload && c.has_attr(key)) return;
385    std::lock_guard<std::mutex> lock(arb_db_access);
386    GB_transaction trans(data->gbmain);
387    // FIXME: should we check if the cseq is actually ours?
388    ::loadKey(const_cast<cseq&>(c), key, data->getGBDATA(c.getName()), warnMissingField);
389}
390
391vector<std::tuple<string, string, int>>
392query_arb::listKeys() {
393    vector<std::tuple<string, string, int>> res;
394    GB_transaction trans(data->gbmain);
395    GBDATA* gb_keys = GB_search(data->gbmain, CHANGE_KEY_PATH, GB_CREATE_CONTAINER);
396    if (gb_keys) {
397        for (GBDATA* gbd = GB_entry(gb_keys, CHANGEKEY);
398             gbd; gbd = GB_nextEntry(gbd)) {
399            const char* name = GBT_read_char_pntr(gbd, CHANGEKEY_NAME);
400            long int type =  *GBT_read_int(gbd, CHANGEKEY_TYPE);
401            const char* type_str;
402            switch(type) {
403                case GB_NONE:        type_str="None"; break;
404                case GB_BIT:         type_str="bit"; break;
405                case GB_BYTE:        type_str="byte"; break;
406                case GB_INT:         type_str="int"; break;
407                case GB_FLOAT:       type_str="float"; break;
408                case GB_POINTER:     type_str="pointer"; break;
409                case GB_BITS:        type_str="bit-array"; break;
410                case GB_BYTES:       type_str="byte-array"; break;
411                case GB_INTS:        type_str="int-array"; break;
412                case GB_FLOATS:      type_str="float-array"; break;
413                case GB_STRING:      type_str="string"; break;
414                case GB_STRING_SHRT: type_str="short-string"; break;
415                case GB_DB:          type_str="container"; break;
416                default:
417                    type_str = "unknown";
418            }
419            res.push_back(std::make_tuple(name, type_str, type));
420        }
421    }
422    return res;
423}
424
425
426query_arb::query_arb(const fs::path& arbfile)
427    : data(new priv_data()) {
428    data->filename = arbfile;
429    if (arbfile.empty()) {
430        throw make_exception("Empty ARB database name?!");
431    }
432
433    data->gbmain = GB_open(arbfile.c_str(), "rwc");
434    if (data->gbmain == nullptr) {
435        throw make_exception("Unable to open ARB database {}.", arbfile);
436    }
437
438    setProtectionLevel(6); // drop privileges
439
440    GB_transaction trans(data->gbmain);
441
442    GBDATA *gbd;
443    if ((gbd = GB_entry(data->gbmain, "ptserver")) != nullptr &&
444        (gbd = GB_entry(gbd, "dbstate")) != nullptr &&
445        GB_read_int(gbd) > 0) {
446        throw make_exception(
447            "{} has been compressed for use by the ARB PT server"
448            "and cannot be accessed by SINA.",
449            arbfile.filename()
450            );
451    }
452
453    if (!priv_data::use_default_alignment.empty()) {
454        data->default_alignment = ARB_strdup(priv_data::use_default_alignment.c_str());
455    }
456    else {
457        data->default_alignment = GBT_get_default_alignment(data->gbmain);
458    }
459
460    if (data->default_alignment == nullptr) {
461        GBT_create_new_alignment(data->gbmain, "ali_16s", 2000, 0, 4, "rna", "created by sina (found no default alignment)");
462        GBT_set_default_alignment(data->gbmain, "ali_16s");
463        data->default_alignment=strdup("ali_16s");
464        logger->warn("Created new alignment ali_16s in '{}'", data->filename);
465    }
466
467    data->alignment_length = GBT_get_alignment_len(data->gbmain,
468                                                   data->default_alignment);
469    if (data->alignment_length <= 0) {
470        // This should not actually be possible. LCOV_EXCL_START
471        string error = GB_await_error();
472
473        throw make_exception(
474            "Invalid default alignment in {}: {}",
475            data->filename, error);
476        // LCOV_EXCL_STOP
477    }
478
479    data->gbspec = GB_search(data->gbmain, "species_data", GB_CREATE_CONTAINER);
480
481    int spec_count = 0;
482    for ( GBDATA *gbspec = GBT_first_species(data->gbmain);
483          gbspec != nullptr; gbspec = GBT_next_species(gbspec)) {
484        spec_count++;
485    }
486    data->count = spec_count;
487
488    logger->info("Loading names map... (for {})", data->filename);
489
490    logger_progress p(logger, "Scanning", spec_count);
491    for ( GBDATA* gbspec = GBT_first_species(data->gbmain);
492          gbspec != nullptr; gbspec = GBT_next_species(gbspec)) {
493        data->gbdata_cache[GBT_read_name(gbspec)] = gbspec;
494        ++p;
495    }
496}
497
498query_arb::~query_arb() {
499}
500
501void
502query_arb::setProtectionLevel(int p) {
503    GB_transaction trans(data->gbmain);
504    GB_change_my_security(data->gbmain, p);
505}
506
507
508
509/////////////////////   ARB logging callbacks  ///////////////
510
511static void log_arb_err(const char *msg) {
512    arb_logger->error(msg);
513};
514static void log_arb_warn(const char *msg) {
515    arb_logger->warn(msg);
516};
517static void log_arb_info(const char *msg) {
518    arb_logger->info(msg);
519};
520
521logger_progress *arb_progress{nullptr};
522static void arb_openstatus(const char* title) {
523    if (arb_progress)
524        delete arb_progress;
525    arb_progress = new logger_progress(logger, title, 100);
526}
527static void arb_closestatus() {
528    delete arb_progress;
529    arb_progress = nullptr;
530}
531
532static ARB_STATUS_RETURN_TYPE arb_set_title(const char* title) {
533    arb_logger->info("Progress title: {}", title);
534    return ARB_STATUS_RETURN_VALUE;
535}
536static ARB_STATUS_RETURN_TYPE arb_set_subtitle(const char* title) {
537    arb_logger->info("Progress subttitle: {}", title);
538    return ARB_STATUS_RETURN_VALUE;
539}
540static ARB_STATUS_RETURN_TYPE arb_set_gauge(double gauge) {
541    if (arb_progress) {
542        auto cur = arb_progress->count();
543        auto set_to = arb_progress->size() * gauge;
544        arb_progress->update(set_to - cur);
545    }
546    return ARB_STATUS_RETURN_VALUE;
547}
548static bool arb_user_abort() {
549    return false;
550}
551
552static arb_status_implementation log_arb_status {
553    AST_RANDOM, arb_openstatus, arb_closestatus, arb_set_title,
554        arb_set_subtitle, arb_set_gauge, arb_user_abort
555};
556
557static arb_handlers arb_log_handlers = {
558    log_arb_err, log_arb_warn, log_arb_info, log_arb_status
559};
560
561
562/////////////////////   Static Instance Acessors  ///////////////
563
564query_arb*
565query_arb::getARBDB(const fs::path& file_name) {
566    std::lock_guard<std::mutex> lock(arb_db_access);
567    if (query_arb::priv_data::the_arb_shell == nullptr) {
568        query_arb::priv_data::the_arb_shell = new GB_shell();
569
570        ARB_install_handlers(arb_log_handlers);
571        ARB_redirect_handlers_to(stderr, stderr);
572    }
573    if (open_arb_dbs.empty()) {
574        atexit(query_arb::closeOpenARBDBs);
575    }
576    if (open_arb_dbs.count(file_name) == 0u) {
577        open_arb_dbs[file_name] = new query_arb(file_name);
578    }
579    return open_arb_dbs[file_name];
580}
581
582void
583query_arb::closeOpenARBDBs() {
584    // atexit registered method
585    std::lock_guard<std::mutex> lock(arb_db_access);
586    for (auto arb : open_arb_dbs) {
587        delete arb.second;
588    }
589    open_arb_dbs.clear();
590}
591
592void
593query_arb::save() {
594    saveAs(data->filename);
595}
596
597const fs::path&
598query_arb::getFileName() const {
599    return data->filename;
600}
601
602void
603query_arb::saveAs(const fs::path& fname, const char* type) {
604    logger->info("Saving database {}", fname);
605    {
606        GB_transaction trans(data->gbmain);
607        logger->info("Checking alignment...");
608        GB_ERROR err = GBT_check_data(data->gbmain, data->default_alignment);
609        if (err != nullptr) {
610            // LCOV_EXCL_START
611            logger->error("Error '{}' while checking ARB database alignment");
612            // LCOV_EXCL_STOP
613        }
614    }
615
616    if (GB_ERROR err = GB_save_as(data->gbmain, fname.c_str(), type)) {
617        logger->error("Error while trying to save {}", fname);
618        logger->error("  ARB said: \n{}\n", err);
619    }
620}
621
622
623void
624query_arb::loadCache(std::vector<std::string>& keys) {
625    GBDATA *gbspec;
626
627    const char *ali = data->default_alignment;
628
629    std::lock_guard<std::mutex> lock(arb_db_access);
630    GB_transaction trans(data->gbmain);
631
632    logger_progress p(logger, "Loading sequences", data->count);
633
634    // re-init sequence_cache with size data->count?
635
636#undef HAVE_TBB
637#ifndef HAVE_TBB // serial implementation
638    timer t;
639    for ( gbspec = GBT_first_species(data->gbmain);
640          gbspec;
641          gbspec = GBT_next_species(gbspec)) {
642        t.start();
643        const char* name_str = GBT_read_name(gbspec);
644        string name(name_str);
645        t.stop("name");
646        cseq sequence;
647        if (not data->sequence_cache.count(name)) {
648            GBDATA *gb_data = GBT_find_sequence(gbspec,ali);
649            if (not gb_data) continue;
650            const char* ptr =  GB_read_char_pntr(gb_data);
651            t.stop("arb load");
652            sequence = cseq(name.c_str(), ptr);
653            t.stop("cseq convert");
654            GB_flush_cache(gb_data);
655        } else {
656            sequence = data->sequence_cache[name];
657        }
658
659        for(vector<string>::iterator it = keys.begin();
660            it != keys.end(); ++it) {
661            if (not sequence.get_attrs().count(*it)) {
662                ::loadKey(sequence, *it, gbspec);
663            }
664        }
665
666
667        data->sequence_cache[sequence.getName()] = sequence;
668        ++p;
669    }
670    logger->info("Timings for Cache Load: {}", t);
671
672#else // HAVE_TBB -- parallel implementation
673
674    gbspec = GBT_first_species(data->gbmain);
675    if (gbspec == nullptr) {
676        logger->error("Failed to load sequences -- database empty?");
677        return;
678    }
679
680    timer arb, tkeys, all;
681    all.start();
682    int n=0;
683    tbb::parallel_pipeline(
684        /*max_number_of_live_token=*/ 1024,
685        tbb::make_filter<void, std::pair<const char*, const char*>>(
686            tbb::filter::serial,
687            [&](tbb::flow_control& fc) -> std::pair<const char*, const char*>
688            {
689                arb.start();
690                const char* name_str;
691                GBDATA *gb_sequence = nullptr;
692                while ((gbspec != nullptr) && (gb_sequence == nullptr)) {
693                    name_str = GBT_read_name(gbspec);
694                    gb_sequence = GBT_find_sequence(gbspec, ali);
695                    gbspec = GBT_next_species(gbspec);
696                }
697                arb.stop("arb find");
698                if ((gbspec == nullptr) && (gb_sequence == nullptr)) {
699                    fc.stop();
700                    return std::make_pair("","");
701                }
702                const char* seq_str = GB_read_char_pntr(gb_sequence);
703                arb.stop("arb load");
704                n++;
705                return std::make_pair(name_str, seq_str);
706            }) &
707        tbb::make_filter<std::pair<const char*, const char*>, cseq>(
708            tbb::filter::parallel,
709            [&](std::pair<const char*, const char*> p) -> cseq
710            {
711                return cseq(p.first, 0.f, p.second);
712            }) &
713        tbb::make_filter<cseq, void>(
714            tbb::filter::serial,
715            [&](cseq sequence) -> void
716            {
717                tkeys.start();
718                for(auto & key : keys) {
719                    if (sequence.get_attrs().count(key) == 0u) {
720                        ::loadKey(sequence, key, gbspec); //FIXME!!!
721                    }
722                }
723                data->sequence_cache[sequence.getName()] = sequence;
724                ++p;
725                tkeys.stop("keys");
726            })
727        );
728    all.stop("all");
729    logger->info("Timings for cache load: {} {} {}", all, tkeys, arb);
730#endif // have TBB
731
732    logger->info("Loaded {} sequences", data->sequence_cache.size());
733
734}
735
736vector<cseq*>
737query_arb::getCacheContents() {
738    vector<cseq*> tmp;
739    tmp.reserve(data->sequence_cache.size());
740    for (auto & it : data->sequence_cache) {
741        tmp.push_back(&it.second);
742    }
743    return tmp;
744}
745
746long
747query_arb::getAlignmentWidth() const {
748    return data->alignment_length;
749}
750
751void
752query_arb::override_default_alignment(const string& ali_name) {
753    query_arb::priv_data::use_default_alignment = ali_name;
754}
755
756vector<string>
757query_arb::getSequenceNames() {
758    vector<string> tmp;
759    tmp.reserve(data->gbdata_cache.size());
760    for (auto & it : data->gbdata_cache) {
761        tmp.push_back(it.first);
762    }
763    return tmp;
764}
765
766const cseq&
767query_arb::getCseq(const string& name) { //, bool nocache) {
768    // if not, check whether we already loaded it
769    auto it = data->sequence_cache.find(name);
770    if (it != data->sequence_cache.end()) {
771        return it->second;
772    }
773
774    // not checked, load cache and return item
775
776    auto res = data->sequence_cache.emplace(
777        name, std::move(getCseqUncached(name)));
778    return res.first->second;
779}
780
781cseq
782query_arb::getCseqUncached(const string& name) { //, bool nocache) {
783    std::lock_guard<std::mutex> lock(arb_db_access);
784    GB_transaction trans(data->gbmain);
785    GBDATA *spec = data->getGBDATA(name);
786    const char* seq = data->getSequence(spec);
787    if (seq == nullptr) {
788        throw make_exception("No alignment for sequence \"{}\" in {}",
789                             name, data->filename.filename());
790    }
791    cseq c = {name.c_str(), seq};
792    GB_flush_cache(spec);
793    return c;
794}
795
796int
797query_arb::getSeqCount() const {
798    return data->count;
799}
800
801void
802query_arb::putCseq(const cseq& seq) {
803    // writes sequence and all existing attributes to ARB database (see also set_attr)
804    data->putSequence(seq);
805
806    std::lock_guard<std::mutex> lock(arb_db_access);
807    GB_transaction trans(data->gbmain);
808    GBDATA* gbspec = data->getGBDATA(seq.getName());
809    for (auto& ap: seq.get_attrs()) {
810        storeKey_visitor vis(data->gbmain, gbspec, ap.first);
811        boost::apply_visitor(vis, ap.second);
812    }
813}
814
815
816void
817query_arb::copySequence(query_arb& other, const std::string& name, bool mark) {
818    std::lock_guard<std::mutex> lock(arb_db_access);
819
820    // lock underlying arb database
821    GB_transaction t1(data->gbmain);
822    GB_transaction t2(other.data->gbmain);
823
824    GBDATA *gbdest;
825
826    // don't copy if sequence with identical name exists
827    if ( (gbdest=GBT_find_species(data->gbmain, name.c_str())) != nullptr ) {
828        logger->error("Species \"{}\" already in target db. Not copying.", name);
829        if (mark) {
830            GB_write_flag(gbdest, 1);
831        }
832        return;
833    }
834
835    GBDATA *gbsource = other.data->getGBDATA(name);
836    gbdest = GB_create_container(data->gbspec, "species");
837    if ((gbsource != nullptr) && (gbdest != nullptr)) {
838        GB_copy_std(gbdest,gbsource);
839        data->gblast = gbdest;
840        if (mark) {
841            GB_write_flag(gbdest, 1);
842        }
843        return;
844    }
845    logger->error("Error while copying species \"{}\".", name);
846
847    data->gblast = nullptr;
848}
849
850void
851query_arb::setMark() {
852    std::lock_guard<std::mutex> lock(arb_db_access);
853    if (data->gblast != nullptr) {
854        GB_transaction trans(data->gbmain);
855        GB_write_flag(data->gblast, 1);
856    }
857}
858
859void
860query_arb::setMark(const std::string& name) {
861    std::lock_guard<std::mutex> lock(arb_db_access);
862    GB_transaction trans(data->gbmain);
863    GBDATA *gbdata = data->getGBDATA(name);
864    if (gbdata != nullptr) {
865        GB_write_flag(gbdata, 1);
866        data->gblast = gbdata;
867    } else {
868        logger->error("Failed to mark species {} - name not found", name);
869        data->gblast = nullptr;
870    }
871}
872
873
874/////// filter stuff
875
876string
877query_arb::getFilter(const string& name) {
878    std::lock_guard<std::mutex> lock(arb_db_access);
879    GB_transaction trans(data->gbmain);
880
881    // structure of most sai entries:
882    // name -> <name>
883    // <alignment> -> {   // (alignment name begins with 'ali'
884    //   data -> <chararray>
885    // }
886
887    // find SAI container named <name>
888    GBDATA *gbsai = GBT_find_SAI(data->gbmain, name.c_str());
889    if (gbsai == nullptr) {
890        return "";
891    }
892
893    // descend into alignment tag
894    gbsai = GB_find(gbsai, data->default_alignment, SEARCH_CHILD);
895    if (gbsai == nullptr) {
896        return "";
897    }
898
899    // find data entry
900    gbsai = GB_find(gbsai, "data", SEARCH_CHILD);
901    if (gbsai == nullptr) {
902        return "";
903    }
904
905    // read data entry and return as string
906    return string(GB_read_char_pntr(gbsai));
907}
908
909vector<alignment_stats>
910query_arb::getAlignmentStats() {
911    vector<alignment_stats> res;
912    vector<int> pairs = getPairs();
913   
914    // What ARB calls "Transitions" is really "Mutations"
915    // Actual transitions could be computed by subtracting transversions
916    const char *pvp_names[] =
917        { "NA", "NC", "NG", "NU", "TRANSITIONS", "TRANSVERSIONS", nullptr };
918    enum {
919        NA = 0, NC = 1, NG = 2, NU = 3, TRNS = 4, TRVRS = 5
920    };
921
922    std::lock_guard<std::mutex> lock(arb_db_access);
923    GB_transaction trans(data->gbmain);
924
925    for (GBDATA *gbsai = GBT_first_SAI(data->gbmain); gbsai != nullptr;
926         gbsai = GBT_next_SAI(gbsai)) {
927        // get sai data for current alignment
928        GBDATA *gbname = GB_find(gbsai, "name", SEARCH_CHILD);
929        if (gbname == nullptr) {
930            logger->error("SAI without name? Broken DB!");
931            continue;
932        }
933        string name = string(GB_read_char_pntr(gbname));
934
935        GBDATA *gbali = GB_find(gbsai, data->default_alignment, SEARCH_CHILD);
936        if (gbali == nullptr) {
937            continue; // no data for this alignment
938        }
939       
940       
941
942        // get contents of type field
943        GBDATA *gbtype = GB_find(gbali, "_TYPE", SEARCH_CHILD);
944        if (gbtype == nullptr) {
945            continue; // no _TYPE field
946        }
947        string type = string(GB_read_char_pntr(gbtype));
948
949        // check if this is a "positional variability by parsimony" sai
950        if (type.substr(0,4) != "PVP:") {
951            continue;
952        }
953       
954        // extract number of taxa from the type field
955        // (yeah, weird place, hope the value will keep being
956        // printed there...)
957        size_t ntaxa_end = type.rfind("ntaxa ") + 6;
958        int ntaxa = atoi(type.substr(ntaxa_end).c_str());
959       
960        // get frequencies container
961        GBDATA *gbfreq = GB_find(gbali, "FREQUENCIES", SEARCH_CHILD);
962        if (gbfreq == nullptr) {
963            logger->error("ERROR: SAI '{}' is of type PVP but lacks"
964                          "contained 'FREQUENCIES'. Your DB might be corrupted!",
965                          name);
966            continue;
967        }
968   
969        // load frequencies
970        unsigned int *pvp_data[6]; 
971        for ( int i=0; pvp_names[i] != nullptr; i++) {
972            GBDATA *gbdata = GB_find(gbfreq, pvp_names[i], SEARCH_CHILD);
973            if (gbdata == nullptr) {
974                logger->error("unable to find PVP data {}", pvp_names[i]);
975                continue;
976            }
977            pvp_data[i] = GB_read_ints(gbdata);
978        }
979
980        // make an alignment_stats from the frequencies
981        alignment_stats a(name,
982                          ntaxa, data->alignment_length,
983                          pvp_data[NA], pvp_data[NG], pvp_data[NC],
984                          pvp_data[NU], pvp_data[TRNS], pvp_data[TRVRS],
985                          pairs);
986        res.push_back(a);
987    }
988    return res;
989}
990
991vector<int>
992query_arb::getPairs() {
993    vector<int> pairs;
994    std::lock_guard<std::mutex> lock(arb_db_access);
995    GB_transaction trans(data->gbmain);
996
997    const char *ali = data->default_alignment;
998    BI_helix helix;
999
1000    pairs.resize(data->alignment_length);
1001
1002    const char* error = helix.init(data->gbmain, ali);
1003    if (error == nullptr) {
1004        for (int i=0; i<data->alignment_length; ++i) {
1005            pairs[i]=helix.entry(i).pair_pos;
1006        }
1007    } else {
1008        logger->error("No HELIX filter found in ARB file. "
1009                      "Disabling secondary structure features.");
1010        for (int i=0; i<data->alignment_length; ++i) {
1011            pairs[i]=0;
1012        }
1013    }
1014    return pairs;
1015}
1016
1017/*
1018  Local Variables:
1019  mode:c++
1020  c-file-style:"stroustrup"
1021  c-file-offsets:((innamespace . 0)(inline-open . 0)(case-label . +))
1022  indent-tabs-mode:nil
1023  fill-column:99
1024  End:
1025*/
1026// vim: filetype=cpp:expandtab:shiftwidth=4:tabstop=8:softtabstop=4:encoding=utf-8:textwidth=99 :
Note: See TracBrowser for help on using the repository browser.