[Saga-devel] saga SVN commit 3262: /trunk/adaptors/aws/aws_job/

amerzky at cct.lsu.edu amerzky at cct.lsu.edu
Mon Jan 12 04:18:16 CST 2009


User: amerzky
Date: 2009/01/12 04:18 AM

Removed:
 /trunk/adaptors/aws/aws_job/
  aws_job.cpp, aws_job.hpp, aws_job_async.cpp, aws_job_istream.hpp, aws_job_ostream.hpp, aws_job_service_async.cpp, aws_job_stream.hpp

Modified:
 /trunk/adaptors/aws/aws_job/
  aws_job_adaptor.cpp, aws_job_service.cpp, aws_job_service.hpp

Log:
 completed the split of aws and ssh adaptors.
 A

File Changes:

Directory: /trunk/adaptors/aws/aws_job/
=======================================

File [removed]: aws_job.cpp
Delta lines: +0 -286
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job.cpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job.cpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,286 +0,0 @@
-//  Copyright (c) 2005-2007 Hartmut Kaiser 
-//  Copyright (c) 2005-2007 Andre Merzky   (andre at merzky.net)
-// 
-//  Distributed under the Boost Software License, Version 1.0. 
-//  (See accompanying file LICENSE or copy at 
-//   http://www.boost.org/LICENSE_1_0.txt)
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga adaptor icnludes
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/attribute.hpp>
-#include <saga/saga/adaptors/file_transfer_spec.hpp>
-
-// saga engine includes
-#include <saga/impl/config.hpp>
-#include <saga/impl/exception_list.hpp>
-
-// saga package includes
-#include <saga/saga/packages/job/adaptors/job_self.hpp>
-#include <saga/saga/packages/job/job_description.hpp>
-
-// adaptor includes
-#include "aws_job.hpp"
-#include "aws_job_istream.hpp"
-#include "aws_job_ostream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-
-  // constructor
-  job_cpi_impl::job_cpi_impl (proxy                           * p, 
-                              cpi_info const                  & info,
-                              saga::ini::ini const            & glob_ini, 
-                              saga::ini::ini const            & adap_ini,
-                              TR1::shared_ptr <saga::adaptor>   adaptor)
-    : base_cpi  (p, info, adaptor, cpi::Noflags)
-  {
-    adaptor_data  adata (this);
-    instance_data idata (this);
-
-    SAGA_LOG_ALWAYS(idata->rm_.get_string ().c_str ());
-
-
-    std::string scheme = idata->rm_.get_scheme ();
-    std::string type;
-    bool        ok = false;
-
-    std::vector <std::string> types = saga::adaptors::utils::split (adata->ini_["defaults"]["cloud_names"], ' ');
-
-    std::vector <saga::context> contexts = p->get_session ().list_contexts ();
-
-    for ( unsigned int i = 0; i < types.size () && ! ok; i++ )
-    {
-      std::cout << " == checking type " << types[i] << "\n";
-
-      if ( scheme == types[i] ||
-           scheme == "any"    ||
-           scheme == "" )
-      {
-        // check if we have a context for that type
-        std::cout << " == matching scheme " << scheme << "\n";
-
-        std::vector <saga::context> :: iterator it;
-
-        for ( it = contexts.begin (); ! ok && it != contexts.end () ; it++ )
-        {
-          std::cout << " == matching ctype? " << (*it).get_attribute (saga::attributes::context_type) << "\n";
-          if ( (*it).get_attribute (saga::attributes::context_type) == types[i] )
-          {
-            std::cout << " == matching ctype! " << (*it).get_attribute (saga::attributes::context_type) << "\n";
-            ok   = true;
-            type = types[i];
-            ini_ = adata->ini_[types[i]];
-
-            std::cout <<  " == using ini section for " << type << "\n";
-          }
-        }
-      }
-    }
-
-    env_["JAVA_HOME"]       = ini_["java_home"];
-    env_["EC2_HOME"]        = ini_["ec2_home"];
-    env_["EC2_GSG_KEY"]     = ini_["ec2_proxy"];
-    env_["EC2_PRIVATE_KEY"] = ini_["ec2_key"];
-    env_["EC2_CERT"]        = ini_["ec2_cert"];
-    env_["EC2_URL"]         = ini_["ec2_url"];
-
-    // FIXME: more jd checks needed
-
-    jd_ = idata->jd_;
-    rm_ = idata->rm_;
-    id_ = rm_.get_string (); // FIXME
-
-    if ( ! ok )
-    {
-      // FIXME
-      SAGA_ADAPTOR_THROW_NO_CONTEXT ("Adaptor only supports 'aws' and 'any' URL schemes.",
-                                     saga::BadParameter);
-    }
-
-    SAGA_LOG_ALWAYS("job ctor done");
-  }
-
-
-  // destructor
-  job_cpi_impl::~job_cpi_impl (void)
-  {
-  }
-
-
-  //  SAGA API functions
-  void job_cpi_impl::sync_get_state (saga::job::state & ret)
-  {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  void job_cpi_impl::sync_get_description (saga::job::description & ret)
-  {
-    ret = jd_;
-  }
-
-  void job_cpi_impl::sync_get_job_id (std::string & ret)
-  {
-    ret = id_;
-  }
-
-  // access streams for communication with the child
-  void job_cpi_impl::sync_get_stdin (saga::job::ostream & ret)
-  {
-    // SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  void job_cpi_impl::sync_get_stdout (saga::job::istream & ret)
-  {
-    // SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  void job_cpi_impl::sync_get_stderr (saga::job::istream & ret)
-  {
-    // SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  void job_cpi_impl::sync_checkpoint (saga::impl::void_t & ret)
-  {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  void job_cpi_impl::sync_migrate (saga::impl::void_t           & ret, 
-                                   saga::job::description   jd)
-  {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  void job_cpi_impl::sync_signal (saga::impl::void_t & ret, 
-                                  int            signal)
-  {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-
-  //  suspend the child process 
-  void job_cpi_impl::sync_suspend (saga::impl::void_t & ret)
-  {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  //  suspend the child process 
-  void job_cpi_impl::sync_resume (saga::impl::void_t & ret)
-  {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-
-  //////////////////////////////////////////////////////////////////////
-  // inherited from the task interface
-  void job_cpi_impl::sync_run (saga::impl::void_t & ret)
-  {
-    namespace sja = saga::job::attributes;
-
-    SAGA_LOG_ALWAYS("job.run");
-
-    adaptor_data adata (this);
-    std::string vm_ip = rm_.get_host ();
-    std::string vm_id = rm_.get_path ();
-
-
-    // erase leading slash
-    vm_id.erase (0, 1);
-
-    std::string exe = jd_.get_attribute (sja::description_executable);
-    saga::url   exe_url (exe);
-
-    // we allow staging of execs, if an explicit source host is given
-    if ( ! exe_url.get_host ().empty () )
-    { 
-      // at the moment, only staging from localhost is supported
-      // FIXME: should use ssh file adaptor for simple copy
-      if ( ! ( exe_url.get_scheme ().empty ()  ) &&
-           ! ( exe_url.get_scheme () == "any"  ) &&
-           ! ( exe_url.get_scheme () == "file" ) )
-      {
-        SAGA_ADAPTOR_THROW ("Staging is not supported for that executable", saga::BadParameter);
-      }
-
-      // the new exe path is created from "/tmp/" + last url path element
-      exe = std::string ("/tmp/") + saga::adaptors::utils::split (exe_url.get_path (), '/').back ();
-
-      // stage the executable from the given URL to the new exe location on the
-      // remote host
-      saga::adaptors::utils::process proc ("/usr/bin/scp", env_);
-
-      proc.add_arg ("-q"); // quiet
-      proc.add_args ("-o", "StrictHostKeyChecking=no");
-      proc.add_args ("-i", ini_["ec2_proxy"]);
-
-      proc.add_arg (exe_url.get_path ());
-      proc.add_arg ("root@" + vm_ip + ":" + exe);
-
-      (void) proc.run_sync ();
-
-      if ( proc.fail () )
-      {
-        SAGA_ADAPTOR_THROW ("Could not stage Executable", saga::BadParameter);
-      }
-    }
-
-    SAGA_LOG_ALWAYS("job.run after staging");
-
-
-    saga::adaptors::utils::process proc ("/usr/bin/ssh", env_);
-
-    proc.add_args ("-o", "StrictHostKeyChecking=no");
-    proc.add_args ("-i", ini_["ec2_proxy"]);
-
-    proc.add_arg ("root@" + vm_ip);
-    proc.add_arg (exe);
-
-    if ( jd_.attribute_exists (sja::description_arguments) )
-    {
-      std::vector <std::string> cl_args = jd_.get_vector_attribute (sja::description_arguments);
-
-      for ( unsigned int i = 0; i < cl_args.size (); i++ )
-      {
-        proc.add_arg (cl_args[i]);
-      }
-    }
-
-
-    // fixme: this should better be done by the ssh job adaptor
-    std::vector <std::string> out = proc.run_sync ();
-
-    if ( proc.fail () )
-    {
-      SAGA_ADAPTOR_THROW ("Could not run Executable", saga::BadParameter);
-    }
-
-    for ( unsigned int j = 0; j < out.size (); j++ )
-    {
-      std::cout << "  output[" << j << "] : " << out[j] << std::endl;
-    }
-  }
-
-  void job_cpi_impl::sync_cancel (saga::impl::void_t & ret, 
-                                  double timeout)
-  {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
-  }
-
-  //  wait for the child process to terminate
-  void job_cpi_impl::sync_wait (bool   & ret, 
-                                double   timeout)
-  {
-    // we only support sync jobs anyway, so they are finished after run()
-    // FIXME: for the real thing, we need to fix run_process, and turn it 
-    // into a stateful class.
-    return;
-  }
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-

File [removed]: aws_job.hpp
Delta lines: +0 -113
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job.hpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job.hpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,113 +0,0 @@
-//  Copyright (c) 2005-2007 Hartmut Kaiser 
-//  Copyright (c) 2005-2007 Andre Merzky   (andre at merzky.net)
-// 
-//  Distributed under the Boost Software License, Version 1.0. 
-//  (See accompanying file LICENSE or copy at 
-//   http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef ADAPTORS_AWS_JOB_HPP
-#define ADAPTORS_AWS_JOB_HPP
-
-// stl includes
-#include <string>
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/engine/proxy.hpp>
-
-// saga adaptor includes
-#include <saga/saga/adaptors/adaptor.hpp>
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/adaptor_data.hpp>
-
-// job package includes
-#include <saga/impl/packages/job/job_cpi.hpp>
-
-// adaptor includes
-#include "aws_job_adaptor.hpp"
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-  class job_cpi_impl 
-    : public saga::adaptors::v1_0::job_cpi <job_cpi_impl>
-  {
-    private:
-      typedef saga::adaptors::v1_0::job_cpi <job_cpi_impl> base_cpi;
-
-      // adaptor data
-      typedef saga::adaptors::adaptor_data <adaptor> adaptor_data;
-
-      saga::job::description jd_;
-      saga::url              rm_;
-      std::string            id_;
-
-      std::map <std::string, std::string> ini_;
-      std::map <std::string, std::string> env_;
-
-    public:
-      // constructor of the job adaptor
-      job_cpi_impl  (proxy                           * p, 
-                     cpi_info const                  & info,
-                     saga::ini::ini const            & glob_ini, 
-                     saga::ini::ini const            & adap_ini,
-                     TR1::shared_ptr <saga::adaptor>   adaptor);
-
-      // destructor of the job adaptor
-      ~job_cpi_impl (void);
-
-      // job functions
-      void sync_get_state       (saga::job::state       & ret);
-      void sync_get_description (saga::job::description & ret);
-      void sync_get_job_id      (std::string            & ret);
-
-      void sync_get_stdin       (saga::job::ostream     & ret);
-      void sync_get_stdout      (saga::job::istream     & ret);
-      void sync_get_stderr      (saga::job::istream     & ret);
-
-      void sync_checkpoint      (saga::impl::void_t     & ret);
-      void sync_migrate         (saga::impl::void_t     & ret,
-                                 saga::job::description   jd);
-      void sync_signal          (saga::impl::void_t     & ret, 
-                                 int                      signal);
-
-      // inherited from saga::task
-      void sync_run     (saga::impl::void_t & ret);
-      void sync_cancel  (saga::impl::void_t & ret, 
-                         double               timeout);
-      void sync_suspend (saga::impl::void_t & ret);
-      void sync_resume  (saga::impl::void_t & ret);
-
-      void sync_wait    (bool         & ret, 
-                         double         timeout);
-
-      // This adaptor implements the async functions 
-      // based on its own synchronous functions.
-      saga::task async_get_state       (void);
-      saga::task async_get_description (void);
-      saga::task async_get_job_id      (void);
-
-      saga::task async_get_stdin       (void);
-      saga::task async_get_stdout      (void);
-      saga::task async_get_stderr      (void);
-
-      saga::task async_checkpoint      (void);
-      saga::task async_migrate         (saga::job::description   jd);
-      saga::task async_signal          (int                      signal);
-
-      // inherited from the task interface
-      saga::task async_run             (void);
-      saga::task async_cancel          (double timeout);
-      saga::task async_suspend         (void);
-      saga::task async_resume          (void);
-      saga::task async_wait            (double  timeout);
-  };  // class job_cpi_impl
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
-#endif // ADAPTORS_AWS_JOB_HPP
-

File [modified]: aws_job_adaptor.cpp
Delta lines: +4 -2
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_adaptor.cpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_adaptor.cpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -16,12 +16,15 @@
 // adaptor includes
 #include "aws_job_adaptor.hpp"
 #include "aws_job_service.hpp"
-#include "aws_job.hpp"
 
 SAGA_ADAPTOR_REGISTER (aws_job::adaptor);
 
 
 ////////////////////////////////////////////////////////////////////////
+//
+// This adaptor only implements the job service API - all jobs are actually
+// managed by the ssh job service adaptor
+//
 namespace aws_job
 {
   // register function for the SAGA engine
@@ -43,7 +46,6 @@
     // create file adaptor infos (each adaptor instance gets its own uuid)
     // and add cpi_infos to list
     job_service_cpi_impl::register_cpi (list, prefs, adaptor_uuid_);
-    job_cpi_impl::register_cpi         (list, prefs, adaptor_uuid_);
 
     // and return list
     return (list);

File [removed]: aws_job_async.cpp
Delta lines: +0 -157
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_async.cpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_async.cpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,157 +0,0 @@
-//  Copyright (c) 2005-2007 Hartmut Kaiser 
-//  Copyright (c) 2005-2007 Andre Merzky   (andre at merzky.net)
-// 
-//  Distributed under the Boost Software License, Version 1.0. 
-//  (See accompanying file LICENSE or copy at 
-//   http://www.boost.org/LICENSE_1_0.txt)
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga adaptor icnludes
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/attribute.hpp>
-#include <saga/saga/adaptors/file_transfer_spec.hpp>
-
-// saga engine includes
-#include <saga/impl/config.hpp>
-#include <saga/impl/exception_list.hpp>
-
-// saga package includes
-#include <saga/saga/packages/job/adaptors/job_self.hpp>
-#include <saga/saga/packages/job/job_description.hpp>
-
-// adaptor includes
-#include "aws_job.hpp"
-#include "aws_job_istream.hpp"
-#include "aws_job_ostream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-
-  //////////////////////////////////////////////////////////////////////
-  // This adaptor implements the async functions 
-  // based on its own synchronous functions.
-
-  saga::task 
-    job_cpi_impl::async_get_state (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_get_state",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_get_state);
-  }
-
-  saga::task 
-    job_cpi_impl::async_get_description (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_get_description",
-                                 shared_from_this (),
-                                 &job_cpi_impl::sync_get_description);
-  }
-
-  saga::task 
-    job_cpi_impl::async_get_job_id (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_get_job_id",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_get_job_id);
-  }
-
-  saga::task 
-    job_cpi_impl::async_run (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_run",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_run);
-  }
-
-  saga::task 
-    job_cpi_impl::async_wait (double   timeout)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_wait",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_wait,
-                                 timeout);
-  }
-
-  saga::task 
-    job_cpi_impl::async_cancel (double timeout)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_cancel",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_cancel,
-                                 timeout);
-  }
-
-  saga::task 
-    job_cpi_impl::async_suspend (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_suspend",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_suspend);
-  }
-
-  saga::task 
-    job_cpi_impl::async_resume (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_resume",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_resume);
-  }
-
-  saga::task 
-    job_cpi_impl::async_get_stdin (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_get_stdin",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_get_stdin);
-  }
-
-  saga::task  
-    job_cpi_impl::async_get_stdout (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_get_stdout",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_get_stdout);
-  }
-
-  saga::task  
-    job_cpi_impl::async_get_stderr (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_get_stderr",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_get_stderr);
-  }
-
-  saga::task 
-    job_cpi_impl::async_checkpoint (void)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_checkpoint",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_checkpoint);
-  }
-
-  saga::task 
-    job_cpi_impl::async_migrate (saga::job::description   jd)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_migrate",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_migrate,
-                                 jd);
-  }
-
-  saga::task 
-    job_cpi_impl::async_signal (int signal)
-  {
-    return saga::adaptors::task ("job_cpi_impl::async_signal",
-                                 shared_from_this (), 
-                                 &job_cpi_impl::sync_signal,
-                                 signal);
-  }
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-

File [removed]: aws_job_istream.hpp
Delta lines: +0 -44
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_istream.hpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_istream.hpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,44 +0,0 @@
-//  Copyright (c) 2005-2007 Hartmut Kaiser 
-//  Copyright (c) 2005-2007 Andre Merzky   (andre at merzky.net)
-// 
-//  Distributed under the Boost Software License, Version 1.0. 
-//  (See accompanying file LICENSE or copy at 
-//   http://www.boost.org/LICENSE_1_0.txt)
-
-#if !defined(ADAPTORS_AWS_JOB_ISTREAM_HPP)
-#define ADAPTORS_AWS_JOB_ISTREAM_HPP
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/job.hpp>
-
-// adaptor includes
-#include "aws_job_stream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job 
-{
-  class istream : public saga::job::istream
-  {
-    private:
-      typedef impl::aws_job::stream <saga::adaptors::istream_ptr> 
-              impl_type;
-
-    public:
-      template <typename Stream>
-      istream (saga::impl::v1_0::job_cpi * cpi, 
-               Stream                    & child_istream)
-        : saga::job::istream (new impl_type (cpi, child_istream.rdbuf ()))
-      {
-      }
-  };
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
-#endif // !defined(ADAPTORS_AWS_JOB_ISTREAM_HPP)
-

File [removed]: aws_job_ostream.hpp
Delta lines: +0 -45
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_ostream.hpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_ostream.hpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,45 +0,0 @@
-//  Copyright (c) 2005-2007 Hartmut Kaiser 
-//  Copyright (c) 2005-2007 Andre Merzky   (andre at merzky.net)
-// 
-//  Distributed under the Boost Software License, Version 1.0. 
-//  (See accompanying file LICENSE or copy at 
-//   http://www.boost.org/LICENSE_1_0.txt)
-
-#ifdef  ADAPTORS_AWS_JOB_OSTREAM_HPP
-#define ADAPTORS_AWS_JOB_OSTREAM_HPP
-
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/job.hpp>
-
-// adaptor includes
-#include "aws_job_stream.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-  class ostream : public saga::job::ostream
-  {
-    private:
-      typedef impl::aws_job_stream <saga::adaptors::ostream_ptr> 
-              impl_type;
-
-    public:
-      template <typename Stream>
-      ostream (saga::impl::v1_0::job_cpi * cpi, 
-               Stream                    & child_ostream)
-        : saga::job::ostream (new impl_type (cpi, child_ostream.rdbuf ()))
-      {
-      }
-  };
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-
-#endif // ADAPTORS_AWS_JOB_OSTREAM_HPP
-

File [modified]: aws_job_service.cpp
Delta lines: +40 -41
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_service.cpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_service.cpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -95,7 +95,7 @@
     env_["EC2_PRIVATE_KEY"] = ini_["ec2_key"];
     env_["EC2_CERT"]        = ini_["ec2_cert"];
     env_["EC2_URL"]         = ini_["ec2_url"];
-    
+
     saga::adaptors::utils::process proc (env_);
 
     // is a VM given which we should contact?
@@ -107,7 +107,7 @@
 
       proc.add_args ("-k", ini_["ec2_keypair_name"]);
       proc.add_arg  (ini_["ec2_instance"]);
-      
+
       std::vector <std::string> out = proc.run_sync ();
 
       if ( out.size() < 1 )
@@ -130,7 +130,7 @@
 
 
       proc.set_cmd (ini_["ec2_home"] + "/bin/ec2-describe-instances");
-      
+
       proc.clear_args ();
       proc.add_arg (vm_id_);
 
@@ -260,7 +260,7 @@
            host.find (".") == std::string::npos )
       {
         // host is an instance id, and we need to get the hostname
-        
+
         SAGA_LOG_ALWAYS ("found id");
 
         std::string vm_id_ = host;
@@ -355,16 +355,6 @@
       }
     }
 
-    // set the correct scheme if needed, so that the job cpi recognizes it
-    if ( idata->rm_.get_scheme ().empty () ||
-         idata->rm_.get_scheme () == "any" )
-    {
-      idata->rm_.set_scheme (type);
-    }
-
-    std::cout << "  rm: " << idata->rm_ << std::endl;
-
-
     // we do have a job service instance, either new started or old and running.
     // Now, copy over the ssh identity file of the context we used, so that jobs
     // running on that instance can use it to contact other instances using the
@@ -399,7 +389,7 @@
     // we also propagate our HOME/.saga.ini file.  This is kind of a hack
     // (FIXME), but allows us to have a uniform SAGA configuration for all VMs.
     // FIXME: that should be better done with the prep scripts above.
-    
+
     std::cout << "copying ~/.saga.ini" << std::endl;
 
     proc.set_cmd ("/usr/bin/scp");
@@ -424,12 +414,28 @@
     proc.add_arg ("root@" + vm_ip_ + ":.saga.ini");
 
     (void) proc.run_sync ();
-    
+
     if ( proc.fail () )
     {
       std::cout << "could not copy saga.ini" << std::endl;
       throw;
     }
+
+
+    // we are sure ssh is running - we should be able to create a ssh job
+    // service for the instance now
+    ssh_url_ = std::string ("ssh://") + vm_ip_;
+
+    // FIXME: point to correct cert
+    ssh_context_.set_attribute (saga::attributes::context_type,    "ssh");
+    ssh_context_.set_defaults  ();
+    ssh_context_.set_attribute (saga::attributes::context_userid,  "root");
+    ssh_context_.set_attribute (saga::attributes::context_userkey, ini_["ec2_proxy"]);
+
+    ssh_session_.add_context (ssh_context_);
+
+    // create ssh job service which from now on handles job submission etc
+    js_ = TR1::shared_ptr <saga::job::service> (new saga::job::service (ssh_session_, ssh_url_));
   }
 
   //////////////////////////////////////////////////////////////////////
