source: branches/properties/GDE/SINA/builddir/src/sina.cpp

Last change on this file was 19236, checked in by westram, 2 years ago
  • print requested cli-version in error message.
File size: 21.0 KB
Line 
1/*
2Copyright (c) 2006-2018 Elmar Pruesse <elmar.pruesse@ucdenver.edu>
3
4This file is part of SINA.
5SINA is free software: you can redistribute it and/or modify it under
6the terms of the GNU General Public License as published by the Free
7Software Foundation, either version 3 of the License, or (at your
8option) any later version.
9
10SINA is distributed in the hope that it will be useful, but WITHOUT ANY
11WARRANTY; without even the implied warranty of MERCHANTABILITY or
12FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13for more details.
14
15You should have received a copy of the GNU General Public License
16along with SINA.  If not, see <http://www.gnu.org/licenses/>.
17
18Additional permission under GNU GPL version 3 section 7
19
20If you modify SINA, or any covered work, by linking or combining it
21with components of ARB (or a modified version of that software),
22containing parts covered by the terms of the
23ARB-public-library-license, the licensors of SINA grant you additional
24permission to convey the resulting work. Corresponding Source for a
25non-source form of such a combination shall include the source code
26for the parts of ARB used as well as that of the covered work.
27*/
28
29#include "config.h"
30#include <iostream>
31
32#include <fstream>
33using std::ifstream;
34
35#include <string>
36using std::string;
37
38#include <vector>
39using std::vector;
40
41#include <tuple>
42using std::tuple;
43using std::get;
44
45#include <boost/program_options.hpp>
46namespace po = boost::program_options;
47
48#include <boost/filesystem.hpp>
49namespace fs = boost::filesystem;
50
51#include <boost/algorithm/string.hpp>
52#include <boost/algorithm/string/predicate.hpp>
53using boost::algorithm::iequals;
54
55#include <boost/core/demangle.hpp>
56using boost::core::demangle;
57
58using std::exception;
59using std::logic_error;
60
61#define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1
62#include <tbb/task_scheduler_init.h>
63#include <tbb/parallel_for.h>
64#include <tbb/flow_graph.h>
65namespace tf = tbb::flow;
66
67#include "famfinder.h"
68#include "align.h"
69#include "rw_arb.h"
70#include "rw_fasta.h"
71#include "rw_csv.h"
72#include "log.h"
73#include "search_filter.h"
74#include "timer.h"
75#include "cseq_comparator.h"
76#include "progress.h"
77#include "search.h"
78
79using namespace sina;
80
81static auto logger = Log::create_logger("SINA");
82
83// define new type of configuration selection of input/output type
84enum SEQUENCE_DB_TYPE {
85    SEQUENCE_DB_NONE,
86    SEQUENCE_DB_AUTO,
87    SEQUENCE_DB_ARB,
88    SEQUENCE_DB_FASTA,
89    SEQUENCE_DB_CSV,
90};
91
92// make above type printable
93std::ostream& operator<<(std::ostream& out, const SEQUENCE_DB_TYPE& db) {
94    switch(db) {
95    case SEQUENCE_DB_NONE:  out << "NONE";  break;
96    case SEQUENCE_DB_AUTO:  out << "AUTO";  break;
97    case SEQUENCE_DB_ARB:   out << "ARB";   break;
98    case SEQUENCE_DB_FASTA: out << "FASTA"; break;
99    case SEQUENCE_DB_CSV:   out << "CSV"; break;
100    default:                out << "Undef!";
101    }
102    return out;
103}
104
105template<typename T>
106std::ostream& operator<<(std::ostream& out, const std::vector<T>& vs) {
107    bool first = true;
108    for (auto& v : vs) {
109        if (first) {
110            first = false;
111        } else {
112            out << " ";
113        }
114        out << v;
115    }
116    return out;
117}
118
119// make above type parseable by boost::program_options
120void validate(boost::any& v,
121              const vector<string>& values,
122              SEQUENCE_DB_TYPE* /*db*/, int /*unused*/) {
123    //po::validators::check_first_occurrence(v);
124    const std::string& s = po::validators::get_single_string(values);
125    if (iequals(s, "NONE")) {
126        v = SEQUENCE_DB_NONE;
127    } else if (iequals(s, "AUTO")) {
128        v = SEQUENCE_DB_AUTO;
129    } else if (iequals(s, "ARB")) {
130        v = SEQUENCE_DB_ARB;
131    } else if (iequals (s, "FASTA")) {
132        v = SEQUENCE_DB_FASTA;
133    } else if (iequals (s, "CSV")) {
134        v = SEQUENCE_DB_CSV;
135    } else {
136        throw po::invalid_option_value(s);
137    }
138}
139
140
141// make known any<> types printable
142template <class T,
143          typename = typename std::enable_if<std::is_same<T, boost::any>::value>::type >
144std::ostream& operator<<(std::ostream& out,
145                         const T& a) {
146    using boost::any_cast;
147    if (any_cast<bool>(&a) != nullptr) {
148        out << any_cast<bool>(a);
149    } else if (any_cast<int>(&a) != nullptr) {
150        out << any_cast<int>(a);
151    } else if (any_cast<unsigned int>(&a) != nullptr) {
152        out << any_cast<unsigned int>(a);
153    } else if (any_cast<long>(&a) != nullptr) {
154        out << any_cast<long>(a);
155    } else if (any_cast<float>(&a) != nullptr) {
156        out << any_cast<float>(a);
157    } else if (any_cast<string>(&a) != nullptr) {
158        out << any_cast<string>(a);
159    } else if (any_cast<TURN_TYPE>(&a) != nullptr) {
160        out << any_cast<TURN_TYPE>(a);
161    } else if (any_cast<OVERHANG_TYPE>(&a) != nullptr) {
162        out << any_cast<OVERHANG_TYPE>(a);
163    } else if (any_cast<INSERTION_TYPE>(&a) != nullptr) {
164        out << any_cast<INSERTION_TYPE>(a);
165    } else if (any_cast<LOWERCASE_TYPE>(&a) != nullptr) {
166        out << any_cast<LOWERCASE_TYPE>(a);
167    } else if (any_cast<FASTA_META_TYPE>(&a) != nullptr) {
168        out << any_cast<FASTA_META_TYPE>(a);
169    } else if (any_cast<SEQUENCE_DB_TYPE>(&a) != nullptr) {
170        out << any_cast<SEQUENCE_DB_TYPE>(a);
171    } else if (any_cast<std::vector<SEQUENCE_DB_TYPE>>(&a) != nullptr) {
172        out << any_cast<std::vector<SEQUENCE_DB_TYPE>>(a);
173    } else if (any_cast<CMP_IUPAC_TYPE>(&a) != nullptr) {
174        out << any_cast<CMP_IUPAC_TYPE>(a);
175    } else if (any_cast<CMP_DIST_TYPE>(&a) != nullptr) {
176        out << any_cast<CMP_DIST_TYPE>(a);
177    } else if (any_cast<CMP_COVER_TYPE>(&a) != nullptr) {
178        out << any_cast<CMP_COVER_TYPE>(a);
179    } else if (any_cast<ENGINE_TYPE>(&a) != nullptr) {
180        out << any_cast<ENGINE_TYPE>(a);
181    } else if (any_cast<fs::path>(&a) != nullptr) {
182        out << any_cast<fs::path>(a);
183    } else if (any_cast<std::vector<fs::path>>(&a) != nullptr) {
184        out << any_cast<std::vector<fs::path>>(a);
185    } else {
186        out << "UNKNOWN TYPE: '" << a.type().name()<<"'";
187    }
188    return out;
189}
190
191void show_conf(po::variables_map& vm) {
192    std::cerr << "Effective parameters:" << std::endl;
193    for (auto& pv: vm) {
194        std::cerr << pv.first << " = ";
195        try {
196            std::cerr << pv.second.value() << std::endl;
197        } catch (boost::bad_any_cast &e) {
198            std::cerr << "UNKNOWN TYPE" << std::endl;
199        }
200    }
201    std::cerr << std::endl;
202}
203
204struct options {
205    SEQUENCE_DB_TYPE intype;
206    fs::path in;
207    std::vector<SEQUENCE_DB_TYPE> outtype;
208    std::vector<fs::path> out;
209    std::vector<std::pair<SEQUENCE_DB_TYPE, fs::path>> out_merged;
210    unsigned int copy_relatives;
211    bool noalign;
212    bool skip_align;
213    bool do_search;
214    bool inorder;
215    unsigned int threads;
216    unsigned int num_pt_servers;
217    unsigned int max_trays;
218    string has_cli_vers;
219    string fields;
220    vector<string> v_fields;
221};
222
223static options opts;
224
225// define hidden options
226void get_options_description(po::options_description& main,
227                             po::options_description& adv) {
228    int tbb_threads = tbb::task_scheduler_init::default_num_threads();
229    unsigned int tbb_automatic = tbb::task_scheduler_init::automatic;
230
231    adv.add_options()
232        ("show-conf", "show effective configuration")
233        ("intype",
234         po::value<SEQUENCE_DB_TYPE>(&opts.intype)->default_value(SEQUENCE_DB_AUTO),
235         "override input file type [*auto*|none|arb|fasta|csv]")
236        ("outtype",
237         po::value<std::vector<SEQUENCE_DB_TYPE>>(&opts.outtype),
238         "override output file type for next output file [*auto*|none|arb|fasta|csv]")
239        ("preserve-order", po::bool_switch(&opts.inorder),
240         "maintain order of sequences")
241        ("max-in-flight", po::value<unsigned int>(&opts.max_trays)
242         ->default_value(tbb_threads*2),
243         "max number of sequences processed at a time")
244        ("has-cli-vers", po::value<string>(&opts.has_cli_vers), "verify support of cli version")
245        ("no-align", po::bool_switch(&opts.noalign),
246         "disable alignment stage (same as prealigned)")
247        ("fields,f", po::value<string>(&opts.fields),
248         "select fields to write")
249        ;
250
251    main.add_options()
252        ("help,h", "show short help")
253        ("help-all,H", "show full help (long)")
254        ("in,i", po::value<fs::path>(&opts.in)->default_value("-"),
255         "input file (arb or fasta)")
256        ("out,o", po::value<std::vector<fs::path>>(&opts.out)->multitoken(),
257         "output file (arb, fasta or csv; may be specified multiple times)")
258        ("add-relatives", po::value<unsigned int>(&opts.copy_relatives)->default_value(0, ""),
259         "add the ARG nearest relatives for each sequence to output")
260        ("search,S", po::bool_switch(&opts.do_search), "enable search stage")
261        ("prealigned,P", po::bool_switch(&opts.skip_align), "skip alignment stage")
262        ("threads,p", po::value<unsigned int>(&opts.threads)->default_value(tbb_automatic, ""),
263         "limit number of threads (automatic)")
264        ("num-pts", po::value<unsigned int>(&opts.num_pt_servers)->default_value(tbb_threads),
265         "number of PT servers to start")
266        ("version,V", "show version")
267        ;
268}
269
270void validate_vm(po::variables_map& vm, const po::options_description&  /*all_od*/,
271                 po::parsed_options& parsed_options) {
272    if (vm.count("has-cli-vers") != 0u) {
273        std::cerr << "** SINA (SILVA Incremental Aligner) " << PACKAGE_VERSION
274                  << " present" << std::endl;
275        const char* supported_versions[]{"1", "2", "ARB5.99", "ARB7.1"};
276        for (auto& supported : supported_versions) {
277            if (opts.has_cli_vers == supported) {
278                exit(EXIT_SUCCESS);
279            }
280        }
281
282        std::cerr << "** Error: requested CLI version '" << opts.has_cli_vers << "' not supported!" << std::endl;
283        exit(EXIT_FAILURE);
284    }
285
286    if (vm.count("version") != 0u) {
287        std::cerr << PACKAGE_STRING
288#ifdef PACKAGE_BUILDINFO
289                  << " (" << PACKAGE_BUILDINFO << ")"
290#endif
291                  << std::endl;
292        exit(EXIT_SUCCESS);
293    }
294
295    // Autodetect / validate intype selection
296    if (opts.intype == SEQUENCE_DB_AUTO) {
297        if (opts.in.extension() == ".arb" || opts.in.native() == ":") {
298            opts.intype = SEQUENCE_DB_ARB;
299        } else if ((opts.in.extension() == ".csv") ||
300                   (opts.in.extension() == ".gz" &&
301                    opts.in.stem().extension() == ".csv")) {
302            opts.intype = SEQUENCE_DB_CSV;
303        } else {
304            opts.intype = SEQUENCE_DB_FASTA;
305        }
306    }
307
308    if (opts.intype == SEQUENCE_DB_NONE) {
309        throw logic_error("Input type NONE invalid - need something to process");
310    }
311    if (opts.intype == SEQUENCE_DB_CSV) {
312        throw logic_error("Input type CSV invalid - can't parse sequences from that");
313    }
314
315    SEQUENCE_DB_TYPE type_val = SEQUENCE_DB_AUTO;
316    int type_idx = 0, out_idx = 0;
317    for (auto &opt : parsed_options.options) {
318        if (opt.string_key == "outtype") {
319            type_val = opts.outtype[type_idx++];
320        } else if (opt.string_key == "out") {
321            for (size_t i = 0; i < opt.value.size(); i++) {
322                fs::path out = opts.out[out_idx++];
323                SEQUENCE_DB_TYPE outtype = type_val;
324                if (outtype == SEQUENCE_DB_AUTO) {
325                    if (out.extension() == ".arb" || out.native() == ":") {
326                        outtype = SEQUENCE_DB_ARB;
327                    } else if (out == "/dev/null") {
328                        continue;
329                    } else if (
330                        (out.extension() == ".csv") ||
331                        (out.extension() == ".gz" && out.stem().extension() == ".csv")
332                        ) {
333                        outtype = SEQUENCE_DB_CSV;
334                    } else {
335                        outtype = SEQUENCE_DB_FASTA;
336                    }
337                }
338                opts.out_merged.emplace_back(outtype, out);
339            }
340            type_val = SEQUENCE_DB_AUTO;
341        }
342    }
343
344    if (out_idx == 0) { // no --out specified
345        if (opts.intype == SEQUENCE_DB_ARB) {
346            opts.out_merged.push_back(std::make_pair(SEQUENCE_DB_ARB, opts.in));
347            logger->warn("No explicit output file provided. "
348                         "Reading and writing to same ARB database.");
349        } else if (type_val != SEQUENCE_DB_NONE) {
350            opts.out_merged.push_back(std::make_pair(SEQUENCE_DB_FASTA, "-"));
351        }
352    }
353
354    // Split copy_fields
355    boost::split(opts.v_fields, opts.fields, boost::is_any_of(":,"));
356    if (opts.v_fields.back().empty()) {
357        opts.v_fields.pop_back();
358    }
359    // Add full_name if no fields are specified
360    if (opts.v_fields.empty()) {
361        opts.v_fields.push_back(query_arb::fn_fullname);
362    }
363}
364
365void show_help(po::options_description* od,
366               po::options_description* adv = nullptr) {
367    std::cerr << "Usage:" << std::endl
368              << " sina -i input [-o output] [--prealigned|--db reference] [--search] "
369              << "[--search-db search.arb] [options]"
370              << std::endl << std::endl
371              << *od << std::endl;
372    if (adv != nullptr) {
373        std::cerr << *adv << std::endl;
374    }
375    exit(EXIT_SUCCESS);
376}
377
378// do the messy option parsing/validating
379po::variables_map
380parse_options(int argc, char** argv) {
381    po::variables_map vm;
382
383    string infile, outfile;
384    po::options_description od("Options"), adv_od("Advanced Options");
385
386    get_options_description(od, adv_od);
387    Log::get_options_description(od, adv_od);
388    rw_arb::get_options_description(od, adv_od);
389    rw_fasta::get_options_description(od, adv_od);
390    rw_csv::get_options_description(od, adv_od);
391    aligner::get_options_description(od, adv_od);
392    famfinder::get_options_description(od, adv_od);
393    search_filter::get_options_description(od, adv_od);
394    query_pt::get_options_description(od, adv_od);
395
396    po::options_description all_od(od);
397    all_od.add(adv_od);
398
399    po::positional_options_description no_positional;
400    try {
401        po::command_line_parser parser(argc, argv);
402        parser.options(all_od);
403        parser.positional(no_positional);
404        po::parsed_options parsed_options = parser.run();
405        po::store(parsed_options, vm);
406
407        if (vm.count("help") != 0u) {
408            show_help(&od);
409        }
410        if (vm.count("help-all") != 0u) {
411            show_help(&od, &adv_od);
412        }
413
414        po::notify(vm);
415
416        validate_vm(vm, all_od, parsed_options);
417        Log::validate_vm(vm, all_od);
418        rw_arb::validate_vm(vm, all_od);
419        rw_fasta::validate_vm(vm, all_od);
420        rw_csv::validate_vm(vm, all_od);
421        if (!opts.skip_align && !opts.noalign) {
422            famfinder::validate_vm(vm, all_od);
423            aligner::validate_vm(vm, all_od);
424        }
425        if (opts.do_search) {
426            search_filter::validate_vm(vm, all_od);
427        }
428        query_pt::validate_vm(vm, all_od);
429    } catch (std::logic_error &e) {
430        std::cerr << "Configuration error:" << std::endl
431                  << e.what() << std::endl
432                  << "Use \"--help\" to show options" << std::endl
433                  << std::endl;
434        if (vm.count("show-conf") != 0u) {
435            show_conf(vm);
436        }
437        exit(EXIT_FAILURE);
438    }
439    return vm;
440}
441
442
443int real_main(int argc, char** argv) {
444    po::variables_map vm = parse_options(argc, argv);
445    logger->warn("This is {}.", PACKAGE_STRING);
446    if (vm.count("show-conf") != 0u) {
447         show_conf(vm);
448    }
449
450    tbb::task_scheduler_init init(vm["threads"].as<unsigned int>());
451
452    tf::graph g;  // Main data flow graph (pipeline)
453    logger_progress p(logger, "Processing");
454
455    vector<std::unique_ptr<tf::graph_node>> nodes; // Nodes (for cleanup)
456    tf::sender<tray> *last_node; // Last tray producing node
457
458    using source_node = tf::source_node<tray>;
459    using filter_node = tf::function_node<tray, tray>;
460    using limiter_node = tf::limiter_node<tray>;
461    filter_node *node;
462
463    // initialize default alignment
464    query_arb::override_default_alignment(rw_arb::get_cli_alignment());
465
466    // Make source node reading sequences
467    source_node *source; // will be activated once graph complete
468    switch (opts.intype) {
469    case SEQUENCE_DB_ARB: {
470        auto arbreader = rw_arb::reader(opts.in, opts.v_fields);
471        arbreader.set_progress(p);
472        source = new source_node(g, arbreader, false);
473    }
474        break;
475    case SEQUENCE_DB_FASTA: {
476        auto fastareader = rw_fasta::reader(opts.in, opts.v_fields);
477        fastareader.set_progress(p);
478        source = new source_node(g, fastareader, false);
479    }
480        break;
481    default:
482        throw logic_error("input type undefined");
483    }
484    nodes.emplace_back(source);
485    last_node = source;
486
487    // Make node limiting in-flight sequence trays
488    auto *limiter = new limiter_node(g, opts.max_trays);
489    tf::make_edge(*last_node, *limiter);
490    nodes.emplace_back(limiter);
491    last_node = limiter;
492
493    // determine number of pt servers for search and align
494    if (not opts.skip_align && not opts.noalign && opts.do_search) {
495        opts.num_pt_servers /= 2;
496    }
497    if (opts.num_pt_servers < 1) {
498        opts.num_pt_servers = 1;
499    }
500    if (famfinder::get_engine() == ENGINE_SINA_KMER) {
501        opts.num_pt_servers = tf::unlimited;
502    }
503
504    if (opts.skip_align || opts.noalign) {
505        // Just copy alignment over
506        node = new filter_node(g, tf::unlimited, [](tray t) -> tray {
507                t.aligned_sequence = new cseq(*t.input_sequence);
508                return t;
509            });
510        tf::make_edge(*last_node, *node);
511        nodes.emplace_back(node);
512        last_node = node;
513    } else {
514        node = new filter_node(g, opts.num_pt_servers, famfinder());
515        tf::make_edge(*last_node, *node);
516        nodes.emplace_back(node);
517        last_node = node;
518
519        node = new filter_node(g, tf::unlimited, aligner());
520        tf::make_edge(*last_node, *node);
521        nodes.emplace_back(node);
522        last_node = node;
523    }
524
525    if (opts.do_search) {
526        node = new filter_node(g, opts.num_pt_servers, search_filter());
527        tf::make_edge(*last_node, *node);
528        nodes.emplace_back(node);
529        last_node = node;
530    }
531
532    if (opts.inorder) {
533        using sequencer_node = tf::sequencer_node<tray>;
534        sequencer_node *node = new sequencer_node(
535            g, [](const tray& t) -> int {
536                return t.seqno - 1;
537            });
538        tf::make_edge(*last_node, *node);
539        nodes.emplace_back(node);
540        last_node = node;
541    }
542
543    // Make node writing sequences
544    for (auto& out : opts.out_merged) {
545        switch(out.first) {
546        case SEQUENCE_DB_ARB:
547            node = new filter_node(g, 1, rw_arb::writer(out.second,
548                                                        opts.copy_relatives,
549                                                        opts.v_fields));
550            break;
551        case SEQUENCE_DB_FASTA:
552            node = new filter_node(g, 1, rw_fasta::writer(out.second,
553                                                          opts.copy_relatives,
554                                                          opts.v_fields));
555            break;
556        case SEQUENCE_DB_CSV:
557            node = new filter_node(g, 1, rw_csv::writer(out.second,
558                                                        opts.copy_relatives,
559                                                        opts.v_fields));
560            break;
561        default:
562            throw logic_error("output type undefined");
563        }
564        tf::make_edge(*last_node, *node);
565        nodes.emplace_back(node);
566        last_node = node;
567    }
568
569    node = new filter_node(g, 1, Log::printer());
570    tf::make_edge(*last_node, *node);
571    nodes.emplace_back(node);
572    last_node = node;
573
574    int count = 0;
575    // needs very new tbb: tf::function_node<tray, tf::continue_msg, tf::lightweight>
576    tf::function_node<tray, tf::continue_msg>
577        sink(g, 1, [&](tray t) -> tf::continue_msg {
578                count++;
579                ++p;
580                t.destroy();
581                return tf::continue_msg();
582            });
583    tf::make_edge(sink, limiter->decrement);
584    tf::make_edge(*last_node, sink);
585
586    logger->warn("Aligner ready. Processing sequences");
587    timestamp before;
588    source->activate();
589    g.wait_for_all();
590    timestamp after;
591    logger->warn("Took {} to align {} sequences ({} sequences/s)",
592                 after-before, count, count/(after-before));
593    nodes.clear();
594    logger->warn("SINA finished.");
595    return 0;
596}
597
598int main(int argc, char** argv) {
599    try {
600        return real_main(argc, argv);
601    } catch (query_arb_exception &e) {
602        logger->error(e.what());
603        logger->error("The ARB database you were trying to use is likely corrupted.");
604        return EXIT_FAILURE;
605    } catch (std::exception &e) {
606        logger->error("Error during program execution: {} {}",
607                         demangle(typeid(e).name()),
608                         e.what());
609        return EXIT_FAILURE;
610    }
611}
612
613
614/*
615  Local Variables:
616  mode:c++
617  c-file-style:"stroustrup"
618  c-file-offsets:((innamespace . 0)(inline-open . 0)(case-label . 0))
619  indent-tabs-mode:nil
620  fill-column:99
621  End:
622*/
623// vim: filetype=cpp:expandtab:shiftwidth=4:tabstop=8:softtabstop=4:encoding=utf-8:textwidth=99 :
Note: See TracBrowser for help on using the repository browser.