Review Board 1.7.22


Keep better track of threads avoiding deadlocks at process rundown

Review Request #904 - Created June 15, 2011 and discarded

Steve Huston
QPID-3256
Reviewers
qpid
qpid
Keeps track of Qpid runnable threads and other threads, ensuring that rundown doesn't deadlock.
Qpid regression test suite.

Diff revision 1 (Latest)

  1. /trunk/qpid/cpp/src/qpid/sys/windows/Thread.cpp: Loading...
/trunk/qpid/cpp/src/qpid/sys/windows/Thread.cpp
Revision 1132733 New Change
[20] 19 lines
[+20]
20
 */
20
 */
21

    
   
21

   
22
#include "qpid/sys/Thread.h"
22
#include "qpid/sys/Thread.h"
23
#include "qpid/sys/Runnable.h"
23
#include "qpid/sys/Runnable.h"
24
#include "qpid/sys/windows/check.h"
24
#include "qpid/sys/windows/check.h"

    
   
25
#include "qpid/sys/Mutex.h"
25

    
   
26

   
26
#include <process.h>
27
#include <process.h>
27
#include <windows.h>
28
#include <windows.h>
28

    
   
29

   
29
namespace {
30
/*
30
unsigned __stdcall runRunnable(void* p)
31
 * This implementation distinguishes between two types of thread: Qpid
31
{
32
 * threads (based on qpid::sys::Runnable) and the rest.  It provides a
32
    static_cast<qpid::sys::Runnable*>(p)->run();
33
 * join() that will not deadlock against the Windows loader lock for
33
    _endthreadex(0);
34
 * Qpid threads.
34
    return 0;
35
 *
35
}
36
 * System thread identifiers are unique per Windows thread; thread
36
}
37
 * handles are not.  Thread identifiers can be recycled, but not

    
   
38
 * during the time a handle remains open against the thread.

    
   
39
 *

    
   
40
 * There is a 1-1 relationship between Qpid threads and their

    
   
41
 * ThreadPrivate structure.  Non-Qpid threads do not need to find the

    
   
42
 * qpidThreadDone handle, so there may be a 1-many relationship for

    
   
43
 * them.

    
   
44
 *

    
   
45
 * The map structure of Qpid threads serves two purposes: to locate

    
   
46
 * the associated ThreadPrivate structure, and to hold a keepalive

    
   
47
 * copy of the shared_ptr to guarantee the ThreadPrivate destructor is

    
   
48
 * not called before the thread completes the Runnable.

    
   
49
 */
37

    
   
50

   
38
namespace qpid {
51
namespace qpid {
39
namespace sys {
52
namespace sys {
40

    
   
53

   
41
class ThreadPrivate {
54
class ThreadPrivate {

    
   
55
public:
42
    friend class Thread;
56
    friend class Thread;

    
   
57
    friend unsigned __stdcall runThreadPrivate(void*);

    
   
58
    typedef boost::shared_ptr<ThreadPrivate> shared_ptr;

    
   
59
    ~ThreadPrivate();
43

    
   
60

   
44
    HANDLE threadHandle;
61
private:
45
    unsigned threadId;
62
    unsigned threadId;

    
   
63
    HANDLE threadHandle;

    
   
64
    HANDLE initCompleted;

    
   
65
    HANDLE qpidThreadDone;

    
   
66
    Runnable* runnable;
46
    
67
    
47
    ThreadPrivate(Runnable* runnable) {
68
    ThreadPrivate() : runnable(NULL), initCompleted(NULL), 

    
   
69
		      qpidThreadDone(NULL), threadId(GetCurrentThreadId()) {

    
   
70
	threadHandle =  OpenThread (SYNCHRONIZE, FALSE, threadId);

    
   
71
	QPID_WINDOWS_CHECK_CRT_NZ(threadHandle);

    
   
72
    }

    
   
73

   

    
   
74
    ThreadPrivate(Runnable* r) : runnable(r), threadHandle(NULL), initCompleted(NULL), 

    
   
75
				 qpidThreadDone(NULL){}

    
   
76

   

    
   
77
    void start(shared_ptr& tpsp);

    
   
78
    static shared_ptr createThread(Runnable* r);  

    
   
79
};

    
   
80

   

    
   
81

   

    
   
82
namespace {

    
   
83

   

    
   
84
qpid::sys::Mutex mapLock;

    
   
85
std::map<unsigned, ThreadPrivate::shared_ptr>* pQpidThreads;

    
   
86

   

    
   
87
} // namespace

    
   
88

   

    
   
89
unsigned __stdcall runThreadPrivate(void* p)

    
   
90
{

    
   
91
    ThreadPrivate* tpp = static_cast<qpid::sys::ThreadPrivate*>(p);

    
   
92
    try {

    
   
93
	tpp->runnable->run();

    
   
94
    } catch (...) {

    
   
95
	// not our concern

    
   
96
    }

    
   
97

   

    
   
98
    SetEvent (tpp->qpidThreadDone); // Runnable is done, allow join()

    
   
99
    WaitForSingleObject (tpp->initCompleted, INFINITE); // In case we completed faster than expected

    
   
100

   

    
   
101
    {

    
   
102
	ScopedLock<Mutex> l(mapLock);

    
   
103
	if (pQpidThreads != NULL) {

    
   
104
	    pQpidThreads->erase(tpp->threadId);

    
   
105
	    // note: *tpp's dtor may have just been called

    
   
106
	    tpp = NULL;

    
   
107
	    if (pQpidThreads->empty()) {

    
   
108
		delete(pQpidThreads);

    
   
109
		pQpidThreads = NULL;

    
   
110
	    }

    
   
111
	}

    
   
112
    }

    
   
113
    return 0;

    
   
114
}

    
   
115

   

    
   
116

   

    
   
117
ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) {

    
   
118
    ThreadPrivate::shared_ptr tpp(new ThreadPrivate(runnable));

    
   
119
    tpp->start(tpp);

    
   
120
    return tpp;

    
   
121
}

    
   
122

   

    
   
123
void ThreadPrivate::start(ThreadPrivate::shared_ptr& tpsp) {

    
   
124
    initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL);

    
   
125
    QPID_WINDOWS_CHECK_CRT_NZ(initCompleted);

    
   
126
    qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL);

    
   
127
    QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone);

    
   
128

   
48
        uintptr_t h =  _beginthreadex(0,
129
    uintptr_t h =  _beginthreadex(0,
49
                                      0,
130
				  0,
50
                                      runRunnable,
131
				  runThreadPrivate,
51
                                      runnable,
132
				  (void *)this,
52
                                      0,
133
				  0,
53
                                      &threadId);
134
				  &threadId);
54
        QPID_WINDOWS_CHECK_CRT_NZ(h);
135
    QPID_WINDOWS_CHECK_CRT_NZ(h);

    
   
136

   

    
   
137
    {

    
   
138
	ScopedLock<Mutex> l(mapLock);

    
   
139
	if (pQpidThreads == NULL) {

    
   
140
	    pQpidThreads = new std::map<unsigned, ThreadPrivate::shared_ptr>;

    
   
141
	}

    
   
142
	(*pQpidThreads)[threadId] = tpsp;

    
   
143
    }
55
        threadHandle = reinterpret_cast<HANDLE>(h);
144
    threadHandle = reinterpret_cast<HANDLE>(h);

    
   
145
    SetEvent (initCompleted);

    
   
146
}	

    
   
147

   

    
   
148
ThreadPrivate::~ThreadPrivate() {

    
   
149
    if (threadHandle)

    
   
150
	CloseHandle (threadHandle);

    
   
151
    if (initCompleted)

    
   
152
	CloseHandle (initCompleted);

    
   
153
    if (qpidThreadDone)

    
   
154
	CloseHandle (qpidThreadDone);
56
    }
155
}
57
    

   
58
    ThreadPrivate()

   
59
      : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {}

   
60
};

   
61

    
   
156

   

    
   
157

   

    
   
158

   
62
Thread::Thread() {}
159
Thread::Thread() {}
63

    
   
160

   
64
Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
161
Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {}
65

    
   
162

   
66
Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
163
Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
67

    
   
164

   
68
Thread::operator bool() {
165
Thread::operator bool() {
69
    return impl;
166
    return impl;
70
}
167
}
71

    
   
168

   
72
bool Thread::operator==(const Thread& t) const {
169
bool Thread::operator==(const Thread& t) const {

    
   
170
    if (!impl || !t.impl)

    
   
171
	return false;
73
    return impl->threadId == t.impl->threadId;
172
    return impl->threadId == t.impl->threadId;
74
}
173
}
75

    
   
174

   
76
bool Thread::operator!=(const Thread& t) const {
175
bool Thread::operator!=(const Thread& t) const {
77
    return !(*this==t);
176
    return !(*this==t);
78
}
177
}
79

    
   
178

   
80
void Thread::join() {
179
void Thread::join() {
81
    if (impl) {
180
    if (impl) {
82
        DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE);
181
	DWORD status;

    
   
182
	if (impl->runnable) {

    
   
183
	    HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle};

    
   
184
	    // wait for either.  threadHandle not signalled if loader

    
   
185
	    // lock held (FreeLibrary).  qpidThreadDone not signalled

    
   
186
	    // if thread terminated by exit().

    
   
187
	    status = WaitForMultipleObjects (2, handles, false, INFINITE);

    
   
188
	}

    
   
189
	else

    
   
190
	    status = WaitForSingleObject (impl->threadHandle, INFINITE);
83
        QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
191
	QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
84
        CloseHandle (impl->threadHandle);

   
85
        impl->threadHandle = 0;

   
86
    }
192
    }
87
}
193
}
88

    
   
