00001 #ifndef PROTON_IO_CONTAINER_IMPL_BASE_HPP
00002 #define PROTON_IO_CONTAINER_IMPL_BASE_HPP
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "../container.hpp"
00026
00027 #include <mutex>
00028 #include <sstream>
00029
00030 namespace proton {
00031 namespace io {
00032
00041 class container_impl_base : public standard_container {
00042 public:
00043
00044 using standard_container::open_receiver;
00045 using standard_container::open_sender;
00046
00048 void client_connection_options(const connection_options & opts) {
00049 store(client_copts_, opts);
00050 }
00051
00053 connection_options client_connection_options() const {
00054 return load(client_copts_);
00055 }
00056
00058 void server_connection_options(const connection_options & opts) {
00059 store(server_copts_, opts);
00060 }
00061
00063 connection_options server_connection_options() const {
00064 return load(server_copts_);
00065 }
00066
00068 void sender_options(const class sender_options & opts) {
00069 store(sender_opts_, opts);
00070 }
00071
00073 class sender_options sender_options() const {
00074 return load(sender_opts_);
00075 }
00076
00078 void receiver_options(const class receiver_options & opts) {
00079 store(receiver_opts_, opts);
00080 }
00081
00083 class receiver_options receiver_options() const {
00084 return load(receiver_opts_);
00085 }
00086
00088 returned<sender> open_sender(
00089 const std::string &url, const class sender_options &opts, const connection_options &copts)
00090 {
00091 return open_link<sender, class sender_options>(url, opts, copts, &connection::open_sender);
00092 }
00093
00095 returned<receiver> open_receiver(
00096 const std::string &url, const class receiver_options &opts, const connection_options &copts)
00097 {
00098 return open_link<receiver>(url, opts, copts, &connection::open_receiver);
00099 }
00100
00101 private:
00102 template<class T, class Opts>
00103 returned<T> open_link(
00104 const std::string &url_str, const Opts& opts, const connection_options& copts,
00105 T (connection::*open_fn)(const std::string&, const Opts&))
00106 {
00107 std::string addr = url(url_str).path();
00108 std::shared_ptr<thread_safe<connection> > ts_connection = connect(url_str, copts);
00109 std::promise<returned<T> > result_promise;
00110 auto do_open = [ts_connection, addr, opts, open_fn, &result_promise]() {
00111 try {
00112 connection c = ts_connection->unsafe();
00113 returned<T> s = make_thread_safe((c.*open_fn)(addr, opts));
00114 result_promise.set_value(s);
00115 } catch (...) {
00116 result_promise.set_exception(std::current_exception());
00117 }
00118 };
00119 ts_connection->event_loop()->inject(do_open);
00120 std::future<returned<T> > result_future = result_promise.get_future();
00121 if (!result_future.valid())
00122 throw error(url_str+": connection closed");
00123 return result_future.get();
00124 }
00125
00126 mutable std::mutex lock_;
00127 template <class T> T load(const T& v) const {
00128 std::lock_guard<std::mutex> g(lock_);
00129 return v;
00130 }
00131 template <class T> void store(T& v, const T& x) const {
00132 std::lock_guard<std::mutex> g(lock_);
00133 v = x;
00134 }
00135 connection_options client_copts_, server_copts_;
00136 class receiver_options receiver_opts_;
00137 class sender_options sender_opts_;
00138 };
00139
00140 }
00141 }
00142
00143 #endif // PROTON_IO_CONTAINER_IMPL_BASE_HPP