diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0256b2ba..28249ff9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -284,7 +284,7 @@ jobs: echo SUBJOB_PREFIX=${SUBJOB_PREFIX} while (( CAF_IMAGES > 0 )); do \ echo CAF_IMAGES=${CAF_IMAGES} ; \ - ( set -x ; ./run-fpm.sh test --verbose -- -d ) ; \ + ( set -x ; ./run-fpm.sh test --verbose -- ) ; \ sleep 1 ; \ CAF_IMAGES=$(( CAF_IMAGES / 2 )) ; \ done @@ -319,3 +319,11 @@ jobs: ) ; \ done + - name: Build and Test Caffeine (thread-safe) + run: | + for var in FC CC CXX FFLAGS CPPFLAGS CFLAGS LDFLAGS LIBS GASNET_CONFIGURE_ARGS ; do \ + eval echo "$var=\$$var"; done + set -x + ./install.sh --prefix=${PREFIX} --network=${{ matrix.network }} --enable-threads --verbose + ./run-fpm.sh test --verbose -- + diff --git a/README.md b/README.md index 6282361a..c2c71bc8 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,7 @@ See also [issue #228](https://github.com/BerkeleyLab/caffeine/issues/228). The `install.sh` recognizes a number of command-line options and environment variables to customize behavior for your system. See the output of `./install.sh --help` for full documentation, -including options for how to build for a distributed-memory platform. +including options for how to build for a distributed-memory platform or with thread-safety. Example Usage diff --git a/docs/README-release.md b/docs/README-release.md index 69015e92..ff7de848 100644 --- a/docs/README-release.md +++ b/docs/README-release.md @@ -12,13 +12,13 @@ Release Procedure for Caffeine 1. Update all instances of the copyright year embedded in: [LICENSE.txt](../LICENSE.txt), [manifest/fpm.toml.template](../manifest/fpm.toml.template) 2. Update all instances of the release package version number embedded in: - [manifest/fpm.toml.template](../manifest/fpm.toml.template), [install.sh](../install.sh) + [manifest/fpm.toml.template](../manifest/fpm.toml.template), [install.sh](../install.sh), [version.h](../include/version.h) 3. Update the author list embedded in: [manifest/fpm.toml.template](../manifest/fpm.toml.template) 4. Review top-level [README.md](../README.md) and other user-facing documentation for any necessary changes 5. Update [docs/implementation-status.md](../docs/implementation-status.md) with current status 6. If the PRIF specification revision is changing, search and update all instances of the old revision, - including `CAF_PRIF_VERSION_{MAJOR,MINOR}` in [language-support.F90](../include/language-support.F90) + including `CAF_PRIF_VERSION_{MAJOR,MINOR}` in [version.h](../include/version.h) 7. Temporarily hardcode version of gasnet installer in [install.sh](../install.sh) as the last commit in the release. Set GASNET_VERSION flag to the latest gasnet release 5. Produce the ChangeLog @@ -42,7 +42,7 @@ Release Procedure for Caffeine 10. Post release chores 1. Git revert the commit that hardcoded the gasnet version or manually edit 2. Update patch number of the version number embedded in: - [manifest/fpm.toml.template](../manifest/fpm.toml.template), [install.sh](../install.sh) + [manifest/fpm.toml.template](../manifest/fpm.toml.template), [install.sh](../install.sh), [version.h](../include/version.h) Update to an odd number to indicate that the `main` branch is currently a snapshot of something that is beyond the offical release 3. Update the release procedure with any new steps or changes diff --git a/include/language-support.F90 b/include/language-support.F90 index 7b4b9e1e..bebd5411 100644 --- a/include/language-support.F90 +++ b/include/language-support.F90 @@ -70,23 +70,4 @@ #define CAF_IMPORT_TEAM_CONSTANTS CAF_IMPORT_CONSTANTS #endif -! PRIF specification version override and control -! By default, Caffeine provides the latest ratified version of the PRIF specification. -! Clients can optionally define one of the FORCE_* macros below to force compliance -! with a different revision of the PRIF specification. These override settings are -! NOT officially supported and may be removed at any time without notice. -#define CAF_PRIF_VERSION_MAJOR 0 -#if FORCE_PRIF_0_5 -# define CAF_PRIF_VERSION_MINOR 5 -#elif FORCE_PRIF_0_6 -# define CAF_PRIF_VERSION_MINOR 6 -#elif FORCE_PRIF_0_7 -# define CAF_PRIF_VERSION_MINOR 7 -#elif FORCE_PRIF_0_8 -# define CAF_PRIF_VERSION_MINOR 8 -#else -# define CAF_PRIF_VERSION_MINOR 7 -#endif -#define CAF_PRIF_VERSION (100 * CAF_PRIF_VERSION_MAJOR + CAF_PRIF_VERSION_MINOR) - #endif diff --git a/include/version.h b/include/version.h new file mode 100644 index 00000000..3e1cc9b3 --- /dev/null +++ b/include/version.h @@ -0,0 +1,37 @@ +# /* Copyright (c), The Regents of the University of California */ +# /* Terms of use are as specified in LICENSE.txt */ + +# /* NOTE: this is a dual-language header file, */ +# /* and should ONLY contain portable preprocessor directives. */ + +#ifndef CAF_INCLUDED_VERSION_H +#define CAF_INCLUDED_VERSION_H + +# /* Caffeine software package versioning */ +#define CAF_RELEASE_VERSION_MAJOR 0 +#define CAF_RELEASE_VERSION_MINOR 7 +#define CAF_RELEASE_VERSION_PATCH 1 +#define CAF_RELEASE_VERSION (1000*CAF_RELEASE_VERSION_MAJOR + 100*CAF_RELEASE_VERSION_MINOR + CAF_RELEASE_VERSION_PATCH) + +#if 0 +! PRIF specification version override and control +! By default, Caffeine provides the latest ratified version of the PRIF specification. +! Clients can optionally define one of the FORCE_* macros below to force compliance +! with a different revision of the PRIF specification. These override settings are +! NOT officially supported and may be removed at any time without notice. +#endif +#define CAF_PRIF_VERSION_MAJOR 0 +#if FORCE_PRIF_0_5 +# define CAF_PRIF_VERSION_MINOR 5 +#elif FORCE_PRIF_0_6 +# define CAF_PRIF_VERSION_MINOR 6 +#elif FORCE_PRIF_0_7 +# define CAF_PRIF_VERSION_MINOR 7 +#elif FORCE_PRIF_0_8 +# define CAF_PRIF_VERSION_MINOR 8 +#else +# define CAF_PRIF_VERSION_MINOR 7 +#endif +#define CAF_PRIF_VERSION (100 * CAF_PRIF_VERSION_MAJOR + CAF_PRIF_VERSION_MINOR) + +#endif diff --git a/install.sh b/install.sh index 317f6915..1845c61a 100755 --- a/install.sh +++ b/install.sh @@ -23,6 +23,8 @@ USAGE: Default prefix='\$HOME/.local/bin' --verbose Show verbose build commands --yes Assume (yes) to all prompts for non-interactive build + --enable-threads Build a thread-safe Caffeine library and link to + thread-safe GASNet, for use in threaded do-concurrent. All unrecognized arguments will be passed to GASNet's configure. @@ -49,6 +51,7 @@ GCC_VERSION=${GCC_VERSION:=14} GASNET_VERSION="stable" VERBOSE="" GASNET_CONDUIT="${GASNET_CONDUIT:-smp}" +GASNET_THREADMODE="${GASNET_THREADMODE:-seq}" YES=false list_prerequisites() @@ -106,6 +109,9 @@ while [ "$1" != "" ]; do -y | --yes) YES="true" ;; + --enable-threads) GASNET_THREADMODE=par ;; + --disable-threads) GASNET_THREADMODE=seq ;; + *) # We pass the unmodified argument to GASNet configure # Quoting is believed sufficient for embedded whitespace but not quotes @@ -328,7 +334,7 @@ EOF printf "Is it ok to download and install $1? [yes] " } -pkg="gasnet-$GASNET_CONDUIT-seq" +pkg="gasnet-$GASNET_CONDUIT-$GASNET_THREADMODE" export PKG_CONFIG_PATH if ! $PKG_CONFIG $pkg ; then @@ -357,7 +363,7 @@ if ! $PKG_CONFIG $pkg ; then cmd="$cmd --with-cc=\"$CC\" --with-cxx=\"$CXX\"" # select the GASNet config settings Caffeine requires, and disable unused features: cmd="$cmd --enable-$GASNET_CONDUIT" - cmd="$cmd --enable-seq --disable-par --disable-parsync" + cmd="$cmd --enable-seq --enable-par --disable-parsync" cmd="$cmd --disable-segment-everything" # TEMPORARY: disable MPI compatibility until we figure out how to support in fpm cmd="$cmd --disable-mpi-compat" @@ -468,6 +474,10 @@ if ! [[ "$user_compiler_flags " =~ -[DU]ASSERTIONS[=\ ] ]] ; then compiler_flag+=" -DASSERTIONS" fi +if [[ $GASNET_THREADMODE == "par" ]] ; then + compiler_flag+=" -DCAF_THREAD_SAFE" +fi + GASNET_CONDUIT_UPPER=$(tr '[:lower:]' '[:upper:]' <<<$GASNET_CONDUIT) compiler_flag+=" -DCAF_NETWORK_$GASNET_CONDUIT_UPPER" @@ -530,7 +540,7 @@ chmod u+x $RUN_FPM_SH ./$RUN_FPM_SH build $VERBOSE -LIBCAFFEINE_DST=libcaffeine-$GASNET_CONDUIT.a +LIBCAFFEINE_DST=libcaffeine-$GASNET_CONDUIT-$GASNET_THREADMODE.a LIBCAFFEINE_SRC=$(./$RUN_FPM_SH install --list 2>/dev/null | grep libcaffeine | cut -d' ' -f2) if [ -z "$LIBCAFFEINE_SRC" ]; then @@ -539,6 +549,7 @@ if [ -z "$LIBCAFFEINE_SRC" ]; then else mkdir -p "$PREFIX/lib" cp -af "$LIBCAFFEINE_SRC" "$PREFIX/lib/$LIBCAFFEINE_DST" + ln -sf "$LIBCAFFEINE_DST" "$PREFIX/lib/libcaffeine-$GASNET_CONDUIT.a" ln -sf "$LIBCAFFEINE_DST" "$PREFIX/lib/libcaffeine.a" fi diff --git a/src/caffeine/alias_s.F90 b/src/caffeine/alias_s.F90 index 2d3a91a7..9dcd3ef7 100644 --- a/src/caffeine/alias_s.F90 +++ b/src/caffeine/alias_s.F90 @@ -2,7 +2,7 @@ ! Terms of use are as specified in LICENSE.txt #include "assert_macros.h" -#include "language-support.F90" +#include "version.h" submodule(prif:prif_private_s) alias_s ! DO NOT ADD USE STATEMENTS HERE diff --git a/src/caffeine/allocation_s.F90 b/src/caffeine/allocation_s.F90 index 18038bb1..9434e931 100644 --- a/src/caffeine/allocation_s.F90 +++ b/src/caffeine/allocation_s.F90 @@ -2,6 +2,7 @@ ! Terms of use are as specified in LICENSE.txt #include "assert_macros.h" +#include "version.h" #include "language-support.F90" submodule(prif:prif_private_s) allocation_s @@ -84,7 +85,7 @@ module procedure prif_allocate type(c_ptr) :: mem - mem = caf_allocate(non_symmetric_heap_mspace, size_in_bytes) + mem = caf_allocate_non_symmetric(size_in_bytes) if (.not. c_associated(mem)) then call report_error(PRIF_STAT_OUT_OF_MEMORY, out_of_memory_message(size_in_bytes, .false.), & stat, errmsg, errmsg_alloc) @@ -200,7 +201,7 @@ subroutine coarray_cleanup_i(handle, stat, errmsg) bind(C) end procedure module procedure prif_deallocate - call caf_deallocate(non_symmetric_heap_mspace, mem) + call caf_deallocate_non_symmetric(mem) if (present(stat)) stat = 0 end procedure diff --git a/src/caffeine/caffeine-internal.h b/src/caffeine/caffeine-internal.h index d6359761..a5553b47 100644 --- a/src/caffeine/caffeine-internal.h +++ b/src/caffeine/caffeine-internal.h @@ -4,6 +4,9 @@ # /* NOTE: this is a dual-language header file, */ # /* and should ONLY contain portable preprocessor directives. */ +#ifndef CAF_INCLUDED_CAFFEINE_INTERNAL_H +#define CAF_INCLUDED_CAFFEINE_INTERNAL_H + # /* define some macro portability helpers */ #if defined(__GFORTRAN__) || defined(_CRAYFTN) || defined(NAGFOR) # define CAF_CONCAT2(x,y) x/**/y @@ -30,3 +33,4 @@ #define CAF_OP_FXOR 9 #define CAF_OP_FCAS 10 +#endif diff --git a/src/caffeine/caffeine.c b/src/caffeine/caffeine.c index b487a2ad..5e81fcee 100644 --- a/src/caffeine/caffeine.c +++ b/src/caffeine/caffeine.c @@ -18,6 +18,7 @@ #include "../dlmalloc/dl_malloc_caf.h" #include "../dlmalloc/dl_malloc.h" #include "caffeine-internal.h" +#include "version.h" enum { UNRECOGNIZED_TYPE, @@ -30,12 +31,54 @@ static gex_Rank_t myproc, numprocs; static gex_Segment_t mysegment; static gex_TM_t myworldteam; +static mspace* non_symmetric_heap; +static gasnett_mutex_t non_symmetric_heap_lock = GASNETT_MUTEX_INITIALIZER; + typedef void(*final_func_ptr)(void*, size_t) ; typedef uint8_t byte; static void event_init(void); static void atomic_init(void); +#define CAF_IDENT(name, contents) \ + GASNETT_IDENT(caf_IdentString_ ## name, \ + "$Caffeine" #name ": " contents " $") +CAF_IDENT(Network, CAF_STRINGIFY(GASNET_CONDUIT_NAME)); +#if GASNET_SEQ + CAF_IDENT(ThreadMode, "SEQ"); +#elif GASNET_PAR + CAF_IDENT(ThreadMode, "PAR"); +#endif +CAF_IDENT(LibraryVersion, CAF_STRINGIFY(CAF_RELEASE_VERSION_MAJOR) "." + CAF_STRINGIFY(CAF_RELEASE_VERSION_MINOR) "." + CAF_STRINGIFY(CAF_RELEASE_VERSION_PATCH)); +#if 0 +// TODO: PRIFVersion does not correctly respect FORCE_PRIF_X flags unless they are also passed in CFLAGS +CAF_IDENT(PRIFVersion, CAF_STRINGIFY(CAF_PRIF_VERSION_MAJOR) "." + CAF_STRINGIFY(CAF_PRIF_VERSION_MINOR)); +#endif +#if 0 +#if ASSERTIONS // TODO: This doesn't yet work, until we fix issue #241 + CAF_IDENT(Assertions, "1"); +#else + CAF_IDENT(Assertions, "0"); +#endif +#endif +CAF_IDENT(BuildTime, __DATE__ " " __TIME__ ); +CAF_IDENT(CompilerID, PLATFORM_COMPILER_IDSTR); +CAF_IDENT(GASNetConfig, GASNET_CONFIG_STRING); + +// --------------------------------------------------- +// Thread-safety support + +#if GASNET_PAR + #define LOCK(m) gasnett_mutex_lock(&(m)) + #define UNLOCK(m) gasnett_mutex_unlock(&(m)) +#else + #define LOCK(m) do{}while(0) + #define UNLOCK(m) do{}while(0) +#endif + // --------------------------------------------------- // Floating-point exception support @@ -83,7 +126,6 @@ void caf_caffeinate( mspace* symmetric_heap, intptr_t* symmetric_heap_start, intptr_t* symmetric_heap_size, - mspace* non_symmetric_heap, gex_TM_t* initial_team ) { GASNET_SAFE(gex_Client_Init(&myclient, &myep, &myworldteam, "caffeine", NULL, NULL, 0)); @@ -146,17 +188,21 @@ void caf_caffeinate( assert(*symmetric_heap); mspace_set_footprint_limit(*symmetric_heap, *symmetric_heap_size); } - *non_symmetric_heap = create_mspace_with_base((void*)non_symmetric_heap_start, non_symmetric_heap_size, 0); - assert(*non_symmetric_heap); - mspace_set_footprint_limit(*non_symmetric_heap, non_symmetric_heap_size); + non_symmetric_heap = create_mspace_with_base((void*)non_symmetric_heap_start, non_symmetric_heap_size, 0); + assert(non_symmetric_heap); + mspace_set_footprint_limit(non_symmetric_heap, non_symmetric_heap_size); // init various subsystems: atomic_init(); event_init(); } -void caf_decaffeinate(int exit_code) -{ +void caf_acquire_exit_lock() { + static gasnett_mutex_t exit_lock = GASNETT_MUTEX_INITIALIZER; + LOCK(exit_lock); +} + +void caf_decaffeinate(int exit_code) { gasnet_exit(exit_code); } @@ -180,10 +226,16 @@ void caf_fatal_error( const CFI_cdesc_t* Fstr ) gasnett_fatalerror_nopos("%.*s", len, msg); } -void* caf_allocate(mspace heap, size_t bytes) -{ - void* allocated_space = mspace_memalign(heap, 8, bytes); - return allocated_space; +void* caf_allocate(mspace heap, size_t bytes) { + void* allocated_space = mspace_memalign(heap, 8, bytes); + return allocated_space; +} + +void* caf_allocate_non_symmetric(size_t bytes) { + LOCK(non_symmetric_heap_lock); + void* allocated_space = caf_allocate(non_symmetric_heap, bytes); + UNLOCK(non_symmetric_heap_lock); + return allocated_space; } void caf_allocate_remaining(mspace heap, void** allocated_space, size_t* allocated_size) @@ -206,11 +258,16 @@ void caf_allocate_remaining(mspace heap, void** allocated_space, size_t* allocat *allocated_size); } -void caf_deallocate(mspace heap, void* mem) -{ +void caf_deallocate(mspace heap, void* mem) { mspace_free(heap, mem); } +void caf_deallocate_non_symmetric(void* mem) { + LOCK(non_symmetric_heap_lock); + caf_deallocate(non_symmetric_heap, mem); + UNLOCK(non_symmetric_heap_lock); +} + void caf_establish_mspace(mspace* heap, void* heap_start, size_t heap_size) { *heap = create_mspace_with_base(heap_start, heap_size, 0); @@ -335,7 +392,8 @@ void caf_event_query(void *event_var_ptr, int64_t *count) { ); } -void caf_event_wait(void *event_var_ptr, int64_t threshold, int segment_boundary, int acquire_fence) { +void caf_event_wait(void *event_var_ptr, int64_t threshold, + int segment_boundary, int acquire_fence, int maybe_concurrent) { assert(event_AD != GEX_AD_INVALID); assert(event_var_ptr); assert(threshold >= 1); @@ -353,18 +411,40 @@ void caf_event_wait(void *event_var_ptr, int64_t threshold, int segment_boundary } int64_t cnt = 0; - while (caf_event_query(event_var_ptr, &cnt), cnt < threshold) { - // issue #222 : TODO: we probably want to insert a wait hook here - gasnet_AMPoll(); - } + if (maybe_concurrent) { + static gasnett_mutex_t notify_wait_lock = GASNETT_MUTEX_INITIALIZER; + while (1) { + while (caf_event_query(event_var_ptr, &cnt), cnt < threshold) { + // issue #222 : TODO: we probably want to insert a wait hook here + gasnet_AMPoll(); + } + LOCK(notify_wait_lock); + if (caf_event_query(event_var_ptr, &cnt), cnt >= threshold) { + gex_Event_Wait( + gex_AD_OpNB_I64(event_AD, &cnt, + myproc, event_var_ptr, + GEX_OP_FSUB, threshold, 0, + flags) + ); + assert(cnt >= threshold); + UNLOCK(notify_wait_lock); + break; + } else UNLOCK(notify_wait_lock); + } + } else { // not concurrent + while (caf_event_query(event_var_ptr, &cnt), cnt < threshold) { + // issue #222 : TODO: we probably want to insert a wait hook here + gasnet_AMPoll(); + } - gex_Event_Wait( - gex_AD_OpNB_I64(event_AD, &cnt, - myproc, event_var_ptr, - GEX_OP_FSUB, threshold, 0, - flags) - ); - assert(cnt >= threshold); + gex_Event_Wait( + gex_AD_OpNB_I64(event_AD, &cnt, + myproc, event_var_ptr, + GEX_OP_FSUB, threshold, 0, + flags) + ); + assert(cnt >= threshold); + } } // _______________________ Atomics ____________________________ diff --git a/src/caffeine/events_s.F90 b/src/caffeine/events_s.F90 index 61f3375b..aea02c0d 100644 --- a/src/caffeine/events_s.F90 +++ b/src/caffeine/events_s.F90 @@ -41,7 +41,7 @@ threshold = 1 endif call caf_event_wait(event_var_ptr, threshold, & - segment_boundary=1, acquire_fence=1) + segment_boundary=1, acquire_fence=1, maybe_concurrent=0) if (present(stat)) stat = 0 end procedure @@ -61,7 +61,7 @@ threshold = 1 endif call caf_event_wait(notify_var_ptr, threshold, & - segment_boundary=0, acquire_fence=1) + segment_boundary=0, acquire_fence=1, maybe_concurrent=1) if (present(stat)) stat = 0 end procedure diff --git a/src/caffeine/prif_private_s.F90 b/src/caffeine/prif_private_s.F90 index 9deed134..4a878533 100644 --- a/src/caffeine/prif_private_s.F90 +++ b/src/caffeine/prif_private_s.F90 @@ -26,7 +26,6 @@ type(prif_team_descriptor), target :: initial_team type(prif_team_type) :: current_team - type(c_ptr) :: non_symmetric_heap_mspace integer(c_intptr_t) :: total_heap_size, non_symmetric_heap_size interface @@ -38,18 +37,21 @@ subroutine caf_caffeinate( & symmetric_heap, & symmetric_heap_start, & symmetric_heap_size, & - non_symmetric_heap, & initial_team) & bind(C) import c_ptr, c_intptr_t implicit none integer(c_intptr_t), intent(out) :: total_heap_size, symmetric_heap_start, symmetric_heap_size - type(c_ptr), intent(out) :: symmetric_heap, non_symmetric_heap + type(c_ptr), intent(out) :: symmetric_heap type(c_ptr), intent(out) :: initial_team end subroutine + subroutine caf_acquire_exit_lock() bind(C) + !! void caf_acquire_exit_lock() + end subroutine + subroutine caf_decaffeinate(exit_code) bind(C) - !! void c_decaffeinate(); + !! void caf_decaffeinate(int exit_code) import c_int implicit none integer(c_int), value :: exit_code @@ -112,6 +114,13 @@ function caf_allocate(mspace, bytes) result(ptr) bind(c) type(c_ptr) :: ptr end function + function caf_allocate_non_symmetric(bytes) result(ptr) bind(c) + import c_size_t, c_ptr + implicit none + integer(c_size_t), intent(in), value :: bytes + type(c_ptr) :: ptr + end function + subroutine caf_allocate_remaining(mspace, allocated_space, allocated_size) bind(c) import c_size_t, c_ptr implicit none @@ -127,6 +136,12 @@ subroutine caf_deallocate(mspace, mem) bind(c) type(c_ptr), intent(in), value :: mem end subroutine + subroutine caf_deallocate_non_symmetric(mem) bind(c) + import c_ptr + implicit none + type(c_ptr), intent(in), value :: mem + end subroutine + subroutine caf_establish_mspace(mspace, mem, mem_size) bind(c) import c_size_t, c_ptr implicit none @@ -232,14 +247,13 @@ subroutine caf_event_post(image, event_var_ptr, segment_boundary, release_fence) integer(c_int), intent(in), value :: release_fence end subroutine - subroutine caf_event_wait(event_var_ptr, threshold, segment_boundary, acquire_fence) bind(c) - !! void caf_event_wait(void *event_var_ptr, int64_t threshold, int segment_boundary, int acquire_fence) + subroutine caf_event_wait(event_var_ptr, threshold, segment_boundary, acquire_fence, maybe_concurrent) bind(c) + !! void caf_event_wait(void *event_var_ptr, int64_t threshold, int segment_boundary, int acquire_fence, int maybe_concurrent) import c_int64_t, c_ptr, c_int implicit none type(c_ptr), intent(in), value :: event_var_ptr integer(c_int64_t), intent(in), value :: threshold - integer(c_int), intent(in), value :: segment_boundary - integer(c_int), intent(in), value :: acquire_fence + integer(c_int), intent(in), value :: segment_boundary, acquire_fence, maybe_concurrent end subroutine subroutine caf_event_query(event_var_ptr, count) bind(c) diff --git a/src/caffeine/program_startup_s.F90 b/src/caffeine/program_startup_s.F90 index 08786bc0..d571fcd6 100644 --- a/src/caffeine/program_startup_s.F90 +++ b/src/caffeine/program_startup_s.F90 @@ -18,7 +18,6 @@ initial_team%heap_mspace, & initial_team%heap_start, & initial_team%heap_size, & - non_symmetric_heap_mspace, & initial_team%gex_team) call assert_init() current_team%info => initial_team diff --git a/src/caffeine/program_termination_s.F90 b/src/caffeine/program_termination_s.F90 index 628aee31..fad8dea2 100644 --- a/src/caffeine/program_termination_s.F90 +++ b/src/caffeine/program_termination_s.F90 @@ -149,6 +149,7 @@ subroutine run_callbacks(is_error_stop, quiet, stop_code_int, stop_code_char) type(callback_entry), pointer :: next_entry + call caf_acquire_exit_lock() next_entry => callback_list do while (associated(next_entry)) call next_entry%callback(is_error_stop, quiet, stop_code_int, stop_code_char) diff --git a/src/caffeine/sync_stmt_s.F90 b/src/caffeine/sync_stmt_s.F90 index 3186f4a4..73294351 100644 --- a/src/caffeine/sync_stmt_s.F90 +++ b/src/caffeine/sync_stmt_s.F90 @@ -110,7 +110,8 @@ img = caf_image_to_initial( current_team%info%gex_team, img ) call caf_event_wait(c_loc(si_evt(img)), 1_c_int64_t, & segment_boundary=0, & - acquire_fence=merge(1,0,i==u)) + acquire_fence=merge(1,0,i==u), & + maybe_concurrent=0) end do if (present(stat)) stat = 0 diff --git a/src/caffeine/teams_s.F90 b/src/caffeine/teams_s.F90 index bbd91fa1..ac00d29e 100644 --- a/src/caffeine/teams_s.F90 +++ b/src/caffeine/teams_s.F90 @@ -1,7 +1,7 @@ ! Copyright (c), The Regents of the University of California ! Terms of use are as specified in LICENSE.txt -#include "language-support.F90" +#include "version.h" submodule(prif:prif_private_s) teams_s ! DO NOT ADD USE STATEMENTS HERE diff --git a/src/prif.F90 b/src/prif.F90 index dfc063a4..3bc90fce 100644 --- a/src/prif.F90 +++ b/src/prif.F90 @@ -2,6 +2,7 @@ ! Terms of use are as specified in LICENSE.txt #include "language-support.F90" +#include "version.h" module prif diff --git a/test/julienne-driver.F90 b/test/julienne-driver.F90 index c907381a..f202eed0 100644 --- a/test/julienne-driver.F90 +++ b/test/julienne-driver.F90 @@ -20,6 +20,7 @@ program test_suite_driver use prif_allocate_test_m, only : prif_allocate_test_t use prif_coarray_inquiry_test_m, only : prif_coarray_inquiry_test_t use prif_teams_test_m, only : prif_teams_test_t + use prif_threaded_test_m, only : prif_threaded_test_t use prif_rma_test_m, only : prif_rma_test_t use prif_strided_test_m, only : prif_strided_test_t use prif_event_test_m, only : prif_event_test_t @@ -69,6 +70,7 @@ program test_suite_driver ! complicated multi-feature tests ,test_fixture_t( prif_teams_test_t() ) & + ,test_fixture_t( prif_threaded_test_t() ) & ! exit tests ,test_fixture_t( prif_error_stop_test_t() ) & diff --git a/test/prif_allocate_test.F90 b/test/prif_allocate_test.F90 index 5e3efe72..36e59866 100644 --- a/test/prif_allocate_test.F90 +++ b/test/prif_allocate_test.F90 @@ -1,4 +1,5 @@ #include "test-utils.F90" +#include "version.h" #include "language-support.F90" module prif_allocate_test_m diff --git a/test/prif_threaded_test.F90 b/test/prif_threaded_test.F90 new file mode 100644 index 00000000..447d96fa --- /dev/null +++ b/test/prif_threaded_test.F90 @@ -0,0 +1,179 @@ +#include "test-utils.F90" + +module prif_threaded_test_m +# include "test-uses-alloc.F90" + use iso_c_binding, only: c_funptr, c_funloc, c_f_procpointer + use prif, only: & + prif_notify_type, prif_notify_wait, prif_put_with_notify, & + prif_this_image_no_coarray, prif_num_images, & + prif_sync_all + use julienne_m, only: test_description_t, test_diagnosis_t, test_result_t, test_t, usher + + implicit none + private + public :: prif_threaded_test_t + + type, extends(test_t) :: prif_threaded_test_t + contains + procedure, nopass, non_overridable :: subject + procedure, nopass, non_overridable :: results + end type + + abstract interface + pure subroutine pure_allocate_i(size_in_bytes, allocated_memory, stat, errmsg, errmsg_alloc) + import c_size_t, c_int, c_ptr + integer(c_size_t), intent(in) :: size_in_bytes + type(c_ptr), intent(out) :: allocated_memory + integer(c_int), intent(out), optional :: stat + character(len=*), intent(inout), optional :: errmsg + character(len=:), intent(inout), allocatable , optional :: errmsg_alloc + end subroutine + end interface + + abstract interface + pure subroutine pure_deallocate_i(mem, stat, errmsg, errmsg_alloc) + import c_int, c_ptr + type(c_ptr), intent(in) :: mem + integer(c_int), intent(out), optional :: stat + character(len=*), intent(inout), optional :: errmsg + character(len=:), intent(inout), allocatable , optional :: errmsg_alloc + end subroutine + end interface + + abstract interface + pure subroutine pure_notify_wait_i(notify_var_ptr, until_count, stat, errmsg, errmsg_alloc) + import c_int64_t, c_int, c_ptr + type(c_ptr), intent(in) :: notify_var_ptr + integer(c_int64_t), intent(in), optional :: until_count + integer(c_int), intent(out), optional :: stat + character(len=*), intent(inout), optional :: errmsg + character(len=:), intent(inout), allocatable , optional :: errmsg_alloc + end subroutine + end interface + +contains + pure function subject() + character(len=:), allocatable :: subject + subject = "PRIF Thread-safe operations" + end function + + function results() result(test_results) + type(test_result_t), allocatable :: test_results(:) + type(prif_threaded_test_t) prif_threaded_test + + allocate(test_results, source = prif_threaded_test%run([ & + test_description_t("prif_allocate in do concurrent" & +# if CAF_THREAD_SAFE + ,usher(check_allocate) & +# endif + ) & + ,test_description_t("prif_notify_wait in do concurrent" & +# if CAF_THREAD_SAFE + ,usher(check_notify) & +# endif + ) & + ])) + end function + + function check_allocate() result(diag) + type(test_diagnosis_t) diag + integer, parameter :: lim = 10000 + integer(c_size_t), parameter :: sz = 64 + integer :: i + type(c_funptr) :: fp + procedure(pure_allocate_i), pointer :: pure_allocate + procedure(pure_deallocate_i), pointer :: pure_deallocate + + ! workaround the fact that PRIF routines are not pure, + ! but a compiler client might still invoke them from within do concurrent + fp = c_funloc(prif_allocate) + call c_f_procpointer(fp, pure_allocate) + fp = c_funloc(prif_deallocate) + call c_f_procpointer(fp, pure_deallocate) + + do concurrent(i = 1:lim) + block + type(c_ptr) :: p + call pure_allocate(sz, p) + call pure_deallocate(p) + end block + end do + + diag = .true. + end function + + function check_notify() result(diag) + type(test_diagnosis_t) diag + + integer :: me, num_imgs + type(prif_notify_type) :: dummy_notify + integer(c_size_t) :: sizeof_notify, sizeof_int + type(prif_coarray_handle) :: coarray_handle_evt + type(prif_coarray_handle) :: coarray_handle_ctr + type(c_ptr) :: allocated_memory + type(prif_notify_type), pointer :: local_evt + integer, pointer :: local_ctr + type(c_funptr) :: fp + procedure(pure_notify_wait_i), pointer :: pure_notify_wait + + fp = c_funloc(prif_notify_wait) + call c_f_procpointer(fp, pure_notify_wait) + + diag = .true. + sizeof_notify = int(storage_size(dummy_notify)/8, c_size_t) + sizeof_int = c_sizeof(me) + call prif_num_images(num_images=num_imgs) + call prif_this_image_no_coarray(this_image=me) + + ! type(notify_type) :: evt[*] + call prif_allocate_coarray( & + lcobounds = [1_c_int64_t], & + ucobounds = [int(num_imgs,c_int64_t)], & + size_in_bytes = sizeof_notify, & + final_func = c_null_funptr, & + coarray_handle = coarray_handle_evt, & + allocated_memory = allocated_memory) + call c_f_pointer(allocated_memory, local_evt) + local_evt = dummy_notify ! default initialize + + ! integer :: ctr[*] + call prif_allocate_coarray( & + lcobounds = [1_c_int64_t], & + ucobounds = [int(num_imgs,c_int64_t)], & + size_in_bytes = sizeof_int, & + final_func = c_null_funptr, & + coarray_handle = coarray_handle_ctr, & + allocated_memory = allocated_memory) + call c_f_pointer(allocated_memory, local_ctr) + local_ctr = 0 ! initialize + + call prif_sync_all + + block + integer, parameter :: lim = 10000 + integer, target :: i + + do i=1, lim + ! every image writes to itself with notify + ! ctr[me, notify=evt] = i + call prif_put_with_notify( & + image_num = me, & + coarray_handle = coarray_handle_ctr, & + offset = 0_c_size_t, & + current_image_buffer = c_loc(i), & + size_in_bytes = sizeof_int, & + notify_coarray_handle = coarray_handle_evt, & + notify_offset = 0_c_size_t) + end do + + do concurrent(i = 1:lim) + ! NOTIFY WAIT( evt, UNTIL_COUNT=1 ) + call pure_notify_wait(c_loc(local_evt), until_count=1_c_int64_t) + end do + + end block + + call prif_deallocate_coarrays(([coarray_handle_ctr, coarray_handle_evt])) + end function + +end module diff --git a/test/test-uses-alloc.F90 b/test/test-uses-alloc.F90 index 47378c8c..6ac3b42e 100644 --- a/test/test-uses-alloc.F90 +++ b/test/test-uses-alloc.F90 @@ -5,7 +5,7 @@ #ifndef CAF_INCLUDED_TEST_USES_ALLOC #define CAF_INCLUDED_TEST_USES_ALLOC -#include "language-support.F90" +#include "version.h" use prif, only : & prif_allocate_coarray, &