NetP.h

Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 #ifndef NETP_H
00006 #define NETP_H
00007 
00008 #include <string>
00009 #include <cstring>
00010 #include <cmath>
00011 #include <sstream>
00012 #include <map>
00013 #include <set>
00014 #include <pvm3.h>
00015 #include <vector>
00016 #include <cstdlib>
00017 #include <unistd.h>
00018 #include <deque>
00019 #include <sys/timeb.h>
00020 #include <sys/time.h>
00021 #include <iostream>
00022 #include <pthread.h>
00023 
00024 #include <fcntl.h>
00025 #include <arpa/inet.h>
00026 #include <netdb.h>
00027 #include <sys/socket.h>
00028 #include <sys/types.h>
00029 #include <errno.h>
00030 #include <netinet/in.h>
00031 
00032 #include "PData.h"
00033 
00034 using namespace std;
00035 
00038 class NetP {
00039 
00040 public:
00041 
00043 
00044   
00047   NetP(string packettype);
00048 
00050   NetP();
00051 
00052   ~NetP();
00053 
00059   void cleanUp();
00060 
00064   int joinChannel(string channel);
00065 
00069   int leaveChannel(string channel);
00070 
00072 
00074 
00075 
00078   int send();
00079 
00083   string bReceive();
00084 
00088   string nReceive();
00089 
00095   string tReceive(double timeout_sec);
00096 
00099   string bReceiveNewest();
00100 
00104   string nReceiveNewest();
00105 
00110   string tReceiveNewest(double timeout_sec);
00111 
00115   int peek();
00116 
00118 
00120   
00125   PData& data();
00126   
00129   string getType();
00130 
00135   double getTimeStamp();
00136 
00139   static double currentTime();
00140 
00143   int getSize();
00144 
00147   int getTid();
00148 
00151   string getHostname();
00152 
00155   int getProcessID();
00156 
00159   string toString();
00160 
00162   friend ostream& operator<<(ostream& o, NetP& netp) {
00163     o << netp.toString();
00164     return o;
00165   }
00167 
00169 
00170   
00175   void setBufferSize(long bytes);
00176 
00185   static void setMaxPeriod(double seconds);
00186 
00193   static void setMinPeriod(double seconds);
00194 
00199   static void setUnthreaded();
00200 
00206   static void LockPvmMutex();
00207   
00209   static void UnlockPvmMutex();
00210 
00214   static void Exit();
00215 
00217 
00219 
00220 
00222   static void DirectOffer(string channel);
00223 
00225   static int DirectConnect(string channel, unsigned short int port = 51552);
00226 
00228   static void DirectClose(string channel);
00229 
00231   static void DirectClose(int socket);
00232 
00234   static void DirectCloseAll();
00235 
00238   static int DirectWrite(int socket, const char* buf, int len=-1);
00239 
00242   static int DirectWrite(int socket, string buf);
00243 
00246   static int DirectWrite(string channel, string buf);
00247 
00250   static int DirectRead(int socket, char* buf, int len);
00251 
00258   static int DirectConnectToServer(string hostname, string direct_id, 
00259                                    unsigned short int port=51552); // for direct connect
00260 
00263   static string DirectRead(int socket);
00264 
00267   static string DirectRead(string channel);
00268 
00270   static vector<string> ListOpenConnects();
00271 
00273   static vector<string> ListPendingConnects();
00274 
00279   static int GetSocketNum(string channel);
00280   
00282 private:
00283   // static class functions
00284   static void Connect(); // establish PVM connection
00285   static void Spawn(); // create global processing thread
00286   static void* NetLoop(void* arg); // global PVM processing loop
00287   static void ReceiveMapAdd(string type, NetP* ptr); // add to receive list
00288   static void ReceiveMapDelete(string type); // remove from receive list
00289   static NetP* ReceiveMapLookup(string type); // mutex'd lookup in receive map
00290   static int ReceiveOnce(); // receive one pvm packet
00291   static int UnthreadedReceiveOnce(); // for unthreaded mode
00292 
00293   // instance functions
00294   void pvmInit(); // connect to pvm if necessary
00295   void threadInit(); // make thread if necessary
00296   void initPData(); // initialize a new pdata set, create pdata_mutex
00297   void clearPData(); // clear the dataRoot
00298   void destroyPData(); // destroy the dataRoot when we're about to destroy the NetPacket
00299   void receiveBufferToPData(); // mutex helper to move receive buffer to data
00300   int hasQueuedReceive(); // whether anything's waiting in receive queue
00301   void insertReceived(PData* pmeta, PData* pdata); // insert one element into receive queue
00302   void eatRecvBuffer(int remaining = 0); // eat the receive buffer until only this many remain
00303 
00304   PData& getMetaField(string index); // access a subnode of the Root structure
00305   void updateTimeStamp(); // update the timestamp on the send packet
00306 
00307   // static variables
00308   
00309   // metadata setup
00310   static const char* MetaDataSetup; // string that initializes each metadata pdata
00311   static int ProcessID; // the local PID of the running process
00312   static string HostName; // keep track of the host running the process
00313 
00314   // thread-related variables
00315   static int IsThreadCalled; // have we called the function to start the thread?
00316   static int IsThreadStarted; // was the thread started successfully?
00317   static int IsThreadRunning; // is the thread still running?
00318   static int RUN_THREAD; // set this to 0 if we want the thread to stop and exit
00319   static int NO_THREAD_MODE; // disable receive thread; must set before first constructor call
00320   static pthread_t ThreadID; // thread id
00321 
00322   // pvm-related variables
00323   static int MyTid; // pvm task id
00324   static pthread_mutex_t PvmOpMutex; // all pvm functions must be locked
00325 
00326   // timing variables
00327   static double MinReceivePeriod; // receive thread won't receive faster than this
00328   static double MaxReceivePeriod; // receive thread won't receive slower than this
00329   static double ReceivePeriod; // the current period being followed by NetPacket
00330   static long RECEIVE_TIMEOUT_US; // the timeout on the pvm call to trecv
00331   static timeval TimeSyncDiff; // difference between timesync clock and local clock
00332   static int TimeSyncDTID; // the host id of the timesync host
00333 
00334   // receive variables
00335   static map<string, NetP*> ReceiveMap; // the map from channels to receive np's
00336   static pthread_mutex_t ReceiveMapMutex; // lock the receive map so we don't corrupt it
00337   static char* ReceiveBuffer; // the buffer we use to receive stuff
00338   static int ReceiveBufferSize; // the current size of that receive buffer
00339   static pthread_mutex_t ReceiveBlockMutex; // condition variable mutex for blocked receives
00340 
00341   // direct connect variables
00342   static map<string, int> DirectSockets; // channel to socket map for direct connects
00343   static set<string> DirectChannels; // channels we're listening on with no connection yet
00344 
00345   // instance variables
00346 
00347   int cleaned_up; // mark that we've already cleaned up.
00348   
00349   int is_send_type; // 1 means send type, 0 means receive type
00350   string send_group; // the group we send to
00351   char* send_buffer; // our send buffer
00352 
00353   set<string> groups; // the groups this receive netp belongs to
00354   long recv_buffer_limit; // -1 means no limit
00355   long recv_buffer_size; // how much stuff in receive buffer
00356   pthread_mutex_t recv_buffer_mutex; // lock the receive buffer
00357   pthread_cond_t recv_block_condition; // for immediate blocking response
00358   deque<PData*> recv_buffer; // alternately holds a meta and then a data
00359 
00360   PData* _meta; // meta data
00361   PData* _data; // main data
00362   pthread_mutex_t pdata_mutex; // lock the data so they can't be left inconsistent
00363 };
00364 
00365 
00366 #endif

Generated on Wed Mar 9 12:04:24 2011 for netp2 by  doxygen 1.6.1