@@ -469,50 +475,43 @@
         // that the VM may continue to run
         SAGA_LOG_CRITICAL ("Could not shut down VM instance ");
       }
-      
     }
+
+    // the ssh job service is automatically destroyed here.
   }
 
   //////////////////////////////////////////////////////////////////////
   // SAGA API functions
-  void 
-    job_service_cpi_impl::sync_create_job (saga::job::job         & ret, 
-                                           saga::job::description   jd)
+  void job_service_cpi_impl::sync_create_job (saga::job::job         & ret, 
+                                              saga::job::description   jd)
   {
-    saga::url const instance_rm = instance_data (this)->rm_;
-
-    saga::job::job job = saga::adaptors::job (instance_rm.get_string (), 
-                                              jd,
-                                              proxy_->get_session ());
-    ret = job;
+    ret = js_->create_job (jd);
   }
 
-  void 
-    job_service_cpi_impl::sync_run_job (saga::job::job     & ret, 
-                                        std::string          cmd, 
-                                        std::string          host, 
-                                        saga::job::ostream & in, 
-                                        saga::job::istream & out, 
-                                        saga::job::istream & err)
+  void job_service_cpi_impl::sync_run_job (saga::job::job     & ret, 
+                                           std::string          cmd, 
+                                           std::string          host, 
+                                           saga::job::ostream & in, 
+                                           saga::job::istream & out, 
+                                           saga::job::istream & err)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    ret = js_->run_job (cmd, host, in, out, err);
   }
 
