GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: src/pull/corePull.c Lines: 0 246 0.0 %
Date: 2017-05-26 Branches: 0 179 0.0 %

Line Branch Exec Source
1
/*
2
  Teem: Tools to process and visualize scientific data and images             .
3
  Copyright (C) 2013, 2012, 2011, 2010, 2009  University of Chicago
4
  Copyright (C) 2008, 2007, 2006, 2005  Gordon Kindlmann
5
  Copyright (C) 2004, 2003, 2002, 2001, 2000, 1999, 1998  University of Utah
6
7
  This library is free software; you can redistribute it and/or
8
  modify it under the terms of the GNU Lesser General Public License
9
  (LGPL) as published by the Free Software Foundation; either
10
  version 2.1 of the License, or (at your option) any later version.
11
  The terms of redistributing and/or modifying this software also
12
  include exceptions to the LGPL that facilitate static linking.
13
14
  This library is distributed in the hope that it will be useful,
15
  but WITHOUT ANY WARRANTY; without even the implied warranty of
16
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
  Lesser General Public License for more details.
18
19
  You should have received a copy of the GNU Lesser General Public License
20
  along with this library; if not, write to Free Software Foundation, Inc.,
21
  51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
22
*/
23
24
#include "pull.h"
25
#include "privatePull.h"
26
27
int
28
_pullVerbose = 0;
29
30
#define _DECREASE(ell, enn) (           \
31
  2*((ell) - (enn))                     \
32
  / ( (AIR_ABS(ell) + AIR_ABS(enn))     \
33
      ? (AIR_ABS(ell) + AIR_ABS(enn))   \
34
      : 1 )                             \
35
  )
