PNXMPPFramework/Extensions/XEP-0198/XMPPStreamManagement.m
2016-02-24 16:56:39 +01:00

1930 lines
52 KiB
Objective-C

#import "XMPPStreamManagement.h"
#import "XMPPStreamManagementStanzas.h"
#import "XMPPInternal.h"
#import "XMPPTimer.h"
#import "XMPPLogging.h"
#import "NSNumber+XMPP.h"
#if ! __has_feature(objc_arc)
#warning This file must be compiled with ARC. Use -fobjc-arc flag (or convert project to ARC).
#endif
// Log levels: off, error, warn, info, verbose
// Log flags: trace
#if DEBUG
static const int xmppLogLevel = XMPP_LOG_LEVEL_WARN;
#else
static const int xmppLogLevel = XMPP_LOG_LEVEL_WARN;
#endif
/**
* Define various xmlns values.
**/
#define XMLNS_STREAM_MANAGEMENT @"urn:xmpp:sm:3"
/**
* Seeing a return statements within an inner block
* can sometimes be mistaken for a return point of the enclosing method.
* This makes inline blocks a bit easier to read.
**/
#define return_from_block return
@implementation XMPPStreamManagement
{
// Storage module (may be nil)
id <XMPPStreamManagementStorage> storage;
// State machine
BOOL isStarted; // either <enabled/> or <resumed/> received from server
BOOL enableQueued; // the <enable/> element is queued in xmppStream
BOOL enableSent; // the <enable/> element has been sent through xmppStream
BOOL wasCleanDisconnect; // xmppStream sent </stream:stream>
BOOL didAttemptResume;
BOOL didResume;
NSXMLElement *resume_response;
NSArray *resume_stanzaIds;
NSDate *disconnectDate;
// Configuration
BOOL autoResume;
NSUInteger autoRequest_stanzaCount;
NSTimeInterval autoRequest_timeout;
NSUInteger autoAck_stanzaCount;
NSTimeInterval autoAck_timeout;
NSTimeInterval ackResponseDelay;
// Enable
uint32_t requestedMax;
// Tracking outgoing stanzas
uint32_t lastHandledByServer; // last h value received from server
NSMutableArray *unackedByServer; // array of XMPPStreamManagementOutgoingStanza objects
NSUInteger unackedByServer_lastRequestOffset; // represents point at which we last sent a request
NSArray *prev_unackedByServer; // from previous connection, used when resuming session
NSMutableArray *unprocessedReceivedAcks; // acks received from server that we haven't processed yet
XMPPTimer *autoRequestTimer; // timer to fire a request
// Tracking incoming stanzas
uint32_t lastHandledByClient; // latest h value we can send to the server
NSMutableArray *unackedByClient; // array of XMPPStreamManagementIncomingStanza objects
NSUInteger unackedByClient_lastAckOffset; // number of items removed from array, but ack not sent to server
NSMutableArray *pendingHandledStanzaIds;// edge case handling
NSUInteger outstandingStanzaIds; // edge case handling + defensive programming
XMPPTimer *autoAckTimer; // timer to fire ack at server
XMPPTimer *ackResponseTimer; // timer for ackResponseDelay
}
@synthesize storage = storage;
- (id)init
{
// This will cause a crash - it's designed to.
// Only the init methods listed in XMPPStreamManagement.h are supported.
return [self initWithStorage:nil dispatchQueue:NULL];
}
- (id)initWithDispatchQueue:(dispatch_queue_t)queue
{
// This will cause a crash - it's designed to.
// Only the init methods listed in XMPPStreamManagement.h are supported.
return [self initWithStorage:nil dispatchQueue:queue];
}
- (id)initWithStorage:(id <XMPPStreamManagementStorage>)inStorage
{
return [self initWithStorage:inStorage dispatchQueue:NULL];
}
- (id)initWithStorage:(id <XMPPStreamManagementStorage>)inStorage dispatchQueue:(dispatch_queue_t)queue
{
if ((self = [super initWithDispatchQueue:queue]))
{
if ([inStorage configureWithParent:self queue:moduleQueue]) {
storage = inStorage;
}
else {
XMPPLogError(@"%@: %@ - Unable to configure storage!", THIS_FILE, THIS_METHOD);
}
unackedByServer = [[NSMutableArray alloc] init];
unackedByClient = [[NSMutableArray alloc] init];
}
return self;
}
- (NSSet *)xep0198Elements
{
return [NSSet setWithObjects:@"r", @"a", @"enable", @"enabled", @"resume", @"resumed", @"failed", nil];
}
- (void)didActivate
{
[xmppStream registerCustomElementNames:[self xep0198Elements]];
}
- (void)didDeactivate
{
[xmppStream unregisterCustomElementNames:[self xep0198Elements]];
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark Configuration
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- (BOOL)autoResume
{
XMPPLogTrace();
__block BOOL result = NO;
dispatch_block_t block = ^{
result = autoResume;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_sync(moduleQueue, block);
return result;
}
- (void)setAutoResume:(BOOL)newAutoResume
{
XMPPLogTrace();
dispatch_block_t block = ^{
autoResume = newAutoResume;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
- (void)automaticallyRequestAcksAfterStanzaCount:(NSUInteger)stanzaCount orTimeout:(NSTimeInterval)timeout
{
XMPPLogTrace();
dispatch_block_t block = ^{ @autoreleasepool{
autoRequest_stanzaCount = stanzaCount;
autoRequest_timeout = MAX(0.0, timeout);
if (autoRequestTimer) {
[autoRequestTimer updateTimeout:autoRequest_timeout fromOriginalStartTime:YES];
}
if (isStarted) {
[self maybeRequestAck];
}
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
- (void)getAutomaticallyRequestAcksAfterStanzaCount:(NSUInteger *)stanzaCountPtr orTimeout:(NSTimeInterval *)timeoutPtr
{
XMPPLogTrace();
__block NSUInteger stanzaCount = 0;
__block NSTimeInterval timeout = 0.0;
dispatch_block_t block = ^{
stanzaCount = autoRequest_stanzaCount;
timeout = autoRequest_timeout;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_sync(moduleQueue, block);
if (stanzaCountPtr) *stanzaCountPtr = stanzaCount;
if (timeoutPtr) *timeoutPtr = timeout;
}
- (void)automaticallySendAcksAfterStanzaCount:(NSUInteger)stanzaCount orTimeout:(NSTimeInterval)timeout
{
XMPPLogTrace();
dispatch_block_t block = ^{ @autoreleasepool{
autoAck_stanzaCount = stanzaCount;
autoAck_timeout = MAX(0.0, timeout);
if (autoAckTimer) {
[autoAckTimer updateTimeout:autoAck_timeout fromOriginalStartTime:YES];
}
if (isStarted) {
[self maybeSendAck];
}
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
- (void)getAutomaticallySendAcksAfterStanzaCount:(NSUInteger *)stanzaCountPtr orTimeout:(NSTimeInterval *)timeoutPtr
{
XMPPLogTrace();
__block NSUInteger stanzaCount = 0;
__block NSTimeInterval timeout = 0.0;
dispatch_block_t block = ^{
stanzaCount = autoAck_stanzaCount;
timeout = autoAck_timeout;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_sync(moduleQueue, block);
if (stanzaCountPtr) *stanzaCountPtr = stanzaCount;
if (timeoutPtr) *timeoutPtr = timeout;
}
- (NSTimeInterval)ackResponseDelay
{
XMPPLogTrace();
__block NSUInteger delay = 0.0;
dispatch_block_t block = ^{
delay = ackResponseDelay;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_sync(moduleQueue, block);
return delay;
}
- (void)setAckResponseDelay:(NSTimeInterval)delay
{
XMPPLogTrace();
dispatch_block_t block = ^{
ackResponseDelay = delay;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark Enable
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* This method sends the <enable> stanza to the server to request enabling stream management.
*
* XEP-0198 specifies that the <enable> stanza should only be sent by clients after authentication,
* and after binding has occurred.
*
* The servers response is reported via the delegate methods:
* @see xmppStreamManagement:wasEnabled:
* @see xmppStreamManagement:wasNotEnabled:
*
* @param supportsResumption
* Whether the client should request resumptions support.
* If YES, the resume attribute will be included. E.g. <enable resume='true'/>
*
* @param maxTimeout
* Allows you to specify the client's preferred maximum resumption time.
* This is optional, and will only be sent if you provide a positive value (maxTimeout > 0.0).
* Note that XEP-0198 only supports sending this value in seconds.
* So it the provided maxTimeout includes millisecond precision, this will be ignored via truncation
* (rounding down to nearest whole seconds value).
*
* @see supportsStreamManagement
**/
- (void)enableStreamManagementWithResumption:(BOOL)supportsResumption maxTimeout:(NSTimeInterval)maxTimeout
{
dispatch_block_t block = ^{ @autoreleasepool{
if (isStarted)
{
XMPPLogWarn(@"Stream management is already enabled/resumed.");
return;
}
if (enableQueued || enableSent)
{
XMPPLogWarn(@"Stream management is already started (pending response from server).");
return;
}
// State transition cleanup
[unackedByServer removeAllObjects];
unackedByServer_lastRequestOffset = 0;
[unackedByClient removeAllObjects];
unackedByClient_lastAckOffset = 0;
unprocessedReceivedAcks = nil;
pendingHandledStanzaIds = nil;
outstandingStanzaIds = 0;
// Send enable stanza:
//
// <enable xmlns='urn:xmpp:sm:3' ... />
NSXMLElement *enable = [NSXMLElement elementWithName:@"enable" xmlns:XMLNS_STREAM_MANAGEMENT];
if (supportsResumption) {
[enable addAttributeWithName:@"resume" stringValue:@"true"];
}
if (maxTimeout > 0.0) {
[enable addAttributeWithName:@"max" stringValue:[NSString stringWithFormat:@"%.0f", maxTimeout]];
}
[xmppStream sendElement:enable];
enableQueued = YES;
requestedMax = (maxTimeout > 0.0) ? (uint32_t)maxTimeout : (uint32_t)0;
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark Resume
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Utility method for handling canResume logic.
**/
- (BOOL)canResumeStreamWithResumptionId:(NSString *)resumptionId
timeout:(uint32_t)timeout
lastDisconnect:(NSDate *)lastDisconnect
{
if (resumptionId == nil) {
XMPPLogVerbose(@"%@: Cannot resume stream: resumptionId is nil", THIS_FILE);
return NO;
}
if (lastDisconnect == nil) {
XMPPLogVerbose(@"%@: Cannot resume stream: lastDisconnect is nil", THIS_FILE);
return NO;
}
NSTimeInterval elapsed = [lastDisconnect timeIntervalSinceNow] * -1.0;
if (elapsed < 0.0) // lastDisconnect is in the future ?
{
XMPPLogVerbose(@"%@: Cannot resume stream: invalid lastDisconnect - appears to be in future", THIS_FILE);
return NO;
}
if ((uint32_t)elapsed > timeout) // too much time has elapsed
{
XMPPLogVerbose(@"%@: Cannot resume stream: elapsed(%u) > timeout(%u)", THIS_FILE, (uint32_t)elapsed, timeout);
return NO;
}
return YES;
}
/**
* Returns YES if the stream can be resumed.
*
* This would be the case if there's an available resumptionId for the authenticated xmppStream,
* and the timeout from the last stream has not been exceeded.
**/
- (BOOL)canResumeStream
{
XMPPLogTrace();
// This is a PUBLIC method
__block BOOL result = NO;
dispatch_block_t block = ^{ @autoreleasepool{
if (isStarted || enableQueued || enableSent) {
return_from_block;
}
NSString *resumptionId = nil;
uint32_t timeout = 0;
NSDate *lastDisconnect = nil;
[storage getResumptionId:&resumptionId
timeout:&timeout
lastDisconnect:&lastDisconnect
forStream:xmppStream];
result = [self canResumeStreamWithResumptionId:resumptionId timeout:timeout lastDisconnect:lastDisconnect];
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_sync(moduleQueue, block);
return result;
}
/**
* Internal method that handles sending the <resume/> element, and the corresponding state transition.
**/
- (void)sendResumeRequestWithResumptionId:(NSString *)resumptionId
{
XMPPLogTrace();
dispatch_block_t block = ^{ @autoreleasepool {
// State transition cleanup
[unackedByServer removeAllObjects];
unackedByServer_lastRequestOffset = 0;
[unackedByClient removeAllObjects];
unackedByClient_lastAckOffset = 0;
unprocessedReceivedAcks = nil;
pendingHandledStanzaIds = nil;
outstandingStanzaIds = 0;
// Restore our state from the last stream
uint32_t newLastHandledByClient = 0;
uint32_t newLastHandledByServer = 0;
NSArray *pendingOutgoingStanzas = nil;
[storage getLastHandledByClient:&newLastHandledByClient
lastHandledByServer:&newLastHandledByServer
pendingOutgoingStanzas:&pendingOutgoingStanzas
forStream:xmppStream];
lastHandledByClient = newLastHandledByClient;
lastHandledByServer = newLastHandledByServer;
if ([pendingOutgoingStanzas count] > 0) {
prev_unackedByServer = [[NSMutableArray alloc] initWithArray:pendingOutgoingStanzas copyItems:YES];
}
XMPPLogVerbose(@"%@: Attempting to resume: lastHandledByClient(%u) lastHandledByServer(%u)",
THIS_FILE, lastHandledByClient, lastHandledByServer);
// Send the resume stanza:
//
// <resume h='lastHandledByClient' previd='resumptionId'/>
NSXMLElement *resume = [NSXMLElement elementWithName:@"resume" xmlns:XMLNS_STREAM_MANAGEMENT];
[resume addAttributeWithName:@"previd" stringValue:resumptionId];
[resume addAttributeWithName:@"h" stringValue:[NSString stringWithFormat:@"%u", lastHandledByClient]];
[xmppStream sendBindElement:resume];
didAttemptResume = YES;
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
/**
* Internal method to handle processing a resumed response from the server.
**/
- (void)processResumed:(NSXMLElement *)resumed
{
XMPPLogTrace();
dispatch_block_t block = ^{ @autoreleasepool {
uint32_t h = [resumed attributeUInt32ValueForName:@"h" withDefaultValue:lastHandledByServer];
uint32_t diff;
if (h >= lastHandledByServer)
diff = h - lastHandledByServer;
else
diff = (UINT32_MAX - lastHandledByServer) + h;
// IMPORTATNT:
// This code path uses prev_unackedByServer (NOT unackedByServer).
// This is because the ack has to do with stanzas sent from the previous connection.
if (diff > [prev_unackedByServer count])
{
XMPPLogWarn(@"Unexpected h value from resume: lastH=%lu, newH=%lu, numPendingStanzas=%lu",
(unsigned long)lastHandledByServer,
(unsigned long)h,
(unsigned long)[prev_unackedByServer count]);
diff = (uint32_t)[prev_unackedByServer count];
}
NSMutableArray *stanzaIds = [NSMutableArray arrayWithCapacity:(NSUInteger)diff];
for (uint32_t i = 0; i < diff; i++)
{
XMPPStreamManagementOutgoingStanza *outgoingStanza = prev_unackedByServer[(NSUInteger) i];
if (outgoingStanza.stanzaId) {
[stanzaIds addObject:outgoingStanza.stanzaId];
}
}
lastHandledByServer = h;
XMPPLogVerbose(@"%@: processResumed: lastHandledByServer(%u)", THIS_FILE, lastHandledByServer);
isStarted = YES;
didResume = YES;
prev_unackedByServer = nil;
resume_response = resumed;
resume_stanzaIds = [stanzaIds copy];
// Update storage
[storage setLastDisconnect:[NSDate date]
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:nil
forStream:xmppStream];
// Notify delegate
[multicastDelegate xmppStreamManagement:self didReceiveAckForStanzaIds:stanzaIds];
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
/**
* This method is meant to be called by other extensions when they receive an xmppStreamDidAuthenticate callback.
*
* Returns YES if the stream was resumed during the authentication process.
* Returns NO otherwise (if resume wasn't available, or it failed).
*
* Other extensions may wish to skip certain setup processes that aren't
* needed if the stream was resumed (since the previous session state has been restored server-side).
**/
- (BOOL)didResume
{
__block BOOL result = NO;
dispatch_block_t block = ^{
result = didResume;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_sync(moduleQueue, block);
return result;
}
/**
* This method is meant to be called when you receive an xmppStreamDidAuthenticate callback.
*
* It is used instead of a standard delegate method in order to provide a cleaner API.
* By using this method, one can put all the logic for handling authentication in a single place.
* But more importantly, it solves several subtle timing and threading issues.
*
* > A delegate method could have hit either before or after xmppStreamDidAuthenticate, depending on thread scheduling.
* > We could have queued it up, and forced it to hit after.
* > But your code would likely still have needed to add a check within xmppStreamDidAuthenticate...
*
* @param stanzaIdsPtr (optional)
* Just like the stanzaIdsPtr provided in xmppStreamManagement:didReceiveAckForStanzaIds:.
* This comes from the h value provided within the <resumed h='X'/> stanza sent by the server.
*
* @param responsePtr (optional)
* Returns the response we got from the server. Either <resumed/> or <failed/>.
* This will be nil if resume wasn't tried.
*
* @return
* YES if the stream was resumed.
* NO otherwise.
**/
- (BOOL)didResumeWithAckedStanzaIds:(NSArray **)stanzaIdsPtr
serverResponse:(NSXMLElement **)responsePtr
{
__block BOOL result = NO;
__block NSArray *stanzaIds = nil;
__block NSXMLElement *response = nil;
dispatch_block_t block = ^{
result = didResume;
stanzaIds = resume_stanzaIds;
response = resume_response;
};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_sync(moduleQueue, block);
if (stanzaIdsPtr) *stanzaIdsPtr = stanzaIds;
if (responsePtr) *responsePtr = response;
return result;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark XMPPCustomBinding Protocol
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Attempts to start the custom binding process.
*
* If it isn't possible to start the process (perhaps due to missing information),
* this method should return XMPP_BIND_FAIL and set an appropriate error message.
*
* If binding isn't needed (for example, because custom SASL authentication already handled it),
* this method should return XMPP_BIND_SUCCESS.
* In this case, xmppStream will immediately move to its post-binding operations.
*
* Otherwise this method should send whatever stanzas are needed to begin the binding process.
* And then return XMPP_BIND_CONTINUE.
*
* This method is called by automatically XMPPStream.
* You MUST NOT invoke this method manually.
**/
- (XMPPBindResult)start:(NSError **)errPtr
{
XMPPLogTrace();
// Fetch the resumptionId,
// and check to see if we can resume the stream.
NSString *resumptionId = nil;
uint32_t timeout = 0;
NSDate *lastDisconnect = nil;
[storage getResumptionId:&resumptionId
timeout:&timeout
lastDisconnect:&lastDisconnect
forStream:xmppStream];
if (![self canResumeStreamWithResumptionId:resumptionId timeout:timeout lastDisconnect:lastDisconnect])
{
return XMPP_BIND_FAIL_FALLBACK;
}
// Start the resume proces
[self sendResumeRequestWithResumptionId:resumptionId];
return XMPP_BIND_CONTINUE;
}
/**
* After the custom binding process has started, all incoming xmpp stanzas are routed to this method.
* The method should process the stanza as appropriate, and return the coresponding result.
* If the process is not yet complete, it should return XMPP_BIND_CONTINUE,
* meaning the xmpp stream will continue to forward all incoming xmpp stanzas to this method.
*
* This method is called automatically by XMPPStream.
* You MUST NOT invoke this method manually.
**/
- (XMPPBindResult)handleBind:(NSXMLElement *)element withError:(NSError **)errPtr
{
XMPPLogTrace();
NSString *elementName = [element name];
if ([elementName isEqualToString:@"resumed"])
{
[self processResumed:element];
return XMPP_BIND_SUCCESS;
}
else
{
if (![elementName isEqualToString:@"failed"]) {
XMPPLogError(@"%@: Received unrecognized response from server: %@", THIS_METHOD, element);
}
dispatch_async(moduleQueue, ^{ @autoreleasepool {
didResume = NO;
resume_response = element;
prev_unackedByServer = nil;
}});
return XMPP_BIND_FAIL_FALLBACK;
}
}
/**
* Optionally implement this method to override the default behavior.
* By default behavior, we mean the behavior normally taken by xmppStream, which is:
*
* - IF the server includes <session xmlns='urn:ietf:params:xml:ns:xmpp-session'/> in its stream:features
* - AND xmppStream.skipStartSession property is NOT set
* - THEN xmppStream will send the session start request, and await the response before transitioning to authenticated
*
* Thus if you implement this method and return YES, then xmppStream will skip starting a session,
* regardless of the stream:features and the current xmppStream.skipStartSession property value.
*
* If you implement this method and return NO, then xmppStream will follow the default behavior detailed above.
* This means that, even if this method returns NO, the xmppStream may still skip starting a session if
* the server doesn't require it via its stream:features,
* or if the user has explicitly forbidden it via the xmppStream.skipStartSession property.
*
* The default value is NO.
**/
- (BOOL)shouldSkipStartSessionAfterSuccessfulBinding
{
return YES;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark Requesting Acks
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Sends a request <r/> element, requesting the server reply with an ack <a h='lastHandled'/>.
*
* You can also configure the extension to automatically sends requests.
* @see automaticallyRequestAcksAfterStanzaCount:orTimeout:
*
* When the server replies with an ack, the delegate method will be invoked.
* @see xmppStreamManagement:didReceiveAckForStanzaIds:
**/
- (void)requestAck
{
XMPPLogTrace();
// This is a PUBLIC method
dispatch_block_t block = ^{ @autoreleasepool{
if (isStarted || enableQueued || enableSent)
{
[self _requestAck];
}
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
- (void)_requestAck
{
XMPPLogTrace();
if (isStarted || enableQueued || enableSent)
{
// Send the XML element
NSXMLElement *r = [NSXMLElement elementWithName:@"r" xmlns:XMLNS_STREAM_MANAGEMENT];
[xmppStream sendElement:r];
// Reset offset
unackedByServer_lastRequestOffset = [unackedByServer count];
}
[autoRequestTimer cancel];
autoRequestTimer = nil;
}
- (BOOL)maybeRequestAck
{
XMPPLogTrace();
if (!isStarted && !(enableQueued || enableSent))
{
// cannot request ack if not started (or at least sent <enable/>)
return NO;
}
if ((autoRequest_stanzaCount == 0) && (autoRequest_timeout == 0.0))
{
// auto request disabled
return NO;
}
NSUInteger pending = [unackedByServer count] - unackedByServer_lastRequestOffset;
if (pending == 0)
{
// nothing new to request
return NO;
}
if ((autoRequest_stanzaCount > 0) && (pending >= autoRequest_stanzaCount))
{
[self _requestAck];
return YES;
}
else if ((autoRequest_timeout > 0.0) && (autoRequestTimer == nil))
{
__weak id weakSelf = self;
autoRequestTimer = [[XMPPTimer alloc] initWithQueue:moduleQueue eventHandler:^{ @autoreleasepool{
[weakSelf _requestAck];
}}];
[autoRequestTimer startWithTimeout:autoRequest_timeout interval:0];
}
return NO;
}
/**
* This method is invoked from one of the xmppStream:didSendX: methods.
**/
- (void)processSentElement:(XMPPElement *)element
{
XMPPLogTrace();
SEL selector = @selector(xmppStreamManagement:stanzaIdForSentElement:);
if (![multicastDelegate hasDelegateThatRespondsToSelector:selector])
{
// There are not any delegates that respond to the selector.
// So the stanzaId is the elementId (if there is one).
NSString *elementId = [element elementID];
XMPPStreamManagementOutgoingStanza *stanza =
[[XMPPStreamManagementOutgoingStanza alloc] initWithStanzaId:elementId];
[unackedByServer addObject:stanza];
[self updateStoredPendingOutgoingStanzas];
// At bottom of this method:
// [self maybeRequestAck];
}
else
{
// We need to query the delegate(s) to see if there's a specific stanzaId for this element.
// This is an asynchronous process, so we put a placeholder in the array for now.
XMPPStreamManagementOutgoingStanza *stanza =
[[XMPPStreamManagementOutgoingStanza alloc] initAwaitingStanzaId];
[unackedByServer addObject:stanza];
// Start the asynchronous process to find the proper stanzaId
GCDMulticastDelegateEnumerator *enumerator = [multicastDelegate delegateEnumerator];
dispatch_queue_t concurrentQ = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(concurrentQ, ^{ @autoreleasepool {
id stanzaId = nil;
id delegate = nil;
dispatch_queue_t dq = NULL;
while ([enumerator getNextDelegate:&delegate delegateQueue:&dq forSelector:selector])
{
stanzaId = [delegate xmppStreamManagement:self stanzaIdForSentElement:element];
if (stanzaId)
{
break;
}
}
if (stanzaId == nil)
{
stanzaId = [element elementID];
}
dispatch_async(moduleQueue, ^{ @autoreleasepool{
// Set the stanzaId.
stanza.stanzaId = stanzaId;
stanza.awaitingStanzaId = NO;
// It's possible that we received an ack from the sever (acking our stanza)
// before we were able to determine its stanzaId.
// This edge case is handled by storing the ack in the pendingAcks array for later processing.
// We may be able to process it now.
BOOL dequeuedPendingAck = NO;
while ([unprocessedReceivedAcks count] > 0)
{
NSXMLElement *ack = unprocessedReceivedAcks[0];
if ([self processReceivedAck:ack])
{
[unprocessedReceivedAcks removeObjectAtIndex:0];
dequeuedPendingAck = YES;
}
else
{
break;
}
}
if (!dequeuedPendingAck)
{
[self updateStoredPendingOutgoingStanzas];
}
}});
}});
}
XMPPLogVerbose(@"%@: processSentElement (%@): lastHandledByServer(%u) pending(%lu)",
THIS_FILE, [element name], lastHandledByServer, (unsigned long)[unackedByServer count]);
[self maybeRequestAck];
}
/**
* This method is invoked when an ack <a h='lastHandled'/> arrives.
*
* It attempts to process the ack.
* That is, there should be adequate outgoing stanzas (in the unackedByServer array) which have a set stanzaId.
*
* Because stanzaId's are set by the delegate(s), its possible (although unlikely) that we receive an ack before
* the delegate tells us the proper stanzaId for a sent element. When this occurs, we won't be able to completely
* process the ack. However, this method will process as many as possible (while maintaining serial order).
*
* @return
* YES if the ack can be marked as 100% processed.
* NO otherwise (if we're still awaiting a stanzaId from a delegate),
* in which case the caller MUST store the ack in the unprocessedReceivedAcks array.
**/
- (BOOL)processReceivedAck:(NSXMLElement *)ack
{
XMPPLogTrace();
uint32_t h = 0;
if (![NSNumber xmpp_parseString:[ack attributeStringValueForName:@"h"] intoUInt32:&h])
{
XMPPLogError(@"Error parsing h value from ack: %@", [ack compactXMLString]);
return YES;
}
uint32_t diff;
if (h >= lastHandledByServer)
diff = h - lastHandledByServer;
else
diff = (UINT32_MAX - lastHandledByServer) + h;
if (diff == 0)
{
// shortcut: server is reporting no new stanzas have been processed
return YES;
}
if (diff > [unackedByServer count])
{
XMPPLogWarn(@"Unexpected h value from ack: lastH=%lu, newH=%lu, numPendingStanzas=%lu",
(unsigned long)lastHandledByServer,
(unsigned long)h,
(unsigned long)[unackedByServer count]);
diff = (uint32_t)[unackedByServer count];
}
BOOL canProcessEntireAck = YES;
NSUInteger processed = 0;
NSMutableArray *stanzaIds = [NSMutableArray arrayWithCapacity:(NSUInteger)diff];
for (uint32_t i = 0; i < diff; i++)
{
XMPPStreamManagementOutgoingStanza *outgoingStanza = unackedByServer[(NSUInteger) i];
if ([outgoingStanza awaitingStanzaId])
{
canProcessEntireAck = NO;
break;
}
else
{
if (outgoingStanza.stanzaId) {
[stanzaIds addObject:outgoingStanza.stanzaId];
}
processed++;
}
}
if (canProcessEntireAck || processed > 0)
{
if (canProcessEntireAck)
{
[unackedByServer removeObjectsInRange:NSMakeRange(0, (NSUInteger)diff)];
if (unackedByServer_lastRequestOffset > diff)
unackedByServer_lastRequestOffset -= diff;
else
unackedByServer_lastRequestOffset = 0;
lastHandledByServer = h;
XMPPLogVerbose(@"%@: processReceivedAck (fully processed): lastHandledByServer(%u) pending(%lu)",
THIS_FILE, lastHandledByServer, (unsigned long)[unackedByServer count]);
}
else // if (processed > 0)
{
[unackedByServer removeObjectsInRange:NSMakeRange(0, processed)];
if (unackedByServer_lastRequestOffset > processed)
unackedByServer_lastRequestOffset -= processed;
else
unackedByServer_lastRequestOffset = 0;
lastHandledByServer += processed;
XMPPLogVerbose(@"%@: processReceivedAck (partially processed): lastHandledByServer(%u) pending(%lu)",
THIS_FILE, lastHandledByServer, (unsigned long)[unackedByServer count]);
}
// Update storage
NSArray *pending = [[NSArray alloc] initWithArray:unackedByServer copyItems:YES];
if (isStarted)
{
[storage setLastDisconnect:[NSDate date]
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:pending
forStream:xmppStream];
}
else // edge case
{
[storage setLastDisconnect:disconnectDate
lastHandledByClient:lastHandledByClient
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:pending
forStream:xmppStream];
}
// Notify delegate
[multicastDelegate xmppStreamManagement:self didReceiveAckForStanzaIds:stanzaIds];
}
else
{
XMPPLogVerbose(@"%@: processReceivedAck (unprocessed): lastHandledByServer(%u) pending(%lu)",
THIS_FILE, lastHandledByServer, (unsigned long)[unackedByServer count]);
}
return canProcessEntireAck;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark Sending Acks
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Sends an unrequested ack <a h='lastHandled'/> element, acking the server's recently received (and handled) elements.
*
* You can also configure the extension to automatically sends acks.
* @see automaticallySendAcksAfterStanzaCount:orTimeout:
*
* Keep in mind that the extension will automatically send an ack if it receives an explicit request.
**/
- (void)sendAck
{
XMPPLogTrace();
// This is a PUBLIC method
dispatch_block_t block = ^{ @autoreleasepool{
if (isStarted)
{
[self _sendAck];
}
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
/**
* Sends the ack <a h='x'/> element, and discards newly acked stanzas from the queue.
**/
- (void)_sendAck
{
NSUInteger pending = 0;
for (XMPPStreamManagementIncomingStanza *stanza in unackedByClient)
{
if (stanza.isHandled)
pending++;
else
break;
}
if (pending > 0)
{
[unackedByClient removeObjectsInRange:NSMakeRange(0, pending)];
unackedByClient_lastAckOffset += pending;
lastHandledByClient += pending;
XMPPLogVerbose(@"%@: sendAck: lastHandledByClient(%u) inc(%lu) totalPending(%lu)", THIS_FILE,
lastHandledByClient,
(unsigned long)pending,
(unsigned long)unackedByClient_lastAckOffset);
// Update info in storage.
if (isStarted)
{
[storage setLastDisconnect:[NSDate date]
lastHandledByClient:lastHandledByClient
forStream:xmppStream];
}
else // edge case
{
// An incoming stanza got markedAsHandled post-disconnect
NSArray *pending = [[NSArray alloc] initWithArray:unackedByServer copyItems:YES];
[storage setLastDisconnect:disconnectDate
lastHandledByClient:lastHandledByClient
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:pending
forStream:xmppStream];
}
}
if (isStarted)
{
// Send the XML element
NSXMLElement *a = [NSXMLElement elementWithName:@"a" xmlns:XMLNS_STREAM_MANAGEMENT];
NSString *h = [NSString stringWithFormat:@"%u", (unsigned int)lastHandledByClient];
[a addAttributeWithName:@"h" stringValue:h];
[xmppStream sendElement:a];
// Reset offset
unackedByClient_lastAckOffset = 0;
}
// Stop the timer(s)
[autoAckTimer cancel];
autoAckTimer = nil;
[ackResponseTimer cancel];
ackResponseTimer = nil;
}
/**
* Returns the number of incoming stanzas that have been handled on our side,
* but which we haven't yet sent an ack to the server.
**/
- (NSUInteger)numIncomingStanzasThatCanBeAcked
{
// What is unackedByClient_lastAckOffset ?
//
// In the method maybeUpdateStoredLastHandledByClient,
// we remove items from the unackedByClient array, and increase the lastHandledByClient value.
// But we do NOT actually send an ack to the server at this point.
//
// Thus unackedByClient_lastAckOffset represents the number of items we're removed from the unackedByClient array,
// and for which we still need to send an ack to the server.
NSUInteger count = unackedByClient_lastAckOffset;
for (XMPPStreamManagementIncomingStanza *stanza in unackedByClient)
{
if (stanza.isHandled)
count++;
else
break;
}
return count;
}
/**
* Returns the number of incoming stanzas that cannot yet be acked because
* - the stanza hasn't been marked as handled yet
* - or a preceeding stanza has hasn't been marked as handled yet
**/
- (NSUInteger)numIncomingStanzasThatCannnotBeAcked
{
BOOL foundUnhandledStanza = NO;
NSUInteger count = 0;
for (XMPPStreamManagementIncomingStanza *stanza in unackedByClient)
{
if (foundUnhandledStanza)
{
count++;
}
else if (!stanza.isHandled)
{
foundUnhandledStanza = YES;
count++;
}
}
return count;
}
/**
* Sends an ack if needed (if pending meets/exceeds autoAck_stanzaCount).
**/
- (BOOL)maybeSendAck
{
XMPPLogTrace();
if (!isStarted)
{
// cannot send acks if we're not started
return NO;
}
if ((autoAck_stanzaCount == 0) && (autoAck_timeout == 0.0))
{
// auto ack disabled
return NO;
}
NSUInteger pending = [self numIncomingStanzasThatCanBeAcked];
if (pending == 0)
{
// nothing new to ack
return NO;
}
// Send ack according to autoAck configuration
if ((autoAck_stanzaCount > 0) && (pending >= autoAck_stanzaCount))
{
[self _sendAck];
return YES;
}
else if ((autoAck_timeout > 0.0) && (autoAckTimer == nil))
{
__weak id weakSelf = self;
autoAckTimer = [[XMPPTimer alloc] initWithQueue:moduleQueue eventHandler:^{ @autoreleasepool{
[weakSelf sendAck];
}}];
[autoAckTimer startWithTimeout:autoAck_timeout interval:0];
}
return NO;
}
- (void)markHandledStanzaId:(id)stanzaId
{
XMPPLogTrace();
if (stanzaId == nil) return;
dispatch_block_t block = ^{ @autoreleasepool {
// It's theoretically possible that the delegate(s) returned the same stanzaId for multiple elements.
// Although this is strongly discouraged, we should try to do our best to handle such a situation logically.
//
// In light of this edge case, here are the rules:
//
// Find the first stanza in the queue that is
// - not already marked as handled
// - has a matching stanzaId
//
// Mark this as handled, and then break.
//
// We also check to see if marking this stanza as handled has increased the pending count.
// For example (using the following queue):
//
// 0) <stanzaId=ABC, handled=YES>
// 1) <stanzaId=DEF, handled=NO > // <-- marking as handled increases pendingCount from 1 to 2
// 2) <stanzaId=GHI, handled=NO > // <-- marking as handled doesn't change pendingCount (still 1)
BOOL found = NO;
for (XMPPStreamManagementIncomingStanza *stanza in unackedByClient)
{
if (stanza.isHandled)
{
// continue
}
else if ([stanza.stanzaId isEqual:stanzaId])
{
stanza.isHandled = YES;
found = YES;
break;
}
}
if (found)
{
if (![self maybeSendAck])
{
[self maybeUpdateStoredLastHandledByClient];
}
}
else
{
// Edge case:
//
// The stanzaId was marked as handled before we finished figuring out what the stanzaId is.
//
// In order to get the stanzaId for a received element, we go through an asynchronous process.
// It's possible (but unlikely) that this process ends up taking longer than it does for the app
// to actually "handle" the element. So we have this odd edge case,
// which we handle by queuing up the stanzaId for later processing.
if (outstandingStanzaIds > 0)
{
if (pendingHandledStanzaIds == nil)
pendingHandledStanzaIds = [[NSMutableArray alloc] init];
[pendingHandledStanzaIds addObject:stanzaId];
}
}
}};
if (dispatch_get_specific(moduleQueueTag))
block();
else
dispatch_async(moduleQueue, block);
}
- (void)processReceivedElement:(XMPPElement *)element
{
XMPPLogTrace();
NSAssert(isStarted, @"State machine exception");
SEL selector = @selector(xmppStreamManagement:getIsHandled:stanzaId:forReceivedElement:);
if (![multicastDelegate hasDelegateThatRespondsToSelector:selector])
{
// None of the delegates implement the method.
// Use a shortcut.
XMPPStreamManagementIncomingStanza *stanza =
[[XMPPStreamManagementIncomingStanza alloc] initWithStanzaId:nil isHandled:YES];
[unackedByClient addObject:stanza];
// Since we know the element is 'handled' we can immediately check to see if we need to send an ack
if (![self maybeSendAck])
{
[self maybeUpdateStoredLastHandledByClient];
}
}
else
{
// We need to query the delegate(s) to see if the stanza can be marked as handled.
// This is an asynchronous process, so we put a placeholder in the array for now.
//
// Note: stanza.isHandled == NO
XMPPStreamManagementIncomingStanza *stanza =
[[XMPPStreamManagementIncomingStanza alloc] initWithStanzaId:nil isHandled:NO];
[unackedByClient addObject:stanza];
// Query the delegate(s). The Rules:
//
// If ANY of the delegates says the element is "not handled", then we can immediately set it as so.
// Otherwise the element will be marked as handled.
GCDMulticastDelegateEnumerator *enumerator = [multicastDelegate delegateEnumerator];
outstandingStanzaIds++;
dispatch_queue_t concurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(concurrentQueue, ^{ @autoreleasepool
{
__block BOOL isHandled = YES;
__block id stanzaId = nil;
id delegate;
dispatch_queue_t dq;
while (isHandled && [enumerator getNextDelegate:&delegate delegateQueue:&dq forSelector:selector])
{
dispatch_sync(dq, ^{ @autoreleasepool {
[delegate xmppStreamManagement:self
getIsHandled:&isHandled
stanzaId:&stanzaId
forReceivedElement:element];
NSAssert(isHandled || stanzaId != nil,
@"You MUST return a stanzaId for any elements you mark as not-yet-handled");
}});
}
dispatch_async(moduleQueue, ^{ @autoreleasepool
{
if (isHandled)
{
stanza.isHandled = YES;
}
else
{
stanza.stanzaId = stanzaId;
// Check for edge case:
// - stanzaId was marked as handled before we figured out what the stanzaId was
if ([pendingHandledStanzaIds count] > 0)
{
NSUInteger i = 0;
for (id pendingStanzaId in pendingHandledStanzaIds)
{
if ([pendingStanzaId isEqual:stanzaId])
{
[pendingHandledStanzaIds removeObjectAtIndex:i];
stanza.isHandled = YES;
break;
}
i++;
}
}
}
// Defensive programming.
// Don't let this array grow infinitely big (if markHandledStanzaId is being invoked incorrectly).
if (--outstandingStanzaIds == 0) {
[pendingHandledStanzaIds removeAllObjects];
}
if (stanza.isHandled)
{
if (![self maybeSendAck])
{
[self maybeUpdateStoredLastHandledByClient];
}
}
}});
}});
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark Storage Helpers
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* This method is used when the pendingStanzaIds have changed (ivar unackedByServer changed),
* but we weren't able to process an ack, or update the lastHandledByServer.
**/
- (void)updateStoredPendingOutgoingStanzas
{
XMPPLogTrace();
NSArray *pending = [[NSArray alloc] initWithArray:unackedByServer copyItems:YES];
if (isStarted)
{
[storage setLastDisconnect:[NSDate date]
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:pending
forStream:xmppStream];
}
else
{
[storage setLastDisconnect:disconnectDate
lastHandledByClient:lastHandledByClient
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:pending
forStream:xmppStream];
}
}
/**
* This method is used when we can maybe increment the lastHandledByClient value,
* but the change isn't significant enough to trigger an autoAck (or autoAck_stanzaCount is disabled).
*
* It updates the lastHandledByClient value (if needed), and notified storage.
**/
- (void)maybeUpdateStoredLastHandledByClient
{
XMPPLogTrace();
// Edge case note:
//
// This method may be invoked shortly after being disconnected.
// How is this handled?
//
// The unackedByClient array is cleared when we send <enable> or <resume>.
// And it cannot be appended to unless isStarted is YES.
// Thus this method works properly shortly after a disconnect, and can increment lastHandledByClient.
// And properly handles the edge case of being called in the middle of resuming a session.
NSUInteger pending = 0;
for (XMPPStreamManagementIncomingStanza *stanza in unackedByClient)
{
if (stanza.isHandled)
pending++;
else
break;
}
if (pending > 0)
{
[unackedByClient removeObjectsInRange:NSMakeRange(0, pending)];
unackedByClient_lastAckOffset += pending;
lastHandledByClient += pending;
XMPPLogVerbose(@"%@: sendAck: lastHandledByClient(%u) inc(%lu) totalPending(%lu)", THIS_FILE,
lastHandledByClient,
(unsigned long)pending,
(unsigned long)unackedByClient_lastAckOffset);
if (isStarted)
{
[storage setLastDisconnect:[NSDate date]
lastHandledByClient:lastHandledByClient
forStream:xmppStream];
}
else // edge case
{
// An incoming stanza got markedAsHandled post-disconnect
NSArray *pending = [[NSArray alloc] initWithArray:unackedByServer copyItems:YES];
[storage setLastDisconnect:disconnectDate
lastHandledByClient:lastHandledByClient
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:pending
forStream:xmppStream];
}
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark XMPPStream Delegate
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Binding a JID resource is a standard part of the authentication process,
* and occurs after SASL authentication completes (which generally authenticates the JID username).
*
* This delegate method allows for a custom binding procedure to be used.
* For example:
* - a custom SASL authentication scheme might combine auth with binding
* - stream management (xep-0198) replaces binding if it can resume a previous session
*
* Return nil (or don't implement this method) if you wish to use the standard binding procedure.
**/
- (id <XMPPCustomBinding>)xmppStreamWillBind:(XMPPStream *)sender
{
if (autoResume)
{
// We will check canResume in start: method (part of XMPPCustomBinding protocol)
return self;
}
else
{
return nil;
}
}
- (void)xmppStream:(XMPPStream *)sender didSendIQ:(XMPPIQ *)iq
{
XMPPLogTrace();
if (isStarted || enableSent)
{
[self processSentElement:iq];
}
}
- (void)xmppStream:(XMPPStream *)sender didSendMessage:(XMPPMessage *)message
{
XMPPLogTrace();
if (isStarted || enableSent)
{
[self processSentElement:message];
}
}
- (void)xmppStream:(XMPPStream *)sender didSendPresence:(XMPPPresence *)presence
{
XMPPLogTrace();
if (isStarted || enableSent)
{
[self processSentElement:presence];
}
}
- (BOOL)xmppStream:(XMPPStream *)sender didReceiveIQ:(XMPPIQ *)iq
{
XMPPLogTrace();
if (isStarted)
{
[self processReceivedElement:iq];
}
return NO;
}
- (void)xmppStream:(XMPPStream *)sender didReceiveMessage:(XMPPMessage *)message
{
XMPPLogTrace();
if (isStarted)
{
[self processReceivedElement:message];
}
}
- (void)xmppStream:(XMPPStream *)sender didReceivePresence:(XMPPPresence *)presence
{
XMPPLogTrace();
if (isStarted)
{
[self processReceivedElement:presence];
}
}
/**
* This method is called if any of the xmppStream:willReceiveX: methods filter the incoming stanza.
*
* It may be useful for some extensions to know that something was received,
* even if it was filtered for some reason.
**/
- (void)xmppStreamDidFilterStanza:(XMPPStream *)sender
{
XMPPLogTrace();
if (isStarted)
{
// The element was filtered/consumed by something in the stack.
// So it is implicitly 'handled'.
XMPPStreamManagementIncomingStanza *stanza =
[[XMPPStreamManagementIncomingStanza alloc] initWithStanzaId:nil isHandled:YES];
[unackedByClient addObject:stanza];
XMPPLogVerbose(@"%@: xmppStreamDidFilterStanza: lastHandledByClient(%u) pendingToAck(%lu), pendingHandled(%lu)",
THIS_FILE, lastHandledByClient,
(unsigned long)[self numIncomingStanzasThatCanBeAcked],
(unsigned long)[self numIncomingStanzasThatCannnotBeAcked]);
if (![self maybeSendAck])
{
[self maybeUpdateStoredLastHandledByClient];
}
}
}
- (void)xmppStream:(XMPPStream *)sender didSendCustomElement:(NSXMLElement *)element
{
XMPPLogTrace();
if (enableQueued)
{
if ([[element name] isEqualToString:@"enable"])
{
enableQueued = NO;
enableSent = YES;
}
}
else if (isStarted)
{
if ([[element name] isEqualToString:@"r"])
{
[multicastDelegate xmppStreamManagementDidRequestAck:self];
}
}
}
- (void)xmppStream:(XMPPStream *)sender didReceiveCustomElement:(NSXMLElement *)element
{
XMPPLogTrace();
NSString *elementName = [element name];
if ([elementName isEqualToString:@"r"])
{
// We received a request <r/> from the server.
if (ackResponseDelay <= 0.0)
{
// Immediately respond to the request,
// as recommended in the XEP.
[self _sendAck];
}
else if (ackResponseTimer == nil)
{
// Use client-configured delay before responding to the request.
__weak id weakSelf = self;
ackResponseTimer = [[XMPPTimer alloc] initWithQueue:moduleQueue eventHandler:^{ @autoreleasepool{
[weakSelf _sendAck];
}}];
[ackResponseTimer startWithTimeout:ackResponseDelay interval:0];
}
}
else if ([elementName isEqualToString:@"a"])
{
// Try to process the ack.
// If we can't yet, then we'll put it into the pendingAcks array.
if (![self processReceivedAck:element])
{
if (unprocessedReceivedAcks == nil)
unprocessedReceivedAcks = [[NSMutableArray alloc] initWithCapacity:1];
[unprocessedReceivedAcks addObject:element];
}
}
else if ([elementName isEqualToString:@"enabled"])
{
if (enableSent)
{
// <enabled xmlns='urn:xmpp:sm:3' id='some-long-sm-id' resume='true'/>
NSString *resumptionId = nil;
uint32_t max = 0;
BOOL canResume = [element attributeBoolValueForName:@"resume" withDefaultValue:NO];
if (canResume)
{
resumptionId = [element attributeStringValueForName:@"id"];
max = [element attributeUInt32ValueForName:@"max" withDefaultValue:requestedMax];
}
[storage setResumptionId:resumptionId
timeout:max
lastDisconnect:[NSDate date]
forStream:xmppStream];
[multicastDelegate xmppStreamManagement:self wasEnabled:element];
isStarted = YES;
enableSent = NO;
lastHandledByClient = 0;
lastHandledByServer = 0;
unprocessedReceivedAcks = nil;
}
else
{
XMPPLogWarn(@"Received unrequested <enabled/> stanza");
}
}
else if ([elementName isEqualToString:@"failed"])
{
if (enableSent)
{
[storage removeAllForStream:xmppStream];
[multicastDelegate xmppStreamManagement:self wasNotEnabled:element];
isStarted = NO;
enableSent = NO;
[autoRequestTimer cancel];
autoRequestTimer = nil;
}
}
}
- (void)xmppStreamDidSendClosingStreamStanza:(XMPPStream *)sender
{
XMPPLogTrace();
wasCleanDisconnect = YES;
}
- (void)xmppStreamDidDisconnect:(XMPPStream *)sender withError:(NSError *)error
{
XMPPLogTrace();
if (wasCleanDisconnect)
{
disconnectDate = nil;
[storage removeAllForStream:xmppStream];
}
else
{
disconnectDate = [NSDate date];
NSArray *pending = [[NSArray alloc] initWithArray:unackedByServer copyItems:YES];
[storage setLastDisconnect:disconnectDate
lastHandledByClient:lastHandledByClient
lastHandledByServer:lastHandledByServer
pendingOutgoingStanzas:pending
forStream:xmppStream];
}
// Reset temporary state variables
isStarted = NO;
enableQueued = NO;
enableSent = NO;
wasCleanDisconnect = NO;
didAttemptResume = NO;
didResume = NO;
prev_unackedByServer = nil;
resume_response = nil;
resume_stanzaIds = nil;
// Cancel timers
[autoRequestTimer cancel];
autoRequestTimer = nil;
[autoAckTimer cancel];
autoAckTimer = nil;
[ackResponseTimer cancel];
ackResponseTimer = nil;
}
@end
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark -
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@implementation XMPPStream (XMPPStreamManagement)
- (BOOL)supportsStreamManagement
{
__block BOOL result = NO;
dispatch_block_t block = ^{ @autoreleasepool {
// The root element can be properly queried anytime after the
// stream:features are received, and TLS has been setup (if required).
if (self.state >= STATE_XMPP_POST_NEGOTIATION)
{
NSXMLElement *features = [self.rootElement elementForName:@"stream:features"];
NSXMLElement *sm = [features elementForName:@"sm" xmlns:XMLNS_STREAM_MANAGEMENT];
result = (sm != nil);
}
}};
if (dispatch_get_specific(self.xmppQueueTag))
block();
else
dispatch_sync(self.xmppQueue, block);
return result;
}
@end