| 1 | /* |
|---|
| 2 | Copyright (c) 2006-2018 Elmar Pruesse <elmar.pruesse@ucdenver.edu> |
|---|
| 3 | |
|---|
| 4 | This file is part of SINA. |
|---|
| 5 | SINA is free software: you can redistribute it and/or modify it under |
|---|
| 6 | the terms of the GNU General Public License as published by the Free |
|---|
| 7 | Software Foundation, either version 3 of the License, or (at your |
|---|
| 8 | option) any later version. |
|---|
| 9 | |
|---|
| 10 | SINA is distributed in the hope that it will be useful, but WITHOUT ANY |
|---|
| 11 | WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|---|
| 12 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|---|
| 13 | for more details. |
|---|
| 14 | |
|---|
| 15 | You should have received a copy of the GNU General Public License |
|---|
| 16 | along with SINA. If not, see <http://www.gnu.org/licenses/>. |
|---|
| 17 | |
|---|
| 18 | Additional permission under GNU GPL version 3 section 7 |
|---|
| 19 | |
|---|
| 20 | If you modify SINA, or any covered work, by linking or combining it |
|---|
| 21 | with components of ARB (or a modified version of that software), |
|---|
| 22 | containing parts covered by the terms of the |
|---|
| 23 | ARB-public-library-license, the licensors of SINA grant you additional |
|---|
| 24 | permission to convey the resulting work. Corresponding Source for a |
|---|
| 25 | non-source form of such a combination shall include the source code |
|---|
| 26 | for the parts of ARB used as well as that of the covered work. |
|---|
| 27 | */ |
|---|
| 28 | |
|---|
| 29 | #include "config.h" |
|---|
| 30 | #include "query_pt.h" |
|---|
| 31 | #include "timer.h" |
|---|
| 32 | |
|---|
| 33 | #include <cstring> |
|---|
| 34 | |
|---|
| 35 | #include <iostream> |
|---|
| 36 | #include <sstream> |
|---|
| 37 | #include <cmath> |
|---|
| 38 | #include <mutex> |
|---|
| 39 | |
|---|
| 40 | #include <sys/types.h> |
|---|
| 41 | #include <sys/stat.h> |
|---|
| 42 | #include <unistd.h> |
|---|
| 43 | |
|---|
| 44 | using std::stringstream; |
|---|
| 45 | using std::string; |
|---|
| 46 | using std::vector; |
|---|
| 47 | |
|---|
| 48 | #include "cseq.h" |
|---|
| 49 | #include "cseq_comparator.h" |
|---|
| 50 | #include "log.h" |
|---|
| 51 | |
|---|
| 52 | #include "query_arb.h" |
|---|
| 53 | |
|---|
| 54 | #include <dlfcn.h> |
|---|
| 55 | #include <cstdlib> |
|---|
| 56 | |
|---|
| 57 | #include <arbdb.h> |
|---|
| 58 | #include <PT_com.h> |
|---|
| 59 | #include <client.h> |
|---|
| 60 | #include <boost/algorithm/string/predicate.hpp> |
|---|
| 61 | #include <boost/dll.hpp> |
|---|
| 62 | #include <pstream.h> |
|---|
| 63 | |
|---|
| 64 | #include <boost/program_options.hpp> |
|---|
| 65 | namespace po = boost::program_options; |
|---|
| 66 | |
|---|
| 67 | #include <boost/filesystem.hpp> |
|---|
| 68 | namespace fs = boost::filesystem; |
|---|
| 69 | |
|---|
| 70 | #include <utility> |
|---|
| 71 | |
|---|
| 72 | namespace sina { |
|---|
| 73 | |
|---|
| 74 | static auto logger = Log::create_logger("Search (ARB PT)"); |
|---|
| 75 | static auto pt_logger = Log::create_logger("ARB_PT_SERVER"); |
|---|
| 76 | |
|---|
| 77 | static void override_LD_LIBRARY_PATH() { |
|---|
| 78 | // Allow to override LD_LIBRARY_PATH used for 'arb_pt_server'. |
|---|
| 79 | // |
|---|
| 80 | // This allows arb to support sina installed from a fat tarball |
|---|
| 81 | // (see ../../arb-sina-fat.README for more details). |
|---|
| 82 | // |
|---|
| 83 | // SINA_SUBCMD_LD_LIBRARY_PATH is set by the shell-script |
|---|
| 84 | // used to call sina from arb: ../../../../SH/arb_sina.sh |
|---|
| 85 | |
|---|
| 86 | static bool checked = false; |
|---|
| 87 | if (!checked) { |
|---|
| 88 | const char *LD_LIBRARY_PATH_forSubcmds = getenv("SINA_SUBCMD_LD_LIBRARY_PATH"); |
|---|
| 89 | if (LD_LIBRARY_PATH_forSubcmds) { |
|---|
| 90 | logger->warn("overriding value of LD_LIBRARY_PATH with '{}'", LD_LIBRARY_PATH_forSubcmds); |
|---|
| 91 | logger->warn("(triggered by existance of environment variable SINA_SUBCMD_LD_LIBRARY_PATH)"); |
|---|
| 92 | int result = setenv("LD_LIBRARY_PATH", LD_LIBRARY_PATH_forSubcmds, 1); |
|---|
| 93 | if (result != 0) { |
|---|
| 94 | throw query_pt_exception("failed to setenv (LD_LIBRARY_PATH)"); |
|---|
| 95 | } |
|---|
| 96 | } |
|---|
| 97 | checked = true; |
|---|
| 98 | } |
|---|
| 99 | } |
|---|
| 100 | |
|---|
| 101 | class managed_pt_server { |
|---|
| 102 | redi::ipstream* process; |
|---|
| 103 | const fs::path dbname; |
|---|
| 104 | const string portname; |
|---|
| 105 | public: |
|---|
| 106 | managed_pt_server(fs::path dbname_, string portname_); |
|---|
| 107 | managed_pt_server(const managed_pt_server&); |
|---|
| 108 | ~managed_pt_server(); |
|---|
| 109 | fs::path ensure_env_ARBHOME(); |
|---|
| 110 | fs::path get_pt_server_path(); |
|---|
| 111 | fs::path ensure_index_exists(); |
|---|
| 112 | bool build_index(fs::path index_arb_file); |
|---|
| 113 | }; |
|---|
| 114 | |
|---|
| 115 | static std::mutex build_lock, launch_lock; |
|---|
| 116 | |
|---|
| 117 | managed_pt_server::managed_pt_server(fs::path dbname_, string portname_) |
|---|
| 118 | : dbname(std::move(dbname_)), portname(std::move(portname_)) |
|---|
| 119 | { |
|---|
| 120 | // Check that database specified and file accessible |
|---|
| 121 | if (dbname.empty() or not fs::exists(dbname)) { |
|---|
| 122 | throw query_pt_exception("Missing reference database"); |
|---|
| 123 | } |
|---|
| 124 | |
|---|
| 125 | ensure_env_ARBHOME(); |
|---|
| 126 | fs::path arb_pt_server = get_pt_server_path(); |
|---|
| 127 | fs::path index_arb_file = ensure_index_exists(); |
|---|
| 128 | |
|---|
| 129 | // Check portname: allowed are localhost:PORT, :PORT and :SOCKETFILE |
|---|
| 130 | int split = portname.find(':'); |
|---|
| 131 | string host = portname.substr(0, split); |
|---|
| 132 | string port = portname.substr(split+1); |
|---|
| 133 | if (!host.empty() && host != "localhost") { |
|---|
| 134 | throw query_pt_exception("Starting a PT server on hosts other than localhost not supported"); |
|---|
| 135 | } |
|---|
| 136 | |
|---|
| 137 | // Actually launch the server now: |
|---|
| 138 | vector<string> cmd{arb_pt_server.native(), "-D" + index_arb_file.native(), "-T" +portname}; |
|---|
| 139 | logger->info("Launching background PT server for {} on {}", dbname, portname); |
|---|
| 140 | logger->debug("Executing ['{}']", fmt::join(cmd, "', '")); |
|---|
| 141 | override_LD_LIBRARY_PATH(); |
|---|
| 142 | { |
|---|
| 143 | std::lock_guard<std::mutex> lock(launch_lock); |
|---|
| 144 | // something in here appears to not be thread safe |
|---|
| 145 | process = new redi::ipstream(cmd, redi::pstreams::pstdout|redi::pstreams::pstderr); // launches ptserver |
|---|
| 146 | } |
|---|
| 147 | |
|---|
| 148 | // read the pt server output. once it says "ok" |
|---|
| 149 | // we can be sure that it's ready for connections |
|---|
| 150 | // (connecting at wrong time causes lockup) |
|---|
| 151 | // FIXME: abort if waiting for too long |
|---|
| 152 | string line; |
|---|
| 153 | while (std::getline(*process, line)) { |
|---|
| 154 | pt_logger->debug(line); |
|---|
| 155 | if (line == "ok, server is running.") { |
|---|
| 156 | break; |
|---|
| 157 | } |
|---|
| 158 | } |
|---|
| 159 | if (line.empty()) { |
|---|
| 160 | process->rdbuf()->kill(); |
|---|
| 161 | throw query_pt_exception("PT server failed to respond. Do you have enough memory?"); |
|---|
| 162 | } |
|---|
| 163 | if (process->rdbuf()->exited()) { |
|---|
| 164 | throw query_pt_exception( |
|---|
| 165 | fmt::format("PT server exited immediately. Exit status was {}", |
|---|
| 166 | process->rdbuf()->status())); |
|---|
| 167 | } |
|---|
| 168 | |
|---|
| 169 | logger->warn("Launched PT server ({} on {}).", dbname, portname); |
|---|
| 170 | } |
|---|
| 171 | |
|---|
| 172 | managed_pt_server::~managed_pt_server() { |
|---|
| 173 | logger->warn("Terminating PT server ({} on {})", dbname, portname); |
|---|
| 174 | process->rdbuf()->kill(); |
|---|
| 175 | } |
|---|
| 176 | |
|---|
| 177 | fs::path |
|---|
| 178 | managed_pt_server::ensure_index_exists() { |
|---|
| 179 | std::lock_guard<std::mutex> lock(build_lock); |
|---|
| 180 | |
|---|
| 181 | fs::path index_arb_file = dbname; |
|---|
| 182 | index_arb_file += ".index.arb"; |
|---|
| 183 | fs::path index_pt_file = index_arb_file; |
|---|
| 184 | index_pt_file += ".pt"; |
|---|
| 185 | |
|---|
| 186 | |
|---|
| 187 | if (fs::exists(index_pt_file) && fs::exists(index_arb_file)) { |
|---|
| 188 | if (fs::last_write_time(index_arb_file) >= fs::last_write_time(dbname) |
|---|
| 189 | && fs::last_write_time(index_pt_file) >= fs::last_write_time(index_arb_file)) { |
|---|
| 190 | // some.arb older-than some.arb.index.arb older-than some.arb.index.arb.pt |
|---|
| 191 | // -> good index exists |
|---|
| 192 | return index_arb_file; |
|---|
| 193 | } else { |
|---|
| 194 | logger->info("PT server index out of date for {}. Rebuilding:", dbname); |
|---|
| 195 | } |
|---|
| 196 | } else { |
|---|
| 197 | logger->info("PT server index missing or incomplete for {}. Building:", dbname); |
|---|
| 198 | } |
|---|
| 199 | |
|---|
| 200 | build_index(index_arb_file); |
|---|
| 201 | |
|---|
| 202 | if (not fs::exists(index_pt_file) || |
|---|
| 203 | fs::last_write_time(index_pt_file) < fs::last_write_time(dbname)) { |
|---|
| 204 | throw query_pt_exception("Failed to (re)build PT server index! (out of memory?)"); |
|---|
| 205 | } |
|---|
| 206 | return index_arb_file; |
|---|
| 207 | } |
|---|
| 208 | |
|---|
| 209 | bool managed_pt_server::build_index(fs::path index_arb_file) { |
|---|
| 210 | fs::path arb_pt_server = get_pt_server_path(); |
|---|
| 211 | vector<string> cmds; |
|---|
| 212 | // copy database file |
|---|
| 213 | cmds.push_back(string("cp ") + dbname.native() + " " + index_arb_file.native()); |
|---|
| 214 | // shrink database (convert to index db) |
|---|
| 215 | cmds.push_back(arb_pt_server.native() + " -build_clean -D" + index_arb_file.native()); |
|---|
| 216 | // build index (build .arb.pt) |
|---|
| 217 | cmds.push_back(arb_pt_server.native() + " -build -D" + index_arb_file.native()); |
|---|
| 218 | for (auto& cmd : cmds) { |
|---|
| 219 | logger->debug("Executing '{}'", cmd); |
|---|
| 220 | override_LD_LIBRARY_PATH(); |
|---|
| 221 | int rval = system(cmd.c_str()); // launches ptserver |
|---|
| 222 | logger->debug("Command finished"); |
|---|
| 223 | if (rval) { |
|---|
| 224 | logger->error("Command {} failed with exit code {}", cmd, rval); |
|---|
| 225 | return false; |
|---|
| 226 | } |
|---|
| 227 | } |
|---|
| 228 | return true; |
|---|
| 229 | } |
|---|
| 230 | |
|---|
| 231 | |
|---|
| 232 | fs::path |
|---|
| 233 | managed_pt_server::ensure_env_ARBHOME() { |
|---|
| 234 | fs::path ARBHOME; |
|---|
| 235 | // Make sure ARBHOME is set; guess if possible |
|---|
| 236 | if (const char* arbhome = std::getenv("ARBHOME")) { |
|---|
| 237 | ARBHOME = arbhome; |
|---|
| 238 | logger->info("Using ARBHOME={}", ARBHOME); |
|---|
| 239 | } else { |
|---|
| 240 | ARBHOME = boost::dll::symbol_location(GB_open).parent_path().parent_path(); |
|---|
| 241 | logger->info("Setting ARBHOME={}", ARBHOME); |
|---|
| 242 | setenv("ARBHOME", ARBHOME.c_str(), 1); // no setenv in C++/STL |
|---|
| 243 | } |
|---|
| 244 | return ARBHOME; |
|---|
| 245 | } |
|---|
| 246 | |
|---|
| 247 | fs::path |
|---|
| 248 | managed_pt_server::get_pt_server_path() { |
|---|
| 249 | // Locate PT server binary |
|---|
| 250 | fs::path arb_pt_server("arb_pt_server"); |
|---|
| 251 | fs::path arb_pt_server_path = fs::system_complete(arb_pt_server); |
|---|
| 252 | if (!fs::exists(arb_pt_server_path)) { // not in PATH |
|---|
| 253 | logger->debug("{} not found in PATH", arb_pt_server); |
|---|
| 254 | arb_pt_server_path = ensure_env_ARBHOME() / "bin" / arb_pt_server; |
|---|
| 255 | if (!fs::exists(arb_pt_server_path)) { // not in ARBHOME |
|---|
| 256 | logger->debug("{} not found in ARBHOME", arb_pt_server); |
|---|
| 257 | // 3. Next to us |
|---|
| 258 | fs::path self_dir = boost::dll::program_location().parent_path(); |
|---|
| 259 | arb_pt_server_path = self_dir / arb_pt_server; |
|---|
| 260 | if (!fs::exists(arb_pt_server_path)) { // not next to us either? |
|---|
| 261 | logger->debug("{} not found in {}", arb_pt_server, self_dir); |
|---|
| 262 | throw query_pt_exception("Failed to locate 'arb_pt_server'"); |
|---|
| 263 | } |
|---|
| 264 | } |
|---|
| 265 | } |
|---|
| 266 | return arb_pt_server_path; |
|---|
| 267 | } |
|---|
| 268 | |
|---|
| 269 | struct query_pt::options { |
|---|
| 270 | }; |
|---|
| 271 | struct query_pt::options *query_pt::opts; |
|---|
| 272 | |
|---|
| 273 | void |
|---|
| 274 | query_pt::get_options_description(po::options_description& /*main*/, |
|---|
| 275 | po::options_description& /*adv*/) { |
|---|
| 276 | } |
|---|
| 277 | |
|---|
| 278 | void |
|---|
| 279 | query_pt::validate_vm(po::variables_map& /* vm */, |
|---|
| 280 | po::options_description& /*desc*/) { |
|---|
| 281 | } |
|---|
| 282 | |
|---|
| 283 | |
|---|
| 284 | struct query_pt::priv_data { |
|---|
| 285 | aisc_com *link{nullptr}; |
|---|
| 286 | T_PT_MAIN com; |
|---|
| 287 | T_PT_LOCS locs; |
|---|
| 288 | T_PT_FAMILYFINDER ffinder; |
|---|
| 289 | |
|---|
| 290 | std::mutex arb_pt_access; |
|---|
| 291 | |
|---|
| 292 | unsigned int range_begin{0}; |
|---|
| 293 | unsigned int range_end{INT_MAX}; |
|---|
| 294 | bool find_type_fast{false}; |
|---|
| 295 | int kmer_len; |
|---|
| 296 | int num_mismatch; |
|---|
| 297 | bool relative_sort; |
|---|
| 298 | |
|---|
| 299 | query_arb *arbdb; |
|---|
| 300 | |
|---|
| 301 | timer timeit; |
|---|
| 302 | |
|---|
| 303 | static std::map<string, std::weak_ptr<managed_pt_server>> servers; |
|---|
| 304 | std::shared_ptr<managed_pt_server> server; |
|---|
| 305 | |
|---|
| 306 | bool connect_server(const string& portname); |
|---|
| 307 | void disconnect_server(); |
|---|
| 308 | }; |
|---|
| 309 | |
|---|
| 310 | |
|---|
| 311 | bool |
|---|
| 312 | query_pt::priv_data::connect_server(const string& portname) { |
|---|
| 313 | std::lock_guard<std::mutex> lock(arb_pt_access); |
|---|
| 314 | GB_ERROR error = nullptr; |
|---|
| 315 | link = aisc_open(portname.c_str(), com, AISC_MAGIC_NUMBER, &error); |
|---|
| 316 | if (error != nullptr) { |
|---|
| 317 | throw query_pt_exception(error); |
|---|
| 318 | } |
|---|
| 319 | if (link == nullptr) { |
|---|
| 320 | return false; |
|---|
| 321 | } |
|---|
| 322 | |
|---|
| 323 | if (aisc_create(link, |
|---|
| 324 | PT_MAIN, com, |
|---|
| 325 | MAIN_LOCS, PT_LOCS, locs, |
|---|
| 326 | NULL) != 0) { |
|---|
| 327 | throw query_pt_exception("Unable to connect to PT server! (code 02)"); |
|---|
| 328 | } |
|---|
| 329 | |
|---|
| 330 | if (aisc_create(link, |
|---|
| 331 | PT_LOCS, locs, |
|---|
| 332 | LOCS_FFINDER, PT_FAMILYFINDER, ffinder, |
|---|
| 333 | NULL) != 0) { |
|---|
| 334 | throw query_pt_exception("Unable to connect to PT server! (code 03)"); |
|---|
| 335 | } |
|---|
| 336 | |
|---|
| 337 | return true; |
|---|
| 338 | } |
|---|
| 339 | |
|---|
| 340 | void |
|---|
| 341 | query_pt::priv_data::disconnect_server() { |
|---|
| 342 | std::lock_guard<std::mutex> lock(arb_pt_access); |
|---|
| 343 | aisc_close(link, com); |
|---|
| 344 | } |
|---|
| 345 | |
|---|
| 346 | |
|---|
| 347 | std::map<string, std::weak_ptr<managed_pt_server>> query_pt::priv_data::servers; |
|---|
| 348 | |
|---|
| 349 | |
|---|
| 350 | query_pt* |
|---|
| 351 | query_pt::get_pt_search(const fs::path& filename, int k, |
|---|
| 352 | bool fast, |
|---|
| 353 | bool norel, |
|---|
| 354 | int mk, |
|---|
| 355 | std::string portname) { |
|---|
| 356 | if (portname.empty()) { |
|---|
| 357 | portname = ":" + (fs::temp_directory_path() / fs::unique_path()).native(); |
|---|
| 358 | } |
|---|
| 359 | |
|---|
| 360 | return new query_pt(portname.c_str(), filename.native().c_str(), |
|---|
| 361 | fast, k, mk, norel); |
|---|
| 362 | } |
|---|
| 363 | |
|---|
| 364 | |
|---|
| 365 | query_pt::query_pt(const char* portname, const char* dbname, |
|---|
| 366 | bool fast, int k, int mk, bool norel) |
|---|
| 367 | : data(new priv_data()) |
|---|
| 368 | { |
|---|
| 369 | if (priv_data::servers.count(portname) != 0u) { |
|---|
| 370 | data->server = priv_data::servers[portname].lock(); |
|---|
| 371 | } |
|---|
| 372 | |
|---|
| 373 | if (!data->connect_server(portname)) { |
|---|
| 374 | data->server = std::make_shared<managed_pt_server>(dbname, portname); |
|---|
| 375 | if (!data->connect_server(portname)) { |
|---|
| 376 | delete data; |
|---|
| 377 | throw query_pt_exception("Failed to start PT server. Do you have enough memory?"); |
|---|
| 378 | } |
|---|
| 379 | priv_data::servers[portname] = data->server; |
|---|
| 380 | } |
|---|
| 381 | |
|---|
| 382 | data->arbdb = query_arb::getARBDB(dbname); |
|---|
| 383 | |
|---|
| 384 | set_find_type_fast(fast); |
|---|
| 385 | set_probe_len(k); |
|---|
| 386 | set_mismatches(mk); |
|---|
| 387 | set_sort_type(norel); |
|---|
| 388 | } |
|---|
| 389 | |
|---|
| 390 | query_pt::~query_pt() { |
|---|
| 391 | logger->info("Timings for PT Search: {}", data->timeit); |
|---|
| 392 | delete data; |
|---|
| 393 | } |
|---|
| 394 | |
|---|
| 395 | |
|---|
| 396 | void |
|---|
| 397 | query_pt::set_find_type_fast(bool fast) { |
|---|
| 398 | std::lock_guard<std::mutex> lock(data->arb_pt_access); |
|---|
| 399 | int err = aisc_put(data->link, |
|---|
| 400 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 401 | FAMILYFINDER_FIND_TYPE, fast?1L:0L, |
|---|
| 402 | NULL); |
|---|
| 403 | if (err != 0) { |
|---|
| 404 | logger->warn("Unable to set find_type = {}", fast ? "fast" : "normal"); |
|---|
| 405 | } else { |
|---|
| 406 | data->find_type_fast = fast; |
|---|
| 407 | } |
|---|
| 408 | } |
|---|
| 409 | |
|---|
| 410 | void |
|---|
| 411 | query_pt::set_probe_len(int len) { |
|---|
| 412 | std::lock_guard<std::mutex> lock(data->arb_pt_access); |
|---|
| 413 | int err = aisc_put(data->link, |
|---|
| 414 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 415 | FAMILYFINDER_PROBE_LEN, long(len), |
|---|
| 416 | NULL); |
|---|
| 417 | if (err != 0) { |
|---|
| 418 | logger->warn("Unable to set k = {}", len); |
|---|
| 419 | } else { |
|---|
| 420 | data->kmer_len = len; |
|---|
| 421 | } |
|---|
| 422 | } |
|---|
| 423 | |
|---|
| 424 | void |
|---|
| 425 | query_pt::set_mismatches(int len) { |
|---|
| 426 | std::lock_guard<std::mutex> lock(data->arb_pt_access); |
|---|
| 427 | int err = aisc_put(data->link, |
|---|
| 428 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 429 | FAMILYFINDER_MISMATCH_NUMBER, long(len), |
|---|
| 430 | NULL); |
|---|
| 431 | |
|---|
| 432 | if (err != 0) { |
|---|
| 433 | logger->warn("Unable to set allowable mismatches to {}", len); |
|---|
| 434 | } else { |
|---|
| 435 | data->num_mismatch = len; |
|---|
| 436 | } |
|---|
| 437 | } |
|---|
| 438 | |
|---|
| 439 | void |
|---|
| 440 | query_pt::set_sort_type(bool absolute) { |
|---|
| 441 | std::lock_guard<std::mutex> lock(data->arb_pt_access); |
|---|
| 442 | int err = aisc_put(data->link, |
|---|
| 443 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 444 | FAMILYFINDER_SORT_TYPE, absolute?0L:1L, |
|---|
| 445 | NULL); |
|---|
| 446 | if (err != 0) { |
|---|
| 447 | logger->warn("Unable to set sort type = {}", absolute ? "absolute" : "relative"); |
|---|
| 448 | } else { |
|---|
| 449 | data->relative_sort = !absolute; |
|---|
| 450 | } |
|---|
| 451 | } |
|---|
| 452 | |
|---|
| 453 | void |
|---|
| 454 | query_pt::set_range(int startpos, int stoppos) { |
|---|
| 455 | std::lock_guard<std::mutex> lock(data->arb_pt_access); |
|---|
| 456 | |
|---|
| 457 | int err = aisc_put(data->link, |
|---|
| 458 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 459 | FAMILYFINDER_RANGE_STARTPOS, long(startpos), |
|---|
| 460 | FAMILYFINDER_RANGE_ENDPOS, long(stoppos), |
|---|
| 461 | NULL); |
|---|
| 462 | if (err != 0) { |
|---|
| 463 | logger->warn("Unable to constain matching to {}-{}", startpos, stoppos); |
|---|
| 464 | } else { |
|---|
| 465 | data->range_begin = startpos; |
|---|
| 466 | data->range_end = stoppos; |
|---|
| 467 | } |
|---|
| 468 | } |
|---|
| 469 | |
|---|
| 470 | double |
|---|
| 471 | query_pt::match(search::result_vector &family, const cseq& queryc, |
|---|
| 472 | int min_match, int max_match, float min_score, float max_score, |
|---|
| 473 | query_arb *arb, bool noid, int min_len, |
|---|
| 474 | int num_full, int full_min_len, int range_cover, bool leave_query_out |
|---|
| 475 | ) { |
|---|
| 476 | string query_str = queryc.getBases(); |
|---|
| 477 | const char* query = query_str.c_str(); |
|---|
| 478 | int maxfail = 5; |
|---|
| 479 | int range_cover_left = range_cover; |
|---|
| 480 | int range_cover_right = range_cover; |
|---|
| 481 | int matches = 0; |
|---|
| 482 | int skipped_max_score = 0; |
|---|
| 483 | int skipped_broken = 0; |
|---|
| 484 | int skipped_min_len = 0; |
|---|
| 485 | int skipped_noid = 0; |
|---|
| 486 | cseq_comparator cmp(CMP_IUPAC_OPTIMISTIC, CMP_DIST_NONE, CMP_COVER_QUERY, false); |
|---|
| 487 | |
|---|
| 488 | if (query_str.size() < 20) { |
|---|
| 489 | logger->warn("Sequence {} too short ({} bp) for PT server search", |
|---|
| 490 | queryc.getName(), strlen(query)); |
|---|
| 491 | return 0; |
|---|
| 492 | } |
|---|
| 493 | |
|---|
| 494 | T_PT_FAMILYLIST f_list; |
|---|
| 495 | |
|---|
| 496 | bytestring bs; |
|---|
| 497 | bs.data = const_cast<char*>(query); |
|---|
| 498 | bs.size = strlen(query)+1; |
|---|
| 499 | |
|---|
| 500 | match_retry: |
|---|
| 501 | family.reserve(max_match); |
|---|
| 502 | std::lock_guard<std::mutex> lock(data->arb_pt_access); |
|---|
| 503 | |
|---|
| 504 | int err = aisc_put(data->link, |
|---|
| 505 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 506 | FAMILYFINDER_FIND_FAMILY, &bs, |
|---|
| 507 | NULL); |
|---|
| 508 | if (err != 0) { |
|---|
| 509 | logger->error("Unable to execute find_family command on pt-server"); |
|---|
| 510 | if (--maxfail==0) { |
|---|
| 511 | logger->error("No retries left; aborting."); |
|---|
| 512 | return 0; |
|---|
| 513 | } |
|---|
| 514 | logger->error("Retrying..."); |
|---|
| 515 | |
|---|
| 516 | //FIXME restart(); |
|---|
| 517 | goto match_retry; |
|---|
| 518 | } |
|---|
| 519 | |
|---|
| 520 | err = aisc_get(data->link, |
|---|
| 521 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 522 | FAMILYFINDER_FAMILY_LIST, f_list.as_result_param(), |
|---|
| 523 | NULL); |
|---|
| 524 | if (err != 0) { |
|---|
| 525 | logger->error("Unable to get results for search"); |
|---|
| 526 | return 0; |
|---|
| 527 | } |
|---|
| 528 | if (!f_list.exists()) { |
|---|
| 529 | return 0; |
|---|
| 530 | } |
|---|
| 531 | |
|---|
| 532 | |
|---|
| 533 | char *f_name; |
|---|
| 534 | double f_relscore = 0.f; |
|---|
| 535 | do { |
|---|
| 536 | err = aisc_get(data->link, PT_FAMILYLIST, f_list, |
|---|
| 537 | FAMILYLIST_NAME, &f_name, |
|---|
| 538 | FAMILYLIST_REL_MATCHES, &f_relscore, |
|---|
| 539 | FAMILYLIST_NEXT, f_list.as_result_param(), |
|---|
| 540 | NULL); |
|---|
| 541 | if (err != 0) { |
|---|
| 542 | logger->error("Unable to get next item in family list"); |
|---|
| 543 | break; |
|---|
| 544 | } |
|---|
| 545 | |
|---|
| 546 | if (leave_query_out && f_name == queryc.getName()) { |
|---|
| 547 | logger->info("Omitting sequence with same name as query from result"); |
|---|
| 548 | free(f_name); |
|---|
| 549 | continue; |
|---|
| 550 | } |
|---|
| 551 | |
|---|
| 552 | if (data->find_type_fast) { |
|---|
| 553 | f_relscore *= 4; |
|---|
| 554 | } |
|---|
| 555 | |
|---|
| 556 | // correct relscore according to formula based on edgar2004 |
|---|
| 557 | f_relscore = 1 - log(f_relscore + 1.0/bs.size)/log(1.0/bs.size); |
|---|
| 558 | |
|---|
| 559 | if (matches <= min_match || f_relscore >= min_score) { |
|---|
| 560 | try { |
|---|
| 561 | const cseq& seq = arb->getCseq(f_name); |
|---|
| 562 | |
|---|
| 563 | if (max_score <= 2 && cmp(queryc, seq) > max_score) { |
|---|
| 564 | skipped_max_score ++; |
|---|
| 565 | } else if ((long)seq.size() < min_len) { |
|---|
| 566 | skipped_min_len ++; |
|---|
| 567 | } else if (noid && boost::algorithm::icontains(seq.getBases(), query)) { |
|---|
| 568 | skipped_noid ++; |
|---|
| 569 | } else { |
|---|
| 570 | matches ++; |
|---|
| 571 | |
|---|
| 572 | family.emplace_back(f_relscore, &seq); |
|---|
| 573 | |
|---|
| 574 | if ((num_full != 0) && (long)seq.size() > full_min_len) { |
|---|
| 575 | num_full--; |
|---|
| 576 | } |
|---|
| 577 | if ((range_cover_right != 0) && |
|---|
| 578 | seq.getById(seq.size()-1).getPosition() >= data->range_end) { |
|---|
| 579 | range_cover_right--; |
|---|
| 580 | } |
|---|
| 581 | if ((range_cover_left != 0) && |
|---|
| 582 | seq.begin()->getPosition() <= data->range_begin) { |
|---|
| 583 | range_cover_left--; |
|---|
| 584 | } |
|---|
| 585 | } |
|---|
| 586 | } catch (base_iupac::bad_character_exception& e) { |
|---|
| 587 | logger->error("Sequence {} contains invalid character{}. Skipping", |
|---|
| 588 | f_name, e.character); |
|---|
| 589 | skipped_broken++; |
|---|
| 590 | } |
|---|
| 591 | } |
|---|
| 592 | |
|---|
| 593 | free(f_name); |
|---|
| 594 | } while (matches < max_match |
|---|
| 595 | && (matches <= min_match || f_relscore >= min_score) |
|---|
| 596 | && f_list.exists()); |
|---|
| 597 | |
|---|
| 598 | // get full length sequence |
|---|
| 599 | while (f_list.exists() && ((num_full + range_cover_right + range_cover_left) != 0)) { |
|---|
| 600 | err = aisc_get(data->link, PT_FAMILYLIST, f_list, |
|---|
| 601 | FAMILYLIST_NAME, &f_name, |
|---|
| 602 | FAMILYLIST_MATCHES, &f_relscore, |
|---|
| 603 | FAMILYLIST_NEXT, f_list.as_result_param(), |
|---|
| 604 | NULL); |
|---|
| 605 | |
|---|
| 606 | if (err != 0) { |
|---|
| 607 | logger->warn("Unable to get next item in family list"); |
|---|
| 608 | break; |
|---|
| 609 | } |
|---|
| 610 | if (data->find_type_fast) { |
|---|
| 611 | f_relscore *= 4; |
|---|
| 612 | } |
|---|
| 613 | |
|---|
| 614 | const cseq& seq = arb->getCseq(f_name); |
|---|
| 615 | if (max_score >= 2 || cmp(queryc, seq) <= max_score) { |
|---|
| 616 | bool keep = false; |
|---|
| 617 | if ((num_full != 0) && (long)seq.size() > full_min_len) { |
|---|
| 618 | num_full--; |
|---|
| 619 | keep = true; |
|---|
| 620 | } |
|---|
| 621 | |
|---|
| 622 | if ((range_cover_right != 0) && (long)seq.size() > min_len && |
|---|
| 623 | seq.getById(seq.size()-1).getPosition() >= data->range_end) { |
|---|
| 624 | range_cover_right--; |
|---|
| 625 | keep = true; |
|---|
| 626 | } |
|---|
| 627 | |
|---|
| 628 | if ((range_cover_left != 0) && (long)seq.size() > min_len && |
|---|
| 629 | seq.begin()->getPosition() <= data->range_begin) { |
|---|
| 630 | range_cover_left--; |
|---|
| 631 | keep = true; |
|---|
| 632 | } |
|---|
| 633 | if (keep) { |
|---|
| 634 | family.emplace_back(f_relscore, &seq); |
|---|
| 635 | } |
|---|
| 636 | } |
|---|
| 637 | free(f_name); |
|---|
| 638 | } |
|---|
| 639 | |
|---|
| 640 | if ((skipped_max_score != 0) || (skipped_broken != 0) || (skipped_min_len != 0) || (skipped_noid != 0)) { |
|---|
| 641 | logger->warn("Skipped {} sequences ({}x id > {}, {}x broken, {}x len < {}, {}x noid)", |
|---|
| 642 | skipped_max_score + skipped_broken + skipped_min_len + skipped_noid, |
|---|
| 643 | skipped_max_score, max_score, |
|---|
| 644 | skipped_broken, |
|---|
| 645 | skipped_min_len, min_len, |
|---|
| 646 | skipped_noid); |
|---|
| 647 | } |
|---|
| 648 | |
|---|
| 649 | return f_relscore; |
|---|
| 650 | } |
|---|
| 651 | |
|---|
| 652 | void |
|---|
| 653 | query_pt::find(const cseq& query, search::result_vector& results, unsigned int max) { |
|---|
| 654 | data->timeit.start(); |
|---|
| 655 | char *error = nullptr; |
|---|
| 656 | results.clear(); |
|---|
| 657 | results.reserve(max); |
|---|
| 658 | |
|---|
| 659 | std::lock_guard<std::mutex> lock(data->arb_pt_access); |
|---|
| 660 | data->timeit.stop("acquire lock"); |
|---|
| 661 | |
|---|
| 662 | bytestring bs; |
|---|
| 663 | bs.data = strdup(query.getBases().c_str()); |
|---|
| 664 | bs.size = query.size()+1; |
|---|
| 665 | if (aisc_put(data->link, |
|---|
| 666 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 667 | FAMILYFINDER_SORT_MAX, long(max), |
|---|
| 668 | FAMILYFINDER_FIND_FAMILY, &bs, |
|---|
| 669 | NULL) != 0) { |
|---|
| 670 | logger->error("Unable to execute find_family command on pt-server"); |
|---|
| 671 | } |
|---|
| 672 | free(bs.data); |
|---|
| 673 | data->timeit.stop("send query"); |
|---|
| 674 | |
|---|
| 675 | T_PT_FAMILYLIST f_list; |
|---|
| 676 | aisc_get(data->link, |
|---|
| 677 | PT_FAMILYFINDER, data->ffinder, |
|---|
| 678 | FAMILYFINDER_FAMILY_LIST, f_list.as_result_param(), |
|---|
| 679 | FAMILYFINDER_ERROR, &error, |
|---|
| 680 | NULL); |
|---|
| 681 | if (error && error[0] != 0) { |
|---|
| 682 | logger->error("Unable to get results for search: {}", error); |
|---|
| 683 | return; |
|---|
| 684 | } |
|---|
| 685 | data->timeit.stop("get first"); |
|---|
| 686 | |
|---|
| 687 | char* f_name; |
|---|
| 688 | double f_rel_matches = 0.f; |
|---|
| 689 | long f_matches = 0; |
|---|
| 690 | int mult = (data->find_type_fast) ? 4:1; |
|---|
| 691 | |
|---|
| 692 | std::vector<std::pair<float, string> > scored_names; |
|---|
| 693 | while (max-- && f_list.exists()) { |
|---|
| 694 | aisc_get(data->link, PT_FAMILYLIST, f_list, |
|---|
| 695 | FAMILYLIST_NAME, &f_name, |
|---|
| 696 | FAMILYLIST_REL_MATCHES, &f_rel_matches, |
|---|
| 697 | FAMILYLIST_MATCHES, &f_matches, |
|---|
| 698 | FAMILYLIST_NEXT, f_list.as_result_param(), |
|---|
| 699 | NULL); |
|---|
| 700 | f_rel_matches = 1 - log((f_rel_matches*mult) + 1.0/bs.size)/log(1.0/bs.size); |
|---|
| 701 | scored_names.emplace_back(f_rel_matches, f_name); |
|---|
| 702 | free(f_name); |
|---|
| 703 | } |
|---|
| 704 | data->timeit.stop("get all"); |
|---|
| 705 | |
|---|
| 706 | for (auto& res : scored_names) { |
|---|
| 707 | results.emplace_back(res.first, &data->arbdb->getCseq(res.second)); |
|---|
| 708 | } |
|---|
| 709 | |
|---|
| 710 | data->timeit.stop("load seqs"); |
|---|
| 711 | } |
|---|
| 712 | |
|---|
| 713 | unsigned int query_pt::size() const { |
|---|
| 714 | return data->arbdb->getSeqCount(); |
|---|
| 715 | } |
|---|
| 716 | |
|---|
| 717 | struct query_pt_pool::pimpl { |
|---|
| 718 | pimpl(fs::path& filename, int k, bool fast, bool norel, int km, string portname) |
|---|
| 719 | : _filename(filename), _k(k), _fast(fast), _norel(norel), _km(km), _portname(portname) |
|---|
| 720 | {} |
|---|
| 721 | ~pimpl() { |
|---|
| 722 | for (auto pt : _pts) { |
|---|
| 723 | delete pt; |
|---|
| 724 | } |
|---|
| 725 | } |
|---|
| 726 | |
|---|
| 727 | using pool_map = std::map<fs::path, std::shared_ptr<query_pt_pool::pimpl>>; |
|---|
| 728 | static pool_map _pools; |
|---|
| 729 | |
|---|
| 730 | // parameters for creating new query_pts |
|---|
| 731 | fs::path _filename; |
|---|
| 732 | int _k; |
|---|
| 733 | bool _fast; |
|---|
| 734 | bool _norel; |
|---|
| 735 | int _km; |
|---|
| 736 | string _portname; |
|---|
| 737 | |
|---|
| 738 | std::mutex _access_pts; |
|---|
| 739 | std::list<query_pt*> _pts; |
|---|
| 740 | int _count{0}; |
|---|
| 741 | query_pt* borrow() { |
|---|
| 742 | int n = 0; |
|---|
| 743 | { |
|---|
| 744 | std::lock_guard<std::mutex> lock(_access_pts); |
|---|
| 745 | if (!_pts.empty()) { |
|---|
| 746 | query_pt* pt = _pts.front(); |
|---|
| 747 | _pts.pop_front(); |
|---|
| 748 | return pt; |
|---|
| 749 | } |
|---|
| 750 | n = _count++; |
|---|
| 751 | } |
|---|
| 752 | std::string portname; |
|---|
| 753 | if (n > 0) { |
|---|
| 754 | // FIXME: manage the port better. This works for unix sockets, but not |
|---|
| 755 | // for TCP ports. |
|---|
| 756 | portname = fmt::format("{}_{}", _portname, n); |
|---|
| 757 | } else { |
|---|
| 758 | portname = _portname; |
|---|
| 759 | } |
|---|
| 760 | query_pt *pt = query_pt::get_pt_search( |
|---|
| 761 | _filename, _k, _fast, _norel, _km, portname |
|---|
| 762 | ); |
|---|
| 763 | return pt; |
|---|
| 764 | } |
|---|
| 765 | void giveback(query_pt* pt) { |
|---|
| 766 | std::lock_guard<std::mutex> lock(_access_pts); |
|---|
| 767 | _pts.push_front(pt); |
|---|
| 768 | } |
|---|
| 769 | }; |
|---|
| 770 | |
|---|
| 771 | query_pt_pool::pimpl::pool_map query_pt_pool::pimpl::_pools; |
|---|
| 772 | |
|---|
| 773 | query_pt_pool* query_pt_pool::get_pool(fs::path filename, |
|---|
| 774 | int k, bool fast, bool norel, int mk, |
|---|
| 775 | std::string portname) { |
|---|
| 776 | if (query_pt_pool::pimpl::_pools.count(filename) == 0u) { |
|---|
| 777 | query_pt_pool::pimpl::_pools.emplace( |
|---|
| 778 | filename, std::make_shared<query_pt_pool::pimpl>(filename, k, fast, norel, mk, portname) |
|---|
| 779 | ); |
|---|
| 780 | } |
|---|
| 781 | return new query_pt_pool(pimpl::_pools.at(filename)); |
|---|
| 782 | } |
|---|
| 783 | |
|---|
| 784 | query_pt_pool::query_pt_pool(std::shared_ptr<query_pt_pool::pimpl> p) |
|---|
| 785 | : impl(p) { |
|---|
| 786 | } |
|---|
| 787 | |
|---|
| 788 | query_pt_pool::~query_pt_pool() {} |
|---|
| 789 | |
|---|
| 790 | void |
|---|
| 791 | query_pt_pool::find(const cseq& query, result_vector& results, unsigned int max) { |
|---|
| 792 | query_pt *pt = impl->borrow(); |
|---|
| 793 | pt->find(query, results, max); |
|---|
| 794 | impl->giveback(pt); |
|---|
| 795 | } |
|---|
| 796 | |
|---|
| 797 | double |
|---|
| 798 | query_pt_pool::match(result_vector& family, const cseq& queryc, int min_match, |
|---|
| 799 | int max_match, float min_score, float max_score, query_arb *arb, |
|---|
| 800 | bool noid, int min_len, int num_full, int full_min_len, |
|---|
| 801 | int range_cover, bool leave_query_out) { |
|---|
| 802 | query_pt *pt = impl->borrow(); |
|---|
| 803 | double res = pt->match(family, queryc, min_match, max_match, min_score, max_score, arb, |
|---|
| 804 | noid, min_len, num_full, full_min_len, range_cover, leave_query_out); |
|---|
| 805 | impl->giveback(pt); |
|---|
| 806 | return res; |
|---|
| 807 | } |
|---|
| 808 | |
|---|
| 809 | unsigned int |
|---|
| 810 | query_pt_pool::size() const { |
|---|
| 811 | query_pt *pt = impl->borrow(); |
|---|
| 812 | unsigned int res = pt->size(); |
|---|
| 813 | impl->giveback(pt); |
|---|
| 814 | return res; |
|---|
| 815 | } |
|---|
| 816 | |
|---|
| 817 | |
|---|
| 818 | query_pt_exception::query_pt_exception(std::string msg) noexcept |
|---|
| 819 | : message(std::move(msg)) |
|---|
| 820 | { |
|---|
| 821 | } |
|---|
| 822 | |
|---|
| 823 | query_pt_exception::~query_pt_exception() noexcept = default; |
|---|
| 824 | |
|---|
| 825 | const char* |
|---|
| 826 | query_pt_exception::what() const noexcept { |
|---|
| 827 | return message.c_str(); |
|---|
| 828 | } |
|---|
| 829 | |
|---|
| 830 | } // namespace sina |
|---|
| 831 | |
|---|
| 832 | |
|---|
| 833 | /* |
|---|
| 834 | Local Variables: |
|---|
| 835 | mode:c++ |
|---|
| 836 | c-file-style:"stroustrup" |
|---|
| 837 | c-file-offsets:((innamespace . 0)(inline-open . 0)(case-label . 0)) |
|---|
| 838 | indent-tabs-mode:nil |
|---|
| 839 | fill-column:99 |
|---|
| 840 | End: |
|---|
| 841 | */ |
|---|
| 842 | // vim: filetype=cpp:expandtab:shiftwidth=4:tabstop=8:softtabstop=4:encoding=utf-8:textwidth=99 : |
|---|