-  void 
-    job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
+  void job_service_cpi_impl::sync_list (std::vector <std::string> & ret)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    ret = js_->list ();
   }
 
-  void
-    job_service_cpi_impl::sync_get_job (saga::job::job & ret, 
-                                        std::string      jobid)
+  void job_service_cpi_impl::sync_get_job (saga::job::job & ret, 
+                                           std::string      jobid)
   {
-    SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
+    ret = js_->get_job (jobid);
   }
 
   void job_service_cpi_impl::sync_get_self (saga::job::self & ret)
   {
+    // will never be implemented by this adaptor
     SAGA_ADAPTOR_THROW ("Not Implemented", saga::NotImplemented);
   }
 

File [modified]: aws_job_service.hpp
Delta lines: +6 -13
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_service.hpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_service.hpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -48,8 +48,13 @@
       std::string vm_id_;
       std::string vm_ip_;
 
-      saga::context ctx_;   // context used to access this job service
+      saga::context      ctx_;          // context used to access this job service
+      saga::url          ssh_url_;      // rm url for ssh job service
+      saga::context      ssh_context_;  // context to use for ssh ops
+      saga::session      ssh_session_;  // session to use for ssh ops
 
+      TR1::shared_ptr <saga::job::service> js_; // ssh job service, does the real work
+
       std::map <std::string, std::string> ini_;
       std::map <std::string, std::string> env_;
 