194

   
89
unsigned long Thread::logId() {
195
unsigned long Thread::logId() {
90
    return GetCurrentThreadId();
196
    return GetCurrentThreadId();
91
}
197
}
92

    
   
198

   
93
/* static */
199
/* static */
94
Thread Thread::current() {
200
Thread Thread::current() {

    
   
201
    unsigned threadId = GetCurrentThreadId();

    
   
202
    ThreadPrivate::shared_ptr tpsp;

    
   
203
    {

    
   
204
	ScopedLock<Mutex> l(mapLock);

    
   
205
	if (pQpidThreads != NULL) {

    
   
206
	    std::map<unsigned, ThreadPrivate::shared_ptr>::iterator i = pQpidThreads->find(threadId);

    
   
207
	    if (i != pQpidThreads->end()) {

    
   
208
		tpsp = i->second;

    
   
209
	    }

    
   
210
	}

    
   
211
    }

    
   
212

   
95
    Thread t;
213
    Thread t;

    
   
214
    if (tpsp)

    
   
215
	t.impl = tpsp;

    
   
216
    else
96
    t.impl.reset(new ThreadPrivate());
217
	t.impl.reset(new ThreadPrivate());
97
    return t;
218
    return t;
98
}
219
}
99

    
   
220

   
100
}}  /* qpid::sys */
221
}}  /* qpid::sys */
  1. /trunk/qpid/cpp/src/qpid/sys/windows/Thread.cpp: Loading...