source: trunk/GDE/SINA/builddir/src/query_pt.cpp

Last change on this file was 19244, checked in by westram, 2 years ago
  • split override message.
File size: 26.4 KB
Line 
1/*
2Copyright (c) 2006-2018 Elmar Pruesse <elmar.pruesse@ucdenver.edu>
3
4This file is part of SINA.
5SINA is free software: you can redistribute it and/or modify it under
6the terms of the GNU General Public License as published by the Free
7Software Foundation, either version 3 of the License, or (at your
8option) any later version.
9
10SINA is distributed in the hope that it will be useful, but WITHOUT ANY
11WARRANTY; without even the implied warranty of MERCHANTABILITY or
12FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13for more details.
14
15You should have received a copy of the GNU General Public License
16along with SINA.  If not, see <http://www.gnu.org/licenses/>.
17
18Additional permission under GNU GPL version 3 section 7
19
20If you modify SINA, or any covered work, by linking or combining it
21with components of ARB (or a modified version of that software),
22containing parts covered by the terms of the
23ARB-public-library-license, the licensors of SINA grant you additional
24permission to convey the resulting work. Corresponding Source for a
25non-source form of such a combination shall include the source code
26for the parts of ARB used as well as that of the covered work.
27*/
28
29#include "config.h"
30#include "query_pt.h"
31#include "timer.h"
32
33#include <cstring>
34
35#include <iostream>
36#include <sstream>
37#include <cmath>
38#include <mutex>
39
40#include <sys/types.h>
41#include <sys/stat.h>
42#include <unistd.h>
43
44using std::stringstream;
45using std::string;
46using std::vector;
47
48#include "cseq.h"
49#include "cseq_comparator.h"
50#include "log.h"
51
52#include "query_arb.h"
53
54#include <dlfcn.h>
55#include <cstdlib>
56
57#include <arbdb.h>
58#include <PT_com.h>
59#include <client.h>
60#include <boost/algorithm/string/predicate.hpp>
61#include <boost/dll.hpp>
62#include <pstream.h>
63
64#include <boost/program_options.hpp>
65namespace po = boost::program_options;
66
67#include <boost/filesystem.hpp>
68namespace fs = boost::filesystem;
69
70#include <utility>
71
72namespace sina {
73
74static auto logger = Log::create_logger("Search (ARB PT)");
75static auto pt_logger = Log::create_logger("ARB_PT_SERVER");
76
77static void override_LD_LIBRARY_PATH() {
78    // Allow to override LD_LIBRARY_PATH used for 'arb_pt_server'.
79    //
80    // This allows arb to support sina installed from a fat tarball
81    // (see ../../arb-sina-fat.README for more details).
82    //
83    // SINA_SUBCMD_LD_LIBRARY_PATH is set by the shell-script
84    // used to call sina from arb: ../../../../SH/arb_sina.sh
85
86    static bool checked = false;
87    if (!checked) {
88        const char *LD_LIBRARY_PATH_forSubcmds = getenv("SINA_SUBCMD_LD_LIBRARY_PATH");
89        if (LD_LIBRARY_PATH_forSubcmds) {
90            logger->warn("overriding value of LD_LIBRARY_PATH with '{}'", LD_LIBRARY_PATH_forSubcmds);
91            logger->warn("(triggered by existance of environment variable SINA_SUBCMD_LD_LIBRARY_PATH)");
92            int result = setenv("LD_LIBRARY_PATH", LD_LIBRARY_PATH_forSubcmds, 1);
93            if (result != 0) {
94                throw query_pt_exception("failed to setenv (LD_LIBRARY_PATH)");
95            }
96        }
97        checked = true;
98    }
99}
100
101class managed_pt_server {
102    redi::ipstream* process;
103    const fs::path dbname;
104    const string portname;
105public:
106    managed_pt_server(fs::path dbname_, string portname_);
107    managed_pt_server(const managed_pt_server&);
108    ~managed_pt_server();
109    fs::path ensure_env_ARBHOME();
110    fs::path get_pt_server_path();
111    fs::path ensure_index_exists();
112    bool build_index(fs::path index_arb_file);
113};
114
115static std::mutex build_lock, launch_lock;
116
117managed_pt_server::managed_pt_server(fs::path dbname_, string  portname_)
118    : dbname(std::move(dbname_)), portname(std::move(portname_))
119{
120    // Check that database specified and file accessible
121    if (dbname.empty() or not fs::exists(dbname)) {
122        throw query_pt_exception("Missing reference database");
123    }
124
125    ensure_env_ARBHOME();
126    fs::path arb_pt_server = get_pt_server_path();
127    fs::path index_arb_file = ensure_index_exists();
128
129    // Check portname: allowed are localhost:PORT, :PORT and :SOCKETFILE
130    int split = portname.find(':');
131    string host = portname.substr(0, split);
132    string port = portname.substr(split+1);
133    if (!host.empty() && host != "localhost") {
134        throw query_pt_exception("Starting a PT server on hosts other than localhost not supported");
135    }
136
137    // Actually launch the server now:
138    vector<string> cmd{arb_pt_server.native(), "-D" + index_arb_file.native(), "-T" +portname};
139    logger->info("Launching background PT server for {} on {}", dbname, portname);
140    logger->debug("Executing ['{}']", fmt::join(cmd, "', '"));
141    override_LD_LIBRARY_PATH();
142    {
143        std::lock_guard<std::mutex> lock(launch_lock);
144        // something in here appears to not be thread safe
145        process = new redi::ipstream(cmd, redi::pstreams::pstdout|redi::pstreams::pstderr); // launches ptserver
146    }
147
148    // read the pt server output. once it says "ok"
149    // we can be sure that it's ready for connections
150    // (connecting at wrong time causes lockup)
151    // FIXME: abort if waiting for too long
152    string line;
153    while (std::getline(*process, line)) {
154        pt_logger->debug(line);
155        if (line == "ok, server is running.") {
156            break;
157        }
158    }
159    if (line.empty()) {
160        process->rdbuf()->kill();
161        throw query_pt_exception("PT server failed to respond. Do you have enough memory?");
162    }
163    if (process->rdbuf()->exited()) {
164        throw query_pt_exception(
165            fmt::format("PT server exited immediately. Exit status was {}",
166                        process->rdbuf()->status()));
167    }
168
169    logger->warn("Launched PT server ({} on {}).", dbname, portname);
170}
171
172managed_pt_server::~managed_pt_server() {
173    logger->warn("Terminating PT server ({} on {})", dbname, portname);
174    process->rdbuf()->kill();
175}
176
177fs::path
178managed_pt_server::ensure_index_exists() {
179    std::lock_guard<std::mutex> lock(build_lock);
180
181    fs::path index_arb_file = dbname;
182    index_arb_file += ".index.arb";
183    fs::path index_pt_file = index_arb_file;
184    index_pt_file += ".pt";
185
186
187    if (fs::exists(index_pt_file) && fs::exists(index_arb_file)) {
188        if (fs::last_write_time(index_arb_file) >= fs::last_write_time(dbname)
189            && fs::last_write_time(index_pt_file) >= fs::last_write_time(index_arb_file)) {
190            // some.arb older-than some.arb.index.arb older-than some.arb.index.arb.pt
191            // -> good index exists
192            return index_arb_file;
193        } else {
194            logger->info("PT server index out of date for {}. Rebuilding:", dbname);
195        }
196    } else {
197        logger->info("PT server index missing or incomplete for {}. Building:", dbname);
198    }
199
200    build_index(index_arb_file);
201
202    if (not fs::exists(index_pt_file) ||
203        fs::last_write_time(index_pt_file) < fs::last_write_time(dbname)) {
204        throw query_pt_exception("Failed to (re)build PT server index! (out of memory?)");
205    }
206    return index_arb_file;
207}
208
209bool managed_pt_server::build_index(fs::path index_arb_file) {
210    fs::path arb_pt_server = get_pt_server_path();
211    vector<string> cmds;
212    // copy database file
213    cmds.push_back(string("cp ") + dbname.native() + " " + index_arb_file.native());
214    // shrink database (convert to index db)
215    cmds.push_back(arb_pt_server.native() + " -build_clean -D" + index_arb_file.native());
216    // build index (build .arb.pt)
217    cmds.push_back(arb_pt_server.native() + " -build -D" + index_arb_file.native());
218    for (auto& cmd : cmds) {
219        logger->debug("Executing '{}'", cmd);
220        override_LD_LIBRARY_PATH();
221        int rval = system(cmd.c_str()); // launches ptserver
222        logger->debug("Command finished");
223        if (rval) {
224            logger->error("Command {} failed with exit code {}", cmd, rval);
225            return false;
226        }
227    }
228    return true;
229}
230
231
232fs::path
233managed_pt_server::ensure_env_ARBHOME() {
234    fs::path ARBHOME;
235    // Make sure ARBHOME is set; guess if possible
236    if (const char* arbhome = std::getenv("ARBHOME")) {
237        ARBHOME = arbhome;
238        logger->info("Using ARBHOME={}", ARBHOME);
239    } else {
240        ARBHOME = boost::dll::symbol_location(GB_open).parent_path().parent_path();
241        logger->info("Setting ARBHOME={}", ARBHOME);
242        setenv("ARBHOME", ARBHOME.c_str(), 1);  // no setenv in C++/STL
243    }
244    return ARBHOME;
245}
246
247fs::path
248managed_pt_server::get_pt_server_path() {
249    // Locate PT server binary
250    fs::path arb_pt_server("arb_pt_server");
251    fs::path arb_pt_server_path = fs::system_complete(arb_pt_server);
252    if (!fs::exists(arb_pt_server_path)) { // not in PATH
253        logger->debug("{} not found in PATH", arb_pt_server);
254        arb_pt_server_path = ensure_env_ARBHOME() / "bin" / arb_pt_server;
255        if (!fs::exists(arb_pt_server_path)) { // not in ARBHOME
256            logger->debug("{} not found in ARBHOME", arb_pt_server);
257            // 3. Next to us
258            fs::path self_dir = boost::dll::program_location().parent_path();
259            arb_pt_server_path = self_dir / arb_pt_server;
260            if (!fs::exists(arb_pt_server_path)) { // not next to us either?
261                logger->debug("{} not found in {}", arb_pt_server, self_dir);
262                throw query_pt_exception("Failed to locate 'arb_pt_server'");
263            }
264        }
265    }
266    return arb_pt_server_path;
267}
268
269struct query_pt::options {
270};
271struct query_pt::options *query_pt::opts;
272
273void
274query_pt::get_options_description(po::options_description& /*main*/,
275                                  po::options_description& /*adv*/) {
276}
277
278void
279query_pt::validate_vm(po::variables_map& /* vm */,
280                      po::options_description& /*desc*/) {
281}
282
283
284struct query_pt::priv_data {
285    aisc_com         *link{nullptr};
286    T_PT_MAIN         com;
287    T_PT_LOCS         locs;
288    T_PT_FAMILYFINDER ffinder;
289
290    std::mutex arb_pt_access;
291
292    unsigned int  range_begin{0};
293    unsigned int  range_end{INT_MAX};
294    bool find_type_fast{false};
295    int  kmer_len;
296    int  num_mismatch;
297    bool relative_sort;
298
299    query_arb *arbdb;
300
301    timer timeit;
302
303    static std::map<string, std::weak_ptr<managed_pt_server>> servers;
304    std::shared_ptr<managed_pt_server> server;
305
306    bool            connect_server(const string& portname);
307    void            disconnect_server();
308};
309
310
311bool
312query_pt::priv_data::connect_server(const string& portname) {
313    std::lock_guard<std::mutex> lock(arb_pt_access);
314    GB_ERROR error = nullptr;
315    link = aisc_open(portname.c_str(), com, AISC_MAGIC_NUMBER, &error);
316    if (error != nullptr) {
317        throw query_pt_exception(error);
318    }
319    if (link == nullptr) {
320        return false;
321    }
322
323    if (aisc_create(link,
324                    PT_MAIN, com,
325                    MAIN_LOCS, PT_LOCS, locs,
326                    NULL) != 0) {
327        throw query_pt_exception("Unable to connect to PT server! (code 02)");
328    }
329
330    if (aisc_create(link,
331                    PT_LOCS, locs,
332                    LOCS_FFINDER, PT_FAMILYFINDER, ffinder,
333                    NULL) != 0) {
334        throw query_pt_exception("Unable to connect to PT server! (code 03)");
335    }
336
337    return true;
338}
339
340void
341query_pt::priv_data::disconnect_server() {
342    std::lock_guard<std::mutex> lock(arb_pt_access);
343    aisc_close(link, com);
344}
345
346
347std::map<string, std::weak_ptr<managed_pt_server>> query_pt::priv_data::servers;
348
349
350query_pt*
351query_pt::get_pt_search(const fs::path& filename, int k,
352                        bool fast,
353                        bool norel,
354                        int mk,
355                        std::string portname) {
356    if (portname.empty()) {
357        portname = ":" + (fs::temp_directory_path() / fs::unique_path()).native();
358    }
359
360    return new query_pt(portname.c_str(), filename.native().c_str(),
361                        fast, k, mk, norel);
362}
363
364
365query_pt::query_pt(const char* portname, const char* dbname,
366                   bool fast, int k, int mk, bool norel)
367    : data(new priv_data())
368{
369    if (priv_data::servers.count(portname) != 0u) {
370        data->server = priv_data::servers[portname].lock();
371    }
372
373    if (!data->connect_server(portname)) {
374        data->server = std::make_shared<managed_pt_server>(dbname, portname);
375        if (!data->connect_server(portname)) {
376            delete data;
377            throw query_pt_exception("Failed to start PT server. Do you have enough memory?");
378        }
379        priv_data::servers[portname] = data->server;
380    }
381
382    data->arbdb = query_arb::getARBDB(dbname);
383
384    set_find_type_fast(fast);
385    set_probe_len(k);
386    set_mismatches(mk);
387    set_sort_type(norel);
388}
389
390query_pt::~query_pt() {
391    logger->info("Timings for PT Search: {}", data->timeit);
392    delete data;
393}
394
395
396void
397query_pt::set_find_type_fast(bool fast) {
398    std::lock_guard<std::mutex> lock(data->arb_pt_access);
399    int err = aisc_put(data->link,
400                       PT_FAMILYFINDER, data->ffinder,
401                       FAMILYFINDER_FIND_TYPE, fast?1L:0L,
402                       NULL);
403    if (err != 0) {
404        logger->warn("Unable to set find_type = {}", fast ? "fast" : "normal");
405    } else {
406        data->find_type_fast = fast;
407    }
408}
409
410void
411query_pt::set_probe_len(int len) {
412    std::lock_guard<std::mutex> lock(data->arb_pt_access);
413    int err = aisc_put(data->link,
414                       PT_FAMILYFINDER, data->ffinder,
415                       FAMILYFINDER_PROBE_LEN, long(len),
416                       NULL);
417    if (err != 0) {
418        logger->warn("Unable to set k = {}", len);
419    } else {
420        data->kmer_len = len;
421    }
422}
423
424void
425query_pt::set_mismatches(int len) {
426    std::lock_guard<std::mutex> lock(data->arb_pt_access);
427    int err = aisc_put(data->link,
428                       PT_FAMILYFINDER, data->ffinder,
429                       FAMILYFINDER_MISMATCH_NUMBER, long(len),
430                       NULL);
431
432    if (err != 0) {
433        logger->warn("Unable to set allowable mismatches to {}", len);
434    } else {
435        data->num_mismatch = len;
436    }
437}
438
439void
440query_pt::set_sort_type(bool absolute) {
441    std::lock_guard<std::mutex> lock(data->arb_pt_access);
442    int err = aisc_put(data->link,
443                       PT_FAMILYFINDER, data->ffinder,
444                       FAMILYFINDER_SORT_TYPE, absolute?0L:1L,
445                       NULL);
446    if (err != 0) {
447        logger->warn("Unable to set sort type = {}", absolute ? "absolute" : "relative");
448    } else {
449        data->relative_sort = !absolute;
450    }
451}
452
453void
454query_pt::set_range(int startpos, int stoppos) {
455    std::lock_guard<std::mutex> lock(data->arb_pt_access);
456
457    int err = aisc_put(data->link,
458                       PT_FAMILYFINDER, data->ffinder,
459                       FAMILYFINDER_RANGE_STARTPOS, long(startpos),
460                       FAMILYFINDER_RANGE_ENDPOS, long(stoppos),
461                       NULL);
462    if (err != 0) {
463        logger->warn("Unable to constain matching to {}-{}", startpos, stoppos);
464    } else {
465        data->range_begin = startpos;
466        data->range_end = stoppos;
467    }
468}
469
470double
471query_pt::match(search::result_vector &family, const cseq& queryc,
472                int min_match, int max_match, float min_score, float max_score,
473                query_arb *arb, bool noid, int min_len,
474                int num_full, int full_min_len, int range_cover, bool leave_query_out
475                ) {
476    string query_str = queryc.getBases();
477    const char* query = query_str.c_str();
478    int maxfail = 5;
479    int range_cover_left = range_cover;
480    int range_cover_right = range_cover;
481    int matches = 0;
482    int skipped_max_score = 0;
483    int skipped_broken = 0;
484    int skipped_min_len = 0;
485    int skipped_noid = 0;
486    cseq_comparator cmp(CMP_IUPAC_OPTIMISTIC, CMP_DIST_NONE, CMP_COVER_QUERY, false);
487
488    if (query_str.size() < 20) {
489        logger->warn("Sequence {} too short ({} bp) for PT server search",
490                     queryc.getName(), strlen(query));
491        return 0;
492    }
493
494    T_PT_FAMILYLIST f_list;
495
496    bytestring bs;
497    bs.data = const_cast<char*>(query);
498    bs.size = strlen(query)+1;
499
500match_retry:
501    family.reserve(max_match);
502    std::lock_guard<std::mutex> lock(data->arb_pt_access);
503   
504    int err = aisc_put(data->link,
505                       PT_FAMILYFINDER, data->ffinder,
506                       FAMILYFINDER_FIND_FAMILY, &bs,
507                       NULL);
508    if (err != 0) {
509        logger->error("Unable to execute find_family command on pt-server");
510        if (--maxfail==0) {
511            logger->error("No retries left; aborting.");
512            return 0;
513        }
514        logger->error("Retrying...");
515
516        //FIXME restart();
517        goto match_retry;
518    }
519
520    err = aisc_get(data->link,
521                   PT_FAMILYFINDER, data->ffinder,
522                   FAMILYFINDER_FAMILY_LIST, f_list.as_result_param(),
523                   NULL);
524    if (err != 0) {
525        logger->error("Unable to get results for search");
526        return 0;
527    }
528    if (!f_list.exists()) {
529        return 0;
530    }
531
532
533    char   *f_name;
534    double  f_relscore = 0.f;
535    do {
536        err = aisc_get(data->link, PT_FAMILYLIST, f_list,
537                       FAMILYLIST_NAME, &f_name,
538                       FAMILYLIST_REL_MATCHES, &f_relscore,
539                       FAMILYLIST_NEXT, f_list.as_result_param(),
540                       NULL);
541        if (err != 0) {
542            logger->error("Unable to get next item in family list");
543            break;
544        }
545
546        if (leave_query_out && f_name == queryc.getName()) {
547            logger->info("Omitting sequence with same name as query from result");
548            free(f_name);
549            continue;
550        }
551
552        if (data->find_type_fast) {
553            f_relscore *= 4;
554        }
555
556        // correct relscore according to formula based on edgar2004
557        f_relscore = 1 - log(f_relscore + 1.0/bs.size)/log(1.0/bs.size);
558
559        if (matches <= min_match || f_relscore >= min_score) {
560            try {
561                const cseq& seq = arb->getCseq(f_name);
562
563                if (max_score <= 2 && cmp(queryc, seq) > max_score) {
564                    skipped_max_score ++;
565                } else if ((long)seq.size() < min_len) {
566                    skipped_min_len ++;
567                } else if (noid && boost::algorithm::icontains(seq.getBases(), query)) {
568                    skipped_noid ++;
569                } else {
570                    matches ++;
571
572                    family.emplace_back(f_relscore, &seq);
573
574                    if ((num_full != 0) && (long)seq.size() > full_min_len) {
575                        num_full--;
576                    }
577                    if ((range_cover_right != 0) &&
578                        seq.getById(seq.size()-1).getPosition() >= data->range_end) {
579                        range_cover_right--;
580                    }
581                    if ((range_cover_left != 0) &&
582                        seq.begin()->getPosition() <= data->range_begin) {
583                        range_cover_left--;
584                    }
585                }
586            } catch (base_iupac::bad_character_exception& e) {
587                logger->error("Sequence {} contains invalid character{}. Skipping",
588                              f_name, e.character);
589                skipped_broken++;
590            }
591        }
592
593        free(f_name);
594    } while (matches < max_match
595             && (matches <= min_match || f_relscore >= min_score)
596             && f_list.exists());
597
598    // get full length sequence
599    while (f_list.exists() && ((num_full + range_cover_right + range_cover_left) != 0)) {
600        err = aisc_get(data->link, PT_FAMILYLIST, f_list,
601                       FAMILYLIST_NAME, &f_name,
602                       FAMILYLIST_MATCHES, &f_relscore,
603                       FAMILYLIST_NEXT, f_list.as_result_param(),
604                       NULL);
605
606        if (err != 0) {
607            logger->warn("Unable to get next item in family list");
608            break;
609        }
610        if (data->find_type_fast) {
611            f_relscore *= 4;
612        }
613
614        const cseq& seq = arb->getCseq(f_name);
615        if (max_score >= 2 || cmp(queryc, seq) <= max_score) {
616            bool keep = false;
617            if ((num_full != 0) && (long)seq.size() > full_min_len) {
618                num_full--;
619                keep = true;
620            }
621
622            if ((range_cover_right != 0) && (long)seq.size() > min_len &&
623                seq.getById(seq.size()-1).getPosition() >= data->range_end) {
624                range_cover_right--;
625                keep = true;
626            }
627
628            if ((range_cover_left != 0) && (long)seq.size() > min_len &&
629                seq.begin()->getPosition() <= data->range_begin) {
630                range_cover_left--;
631                keep = true;
632            }
633            if (keep) {
634                family.emplace_back(f_relscore, &seq);
635            }
636        }
637        free(f_name);
638    }
639
640    if ((skipped_max_score != 0) || (skipped_broken != 0) || (skipped_min_len != 0) || (skipped_noid != 0)) {
641        logger->warn("Skipped {} sequences ({}x id > {}, {}x broken, {}x len < {}, {}x noid)",
642                     skipped_max_score + skipped_broken + skipped_min_len + skipped_noid,
643                     skipped_max_score, max_score,
644                     skipped_broken,
645                     skipped_min_len, min_len,
646                     skipped_noid);
647    }
648
649    return f_relscore;
650}
651
652void
653query_pt::find(const cseq& query, search::result_vector& results, unsigned int max) {
654    data->timeit.start();
655    char *error = nullptr;
656    results.clear();
657    results.reserve(max);
658
659    std::lock_guard<std::mutex> lock(data->arb_pt_access);
660    data->timeit.stop("acquire lock");
661
662    bytestring bs;
663    bs.data = strdup(query.getBases().c_str());
664    bs.size = query.size()+1;
665    if (aisc_put(data->link,
666                 PT_FAMILYFINDER, data->ffinder,
667                 FAMILYFINDER_SORT_MAX, long(max),
668                 FAMILYFINDER_FIND_FAMILY, &bs,
669                 NULL) != 0) {
670        logger->error("Unable to execute find_family command on pt-server");
671    }
672    free(bs.data);
673    data->timeit.stop("send query");
674
675    T_PT_FAMILYLIST f_list;
676    aisc_get(data->link,
677             PT_FAMILYFINDER, data->ffinder,
678             FAMILYFINDER_FAMILY_LIST, f_list.as_result_param(),
679             FAMILYFINDER_ERROR, &error,
680             NULL);
681    if (error && error[0] != 0) {
682        logger->error("Unable to get results for search: {}", error);
683        return;
684    }
685    data->timeit.stop("get first");
686
687    char* f_name;
688    double f_rel_matches = 0.f;
689    long f_matches = 0;
690    int mult = (data->find_type_fast) ? 4:1;
691
692    std::vector<std::pair<float, string> > scored_names;
693    while (max-- && f_list.exists()) {
694        aisc_get(data->link, PT_FAMILYLIST, f_list,
695                 FAMILYLIST_NAME, &f_name,
696                 FAMILYLIST_REL_MATCHES, &f_rel_matches,
697                 FAMILYLIST_MATCHES, &f_matches,
698                 FAMILYLIST_NEXT, f_list.as_result_param(),
699                 NULL);
700        f_rel_matches = 1 - log((f_rel_matches*mult) + 1.0/bs.size)/log(1.0/bs.size);
701        scored_names.emplace_back(f_rel_matches, f_name);
702        free(f_name);
703    }
704    data->timeit.stop("get all");
705
706    for (auto& res : scored_names) {
707        results.emplace_back(res.first, &data->arbdb->getCseq(res.second));
708    }
709
710    data->timeit.stop("load seqs");
711}
712
713unsigned int query_pt::size() const {
714    return data->arbdb->getSeqCount();
715}
716
717struct query_pt_pool::pimpl {
718    pimpl(fs::path& filename, int k, bool fast, bool norel, int km, string portname)
719        : _filename(filename), _k(k), _fast(fast), _norel(norel), _km(km), _portname(portname)
720    {}
721    ~pimpl() {
722        for (auto pt : _pts) {
723            delete pt;
724        }
725    }
726
727    using pool_map = std::map<fs::path, std::shared_ptr<query_pt_pool::pimpl>>;
728    static pool_map _pools;
729
730    // parameters for creating new query_pts
731    fs::path _filename;
732    int _k;
733    bool _fast;
734    bool _norel;
735    int _km;
736    string _portname;
737
738    std::mutex _access_pts;
739    std::list<query_pt*> _pts;
740    int _count{0};
741    query_pt* borrow() {
742        int n = 0;
743        {
744            std::lock_guard<std::mutex> lock(_access_pts);
745            if (!_pts.empty()) {
746                query_pt* pt = _pts.front();
747                _pts.pop_front();
748                return pt;
749            }
750            n = _count++;
751        }
752        std::string portname;
753        if (n > 0) {
754            // FIXME: manage the port better. This works for unix sockets, but not
755            // for TCP ports.
756            portname = fmt::format("{}_{}", _portname, n);
757        } else {
758            portname = _portname;
759        }
760        query_pt *pt = query_pt::get_pt_search(
761            _filename, _k, _fast, _norel, _km, portname
762            );
763        return pt;
764    }
765    void giveback(query_pt* pt) {
766        std::lock_guard<std::mutex> lock(_access_pts);
767        _pts.push_front(pt);
768    }
769};
770
771query_pt_pool::pimpl::pool_map query_pt_pool::pimpl::_pools;
772
773query_pt_pool* query_pt_pool::get_pool(fs::path filename,
774                                       int k, bool fast, bool norel, int mk,
775                                       std::string portname) {
776    if (query_pt_pool::pimpl::_pools.count(filename) == 0u) {
777        query_pt_pool::pimpl::_pools.emplace(
778            filename, std::make_shared<query_pt_pool::pimpl>(filename, k, fast, norel, mk, portname)
779            );
780    }
781    return new query_pt_pool(pimpl::_pools.at(filename));
782}
783
784query_pt_pool::query_pt_pool(std::shared_ptr<query_pt_pool::pimpl> p)
785    : impl(p) {
786}
787
788query_pt_pool::~query_pt_pool() {}
789
790void
791query_pt_pool::find(const cseq& query, result_vector& results, unsigned int max) {
792    query_pt *pt = impl->borrow();
793    pt->find(query, results, max);
794    impl->giveback(pt);
795}
796
797double
798query_pt_pool::match(result_vector& family, const cseq& queryc, int min_match,
799                     int max_match, float min_score, float max_score, query_arb *arb,
800                     bool noid, int min_len, int num_full, int full_min_len,
801                     int range_cover, bool leave_query_out) {
802    query_pt *pt = impl->borrow();
803    double res = pt->match(family, queryc, min_match, max_match, min_score, max_score, arb,
804                           noid, min_len, num_full, full_min_len, range_cover, leave_query_out);
805    impl->giveback(pt);
806    return res;
807}
808
809unsigned int
810query_pt_pool::size() const {
811    query_pt *pt = impl->borrow();
812    unsigned int res = pt->size();
813    impl->giveback(pt);
814    return res;
815}
816
817
818query_pt_exception::query_pt_exception(std::string msg) noexcept
819    : message(std::move(msg))
820{
821}
822
823query_pt_exception::~query_pt_exception() noexcept = default;
824
825const char*
826query_pt_exception::what() const noexcept {
827    return message.c_str();
828}
829
830} // namespace sina
831
832
833/*
834  Local Variables:
835  mode:c++
836  c-file-style:"stroustrup"
837  c-file-offsets:((innamespace . 0)(inline-open . 0)(case-label . 0))
838  indent-tabs-mode:nil
839  fill-column:99
840  End:
841*/
842// vim: filetype=cpp:expandtab:shiftwidth=4:tabstop=8:softtabstop=4:encoding=utf-8:textwidth=99 :
Note: See TracBrowser for help on using the repository browser.