@@ -79,18 +84,6 @@
                             std::string                 jobid);
       void sync_get_self   (saga::job::self           & ret);
 
-      // This adaptor implements the async functions 
-      // based on its own synchronous functions.
-      saga::task async_create_job (saga::job::description      jd);
-      saga::task async_run_job    (std::string                 cmd, 
-                                   std::string                 host, 
-                                   saga::job::ostream        & in, 
-                                   saga::job::istream        & out, 
-                                   saga::job::istream        & err);
-      saga::task async_list       (void);
-      saga::task async_get_job    (std::string                 jobid);
-      saga::task async_get_self   (void);
-
   };  // class job_service_cpi_impl
 
 } // namespace aws_job

File [removed]: aws_job_service_async.cpp
Delta lines: +0 -91
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_service_async.cpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_service_async.cpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,91 +0,0 @@
-//  Copyright (c) 2005-2007 Hartmut Kaiser 
-//  Copyright (c) 2005-2007 Andre Merzky   (andre at merzky.net)
-// 
-//  Distributed under the Boost Software License, Version 1.0. 
-//  (See accompanying file LICENSE or copy at 
-//   http://www.boost.org/LICENSE_1_0.txt)
-
-// stl includes
-#include <vector>
-
-// saga includes
-#include <saga/saga.hpp>
-#include <saga/saga/adaptors/task.hpp>
-
-// saga engine includes
-#include <saga/impl/config.hpp>
-
-// saga adaptor includes
-#include <saga/saga/adaptors/task.hpp>
-#include <saga/saga/adaptors/attribute.hpp>
-
-// saga package includes
-#include <saga/saga/packages/job/adaptors/job.hpp>
-#include <saga/saga/packages/job/adaptors/job_self.hpp>
-
-// adaptor includes
-#include "aws_job_service.hpp"
-
-
-////////////////////////////////////////////////////////////////////////
-namespace aws_job
-{
-
-  //////////////////////////////////////////////////////////////////////
-  // This adaptor implements the async functions 
-  // based on its own synchronous functions.
-
-  saga::task 
-    job_service_cpi_impl::async_create_job (saga::job::description   jd)
-  {
-    return saga::adaptors::task ("job_service_cpi_impl::async_create_job",
-                                 shared_from_this (),
-                                 &job_service_cpi_impl::sync_create_job, 
-                                 jd);
-  }
-
-  saga::task 
-    job_service_cpi_impl::async_run_job (std::string          cmd, 
-                                         std::string          host, 
-                                         saga::job::ostream & in, 
-                                         saga::job::istream & out, 
-                                         saga::job::istream & err)
-  {
-    return saga::adaptors::task ("job_service_cpi_impl::async_run_job",
-                                 shared_from_this (),
-                                 &job_service_cpi_impl::sync_run_job, 
-                                 cmd, 
-                                 host, 
-                                 TR1::ref (in), 
-                                 TR1::ref (out),
-                                 TR1::ref (err));
-  }
-
-  saga::task 
-    job_service_cpi_impl::async_list (void)
-  {
-    return saga::adaptors::task ("job_service_cpi_impl::async_list",
-                                 shared_from_this (),
-                                 &job_service_cpi_impl::sync_list);
-  }
-
-  saga::task 
-    job_service_cpi_impl::async_get_job (std::string      jobid)
-  {
-    return saga::adaptors::task ("job_service_cpi_impl::async_get_job",
-                                 shared_from_this (),
-                                 &job_service_cpi_impl::sync_get_job, 
-                                 jobid);
-  }
-
-  saga::task 
-    job_service_cpi_impl::async_get_self (void)
-  {
-    return saga::adaptors::task ("job_service_cpi_impl::async_get_self",
-                                 shared_from_this (),
-                                 &job_service_cpi_impl::sync_get_self);
-  }
-
-} // namespace aws_job
-////////////////////////////////////////////////////////////////////////
-

