1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
import difflib
import functools
import itertools
import logging
import os
import re
import sysconfig
import subprocess
import tempfile
logging.basicConfig(filename='testchill.log', level=logging.DEBUG, filemode='w')
#logging.basicConfig(level=logging.INFO)
python_version = sysconfig.get_python_version()
python_version_major = int(sysconfig.get_python_version().split('.')[0])
python_version_minor = int(sysconfig.get_python_version().split('.')[1])
if python_version_major == 2:
from StringIO import StringIO
else:
from io import StringIO
_temp_dirs = []
_temp_files = []
### Errors ###
### Shell Util ###
def shell(cmd, args=[], stdout=None, stderr=None, env={}, wd=os.getcwd()):
"""
Execute a shell command.
@params cmd The command name
@params args A list of command line arguments (defaults to [])
@params stdout A file like object or file number that reads input written to stdout.
stdout will be returned as a string if this is None or not given.
@params stderr A file like object or file number that reads input written to stderr.
@params env A dict of environment variables. Before the command is executed, these will be exported
@params wd The working directory. Before the command is executed, the working directory will be changed to wd. (wd defaults to the current working directory)
"""
fullcmd = ' '.join(['export {}={};'.format(k,str(v)) for k,v in env.items()] + ['cd {};'.format(wd)] + [cmd] + args)
logging.info('shell: '+fullcmd)
if stdout == None:
outp = subprocess.check_output(fullcmd, stderr=stderr, shell=True)
if python_version_major == 2:
return outp
elif python_version_major == 3:
return outp.decode()
else:
subprocess.check_call(fullcmd, stdout=stdout, stderr=stderr, shell=True)
def mkdir_p(directory, temp=False, **kwargs):
"""
Make directory (equivelent to shell('mkdir', ['-p', directory]))
"""
if not os.path.exists(directory):
if temp and (directory not in _temp_dirs):
_temp_dirs.append(directory)
shell('mkdir', ['-p', directory], **kwargs)
def set_tempfile(filename):
"""
Add a file to a list of temp files
@param filename The full path to a temparary file.
"""
_temp_files.append(filename)
def withtmp(wtfunc, rdfunc):
"""
Perform some operation using a temporary file.
@param wtfunc A function that writes to the temparary file
@param rdfybc A function that reads from the temparary file
"""
with tempfile.TemporaryFile() as f:
wtfunc(f)
f.seek(0)
return rdfunc(f)
def rmtemp():
"""
Clean temp files and directories
"""
for temp_file in list(_temp_files):
if os.path.exists(temp_file):
shell('rm', [temp_file])
_temp_files.remove(temp_file)
for temp_dir in list(_temp_dirs):
if os.path.exists(temp_dir):
shell('rm', ['-rf', temp_dir])
_temp_dirs.remove(temp_dir)
def mktemp(mode=None):
"""
Create a temparary file. Returns a two-tuple with an open file object and the filename.
"""
fd, name = tempfile.mkstemp()
_temp_files.append(name)
if mode is None:
os.close(fd)
return name
else:
return os.fdopen(fd, mode), name
### Misc Util ###
def copy(obj, exclude=[]):
"""
Make a shallow copy of a python object with __dict__, excluding any attribute in exclude
@param obj The object to copy
@param exclude A list of attributes to ignore
"""
nobj = type(obj)()
for k, v in vars(obj).items():
if k in exclude: continue
setattr(nobj, k, v)
return nobj
def applyenv(line):
"""
Apply bash style environment variables to a string
@param line The input string
"""
return re.sub(r'\$([a-zA-Z_][a-zA-Z_0-9]*)\b',lambda m: str(os.getenv(m.group(1), '')), line)
def callonce(func):
"""
Assert that a function is only ever called once.
@param func Function to only be run once.
"""
pred_name = '__' + func.__module__.replace('.','__') + '_' + func.__name__ + '_called'
globals()[pred_name] = False
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not globals()[pred_name]:
globals()[pred_name] = True
return func(*args, **kwargs)
else:
raise Exception('{} was invoked multiple times.'.format(func.__name___))
return wrapper
def isdiff(strone, strtwo):
"""
Diff two strings. Returns a two element tuple. The first is True if the the two files are different, and the
next is a textual representation of the diff.
@param strone First string.
@param strtwo Second string.
"""
diff = list(difflib.ndiff(strone.splitlines(), strtwo.splitlines()))
return len(list(line for line in diff if line[0] in ['-','+'])) != 0, '\n'.join(diff)
def filterext(ext_list, filenames):
"""
Filter file names by extension.
@param ext_list A list of extensions.
@param filenames An iterable object of file names.
"""
return iter(s for s in filenames if any(s.strip().endswith(e) for e in ext_list))
def extract_tag(tagname, filename, wd=os.getcwd()):
"""
Extract commented out text in each html tag '<tagname>'. Returns a list of tuples for each tag.
Each tuple has two elements, the first is the text found in the tag, the second contains a dict
of attributes given in the tag.
@param tagname The name of the tag to search for.
@param filename A filename to search for comments in.
@param wd The working directory.
"""
from . import _extract
return _extract.extract_tag(tagname, filename, wd)
def textstream(txt):
"""
Creates a stream from text. Intended to hide version differences between 2 and 3.
@param txt A string to use as the default data in a stream.
"""
if python_version_major == 2:
import StringIO
return StringIO.StringIO(txt)
elif python_version_major == 3:
import io
return io.StringIO(txt)
|