[Saga-devel] saga SVN commit 3262: /trunk/adaptors/aws/aws_job/
amerzky at cct.lsu.edu
amerzky at cct.lsu.edu
Mon Jan 12 04:18:16 CST 2009
User: amerzky
Date: 2009/01/12 04:18 AM
Removed:
/trunk/adaptors/aws/aws_job/
aws_job.cpp, aws_job.hpp, aws_job_async.cpp, aws_job_istream.hpp, aws_job_ostream.hpp, aws_job_service_async.cpp, aws_job_stream.hpp
Modified:
/trunk/adaptors/aws/aws_job/
aws_job_adaptor.cpp, aws_job_service.cpp, aws_job_service.hpp
Log:
completed the split of aws and ssh adaptors.
A
File Changes:
Directory: /trunk/adaptors/aws/aws_job/
=======================================
File [removed]: aws_job.cpp
Delta lines: +0 -286
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job.cpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job.cpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,286 +0,0 @@
-// Copyright (c) 2005-2007 Hartmut Kaiser
-// Copyright (c) 2005-2007 Andre Merzky (andre at merzky.net)
-//
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga adaptor icnludes
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/attribute.hpp>
-#include <saga/saga/adaptors/file_transfer_spec.hpp>
-
-// saga engine includes
-#include <saga/impl/config.hpp>
-#include <saga/impl/exception_list.hpp>
-
-// saga package includes
-#include <saga/saga/packages/job/adaptors/job_self.hpp>
-#include <saga/saga/packages/job/job_description.hpp>
-
-// adaptor includes
-#include "aws_job.hpp"
-#include "aws_job_istream.hpp"
-#include "aws_job_ostream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-
- // constructor
- job_cpi_impl::job_cpi_impl (proxy * p,
- cpi_info const & info,
- saga::ini::ini const & glob_ini,
- saga::ini::ini const & adap_ini,
- TR1::shared_ptr <saga::adaptor> adaptor)
- : base_cpi (p, info, adaptor, cpi::Noflags)
- {
- adaptor_data adata (this);
- instance_data idata (this);
-
- SAGA_LOG_ALWAYS(idata->rm_.get_string ().c_str ());
-
-
- std::string scheme = idata->rm_.get_scheme ();
- std::string type;
- bool ok = false;
-
- std::vector <std::string> types = saga::adaptors::utils::split (adata->ini_["defaults"]["cloud_names"], ' ');
-
- std::vector <saga::context> contexts = p->get_session ().list_contexts ();
-
- for ( unsigned int i = 0; i < types.size () && ! ok; i++ )
- {
- std::cout << " == checking type " << types[i] << "\n";
-
- if ( scheme == types[i] ||
- scheme == "any" ||
- scheme == "" )
- {
- // check if we have a context for that type
- std::cout << " == matching scheme " << scheme << "\n";
-
- std::vector <saga::context> :: iterator it;
-
- for ( it = contexts.begin (); ! ok && it != contexts.end () ; it++ )
- {
- std::cout << " == matching ctype? " << (*it).get_attribute (saga::attributes::context_type) << "\n";
- if ( (*it).get_attribute (saga::attributes::context_type) == types[i] )
- {
- std::cout << " == matching ctype! " << (*it).get_attribute (saga::attributes::context_type) << "\n";
- ok = true;
- type = types[i];
- ini_ = adata->ini_[types[i]];
-
- std::cout << " == using ini section for " << type << "\n";
- }
- }
- }
- }
-
- env_["JAVA_HOME"] = ini_["java_home"];
- env_["EC2_HOME"] = ini_["ec2_home"];
- env_["EC2_GSG_KEY"] = ini_["ec2_proxy"];
- env_["EC2_PRIVATE_KEY"] = ini_["ec2_key"];
- env_["EC2_CERT"] = ini_["ec2_cert"];
- env_["EC2_URL"] = ini_["ec2_url"];
-
- // FIXME: more jd checks needed
-
- jd_ = idata->jd_;
- rm_ = idata->rm_;
- id_ = rm_.get_string (); // FIXME
-
- if ( ! ok )
- {
- // FIXME
- SAGA_ADAPTOR_THROW_NO_CONTEXT ("Adaptor only supports 'aws' and 'any' URL schemes.",
- saga::BadParameter);
- }
-
- SAGA_LOG_ALWAYS("job ctor done");
- }
-
-
- // destructor
- job_cpi_impl::~job_cpi_impl (void)
- {
- }
-
-
- // SAGA API functions
- void job_cpi_impl::sync_get_state (saga::job::state & ret)
- {
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- void job_cpi_impl::sync_get_description (saga::job::description & ret)
- {
- ret = jd_;
- }
-
- void job_cpi_impl::sync_get_job_id (std::string & ret)
- {
- ret = id_;
- }
-
- // access streams for communication with the child
- void job_cpi_impl::sync_get_stdin (saga::job::ostream & ret)
- {
- // SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- void job_cpi_impl::sync_get_stdout (saga::job::istream & ret)
- {
- // SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- void job_cpi_impl::sync_get_stderr (saga::job::istream & ret)
- {
- // SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- void job_cpi_impl::sync_checkpoint (saga::impl::void_t & ret)
- {
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- void job_cpi_impl::sync_migrate (saga::impl::void_t & ret,
- saga::job::description jd)
- {
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- void job_cpi_impl::sync_signal (saga::impl::void_t & ret,
- int signal)
- {
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
-
- // suspend the child process
- void job_cpi_impl::sync_suspend (saga::impl::void_t & ret)
- {
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- // suspend the child process
- void job_cpi_impl::sync_resume (saga::impl::void_t & ret)
- {
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
-
- //////////////////////////////////////////////////////////////////////
- // inherited from the task interface
- void job_cpi_impl::sync_run (saga::impl::void_t & ret)
- {
- namespace sja = saga::job::attributes;
-
- SAGA_LOG_ALWAYS("job.run");
-
- adaptor_data adata (this);
- std::string vm_ip = rm_.get_host ();
- std::string vm_id = rm_.get_path ();
-
-
- // erase leading slash
- vm_id.erase (0, 1);
-
- std::string exe = jd_.get_attribute (sja::description_executable);
- saga::url exe_url (exe);
-
- // we allow staging of execs, if an explicit source host is given
- if ( ! exe_url.get_host ().empty () )
- {
- // at the moment, only staging from localhost is supported
- // FIXME: should use ssh file adaptor for simple copy
- if ( ! ( exe_url.get_scheme ().empty () ) &&
- ! ( exe_url.get_scheme () == "any" ) &&
- ! ( exe_url.get_scheme () == "file" ) )
- {
- SAGA_ADAPTOR_THROW ("Staging is not supported for that executable", saga::BadParameter);
- }
-
- // the new exe path is created from "/tmp/" + last url path element
- exe = std::string ("/tmp/") + saga::adaptors::utils::split (exe_url.get_path (), '/').back ();
-
- // stage the executable from the given URL to the new exe location on the
- // remote host
- saga::adaptors::utils::process proc ("/usr/bin/scp", env_);
-
- proc.add_arg ("-q"); // quiet
- proc.add_args ("-o", "StrictHostKeyChecking=no");
- proc.add_args ("-i", ini_["ec2_proxy"]);
-
- proc.add_arg (exe_url.get_path ());
- proc.add_arg ("root@" + vm_ip + ":" + exe);
-
- (void) proc.run_sync ();
-
- if ( proc.fail () )
- {
- SAGA_ADAPTOR_THROW ("Could not stage Executable", saga::BadParameter);
- }
- }
-
- SAGA_LOG_ALWAYS("job.run after staging");
-
-
- saga::adaptors::utils::process proc ("/usr/bin/ssh", env_);
-
- proc.add_args ("-o", "StrictHostKeyChecking=no");
- proc.add_args ("-i", ini_["ec2_proxy"]);
-
- proc.add_arg ("root@" + vm_ip);
- proc.add_arg (exe);
-
- if ( jd_.attribute_exists (sja::description_arguments) )
- {
- std::vector <std::string> cl_args = jd_.get_vector_attribute (sja::description_arguments);
-
- for ( unsigned int i = 0; i < cl_args.size (); i++ )
- {
- proc.add_arg (cl_args[i]);
- }
- }
-
-
- // fixme: this should better be done by the ssh job adaptor
- std::vector <std::string> out = proc.run_sync ();
-
- if ( proc.fail () )
- {
- SAGA_ADAPTOR_THROW ("Could not run Executable", saga::BadParameter);
- }
-
- for ( unsigned int j = 0; j < out.size (); j++ )
- {
- std::cout << " output[" << j << "] : " << out[j] << std::endl;
- }
- }
-
- void job_cpi_impl::sync_cancel (saga::impl::void_t & ret,
- double timeout)
- {
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
- }
-
- // wait for the child process to terminate
- void job_cpi_impl::sync_wait (bool & ret,
- double timeout)
- {
- // we only support sync jobs anyway, so they are finished after run()
- // FIXME: for the real thing, we need to fix run_process, and turn it
- // into a stateful class.
- return;
- }
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
File [removed]: aws_job.hpp
Delta lines: +0 -113
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job.hpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job.hpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,113 +0,0 @@
-// Copyright (c) 2005-2007 Hartmut Kaiser
-// Copyright (c) 2005-2007 Andre Merzky (andre at merzky.net)
-//
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef ADAPTORS_AWS_JOB_HPP
-#define ADAPTORS_AWS_JOB_HPP
-
-// stl includes
-#include <string>
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/engine/proxy.hpp>
-
-// saga adaptor includes
-#include <saga/saga/adaptors/adaptor.hpp>
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/adaptor_data.hpp>
-
-// job package includes
-#include <saga/impl/packages/job/job_cpi.hpp>
-
-// adaptor includes
-#include "aws_job_adaptor.hpp"
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
- class job_cpi_impl
- : public saga::adaptors::v1_0::job_cpi <job_cpi_impl>
- {
- private:
- typedef saga::adaptors::v1_0::job_cpi <job_cpi_impl> base_cpi;
-
- // adaptor data
- typedef saga::adaptors::adaptor_data <adaptor> adaptor_data;
-
- saga::job::description jd_;
- saga::url rm_;
- std::string id_;
-
- std::map <std::string, std::string> ini_;
- std::map <std::string, std::string> env_;
-
- public:
- // constructor of the job adaptor
- job_cpi_impl (proxy * p,
- cpi_info const & info,
- saga::ini::ini const & glob_ini,
- saga::ini::ini const & adap_ini,
- TR1::shared_ptr <saga::adaptor> adaptor);
-
- // destructor of the job adaptor
- ~job_cpi_impl (void);
-
- // job functions
- void sync_get_state (saga::job::state & ret);
- void sync_get_description (saga::job::description & ret);
- void sync_get_job_id (std::string & ret);
-
- void sync_get_stdin (saga::job::ostream & ret);
- void sync_get_stdout (saga::job::istream & ret);
- void sync_get_stderr (saga::job::istream & ret);
-
- void sync_checkpoint (saga::impl::void_t & ret);
- void sync_migrate (saga::impl::void_t & ret,
- saga::job::description jd);
- void sync_signal (saga::impl::void_t & ret,
- int signal);
-
- // inherited from saga::task
- void sync_run (saga::impl::void_t & ret);
- void sync_cancel (saga::impl::void_t & ret,
- double timeout);
- void sync_suspend (saga::impl::void_t & ret);
- void sync_resume (saga::impl::void_t & ret);
-
- void sync_wait (bool & ret,
- double timeout);
-
- // This adaptor implements the async functions
- // based on its own synchronous functions.
- saga::task async_get_state (void);
- saga::task async_get_description (void);
- saga::task async_get_job_id (void);
-
- saga::task async_get_stdin (void);
- saga::task async_get_stdout (void);
- saga::task async_get_stderr (void);
-
- saga::task async_checkpoint (void);
- saga::task async_migrate (saga::job::description jd);
- saga::task async_signal (int signal);
-
- // inherited from the task interface
- saga::task async_run (void);
- saga::task async_cancel (double timeout);
- saga::task async_suspend (void);
- saga::task async_resume (void);
- saga::task async_wait (double timeout);
- }; // class job_cpi_impl
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
-#endif // ADAPTORS_AWS_JOB_HPP
-
File [modified]: aws_job_adaptor.cpp
Delta lines: +4 -2
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_adaptor.cpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_adaptor.cpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -16,12 +16,15 @@
// adaptor includes
#include "aws_job_adaptor.hpp"
#include "aws_job_service.hpp"
-#include "aws_job.hpp"
SAGA_ADAPTOR_REGISTER (aws_job::adaptor);
////////////////////////////////////////////////////////////////////////
+//
+// This adaptor only implements the job service API - all jobs are actually
+// managed by the ssh job service adaptor
+//
namespace aws_job
{
// register function for the SAGA engine
@@ -43,7 +46,6 @@
// create file adaptor infos (each adaptor instance gets its own uuid)
// and add cpi_infos to list
job_service_cpi_impl::register_cpi (list, prefs, adaptor_uuid_);
- job_cpi_impl::register_cpi (list, prefs, adaptor_uuid_);
// and return list
return (list);
File [removed]: aws_job_async.cpp
Delta lines: +0 -157
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_async.cpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_async.cpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,157 +0,0 @@
-// Copyright (c) 2005-2007 Hartmut Kaiser
-// Copyright (c) 2005-2007 Andre Merzky (andre at merzky.net)
-//
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga adaptor icnludes
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/attribute.hpp>
-#include <saga/saga/adaptors/file_transfer_spec.hpp>
-
-// saga engine includes
-#include <saga/impl/config.hpp>
-#include <saga/impl/exception_list.hpp>
-
-// saga package includes
-#include <saga/saga/packages/job/adaptors/job_self.hpp>
-#include <saga/saga/packages/job/job_description.hpp>
-
-// adaptor includes
-#include "aws_job.hpp"
-#include "aws_job_istream.hpp"
-#include "aws_job_ostream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-
- //////////////////////////////////////////////////////////////////////
- // This adaptor implements the async functions
- // based on its own synchronous functions.
-
- saga::task
- job_cpi_impl::async_get_state (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_get_state",
- shared_from_this (),
- &job_cpi_impl::sync_get_state);
- }
-
- saga::task
- job_cpi_impl::async_get_description (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_get_description",
- shared_from_this (),
- &job_cpi_impl::sync_get_description);
- }
-
- saga::task
- job_cpi_impl::async_get_job_id (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_get_job_id",
- shared_from_this (),
- &job_cpi_impl::sync_get_job_id);
- }
-
- saga::task
- job_cpi_impl::async_run (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_run",
- shared_from_this (),
- &job_cpi_impl::sync_run);
- }
-
- saga::task
- job_cpi_impl::async_wait (double timeout)
- {
- return saga::adaptors::task ("job_cpi_impl::async_wait",
- shared_from_this (),
- &job_cpi_impl::sync_wait,
- timeout);
- }
-
- saga::task
- job_cpi_impl::async_cancel (double timeout)
- {
- return saga::adaptors::task ("job_cpi_impl::async_cancel",
- shared_from_this (),
- &job_cpi_impl::sync_cancel,
- timeout);
- }
-
- saga::task
- job_cpi_impl::async_suspend (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_suspend",
- shared_from_this (),
- &job_cpi_impl::sync_suspend);
- }
-
- saga::task
- job_cpi_impl::async_resume (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_resume",
- shared_from_this (),
- &job_cpi_impl::sync_resume);
- }
-
- saga::task
- job_cpi_impl::async_get_stdin (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_get_stdin",
- shared_from_this (),
- &job_cpi_impl::sync_get_stdin);
- }
-
- saga::task
- job_cpi_impl::async_get_stdout (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_get_stdout",
- shared_from_this (),
- &job_cpi_impl::sync_get_stdout);
- }
-
- saga::task
- job_cpi_impl::async_get_stderr (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_get_stderr",
- shared_from_this (),
- &job_cpi_impl::sync_get_stderr);
- }
-
- saga::task
- job_cpi_impl::async_checkpoint (void)
- {
- return saga::adaptors::task ("job_cpi_impl::async_checkpoint",
- shared_from_this (),
- &job_cpi_impl::sync_checkpoint);
- }
-
- saga::task
- job_cpi_impl::async_migrate (saga::job::description jd)
- {
- return saga::adaptors::task ("job_cpi_impl::async_migrate",
- shared_from_this (),
- &job_cpi_impl::sync_migrate,
- jd);
- }
-
- saga::task
- job_cpi_impl::async_signal (int signal)
- {
- return saga::adaptors::task ("job_cpi_impl::async_signal",
- shared_from_this (),
- &job_cpi_impl::sync_signal,
- signal);
- }
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
File [removed]: aws_job_istream.hpp
Delta lines: +0 -44
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_istream.hpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_istream.hpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,44 +0,0 @@
-// Copyright (c) 2005-2007 Hartmut Kaiser
-// Copyright (c) 2005-2007 Andre Merzky (andre at merzky.net)
-//
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#if !defined(ADAPTORS_AWS_JOB_ISTREAM_HPP)
-#define ADAPTORS_AWS_JOB_ISTREAM_HPP
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/job.hpp>
-
-// adaptor includes
-#include "aws_job_stream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
- class istream : public saga::job::istream
- {
- private:
- typedef impl::aws_job::stream <saga::adaptors::istream_ptr>
- impl_type;
-
- public:
- template <typename Stream>
- istream (saga::impl::v1_0::job_cpi * cpi,
- Stream & child_istream)
- : saga::job::istream (new impl_type (cpi, child_istream.rdbuf ()))
- {
- }
- };
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
-#endif // !defined(ADAPTORS_AWS_JOB_ISTREAM_HPP)
-
File [removed]: aws_job_ostream.hpp
Delta lines: +0 -45
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_ostream.hpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_ostream.hpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,45 +0,0 @@
-// Copyright (c) 2005-2007 Hartmut Kaiser
-// Copyright (c) 2005-2007 Andre Merzky (andre at merzky.net)
-//
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#ifdef ADAPTORS_AWS_JOB_OSTREAM_HPP
-#define ADAPTORS_AWS_JOB_OSTREAM_HPP
-
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/job.hpp>
-
-// adaptor includes
-#include "aws_job_stream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
- class ostream : public saga::job::ostream
- {
- private:
- typedef impl::aws_job_stream <saga::adaptors::ostream_ptr>
- impl_type;
-
- public:
- template <typename Stream>
- ostream (saga::impl::v1_0::job_cpi * cpi,
- Stream & child_ostream)
- : saga::job::ostream (new impl_type (cpi, child_ostream.rdbuf ()))
- {
- }
- };
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
-#endif // ADAPTORS_AWS_JOB_OSTREAM_HPP
-
File [modified]: aws_job_service.cpp
Delta lines: +40 -41
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_service.cpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_service.cpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -95,7 +95,7 @@
env_["EC2_PRIVATE_KEY"] = ini_["ec2_key"];
env_["EC2_CERT"] = ini_["ec2_cert"];
env_["EC2_URL"] = ini_["ec2_url"];
-
+
saga::adaptors::utils::process proc (env_);
// is a VM given which we should contact?
@@ -107,7 +107,7 @@
proc.add_args ("-k", ini_["ec2_keypair_name"]);
proc.add_arg (ini_["ec2_instance"]);
-
+
std::vector <std::string> out = proc.run_sync ();
if ( out.size() < 1 )
@@ -130,7 +130,7 @@
proc.set_cmd (ini_["ec2_home"] + "/bin/ec2-describe-instances");
-
+
proc.clear_args ();
proc.add_arg (vm_id_);
@@ -260,7 +260,7 @@
host.find (".") == std::string::npos )
{
// host is an instance id, and we need to get the hostname
-
+
SAGA_LOG_ALWAYS ("found id");
std::string vm_id_ = host;
@@ -355,16 +355,6 @@
}
}
- // set the correct scheme if needed, so that the job cpi recognizes it
- if ( idata->rm_.get_scheme ().empty () ||
- idata->rm_.get_scheme () == "any" )
- {
- idata->rm_.set_scheme (type);
- }
-
- std::cout << " rm: " << idata->rm_ << std::endl;
-
-
// we do have a job service instance, either new started or old and running.
// Now, copy over the ssh identity file of the context we used, so that jobs
// running on that instance can use it to contact other instances using the
@@ -399,7 +389,7 @@
// we also propagate our HOME/.saga.ini file. This is kind of a hack
// (FIXME), but allows us to have a uniform SAGA configuration for all VMs.
// FIXME: that should be better done with the prep scripts above.
-
+
std::cout << "copying ~/.saga.ini" << std::endl;
proc.set_cmd ("/usr/bin/scp");
@@ -424,12 +414,28 @@
proc.add_arg ("root@" + vm_ip_ + ":.saga.ini");
(void) proc.run_sync ();
-
+
if ( proc.fail () )
{
std::cout << "could not copy saga.ini" << std::endl;
throw;
}
+
+
+ // we are sure ssh is running - we should be able to create a ssh job
+ // service for the instance now
+ ssh_url_ = std::string ("ssh://") + vm_ip_;
+
+ // FIXME: point to correct cert
+ ssh_context_.set_attribute (saga::attributes::context_type, "ssh");
+ ssh_context_.set_defaults ();
+ ssh_context_.set_attribute (saga::attributes::context_userid, "root");
+ ssh_context_.set_attribute (saga::attributes::context_userkey, ini_["ec2_proxy"]);
+
+ ssh_session_.add_context (ssh_context_);
+
+ // create ssh job service which from now on handles job submission etc
+ js_ = TR1::shared_ptr <saga::job::service> (new saga::job::service (ssh_session_, ssh_url_));
}
//////////////////////////////////////////////////////////////////////
@@ -469,50 +475,43 @@
// that the VM may continue to run
SAGA_LOG_CRITICAL ("Could not shut down VM instance ");
}
-
}
+
+ // the ssh job service is automatically destroyed here.
}
//////////////////////////////////////////////////////////////////////
// SAGA API functions
- void
- job_service_cpi_impl::sync_create_job (saga::job::job & ret,
- saga::job::description jd)
+ void job_service_cpi_impl::sync_create_job (saga::job::job & ret,
+ saga::job::description jd)
{
- saga::url const instance_rm = instance_data (this)->rm_;
-
- saga::job::job job = saga::adaptors::job (instance_rm.get_string (),
- jd,
- proxy_->get_session ());
- ret = job;
+ ret = js_->create_job (jd);
}
- void
- job_service_cpi_impl::sync_run_job (saga::job::job & ret,
- std::string cmd,
- std::string host,
- saga::job::ostream & in,
- saga::job::istream & out,
- saga::job::istream & err)
+ void job_service_cpi_impl::sync_run_job (saga::job::job & ret,
+ std::string cmd,
+ std::string host,
+ saga::job::ostream & in,
+ saga::job::istream & out,
+ saga::job::istream & err)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ ret = js_->run_job (cmd, host, in, out, err);
}
- void
- job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
+ void job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ ret = js_->list ();
}
- void
- job_service_cpi_impl::sync_get_job (saga::job::job & ret,
- std::string jobid)
+ void job_service_cpi_impl::sync_get_job (saga::job::job & ret,
+ std::string jobid)
{
- SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+ ret = js_->get_job (jobid);
}
void job_service_cpi_impl::sync_get_self (saga::job::self & ret)
{
+ // will never be implemented by this adaptor
SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
}
File [modified]: aws_job_service.hpp
Delta lines: +6 -13
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_service.hpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_service.hpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -48,8 +48,13 @@
std::string vm_id_;
std::string vm_ip_;
- saga::context ctx_; // context used to access this job service
+ saga::context ctx_; // context used to access this job service
+ saga::url ssh_url_; // rm url for ssh job service
+ saga::context ssh_context_; // context to use for ssh ops
+ saga::session ssh_session_; // session to use for ssh ops
+ TR1::shared_ptr <saga::job::service> js_; // ssh job service, does the real work
+
std::map <std::string, std::string> ini_;
std::map <std::string, std::string> env_;
@@ -79,18 +84,6 @@
std::string jobid);
void sync_get_self (saga::job::self & ret);
- // This adaptor implements the async functions
- // based on its own synchronous functions.
- saga::task async_create_job (saga::job::description jd);
- saga::task async_run_job (std::string cmd,
- std::string host,
- saga::job::ostream & in,
- saga::job::istream & out,
- saga::job::istream & err);
- saga::task async_list (void);
- saga::task async_get_job (std::string jobid);
- saga::task async_get_self (void);
-
}; // class job_service_cpi_impl
} // namespace aws_job
File [removed]: aws_job_service_async.cpp
Delta lines: +0 -91
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_service_async.cpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_service_async.cpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,91 +0,0 @@
-// Copyright (c) 2005-2007 Hartmut Kaiser
-// Copyright (c) 2005-2007 Andre Merzky (andre at merzky.net)
-//
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-// stl includes
-#include <vector>
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/config.hpp>
-
-// saga adaptor includes
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/attribute.hpp>
-
-// saga package includes
-#include <saga/saga/packages/job/adaptors/job.hpp>
-#include <saga/saga/packages/job/adaptors/job_self.hpp>
-
-// adaptor includes
-#include "aws_job_service.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-
- //////////////////////////////////////////////////////////////////////
- // This adaptor implements the async functions
- // based on its own synchronous functions.
-
- saga::task
- job_service_cpi_impl::async_create_job (saga::job::description jd)
- {
- return saga::adaptors::task ("job_service_cpi_impl::async_create_job",
- shared_from_this (),
- &job_service_cpi_impl::sync_create_job,
- jd);
- }
-
- saga::task
- job_service_cpi_impl::async_run_job (std::string cmd,
- std::string host,
- saga::job::ostream & in,
- saga::job::istream & out,
- saga::job::istream & err)
- {
- return saga::adaptors::task ("job_service_cpi_impl::async_run_job",
- shared_from_this (),
- &job_service_cpi_impl::sync_run_job,
- cmd,
- host,
- TR1::ref (in),
- TR1::ref (out),
- TR1::ref (err));
- }
-
- saga::task
- job_service_cpi_impl::async_list (void)
- {
- return saga::adaptors::task ("job_service_cpi_impl::async_list",
- shared_from_this (),
- &job_service_cpi_impl::sync_list);
- }
-
- saga::task
- job_service_cpi_impl::async_get_job (std::string jobid)
- {
- return saga::adaptors::task ("job_service_cpi_impl::async_get_job",
- shared_from_this (),
- &job_service_cpi_impl::sync_get_job,
- jobid);
- }
-
- saga::task
- job_service_cpi_impl::async_get_self (void)
- {
- return saga::adaptors::task ("job_service_cpi_impl::async_get_self",
- shared_from_this (),
- &job_service_cpi_impl::sync_get_self);
- }
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
File [removed]: aws_job_stream.hpp
Delta lines: +0 -52
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_stream.hpp 2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_stream.hpp 2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,52 +0,0 @@
-// Copyright (c) 2005-2007 Hartmut Kaiser
-// Copyright (c) 2005-2007 Andre Merzky (andre at merzky.net)
-//
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#if !defined(ADAPTORS_AWS_JOB_STREAM_HPP)
-#define ADAPTORS_AWS_JOB_STREAM_HPP
-
-// stl includes
-#include <iosfwd>
-
-// saga engine includes
-#include <saga/impl/engine/cpi.hpp>
-
-
-///////////////////////////////////////////////////////////////////////////////
-namespace impl
-{
- namespace aws_job
- {
- template <typename Base>
- class stream
- : public Base
- {
- private:
- typedef Base base_type;
-
- // a saga stream has to keep alive the proxy and the cpi instance
- TR1::shared_ptr <saga::impl::v1_0::cpi> cpi_;
- TR1::shared_ptr <saga::impl::proxy> proxy_;
-
-
- public:
- stream (saga::impl::v1_0::job_cpi * cpi,
- std::streambuf * buf)
- : base_type (buf),
- cpi_ (cpi->shared_from_this ()),
- proxy_ (cpi->get_proxy ()->shared_from_this ())
- {
- }
- };
-
- } // namespace aws_job
- //////////////////////////////////////////////////////////////////////
-
-} // namespace impl
-////////////////////////////////////////////////////////////////////////
-
-#endif // !defined(ADAPTORS_AWS_JOB_STREAM_HPP)
-
More information about the saga-devel
mailing list