source: branches/items/GDE/SATIVA/sativa/epac/raxml_util.py

Last change on this file was 14661, checked in by akozlov, 9 years ago
File size: 13.0 KB
Line 
1#!/usr/bin/env python
2
3import os
4import sys
5import glob
6import shutil
7import datetime
8import random
9import re
10from subprocess import call,STDOUT
11from json_util import EpaJsonParser
12
13class FileUtils:
14
15    @staticmethod   
16    def normalize_dir(dir_str):
17        if dir_str and not dir_str.endswith("/"):
18           dir_str += "/"
19        return dir_str
20
21    @staticmethod   
22    def remove_if_exists(fname):
23        try:
24            os.remove(fname)
25        except:
26            pass
27
28    @staticmethod   
29    def rebase(fname, old_basedir, new_basedir):
30        return fname.replace(old_basedir, new_basedir)   
31
32class RaxmlWrapper:
33
34    def __init__(self, config): 
35        self.cfg = config
36        if config.rand_seed:
37            random.seed(config.rand_seed)
38   
39    def make_raxml_fname(self, stem, job_name, absolute=True):
40        fname = "RAxML_" + stem + "." + job_name
41        if absolute:
42            return os.path.join(self.cfg.raxml_outdir, fname)
43        else:
44            return fname           
45
46    def make_raxml_wildcard(self, job_name):
47        return self.make_raxml_fname("*", job_name)
48
49    def cleanup(self, job_name, remove_jplace=False):
50        raxml_out_mask = self.make_raxml_wildcard(job_name)
51        for fl in glob.glob(raxml_out_mask):
52            os.remove(fl)
53        if remove_jplace:
54            jplace_fname = self.make_raxml_fname("portableTree", job_name) + ".jplace"
55            FileUtils.remove_if_exists(jplace_fname)
56
57    def reduce_alignment(self, align_fname, job_name="reduce"):
58        reduced_fname = align_fname + ".reduced"
59        # we don't have to do anything in restart mode
60        if self.cfg.restart and os.path.isfile(reduced_fname):
61            return reduced_fname
62        else:
63            FileUtils.remove_if_exists(reduced_fname)
64            raxml_params = ["-f", "c", "-s", align_fname]
65            raxml_params += ["--no-dup-check"]
66            self.run(job_name, raxml_params)
67            self.cleanup(job_name)
68            if os.path.isfile(reduced_fname):
69                return reduced_fname
70            else:
71                return align_fname
72
73    def run_epa(self, job_name, align_fname, reftree_fname, optmod_fname="", silent=True, mode="epa", subtree_fname=None,\
74    lhw_acc_threshold=0.999):
75        raxml_params = ["-s", align_fname, "-t", reftree_fname]
76        # assume that by the time we call EPA reference has been cleaned already (e.g. with previous reduce_alignment call)
77        raxml_params += ["--no-seq-check"]
78        if mode == "l1o_seq":
79            raxml_params += ["-f", "O"]
80            result_file_stem = "leaveOneOutResults"
81        elif mode == "l1o_subtree":
82            raxml_params += ["-f", "P", "-z", subtree_fname]
83            result_file_stem = "subtreePlacement"
84        elif mode == "epa":
85            raxml_params += ["-f", "v"]
86            result_file_stem = "portableTree"
87        elif mode == "epa_mp":
88            raxml_params += ["-f", "y"]
89            result_file_stem = "portableTree"
90        else:
91            print "ERROR: Invalid RAxML-EPA running mode: %s" % mode
92            sys.exit()
93           
94        raxml_params += ["--epa-accumulated-threshold", str(lhw_acc_threshold)]
95
96        if self.cfg.epa_use_heuristic in ["TRUE", "YES", "1"]:
97            raxml_params += ["-G", str(self.cfg.epa_heur_rate)]           
98
99        if self.cfg.epa_load_optmod and optmod_fname:
100            if os.path.isfile(optmod_fname):
101                raxml_params += ["-R", optmod_fname]
102                if self.cfg.raxml_model.startswith("GTRCAT") and not self.cfg.compress_patterns:
103                    raxml_params +=  ["-H"]
104            else:
105                print "WARNING: Binary model file not found: %s" % optmod_fname
106                print "WARNING: Model parameters will be estimated by RAxML"
107               
108        self.run(job_name, raxml_params, silent)
109       
110        jp = None
111        failed = False
112        if mode == "l1o_subtree":
113            jp = []
114            i = 0
115            while True:
116                jp_fname = self.make_raxml_fname(result_file_stem, job_name) + ".%d.jplace" % (i+1)
117                if not os.path.isfile(jp_fname):
118                    break
119                jp.append(EpaJsonParser(jp_fname))
120                i += 1
121            failed = i == 0   
122        else:
123            jp_fname = self.make_raxml_fname(result_file_stem, job_name) + ".jplace"
124            if os.path.isfile(jp_fname):
125                jp = EpaJsonParser(jp_fname)
126            else:
127                failed = True
128           
129        if failed:
130            print "RAxML EPA run failed, please examine the log for details:\n %s" \
131                    % self.make_raxml_fname("output", job_name)
132            sys.exit()
133        else:       
134            return jp
135
136    def run(self, job_name, params, silent=True, chkpoint_fname=None):
137        if self.cfg.raxml_model == "AUTO":
138            print "ERROR: you should have called EpacConfig.resolve_auto_settings() in your script!\n"
139            sys.exit()
140
141        self.cleanup(job_name)
142       
143        lparams  = []
144        lparams += params
145        lparams += ["-m", self.cfg.raxml_model, "-n", job_name]
146
147        if not self.cfg.use_bfgs:
148            lparams += ["--no-bfgs"]
149           
150        if self.cfg.save_memory:
151            lparams += ["-U"]
152       
153        if self.cfg.verbose:
154            lparams += ["--verbose"]
155       
156        if not "-p" in lparams:
157            seed = random.randint(1, 32000)
158            lparams += ["-p", str(seed)]
159           
160        if chkpoint_fname:
161            lparams += ["-Z", chkpoint_fname]
162
163        if self.cfg.run_on_cluster:
164            self.run_cluster(lparams)
165            return;       
166
167        if self.cfg.raxml_remote_call:
168            call_str = ["ssh", self.cfg.raxml_remote_host]
169        else:
170            call_str = []
171        call_str += self.cfg.raxml_cmd + lparams
172        if silent:       
173            self.cfg.log.debug(' '.join(call_str) + "\n")
174            out_fname = self.make_raxml_fname("output", job_name)
175            with open(out_fname, "w") as fout:
176                call(call_str, stdout=fout, stderr=STDOUT)
177        else:       
178            call(call_str)
179
180        return ' '.join(call_str)
181       
182    def run_multiple(self, job_name, params, repnum, silent=True):   
183        best_lh = float("-inf")
184        best_jobname = None
185        check_old_jobs = self.cfg.restart
186       
187        for i in range(repnum):
188            call_raxml = True
189            chkpoint_fname = None
190
191            rep_jobname = "%s.%d" % (job_name, i)
192           
193            if check_old_jobs:
194                # in resume mode, we have to check where we have stopped before
195                next_jobname = "%s.%d" % (job_name, i+1)
196                next_info = self.info_fname(next_jobname)
197                # if RAxML_info file for the next job exists, current job has had finished -> skip it
198                if os.path.isfile(next_info):
199                    call_raxml = False
200                else:
201                    # use RAxML checkpoints if there are any
202                    old_chkpoint_fname = self.checkpoint_fname(rep_jobname)
203                    if not os.path.isfile(old_chkpoint_fname):
204                        old_chkpoint_fname = self.bkup_checkpoint_fname(rep_jobname)
205                        if not os.path.isfile(old_chkpoint_fname):
206                            old_chkpoint_fname = None
207
208                    FileUtils.remove_if_exists(next_info)
209                    if old_chkpoint_fname:
210                        chkpoint_fname = self.checkpoint_fname("last_chkpoint")
211                        shutil.move(old_chkpoint_fname, chkpoint_fname)
212                   
213                    # this was the last RAxML run from previous SATIVA invocation -> proceed without checks from now on
214                    check_old_jobs = False
215                 
216            if call_raxml:
217                invoc_str = self.run(rep_jobname, params, silent, chkpoint_fname)
218            lh = self.get_tree_lh(rep_jobname, "GAMMA")
219            if lh > best_lh:
220                best_lh = lh
221                best_jobname = rep_jobname
222            self.cfg.log.debug("Tree %d GAMMA-based logLH: %s\n" % (i, str(lh)))
223       
224        best_fname = self.info_fname(best_jobname)
225        dst_fname = self.info_fname(job_name)
226        shutil.copy(best_fname, dst_fname)
227
228        best_fname = self.result_fname(best_jobname)
229        dst_fname = self.result_fname(job_name)
230        shutil.copy(best_fname, dst_fname)
231       
232        return invoc_str
233       
234    def run_cluster(self, params):
235        if self.cfg.raxml_remote_call:
236            qsub_call_str = ["ssh", self.cfg.raxml_remote_host]
237        else:
238            qsub_call_str = []
239       
240        raxml_call_cmd = self.cfg.raxml_cmd + params       
241        for i in range(len(raxml_call_cmd)):
242            if isinstance(raxml_call_cmd[i], basestring):
243                raxml_call_cmd[i] = FileUtils.rebase(raxml_call_cmd[i], self.cfg.epac_home, self.cfg.cluster_epac_home)
244        raxml_call_str = ' '.join(raxml_call_cmd)
245               
246        script_fname = self.cfg.tmp_fname("%NAME%_sub.sh")
247        FileUtils.remove_if_exists(script_fname)
248        shutil.copy(self.cfg.cluster_qsub_script, script_fname)
249        qsub_job_name = "epa"       
250        with open(script_fname, "a") as fout:
251            fout.write("#$ -N %s\n" % qsub_job_name)
252            fout.write("\n")           
253            fout.write(raxml_call_str + "\n")
254
255        cluster_script_fname = FileUtils.rebase(script_fname, self.cfg.epac_home, self.cfg.cluster_epac_home)
256        qsub_call_str += ["qsub", "-sync", "y", cluster_script_fname]
257
258        print raxml_call_str + "\n"
259        print ' '.join(qsub_call_str) + "\n"
260#        sys.exit()
261
262        call(qsub_call_str)
263        if not self.cfg.debug:
264            FileUtils.remove_if_exists(script_fname)
265           
266    def get_tree_lh(self, job_name, ratehet="GAMMA"):
267        info_fname = self.info_fname(job_name)
268        with open(info_fname, "r") as info_file:
269            info_str = info_file.read()
270       
271        lh_patterns = [ "Final %s-based Score of best tree " % ratehet,
272                       "Final %s  likelihood: " % ratehet,
273                       "%s-based likelihood " % ratehet]
274       
275        m = None
276        for pat in lh_patterns:
277            m = re.search('(?<=%s)[0-9.\-]+' % pat, info_str)
278            if m:
279                lh = float(m.group(0))
280                return lh
281               
282        return None
283       
284    def get_invocation_str(self, job_name):
285        info_fname = self.info_fname(job_name)
286        with open(info_fname, "r") as info_file:
287            info_str = info_file.read()
288       
289        pattern = "RAxML was called as follows:\n\n"
290       
291        m = re.search('(?<=%s).*' % pattern, info_str)
292        if m:
293            return m.group(0)
294        else:
295            return ""
296   
297    def result_fname(self, job_name):
298        return self.make_raxml_fname("result", job_name)
299   
300    def besttree_fname(self, job_name):
301        return self.make_raxml_fname("bestTree", job_name)
302
303    def info_fname(self, job_name):
304        return self.make_raxml_fname("info", job_name)
305
306    def checkpoint_fname(self, job_name):
307        return self.make_raxml_fname("binaryCheckpoint", job_name)
308
309    def bkup_checkpoint_fname(self, job_name):
310        return self.make_raxml_fname("binaryCheckpointBackup", job_name)
311
312    def result_exists(self, job_name):
313        if os.path.isfile(self.result_fname(job_name)):
314            return True
315        else:
316            return False
317
318    def besttree_exists(self, job_name):
319        if os.path.isfile(self.besttree_fname(job_name)):
320            return True
321        else:
322            return False
323
324    def epa_result_exists(self, job_name):
325        if os.path.isfile(self.make_raxml_fname("labelledTree", job_name)):
326            return True
327        else:
328            return False
329
330    def copy_result_tree(self, job_name, dst_fname):
331        src_fname = self.result_fname(job_name)
332        shutil.copy(src_fname, dst_fname)
333
334    def copy_best_tree(self, job_name, dst_fname):
335        src_fname = self.besttree_fname(job_name)
336        shutil.copy(src_fname, dst_fname)
337
338    def copy_optmod_params(self, job_name, dst_fname):
339        src_fname = self.make_raxml_fname("binaryModelParameters", job_name)
340        shutil.copy(src_fname, dst_fname)
341
342    def copy_epa_orig_tree(self, job_name, dst_fname):
343        src_fname = self.make_raxml_fname("originalLabelledTree", job_name)
344        shutil.copy(src_fname, dst_fname)
345       
346    def copy_epa_result_tree(self, job_name, dst_fname):
347        src_fname = self.make_raxml_fname("labelledTree", job_name)
348        shutil.copy(src_fname, dst_fname)
349
350    def copy_epa_jplace(self, job_name, dst_fname, mode="epa", move=False):
351        if mode == "l1o_seq":
352            result_file_stem = "leaveOneOutResults"
353        elif mode == "l1o_subtree":
354            result_file_stem = "subtreePlacement"
355        elif mode == "epa":
356            result_file_stem = "portableTree"
357        else:
358            return
359        src_fname = self.make_raxml_fname(result_file_stem, job_name) + ".jplace"
360        if move:
361            shutil.move(src_fname, dst_fname)
362        else:
363            shutil.copy(src_fname, dst_fname)
Note: See TracBrowser for help on using the repository browser.