// 1. Implement typed servant
# include "TMNEvents_s.hh"
// Typed event servant: derives from the generated POA_TMN::TypedEvent
// skeleton and from the reference-counted servant base.
class TMNTypedEventImpl : public POA_TMN::TypedEvent,
public PortableServer::RefCountServantBase
{
public:
// Handler for the attributeValueChange operation; its parameters and body
// are elided in this example.
void attributeValueChange(...) { ... };
// Remaining members are elided.
...
};
// Typed push consumer: activates a typed servant on the root POA and
// subscribes it to a channel through the PSA, then runs the ORB loop.
int main(int argc, char** argv)
{
    ...
    // 2. Get orb and poa environment
    CORBA::ORB_ptr orb = CORBA::ORB_init(argc, argv);
    CORBA::Object_var obj =
        orb->resolve_initial_references("RootPOA");
    PortableServer::POA_var poa = PortableServer::POA::_narrow(obj);
    // 3. Construct the typed servant
    TMNTypedEventImpl* servant = new TMNTypedEventImpl();
    // 4. Activate it on poa
    poa->activate_object(servant);
    // 5. Somehow, this consumer is given a channel reference
    CORBA::Object_var channel = ... ;
    // 6. Get object id of the consumer servant
    //    (member access through the POA_var uses "->")
    PortableServer::ObjectId_var oid =
        poa->servant_to_id(servant);
    // 7. Narrow the POA to PSA
    PortableServerExt::PSA_var psa = PortableServerExt::PSA::_narrow(poa);
    // 8. Subscribe to the channel
    PortableServerExt::SubjectScheme scheme = {
        PortableServerExt::CHANNEL_ADDR,
        PortableServerExt::TYPED_SUBJECT,
        (const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
        PortableServerExt::PUSH_EVENT };
    psa->subscribe(scheme, channel, oid, CORBA::NameValuePairSeq());
    // 9. Consumer working loop
    poa->the_POAManager()->activate();
    orb->run();
}
import com.inprise.vbroker.PortableServerExt.*;
// 1. Implement typed servant
// Typed event servant: extends the generated TMN.TypedEventPOA skeleton.
public class TMNTypedEventImpl extends TMN.TypedEventPOA
{
    // Handler for the attributeValueChange operation; its parameters and
    // body are elided in this example.
    public void attributeValueChange(...) { ... }
}
// Typed push consumer application: activates a typed servant on the root
// POA and subscribes it to a channel through the PSA.
public class TypedPushConsumerImpl
{
    public static void main(String[] args)
    {
        ...
        // 2. Get orb and poa environment
        org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
        org.omg.PortableServer.POA poa =
            org.omg.PortableServer.POAHelper.narrow(
                orb.resolve_initial_references("RootPOA"));
        // 3. Construct the typed servant
        TMNTypedEventImpl servant = new TMNTypedEventImpl();
        // 4. Activate it on root poa
        poa.activate_object(servant);
        // 5. Somehow, this consumer is given a channel reference
        org.omg.CORBA.Object channel = ...;
        // 6. Get object id of the consumer servant
        //    (PortableServer::ObjectId maps to byte[] in Java; the id is taken
        //    from the POA because the PSA is only narrowed in the next step)
        byte[] oid = poa.servant_to_id(servant);
        // 7. Narrow the org.omg.PortableServer.POA to
        //    com.inprise.vbroker.PortableServerExt.PSA
        PSA psa = PSAHelper.narrow(poa);
        // 8. Subscribe to the channel
        SubjectScheme scheme = new SubjectScheme(
            SubjectAddressScheme.CHANNEL_ADDR,
            SubjectInterfaceScheme.TYPED_SUBJECT,
            "IDL:example.borland.com/TMN/TypedEvent:1.0",
            SubjectDeliveryScheme.PUSH_EVENT);
        psa.subscribe(scheme, channel, oid, null);
        // 9. working loop
        poa.the_POAManager().activate();
        orb.run();
    }
}
// Getting root PSA in C++
CORBA::Object_var ref =
    orb->resolve_initial_references("RootPSA");
// _narrow() is a static member of the PSA interface class, not of the
// PortableServerExt module.
PortableServerExt::PSA_var psa = PortableServerExt::PSA::_narrow(ref);
// Getting root PSA in Java
// get publisher/subscriber adapter
// Resolve the root PSA under the same initial-reference name that the C++
// example uses ("RootPSA").
org.omg.CORBA.Object ref = orb.resolve_initial_references("RootPSA");
PSA psa = PSAHelper.narrow(ref);
// Extensions to PortableServer that add the Publisher/Subscriber Adapter.
module PortableServerExt {
    // POA extended with read access to its policy list.
    interface POA : PortableServer::POA {
        readonly attribute CORBA::PolicyList the_policies;
    };

    // How the subject reference passed to subscribe()/publish() is
    // interpreted.
    enum SubjectAddressScheme {
        SUBSCRIBE_ADMIN_ADDR,   // consumer admin reference
        PUBLISH_ADMIN_ADDR,     // supplier admin reference
        CHANNEL_ADDR,           // notification channel (or typed channel)
        SUBJECT_ADDR            // direct event pushing address
    };

    // Event interface style used by the subject.
    enum SubjectInterfaceScheme {
        TYPED_SUBJECT,
        UNTYPED_SUBJECT,
        STRUCTURED_SUBJECT,
        SEQUENCE_SUBJECT
    };

    // Event delivery mode used by the subject.
    enum SubjectDeliveryScheme {
        PUSH_EVENT,
        PULL_EVENT
    };

    typedef string SubjectInterfaceId;

    // Full description of a subject: address, interface and delivery scheme.
    struct SubjectScheme {
        SubjectAddressScheme address_scheme;
        SubjectInterfaceScheme interface_scheme;
        SubjectInterfaceId interface_id;
        SubjectDeliveryScheme delivery_scheme;
    };

    typedef Object Subject;

    // Descriptors returned by subscribe()/publish() and accepted by the
    // remaining PSA operations.
    typedef CORBA::OctetSequence PublishSubscribeDesc;
    typedef PublishSubscribeDesc SubscribeDesc;
    typedef PublishSubscribeDesc PublishDesc;

    exception InvalidSubjectScheme { long error; };
    exception InvalidSubscribeDesc { long error; };
    exception InvalidPublishDesc { long error; };
    exception InvalidProperties { CORBA::StringSequence names; };
    // Carries the repository id of the underlying channel exception.
    exception ChannelException { string repository_id; };

    // The Publisher/Subscriber Adapter
    interface PSA : POA {
        // register subject observer
        SubscribeDesc
        subscribe(
            in SubjectScheme the_subject_scheme,
            in Subject the_subject,
            in PortableServer::ObjectId the_observer_id,
            in CORBA::NameValuePairSeq the_properties )
        raises( InvalidSubjectScheme,
                InvalidProperties,
                ChannelException );

        // Register subject provider
        PublishDesc
        publish(
            in SubjectScheme the_subject_scheme,
            in Subject the_subject,
            in PortableServer::ObjectId the_pullable_publisher_id,
            in CORBA::NameValuePairSeq the_properties )
        raises( InvalidSubjectScheme,
                InvalidProperties,
                ChannelException );

        // Unregister observer from subject
        void
        unsubscribe(
            in SubscribeDesc the_subscribe_desc )
        raises( InvalidSubscribeDesc,
                ChannelException );

        // Unregister (pull mode) provider
        void
        unpublish(
            in PublishDesc the_publish_desc)
        raises( InvalidPublishDesc,
                ChannelException );

        // Suspend subject to push into the registered
        // observer or suspend subject to pull from the
        // registered provider
        void
        suspend(
            in PublishSubscribeDesc the_desc)
        raises( ChannelException );

        // Resume subject to push into the registered
        // observer or resume subject to pull from the
        // registered provider.
        void
        resume(
            in PublishSubscribeDesc the_desc)
        raises( ChannelException );

        // Pull (typed) event and dispatch it to a registered
        // observer.
        unsigned long
        pull_and_dispatch(
            in SubscribeDesc the_subscribe_desc,
            in unsigned long max_count,
            in boolean block_pulling,
            in boolean async_dispatch)
        raises( InvalidSubscribeDesc,
                InvalidSubjectScheme,
                ChannelException );

        // Pull (typed) event and accept a given visitor to
        // 'visit' the event.
        unsigned long
        pull_and_visit(
            in SubscribeDesc the_subscribe_desc,
            in unsigned long max_count,
            in boolean block_pulling,
            in PortableServer::Servant the_visitor)
        raises( InvalidSubscribeDesc,
                InvalidSubjectScheme,
                ChannelException );

        // Subject address associated with a publish/subscribe descriptor
        Subject
        the_subject_addr(
            in PublishSubscribeDesc the_desc)
        raises( InvalidSubjectScheme );

        // low level access
        Object
        the_proxy_addr(
            in PublishSubscribeDesc the_desc)
        raises( InvalidSubjectScheme );
    };
    ...
};
// Narrowing a POA into a PSA in Java
// Create a child POA, then narrow it to the PSA interface through the
// generated helper class.
org.omg.PortableServer.POA poa =
    parent_poa.create_POA(...);
com.inprise.vbroker.PortableServerExt.PSA psa =
    com.inprise.vbroker.PortableServerExt.PSAHelper.narrow(poa);
|
|
|
|
|
|
|
|
// Connect to channel // 1. Get default admin
ConsumerAdmin_var admin = channel->default_consumer_admin();
|
|
// 2. Create a proxy
ProxyID proxy_id; ProxySupplier_var proxy = admin-> obtain_notification_push_supplier( STRUCTURED_EVENT, proxy_id); // Narrow to the stub StructuredProxyPushSupplier_var supplier = StructuredProxyPushSupplier:: _narrow(proxy); // 3. Connect proxy supplier
supplier-> connect_structured_push_consumer( consumer); // working loop orb->run(); }
|
// 1. Subscribe
psa->subscribe(scheme, channel, oid, CORBA::NameValuePairSeq()); // working loop orb->run(); }
|
This is the Java equivalent.
|
|
|
|
|
|
|
|
// Connect to channel // 1. Get default admin
ConsumerAdmin admin = channel.default_consumer_admin();
|
// Subscribe to channel // Specify the subject scheme SubjectScheme scheme = new SubjectScheme( SubjectAddressScheme.CHANNEL_ADDR, SubjectInterfaceScheme. STRUCTURED_SUBJECT, (const char*)"", SubjectDeliveryScheme.PUSH_EVENT);
|
|
// 1. Subscribe
psa.subscribe( scheme, channel, oid, null); // working loop orb.run(); }
|
|
|
|
|
|
|
|
|
// Connect to channel // 1. Construct the proxy consumer
TypedPushConsumerImpl* servant = new TypedPushConsumerImpl(typed_ref);
|
// Subscribe to channel
// Specify the subject scheme PortableServerExt::SubjectScheme scheme = { PortableServerExt::CHANNEL_ADDR, PortableServerExt::TYPED_SUBJECT, (const char*)"IDL:example.borland.com" "TMN/TypedEvent:1.0", PortableServerExt::PUSH_EVENT };
// 1. Subscribe
psa->subscribe(scheme, channel, oid, CORBA::NameValuePairSeq()); // working loop orb->run(); }
|
// 2. Activate it on root poa
poa->activate_object(servant);
|
|
// 3. Get consumer object reference/ obj = poa ->servant_to_reference(servant); CosTypedNotifyComm::TypedPushConsumer_ var consumer = CosTypedNotifyComm:: TypedPushConsumer::_narrow(obj); // 4. Get default admin
TypedConsumerAdmin_var admin = channel->default_consumer_admin(); // 5. Create a proxy
CosNotifyChannelAdmin::ProxyID proxy_id;
|
|
|
|
|
|
|
|
|
|
|
|
// 1. Allocate the proxy consumer
TypedPushConsumerImpl servant = New TypedPushConsumerImpl(typed_ref); // 2. Activate it on root poa
poa.activate_object(servant); // 3. Get consumer object reference
obj = poa-> servant_to_reference(servant); TypedPushConsumer Consumer = TypedPushConsumer.narrow(obj); // 4. Get default admin
TypedConsumerAdmin admin = Channel.default_consumer_admin(); // 5. Create a proxy
CosNotifyChannelAdmin::ProxyID proxy_id; TypedProxySupplier proxy = admin. Obtain_notification_push_supplier( "IDL:example.borland.com/" "TMN/TypedEvent:1.0", proxy_id); // 6. Connect proxy supplier
proxy.connect_typed_push_consumer( consumer); // working loop orb.run(); }
|
|
|
|
|
// Structured push supplier using the PSA: publishes to the channel, obtains
// the proxy push consumer from the publish descriptor, and pushes events.
int main(int argc, char** argv)
{
    // Get orb and psa environment
    CORBA::ORB_ptr orb = CORBA::ORB_init(argc, argv);
    CORBA::Object_var obj =
        orb->resolve_initial_references("RootPSA");
    PortableServerExt::PSA_var psa = PortableServerExt::PSA::_narrow(obj);
    // Get channel reference
    CORBA::Object_var channel = ... ;
    // Publish to channel
    // 1. Publish
    PortableServerExt::SubjectScheme scheme = {
        PortableServerExt::CHANNEL_ADDR,
        PortableServerExt::STRUCTURED_SUBJECT,
        (const char*)"",
        PortableServerExt::PUSH_EVENT };
    PortableServerExt::PublishDesc_var desc = psa->publish(
        scheme, channel, PortableServer::ObjectId(),
        CORBA::NameValuePairSeq());
    // 2. Get the StructuredProxyConsumer
    //    (kept in its own variable so it does not redeclare "obj")
    CORBA::Object_var proxy = psa->the_subject_addr(desc);
    StructuredProxyPushConsumer_var consumer =
        StructuredProxyPushConsumer::_narrow(proxy);
    // Push structured events
    for(;;) {
        consumer->push_structured_event(...);
        ...
    }
    ...
}
|
|
|
|
// Typed push supplier using the PSA: publishes to the channel, obtains the
// typed <I> interface from the publish descriptor, and pushes typed events.
int main(int argc, char** argv)
{
    // Get orb and psa environment
    CORBA::ORB_ptr orb = CORBA::ORB_init(argc, argv);
    CORBA::Object_var obj =
        orb->resolve_initial_references("RootPSA");
    PortableServerExt::PSA_var psa = PortableServerExt::PSA::_narrow(obj);
    // Get channel reference
    CORBA::Object_var channel = ... ;
    // Publish to channel
    // 1. Publish
    PortableServerExt::SubjectScheme scheme = {
        PortableServerExt::CHANNEL_ADDR,
        PortableServerExt::TYPED_SUBJECT,
        // repository id needs the "/" between the host part and the scoped name
        (const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
        PortableServerExt::PUSH_EVENT };
    PortableServerExt::PublishDesc_var desc = psa->publish(
        scheme, channel, PortableServer::ObjectId(),
        CORBA::NameValuePairSeq());
    // 2. Get the <I> interface
    //    (kept in its own variable so it does not redeclare "obj")
    CORBA::Object_var subject = psa->the_subject_addr(desc);
    TMN::TypedEvent_var consumer = TMN::TypedEvent::_narrow(subject);
    // Push typed events interface
    for(;;) {
        consumer->attributeValueChange(...);
        ...
    }
    ...
}
|
SubscribeDesc subscribe(
in SubjectScheme the_subject_scheme,
in Subject the_subject,
in PortableServer::ObjectId the_observer_id,
in CORBA::NameValuePairSeq the_properties )
raises( InvalidSubjectScheme,
InvalidProperties,
ChannelException );
struct SubjectScheme {
SubjectAddressScheme address_scheme;
SubjectInterfaceScheme interface_scheme;
SubjectInterfaceId interface_id;
SubjectDeliveryScheme delivery_scheme;
};
The address_scheme field specifies how the subject reference is interpreted: for example, whether it is an address to which events can be pushed directly or an address that can only be used to subscribe. Currently, three values of this field are valid for subscribing:
SUBSCRIBE_ADMIN_ADDR,
CHANNEL_ADDR, and
SUBJECT_ADDR, which indicate that the subject reference passed to the subscribe() operation is an OMG Notification Consumer Admin, an OMG Notification Channel (or typed channel), or a direct event pushing address, respectively.
•
|
SUBSCRIBE_ADMIN_ADDR - The subject reference to subscribe() is an OMG Notification Consumer Admin reference, PSA simply calls obtain_<...>_supplier() on the admin to allocate a proxy on the admin and then calls connect_<...>_consumer() on the proxy. The consumer reference connected to the proxy is either null (for pull mode consumer) or a push consumer object reference created from this PSA with the_observer_id parameter. For typed channels, the get_typed_consumer() and get_typed_supplier() are automatically handled by PSA.
|
•
|
CHANNEL_ADDR - The subject reference to subscribe() is an OMG Notification Channel (or typed channel). PSA simply calls _get_default_consumer_admin() on the channel to get the default admin and then handles it as a connection through this consumer admin reference.
|
•
|
SUBJECT_ADDR - The subject reference to subscribe() is a direct event pushing address. For example, it could be a multicast IOR, or a typed <I> interface. For any other channel than typed, it is a proxy push consumer. PSA calls _get_MyAdmin()/_get_MyChannel()/_get_default_consumer_admin() and then handles it as a connection through consumer admin. For typed channels, this is already a push <I> interface. PSA looks into the reference for a consumer admin component (not currently supported) and handles it as a connection through consumer admin.
|
Additionally, applications need to specify SubjectInterfaceScheme and
SubjectDeliveryScheme.
For SubjectInterfaceScheme the valid values are:
•
|
TYPED_SUBJECT - Subject uses either multicast or OMG Typed Notification Channel.
|
For SubjectDeliveryScheme the valid values are:
•
|
PUSH_EVENT - Subject uses either multicast or OMG Push Notification mode (any of the four OMG event types).
|
•
|
PULL_EVENT - Subject uses OMG Pull Notification mode (any of the four OMG event types).
|
The second and third parameters to subscribe() are the reference of the subject and the object id of a passive consumer object. The interpretation of the subject reference is specified by the SubjectScheme passed as the first parameter to
subscribe() and has been described above. The passive consumer object id specifies the consumer object to which a received event is dispatched.
There are two kinds of consumer objects: passive and
active. All push consumers are passive consumers, and all pull consumers, except for typed consumers using
pull_and_dispatch(), are active.
Object the_subject_addr(in PublishSubscribeDesc the_desc);
PSA unsubscribe() disconnects the consumer from a connected channel and cleans up any local resource, if necessary (for multicast case, it removes the subject key to observer id mapping). If the consumer is connected to an untyped or a typed channel, the PSA invokes
disconnect_push/pull_supplier() to the proxy.
If the consumer is connected to a structured or sequence channel, the PSA invokes disconnect_structured_push/pull_supplier() or
disconnect_sequence_push/pull_supplier(), respectively.
void unsubscribe(in SubscribeDesc the_subscribe_desc)
Publish in the PSA model is defined as an operation, which attaches a supplier object or source to a notification/event channel that provides (either push or pull) event messages.
PublishDesc publish(
in SubjectScheme the_subject_scheme,
in Subject the_subject,
in PortableServer::ObjectId the_provider_id,
in CORBA::NameValuePairSeq the_properties )
raises( InvalidSubjectScheme,
InvalidProperties,
ChannelException );
struct SubjectScheme {
SubjectAddressScheme address_scheme;
SubjectInterfaceScheme interface_scheme;
SubjectInterfaceId interface_id;
SubjectDeliveryScheme delivery_scheme;
};
The address_scheme field specifies how the subject reference is interpreted, such as whether it is an address to which events can be pushed directly or an address that can only be used to subscribe. Currently, VisiBroker supports three valid values for this field, which indicate that the subject reference to the
publish() operation is an OMG notification supplier admin, an OMG notification channel (or typed channel), or a direct event pushing address, respectively.
•
|
PUBLISH_ADMIN_ADDR - The subject reference to publish() is an OMG notification supplier admin reference. PSA simply calls obtain_<...>_consumer() on the admin reference to allocate a proxy on the admin and then calls connect_<...>_supplier() on the proxy. The supplier reference connected to the proxy is either null (for push supplier) or a pull supplier reference created from this PSA with provider_id parameter. For typed channels, get_typed_consumer() operation and get_typed_supplier() implementation are automatically handled by PSA.
|
•
|
CHANNEL_ADDR - The subject reference to publish() is an OMG notification channel (or typed channel). PSA simply calls _get_default_supplier_admin() on the channel to get the default supplier admin. It then handles it as a connection through this supplier admin reference.
|
•
|
SUBJECT_ADDR - The subject reference to publish() is a direct event pushing address. For example, it could be a multicast IOR or a typed <I> interface. This is a trivial case. PSA simply wraps a publisher descriptor and returns.
|
Your application will also need to specify SubjectInterfaceScheme and
SubjectDeliveryScheme.
The valid SubjectInterfaceScheme values are:
•
|
TYPED_SUBJECT - Subject uses either multicast or OMG typed notification channel.
|
The valid SubjectDeliveryScheme values are:
•
|
PUSH_EVENT - Subject uses either multicast or OMG push notification mode (any of the four OMG event types).
|
•
|
PULL_EVENT - Subject uses OMG pull notification mode (any of the four OMG event types).
|
The interpretation of the subject reference is specified by the SubjectScheme passed as the first parameter to the publish() operation. The second and third parameters to
publish() are the reference of the subject and the object id of a passive supplier (that is, a pull supplier) object. The passive supplier object id specifies which supplier object the PSA should use to pull events for publishing.
•
|
Calls get_typed_consumer() on the proxy reference to get the <I> interface.
|
Object the_subject_addr(in PublishSubscribeDesc the_desc);
After a successful publish() operation, a publish descriptor is returned. It contains the information/mapping needed to implement other
publish-related operations, such as
unpublish(),
suspend(), and
resume(). This descriptor can be saved into a persistent repository and reloaded in the same supplier process session or in a restarted supplier session. However, the format of this descriptor is internal to the ORB that creates it. Therefore, like the object key, a publish descriptor should only be used by the same ORB.
After a successful publish() operation, applications with active (push) suppliers can get the push addresses of the proxy push consumers or, for typed channels, the <I> interface references, from the PSA's
the_subject_addr() operation using the publish descriptor. The publish descriptor returned from the PSA
publish() method is passed to the_subject_addr() as its parameter.
The PSA unpublish() disconnects the supplier from a connected channel and cleans up any local resource. If the supplier is connected to an untyped or a typed channel, the PSA invokes
disconnect_push/pull_consumer() to the proxy. If it is connected to a structured or sequence channel, the PSA invokes
disconnect_structured_push/pull_consumer() or
disconnect_sequence_push/pull_consumer(), respectively.
Therefore, to subscribe a passive typed pull consumer, a valid object id is needed in the PSA subscribe() operation. After
subscribe(), the application uses the PSA's
pull_and_dispatch() method to pull typed events from the channel and dispatch them to the passive consumer. The passive typed pull consumer is designed for applications that want to use a passive consumer while keeping control of incoming events in the consumer application.
// (examples/vbroker/notify/psa_cpp/typedPullConsumer2.C)
// Implement a passive observer
# include "TMNEvents_s.hh"
// Passive observer servant: derives from the generated POA_TMN::TypedEvent
// skeleton. Operation parameters and bodies are elided in this example.
class TMNTypedEventObserver : public POA_TMN::TypedEvent
{
...
// Handler for the attributeValueChange operation.
public: void attributeValueChange(...) { ... }
...
// Handler for the qosAlarm operation.
void qosAlarm(...) { ... }
};
// Passive typed pull consumer: activates an observer on the PSA, subscribes
// it in pull mode, then pulls events and has the PSA dispatch them to it.
int main(int argc, char** argv)
{
    // construct the observer implementation
    TMNTypedEventObserver* observer = new TMNTypedEventObserver;
    // activate it on PSA
    psa->activate_object (observer);
    PortableServer::ObjectId_var oid = psa->servant_to_id (observer);
    // activate the PSA
    PortableServer::POAManager_var poa_manager =
        psa->the_POAManager ();
    poa_manager->activate ();
    // subscribe to the channel as typed pull consumer
    PortableServerExt::SubjectScheme scheme = {
        PortableServerExt::CHANNEL_ADDR,
        PortableServerExt::TYPED_SUBJECT,
        // repository ids start with a single-colon "IDL:" prefix
        (const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
        PortableServerExt::PULL_EVENT };
    PortableServerExt::SubscribeDesc_var desc =
        psa->subscribe(scheme, channel, oid.in(),
            CORBA::NameValuePairSeq());
    // pull and dispatch max 100 events using block mode.
    psa->pull_and_dispatch(desc, 100, (CORBA::Boolean)1, (CORBA::Boolean)0);
    ...
}
•
|
Call pull_and_dispatch() on the subscribe PSA with the subscribe descriptor as parameters.
|
// (examples/vbe/notify/psa_cpp/typedPullConsumer1.C)
// Implement an active visitor
# include "TMNEvents_s.hh"
// Active visitor: derives from the generated POA_TMN::TypedEvent skeleton.
// Operation parameters and bodies are elided in this example.
class TMNTypedEventVisitor : public POA_TMN::TypedEvent
{
...
// Handler for the attributeValueChange operation.
public: void attributeValueChange(...) { ... }
...
// Handler for the qosAlarm operation.
void qosAlarm(...) { ... }
};
// Active typed pull consumer: subscribes in pull mode without an observer
// id, then pulls events and lets a local visitor instance visit them.
int main(int argc, char** argv)
{
    ...
    // subscribe to the channel as typed pull consumer
    PortableServerExt::SubjectScheme scheme = {
        PortableServerExt::CHANNEL_ADDR,
        PortableServerExt::TYPED_SUBJECT,
        // repository ids start with a single-colon "IDL:" prefix
        (const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
        PortableServerExt::PULL_EVENT };
    PortableServerExt::SubscribeDesc_var desc = psa->subscribe(
        scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
    // create a visitor instance
    TMNTypedEventVisitor visitor;
    // pull and visit max 100 events using block mode.
    psa->pull_and_visit(desc, 100, (CORBA::Boolean)1, &visitor);
    ...
}
// (examples/vbe/notify/psa_java/TypedPullConsumer1.java)
import com.inprise.vbroker.PortableServerExt.*;
// Implement an active visitor
// Active visitor: extends the generated TMN.TypedEventPOA skeleton.
// Operation parameters and bodies are elided in this example.
class TMNTypedEventVisitor extends TMN.TypedEventPOA {
    // Handler for the attributeValueChange operation.
    public void attributeValueChange(...) { ... }
    ...
    // Handler for the qosAlarm operation.
    public void qosAlarm(...) { ... }
}
public class TypedPullConsumer1 {
public static void main(String[] args) {
...
// subscribe to the channel as typed pull consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL::example.borland.com/TMN/TypedEvent:1.0",
SubjectDeliveryScheme.PULL_EVENT };
SubscribeDesc desc = psa.subscribe(scheme, channel, null, null);
// create a visitor instance
TMNTypedEventVisitor
visitor = new TMNTypedEventVisitor();
// pull and visit max 100 events using block mode.
psa.pull_and_visit(desc, 100, true, visitor);
}
}
•
|
Call pull_and_visit() on the subscribe PSA with the subscribe descriptor and a visitor instance as parameters.
|
// (examples/vbroker/notify/psa_cpp/typedPullSupplier.C)
// implement the TypedCallback::PullEvent handler,
// with piggybacked double callback, this handler is
// called back by local PSA instead of by remote proxy
// pull consumer. Therefore, the event receiver is also
// a local object.
# include <TypedCallback_s.hh>
# include "TMNEvents_c.hh"
// Pull-event handler servant: when called back for a typed pull, it narrows
// the event receiver to the typed stub and issues the event on it.
class PullEventImpl : public POA_TypedCallback::PullEvent,
                      public virtual PortableServer::RefCountServantBase
{
public:
    // on typed pulling
    void pull_typed_event(
        CORBA::Object_ptr event_receiver,
        CORBA::Boolean block)
    {
        // narrow to typed stub; hold it in a _var so the reference returned
        // by _narrow() is released when the call returns
        TMN::TypedEvent_var stub
            = TMN::TypedEvent::_narrow(event_receiver);
        // reflect the callback to issue an
        // attributeValueChange event
        stub->attributeValueChange(...);
    }
};
...
// create a supplier handler servant to activate it on
// the PSA
PullEventImpl* supplier = new PullEventImpl;
psa->activate_object (supplier);
PortableServer::ObjectId_var oid = psa->servant_to_id(supplier);
// publish to the channel as typed pull supplier with the
// handler_id but the real <I> interface repository id.
PortableServerExt::SubjectScheme scheme = {
    PortableServerExt::CHANNEL_ADDR,
    PortableServerExt::TYPED_SUBJECT,
    // repository ids start with a single-colon "IDL:" prefix
    (const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
    PortableServerExt::PULL_EVENT };
// publish() returns a publish descriptor
PortableServerExt::PublishDesc_var desc = psa->publish(
    scheme, channel, oid.in(), CORBA::NameValuePairSeq());
// activate the PSA and wait for pulling.
psa->the_POAManager()->activate();
orb->run();
// (examples/vbe/notify/psa_java/TypedPullSupplier.java)
import com.inprise.vbroker.PortableServerExt.*;
// Implement the TypedCallback::PullEvent handler,
// with piggybacked double callback, this handler is
// called back by local PSA instead of by remote proxy
// pull consumer. Therefore, the event receiver is also a
//local object.
// Pull-event handler servant: when called back for a typed pull, it narrows
// the event receiver to the typed stub and issues the event on it.
class TypedPullSupplierImpl
        extends com.borland.vbroker.TypedCallback.PullEventPOA {
    ...
    // IDL boolean maps to the Java primitive boolean, so the parameter must
    // be "boolean" for this method to implement the skeleton operation.
    public void pull_typed_event(
            org.omg.CORBA.Object event_receiver,
            boolean block) {
        // narrow to typed stub
        TMN.TypedEvent stub = TMN.TypedEventHelper.narrow(event_receiver);
        // reflect the callback to issue an attributeValueChange event
        stub.attributeValueChange(...);
    }
}
// Typed pull supplier application: activates the pull-event handler on the
// PSA, publishes it to the channel in pull mode, and runs the ORB loop.
// NOTE(review): the class name differs in case from the example file name
// (TypedPullSupplier.java) - confirm which spelling is intended.
public class typedPullSupplier {
    ...
    public static void main(String[] args) {
        ...
        // create a supplier handler servant to activate it on the PSA
        TypedPullSupplierImpl supplier = new TypedPullSupplierImpl ();
        psa.activate_object (supplier);
        byte [] oid = psa.servant_to_id (supplier);
        // publish to the channel as typed pull supplier with the oid
        // but the real <I> interface repository id.
        SubjectScheme scheme = new SubjectScheme(
            SubjectAddressScheme.CHANNEL_ADDR,
            SubjectInterfaceScheme.TYPED_SUBJECT,
            // repository ids start with a single-colon "IDL:" prefix
            "IDL:example.borland.com/TMN/TypedEvent:1.0",
            SubjectDeliveryScheme.PULL_EVENT);
        // publish() returns a publish descriptor; PublishDesc is an IDL
        // typedef of an octet sequence, so it is held as byte[] in Java.
        byte[] desc = psa.publish(scheme, channel, oid, null);
        // activate the PSA and wait for pulling.
        psa.the_POAManager().activate();
        orb.run();
    }
}
•
|
Write a TypedCallback::PullEvent supplier servant implementation from POA skeleton. The pull_typed_event() operation of this servant uses reflective callback to generate typed event using the original IDL interface stub.
|
Most PSA operations, with the exception of the_subject_addr() and
the_proxy_addr(), can raise
PortableServerExt::ChannelException. This exception has a string member that holds the repository id of the underlying low-level CORBA user exception. For example, when calling
suspend() twice with the same push consumer subscribe descriptor as the parameter, you will get a ChannelException with its
repository_id member set to
IDL:omg.org/CosNotifyChannelAdmin/ConnectionAlreadyInactive.
•
|
The PSA automatically handles get_typed_consumer()/get_typed_supplier() and the <I> interface to proxy mapping. This greatly simplifies application code that uses the typed event/notification service. Typed notification applications only need to implement and install the <I> interface observers.
|