File [removed]: aws_job_stream.hpp
Delta lines: +0 -52
===================================================================
--- trunk/adaptors/aws/aws_job/aws_job_stream.hpp	2009-01-12 10:16:56 UTC (rev 3261)
+++ trunk/adaptors/aws/aws_job/aws_job_stream.hpp	2009-01-12 10:17:56 UTC (rev 3262)
@@ -1,52 +0,0 @@
-//  Copyright (c) 2005-2007 Hartmut Kaiser 
-//  Copyright (c) 2005-2007 Andre Merzky   (andre at merzky.net)
-// 
-//  Distributed under the Boost Software License, Version 1.0. 
-//  (See accompanying file LICENSE or copy at 
-//   http://www.boost.org/LICENSE_1_0.txt)
-
-#if !defined(ADAPTORS_AWS_JOB_STREAM_HPP)
-#define ADAPTORS_AWS_JOB_STREAM_HPP
-
-// stl includes
-#include <iosfwd>
-
-// saga engine includes
-#include <saga/impl/engine/cpi.hpp>
-
-
-///////////////////////////////////////////////////////////////////////////////
-namespace impl
-{
-  namespace aws_job
-  {
-    template <typename Base>
-    class stream
-      :   public Base
-    {
-      private:
-        typedef Base base_type;
-
-        // a saga stream has to keep alive the proxy and the cpi instance
-        TR1::shared_ptr <saga::impl::v1_0::cpi> cpi_;
-        TR1::shared_ptr <saga::impl::proxy>     proxy_;
-
-
-      public:
-        stream (saga::impl::v1_0::job_cpi * cpi, 
-                std::streambuf            * buf)
-          : base_type (buf),
-            cpi_      (cpi->shared_from_this ()),
-            proxy_    (cpi->get_proxy ()->shared_from_this ())
-          {
-          }
-    };
-
-  } // namespace aws_job
-  //////////////////////////////////////////////////////////////////////
-
-} // namespace impl
-////////////////////////////////////////////////////////////////////////
-
-#endif // !defined(ADAPTORS_AWS_JOB_STREAM_HPP)
-



More information about the saga-devel mailing list