import sys import os import logging from distutils.util import get_platform as _get_platform import ctypes import glob as _glob import subprocess as _subprocess from copy import copy try: import sframe from sframe._scripts import _pylambda_worker except ImportError: import graphlab as sframe from graphlab._scripts import _pylambda_worker def test_pylambda_worker(): """ Tests the pylambda workers by spawning off a seperate python process in order to print out additional diagnostic information in case there is an error. """ import os environment = os.environ.copy() from os.path import join, exists, split, abspath import tempfile import subprocess import datetime import time import zipfile import sys if sys.platform == 'darwin': if exists('/tmp'): tempfile.tempdir = '/tmp' temp_dir = tempfile.mkdtemp() temp_dir_sim = join(temp_dir, "simulated") os.mkdir(temp_dir_sim) lambda_log_file_sym = join(temp_dir_sim, "lambda_log") main_dir = split(abspath(sframe.__file__))[0] # Dump the directory structure. print("\nGathering installation information.") dir_structure_file = join(temp_dir, "dir_structure.log") dir_structure_out = open(dir_structure_file, "w") dump_directory_structure(main_dir,dir_structure_out) dir_structure_out.close() print("\nRunning simulation.") env=sframe.sys_util.make_unity_server_env() env["GRAPHLAB_LAMBDA_WORKER_DEBUG_MODE"] = "1" env["GRAPHLAB_LAMBDA_WORKER_LOG_FILE"] = lambda_log_file_sym proc = subprocess.Popen( [sys.executable, os.path.abspath(_pylambda_worker.__file__)], env = env) proc.wait() ################################################################################ # Write out the current system path. open(join(temp_dir, "sys_path_1.log"), "w").write( "\n".join(" sys.path[%d] = %s. " % (i, p) for i, p in enumerate(sys.path))) # Now run the program print("\nRunning full lambda worker process") trial_temp_dir = join(temp_dir, "full_run") os.mkdir(trial_temp_dir) lambda_log_file_run = join(trial_temp_dir, "lambda_log.log") run_temp_dir = join(trial_temp_dir, "run_temp_dir") os.mkdir(run_temp_dir) run_temp_dir_copy = join(temp_dir, "run_temp_dir_copy") run_info_dict = { "lambda_log" : lambda_log_file_run, "temp_dir" : trial_temp_dir, "run_temp_dir" : run_temp_dir, "preserved_temp_dir" : run_temp_dir_copy, "runtime_log" : join(trial_temp_dir, "runtime.log"), "sys_path_log" : join(trial_temp_dir, "sys_path_2.log")} run_script = r""" import os import traceback import shutil import sys import glob from os.path import join def write_exception(e): ex_str = "\n\nException: \n" traceback_str = traceback.format_exc() try: ex_str += repr(e) except Exception, e: ex_str += "Error expressing exception as string." ex_str += ": \n" + traceback_str try: sys.stderr.write(ex_str + "\n") sys.stderr.flush() except: # Pretty much nothing we can do here. pass # Set the system path. system_path = os.environ.get("__GL_SYS_PATH__", "") del sys.path[:] sys.path.extend(p.strip() for p in system_path.split(os.pathsep) if p.strip()) try: open(r"%(sys_path_log)s", "w").write( "\n".join(" sys.path[%%d] = %%s. " %% (i, p) for i, p in enumerate(sys.path))) except Exception as e: write_exception(e) os.environ["GRAPHLAB_LAMBDA_WORKER_DEBUG_MODE"] = "1" os.environ["GRAPHLAB_LAMBDA_WORKER_LOG_FILE"] = r"%(lambda_log)s" os.environ["GRAPHLAB_CACHE_FILE_LOCATIONS"] = r"%(run_temp_dir)s" os.environ["OMP_NUM_THREADS"] = "1" try: import sframe except Exception as e: write_exception(e) try: import graphlab as sframe except Exception as e: write_exception(e) sys.exit(55) log_file = open(r"%(runtime_log)s", "w") for k, v in sframe.get_runtime_config().items(): log_file.write("%%s : %%s\n" %% (str(k), str(v))) log_file.close() try: sa = sframe.SArray(range(1000)) except Exception as e: write_exception(e) try: print("Sum = %%d" %% (sa.apply(lambda x: x).sum())) except Exception as e: write_exception(e) new_dirs = [] copy_files = [] for root, dirs, files in os.walk(r"%(run_temp_dir)s"): new_dirs += [join(root, d) for d in dirs] copy_files += [join(root, name) for name in files] def translate_name(d): return os.path.abspath(join(r"%(preserved_temp_dir)s", os.path.relpath(d, r"%(run_temp_dir)s"))) for d in new_dirs: try: os.makedirs(translate_name(d)) except Exception as e: sys.stderr.write("Error with: " + d) write_exception(e) for f in copy_files: try: shutil.copy(f, translate_name(f)) except Exception as e: sys.stderr.write("Error with: " + f) write_exception(e) server_logs = (glob.glob(sframe.util.get_server_log_location() + "*") + glob.glob(sframe.util.get_client_log_location() + "*")) for f in server_logs: try: shutil.copy(f, join(r"%(temp_dir)s", os.path.split(f)[1])) except Exception as e: sys.stderr.write("Error with: " + f) write_exception(e) """ % run_info_dict run_script_file = join(temp_dir, "run_script.py") open(run_script_file, "w").write(run_script) log_file_stdout = join(trial_temp_dir, "stdout.log") log_file_stderr = join(trial_temp_dir, "stderr.log") env = os.environ.copy() env['__GL_SYS_PATH__'] = (os.path.pathsep).join(sys.path) proc = subprocess.Popen( [sys.executable, os.path.abspath(run_script_file)], stdout = open(log_file_stdout, "w"), stderr = open(log_file_stderr, "w"), env = env) proc.wait() # Now zip up the output data into a package we can access. timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d-%H-%M-%S') zipfile_name = join(temp_dir, "testing_logs-%d-%s.zip" % (os.getpid(), timestamp)) print "Creating archive of log files in %s." % zipfile_name save_files = [] for root, dirs, files in os.walk(temp_dir): save_files += [join(root, name) for name in files] with zipfile.ZipFile(zipfile_name, 'w') as logzip: error_logs = [] for f in save_files: try: logzip.write(f) except Exception as e: error_logs.append("%s: error = %s" % (f, repr(e))) if error_logs: error_log_file = join(temp_dir, "archive_errors.log") open(error_log_file, "w").write("\n\n".join(error_logs)) logzip.write(error_log_file) print "################################################################################" print "# " print "# Results of lambda test logged as %s." % zipfile_name print "# " print "################################################################################" print "Cleaning up." for f in save_files: try: os.remove(f) except Exception: pass def dump_directory_structure(main_dir, out = sys.stdout): """ Dumps a detailed report of the graphlab/sframe directory structure and files, along with the output of os.lstat for each. This is useful for debugging purposes. """ "Dumping Installation Directory Structure for Debugging: " import sys, os from os.path import split, abspath, join from itertools import chain visited_files = [] def on_error(err): visited_files.append( (" ERROR", str(err)) ) for path, dirs, files in os.walk(main_dir, onerror = on_error): for fn in chain(files, dirs): name = join(path, fn) try: visited_files.append( (name, repr(os.lstat(name))) ) except: visited_files.append( (name, "ERROR calling os.lstat.") ) def strip_name(n): if n[:len(main_dir)] == main_dir: return "/" + n[len(main_dir):] else: return n out.write("\n".join( (" %s: %s" % (strip_name(name), stats)) for name, stats in sorted(visited_files))) out.flush() if __name__ == "__main__": test_pylambda_worker()