36
/*
37
** this is the core of the worker threads: as long as there are bins
38
** left to process, get the next one, and process it
39
*/
40
int
41
_pullProcess(pullTask *task) {
42
  static const char me[]="_pullProcess";
43
  unsigned int binIdx;
44
45
  while (task->pctx->binNextIdx < task->pctx->binNum) {
46
    /* get the index of the next bin to process */
47
    if (task->pctx->threadNum > 1) {
48
      airThreadMutexLock(task->pctx->binMutex);
49
    }
50
    /* note that we entirely skip bins with no points */
51
    do {
52
      binIdx = task->pctx->binNextIdx;
53
      if (task->pctx->binNextIdx < task->pctx->binNum) {
54
        task->pctx->binNextIdx++;
55
      }
56
    } while (binIdx < task->pctx->binNum
57
             && 0 == task->pctx->bin[binIdx].pointNum);
58
    if (task->pctx->threadNum > 1) {
59
      airThreadMutexUnlock(task->pctx->binMutex);
60
    }
61
    if (binIdx == task->pctx->binNum) {
62
      /* no more bins to process! */
63
      break;
64
    }
65
    if (task->pctx->verbose > 1) {
66
      fprintf(stderr, "%s(%u): calling pullBinProcess(%u)\n",
67
              me, task->threadIdx, binIdx);
68
    }
69
    if (pullBinProcess(task, binIdx)) {
70
      biffAddf(PULL, "%s(%u): had trouble on bin %u", me,
71
               task->threadIdx, binIdx);
72
      return 1;
73
    }
74
  }
75
  return 0;
76
}
77
78
/* the main loop for each worker thread */
79
void *
80
_pullWorker(void *_task) {
81
  static const char me[]="_pushWorker";
82
  pullTask *task;
83
84
  task = (pullTask *)_task;
85
86
  while (1) {
87
    if (task->pctx->verbose > 1) {
88
      fprintf(stderr, "%s(%u): waiting on barrier A\n",
89
              me, task->threadIdx);
90
    }
91
    /* pushFinish sets finished prior to the barriers */
92
    airThreadBarrierWait(task->pctx->iterBarrierA);
93
    if (task->pctx->finished) {
94
      if (task->pctx->verbose > 1) {
95
        fprintf(stderr, "%s(%u): done!\n", me, task->threadIdx);
96
      }
97
      break;
98
    }
99
    /* else there's work to do . . . */
100
    if (task->pctx->verbose > 1) {
101
      fprintf(stderr, "%s(%u): starting to process\n", me, task->threadIdx);
102
    }
103
    if (_pullProcess(task)) {
104
      /* HEY clearly not threadsafe to have errors . . . */
105
      biffAddf(PULL, "%s: thread %u trouble", me, task->threadIdx);
106
      task->pctx->finished = AIR_TRUE;
107
    }
108
    if (task->pctx->verbose > 1) {
109
      fprintf(stderr, "%s(%u): waiting on barrier B\n",
110
             me, task->threadIdx);
111
    }
112
    airThreadBarrierWait(task->pctx->iterBarrierB);
113
  }
114
115
  return _task;
116
}
117
118
int
119
pullStart(pullContext *pctx) {
120
  static const char me[]="pullStart";
121
  unsigned int tidx;
122
123
  if (pctx->verbose) {
124
    fprintf(stderr, "%s: hello %p\n", me, AIR_VOIDP(pctx));
125
  }
126
  pctx->iter = 0; /* have to initialize this here because of seedOnly hack */
127
128
  /* the ordering of steps below is important! e.g. gage context has
129
     to be set up (_pullVolumeSetup) by before its copied (_pullTaskSetup) */
130
  if (_pullContextCheck(pctx)
131
      || _pullVolumeSetup(pctx)
132
      || _pullInfoSetup(pctx)
133
      || _pullTaskSetup(pctx)
134
      || _pullBinSetup(pctx)) {
135
    biffAddf(PULL, "%s: trouble starting to set up context", me);
136
    return 1;
137
  }
138
  if (!(pctx->flag.startSkipsPoints)) {
139
    if (_pullPointSetup(pctx)) {
140
      biffAddf(PULL, "%s: trouble setting up points", me);
141
      return 1;
142
    }
143
  }
144
145
  if (pctx->threadNum > 1) {
146
    pctx->binMutex = airThreadMutexNew();
147
    pctx->iterBarrierA = airThreadBarrierNew(pctx->threadNum);
148
    pctx->iterBarrierB = airThreadBarrierNew(pctx->threadNum);
149
    /* start threads 1 and up running; they'll all hit iterBarrierA  */
150
    for (tidx=1; tidx<pctx->threadNum; tidx++) {
151
      if (pctx->verbose > 1) {
152
        fprintf(stderr, "%s: spawning thread %d\n", me, tidx);
153
      }
154
      airThreadStart(pctx->task[tidx]->thread, _pullWorker,
155
                     (void *)(pctx->task[tidx]));
156
    }
157
  } else {
158
    pctx->binMutex = NULL;
159
    pctx->iterBarrierA = NULL;
160
    pctx->iterBarrierB = NULL;
161
  }
162
  if (pctx->verbose) {
163
    fprintf(stderr, "%s: setup for %u threads done\n", me, pctx->threadNum);
164
  }
165
166
  pctx->timeIteration = 0;
167
  pctx->timeRun = 0;
168
169
  return 0;
170
}
171
172
/*
173
** this is called *after* pullOutputGet
174
**
175
** should nix everything created by the many _pull*Setup() functions
176
*/
177
int
178
pullFinish(pullContext *pctx) {
179
  static const char me[]="pullFinish";
180
  unsigned int tidx;
181
182
  if (!pctx) {
183
    biffAddf(PULL, "%s: got NULL pointer", me);
184
    return 1;
185
  }
186
187
  pctx->finished = AIR_TRUE;
188
  if (pctx->threadNum > 1) {
189
    if (pctx->verbose > 1) {
190
      fprintf(stderr, "%s: finishing workers\n", me);
191
    }
192
    airThreadBarrierWait(pctx->iterBarrierA);
193
    /* worker threads now pass barrierA and see that finished is AIR_TRUE,
194
       and then bail, so now we collect them */
195
    for (tidx=pctx->threadNum; tidx>0; tidx--) {
196
      if (tidx-1) {
197
        airThreadJoin(pctx->task[tidx-1]->thread,
198
                      &(pctx->task[tidx-1]->returnPtr));
199
      }
200
    }
201
    pctx->binMutex = airThreadMutexNix(pctx->binMutex);
202
    pctx->iterBarrierA = airThreadBarrierNix(pctx->iterBarrierA);
203
    pctx->iterBarrierB = airThreadBarrierNix(pctx->iterBarrierB);
204
  }
205
206
  /* no need for _pullVolumeFinish(pctx), at least not now */
207
  /* no need for _pullInfoFinish(pctx), at least not now */
208
  _pullTaskFinish(pctx);
209
  _pullBinFinish(pctx);
210
  _pullPointFinish(pctx); /* yes, nixed bins deleted pnts inside, but
211
                             other buffers still have to be freed */
212
213
  return 0;
214
}
215
/*
216
** _pullIterate
217
**
218
** (documentation)
219
**
220
** NB: this implements the body of thread 0, the master thread
221
*/
222
int
223
_pullIterate(pullContext *pctx, int mode) {
224
  static const char me[]="_pullIterate";
225
  double time0;
226
  int myError, E;
227
  unsigned int ti;
228
229
  if (!pctx) {
230
    biffAddf(PULL, "%s: got NULL pointer", me);
231
    return 1;
232
  }
233
  if (airEnumValCheck(pullProcessMode, mode)) {
234
    biffAddf(PULL, "%s: process mode %d unrecognized", me, mode);
235
    return 1;
236
  }
237
  if (!pctx->task) {
238
    biffAddf(PULL, "%s: NULL task array, didn't call pullStart()?", me);
239
    return 1;
240
  }
241
242
  /* if this is descent, pull down eip a bit */
243
  if (pullProcessModeDescent == mode) {
244
    pctx->sysParm.energyIncreasePermit *= pctx->eipScale;
245
  }
246
247
#if PULL_HINTER
248
  /* zero-out/alloc hinter if need be */
249
  if (pullProcessModeDescent == mode && pctx->nhinter) {
250
    if (nrrdMaybeAlloc_va(pctx->nhinter, nrrdTypeFloat, 2,
251
                          AIR_CAST(size_t, _PULL_HINTER_SIZE),
252
                          AIR_CAST(size_t, _PULL_HINTER_SIZE))) {
253
      biffMovef(PULL, NRRD, "%s: setting up nhinter", me);
254
      return 1;
255
    }
256
  }
257
#endif
258
259
  /* tell all tasks what mode they're in */
260
  for (ti=0; ti<pctx->threadNum; ti++) {
261
    pctx->task[ti]->processMode = mode;
262
  }
263
  if (pctx->verbose) {
264
    fprintf(stderr, "%s(%s): iter %d goes w/ eip %g, %u pnts, enr %g%s\n",
265
            me, airEnumStr(pullProcessMode, mode),
266
            pctx->iter, pctx->sysParm.energyIncreasePermit,
267
            pullPointNumber(pctx), _pullEnergyTotal(pctx),
268
            (pctx->flag.permuteOnRebin ? " (por)" : ""));
269
  }
270
271
  time0 = airTime();
272
  pctx->pointNum = pullPointNumber(pctx);
273
274
  /* the _pullWorker checks finished after iterBarrierA */
275
  pctx->finished = AIR_FALSE;
276
277
  /* initialize index of next bin to be doled out to threads */
278
  pctx->binNextIdx=0;
279
280
  if (pctx->threadNum > 1) {
281
    airThreadBarrierWait(pctx->iterBarrierA);
282
  }
283
  myError = AIR_FALSE;
284
  if (_pullProcess(pctx->task[0])) {
285
    biffAddf(PULL, "%s: master thread trouble w/ iter %u", me, pctx->iter);
286
    pctx->finished = AIR_TRUE;
287
    myError = AIR_TRUE;
288
  }
289
  if (pctx->threadNum > 1) {
290
    airThreadBarrierWait(pctx->iterBarrierB);
291
  }
292
  if (pctx->finished) {
293
    if (!myError) {
294
      /* we didn't set finished- one of the workers must have */
295
      biffAddf(PULL, "%s: worker error on iter %u", me, pctx->iter);
296
    }
297
    return 1;
298
  }
299
  if (pctx->verbose) {
300
    if (pctx->pointNum > _PULL_PROGRESS_POINT_NUM_MIN) {
301
      fprintf(stderr, ".\n"); /* finishing line of progress indicators */
302
    }
303
  }
304
305
  /* depending on mode, run one of the iteration finishers */
306
  E = 0;
307
  switch (mode) {
308
  case pullProcessModeDescent:
309
    E = _pullIterFinishDescent(pctx); /* includes rebinning */
310
    break;
311
  case pullProcessModeNeighLearn:
312
    E = _pullIterFinishNeighLearn(pctx);
313
    break;
314
  case pullProcessModeAdding:
315
    E = _pullIterFinishAdding(pctx);
316
    break;
317
  case pullProcessModeNixing:
318
    E = _pullIterFinishNixing(pctx);
319
    break;
320
  default:
321
    biffAddf(PULL, "%s: process mode %d unrecognized", me, mode);
322
    return 1;
323
    break;
324
  }
325
  if (E) {
326
    pctx->finished = AIR_TRUE;
327
    biffAddf(PULL, "%s: trouble finishing iter %u", me, pctx->iter);
328
    return 1;
329
  }
330
331
  pctx->timeIteration = airTime() - time0;
332
333
#if PULL_HINTER
334
  if (pullProcessModeDescent == mode && pctx->nhinter) {
335
    char fname[AIR_STRLEN_SMALL];
336
    sprintf(fname, "hinter-%05u.nrrd", pctx->iter);
337
    if (nrrdSave(fname, pctx->nhinter, NULL)) {
338
      biffMovef(PULL, NRRD, "%s: saving hinter to %s", me, fname);
339
      return 1;
340
    }
341
  }
342
#endif
343
344
  return 0;
345
}
346
347
int
348
pullRun(pullContext *pctx) {
349
  static const char me[]="pullRun";
350
  char poutS[AIR_STRLEN_MED];
351
  Nrrd *npos;
352
  double time0, time1, enrLast,
353
    enrNew=AIR_NAN, enrDecrease=AIR_NAN, enrDecreaseAvg=AIR_NAN;
354
  int converged;
355
  unsigned firstIter;
356
357
  if (pctx->verbose) {
358
    fprintf(stderr, "%s: hello\n", me);
359
  }
360
  time0 = airTime();
361
  firstIter = pctx->iter;
362
  if (pctx->verbose) {
363
    fprintf(stderr, "%s: doing priming iteration (iter now %u)\n", me,
364
            pctx->iter);
365
  }
366
  if (_pullIterate(pctx, pullProcessModeDescent)) {
367
    biffAddf(PULL, "%s: trouble on priming iter %u", me, pctx->iter);
368
    return 1;
369
  }
370
  pctx->iter += 1;
371
  enrLast = enrNew = _pullEnergyTotal(pctx);
372
  if (pctx->verbose) {
373
    fprintf(stderr, "%s: starting system energy = %g\n", me, enrLast);
374
  }
375
  enrDecrease = enrDecreaseAvg = 0;
376
  converged = AIR_FALSE;
377
  while ((pctx->iterParm.min && pctx->iter <= pctx->iterParm.min)
378
         ||
379
         ((!pctx->iterParm.max || pctx->iter < pctx->iterParm.max)
380
          && !converged)) {
381
    /* this per iteration init had been missing for a very long time */
382
    pctx->addNum = pctx->nixNum = 0;
383
    if (pctx->iterParm.snap && !(pctx->iter % pctx->iterParm.snap)) {
384
      npos = nrrdNew();
385
      sprintf(poutS, "snap.%06d.pos.nrrd", pctx->iter);
386
      if (pullOutputGet(npos, NULL, NULL, NULL, 0.0, pctx)) {
387
        biffAddf(PULL, "%s: couldn't get snapshot for iter %d",
388
                 me, pctx->iter);
389
        return 1;
390
      }
391
      if (nrrdSave(poutS, npos, NULL)) {
392
        biffMovef(PULL, NRRD,
393
                  "%s: couldn't save snapshot for iter %d",
394
                  me, pctx->iter);
395
        return 1;
396
      }
397
      npos = nrrdNuke(npos);
398
    }
399
400
    if (_pullIterate(pctx, pullProcessModeDescent)) {
401
      biffAddf(PULL, "%s: trouble on iter %d", me, pctx->iter);
402
      return 1;
403
    }
404
    enrNew = _pullEnergyTotal(pctx);
405
    enrDecrease = _DECREASE(enrLast, enrNew);
406
    if (firstIter + 1 == pctx->iter) {
407
      /* we need some way of artificially boosting enrDecreaseAvg when
408
         we're just starting, so that we thwart the convergence test,
409
         which we do because we don't have the history of iterations
410
         that enrDecreaseAvg is supposed to describe.  Using some scaling
411
         of enrDecrease is one possible hack. */
412
      enrDecreaseAvg = 3*enrDecrease;
413
    } else {
414
      enrDecreaseAvg = (2*enrDecreaseAvg + enrDecrease)/3;
415
    }
416
    if (pctx->verbose) {
417
      fprintf(stderr, "%s: ___ done iter %u: "
418
              "e=%g,%g, de=%g,%g (%g), s=%g,%g\n",
419
              me, pctx->iter, enrLast, enrNew, enrDecrease, enrDecreaseAvg,
420
              pctx->sysParm.energyDecreaseMin,
421
              _pullStepInterAverage(pctx), _pullStepConstrAverage(pctx));
422
    }
423
    if (pctx->iterParm.popCntlPeriod) {
424
      if ((pctx->iterParm.popCntlPeriod - 1)
425
          == (pctx->iter % pctx->iterParm.popCntlPeriod)
426
          && enrDecreaseAvg < pctx->sysParm.energyDecreasePopCntlMin
427
          && (pctx->sysParm.alpha != 0
428
              || !pctx->flag.noPopCntlWithZeroAlpha)) {
429
        if (pctx->verbose) {
430
          fprintf(stderr, "%s: ***** enr decrease %g < %g: "
431
                  "trying pop cntl ***** \n",
432
                  me, enrDecreaseAvg, pctx->sysParm.energyDecreasePopCntlMin);
433
        }
434
        if (_pullIterate(pctx, pullProcessModeNeighLearn)
435
            || _pullIterate(pctx, pullProcessModeAdding)
436
            || _pullIterate(pctx, pullProcessModeNixing)) {
437
          biffAddf(PULL, "%s: trouble with %s for pop cntl on iter %u", me,
438
                   airEnumStr(pullProcessMode, pctx->task[0]->processMode),
439
                   pctx->iter);
440
          return 1;
441
        }
442
      } else {
443
        if (pctx->verbose > 2) {
444
          fprintf(stderr, "%s: ***** no pop cntl:\n", me);
445
          fprintf(stderr, "    iter=%u %% period=%u = %u != %u\n",
446
                  pctx->iter, pctx->iterParm.popCntlPeriod,
447
                  pctx->iter % pctx->iterParm.popCntlPeriod,
448
                  pctx->iterParm.popCntlPeriod - 1);
449
          fprintf(stderr, "    en dec avg = %g >= %g\n",
450
                  enrDecreaseAvg, pctx->sysParm.energyDecreasePopCntlMin);
451
          fprintf(stderr, "    npcwza %s && alpha = %g\n",
452
                  pctx->flag.noPopCntlWithZeroAlpha ? "true" : "false",
453
                  pctx->sysParm.alpha);
454
        }
455
      }
456
    }
457
    pctx->iter += 1;
458
    enrLast = enrNew;
459
    converged = ((pctx->flag.convergenceIgnoresPopCntl
460
                  || (!pctx->iterParm.popCntlPeriod
461
                      || (!pctx->addNum && !pctx->nixNum)))
462
                 && AIR_IN_CL(0, enrDecreaseAvg,
463
                              pctx->sysParm.energyDecreaseMin));
464
    if (pctx->verbose) {
465
      fprintf(stderr, "%s: converged %d = (%d || (%d || (%d && %d))) "
466
              "&& (0 <= %g <= %g)=%d\n",
467
              me, converged,
468
              pctx->flag.convergenceIgnoresPopCntl,
469
              !pctx->iterParm.popCntlPeriod,
470
              !pctx->addNum, !pctx->nixNum,
471
              enrDecreaseAvg, pctx->sysParm.energyDecreaseMin,
472
              AIR_IN_OP(0, enrDecreaseAvg, pctx->sysParm.energyDecreaseMin));
473
    }
474
    if (converged && pctx->verbose) {
475
      printf("%s: enrDecreaseAvg %g < %g: converged!!\n", me,
476
             enrDecreaseAvg, pctx->sysParm.energyDecreaseMin);
477
    }
478
    _pullPointStepEnergyScale(pctx, pctx->sysParm.opporStepScale);
479
    /* call the callback */
480
    if (!(pctx->iter % pctx->iterParm.callback)
481
        && pctx->iter_cb) {
482
      pctx->iter_cb(pctx->data_cb);
483
    }
484
  }
485
  if (pctx->verbose) {
486
    printf("%s: done ((%d|%d)&%d) @iter %u: enr %g, enrDec = %g,%g "
487
           "%u stuck\n", me,
488
           !pctx->iterParm.max, pctx->iter < pctx->iterParm.max,
489
           !converged,
490
           pctx->iter, enrNew, enrDecrease, enrDecreaseAvg, pctx->stuckNum);
491
  }
492
  time1 = airTime();
493
494
  pctx->timeRun += time1 - time0;
495
  pctx->energy = enrNew;
496
497
  /* we do one final neighbor-learn iteration, to set the fields
498
     (like stability) that are only learned then */
499
  if (_pullIterate(pctx, pullProcessModeNeighLearn)) {
500
    biffAddf(PULL, "%s: trouble after-final iter", me);
501
    return 1;
502
  }
503
504
  if (0) {
505
    /* probe inter-particle energy function */
506
    unsigned int szimg=300, ri, si;
507
    Nrrd *nout;
508
    pullPoint *pa, *pb;
509
    double rdir[3], len, r, s, *out, enr, egrad[4];
510
    airRandMTState *rng;
511
    rng = pctx->task[0]->rng;
512
    nout = nrrdNew();
513
    nrrdMaybeAlloc_va(nout, nrrdTypeDouble, 3,
514
                      AIR_CAST(size_t, 3),
515
                      AIR_CAST(size_t, szimg),
516
                      AIR_CAST(size_t, szimg));
517
    out = AIR_CAST(double *, nout->data);
518
    pa = pullPointNew(pctx);
519
    pb = pullPointNew(pctx);
520
    airNormalRand_r(pa->pos + 0, pa->pos + 1, rng);
521
    airNormalRand_r(pa->pos + 2, pa->pos + 3, rng);
522
    airNormalRand_r(rdir + 0, rdir + 1, rng);
523
    airNormalRand_r(rdir + 2, NULL, rng);
524
    ELL_3V_NORM(rdir, rdir, len);
525
    for (si=0; si<szimg; si++) {
526
      s = AIR_AFFINE(0, si, szimg-1,
527
                     -1.5*pctx->sysParm.radiusScale,
528
                     1.5*pctx->sysParm.radiusScale);
529
      for (ri=0; ri<szimg; ri++) {
530
        r = AIR_AFFINE(0, ri, szimg-1,
531
                       -1.5*pctx->sysParm.radiusSpace,
532
                       1.5*pctx->sysParm.radiusSpace);
533
        ELL_3V_SCALE_ADD2(pb->pos, 1.0, pa->pos, r, rdir);
534
        pb->pos[3] = pa->pos[3] + s;
535
        /* now points are in desired test positions */
536
        enr = _pullEnergyInterParticle(pctx, pa, pb,
537
                                       AIR_ABS(r), AIR_ABS(s), egrad);
538
        ELL_3V_SET(out + 3*(ri + szimg*si),
539
                   enr, ELL_3V_DOT(egrad, rdir), egrad[3]);
540
      }
541
    }
542
    nrrdSave("eprobe.nrrd", nout, NULL);
543
    pullPointNix(pa);
544
    pullPointNix(pb);
545
    nrrdNuke(nout);
546
  }
547
548
  return 0;
549
}
550