source: trunk/GDE/SINA/builddir/src/query_arb.cpp

Last change on this file was 19432, checked in by westram, 16 months ago
  • GBT_get_alignment_len
    • now also reports error if alignment length is zero
      • this case often was unhandled and did easily lead to allocation problems.
    • catch error in case of zero alignment length ⇒ fixes alloc-size-larger-than-warning (in NT_count_different_chars).
    • check + fix callers.
File size: 30.9 KB
Line 
1/*
2Copyright (c) 2006-2018 Elmar Pruesse <elmar.pruesse@ucdenver.edu>
3
4This file is part of SINA.
5SINA is free software: you can redistribute it and/or modify it under
6the terms of the GNU General Public License as published by the Free
7Software Foundation, either version 3 of the License, or (at your
8option) any later version.
9
10SINA is distributed in the hope that it will be useful, but WITHOUT ANY
11WARRANTY; without even the implied warranty of MERCHANTABILITY or
12FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13for more details.
14
15You should have received a copy of the GNU General Public License
16along with SINA.  If not, see <http://www.gnu.org/licenses/>.
17
18Additional permission under GNU GPL version 3 section 7
19
20If you modify SINA, or any covered work, by linking or combining it
21with components of ARB (or a modified version of that software),
22containing parts covered by the terms of the
23ARB-public-library-license, the licensors of SINA grant you additional
24permission to convey the resulting work. Corresponding Source for a
25non-source form of such a combination shall include the source code
26for the parts of ARB used as well as that of the covered work.
27*/
28
29/* this file contains the code that communicates with arb
30 * => get data from db, query pt-server */
31
32#include "config.h"
33
34#include "query_arb.h"
35#include "timer.h"
36#include "alignment_stats.h"
37#include "log.h"
38#include "progress.h"
39
40#include <iostream>
41using std::uppercase;
42using std::hex;
43
44#include <string>
45using std::string;
46
47#include <vector>
48using std::vector;
49
50#include <list>
51using std::list;
52
53#include <fstream>
54using std::ifstream;
55using std::ofstream;
56
57#include <cmath>
58#include <exception>
59#include <mutex>
60
61#include <sstream>
62using std::stringstream;
63
64#include <map>
65using std::map;
66using std::pair;
67
68#include <unordered_map>
69#include <tbb/concurrent_unordered_map.h>
70
71#include <cstdio>
72
73#ifdef HAVE_TBB
74#  include "tbb/pipeline.h"
75#endif
76
77// ARB needs either DEBUG or NDEBUG defined
78#ifdef DEBUG
79#  define typeof __typeof__
80#else
81# ifndef NDEBUG
82#  define NDEBUG 1
83# endif
84#endif
85
86#include <arbdbt.h>
87#include <BI_helix.hxx>
88#include <arb_handlers.h>
89
90#ifndef HAVE_GBT_FIND_SEQUENCE
91inline GBDATA* GBT_find_sequence(GBDATA* gbd, const char* ali) {
92    return GBT_read_sequence(gbd, ali);
93}
94#endif
95
96#include <boost/functional/hash/hash.hpp>
97
98#include <boost/archive/binary_iarchive.hpp>
99#include <boost/archive/binary_oarchive.hpp>
100#include <boost/filesystem.hpp>
101
102namespace fs = boost::filesystem;
103
104using namespace sina;
105
106// const fieldnames for arb export
107const char* query_arb::fn_turn       = "turn";
108const char* query_arb::fn_acc        = "acc";
109const char* query_arb::fn_start      = "start";
110const char* query_arb::fn_used_rels  = "used_rels";
111const char* query_arb::fn_fullname   = "full_name";
112const char* query_arb::fn_nuc        = "nuc";
113
114const char* query_arb::fn_qual       = "align_quality_slv";
115const char* query_arb::fn_head       = "align_cutoff_head_slv";
116const char* query_arb::fn_tail       = "align_cutoff_tail_slv";
117const char* query_arb::fn_date       = "aligned_slv";
118const char* query_arb::fn_astart     = "align_startpos_slv";
119const char* query_arb::fn_astop      = "align_stoppos_slv";
120const char* query_arb::fn_idty       = "align_ident_slv";
121const char* query_arb::fn_nuc_gene   = "nuc_gene_slv";
122const char* query_arb::fn_bpscore    = "align_bp_score_slv";
123const char* query_arb::fn_family     = "align_family_slv";
124const char* query_arb::fn_align_log  = "align_log_slv";
125const char* query_arb::fn_filter     = "align_filter_slv";
126const char* query_arb::fn_nearest    = "nearest_slv";
127
128// Global lock -- ARB database access is not thread safe! Not even between
129//                open datases.
130static std::mutex arb_db_access;
131
132// List of opened ARB databases
133map<fs::path, query_arb*> query_arb::open_arb_dbs;
134
135static auto arb_logger = Log::create_logger("libARBDB");
136static auto logger = Log::create_logger("ARB I/O");
137
138template<typename... Args>
139query_arb_exception make_exception(const char *fmt, const Args&... args) {
140    return query_arb_exception(fmt::format(fmt, args...));
141}
142
143
144/////////////////////   Hidden Implementation Class ///////////////
145
146struct query_arb::priv_data {
147    ~priv_data() {
148        if (default_alignment != nullptr) {
149            free(const_cast<char*>(default_alignment));
150        }
151        if (gbmain != nullptr) {
152            logger->warn("Closing ARB database '{}' ...", filename);
153            GB_close(gbmain);
154        }
155    }
156
157    static GB_shell* the_arb_shell;
158    static string use_default_alignment;
159
160    using sequence_cache_type = tbb::concurrent_unordered_map<std::string, cseq>;
161    using gbdata_cache_type = std::unordered_map<string, GBDATA*, boost::hash<string>>;
162    using error_list_type = list<std::string>;
163
164    sequence_cache_type sequence_cache;
165    gbdata_cache_type gbdata_cache;
166    error_list_type write_errors;
167    const char* default_alignment{nullptr};
168    int alignment_length{0};
169    fs::path filename;
170    GBDATA *gbmain{nullptr}, *gblast{nullptr}, *gbspec{nullptr};
171    int count{0};
172
173    void write(GBDATA* pData, const string& str);
174    GBDATA* getGBDATA(const string& name, bool create = false);
175    const char* getSequence(GBDATA*);
176    void putSequence(const cseq& seq);
177    void get_weights_nolock(vector<float>& res,
178                            const char *name, const char *ali);
179};
180
181GB_shell *query_arb::priv_data::the_arb_shell = nullptr;
182string query_arb::priv_data::use_default_alignment;
183
184inline void log_GB_write_string_failure(const string& str, GB_ERROR err) {
185    string    shortened;
186    const int MAXSHOWN = 50; // truncate overlong values (e.g. sequence) to 50 characters
187    if (str.length()>MAXSHOWN) {
188        shortened = str.substr(0, MAXSHOWN-5)+"[...]";
189    }
190    logger->error("GB_write_string(value = {}) failed. Reason: {}",
191                  shortened.empty() ? str : shortened, err);
192}
193
194void
195query_arb::priv_data::write(GBDATA *pData, const string& str) {
196    if (pData == nullptr) {
197        throw make_exception("GB_write_string pData is null");
198    }
199
200    GB_ERROR err = GB_write_string(pData, str.c_str());
201    if (err != nullptr) {
202        log_GB_write_string_failure(str, err);
203    }
204}
205
206
207GBDATA*
208query_arb::priv_data::getGBDATA(const string& name, bool create) {
209    auto it = gbdata_cache.find(name);
210    if (it != gbdata_cache.end()) {
211        return it->second;
212    }
213    GBDATA* gbd = GBT_find_species(gbmain, name.c_str());
214    if (gbd == nullptr) {
215        if (!create) {
216            throw make_exception("No sequence \"{}\" in {}",
217                                 name, filename.filename());
218        }
219        logger->info("Creating new sequence in {}", filename.filename());
220        gbd = GB_create_container(gbspec, "species");
221        GBDATA *gbname = GB_create(gbd, "name", GB_STRING);
222        ++count;
223        if (name.empty()) {
224            write(gbname, fmt::format("sina_{}", count));
225        } else {
226            write(gbname, name);
227        }
228    }
229    gbdata_cache[name] =  gbd;
230
231    return gbd;
232}
233
234const char*
235query_arb::priv_data::getSequence(GBDATA* gbdata) {
236    gbdata = GBT_find_sequence(gbdata, default_alignment);
237    if (gbdata != nullptr) {
238        const char *res = GB_read_char_pntr(gbdata);
239        if (res != nullptr) {
240            return res;
241        }
242    }
243    return nullptr;
244}
245
246void
247query_arb::priv_data::putSequence(const cseq& seq) {
248    string aseq = seq.getAligned();
249
250    std::lock_guard<std::mutex> lock(arb_db_access);
251    GB_transaction trans(gbmain);
252    GBDATA *gbdata = getGBDATA(seq.getName(), true);
253    GBDATA *gbseq = GBT_find_sequence(gbdata, default_alignment);
254    if (gbseq == nullptr) {
255        gbseq = GBT_create_sequence_data(gbdata, default_alignment, "data", GB_STRING, 0);
256        GBDATA *gbacc = GB_create(gbdata, "acc", GB_STRING);
257        string acc = fmt::format(
258            "ARB_{:X}", GB_checksum(aseq.c_str(), aseq.size(), 1, ".-")
259            );
260        write(gbacc, acc);
261    }
262    write(gbseq, aseq);
263}
264
265/////////////////////  Reading / Writing attributes  ///////////////
266
267/* Loads a metadata attribute from ARB into cseq
268 */
269static void loadKey(cseq& c, const string& key, GBDATA* gbspec, bool warnMissingField = true) {
270    GBDATA *gbd = GB_find(gbspec, key.c_str(), SEARCH_CHILD);
271    if (gbd != nullptr) {
272        int type = GB_read_type(gbd);
273        switch(type) {
274            case GB_STRING:
275                c.set_attr(key, (const char*)GB_read_pntr(gbd));
276                return;
277            case GB_INT:
278                c.set_attr(key, (int)GB_read_int(gbd));
279                return;
280            case GB_FLOAT:
281                c.set_attr(key, (float)GB_read_float(gbd));
282                return;
283            case GB_BYTE:
284            case GB_BITS:
285            default:
286                logger->error("loadKey '{}' from '{}' failed: type {} unsupported", key, c.getName(), type);
287                return;
288        }
289    } else if (warnMissingField) {
290        logger->error("loadKey '{}' from '{}' failed: not found", key, c.getName());
291    }
292}
293
294
295struct storeKey_visitor : public boost::static_visitor<> {
296    storeKey_visitor(GBDATA* gbmain, GBDATA* gbspec, const string& key)
297        : _gbmain(gbmain), _gbspec(gbspec), _key(key)
298    {}
299
300    GBDATA* _gbmain;
301    GBDATA* _gbspec;
302    const string& _key;
303
304    void operator()(const int& i) {
305        GBT_add_new_species_changekey(_gbmain, _key.c_str(), GB_INT);
306        GBDATA *gbd = GB_entry(_gbspec, _key.c_str());
307        if ((gbd != nullptr) && GB_read_type(gbd) != GB_INT) {
308            GB_delete(gbd);
309            gbd = nullptr;
310        }
311
312        if (gbd == nullptr) {
313            gbd = GB_create(_gbspec, _key.c_str(), GB_INT);
314        }
315
316        GB_ERROR err = GB_write_int(gbd, i);
317        if (err != nullptr) {
318            logger->error("GB_write_int(,{}) failed. Reason: {}",
319                          i, err);
320        }
321    }
322    void operator()(const unsigned int& ui) {
323        operator()((int)ui);
324    }
325    void operator()(const bool& b) {
326        operator()((int)b);
327    }
328    void operator()(const float& f) {
329        GBT_add_new_species_changekey(_gbmain, _key.c_str(), GB_FLOAT);
330        GBDATA *gbd = GB_entry(_gbspec, _key.c_str());
331        if ((gbd != nullptr) && GB_read_type(gbd) != GB_FLOAT) {
332            GB_delete(gbd);
333            gbd = nullptr;
334        }
335        if (gbd == nullptr) {
336            gbd = GB_create(_gbspec, _key.c_str(), GB_FLOAT);
337        }
338        GB_ERROR err = GB_write_float(gbd, f);
339        if (err != nullptr) {
340            logger->error("GB_write_float(,{}) failed. Reason: {}",
341                          f, err);
342        }
343    }
344    void operator()(const string& str) {
345        GBT_add_new_species_changekey(_gbmain, _key.c_str(), GB_STRING);
346        GBDATA *gbd = GB_entry(_gbspec, _key.c_str());
347        if ((gbd != nullptr) && GB_read_type(gbd) != GB_STRING) {
348            GB_delete(gbd);
349            gbd = nullptr;
350        }
351        if (gbd == nullptr) {
352            gbd = GB_create(_gbspec, _key.c_str(), GB_STRING);
353        }
354        GB_ERROR err = GB_write_string(gbd, str.c_str());
355
356        if (err != nullptr) {
357            log_GB_write_string_failure(str, err);
358        }
359    }
360
361    void operator()(const vector<cseq>& /*vc*/) {
362    }
363};
364
365
366/////////////////////   Outside Class   ///////////////
367
368void
369query_arb::storeKey(cseq& c, const std::string& key) {
370    std::lock_guard<std::mutex> lock(arb_db_access);
371    GB_transaction trans(data->gbmain);
372    GBDATA *gbspec = data->getGBDATA(c.getName());
373    storeKey_visitor vis(data->gbmain, gbspec, key);
374
375    cseq::variant V = c.get_attr<cseq::variant>(key);
376    boost::apply_visitor(vis, V);
377}
378
379void
380query_arb::loadKey(const cseq& c, const string& key, bool reload, bool warnMissingField) {
381    if (!reload && c.has_attr(key)) return;
382    std::lock_guard<std::mutex> lock(arb_db_access);
383    GB_transaction trans(data->gbmain);
384    // FIXME: should we check if the cseq is actually ours?
385    ::loadKey(const_cast<cseq&>(c), key, data->getGBDATA(c.getName()), warnMissingField);
386}
387
388vector<std::tuple<string, string, int>>
389query_arb::listKeys() {
390    vector<std::tuple<string, string, int>> res;
391    GB_transaction trans(data->gbmain);
392    GBDATA* gb_keys = GB_search(data->gbmain, CHANGE_KEY_PATH, GB_CREATE_CONTAINER);
393    if (gb_keys) {
394        for (GBDATA* gbd = GB_entry(gb_keys, CHANGEKEY);
395             gbd; gbd = GB_nextEntry(gbd)) {
396            const char* name = GBT_read_char_pntr(gbd, CHANGEKEY_NAME);
397            long int type =  *GBT_read_int(gbd, CHANGEKEY_TYPE);
398            const char* type_str;
399            switch(type) {
400                case GB_NONE:        type_str="None"; break;
401                case GB_BIT:         type_str="bit"; break;
402                case GB_BYTE:        type_str="byte"; break;
403                case GB_INT:         type_str="int"; break;
404                case GB_FLOAT:       type_str="float"; break;
405                case GB_POINTER:     type_str="pointer"; break;
406                case GB_BITS:        type_str="bit-array"; break;
407                case GB_BYTES:       type_str="byte-array"; break;
408                case GB_INTS:        type_str="int-array"; break;
409                case GB_FLOATS:      type_str="float-array"; break;
410                case GB_STRING:      type_str="string"; break;
411                case GB_STRING_SHRT: type_str="short-string"; break;
412                case GB_DB:          type_str="container"; break;
413                default:
414                    type_str = "unknown";
415            }
416            res.push_back(std::make_tuple(name, type_str, type));
417        }
418    }
419    return res;
420}
421
422
423query_arb::query_arb(const fs::path& arbfile)
424    : data(new priv_data()) {
425    data->filename = arbfile;
426    if (arbfile.empty()) {
427        throw make_exception("Empty ARB database name?!");
428    }
429
430    data->gbmain = GB_open(arbfile.c_str(), "rwc");
431    if (data->gbmain == nullptr) {
432        throw make_exception("Unable to open ARB database {}.", arbfile);
433    }
434
435    setProtectionLevel(6); // drop privileges
436
437    GB_transaction trans(data->gbmain);
438
439    GBDATA *gbd;
440    if ((gbd = GB_entry(data->gbmain, "ptserver")) != nullptr &&
441        (gbd = GB_entry(gbd, "dbstate")) != nullptr &&
442        GB_read_int(gbd) > 0) {
443        throw make_exception(
444            "{} has been compressed for use by the ARB PT server"
445            "and cannot be accessed by SINA.",
446            arbfile.filename()
447            );
448    }
449
450    if (!priv_data::use_default_alignment.empty()) {
451        data->default_alignment = ARB_strdup(priv_data::use_default_alignment.c_str());
452    }
453    else {
454        data->default_alignment = GBT_get_default_alignment(data->gbmain);
455    }
456
457    if (data->default_alignment == nullptr) {
458        GBT_create_alignment(data->gbmain, "ali_16s", 2000, 0, 4, "rna", "created by sina (found no default alignment)");
459        GBT_set_default_alignment(data->gbmain, "ali_16s");
460        data->default_alignment=strdup("ali_16s");
461        logger->warn("Created new alignment ali_16s in '{}'", data->filename);
462    }
463
464    data->alignment_length = GBT_get_alignment_len(data->gbmain,
465                                                   data->default_alignment);
466    if (data->alignment_length <= 0) {
467        // This should not actually be possible. LCOV_EXCL_START
468        string error = GB_await_error();
469
470        throw make_exception(
471            "Invalid default alignment in {}: {}",
472            data->filename, error);
473        // LCOV_EXCL_STOP
474    }
475
476    data->gbspec = GB_search(data->gbmain, "species_data", GB_CREATE_CONTAINER);
477
478    int spec_count = 0;
479    for ( GBDATA *gbspec = GBT_first_species(data->gbmain);
480          gbspec != nullptr; gbspec = GBT_next_species(gbspec)) {
481        spec_count++;
482    }
483    data->count = spec_count;
484
485    logger->info("Loading names map... (for {})", data->filename);
486
487    logger_progress p(logger, "Scanning", spec_count);
488    for ( GBDATA* gbspec = GBT_first_species(data->gbmain);
489          gbspec != nullptr; gbspec = GBT_next_species(gbspec)) {
490        data->gbdata_cache[GBT_read_name(gbspec)] = gbspec;
491        ++p;
492    }
493}
494
495query_arb::~query_arb() {
496}
497
498void
499query_arb::setProtectionLevel(int p) {
500    GB_transaction trans(data->gbmain);
501    GB_change_my_security(data->gbmain, p);
502}
503
504
505
506/////////////////////   ARB logging callbacks  ///////////////
507
508static void log_arb_err(const char *msg) {
509    arb_logger->error(msg);
510};
511static void log_arb_warn(const char *msg) {
512    arb_logger->warn(msg);
513};
514static void log_arb_info(const char *msg) {
515    arb_logger->info(msg);
516};
517
518logger_progress *arb_progress{nullptr};
519static void arb_openstatus(const char* title) {
520    if (arb_progress)
521        delete arb_progress;
522    arb_progress = new logger_progress(logger, title, 100);
523}
524static void arb_closestatus() {
525    delete arb_progress;
526    arb_progress = nullptr;
527}
528
529static ARB_STATUS_RETURN_TYPE arb_set_title(const char* title) {
530    arb_logger->info("Progress title: {}", title);
531    return ARB_STATUS_RETURN_VALUE;
532}
533static ARB_STATUS_RETURN_TYPE arb_set_subtitle(const char* title) {
534    arb_logger->info("Progress subttitle: {}", title);
535    return ARB_STATUS_RETURN_VALUE;
536}
537static ARB_STATUS_RETURN_TYPE arb_set_gauge(double gauge) {
538    if (arb_progress) {
539        auto cur = arb_progress->count();
540        auto set_to = arb_progress->size() * gauge;
541        arb_progress->update(set_to - cur);
542    }
543    return ARB_STATUS_RETURN_VALUE;
544}
545static bool arb_user_abort() {
546    return false;
547}
548
549static arb_status_implementation log_arb_status {
550    AST_RANDOM, arb_openstatus, arb_closestatus, arb_set_title,
551        arb_set_subtitle, arb_set_gauge, arb_user_abort
552};
553
554static arb_handlers arb_log_handlers = {
555    log_arb_err, log_arb_warn, log_arb_info, log_arb_status
556};
557
558
559/////////////////////   Static Instance Acessors  ///////////////
560
561query_arb*
562query_arb::getARBDB(const fs::path& file_name) {
563    std::lock_guard<std::mutex> lock(arb_db_access);
564    if (query_arb::priv_data::the_arb_shell == nullptr) {
565        query_arb::priv_data::the_arb_shell = new GB_shell();
566
567        ARB_install_handlers(arb_log_handlers);
568        ARB_redirect_handlers_to(stderr, stderr);
569    }
570    if (open_arb_dbs.empty()) {
571        atexit(query_arb::closeOpenARBDBs);
572    }
573    if (open_arb_dbs.count(file_name) == 0u) {
574        open_arb_dbs[file_name] = new query_arb(file_name);
575    }
576    return open_arb_dbs[file_name];
577}
578
579void
580query_arb::closeOpenARBDBs() {
581    // atexit registered method
582    std::lock_guard<std::mutex> lock(arb_db_access);
583    for (auto arb : open_arb_dbs) {
584        delete arb.second;
585    }
586    open_arb_dbs.clear();
587}
588
589void
590query_arb::save() {
591    saveAs(data->filename);
592}
593
594const fs::path&
595query_arb::getFileName() const {
596    return data->filename;
597}
598
599void
600query_arb::saveAs(const fs::path& fname, const char* type) {
601    logger->info("Saving database {}", fname);
602    {
603        GB_transaction trans(data->gbmain);
604        logger->info("Checking alignment...");
605        GB_ERROR err = GBT_check_data(data->gbmain, data->default_alignment);
606        if (err != nullptr) {
607            // LCOV_EXCL_START
608            logger->error("Error '{}' while checking ARB database alignment");
609            // LCOV_EXCL_STOP
610        }
611    }
612
613    if (GB_ERROR err = GB_save_as(data->gbmain, fname.c_str(), type)) {
614        logger->error("Error while trying to save {}", fname);
615        logger->error("  ARB said: \n{}\n", err);
616    }
617}
618
619
620void
621query_arb::loadCache(std::vector<std::string>& keys) {
622    GBDATA *gbspec;
623
624    const char *ali = data->default_alignment;
625
626    std::lock_guard<std::mutex> lock(arb_db_access);
627    GB_transaction trans(data->gbmain);
628
629    logger_progress p(logger, "Loading sequences", data->count);
630
631    // re-init sequence_cache with size data->count?
632
633#undef HAVE_TBB
634#ifndef HAVE_TBB // serial implementation
635    timer t;
636    for ( gbspec = GBT_first_species(data->gbmain);
637          gbspec;
638          gbspec = GBT_next_species(gbspec)) {
639        t.start();
640        const char* name_str = GBT_read_name(gbspec);
641        string name(name_str);
642        t.stop("name");
643        cseq sequence;
644        if (not data->sequence_cache.count(name)) {
645            GBDATA *gb_data = GBT_find_sequence(gbspec,ali);
646            if (not gb_data) continue;
647            const char* ptr =  GB_read_char_pntr(gb_data);
648            t.stop("arb load");
649            sequence = cseq(name.c_str(), ptr);
650            t.stop("cseq convert");
651            GB_flush_cache(gb_data);
652        } else {
653            sequence = data->sequence_cache[name];
654        }
655
656        for(vector<string>::iterator it = keys.begin();
657            it != keys.end(); ++it) {
658            if (not sequence.get_attrs().count(*it)) {
659                ::loadKey(sequence, *it, gbspec);
660            }
661        }
662
663
664        data->sequence_cache[sequence.getName()] = sequence;
665        ++p;
666    }
667    logger->info("Timings for Cache Load: {}", t);
668
669#else // HAVE_TBB -- parallel implementation
670
671    gbspec = GBT_first_species(data->gbmain);
672    if (gbspec == nullptr) {
673        logger->error("Failed to load sequences -- database empty?");
674        return;
675    }
676
677    timer arb, tkeys, all;
678    all.start();
679    int n=0;
680    tbb::parallel_pipeline(
681        /*max_number_of_live_token=*/ 1024,
682        tbb::make_filter<void, std::pair<const char*, const char*>>(
683            tbb::filter::serial,
684            [&](tbb::flow_control& fc) -> std::pair<const char*, const char*>
685            {
686                arb.start();
687                const char* name_str;
688                GBDATA *gb_sequence = nullptr;
689                while ((gbspec != nullptr) && (gb_sequence == nullptr)) {
690                    name_str = GBT_read_name(gbspec);
691                    gb_sequence = GBT_find_sequence(gbspec, ali);
692                    gbspec = GBT_next_species(gbspec);
693                }
694                arb.stop("arb find");
695                if ((gbspec == nullptr) && (gb_sequence == nullptr)) {
696                    fc.stop();
697                    return std::make_pair("","");
698                }
699                const char* seq_str = GB_read_char_pntr(gb_sequence);
700                arb.stop("arb load");
701                n++;
702                return std::make_pair(name_str, seq_str);
703            }) &
704        tbb::make_filter<std::pair<const char*, const char*>, cseq>(
705            tbb::filter::parallel,
706            [&](std::pair<const char*, const char*> p) -> cseq
707            {
708                return cseq(p.first, 0.f, p.second);
709            }) &
710        tbb::make_filter<cseq, void>(
711            tbb::filter::serial,
712            [&](cseq sequence) -> void
713            {
714                tkeys.start();
715                for(auto & key : keys) {
716                    if (sequence.get_attrs().count(key) == 0u) {
717                        ::loadKey(sequence, key, gbspec); //FIXME!!!
718                    }
719                }
720                data->sequence_cache[sequence.getName()] = sequence;
721                ++p;
722                tkeys.stop("keys");
723            })
724        );
725    all.stop("all");
726    logger->info("Timings for cache load: {} {} {}", all, tkeys, arb);
727#endif // have TBB
728
729    logger->info("Loaded {} sequences", data->sequence_cache.size());
730
731}
732
733vector<cseq*>
734query_arb::getCacheContents() {
735    vector<cseq*> tmp;
736    tmp.reserve(data->sequence_cache.size());
737    for (auto & it : data->sequence_cache) {
738        tmp.push_back(&it.second);
739    }
740    return tmp;
741}
742
743long
744query_arb::getAlignmentWidth() const {
745    return data->alignment_length;
746}
747
748void
749query_arb::override_default_alignment(const string& ali_name) {
750    query_arb::priv_data::use_default_alignment = ali_name;
751}
752
753vector<string>
754query_arb::getSequenceNames() {
755    vector<string> tmp;
756    tmp.reserve(data->gbdata_cache.size());
757    for (auto & it : data->gbdata_cache) {
758        tmp.push_back(it.first);
759    }
760    return tmp;
761}
762
763const cseq&
764query_arb::getCseq(const string& name) { //, bool nocache) {
765    // if not, check whether we already loaded it
766    auto it = data->sequence_cache.find(name);
767    if (it != data->sequence_cache.end()) {
768        return it->second;
769    }
770
771    // not checked, load cache and return item
772
773    auto res = data->sequence_cache.emplace(
774        name, std::move(getCseqUncached(name)));
775    return res.first->second;
776}
777
778cseq
779query_arb::getCseqUncached(const string& name) { //, bool nocache) {
780    std::lock_guard<std::mutex> lock(arb_db_access);
781    GB_transaction trans(data->gbmain);
782    GBDATA *spec = data->getGBDATA(name);
783    const char* seq = data->getSequence(spec);
784    if (seq == nullptr) {
785        throw make_exception("No alignment for sequence \"{}\" in {}",
786                             name, data->filename.filename());
787    }
788    cseq c = {name.c_str(), seq};
789    GB_flush_cache(spec);
790    return c;
791}
792
793int
794query_arb::getSeqCount() const {
795    return data->count;
796}
797
798void
799query_arb::putCseq(const cseq& seq) {
800    data->putSequence(seq);
801
802    std::lock_guard<std::mutex> lock(arb_db_access);
803    GB_transaction trans(data->gbmain);
804    GBDATA* gbspec = data->getGBDATA(seq.getName());
805    for (auto& ap: seq.get_attrs()) {
806        storeKey_visitor vis(data->gbmain, gbspec, ap.first);
807        boost::apply_visitor(vis, ap.second);
808    }
809}
810
811
812void
813query_arb::copySequence(query_arb& other, const std::string& name, bool mark) {
814    std::lock_guard<std::mutex> lock(arb_db_access);
815
816    // lock underlying arb database
817    GB_transaction t1(data->gbmain);
818    GB_transaction t2(other.data->gbmain);
819
820    GBDATA *gbdest;
821
822    // don't copy if sequence with identical name exists
823    if ( (gbdest=GBT_find_species(data->gbmain, name.c_str())) != nullptr ) {
824        logger->error("Species \"{}\" already in target db. Not copying.", name);
825        if (mark) {
826            GB_write_flag(gbdest, 1);
827        }
828        return;
829    }
830
831    GBDATA *gbsource = other.data->getGBDATA(name);
832    gbdest = GB_create_container(data->gbspec, "species");
833    if ((gbsource != nullptr) && (gbdest != nullptr)) {
834        GB_copy_std(gbdest,gbsource);
835        data->gblast = gbdest;
836        if (mark) {
837            GB_write_flag(gbdest, 1);
838        }
839        return;
840    }
841    logger->error("Error while copying species \"{}\".", name);
842
843    data->gblast = nullptr;
844}
845
846void
847query_arb::setMark() {
848    std::lock_guard<std::mutex> lock(arb_db_access);
849    if (data->gblast != nullptr) {
850        GB_transaction trans(data->gbmain);
851        GB_write_flag(data->gblast, 1);
852    }
853}
854
855void
856query_arb::setMark(const std::string& name) {
857    std::lock_guard<std::mutex> lock(arb_db_access);
858    GB_transaction trans(data->gbmain);
859    GBDATA *gbdata = data->getGBDATA(name);
860    if (gbdata != nullptr) {
861        GB_write_flag(gbdata, 1);
862        data->gblast = gbdata;
863    } else {
864        logger->error("Failed to mark species {} - name not found", name);
865        data->gblast = nullptr;
866    }
867}
868
869
870/////// filter stuff
871
872string
873query_arb::getFilter(const string& name) {
874    std::lock_guard<std::mutex> lock(arb_db_access);
875    GB_transaction trans(data->gbmain);
876
877    // structure of most sai entries:
878    // name -> <name>
879    // <alignment> -> {   // (alignment name begins with 'ali'
880    //   data -> <chararray>
881    // }
882
883    // find SAI container named <name>
884    GBDATA *gbsai = GBT_find_SAI(data->gbmain, name.c_str());
885    if (gbsai == nullptr) {
886        return "";
887    }
888
889    // descend into alignment tag
890    gbsai = GB_find(gbsai, data->default_alignment, SEARCH_CHILD);
891    if (gbsai == nullptr) {
892        return "";
893    }
894
895    // find data entry
896    gbsai = GB_find(gbsai, "data", SEARCH_CHILD);
897    if (gbsai == nullptr) {
898        return "";
899    }
900
901    // read data entry and return as string
902    return string(GB_read_char_pntr(gbsai));
903}
904
905vector<alignment_stats>
906query_arb::getAlignmentStats() {
907    vector<alignment_stats> res;
908    vector<int> pairs = getPairs();
909   
910    // What ARB calls "Transitions" is really "Mutations"
911    // Actual transitions could be computed by subtracting transversions
912    const char *pvp_names[] =
913        { "NA", "NC", "NG", "NU", "TRANSITIONS", "TRANSVERSIONS", nullptr };
914    enum {
915        NA = 0, NC = 1, NG = 2, NU = 3, TRNS = 4, TRVRS = 5
916    };
917
918    std::lock_guard<std::mutex> lock(arb_db_access);
919    GB_transaction trans(data->gbmain);
920
921    for (GBDATA *gbsai = GBT_first_SAI(data->gbmain); gbsai != nullptr;
922         gbsai = GBT_next_SAI(gbsai)) {
923        // get sai data for current alignment
924        GBDATA *gbname = GB_find(gbsai, "name", SEARCH_CHILD);
925        if (gbname == nullptr) {
926            logger->error("SAI without name? Broken DB!");
927            continue;
928        }
929        string name = string(GB_read_char_pntr(gbname));
930
931        GBDATA *gbali = GB_find(gbsai, data->default_alignment, SEARCH_CHILD);
932        if (gbali == nullptr) {
933            continue; // no data for this alignment
934        }
935       
936       
937
938        // get contents of type field
939        GBDATA *gbtype = GB_find(gbali, "_TYPE", SEARCH_CHILD);
940        if (gbtype == nullptr) {
941            continue; // no _TYPE field
942        }
943        string type = string(GB_read_char_pntr(gbtype));
944
945        // check if this is a "positional variability by parsimony" sai
946        if (type.substr(0,4) != "PVP:") {
947            continue;
948        }
949       
950        // extract number of taxa from the type field
951        // (yeah, weird place, hope the value will keep being
952        // printed there...)
953        size_t ntaxa_end = type.rfind("ntaxa ") + 6;
954        int ntaxa = atoi(type.substr(ntaxa_end).c_str());
955       
956        // get frequencies container
957        GBDATA *gbfreq = GB_find(gbali, "FREQUENCIES", SEARCH_CHILD);
958        if (gbfreq == nullptr) {
959            logger->error("ERROR: SAI '{}' is of type PVP but lacks"
960                          "contained 'FREQUENCIES'. Your DB might be corrupted!",
961                          name);
962            continue;
963        }
964   
965        // load frequencies
966        unsigned int *pvp_data[6]; 
967        for ( int i=0; pvp_names[i] != nullptr; i++) {
968            GBDATA *gbdata = GB_find(gbfreq, pvp_names[i], SEARCH_CHILD);
969            if (gbdata == nullptr) {
970                logger->error("unable to find PVP data {}", pvp_names[i]);
971                continue;
972            }
973            pvp_data[i] = GB_read_ints(gbdata);
974        }
975
976        // make an alignment_stats from the frequencies
977        alignment_stats a(name,
978                          ntaxa, data->alignment_length,
979                          pvp_data[NA], pvp_data[NG], pvp_data[NC],
980                          pvp_data[NU], pvp_data[TRNS], pvp_data[TRVRS],
981                          pairs);
982        res.push_back(a);
983    }
984    return res;
985}
986
987vector<int>
988query_arb::getPairs() {
989    vector<int> pairs;
990    std::lock_guard<std::mutex> lock(arb_db_access);
991    GB_transaction trans(data->gbmain);
992
993    const char *ali = data->default_alignment;
994    BI_helix helix;
995
996    pairs.resize(data->alignment_length);
997
998    const char* error = helix.init(data->gbmain, ali);
999    if (error == nullptr) {
1000        for (int i=0; i<data->alignment_length; ++i) {
1001            pairs[i]=helix.entry(i).pair_pos;
1002        }
1003    } else {
1004        logger->error("No HELIX filter found in ARB file. "
1005                      "Disabling secondary structure features.");
1006        for (int i=0; i<data->alignment_length; ++i) {
1007            pairs[i]=0;
1008        }
1009    }
1010    return pairs;
1011}
1012
1013/*
1014  Local Variables:
1015  mode:c++
1016  c-file-style:"stroustrup"
1017  c-file-offsets:((innamespace . 0)(inline-open . 0)(case-label . +))
1018  indent-tabs-mode:nil
1019  fill-column:99
1020  End:
1021*/
1022// vim: filetype=cpp:expandtab:shiftwidth=4:tabstop=8:softtabstop=4:encoding=utf-8:textwidth=99 :
Note: See TracBrowser for help on using the repository browser.