1 | /* |
---|
2 | Copyright (c) 2006-2018 Elmar Pruesse <elmar.pruesse@ucdenver.edu> |
---|
3 | |
---|
4 | This file is part of SINA. |
---|
5 | SINA is free software: you can redistribute it and/or modify it under |
---|
6 | the terms of the GNU General Public License as published by the Free |
---|
7 | Software Foundation, either version 3 of the License, or (at your |
---|
8 | option) any later version. |
---|
9 | |
---|
10 | SINA is distributed in the hope that it will be useful, but WITHOUT ANY |
---|
11 | WARRANTY; without even the implied warranty of MERCHANTABILITY or |
---|
12 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
---|
13 | for more details. |
---|
14 | |
---|
15 | You should have received a copy of the GNU General Public License |
---|
16 | along with SINA. If not, see <http://www.gnu.org/licenses/>. |
---|
17 | |
---|
18 | Additional permission under GNU GPL version 3 section 7 |
---|
19 | |
---|
20 | If you modify SINA, or any covered work, by linking or combining it |
---|
21 | with components of ARB (or a modified version of that software), |
---|
22 | containing parts covered by the terms of the |
---|
23 | ARB-public-library-license, the licensors of SINA grant you additional |
---|
24 | permission to convey the resulting work. Corresponding Source for a |
---|
25 | non-source form of such a combination shall include the source code |
---|
26 | for the parts of ARB used as well as that of the covered work. |
---|
27 | */ |
---|
28 | |
---|
29 | #include "config.h" |
---|
30 | #include <iostream> |
---|
31 | |
---|
32 | #include <fstream> |
---|
33 | using std::ifstream; |
---|
34 | |
---|
35 | #include <string> |
---|
36 | using std::string; |
---|
37 | |
---|
38 | #include <vector> |
---|
39 | using std::vector; |
---|
40 | |
---|
41 | #include <tuple> |
---|
42 | using std::tuple; |
---|
43 | using std::get; |
---|
44 | |
---|
45 | #include <boost/program_options.hpp> |
---|
46 | namespace po = boost::program_options; |
---|
47 | |
---|
48 | #include <boost/filesystem.hpp> |
---|
49 | namespace fs = boost::filesystem; |
---|
50 | |
---|
51 | #include <boost/algorithm/string.hpp> |
---|
52 | #include <boost/algorithm/string/predicate.hpp> |
---|
53 | using boost::algorithm::iequals; |
---|
54 | |
---|
55 | #include <boost/core/demangle.hpp> |
---|
56 | using boost::core::demangle; |
---|
57 | |
---|
58 | using std::exception; |
---|
59 | using std::logic_error; |
---|
60 | |
---|
61 | #define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1 |
---|
62 | #include <tbb/task_scheduler_init.h> |
---|
63 | #include <tbb/parallel_for.h> |
---|
64 | #include <tbb/flow_graph.h> |
---|
65 | namespace tf = tbb::flow; |
---|
66 | |
---|
67 | #include "famfinder.h" |
---|
68 | #include "align.h" |
---|
69 | #include "rw_arb.h" |
---|
70 | #include "rw_fasta.h" |
---|
71 | #include "rw_csv.h" |
---|
72 | #include "log.h" |
---|
73 | #include "search_filter.h" |
---|
74 | #include "timer.h" |
---|
75 | #include "cseq_comparator.h" |
---|
76 | #include "progress.h" |
---|
77 | #include "search.h" |
---|
78 | |
---|
79 | using namespace sina; |
---|
80 | |
---|
81 | static auto logger = Log::create_logger("SINA"); |
---|
82 | |
---|
83 | // define new type of configuration selection of input/output type |
---|
84 | enum SEQUENCE_DB_TYPE { |
---|
85 | SEQUENCE_DB_NONE, |
---|
86 | SEQUENCE_DB_AUTO, |
---|
87 | SEQUENCE_DB_ARB, |
---|
88 | SEQUENCE_DB_FASTA, |
---|
89 | SEQUENCE_DB_CSV, |
---|
90 | }; |
---|
91 | |
---|
92 | // make above type printable |
---|
93 | std::ostream& operator<<(std::ostream& out, const SEQUENCE_DB_TYPE& db) { |
---|
94 | switch(db) { |
---|
95 | case SEQUENCE_DB_NONE: out << "NONE"; break; |
---|
96 | case SEQUENCE_DB_AUTO: out << "AUTO"; break; |
---|
97 | case SEQUENCE_DB_ARB: out << "ARB"; break; |
---|
98 | case SEQUENCE_DB_FASTA: out << "FASTA"; break; |
---|
99 | case SEQUENCE_DB_CSV: out << "CSV"; break; |
---|
100 | default: out << "Undef!"; |
---|
101 | } |
---|
102 | return out; |
---|
103 | } |
---|
104 | |
---|
105 | template<typename T> |
---|
106 | std::ostream& operator<<(std::ostream& out, const std::vector<T>& vs) { |
---|
107 | bool first = true; |
---|
108 | for (auto& v : vs) { |
---|
109 | if (first) { |
---|
110 | first = false; |
---|
111 | } else { |
---|
112 | out << " "; |
---|
113 | } |
---|
114 | out << v; |
---|
115 | } |
---|
116 | return out; |
---|
117 | } |
---|
118 | |
---|
119 | // make above type parseable by boost::program_options |
---|
120 | void validate(boost::any& v, |
---|
121 | const vector<string>& values, |
---|
122 | SEQUENCE_DB_TYPE* /*db*/, int /*unused*/) { |
---|
123 | //po::validators::check_first_occurrence(v); |
---|
124 | const std::string& s = po::validators::get_single_string(values); |
---|
125 | if (iequals(s, "NONE")) { |
---|
126 | v = SEQUENCE_DB_NONE; |
---|
127 | } else if (iequals(s, "AUTO")) { |
---|
128 | v = SEQUENCE_DB_AUTO; |
---|
129 | } else if (iequals(s, "ARB")) { |
---|
130 | v = SEQUENCE_DB_ARB; |
---|
131 | } else if (iequals (s, "FASTA")) { |
---|
132 | v = SEQUENCE_DB_FASTA; |
---|
133 | } else if (iequals (s, "CSV")) { |
---|
134 | v = SEQUENCE_DB_CSV; |
---|
135 | } else { |
---|
136 | throw po::invalid_option_value(s); |
---|
137 | } |
---|
138 | } |
---|
139 | |
---|
140 | |
---|
141 | // make known any<> types printable |
---|
142 | template <class T, |
---|
143 | typename = typename std::enable_if<std::is_same<T, boost::any>::value>::type > |
---|
144 | std::ostream& operator<<(std::ostream& out, |
---|
145 | const T& a) { |
---|
146 | using boost::any_cast; |
---|
147 | if (any_cast<bool>(&a) != nullptr) { |
---|
148 | out << any_cast<bool>(a); |
---|
149 | } else if (any_cast<int>(&a) != nullptr) { |
---|
150 | out << any_cast<int>(a); |
---|
151 | } else if (any_cast<unsigned int>(&a) != nullptr) { |
---|
152 | out << any_cast<unsigned int>(a); |
---|
153 | } else if (any_cast<long>(&a) != nullptr) { |
---|
154 | out << any_cast<long>(a); |
---|
155 | } else if (any_cast<float>(&a) != nullptr) { |
---|
156 | out << any_cast<float>(a); |
---|
157 | } else if (any_cast<string>(&a) != nullptr) { |
---|
158 | out << any_cast<string>(a); |
---|
159 | } else if (any_cast<TURN_TYPE>(&a) != nullptr) { |
---|
160 | out << any_cast<TURN_TYPE>(a); |
---|
161 | } else if (any_cast<OVERHANG_TYPE>(&a) != nullptr) { |
---|
162 | out << any_cast<OVERHANG_TYPE>(a); |
---|
163 | } else if (any_cast<INSERTION_TYPE>(&a) != nullptr) { |
---|
164 | out << any_cast<INSERTION_TYPE>(a); |
---|
165 | } else if (any_cast<LOWERCASE_TYPE>(&a) != nullptr) { |
---|
166 | out << any_cast<LOWERCASE_TYPE>(a); |
---|
167 | } else if (any_cast<FASTA_META_TYPE>(&a) != nullptr) { |
---|
168 | out << any_cast<FASTA_META_TYPE>(a); |
---|
169 | } else if (any_cast<SEQUENCE_DB_TYPE>(&a) != nullptr) { |
---|
170 | out << any_cast<SEQUENCE_DB_TYPE>(a); |
---|
171 | } else if (any_cast<std::vector<SEQUENCE_DB_TYPE>>(&a) != nullptr) { |
---|
172 | out << any_cast<std::vector<SEQUENCE_DB_TYPE>>(a); |
---|
173 | } else if (any_cast<CMP_IUPAC_TYPE>(&a) != nullptr) { |
---|
174 | out << any_cast<CMP_IUPAC_TYPE>(a); |
---|
175 | } else if (any_cast<CMP_DIST_TYPE>(&a) != nullptr) { |
---|
176 | out << any_cast<CMP_DIST_TYPE>(a); |
---|
177 | } else if (any_cast<CMP_COVER_TYPE>(&a) != nullptr) { |
---|
178 | out << any_cast<CMP_COVER_TYPE>(a); |
---|
179 | } else if (any_cast<ENGINE_TYPE>(&a) != nullptr) { |
---|
180 | out << any_cast<ENGINE_TYPE>(a); |
---|
181 | } else if (any_cast<fs::path>(&a) != nullptr) { |
---|
182 | out << any_cast<fs::path>(a); |
---|
183 | } else if (any_cast<std::vector<fs::path>>(&a) != nullptr) { |
---|
184 | out << any_cast<std::vector<fs::path>>(a); |
---|
185 | } else { |
---|
186 | out << "UNKNOWN TYPE: '" << a.type().name()<<"'"; |
---|
187 | } |
---|
188 | return out; |
---|
189 | } |
---|
190 | |
---|
191 | void show_conf(po::variables_map& vm) { |
---|
192 | std::cerr << "Effective parameters:" << std::endl; |
---|
193 | for (auto& pv: vm) { |
---|
194 | std::cerr << pv.first << " = "; |
---|
195 | try { |
---|
196 | std::cerr << pv.second.value() << std::endl; |
---|
197 | } catch (boost::bad_any_cast &e) { |
---|
198 | std::cerr << "UNKNOWN TYPE" << std::endl; |
---|
199 | } |
---|
200 | } |
---|
201 | std::cerr << std::endl; |
---|
202 | } |
---|
203 | |
---|
204 | struct options { |
---|
205 | SEQUENCE_DB_TYPE intype; |
---|
206 | fs::path in; |
---|
207 | std::vector<SEQUENCE_DB_TYPE> outtype; |
---|
208 | std::vector<fs::path> out; |
---|
209 | std::vector<std::pair<SEQUENCE_DB_TYPE, fs::path>> out_merged; |
---|
210 | unsigned int copy_relatives; |
---|
211 | bool noalign; |
---|
212 | bool skip_align; |
---|
213 | bool do_search; |
---|
214 | bool inorder; |
---|
215 | unsigned int threads; |
---|
216 | unsigned int num_pt_servers; |
---|
217 | unsigned int max_trays; |
---|
218 | string has_cli_vers; |
---|
219 | string fields; |
---|
220 | vector<string> v_fields; |
---|
221 | }; |
---|
222 | |
---|
223 | static options opts; |
---|
224 | |
---|
225 | // define hidden options |
---|
226 | void get_options_description(po::options_description& main, |
---|
227 | po::options_description& adv) { |
---|
228 | int tbb_threads = tbb::task_scheduler_init::default_num_threads(); |
---|
229 | unsigned int tbb_automatic = tbb::task_scheduler_init::automatic; |
---|
230 | |
---|
231 | adv.add_options() |
---|
232 | ("show-conf", "show effective configuration") |
---|
233 | ("intype", |
---|
234 | po::value<SEQUENCE_DB_TYPE>(&opts.intype)->default_value(SEQUENCE_DB_AUTO), |
---|
235 | "override input file type [*auto*|none|arb|fasta|csv]") |
---|
236 | ("outtype", |
---|
237 | po::value<std::vector<SEQUENCE_DB_TYPE>>(&opts.outtype), |
---|
238 | "override output file type for next output file [*auto*|none|arb|fasta|csv]") |
---|
239 | ("preserve-order", po::bool_switch(&opts.inorder), |
---|
240 | "maintain order of sequences") |
---|
241 | ("max-in-flight", po::value<unsigned int>(&opts.max_trays) |
---|
242 | ->default_value(tbb_threads*2), |
---|
243 | "max number of sequences processed at a time") |
---|
244 | ("has-cli-vers", po::value<string>(&opts.has_cli_vers), "verify support of cli version") |
---|
245 | ("no-align", po::bool_switch(&opts.noalign), |
---|
246 | "disable alignment stage (same as prealigned)") |
---|
247 | ("fields,f", po::value<string>(&opts.fields), |
---|
248 | "select fields to write") |
---|
249 | ; |
---|
250 | |
---|
251 | main.add_options() |
---|
252 | ("help,h", "show short help") |
---|
253 | ("help-all,H", "show full help (long)") |
---|
254 | ("in,i", po::value<fs::path>(&opts.in)->default_value("-"), |
---|
255 | "input file (arb or fasta)") |
---|
256 | ("out,o", po::value<std::vector<fs::path>>(&opts.out)->multitoken(), |
---|
257 | "output file (arb, fasta or csv; may be specified multiple times)") |
---|
258 | ("add-relatives", po::value<unsigned int>(&opts.copy_relatives)->default_value(0, ""), |
---|
259 | "add the ARG nearest relatives for each sequence to output") |
---|
260 | ("search,S", po::bool_switch(&opts.do_search), "enable search stage") |
---|
261 | ("prealigned,P", po::bool_switch(&opts.skip_align), "skip alignment stage") |
---|
262 | ("threads,p", po::value<unsigned int>(&opts.threads)->default_value(tbb_automatic, ""), |
---|
263 | "limit number of threads (automatic)") |
---|
264 | ("num-pts", po::value<unsigned int>(&opts.num_pt_servers)->default_value(tbb_threads), |
---|
265 | "number of PT servers to start") |
---|
266 | ("version,V", "show version") |
---|
267 | ; |
---|
268 | } |
---|
269 | |
---|
270 | void validate_vm(po::variables_map& vm, const po::options_description& /*all_od*/, |
---|
271 | po::parsed_options& parsed_options) { |
---|
272 | if (vm.count("has-cli-vers") != 0u) { |
---|
273 | std::cerr << "** SINA (SILVA Incremental Aligner) " << PACKAGE_VERSION |
---|
274 | << " present" << std::endl; |
---|
275 | const char* supported_versions[]{"1", "2", "ARB5.99", "ARB7.1"}; |
---|
276 | for (auto& supported : supported_versions) { |
---|
277 | if (opts.has_cli_vers == supported) { |
---|
278 | exit(EXIT_SUCCESS); |
---|
279 | } |
---|
280 | } |
---|
281 | |
---|
282 | std::cerr << "** Error: requested CLI version '" << opts.has_cli_vers << "' not supported!" << std::endl; |
---|
283 | exit(EXIT_FAILURE); |
---|
284 | } |
---|
285 | |
---|
286 | if (vm.count("version") != 0u) { |
---|
287 | std::cerr << PACKAGE_STRING |
---|
288 | #ifdef PACKAGE_BUILDINFO |
---|
289 | << " (" << PACKAGE_BUILDINFO << ")" |
---|
290 | #endif |
---|
291 | << std::endl; |
---|
292 | exit(EXIT_SUCCESS); |
---|
293 | } |
---|
294 | |
---|
295 | // Autodetect / validate intype selection |
---|
296 | if (opts.intype == SEQUENCE_DB_AUTO) { |
---|
297 | if (opts.in.extension() == ".arb" || opts.in.native() == ":") { |
---|
298 | opts.intype = SEQUENCE_DB_ARB; |
---|
299 | } else if ((opts.in.extension() == ".csv") || |
---|
300 | (opts.in.extension() == ".gz" && |
---|
301 | opts.in.stem().extension() == ".csv")) { |
---|
302 | opts.intype = SEQUENCE_DB_CSV; |
---|
303 | } else { |
---|
304 | opts.intype = SEQUENCE_DB_FASTA; |
---|
305 | } |
---|
306 | } |
---|
307 | |
---|
308 | if (opts.intype == SEQUENCE_DB_NONE) { |
---|
309 | throw logic_error("Input type NONE invalid - need something to process"); |
---|
310 | } |
---|
311 | if (opts.intype == SEQUENCE_DB_CSV) { |
---|
312 | throw logic_error("Input type CSV invalid - can't parse sequences from that"); |
---|
313 | } |
---|
314 | |
---|
315 | SEQUENCE_DB_TYPE type_val = SEQUENCE_DB_AUTO; |
---|
316 | int type_idx = 0, out_idx = 0; |
---|
317 | for (auto &opt : parsed_options.options) { |
---|
318 | if (opt.string_key == "outtype") { |
---|
319 | type_val = opts.outtype[type_idx++]; |
---|
320 | } else if (opt.string_key == "out") { |
---|
321 | for (size_t i = 0; i < opt.value.size(); i++) { |
---|
322 | fs::path out = opts.out[out_idx++]; |
---|
323 | SEQUENCE_DB_TYPE outtype = type_val; |
---|
324 | if (outtype == SEQUENCE_DB_AUTO) { |
---|
325 | if (out.extension() == ".arb" || out.native() == ":") { |
---|
326 | outtype = SEQUENCE_DB_ARB; |
---|
327 | } else if (out == "/dev/null") { |
---|
328 | continue; |
---|
329 | } else if ( |
---|
330 | (out.extension() == ".csv") || |
---|
331 | (out.extension() == ".gz" && out.stem().extension() == ".csv") |
---|
332 | ) { |
---|
333 | outtype = SEQUENCE_DB_CSV; |
---|
334 | } else { |
---|
335 | outtype = SEQUENCE_DB_FASTA; |
---|
336 | } |
---|
337 | } |
---|
338 | opts.out_merged.emplace_back(outtype, out); |
---|
339 | } |
---|
340 | type_val = SEQUENCE_DB_AUTO; |
---|
341 | } |
---|
342 | } |
---|
343 | |
---|
344 | if (out_idx == 0) { // no --out specified |
---|
345 | if (opts.intype == SEQUENCE_DB_ARB) { |
---|
346 | opts.out_merged.push_back(std::make_pair(SEQUENCE_DB_ARB, opts.in)); |
---|
347 | logger->warn("No explicit output file provided. " |
---|
348 | "Reading and writing to same ARB database."); |
---|
349 | } else if (type_val != SEQUENCE_DB_NONE) { |
---|
350 | opts.out_merged.push_back(std::make_pair(SEQUENCE_DB_FASTA, "-")); |
---|
351 | } |
---|
352 | } |
---|
353 | |
---|
354 | // Split copy_fields |
---|
355 | boost::split(opts.v_fields, opts.fields, boost::is_any_of(":,")); |
---|
356 | if (opts.v_fields.back().empty()) { |
---|
357 | opts.v_fields.pop_back(); |
---|
358 | } |
---|
359 | // Add full_name if no fields are specified |
---|
360 | if (opts.v_fields.empty()) { |
---|
361 | opts.v_fields.push_back(query_arb::fn_fullname); |
---|
362 | } |
---|
363 | } |
---|
364 | |
---|
365 | void show_help(po::options_description* od, |
---|
366 | po::options_description* adv = nullptr) { |
---|
367 | std::cerr << "Usage:" << std::endl |
---|
368 | << " sina -i input [-o output] [--prealigned|--db reference] [--search] " |
---|
369 | << "[--search-db search.arb] [options]" |
---|
370 | << std::endl << std::endl |
---|
371 | << *od << std::endl; |
---|
372 | if (adv != nullptr) { |
---|
373 | std::cerr << *adv << std::endl; |
---|
374 | } |
---|
375 | exit(EXIT_SUCCESS); |
---|
376 | } |
---|
377 | |
---|
378 | // do the messy option parsing/validating |
---|
379 | po::variables_map |
---|
380 | parse_options(int argc, char** argv) { |
---|
381 | po::variables_map vm; |
---|
382 | |
---|
383 | string infile, outfile; |
---|
384 | po::options_description od("Options"), adv_od("Advanced Options"); |
---|
385 | |
---|
386 | get_options_description(od, adv_od); |
---|
387 | Log::get_options_description(od, adv_od); |
---|
388 | rw_arb::get_options_description(od, adv_od); |
---|
389 | rw_fasta::get_options_description(od, adv_od); |
---|
390 | rw_csv::get_options_description(od, adv_od); |
---|
391 | aligner::get_options_description(od, adv_od); |
---|
392 | famfinder::get_options_description(od, adv_od); |
---|
393 | search_filter::get_options_description(od, adv_od); |
---|
394 | query_pt::get_options_description(od, adv_od); |
---|
395 | |
---|
396 | po::options_description all_od(od); |
---|
397 | all_od.add(adv_od); |
---|
398 | |
---|
399 | po::positional_options_description no_positional; |
---|
400 | try { |
---|
401 | po::command_line_parser parser(argc, argv); |
---|
402 | parser.options(all_od); |
---|
403 | parser.positional(no_positional); |
---|
404 | po::parsed_options parsed_options = parser.run(); |
---|
405 | po::store(parsed_options, vm); |
---|
406 | |
---|
407 | if (vm.count("help") != 0u) { |
---|
408 | show_help(&od); |
---|
409 | } |
---|
410 | if (vm.count("help-all") != 0u) { |
---|
411 | show_help(&od, &adv_od); |
---|
412 | } |
---|
413 | |
---|
414 | po::notify(vm); |
---|
415 | |
---|
416 | validate_vm(vm, all_od, parsed_options); |
---|
417 | Log::validate_vm(vm, all_od); |
---|
418 | rw_arb::validate_vm(vm, all_od); |
---|
419 | rw_fasta::validate_vm(vm, all_od); |
---|
420 | rw_csv::validate_vm(vm, all_od); |
---|
421 | if (!opts.skip_align && !opts.noalign) { |
---|
422 | famfinder::validate_vm(vm, all_od); |
---|
423 | aligner::validate_vm(vm, all_od); |
---|
424 | } |
---|
425 | if (opts.do_search) { |
---|
426 | search_filter::validate_vm(vm, all_od); |
---|
427 | } |
---|
428 | query_pt::validate_vm(vm, all_od); |
---|
429 | } catch (std::logic_error &e) { |
---|
430 | std::cerr << "Configuration error:" << std::endl |
---|
431 | << e.what() << std::endl |
---|
432 | << "Use \"--help\" to show options" << std::endl |
---|
433 | << std::endl; |
---|
434 | if (vm.count("show-conf") != 0u) { |
---|
435 | show_conf(vm); |
---|
436 | } |
---|
437 | exit(EXIT_FAILURE); |
---|
438 | } |
---|
439 | return vm; |
---|
440 | } |
---|
441 | |
---|
442 | |
---|
443 | int real_main(int argc, char** argv) { |
---|
444 | po::variables_map vm = parse_options(argc, argv); |
---|
445 | logger->warn("This is {}.", PACKAGE_STRING); |
---|
446 | if (vm.count("show-conf") != 0u) { |
---|
447 | show_conf(vm); |
---|
448 | } |
---|
449 | |
---|
450 | tbb::task_scheduler_init init(vm["threads"].as<unsigned int>()); |
---|
451 | |
---|
452 | tf::graph g; // Main data flow graph (pipeline) |
---|
453 | logger_progress p(logger, "Processing"); |
---|
454 | |
---|
455 | vector<std::unique_ptr<tf::graph_node>> nodes; // Nodes (for cleanup) |
---|
456 | tf::sender<tray> *last_node; // Last tray producing node |
---|
457 | |
---|
458 | using source_node = tf::source_node<tray>; |
---|
459 | using filter_node = tf::function_node<tray, tray>; |
---|
460 | using limiter_node = tf::limiter_node<tray>; |
---|
461 | filter_node *node; |
---|
462 | |
---|
463 | // initialize default alignment |
---|
464 | query_arb::override_default_alignment(rw_arb::get_cli_alignment()); |
---|
465 | |
---|
466 | // Make source node reading sequences |
---|
467 | source_node *source; // will be activated once graph complete |
---|
468 | switch (opts.intype) { |
---|
469 | case SEQUENCE_DB_ARB: { |
---|
470 | auto arbreader = rw_arb::reader(opts.in, opts.v_fields); |
---|
471 | arbreader.set_progress(p); |
---|
472 | source = new source_node(g, arbreader, false); |
---|
473 | } |
---|
474 | break; |
---|
475 | case SEQUENCE_DB_FASTA: { |
---|
476 | auto fastareader = rw_fasta::reader(opts.in, opts.v_fields); |
---|
477 | fastareader.set_progress(p); |
---|
478 | source = new source_node(g, fastareader, false); |
---|
479 | } |
---|
480 | break; |
---|
481 | default: |
---|
482 | throw logic_error("input type undefined"); |
---|
483 | } |
---|
484 | nodes.emplace_back(source); |
---|
485 | last_node = source; |
---|
486 | |
---|
487 | // Make node limiting in-flight sequence trays |
---|
488 | auto *limiter = new limiter_node(g, opts.max_trays); |
---|
489 | tf::make_edge(*last_node, *limiter); |
---|
490 | nodes.emplace_back(limiter); |
---|
491 | last_node = limiter; |
---|
492 | |
---|
493 | // determine number of pt servers for search and align |
---|
494 | if (not opts.skip_align && not opts.noalign && opts.do_search) { |
---|
495 | opts.num_pt_servers /= 2; |
---|
496 | } |
---|
497 | if (opts.num_pt_servers < 1) { |
---|
498 | opts.num_pt_servers = 1; |
---|
499 | } |
---|
500 | if (famfinder::get_engine() == ENGINE_SINA_KMER) { |
---|
501 | opts.num_pt_servers = tf::unlimited; |
---|
502 | } |
---|
503 | |
---|
504 | if (opts.skip_align || opts.noalign) { |
---|
505 | // Just copy alignment over |
---|
506 | node = new filter_node(g, tf::unlimited, [](tray t) -> tray { |
---|
507 | t.aligned_sequence = new cseq(*t.input_sequence); |
---|
508 | return t; |
---|
509 | }); |
---|
510 | tf::make_edge(*last_node, *node); |
---|
511 | nodes.emplace_back(node); |
---|
512 | last_node = node; |
---|
513 | } else { |
---|
514 | node = new filter_node(g, opts.num_pt_servers, famfinder()); |
---|
515 | tf::make_edge(*last_node, *node); |
---|
516 | nodes.emplace_back(node); |
---|
517 | last_node = node; |
---|
518 | |
---|
519 | node = new filter_node(g, tf::unlimited, aligner()); |
---|
520 | tf::make_edge(*last_node, *node); |
---|
521 | nodes.emplace_back(node); |
---|
522 | last_node = node; |
---|
523 | } |
---|
524 | |
---|
525 | if (opts.do_search) { |
---|
526 | node = new filter_node(g, opts.num_pt_servers, search_filter()); |
---|
527 | tf::make_edge(*last_node, *node); |
---|
528 | nodes.emplace_back(node); |
---|
529 | last_node = node; |
---|
530 | } |
---|
531 | |
---|
532 | if (opts.inorder) { |
---|
533 | using sequencer_node = tf::sequencer_node<tray>; |
---|
534 | sequencer_node *node = new sequencer_node( |
---|
535 | g, [](const tray& t) -> int { |
---|
536 | return t.seqno - 1; |
---|
537 | }); |
---|
538 | tf::make_edge(*last_node, *node); |
---|
539 | nodes.emplace_back(node); |
---|
540 | last_node = node; |
---|
541 | } |
---|
542 | |
---|
543 | // Make node writing sequences |
---|
544 | for (auto& out : opts.out_merged) { |
---|
545 | switch(out.first) { |
---|
546 | case SEQUENCE_DB_ARB: |
---|
547 | node = new filter_node(g, 1, rw_arb::writer(out.second, |
---|
548 | opts.copy_relatives, |
---|
549 | opts.v_fields)); |
---|
550 | break; |
---|
551 | case SEQUENCE_DB_FASTA: |
---|
552 | node = new filter_node(g, 1, rw_fasta::writer(out.second, |
---|
553 | opts.copy_relatives, |
---|
554 | opts.v_fields)); |
---|
555 | break; |
---|
556 | case SEQUENCE_DB_CSV: |
---|
557 | node = new filter_node(g, 1, rw_csv::writer(out.second, |
---|
558 | opts.copy_relatives, |
---|
559 | opts.v_fields)); |
---|
560 | break; |
---|
561 | default: |
---|
562 | throw logic_error("output type undefined"); |
---|
563 | } |
---|
564 | tf::make_edge(*last_node, *node); |
---|
565 | nodes.emplace_back(node); |
---|
566 | last_node = node; |
---|
567 | } |
---|
568 | |
---|
569 | node = new filter_node(g, 1, Log::printer()); |
---|
570 | tf::make_edge(*last_node, *node); |
---|
571 | nodes.emplace_back(node); |
---|
572 | last_node = node; |
---|
573 | |
---|
574 | int count = 0; |
---|
575 | // needs very new tbb: tf::function_node<tray, tf::continue_msg, tf::lightweight> |
---|
576 | tf::function_node<tray, tf::continue_msg> |
---|
577 | sink(g, 1, [&](tray t) -> tf::continue_msg { |
---|
578 | count++; |
---|
579 | ++p; |
---|
580 | t.destroy(); |
---|
581 | return tf::continue_msg(); |
---|
582 | }); |
---|
583 | tf::make_edge(sink, limiter->decrement); |
---|
584 | tf::make_edge(*last_node, sink); |
---|
585 | |
---|
586 | logger->warn("Aligner ready. Processing sequences"); |
---|
587 | timestamp before; |
---|
588 | source->activate(); |
---|
589 | g.wait_for_all(); |
---|
590 | timestamp after; |
---|
591 | logger->warn("Took {} to align {} sequences ({} sequences/s)", |
---|
592 | after-before, count, count/(after-before)); |
---|
593 | nodes.clear(); |
---|
594 | logger->warn("SINA finished."); |
---|
595 | return 0; |
---|
596 | } |
---|
597 | |
---|
598 | int main(int argc, char** argv) { |
---|
599 | try { |
---|
600 | return real_main(argc, argv); |
---|
601 | } catch (query_arb_exception &e) { |
---|
602 | logger->error(e.what()); |
---|
603 | logger->error("The ARB database you were trying to use is likely corrupted."); |
---|
604 | return EXIT_FAILURE; |
---|
605 | } catch (std::exception &e) { |
---|
606 | logger->error("Error during program execution: {} {}", |
---|
607 | demangle(typeid(e).name()), |
---|
608 | e.what()); |
---|
609 | return EXIT_FAILURE; |
---|
610 | } |
---|
611 | } |
---|
612 | |
---|
613 | |
---|
614 | /* |
---|
615 | Local Variables: |
---|
616 | mode:c++ |
---|
617 | c-file-style:"stroustrup" |
---|
618 | c-file-offsets:((innamespace . 0)(inline-open . 0)(case-label . 0)) |
---|
619 | indent-tabs-mode:nil |
---|
620 | fill-column:99 |
---|
621 | End: |
---|
622 | */ |
---|
623 | // vim: filetype=cpp:expandtab:shiftwidth=4:tabstop=8:softtabstop=4:encoding=utf-8:textwidth=99 : |
---|