| Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
| PendingMessageQueue |
|
| 3.5789473684210527;3.579 |
| 1 | /* | |
| 2 | * #%L | |
| 3 | * PendingMessageQueue.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
| 4 | * %% | |
| 5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
| 6 | * %% | |
| 7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
| 8 | * you may not use this file except in compliance with the License. | |
| 9 | * You may obtain a copy of the License at | |
| 10 | * | |
| 11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
| 12 | * | |
| 13 | * Unless required by applicable law or agreed to in writing, software | |
| 14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
| 15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 16 | * See the License for the specific language governing permissions and | |
| 17 | * limitations under the License. | |
| 18 | * #L% | |
| 19 | */ | |
| 20 | package com.allanbank.mongodb.client.message; | |
| 21 | ||
| 22 | import java.util.List; | |
| 23 | import java.util.concurrent.TimeUnit; | |
| 24 | import java.util.concurrent.atomic.AtomicInteger; | |
| 25 | import java.util.concurrent.locks.Condition; | |
| 26 | import java.util.concurrent.locks.Lock; | |
| 27 | import java.util.concurrent.locks.ReentrantLock; | |
| 28 | ||
| 29 | import com.allanbank.mongodb.LockType; | |
| 30 | import com.allanbank.mongodb.client.Message; | |
| 31 | import com.allanbank.mongodb.client.callback.ReplyCallback; | |
| 32 | ||
| 33 | /** | |
| 34 | * PendingMessageQueue provides an optimized queue for pending messages inspired | |
| 35 | * by the Disruptor project. | |
| 36 | * <p> | |
| 37 | * To reduce thread contention the queue uses a set of integer values to track | |
| 38 | * the position of the ready messages (the last message that is ready to be | |
| 39 | * read), reserve (the first message that can be reserved to be written to), and | |
| 40 | * the take (the next (first) message to be read). For an infinite queue the | |
| 41 | * following invariant holds: <blockquote> | |
| 42 | * | |
| 43 | * <pre> | |
| 44 | * <code> | |
| 45 | * take < readyBefore <= reserve | |
| 46 | * </code> | |
| 47 | * </pre> | |
| 48 | * | |
| 49 | * </blockquote> To make handling a limited size queue easier the size of the | |
| 50 | * queue is forced to power of 2 less than {@value #MAX_SIZE}. The roll over can | |
| 51 | * then be handled with a simple mask operation. | |
| 52 | * </p> | |
| 53 | * <p> | |
| 54 | * Rather than allocate a pending message per request we use an array of | |
| 55 | * pre-allocated PendingMessages and copy the data into and out of the objects. | |
| 56 | * this has a net positive effect on object allocation and garbage collection | |
| 57 | * time at the cost of a longer initialization. | |
| 58 | * </p> | |
| 59 | * <p> | |
| 60 | * Lastly, This queue assumes there is a single consumer of messages. This is | |
| 61 | * true for the driver's use case but don't copy the code and expect it to work | |
| 62 | * with multiple consumers. The consumer should use the following basic | |
| 63 | * structure: <blockquote> | |
| 64 | * | |
| 65 | * <pre> | |
| 66 | * <code> | |
| 67 | * PendingMessage pm = new {@link PendingMessage}(); | |
| 68 | * | |
| 69 | * queue.take(pm); // Blocks. | |
| 70 | * // Handle the message. | |
| 71 | * | |
| 72 | * // or | |
| 73 | * | |
| 74 | * if( queue.poll(pm) ) { // Non-blocking. | |
| 75 | * // Handle The Message. | |
| 76 | * } | |
| 77 | * </code> | |
| 78 | * </pre> | |
| 79 | * | |
| 80 | * </blockquote> | |
| 81 | * </p> | |
| 82 | * <p> | |
| 83 | * <b>Warning: </b> This class has been carefully tuned for the driver's use | |
| 84 | * case. Changes should be carefully bench marked and tested. Comments have been | |
| 85 | * embedded in the source indicating attempted changes and reverts. Due to its | |
| 86 | * position in the driver subtle changes in this class can cause large changes | |
| 87 | * in the performance of the driver. | |
| 88 | * </p> | |
| 89 | * | |
| 90 | * @see <a href="http://code.google.com/p/disruptor/">Disruptor Project</a> | |
| 91 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
| 92 | * mutated in incompatible ways between any two releases of the driver. | |
| 93 | * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved | |
| 94 | */ | |
| 95 | public final class PendingMessageQueue { | |
| 96 | ||
| 97 | /** The mask for constraining the size the message id. */ | |
| 98 | public static final long MAX_MESSAGE_ID_MASK = 0x0FFFFFFF; | |
| 99 | ||
| 100 | /** | |
| 101 | * The maximum size of the queue. This it currently 2^20 but must be at most | |
| 102 | * 2^30 to ensure masking works. | |
| 103 | */ | |
| 104 | public static final int MAX_SIZE = (1 << 20); | |
| 105 | ||
| 106 | /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */ | |
| 107 | 1 | public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100; |
| 108 | ||
| 109 | /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */ | |
| 110 | 1 | public static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1; |
| 111 | ||
| 112 | /** Number of times to spin before trying something different. */ | |
| 113 | private static final int SPIN_ITERATIONS = 10000; | |
| 114 | ||
| 115 | /** The condition used with the queue being full or empty. */ | |
| 116 | private final Condition myCondition; | |
| 117 | ||
| 118 | /** The mutex used with the queue. */ | |
| 119 | private final Lock myLock; | |
| 120 | ||
| 121 | /** The lock type to use with the queue. */ | |
| 122 | private final LockType myLockType; | |
| 123 | ||
| 124 | /** Tracks how many times we have looped through the ring buffer. */ | |
| 125 | private final AtomicInteger myLooped; | |
| 126 | ||
| 127 | /** The mask being used. */ | |
| 128 | private final int myMask; | |
| 129 | ||
| 130 | /** The queue of pending messages. */ | |
| 131 | private final PendingMessage[] myQueue; | |
| 132 | ||
| 133 | /** | |
| 134 | * The position of the last message that is ready to be taken. | |
| 135 | * <p> | |
| 136 | * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the | |
| 137 | * queue is empty. | |
| 138 | * </p> | |
| 139 | */ | |
| 140 | private final AtomicInteger myReadyBeforePosition; | |
| 141 | ||
| 142 | /** | |
| 143 | * The position of the next message that can be reserved. | |
| 144 | * <p> | |
| 145 | * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the | |
| 146 | * queue is full. | |
| 147 | * </p> | |
| 148 | */ | |
| 149 | private final AtomicInteger myReservePosition; | |
| 150 | ||
| 151 | /** | |
| 152 | * The position of the next message that can be taken. | |
| 153 | * <p> | |
| 154 | * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the | |
| 155 | * queue is full. | |
| 156 | * </p> | |
| 157 | * <p> | |
| 158 | * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the | |
| 159 | * queue is empty. | |
| 160 | * </p> | |
| 161 | */ | |
| 162 | private volatile int myTakePosition; | |
| 163 | ||
| 164 | /** Tracks how many threads are waiting for a message or a space to open. */ | |
| 165 | private final AtomicInteger myWaiting; | |
| 166 | ||
| 167 | /** | |
| 168 | * Creates a new PendingMessageQueue. | |
| 169 | * | |
| 170 | * @param size | |
| 171 | * The size of the queue to create. | |
| 172 | * @param lockType | |
| 173 | * The lock type to use with the queue. | |
| 174 | */ | |
| 175 | 254 | public PendingMessageQueue(final int size, final LockType lockType) { |
| 176 | 254 | int power = size; |
| 177 | 254 | if (MAX_SIZE < size) { |
| 178 | 2 | power = MAX_SIZE; |
| 179 | } | |
| 180 | 252 | else if (Integer.bitCount(size) != 1) { |
| 181 | // Find the next larger power of 2. | |
| 182 | 2 | power = 1; |
| 183 | 22 | while ((power < size) && (power != 0)) { |
| 184 | 20 | power <<= 1; |
| 185 | } | |
| 186 | } | |
| 187 | ||
| 188 | 254 | myLockType = lockType; |
| 189 | 254 | myQueue = new PendingMessage[power]; |
| 190 | 2347774 | for (int i = 0; i < myQueue.length; ++i) { |
| 191 | 2347520 | myQueue[i] = new PendingMessage(0, null); |
| 192 | } | |
| 193 | 254 | myMask = (power - 1); |
| 194 | ||
| 195 | 254 | myLooped = new AtomicInteger(0); |
| 196 | 254 | myTakePosition = -1; |
| 197 | 254 | myReadyBeforePosition = new AtomicInteger(0); |
| 198 | 254 | myReservePosition = new AtomicInteger(0); |
| 199 | 254 | myWaiting = new AtomicInteger(0); |
| 200 | ||
| 201 | 254 | myLock = new ReentrantLock(); |
| 202 | 254 | myCondition = myLock.newCondition(); |
| 203 | 254 | } |
| 204 | ||
| 205 | /** | |
| 206 | * Returns the size of the queue. | |
| 207 | * | |
| 208 | * @return The size of the queue. | |
| 209 | */ | |
| 210 | public int capacity() { | |
| 211 | 54260 | return myQueue.length - 1; |
| 212 | } | |
| 213 | ||
| 214 | /** | |
| 215 | * Drains the list of pending messages into the provided list. | |
| 216 | * | |
| 217 | * @param pending | |
| 218 | * The list to add all of the pending messages to. | |
| 219 | */ | |
| 220 | public void drainTo(final List<PendingMessage> pending) { | |
| 221 | 2 | PendingMessage pm = new PendingMessage(); |
| 222 | 2048 | while (poll(pm)) { |
| 223 | 2046 | pending.add(pm); |
| 224 | 2046 | pm = new PendingMessage(); |
| 225 | } | |
| 226 | 2 | } |
| 227 | ||
| 228 | /** | |
| 229 | * Returns true if the queue is empty. e.g., the next take position is the | |
| 230 | * read before position. | |
| 231 | * | |
| 232 | * @return If the queue is empty. | |
| 233 | */ | |
| 234 | public boolean isEmpty() { | |
| 235 | 10467 | final int take = myTakePosition; |
| 236 | 10467 | final int readyBefore = myReadyBeforePosition.get(); |
| 237 | ||
| 238 | 10467 | return (readyBefore == take) || (take < 0); |
| 239 | } | |
| 240 | ||
| 241 | /** | |
| 242 | * Puts a message onto the queue. This method will not block waiting for a | |
| 243 | * space to add the message. | |
| 244 | * | |
| 245 | * @param message | |
| 246 | * The message to add. | |
| 247 | * @param replyCallback | |
| 248 | * The callback for the message to add. | |
| 249 | * @return True if the message was added, false otherwise. | |
| 250 | */ | |
| 251 | public boolean offer(final Message message, | |
| 252 | final ReplyCallback replyCallback) { | |
| 253 | ||
| 254 | 0 | final int loop = myLooped.get(); |
| 255 | 0 | final int reserve = offer(); |
| 256 | 0 | if (reserve < 0) { |
| 257 | 0 | return false; |
| 258 | } | |
| 259 | ||
| 260 | 0 | final int messageid = toMessageId(loop, reserve); |
| 261 | ||
| 262 | 0 | myQueue[reserve].set(messageid, message, replyCallback); |
| 263 | ||
| 264 | 0 | markReady(reserve); |
| 265 | ||
| 266 | 0 | return true; |
| 267 | } | |
| 268 | ||
| 269 | /** | |
| 270 | * Puts a message onto the queue. This method will not block waiting for a | |
| 271 | * space to add the message. | |
| 272 | * | |
| 273 | * @param pendingMessage | |
| 274 | * The message to add. | |
| 275 | * @return True if the message was added, false otherwise. | |
| 276 | */ | |
| 277 | public boolean offer(final PendingMessage pendingMessage) { | |
| 278 | 8464 | final int reserve = offer(); |
| 279 | 8464 | if (reserve < 0) { |
| 280 | 2 | return false; |
| 281 | } | |
| 282 | ||
| 283 | 8462 | myQueue[reserve].set(pendingMessage); |
| 284 | ||
| 285 | 8462 | markReady(reserve); |
| 286 | ||
| 287 | 8462 | return true; |
| 288 | } | |
| 289 | ||
| 290 | /** | |
| 291 | * Returns the next message from the queue without blocking. <blockquote> | |
| 292 | * | |
| 293 | * <pre> | |
| 294 | * <code> | |
| 295 | * PendingMessage pm = new PendingMessage(); | |
| 296 | * if( queue.poll(pm) } { | |
| 297 | * // Handle the message copied into pm. | |
| 298 | * } | |
| 299 | * </code> | |
| 300 | * </pre> | |
| 301 | * | |
| 302 | * </blockquote> | |
| 303 | * | |
| 304 | * @param copyOut | |
| 305 | * The {@link PendingMessage} to copy the pending message into. | |
| 306 | * @return True if the pending message was updated. | |
| 307 | */ | |
| 308 | public boolean poll(final PendingMessage copyOut) { | |
| 309 | 344965415 | boolean result = false; |
| 310 | 344965415 | final int take = myTakePosition; |
| 311 | 344965415 | if ((myReadyBeforePosition.get() != take) && (take >= 0)) { // Empty, |
| 312 | // Not | |
| 313 | // started? | |
| 314 | 484240 | copyOut.set(myQueue[take]); |
| 315 | 484240 | myQueue[take].clear(); |
| 316 | 484240 | result = true; |
| 317 | ||
| 318 | 484240 | myTakePosition = increment(take); |
| 319 | 484240 | notifyWaiters(false); |
| 320 | } | |
| 321 | ||
| 322 | 344965415 | return result; |
| 323 | } | |
| 324 | ||
| 325 | /** | |
| 326 | * Puts a message onto the queue. This method will block waiting for a space | |
| 327 | * to add the message. | |
| 328 | * | |
| 329 | * @param message | |
| 330 | * The message to add. | |
| 331 | * @param replyCallback | |
| 332 | * The callback for the message to add. | |
| 333 | * | |
| 334 | * @throws InterruptedException | |
| 335 | * If the thread is interrupted while waiting for the message. | |
| 336 | * If thrown the message will not have been enqueued. | |
| 337 | */ | |
| 338 | public void put(final Message message, final ReplyCallback replyCallback) | |
| 339 | throws InterruptedException { | |
| 340 | ||
| 341 | 24586 | int loop = myLooped.get(); |
| 342 | 24586 | int reserve = offer(); |
| 343 | 24586 | if (reserve < 0) { |
| 344 | ||
| 345 | // Spinning here appears to slow things down. | |
| 346 | ||
| 347 | // Block. | |
| 348 | try { | |
| 349 | 2 | myWaiting.incrementAndGet(); |
| 350 | 2 | myLock.lock(); |
| 351 | ||
| 352 | 2 | loop = myLooped.get(); |
| 353 | 2 | reserve = offer(); |
| 354 | 2 | while (reserve < 0) { |
| 355 | 2 | myCondition.await(); |
| 356 | 0 | loop = myLooped.get(); |
| 357 | 0 | reserve = offer(); |
| 358 | } | |
| 359 | } | |
| 360 | finally { | |
| 361 | 2 | myLock.unlock(); |
| 362 | 2 | myWaiting.decrementAndGet(); |
| 363 | 0 | } |
| 364 | } | |
| 365 | ||
| 366 | 24584 | final int messageid = toMessageId(loop, reserve); |
| 367 | ||
| 368 | 24584 | myQueue[reserve].set(messageid, message, replyCallback); |
| 369 | ||
| 370 | 24584 | markReady(reserve); |
| 371 | 24584 | } |
| 372 | ||
| 373 | /** | |
| 374 | * Puts two messages onto the queue. This method will block waiting for a | |
| 375 | * space to add the messages but ensures the messages are in sequence in the | |
| 376 | * queue. | |
| 377 | * | |
| 378 | * @param message | |
| 379 | * The first message to add. | |
| 380 | * @param replyCallback | |
| 381 | * The callback for the first message to add. | |
| 382 | * @param message2 | |
| 383 | * The second message to add. | |
| 384 | * @param replyCallback2 | |
| 385 | * The callback for the second message to add. | |
| 386 | * | |
| 387 | * @throws InterruptedException | |
| 388 | * If the thread is interrupted while waiting for the message. | |
| 389 | * If thrown neither message will have been enqueued. | |
| 390 | */ | |
| 391 | public void put(final Message message, final ReplyCallback replyCallback, | |
| 392 | final Message message2, final ReplyCallback replyCallback2) | |
| 393 | throws InterruptedException { | |
| 394 | 3073 | int loop = myLooped.get(); |
| 395 | 3073 | int reserve = offer2(); |
| 396 | 3073 | if (reserve < 0) { |
| 397 | ||
| 398 | // Spinning here appears to slow things down. | |
| 399 | try { | |
| 400 | 2 | myWaiting.incrementAndGet(); |
| 401 | 2 | myLock.lock(); |
| 402 | ||
| 403 | 2 | loop = myLooped.get(); |
| 404 | 2 | reserve = offer2(); |
| 405 | 2 | while (reserve < 0) { |
| 406 | 2 | myCondition.await(); |
| 407 | 0 | loop = myLooped.get(); |
| 408 | 0 | reserve = offer2(); |
| 409 | } | |
| 410 | } | |
| 411 | finally { | |
| 412 | 2 | myLock.unlock(); |
| 413 | 2 | myWaiting.decrementAndGet(); |
| 414 | 0 | } |
| 415 | } | |
| 416 | ||
| 417 | // Use reserve + 1 for the second message id since it may have looped | |
| 418 | // and then the math does not work out causing messageId2 to be lower | |
| 419 | // than | |
| 420 | // messageId1, which is bad. | |
| 421 | 3071 | final int messageId1 = toMessageId(loop, reserve); |
| 422 | 3071 | final int messageId2 = toMessageId(loop, reserve + 1); |
| 423 | ||
| 424 | 3071 | final int second = increment(reserve); |
| 425 | 3071 | myQueue[reserve].set(messageId1, message, replyCallback); |
| 426 | 3071 | myQueue[second].set(messageId2, message2, replyCallback2); |
| 427 | ||
| 428 | 3071 | markReady2(reserve); |
| 429 | 3071 | } |
| 430 | ||
| 431 | /** | |
| 432 | * Puts a message onto the queue. This method will block waiting for a space | |
| 433 | * to add the message. | |
| 434 | * | |
| 435 | * @param pendingMessage | |
| 436 | * The message to add. | |
| 437 | * | |
| 438 | * @throws InterruptedException | |
| 439 | * If the thread is interrupted while waiting for the message. | |
| 440 | * If thrown the message will not have been enqueued. | |
| 441 | */ | |
| 442 | public void put(final PendingMessage pendingMessage) | |
| 443 | throws InterruptedException { | |
| 444 | 452738 | int reserve = offer(); |
| 445 | 453154 | if (reserve < 0) { |
| 446 | ||
| 447 | // Spinning here appears to slow things down. | |
| 448 | ||
| 449 | try { | |
| 450 | 16162 | myWaiting.incrementAndGet(); |
| 451 | 16170 | myLock.lock(); |
| 452 | ||
| 453 | 16178 | reserve = offer(); |
| 454 | 199767 | while (reserve < 0) { |
| 455 | 183591 | myCondition.await(); |
| 456 | 183589 | reserve = offer(); |
| 457 | } | |
| 458 | } | |
| 459 | finally { | |
| 460 | 16178 | myLock.unlock(); |
| 461 | 16178 | myWaiting.decrementAndGet(); |
| 462 | 16176 | } |
| 463 | } | |
| 464 | ||
| 465 | 453234 | myQueue[reserve].set(pendingMessage); |
| 466 | ||
| 467 | 453222 | markReady(reserve); |
| 468 | 453188 | } |
| 469 | ||
| 470 | /** | |
| 471 | * Returns the number of messages in the queue. | |
| 472 | * | |
| 473 | * @return The number of messages in the queue. | |
| 474 | */ | |
| 475 | public int size() { | |
| 476 | 11267 | final int take = myTakePosition; |
| 477 | 11267 | final int ready = myReadyBeforePosition.get(); |
| 478 | ||
| 479 | 11267 | if (take < 0) { |
| 480 | 3 | return 0; |
| 481 | } | |
| 482 | 11264 | else if (take <= ready) { |
| 483 | 11256 | return (ready - take); |
| 484 | } | |
| 485 | ||
| 486 | 8 | return (myQueue.length - take) + ready; |
| 487 | } | |
| 488 | ||
| 489 | /** | |
| 490 | * Returns the next message from the queue and will block waiting for a | |
| 491 | * message. | |
| 492 | * | |
| 493 | * @param copyOut | |
| 494 | * The {@link PendingMessage} to copy the pending message into. | |
| 495 | * @throws InterruptedException | |
| 496 | * If the thread is interrupted while waiting for the message. | |
| 497 | */ | |
| 498 | public void take(final PendingMessage copyOut) throws InterruptedException { | |
| 499 | 477875 | if (!poll(copyOut)) { |
| 500 | ||
| 501 | // Spin/yeild loop. | |
| 502 | 87099 | if (myLockType == LockType.LOW_LATENCY_SPIN) { |
| 503 | 59623 | long now = 0; |
| 504 | 59623 | long spinDeadline = 1; |
| 505 | 59623 | long yeildDeadline = 1; |
| 506 | 86217 | while (now < yeildDeadline) { |
| 507 | 344454520 | for (int i = 0; i < SPIN_ITERATIONS; ++i) { |
| 508 | 344427926 | if (poll(copyOut)) { |
| 509 | 58587 | return; |
| 510 | } | |
| 511 | } | |
| 512 | ||
| 513 | // Pause? | |
| 514 | 26594 | now = System.nanoTime(); |
| 515 | 26594 | if (spinDeadline == 1) { |
| 516 | 10009 | spinDeadline = now + SPIN_TIME_NS; |
| 517 | 10009 | yeildDeadline = now + YIELD_TIME_NS; |
| 518 | // First time free pass. | |
| 519 | } | |
| 520 | else { | |
| 521 | 16585 | if ((spinDeadline < now) && (now < yeildDeadline)) { |
| 522 | 15549 | Thread.yield(); |
| 523 | } | |
| 524 | } | |
| 525 | } | |
| 526 | } | |
| 527 | ||
| 528 | // Block. | |
| 529 | try { | |
| 530 | 28512 | myWaiting.incrementAndGet(); |
| 531 | 28512 | myLock.lock(); |
| 532 | ||
| 533 | 52998 | while (!poll(copyOut)) { |
| 534 | 24533 | myCondition.await(); |
| 535 | } | |
| 536 | } | |
| 537 | finally { | |
| 538 | 28512 | myLock.unlock(); |
| 539 | 28512 | myWaiting.decrementAndGet(); |
| 540 | 28465 | } |
| 541 | } | |
| 542 | 419241 | } |
| 543 | ||
| 544 | /** | |
| 545 | * Increments the index handling roll-over. | |
| 546 | * | |
| 547 | * @param index | |
| 548 | * The value to increment. | |
| 549 | * @return The incremented value. | |
| 550 | */ | |
| 551 | protected int increment(final int index) { | |
| 552 | 1571997 | return ((index + 1) & myMask); |
| 553 | } | |
| 554 | ||
| 555 | /** | |
| 556 | * Marks the position as ready by incrementing the ready position to the | |
| 557 | * provided position. This method uses a spin lock assuming any other | |
| 558 | * threads will increment the ready position quickly to the position just | |
| 559 | * before {@code index}. | |
| 560 | * | |
| 561 | * @param index | |
| 562 | * The index of the ready message. | |
| 563 | */ | |
| 564 | protected void markReady(final int index) { | |
| 565 | 486227 | final int after = increment(index); |
| 566 | ||
| 567 | 488498 | while (!myReadyBeforePosition.compareAndSet(index, after)) { |
| 568 | // Spinning here slows things down because we know that the other | |
| 569 | // thread should be runnable. Always Yield. | |
| 570 | 2470 | Thread.yield(); |
| 571 | } | |
| 572 | ||
| 573 | // Pull take position into the queue. | |
| 574 | 486231 | if ((index == 0) && (myTakePosition == -1)) { |
| 575 | 174 | myTakePosition = index; |
| 576 | } | |
| 577 | ||
| 578 | 486232 | notifyWaiters(false); |
| 579 | 486223 | } |
| 580 | ||
| 581 | /** | |
| 582 | * Marks the position and the next position as ready by incrementing the | |
| 583 | * ready position to the provided position + 1. This method uses a spin lock | |
| 584 | * assuming any other threads will increment the ready position quickly to | |
| 585 | * the position just before {@code index}. | |
| 586 | * | |
| 587 | * @param index | |
| 588 | * The index of the ready message. | |
| 589 | */ | |
| 590 | protected void markReady2(final int index) { | |
| 591 | 3071 | final int after = increment(index); |
| 592 | 3071 | final int twoAfter = increment(after); |
| 593 | ||
| 594 | 3071 | while (!myReadyBeforePosition.compareAndSet(index, twoAfter)) { |
| 595 | // Just keep swimming... | |
| 596 | 0 | Thread.yield(); |
| 597 | } | |
| 598 | ||
| 599 | // Pull take position into the queue. | |
| 600 | 3071 | if ((index == 0) && (myTakePosition == -1)) { |
| 601 | 3 | myTakePosition = index; |
| 602 | } | |
| 603 | ||
| 604 | // If someone is waiting let them know we created two messages. | |
| 605 | 3071 | notifyWaiters(true); |
| 606 | 3071 | } |
| 607 | ||
| 608 | /** | |
| 609 | * Notifies the waiting threads that the state of the queue has changed. | |
| 610 | * | |
| 611 | * @param all | |
| 612 | * If true then all threads will be woken. Otherwise only a | |
| 613 | * single thread is woken. | |
| 614 | */ | |
| 615 | protected void notifyWaiters(final boolean all) { | |
| 616 | 966117 | if (myWaiting.get() > 0) { |
| 617 | try { | |
| 618 | 523124 | myLock.lock(); |
| 619 | 523382 | if (all) { |
| 620 | 1 | myCondition.signalAll(); |
| 621 | } | |
| 622 | else { | |
| 623 | 523381 | myCondition.signal(); |
| 624 | } | |
| 625 | } | |
| 626 | finally { | |
| 627 | 523382 | myLock.unlock(); |
| 628 | 523283 | } |
| 629 | } | |
| 630 | 967782 | } |
| 631 | ||
| 632 | /** | |
| 633 | * Checks if there is remove for another message. If so returns the index of | |
| 634 | * the message to update. If not return a value less then zero. | |
| 635 | * | |
| 636 | * @return The position of the message that can be updated or a value of | |
| 637 | * less than zero if the queue is full. | |
| 638 | */ | |
| 639 | protected int offer() { | |
| 640 | 684936 | int result = -1; |
| 641 | 684997 | final int reserve = myReservePosition.get(); |
| 642 | 685368 | final int next = increment(reserve); |
| 643 | 685428 | if ((myTakePosition != next) /* Full? */ |
| 644 | && myReservePosition.compareAndSet(reserve, next)) { | |
| 645 | ||
| 646 | // Got a slot. | |
| 647 | 486280 | result = reserve; |
| 648 | ||
| 649 | // Check if we looped. | |
| 650 | 486282 | if (next < reserve) { |
| 651 | 1754 | myLooped.incrementAndGet(); |
| 652 | } | |
| 653 | } | |
| 654 | 685680 | return result; |
| 655 | } | |
| 656 | ||
| 657 | /** | |
| 658 | * Checks if there is remove for another two message. If so returns the | |
| 659 | * index of the first message to update. If not return a value less then | |
| 660 | * zero. | |
| 661 | * | |
| 662 | * @return The position of the first message that can be updated or a value | |
| 663 | * of less than zero if the queue is full. | |
| 664 | */ | |
| 665 | protected int offer2() { | |
| 666 | 3075 | int result = -1; |
| 667 | 3075 | final int reserve = myReservePosition.get(); |
| 668 | 3075 | final int first = increment(reserve); |
| 669 | 3075 | final int second = increment(first); |
| 670 | 3075 | final int take = myTakePosition; |
| 671 | 3075 | if ((take != first) && (take != second) /* Full? */ |
| 672 | && myReservePosition.compareAndSet(reserve, second)) { | |
| 673 | ||
| 674 | // Got two slots. Return the first. | |
| 675 | 3071 | result = reserve; |
| 676 | ||
| 677 | // Check if we looped. | |
| 678 | 3071 | if (second < reserve) { |
| 679 | 4 | myLooped.incrementAndGet(); |
| 680 | } | |
| 681 | } | |
| 682 | 3075 | return result; |
| 683 | } | |
| 684 | ||
| 685 | /** | |
| 686 | * Computes a new message id based on the current loop and reserve spot in | |
| 687 | * the queue. | |
| 688 | * | |
| 689 | * @param loop | |
| 690 | * The number of time the queue has looped over the queue. | |
| 691 | * @param reserve | |
| 692 | * The reserved position in the queue. This can be a virtual | |
| 693 | * postion. | |
| 694 | * @return The message id to use. | |
| 695 | */ | |
| 696 | private int toMessageId(final int loop, final long reserve) { | |
| 697 | 30726 | final long loopOffset = (((long) loop) * myQueue.length); |
| 698 | 30726 | if (loopOffset > MAX_MESSAGE_ID_MASK) { |
| 699 | 0 | myLooped.compareAndSet(loop, 0); |
| 700 | } | |
| 701 | // Add an extra 1 so the first value is 1 instead of zero. | |
| 702 | 30726 | return (int) ((loopOffset + reserve) & MAX_MESSAGE_ID_MASK) + 1; |
| 703 | } | |
| 704 | } |